Workers can be used to run background processes. Workers provide the ability to write data processing logic that doesn't fit into the other paradigms such as MapReduce or real-time and batch processing.

You can add workers to your application by calling the addWorker method in the application's configure method:

1 2 3 4 5 6 7 8 9 public class AnalyticsApp extends AbstractApplication { @Override public void configure() { setName("AnalyticsApp"); ... addWorker(new ProcessWorker()); ... } }

Workers have semantics similar to a Java thread and are run in a thread when CDAP is run in either in-memory or local sandbox modes. In distributed mode, each instance of a worker runs in its own YARN container. Their instances may be updated via the Command Line Interface or a Microservices:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class ProcessWorker extends AbstractWorker { @Override public void initialize(WorkerContext context) { super.initialize(context); ... } @Override public void run() { ... } @Override public void stop() { ... } @Override public void destroy() { ... } }

Workers and Resources

The size of the YARN container and the number of virtual cores allocated to a worker can be specified using a setResources method in the worker initialize method:

1 2 3 4 5 6 7 @Override public void initialize(WorkerContext context) { super.initialize(context); ... setResources(new Resources(1024, 2)); ... }

In this example, each worker container will be initialized with 1024 MB of memory and 2 virtual cores.

Workers and Datasets

Workers can access and use datasets via a DatasetContext inside the run method of a TxRunnable. A TxRunnable can be executed using the execute method of a WorkerContext. Note that WorkerContext is thread-safe and that each thread will receive its own instance of the Dataset being accessed. Though it is necessary to use the getDataset method of DatasetContext to access a Dataset instance inside a TxRunnable, this operation is performance-optimized with the help of internal caching logic. In this example, the dataset tableName is accessed from within a TxRunnable:

1 2 3 4 5 6 7 8 9 10 @Override public void run() { getContext().execute(new TxRunnable() { @Override public void run(DatasetContext context) { Dataset dataset = context.getDataset("tableName"); ... } }); }

Operations executed on datasets within a run are committed as part of a single transaction. The transaction is started before run is invoked and is committed upon successful execution. Exceptions thrown while committing the transaction or thrown by user-code result in a rollback of the transaction. It is recommended that TransactionConflictException be caught and handled appropriately; for example, you can retry a dataset operation.

Services can be discovered from inside of workers. Services can either belong to the same application or to another application in the same namespace. WorkerContext can be used to discover the URL of the service of interest:

1 2 3 4 5 6 7 @Override public void run() { URL url = getContext().getServiceURL("myService"); // To discover a service in another application in the same namespace, use: url = getContext().getServiceURL("anotherAppName", "anotherServiceId"); }