Metadata service does not update the workflow token of a successful run if the service is unavailable for some time after the run completes

Description

In the appfabric processor, we saw the following logs:

2025-02-18 12:02:29,053 - DEBUG [scheduler-notification-subscriber-0:i.c.c.i.a.s.AppMetadataStore@2446] - No workflow token available for workflow: program:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow, runId: 88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:29,054 - WARN [scheduler-notification-subscriber-0:i.c.c.i.a.r.s.t.ProgramStatusTrigger@115] - Retrying invalid workflow token "BasicWorkflowToken{tokenValueMap={USER={}, SYSTEM={}}, maxSizeBytes=0, mapReduceCounters=null, nodeName='null', putAllowed=true, bytesLeft=0}" for program runId program_run:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow.88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:29,054 - WARN [scheduler-notification-subscriber-0:i.c.c.c.s.Retries@252] - Call failed with exception, retrying again after 5000 ms.
io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger$WorkflowTokenNotFoundException: null
    at io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger.fetchWorkflowToken(ProgramStatusTrigger.java:122)
    at io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger.lambda$null$0(ProgramStatusTrigger.java:89)
    at io.cdap.cdap.common.service.Retries.lambda$callWithRetries$2(Retries.java:195)
    at io.cdap.cdap.common.service.Retries.callWithRetries(Retries.java:228)
    at io.cdap.cdap.common.service.Retries.callWithRetries(Retries.java:195)
    at io.cdap.cdap.common.service.Retries.callWithRetries(Retries.java:174)
    at io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger.lambda$isSatisfied$1(ProgramStatusTrigger.java:88)
    at io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger.getTriggerSatisfiedResult(ProgramStatusTrigger.java:193)
    at io.cdap.cdap.internal.app.runtime.schedule.trigger.ProgramStatusTrigger.isSatisfied(ProgramStatusTrigger.java:76)
    at io.cdap.cdap.internal.app.runtime.schedule.trigger.OrTrigger.isSatisfied(OrTrigger.java:44)
    at io.cdap.cdap.internal.app.runtime.schedule.queue.JobQueueTable.isTriggerSatisfied(JobQueueTable.java:218)
    at io.cdap.cdap.internal.app.runtime.schedule.queue.JobQueueTable.addNotification(JobQueueTable.java:194)
    at io.cdap.cdap.scheduler.ScheduleNotificationSubscriberService$ProgramStatusEventSubscriberService.processNotification(ScheduleNotificationSubscriberService.java:308)
    at io.cdap.cdap.scheduler.ScheduleNotificationSubscriberService$AbstractSchedulerSubscriberService.processMessages(ScheduleNotificationSubscriberService.java:157)
    at io.cdap.cdap.messaging.subscriber.AbstractMessagingSubscriberService.lambda$processSingleTxn$0(AbstractMessagingSubscriberService.java:244)
    at io.cdap.cdap.spi.data.transaction.TransactionRunners.lambda$run$1(TransactionRunners.java:171)
    at io.cdap.cdap.storage.spanner.SpannerTransactionRunner.lambda$run$1(SpannerTransactionRunner.java:40)
    at com.google.cloud.spanner.TransactionRunnerImpl.lambda$runInternal$0(TransactionRunnerImpl.java:942)
    at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105)
    at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
    at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
    at com.google.cloud.spanner.SpannerRetryHelper.runTxWithRetriesOnAborted(SpannerRetryHelper.java:79)
    at com.google.cloud.spanner.SpannerRetryHelper.runTxWithRetriesOnAborted(SpannerRetryHelper.java:68)
    at com.google.cloud.spanner.TransactionRunnerImpl.runInternal(TransactionRunnerImpl.java:999)
    at com.google.cloud.spanner.TransactionRunnerImpl.run(TransactionRunnerImpl.java:904)
    at com.google.cloud.spanner.SessionPool$SessionPoolTransactionRunner.run(SessionPool.java:934)
    at io.cdap.cdap.storage.spanner.SpannerTransactionRunner.run(SpannerTransactionRunner.java:39)
    at io.cdap.cdap.storage.spanner.RetryingSpannerTransactionRunner.run(RetryingSpannerTransactionRunner.java:58)
    at io.cdap.cdap.spi.data.transaction.TransactionRunners.run(TransactionRunners.java:171)
    at io.cdap.cdap.messaging.subscriber.AbstractMessagingSubscriberService.processSingleTxn(AbstractMessagingSubscriberService.java:239)
    at io.cdap.cdap.messaging.subscriber.AbstractMessagingSubscriberService.processMessages(AbstractMessagingSubscriberService.java:207)
    at io.cdap.cdap.messaging.subscriber.AbstractMessagingPollingService.fetchAndProcessMessages(AbstractMessagingPollingService.java:213)
    at io.cdap.cdap.messaging.subscriber.AbstractMessagingPollingService.runTask(AbstractMessagingPollingService.java:159)
    at io.cdap.cdap.common.service.AbstractRetryableScheduledService.runOneIteration(AbstractRetryableScheduledService.java:167)
    at com.google.common.util.concurrent.AbstractScheduledService$1$1.run(AbstractScheduledService.java:170)
    at com.google.common.util.concurrent.AbstractScheduledService$CustomScheduler$ReschedulableCallable.call(AbstractScheduledService.java:355)
    at com.google.common.util.concurrent.AbstractScheduledService$CustomScheduler$ReschedulableCallable.call(AbstractScheduledService.java:321)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
2025-02-18 12:02:34,064 - DEBUG [scheduler-notification-subscriber-0:i.c.c.i.a.s.AppMetadataStore@2446] - No workflow token available for workflow: program:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow, runId: 88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:34,064 - WARN [scheduler-notification-subscriber-0:i.c.c.i.a.r.s.t.ProgramStatusTrigger@115] - Retrying invalid workflow token "BasicWorkflowToken{tokenValueMap={USER={}, SYSTEM={}}, maxSizeBytes=0, mapReduceCounters=null, nodeName='null', putAllowed=true, bytesLeft=0}" for program runId program_run:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow.88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:39,071 - DEBUG [scheduler-notification-subscriber-0:i.c.c.i.a.s.AppMetadataStore@2446] - No workflow token available for workflow: program:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow, runId: 88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:39,072 - WARN [scheduler-notification-subscriber-0:i.c.c.i.a.r.s.t.ProgramStatusTrigger@115] - Retrying invalid workflow token "BasicWorkflowToken{tokenValueMap={USER={}, SYSTEM={}}, maxSizeBytes=0, mapReduceCounters=null, nodeName='null', putAllowed=true, bytesLeft=0}" for program runId program_run:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow.88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:44,080 - DEBUG [scheduler-notification-subscriber-0:i.c.c.i.a.s.AppMetadataStore@2446] - No workflow token available for workflow: program:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow, runId: 88412ee8-eded-11ef-9e0e-56c2b476dc89
2025-02-18 12:02:44,081 - WARN [scheduler-notification-subscriber-0:i.c.c.i.a.r.s.t.ProgramStatusTrigger@115] - Retrying invalid workflow token "BasicWorkflowToken{tokenValueMap={USER={}, SYSTEM={}}, maxSizeBytes=0, mapReduceCounters=null, nodeName='null', putAllowed=true, bytesLeft=0}" for program runId program_run:default.DataFusionQuickstart_copy.c3391fa3-ede1-11ef-a7fd-56c2b476dc89.workflow.DataPipelineWorkflow.88412ee8-eded-11ef-9e0e-56c2b476dc89


Reproduction steps:

  • Create an inbound trigger from one pipeline to another.

  • Crash the metadata service.

  • Run the 1st pipeline.

  • Bring the metadata service back up.

  • Observe that the 2nd pipeline run is not triggered.

After the metadata service comes back up, the workflow token should be written by io.cdap.cdap.metadata.MetadataSubscriberService.WorkflowProcessor#processMessage.

Instead, the schedule notification subscriber service hangs, retrying the workflow token fetch over and over.
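
To make the retry behavior above concrete, here is a minimal, self-contained sketch of a fixed-delay lookup that polls for a token written by another component. It is not CDAP code and does not use CDAP's Retries API: the class WorkflowTokenRetrySketch, the method fetchWithRetries, and the maxAttempts cap are all hypothetical names introduced for illustration, and the cap is an assumption, not necessarily how this issue was fixed. The sketch only shows that such a loop succeeds if the writer catches up, and otherwise spends every attempt while the calling thread waits.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical illustration only; none of these names exist in CDAP. */
final class WorkflowTokenRetrySketch {

  /**
   * Calls lookup until it returns a token or maxAttempts lookups have been made,
   * sleeping delayMs between attempts. Returns empty if no token was found or
   * the thread was interrupted while waiting.
   */
  static Optional<String> fetchWithRetries(Supplier<Optional<String>> lookup,
                                           int maxAttempts, long delayMs) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Optional<String> token = lookup.get();
      if (token.isPresent()) {
        return token;
      }
      if (attempt < maxAttempts) {
        try {
          // The calling thread is blocked here; nothing else it owns makes progress.
          Thread.sleep(delayMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt flag
          return Optional.empty();
        }
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Case 1: the writer catches up, so the token appears on the third lookup.
    AtomicInteger lookups = new AtomicInteger();
    Optional<String> recovered = fetchWithRetries(
        () -> lookups.incrementAndGet() >= 3 ? Optional.of("token") : Optional.empty(),
        5, 10L);
    System.out.println("found=" + recovered.isPresent() + " lookups=" + lookups.get());

    // Case 2: the token is never written, so every attempt is used up.
    AtomicInteger stuckLookups = new AtomicInteger();
    Optional<String> missing = fetchWithRetries(
        () -> {
          stuckLookups.incrementAndGet();
          return Optional.empty();
        },
        5, 10L);
    System.out.println("found=" + missing.isPresent() + " lookups=" + stuckLookups.get());
  }
}
```

With no cap on attempts, the second case would never return, which matches the repeating 5000 ms retries seen in the logs above.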

Release Notes

None

Fixed

Details

Assignee

Reporter

Triaged: Yes

Size: S

Components

Fix versions

Priority

Created February 18, 2025 at 12:09 PM
Updated February 19, 2025 at 7:56 PM
Resolved February 19, 2025 at 7:56 PM