Workers run background processes. They let you write data processing logic that does not fit the other paradigms, such as MapReduce or real-time and batch processing.

You can add workers to your application by calling the addWorker method in the application's configure method:

public class AnalyticsApp extends AbstractApplication {
  @Override
  public void configure() {
    setName("AnalyticsApp");
    ...
    addWorker(new ProcessWorker());
    ...
  }
}

Workers have semantics similar to a Java thread and run in a thread when CDAP runs in either in-memory or local sandbox mode. In distributed mode, each instance of a worker runs in its own YARN container. The number of instances can be changed via the Command Line Interface or a RESTful API. A worker extends AbstractWorker and overrides its lifecycle methods:

public class ProcessWorker extends AbstractWorker {

  @Override
  public void initialize(WorkerContext context) {
    super.initialize(context);
    ...
  }

  @Override
  public void run() {
    ...
  }

  @Override
  public void stop() {
    ...
  }

  @Override
  public void destroy() {
    ...
  }
}
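The thread-like semantics described above can be illustrated without any CDAP classes. The following is a minimal sketch, assuming that run is expected to return soon after stop is called from another thread; PollingLoop and its methods are hypothetical names and are not part of the CDAP API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the body of a worker; it uses no CDAP classes.
class PollingLoop implements Runnable {
  // volatile so that a stop() call made from another thread is visible to run()
  private volatile boolean running = true;
  private final AtomicInteger iterations = new AtomicInteger();

  @Override
  public void run() {
    while (running) {
      iterations.incrementAndGet(); // placeholder for one unit of background work
      try {
        Thread.sleep(10); // pause briefly between units of work
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and exit
        return;
      }
    }
  }

  // Asks run() to finish after its current iteration.
  public void stop() {
    running = false;
  }

  // Number of work units started so far.
  public int iterations() {
    return iterations.get();
  }
}
```

In a worker, the same flag-based pattern would live in the run and stop methods shown above, so that stopping the worker ends the loop cleanly rather than leaving it running.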

Workers and Resources

The size of the YARN container and the number of virtual cores allocated to a worker can be specified by calling the setResources method in the worker's initialize method:

@Override
public void initialize(WorkerContext context) {
  super.initialize(context);
  ...
  setResources(new Resources(1024, 2));
  ...
}

In this example, each worker container will be initialized with 1024 MB of memory and 2 virtual cores.

Workers and Datasets

Workers access datasets through a DatasetContext inside the run method of a TxRunnable, which is executed by calling the execute method of a WorkerContext. WorkerContext is thread-safe, and each thread receives its own instance of the dataset being accessed. A Dataset instance must be obtained with the getDataset method of DatasetContext inside the TxRunnable; internal caching keeps this call inexpensive. In this example, the dataset tableName is accessed from within a TxRunnable:

@Override
public void run() {
  getContext().execute(new TxRunnable() {
    @Override
    public void run(DatasetContext context) {
      Dataset dataset = context.getDataset("tableName");
      ...
    }
  });
}

Operations executed on datasets within the run method of a TxRunnable are committed as part of a single transaction. The transaction is started before run is invoked and is committed upon successful execution. Exceptions thrown while committing the transaction or thrown by user code result in a rollback of the transaction. It is recommended that TransactionConflictException be caught and handled appropriately; for example, you can retry a dataset operation.
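One way to structure such a retry is sketched below in plain Java. This is an illustration under stated assumptions, not the CDAP implementation: ConflictException is a hypothetical unchecked stand-in for TransactionConflictException, and ConflictRetry.withRetries is a hypothetical helper. In a worker, the supplied attempt would wrap the call that executes the TxRunnable, and the catch clause would name the real exception type.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a transaction conflict; not a CDAP class.
class ConflictException extends RuntimeException {
  ConflictException(String message) {
    super(message);
  }
}

// Hypothetical helper that retries an operation when it fails with a conflict.
class ConflictRetry {

  // Calls attempt up to maxAttempts times and returns its first successful result.
  // Only ConflictException triggers a retry; any other exception propagates at once.
  static <T> T withRetries(int maxAttempts, long backoffMillis, Supplier<T> attempt) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    ConflictException last = null;
    for (int i = 1; i <= maxAttempts; i++) {
      try {
        return attempt.get();
      } catch (ConflictException e) {
        last = e; // remember the conflict in case every attempt fails
      }
      if (i < maxAttempts) {
        try {
          Thread.sleep(backoffMillis * i); // linear backoff before the next attempt
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // stop retrying when interrupted
          break;
        }
      }
    }
    throw last; // all attempts conflicted, or the wait was interrupted
  }
}
```

Because a rolled-back transaction leaves the datasets unchanged, each attempt should redo all of its reads and writes rather than reuse values read during an earlier attempt.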

Services can be discovered from inside workers. A service can belong either to the same application or to another application in the same namespace. WorkerContext can be used to discover the URL of the service of interest:

@Override
public void run() {
  URL url = getContext().getServiceURL("myService");

  // To discover a service in another application in the same namespace, use:
  url = getContext().getServiceURL("anotherAppName", "anotherServiceId");
}
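Once a service URL has been obtained, a worker typically needs the address of a specific endpoint of that service. The sketch below shows one way to build it using only the Java standard library. It assumes that the discovered URL is a base URL ending in a slash and that discovery may yield null when the service cannot be found; both are assumptions to verify against the API documentation. ServiceEndpoints and resolve are hypothetical names.

```java
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;

// Hypothetical helper; not part of the CDAP API.
class ServiceEndpoints {

  // Resolves an endpoint path against a service base URL.
  // Returns null when serviceBase is null, so callers can treat
  // "service not discovered" as a condition to retry later.
  // The base URL is assumed to end with "/"; otherwise its last
  // path segment is replaced instead of extended.
  static URL resolve(URL serviceBase, String endpoint) {
    if (serviceBase == null) {
      return null;
    }
    // Drop a leading slash so the endpoint stays relative to the base path
    String relative = endpoint.startsWith("/") ? endpoint.substring(1) : endpoint;
    try {
      return serviceBase.toURI().resolve(relative).toURL();
    } catch (URISyntaxException | MalformedURLException e) {
      throw new IllegalArgumentException("Cannot resolve endpoint: " + endpoint, e);
    }
  }
}
```

The resulting URL can then be opened with any HTTP client; a null result is a signal to wait and attempt discovery again rather than fail outright.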
