Motivation
CDAP has - roughly speaking - two sets of APIs: the programming APIs that are available for developing applications (and plugins, datasets, tests, etc.), and administrative or operational APIs that are available to external clients through REST calls. There is a little (but not much) overlap between the two sets of APIs, and that has its foundation in the fact that the two APIs are targeted for distinctly different audiences: one is for developers, the other is for operations.
...
Typically, these kinds of operations are not done from inside an application. But here are some reasons why we may want to allow that:
- Uber-Apps
An Uber-app is an application that controls other applications. A perfect example is our own Hydrator. Right now, Hydrator is a UI that knows how to configure and create ETL pipelines. Creating an ETL pipeline is really the same as deploying a new application, as an instance of an existing artifact with a new configuration. Today, the Hydrator UI understands every single detail about how that configuration has to be created. But this is not ideal, because it moves a lot of logic into the UI (which in theory should be a pure presentation layer). For example, to retrieve the logs for a batch pipeline, the UI checks whether the pipeline is implemented as a MapReduce or a Spark job, and retrieves the logs of that particular program.
It would be a lot cleaner if Hydrator were an actual CDAP app with well-defined endpoints that the UI could rely on. Changes in the underlying implementation can then be hidden from the UI. That requires that the Hydrator app becomes an Uber-app: it needs to be able to create apps, start and stop programs, define schedules, retrieve logs and metrics, etc. In other words, it needs to be able to perform, through a program API, the operations that the Hydrator UI makes via REST today.
- Job Preparation
Sometimes programs need to perform operations on datasets to ensure proper functioning. Examples are:
- A workflow contains a MapReduce program that writes its output to a Table. Before the MapReduce starts, the workflow needs to ensure that the Table is empty. The easiest way to do that would be to call truncate() from a custom action in preparation for the MapReduce. That, however, is an administrative action that is currently not available to workflows.
- A workflow contains two MapReduce jobs A and B. A writes its output to a file, B reads that file. Ideally, because that file is intermediate output, the file should be removed after the workflow is complete. Even better, if the workflow can create a "temporary" file set when it starts, use that for the intermediate output, and delete that file set at the end of the workflow, then it would be completely self-contained. That is only possible if create and drop dataset are available as operations in a workflow action.
- Advanced Scheduling
CDAP offers certain ways to schedule workflows out of the box. But that is not always sufficient for every application. If an application needs to implement specific scheduling logic, it should be able to do that, say, using a worker. This worker would run as a daemon, observe the resources that influence its scheduling decisions, and then trigger a workflow when appropriate. For that, this worker needs a way to retrieve metrics, program run status and history, etc., and also a way to start and stop programs.
What types of programs will need access to these operational APIs? For sure, it makes sense in Services, Workers, and Workflow actions. It may also make sense for batch jobs like MapReduce and Spark, at least during lifecycle methods such as beforeSubmit() and onFinish(). Does it also make sense for flowlets? Perhaps it is not needed there, but it would seem odd to exclude a single program type from this capability when all other program types have it. We will therefore implement an operational interface that is available to all programs through their program context.
Caveats
Allowing programs to perform admin operations can, of course, introduce complications:
- A program can stop itself (maybe that is good?) but it can also start another run of itself (flooding the system with active runs of the same program)
- A program can delete a dataset that it is currently using for data operations
- Admin operations are not transactional. Allowing them to be executed in a transactional context can lead to unexpected behavior. For example, suppose you read an event from a queue and react to it by starting a workflow, and then the commit of the transaction fails. The event becomes available again to the next transaction, which will now start the workflow a second time.
- Admin operations can take a long time, longer than the current transaction timeout.
Therefore caution is required from developers who will use these APIs in programs. Good documentation is key.
Use Cases
For CDAP 3.4, we are targeting only administrative operations on datasets. Future versions of CDAP will add capabilities to control app and program lifecycle, manipulate metadata, access logs and metrics, etc.
- As a developer, I am implementing a service that allows users to configure aggregations in a Cube dataset. When a user adds a new aggregation, the service needs to update the dataset's properties.
- As a developer, I am implementing a workflow that creates a temporary dataset in a first custom action, and deletes that dataset in the last custom action of the workflow.
- As a developer, I am implementing a MapReduce program that writes to a Table. In its beforeSubmit() method, the MapReduce needs to make sure the Table is empty, that is, truncate the table.
Proposed Design
We will add a method to all program contexts, more specifically, to RuntimeContext (which is inherited by all program contexts).
```java
interface RuntimeContext {
  ...
  Admin getAdmin();
}

interface Admin {
  boolean datasetExists(String name) throws DatasetManagementException;
  String getDatasetType(String name) throws DatasetManagementException;
  DatasetProperties getDatasetProperties(String name) throws DatasetManagementException;
  void createDataset(String name, String type, DatasetProperties properties) throws DatasetManagementException;
  void updateDataset(String name, DatasetProperties properties) throws DatasetManagementException;
  void dropDataset(String name) throws DatasetManagementException;
  void truncateDataset(String name) throws DatasetManagementException;
}
```
All methods throw:
- InstanceNotFoundException if a dataset does not exist (except for datasetExists()).
- InstanceConflictException if creation of a dataset conflicts with an existing dataset.
- DatasetManagementException for errors encountered inside the dataset framework, or inside the dataset type's DatasetAdmin code.
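To make the intended usage more concrete, here is a minimal sketch of two of the motivating scenarios: ensuring that an output dataset is empty before a job runs, and wrapping a unit of work in a temporary dataset that is always dropped afterwards. Everything in it is an assumption made so that the example runs without CDAP: the nested `Admin` interface is a reduced stand-in for the proposed one, `Map<String, String>` stands in for `DatasetProperties`, the exception hierarchy is simplified, and `InMemoryAdmin`, `ensureEmptyDataset` and `withTemporaryDataset` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class AdminUsageSketch {

  // Stand-ins for the exception types named in this design (hierarchy is assumed).
  static class DatasetManagementException extends Exception {
    DatasetManagementException(String message) { super(message); }
  }
  static class InstanceNotFoundException extends DatasetManagementException {
    InstanceNotFoundException(String name) { super("Dataset not found: " + name); }
  }
  static class InstanceConflictException extends DatasetManagementException {
    InstanceConflictException(String name) { super("Dataset already exists: " + name); }
  }

  // Reduced stand-in for the proposed Admin interface.
  interface Admin {
    boolean datasetExists(String name) throws DatasetManagementException;
    void createDataset(String name, String type, Map<String, String> properties)
        throws DatasetManagementException;
    void dropDataset(String name) throws DatasetManagementException;
    void truncateDataset(String name) throws DatasetManagementException;
  }

  // Hypothetical in-memory implementation, only here so the sketch can run.
  static class InMemoryAdmin implements Admin {
    final Map<String, Map<String, String>> rows = new HashMap<>(); // dataset name -> contents

    public boolean datasetExists(String name) { return rows.containsKey(name); }

    public void createDataset(String name, String type, Map<String, String> properties)
        throws InstanceConflictException {
      if (rows.containsKey(name)) throw new InstanceConflictException(name);
      rows.put(name, new HashMap<>());
    }

    public void dropDataset(String name) throws InstanceNotFoundException {
      if (rows.remove(name) == null) throw new InstanceNotFoundException(name);
    }

    public void truncateDataset(String name) throws InstanceNotFoundException {
      Map<String, String> data = rows.get(name);
      if (data == null) throw new InstanceNotFoundException(name);
      data.clear();
    }
  }

  // Job preparation: make sure the dataset exists and is empty before the job starts.
  static void ensureEmptyDataset(Admin admin, String name, String type)
      throws DatasetManagementException {
    if (admin.datasetExists(name)) {
      admin.truncateDataset(name);
    } else {
      admin.createDataset(name, type, new HashMap<>());
    }
  }

  // Temporary dataset: create it, run the body, and drop it even if the body fails.
  static void withTemporaryDataset(Admin admin, String name, String type, Runnable body)
      throws DatasetManagementException {
    admin.createDataset(name, type, new HashMap<>());
    try {
      body.run();
    } finally {
      admin.dropDataset(name);
    }
  }

  public static void main(String[] args) throws DatasetManagementException {
    InMemoryAdmin admin = new InMemoryAdmin();
    ensureEmptyDataset(admin, "output", "table");            // dataset is created
    admin.rows.get("output").put("row1", "stale");
    ensureEmptyDataset(admin, "output", "table");            // dataset is truncated
    System.out.println(admin.rows.get("output").isEmpty());  // prints true
    withTemporaryDataset(admin, "tmp", "fileSet", () -> { });
    System.out.println(admin.datasetExists("tmp"));          // prints false
  }
}
```

Note that the check-then-act sequence in `ensureEmptyDataset` is not atomic; since admin operations are not transactional, a concurrent create or drop between the two calls would surface as one of the exceptions listed above.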
Why are we adding this to RuntimeContext, and not to DatasetContext? The idea is that DatasetContext represents a way to obtain an instance of a dataset in a transactional context. Admin operations are not transactional, and therefore it seems cleaner to add them separately from DatasetContext. Also, in the future this Admin interface will provide operations that are not related to datasets.
One complication lies hidden in the implementation of getDatasetProperties(): When creating a dataset with a certain set of properties, the dataset framework of CDAP does not store that set of properties. Instead, it calls the configure() method of the dataset definition with these properties. This method returns a dataset spec that contains properties - but it is up to the implementation of every single dataset definition to construct that spec, and it may not reflect the original properties that were passed in. For example, it may contain some properties that are derived from the original properties, or it may use the original properties to set properties on its embedded datasets, but not its own properties.
We checked all CDAP implementations of DatasetDefinition.configure() for whether they preserve the original dataset properties (there are about 40 implementations in our code base), and found the following that manipulate the properties before storing them in the spec:
- FileSet: adds a FILESET_VERSION property
- TimePartitionedFileSet: adds the PARTITIONING property
- ObjectMappedTable: adds TABLE_SCHEMA and TABLE_SCHEMA_ROW_FIELD
- LineageDataset: adds CONFLICT_LEVEL=NONE
- UsageDataset: adds CONFLICT_LEVEL=NONE
This is a problem. To address this, we need to change DatasetFramework to store the original properties along with the dataset spec returned by configure(). That is the only way we can reliably reproduce the properties with which a dataset was created. For existing datasets (created pre-3.4), we can only make a best effort: Read the spec and get the properties from the spec. If it is one of the known dataset types above, remove the additional properties before returning. Otherwise return these properties unmodified. Because most users would have used AbstractDataset to define their custom datasets, and AbstractDatasetDefinition leaves the properties unmodified, it is highly likely that this will return the correct set of properties. If a user has really implemented their own configure() method that modifies the properties, the hope is that it only adds new properties, and that it can accept creation properties that already contain those. As a last resort, the user can manually update the dataset properties through the REST API to the original properties they used to create the dataset.
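The lookup logic described above could be sketched roughly as follows. This is an illustration under stated assumptions, not the actual DatasetFramework change: the class and method names are hypothetical, `Map<String, String>` stands in for the real property and spec types, and the type names and property keys are copied from the list above as placeholders (the real string values of constants such as FILESET_VERSION may differ).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class OriginalPropertiesSketch {

  // Properties that configure() is known to add, per dataset type.
  // Keys and type names are placeholders taken from the list in this document.
  static final Map<String, Set<String>> ADDED_BY_CONFIGURE = Map.of(
      "FileSet", Set.of("FILESET_VERSION"),
      "TimePartitionedFileSet", Set.of("PARTITIONING"),
      "ObjectMappedTable", Set.of("TABLE_SCHEMA", "TABLE_SCHEMA_ROW_FIELD"),
      "LineageDataset", Set.of("CONFLICT_LEVEL"),
      "UsageDataset", Set.of("CONFLICT_LEVEL"));

  /**
   * Returns the properties a dataset was created with.
   *
   * @param type           the dataset type name
   * @param storedOriginal the original properties stored at creation time,
   *                       or null for datasets created before they were stored
   * @param specProperties the properties found in the spec returned by configure()
   */
  static Map<String, String> getOriginalProperties(String type,
                                                   Map<String, String> storedOriginal,
                                                   Map<String, String> specProperties) {
    if (storedOriginal != null) {
      // New datasets: the original properties were stored next to the spec.
      return new HashMap<>(storedOriginal);
    }
    // Old datasets: best effort, start from the spec and strip what configure() added.
    Map<String, String> result = new HashMap<>(specProperties);
    result.keySet().removeAll(ADDED_BY_CONFIGURE.getOrDefault(type, Set.of()));
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> spec = new HashMap<>();
    spec.put("some.user.property", "value"); // hypothetical user-supplied property
    spec.put("FILESET_VERSION", "2");        // added by configure()

    // Known type without stored properties: the added key is removed.
    System.out.println(getOriginalProperties("FileSet", null, spec));
    // Unknown custom type: the spec properties are returned unmodified.
    System.out.println(getOriginalProperties("MyCustomDataset", null, spec).size());
  }
}
```

One limitation of this best-effort path is visible in the sketch: if a user had explicitly passed one of the listed keys at creation time, it is stripped as well, which is why storing the original properties is the only reliable approach.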