Metadata service does not update the workflow token of a successful run if it is unavailable for some time after the run completes
Description
Release Notes: None
Resolution: Fixed
Details
Assignee: Ankit Jain
Reporter: Ankit Jain
Labels:
Triaged: Yes
Size: S
Components:
Fix versions:
Priority:
Created: February 18, 2025 at 12:09 PM
Updated: February 19, 2025 at 7:56 PM
Resolved: February 19, 2025 at 7:56 PM
In the appfabric processor, we saw the following logs:
2025-02-18 12:02:29,053 - DEBUG [scheduler-notification-subscriber-0:i.c.c.i.a.s.AppMetadataStore@2446] - No workflow token available for workflow: program:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow, runId: 88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:29,054 - WARN [scheduler-notification-subscriber-0:i.c.c.i.a.r.s.t.ProgramStatusTrigger@115] - Retrying invalid workflow token "BasicWorkflowToken{tokenValueMap={USER={}, SYSTEM={}}, maxSizeBytes=0, mapReduceCounters=null, nodeName='null', putAllowed=true, bytesLeft=0}" for program runId program_run:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow.88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:29,054 - WARN [scheduler-notification-subscriber-0:i.c.c.c.s.Retries@252] - Call failed with exception, retrying again after 5000 ms.
io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger$WorkflowTokenNotFoundException: null
    at io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger.fetchWorkflowToken(ProgramStatusTrigger.java:122)
    at io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger.lambda$null$0(ProgramStatusTrigger.java:89)
    at io.cdap.cdap.common.service.Retries.lambda$callWithRetries$2(Retries.java:195)
    at io.cdap.cdap.common.service.Retries.callWithRetries(Retries.java:228)
    at io.cdap.cdap.common.service.Retries.callWithRetries(Retries.java:195)
    at io.cdap.cdap.common.service.Retries.callWithRetries(Retries.java:174)
    at io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger.lambda$isSatisfied$1(ProgramStatusTrigger.java:88)
    at io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger.getTriggerSatisfiedResult(ProgramStatusTrigger.java:193)
    at io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger.isSatisfied(ProgramStatusTrigger.java:76)
    at io.cdap.cdap.internal.app.runtime.schedule.trigger.OrTrigger.isSatisfied(OrTrigger.java:44)
    at io.cdap.cdap.internal.app.runtime.schedule.queue.JobQueueTable.isTriggerSatisfied(JobQueueTable.java:218)
    at io.cdap.cdap.internal.app.runtime.schedule.queue.JobQueueTable.addNotification(JobQueueTable.java:194)
    at io.cdap.cdap.scheduler.ScheduleNotificationSubscriberService$ProgramStatusEventSubscriberService.processNotification(ScheduleNotificationSubscriberService.java:308)
    at io.cdap.cdap.scheduler.ScheduleNotificationSubscriberService$AbstractSchedulerSubscriberService.processMessages(ScheduleNotificationSubscriberService.java:157)
    at io.cdap.cdap.messaging.subscriber.AbstractMessagingSubscriberService.lambda$processSingleTxn$0(AbstractMessagingSubscriberService.java:244)
    at io.cdap.cdap.spi.data.transaction.TransactionRunners.lambda$run$1(TransactionRunners.java:171)
    at io.cdap.cdap.storage.spanner.SpannerTransactionRunner.lambda$run$1(SpannerTransactionRunner.java:40)
    at com.google.cloud.spanner.TransactionRunnerImpl.lambda$runInternal$0(TransactionRunnerImpl.java:942)
    at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105)
    at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
    at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
    at com.google.cloud.spanner.SpannerRetryHelper.runTxWithRetriesOnAborted(SpannerRetryHelper.java:79)
    at com.google.cloud.spanner.SpannerRetryHelper.runTxWithRetriesOnAborted(SpannerRetryHelper.java:68)
    at com.google.cloud.spanner.TransactionRunnerImpl.runInternal(TransactionRunnerImpl.java:999)
    at com.google.cloud.spanner.TransactionRunnerImpl.run(TransactionRunnerImpl.java:904)
    at com.google.cloud.spanner.SessionPool$SessionPoolTransactionRunner.run(SessionPool.java:934)
    at io.cdap.cdap.storage.spanner.SpannerTransactionRunner.run(SpannerTransactionRunner.java:39)
    at io.cdap.cdap.storage.spanner.RetryingSpannerTransactionRunner.run(RetryingSpannerTransactionRunner.java:58)
    at io.cdap.cdap.spi.data.transaction.TransactionRunners.run(TransactionRunners.java:171)
    at io.cdap.cdap.messaging.subscriber.AbstractMessagingSubscriberService.processSingleTxn(AbstractMessagingSubscriberService.java:239)
    at io.cdap.cdap.messaging.subscriber.AbstractMessagingSubscriberService.processMessages(AbstractMessagingSubscriberService.java:207)
    at io.cdap.cdap.messaging.subscriber.AbstractMessagingPollingService.fetchAndProcessMessages(AbstractMessagingPollingService.java:213)
    at io.cdap.cdap.messaging.subscriber.AbstractMessagingPollingService.runTask(AbstractMessagingPollingService.java:159)
    at io.cdap.cdap.common.service.AbstractRetryableScheduledService.runOneIteration(AbstractRetryableScheduledService.java:167)
    at com.google.common.util.concurrent.AbstractScheduledService$1$1.run(AbstractScheduledService.java:170)
    at com.google.common.util.concurrent.AbstractScheduledService$CustomScheduler$ReschedulableCallable.call(AbstractScheduledService.java:355)
    at com.google.common.util.concurrent.AbstractScheduledService$CustomScheduler$ReschedulableCallable.call(AbstractScheduledService.java:321)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
2025-02-18 12:02:34,064 - DEBUG [scheduler-notification-subscriber-0:i.c.c.i.a.s.AppMetadataStore@2446] - No workflow token available for workflow: program:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow, runId: 88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:34,064 - WARN [scheduler-notification-subscriber-0:i.c.c.i.a.r.s.t.ProgramStatusTrigger@115] - Retrying invalid workflow token "BasicWorkflowToken{tokenValueMap={USER={}, SYSTEM={}}, maxSizeBytes=0, mapReduceCounters=null, nodeName='null', putAllowed=true, bytesLeft=0}" for program runId program_run:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow.88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:39,071 - DEBUG [scheduler-notification-subscriber-0:i.c.c.i.a.s.AppMetadataStore@2446] - No workflow token available for workflow: program:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow, runId: 88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:39,072 - WARN [scheduler-notification-subscriber-0:i.c.c.i.a.r.s.t.ProgramStatusTrigger@115] - Retrying invalid workflow token "BasicWorkflowToken{tokenValueMap={USER={}, SYSTEM={}}, maxSizeBytes=0, mapReduceCounters=null, nodeName='null', putAllowed=true, bytesLeft=0}" for program runId program_run:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow.88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:44,080 - DEBUG [scheduler-notification-subscriber-0:i.c.c.i.a.s.AppMetadataStore@2446] - No workflow token available for workflow: program:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow, runId: 88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:44,081 - WARN [scheduler-notification-subscriber-0:i.c.c.i.a.r.s.t.ProgramStatusTrigger@115] - Retrying invalid workflow token "BasicWorkflowToken{tokenValueMap={USER={}, SYSTEM={}}, maxSizeBytes=0, mapReduceCounters=null, nodeName='null', putAllowed=true, bytesLeft=0}" for program runId program_run:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow.88412ee8-eded-11ef-9e0e-56c2b476dc89
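These logs show a simple retry pattern. The empty BasicWorkflowToken is treated as invalid, and the fetch is retried every 5000 ms with no end in sight. That behaves like a fixed-delay poll that stops only when the token appears.

The Java sketch below is a hypothetical illustration. It is not CDAP's Retries API and not the fix applied for this issue. TokenRetrySketch and fetchWithDeadline are made-up names. The fetchToken supplier stands in for ProgramStatusTrigger.fetchWorkflowToken, and an empty Optional stands in for an empty token. It shows one way a caller could bound the wait so the thread is released instead of blocking indefinitely.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical sketch: poll for a workflow token at a fixed delay, but give up at a deadline. */
final class TokenRetrySketch {

  private TokenRetrySketch() {}

  /**
   * Calls fetchToken until it returns a token or maxWait has elapsed.
   * An empty Optional models the "invalid" (empty) token seen in the logs.
   * Returns Optional.empty() on timeout instead of retrying forever.
   */
  static Optional<String> fetchWithDeadline(
      Supplier<Optional<String>> fetchToken, Duration delay, Duration maxWait)
      throws InterruptedException {
    long deadline = System.nanoTime() + maxWait.toNanos();
    while (true) {
      Optional<String> token = fetchToken.get();
      if (token.isPresent()) {
        return token;
      }
      // Overflow-safe check: stop if one more sleep would run past the deadline.
      if (deadline - System.nanoTime() < delay.toNanos()) {
        return Optional.empty();
      }
      Thread.sleep(delay.toMillis());
    }
  }
}
```

Passing Duration.ofSeconds(5) as the delay matches the cadence in the logs. The sketch leaves open what the caller does after a timeout, such as requeuing the notification, skipping this trigger evaluation, or raising an alert. That is a separate design decision.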
Reproduction steps:
1. Create an inbound trigger from one pipeline to another.
2. Crash the metadata service.
3. Run the first pipeline.
4. Bring the metadata service back up.
Result: the second pipeline run is not triggered.
After the metadata service comes back up, the token should be written by
io.cdap.cdap.metadata.MetadataSubscriberService.WorkflowProcessor#processMessage
but it is not. As a result, the schedule notification subscriber service hangs, retrying indefinitely.
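All of the log lines above come from a single thread, scheduler-notification-subscriber-0. The stack trace shows that the retry runs inside ScheduleNotificationSubscriberService.processNotification, which itself runs inside AbstractMessagingSubscriberService.processSingleTxn.

Assuming notifications are processed one at a time on that thread, a notification whose token never appears stalls every notification queued behind it. The toy model below illustrates this head-of-line blocking with a single-threaded ExecutorService. HeadOfLineBlockingSketch, run, and the notification names are hypothetical, and none of it is CDAP code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy model of head-of-line blocking on a single subscriber thread (hypothetical names). */
final class HeadOfLineBlockingSketch {

  private HeadOfLineBlockingSketch() {}

  /**
   * Queues two notifications on one thread and returns those that finished
   * within waitMillis. tokenWritten == false models a token writer that is down.
   */
  static List<String> run(boolean tokenWritten, long waitMillis) throws InterruptedException {
    List<String> processed = new CopyOnWriteArrayList<>();
    ExecutorService subscriber = Executors.newSingleThreadExecutor();

    // Notification 1: keeps retrying until the token is available.
    subscriber.submit(() -> {
      while (!tokenWritten) {
        try {
          Thread.sleep(5);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return; // stop retrying once the executor is shut down
        }
      }
      processed.add("notification-1");
    });

    // Notification 2: trivial, but it cannot start until notification 1 finishes.
    subscriber.submit(() -> processed.add("notification-2"));

    Thread.sleep(waitMillis);
    subscriber.shutdownNow(); // interrupts the running task and discards queued ones
    subscriber.awaitTermination(1, TimeUnit.SECONDS);
    return processed;
  }
}
```

run(false, 50) returns an empty list. The first notification is still spinning when the executor is shut down, so the second one never runs. run(true, 200) returns both notifications in order.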