...
This page illustrates the plan for consolidating ApplicationTemplate and Application. This work is being tracked in Jira.
Motivation
Why do we want to consolidate templates and applications? In CDAP 3.0, an ApplicationTemplate is a way for somebody to write an Application that can be given some configuration to create an Adapter. The story is confusing: one would expect an ApplicationTemplate to create Applications. Instead, we use the term Adapter because Application already means something else. In addition, an ApplicationTemplate can only include a single workflow or a single worker, so templates and applications give people different experiences.
The real goal of templates was to write one piece of Application code that could be used to create multiple Applications. This requires that an Application can be configured at creation time instead of at compile time. For example, a user should be able to set the name of their dataset through configuration instead of hardcoding it. To support this, we plan to make a configuration object available from the ApplicationContext in the Application's configure() method. Somebody can then pass in a config when creating an Application through the RESTful API, and that config is used to configure the Application. The relevant programmatic API changes are shown below.
...
Definitions
Artifact - A jar file containing classes that can be used by CDAP.
Application Class - A Java class that implements the CDAP Application interface. Deployed by bundling it in an artifact.
Application Config - Configuration given to CDAP to create an Application (can be empty).
Application - An instantiation of an Application Class, created by passing an Application Config to an Application Class.
Plugin - An extension to an Application Class. Usually implements an interface used by the Application Class.
Use Case Walkthrough
1. Create an Application that uses config
A developer writes a configurable Application Class that uses a Flow to read from a stream and write to a Table:
Code Block |
---|
// Programmatic API changes: an Application is parameterized by its Config type,
// and AbstractApplication exposes the context that carries the config.
public interface Application<T extends Config> {
  void configure(ApplicationConfigurer configurer, ApplicationContext<T> context);
}
public abstract class AbstractApplication<T extends Config> implements Application<T> {
  ...
  protected final ApplicationContext<T> getContext() {
    return context;
  }
}
// Example application built on the API above.
public class MyApp extends AbstractApplication<MyApp.MyConfig> {
  public static class MyConfig extends Config {
    @Nullable
    @Description("The name of the stream to read from. Defaults to 'A'.")
    private String stream;
    @Nullable
    @Description("The name of the table to write to. Defaults to 'X'.")
    private String table;
    private MyConfig() {
      this.stream = "A";
      this.table = "X";
    }
  }
  public void configure() {
    // ApplicationContext now has a method to get a custom config object whose fields will
    // be injected using the values given in the RESTful API
    MyConfig config = getContext().getConfig();
    addStream(new Stream(config.stream));
    createDataset(config.table, Table.class);
    addFlow(new MyFlow(config.stream, config.table));
  }
}
public class MyFlow implements Flow {
  @Property
  private String stream;
  @Property
  private String table;
  MyFlow(String stream, String table) {
    this.stream = stream;
    this.table = table;
  }
  @Override
  public FlowSpecification configure() {
    return FlowSpecification.Builder.with()
      .setName("MyFlow")
      .setDescription("Reads from a stream and writes to a table")
      .withFlowlets()
        .add("reader", new StreamReader(table))
      .connect()
        .fromStream(stream).to("reader")
      .build();
  }
}
public class StreamReader extends AbstractFlowlet {
  @Property
  private String tableName;
  private Table table;
  StreamReader(String tableName) {
    this.tableName = tableName;
  }
  @Override
  public void initialize(FlowletContext context) throws Exception {
    table = context.getDataset(tableName);
  }
  @ProcessInput
  public void process(StreamEvent event) {
    // The row key is expected to arrive in the 'rowkey' header of the stream event.
    Put put = new Put(Bytes.toBytes(event.getHeaders().get("rowkey")));
    put.add("timestamp", event.getTimestamp());
    put.add("body", Bytes.toBytes(event.getBody()));
    table.put(put);
  }
} |
We will use this example to walk through some use cases.
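The comment in configure() says the config object's fields are injected from the values given in the RESTful API. The following is a minimal sketch of what such injection could look like, assuming plain reflection over a flat map of string properties. `SketchConfig` and `ConfigInjectionSketch` are hypothetical names used only for illustration; they are not CDAP classes, and the mechanism CDAP actually uses may differ.

```java
import java.lang.reflect.Field;
import java.util.Map;

// Hypothetical stand-in for MyConfig; not a CDAP class.
final class SketchConfig {
  String stream = "A";  // default used when the request omits "stream"
  String table = "X";   // default used when the request omits "table"
}

final class ConfigInjectionSketch {

  // Copies each supplied value into the field of the same name.
  // Fields that are absent from the map keep their defaults.
  static <T> T inject(T config, Map<String, String> values) {
    for (Map.Entry<String, String> entry : values.entrySet()) {
      try {
        Field field = config.getClass().getDeclaredField(entry.getKey());
        field.setAccessible(true);
        field.set(config, entry.getValue());
      } catch (NoSuchFieldException e) {
        throw new IllegalArgumentException("Unknown config property: " + entry.getKey(), e);
      } catch (IllegalAccessException e) {
        throw new IllegalStateException(e);
      }
    }
    return config;
  }

  public static void main(String[] args) {
    // Only "table" is supplied, so "stream" keeps its default.
    SketchConfig config = inject(new SketchConfig(), Map.of("table", "events"));
    System.out.println(config.stream + " -> " + config.table);  // prints "A -> events"
  }
}
```

Rejecting unknown property names, as this sketch does, is one possible design choice; silently ignoring them would be another.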
2. Create an Application that uses config and plugins
Code Block |
---|
public class MyApp extends AbstractApplication<MyApp.MyConfig> {
  public static class MyConfig extends Config {
    @Nullable
    @Description("The name of the stream to read from. Defaults to 'A'.")
    private String stream;
    @Nullable
    @Description("The name of the table to write to. Defaults to 'X'.")
    private String table;
    @Name("flow")
    private MyFlow.FlowConfig flowConfig;
    private MyConfig() {
      this.stream = "A";
      this.table = "X";
    }
  }
  public void configure() {
    // ApplicationContext now has a method to get a custom config object whose fields will
    // be injected using the values given in the RESTful API
    MyConfig config = getContext().getConfig();
    addStream(new Stream(config.stream));
    createDataset(config.table, Table.class);
    addFlow(new MyFlow(config.stream, config.table, config.flowConfig));
  }
}
public class MyFlow implements Flow {
  @Property
  private String stream;
  @Property
  private String table;
  @Property
  private FlowConfig flowConfig;
  public static final class FlowConfig extends Config {
    private ReaderConfig reader;
    private WriterConfig writer;
  }
  MyFlow(String stream, String table, FlowConfig flowConfig) {
    this.stream = stream;
    this.table = table;
    this.flowConfig = flowConfig;
  }
  @Override
  public FlowSpecification configure() {
    return FlowSpecification.Builder.with()
      .setName("MyFlow")
      .setDescription("Reads from a stream and writes to a table")
      .withFlowlets()
        .add("reader", new StreamReader(flowConfig.reader))
        .add("writer", new TableWriter(flowConfig.writer))
      .connect()
        .fromStream(stream).to("reader")
        .from("reader").to("writer")
      .build();
  }
}
public class StreamReader extends AbstractFlowlet {
  private OutputEmitter<Put> emitter;
  @Property
  private ReaderConfig readerConfig;
  private Reader reader;
  public static class ReaderConfig extends Config {
    @Description("The name of the reader plugin to use.")
    String name;
    @Description("The properties needed by the chosen reader plugin.")
    @PluginType("reader")
    PluginProperties properties;
  }
  public static interface Reader {
    Put read(StreamEvent event);
  }
  StreamReader(ReaderConfig readerConfig) {
    this.readerConfig = readerConfig;
  }
  @Override
  public FlowletSpecification configure() {
    // arguments are: type, name, id, properties
    usePlugin("reader", readerConfig.name, "streamReader", readerConfig.properties);
  }
  @Override
  public void initialize(FlowletContext context) throws Exception {
    reader = context.newPluginInstance("streamReader");
  }
  @ProcessInput
  public void process(StreamEvent event) {
    emitter.emit(reader.read(event));
  }
}
@Plugin(type = "reader")
@Name("default")
@Description("Writes timestamp and body as two columns and expects the row key to come as a header in the stream event.")
public class DefaultStreamReader implements StreamReader.Reader {
  private DefaultConfig config;
  public static class DefaultConfig extends PluginConfig {
    @Description("The header that should be used as the row key to write to. Defaults to 'rowkey'.")
    @Nullable
    private String rowkey;
    private DefaultConfig() {
      rowkey = "rowkey";
    }
  }
  public Put read(StreamEvent event) {
    Put put = new Put(Bytes.toBytes(event.getHeaders().get(config.rowkey)));
    put.add("timestamp", event.getTimestamp());
    put.add("body", Bytes.toBytes(event.getBody()));
    return put;
  }
} |
1. Deploying an Artifact
A development team creates a project built on top of CDAP. Their CI build runs and produces a jar file. An administrator deploys the jar by making a REST call:
Code Block |
---|
POST /namespaces/default/artifacts/myapp --data-binary @myapp-1.0.0.jar |
CDAP opens the jar, determines the artifact version from the bundle-version in the manifest, works out which apps, programs, datasets, and plugins the artifact contains, then stores the artifact on the filesystem and its metadata in a table.
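As an illustration of the version lookup described above, here is a minimal sketch that reads a version from a jar manifest with the standard `java.util.jar` API. It assumes the version is stored under the `Bundle-Version` main attribute; the class name `BundleVersionSketch` is hypothetical, and the manifest is built in memory so that the example runs without a real jar. For a jar on disk, `new JarFile(path).getManifest()` would supply the `Manifest`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Manifest;

final class BundleVersionSketch {

  // Returns the Bundle-Version main attribute, or null if the manifest has none.
  static String bundleVersion(Manifest manifest) {
    return manifest.getMainAttributes().getValue("Bundle-Version");
  }

  // Parses manifest text; used here in place of opening a real jar file.
  static Manifest parse(String manifestText) {
    try {
      return new Manifest(
          new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8)));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Manifest manifest = parse("Manifest-Version: 1.0\nBundle-Version: 1.0.0\n\n");
    System.out.println(bundleVersion(manifest));  // prints "1.0.0"
  }
}
```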
The administrator can examine the metadata by making a call:
Code Block |
---|
GET /namespaces/default/artifacts/myapp/versions/1.0.0
{
  "name": "myapp",
  "version": "1.0.0",
  "classes": {
    "apps": [
      {
        "className": "co.cask.cdap.examples.myapp.MyApp",
        "properties": {
          "stream": {
            "name": "stream",
            "description": "The name of the stream to read from. Defaults to 'A'.",
            "type": "string",
            "required": false
          },
          "table": {
            "name": "table",
            "description": "The name of the table to write to. Defaults to 'X'.",
            "type": "string",
            "required": false
          },
          "flowConfig": {
            "name": "flow",
            "description": "",
            "type": "config",
            "fields": {
              "reader": {
                "name": "reader",
                "description": "",
                "type": "config",
                "required": true,
                "fields": {
                  "name": {
                    "name": "name",
                    "description": "The name of the reader plugin to use.",
                    "type": "string",
                    "required": true
                  },
                  "properties": {
                    "name": "properties",
                    "description": "The properties needed by the chosen reader plugin.",
                    "type": "plugin",
                    "plugintype": "reader",
                    "required": true
                  }
                }
              },
              "writer": { ... }
            }
          }
        }
      }
    ],
    "plugins": [
      {
        "name": "default",
        "type": "reader",
        "description": "Writes timestamp and body as two columns and expects the row key to come as a header in the stream event.",
        "className": "co.cask.cdap.examples.myapp.plugins.DefaultStreamReader",
        "properties": {
          "rowkey": {
            "name": "rowkey",
            "description": "The header that should be used as the row key to write to. Defaults to 'rowkey'.",
            "type": "string",
            "required": false
          }
        }
      }
    ],
    "flows": [ ... ],
    "flowlets": [ ... ],
    "datasetModules": [ ... ]
  }
} |
Reverse indices will be maintained to allow querying the classes in artifacts directly:
Code Block |
---|
GET /namespaces/default/artifacts/myapp/versions/1.0.0/extensions
[ "reader" ]
GET /namespaces/default/artifacts/myapp/versions/1.0.0/extensions/reader
[
  {
    "type": "reader",
    "name": "default",
    "description": "Writes timestamp and body as two columns and expects the row key to come as a header in the stream event.",
    "className": "co.cask.cdap.examples.myapp.plugins.DefaultStreamReader",
    "extendsArtifact": {
      "name": "myapp",
      "versions": "[1.0.0, 2.0.0)"
    },
    "pluginArtifact": {
      "name": "readers",
      "version": "0.6.0"
    }
  }
] |
Code Block |
---|
GET /namespaces/default/artifacts/myapp/versions/1.0.0/extensions/reader/plugins/default
{
  "name": "default",
  "type": "reader",
  "description": "Writes timestamp and body as two columns and expects the row key to come as a header in the stream event.",
  "className": "co.cask.cdap.examples.myapp.plugins.DefaultStreamReader",
  "properties": {
    "rowkey": {
      "name": "rowkey",
      "description": "The header that should be used as the row key to write to. Defaults to 'rowkey'.",
      "type": "string",
      "required": false
    }
  }
}
GET /namespaces/default/classes/appClasses
[
  {
    "className": "co.cask.cdap.examples.myapp.MyApp",
    "description": "",
    "artifact": {
      "namespace": "default",
      "name": "myapp",
      "version": "1.0.0"
    }
  }
] |
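To make the idea of a reverse index concrete, the sketch below keeps a map from plugin type to the plugins of that type, filled in as artifacts are inspected. It is an in-memory illustration only: the page does not say how CDAP stores these indices, and `PluginIndexSketch`, its methods, and the entry format are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory index; the storage CDAP would actually use is not specified here.
final class PluginIndexSketch {

  // plugin type -> entries of the form "pluginName (artifactName-version)"
  private final Map<String, List<String>> pluginsByType = new TreeMap<>();

  // Called once for every plugin class found while inspecting a deployed artifact.
  void register(String type, String name, String artifact, String version) {
    pluginsByType.computeIfAbsent(type, k -> new ArrayList<>())
        .add(name + " (" + artifact + "-" + version + ")");
  }

  // Answers "which plugin types exist?" without opening any artifact.
  List<String> pluginTypes() {
    return new ArrayList<>(pluginsByType.keySet());
  }

  // Answers "which plugins of this type exist?"; empty if the type is unknown.
  List<String> plugins(String type) {
    return pluginsByType.getOrDefault(type, List.of());
  }

  public static void main(String[] args) {
    PluginIndexSketch index = new PluginIndexSketch();
    index.register("reader", "default", "myapp", "1.0.0");
    System.out.println(index.pluginTypes());      // prints [reader]
    System.out.println(index.plugins("reader"));  // prints [default (myapp-1.0.0)]
  }
}
```

The point of the index is that lookups by type become a single map access instead of a scan over every artifact's metadata.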
...
Code Block |
---|
GET /namespaces/default/artifacts/myapp/versions/1.0.0/extensions/reader
[
  {
    "type": "reader",
    "name": "default",
    "description": "Writes timestamp and body as two columns and expects the row key to come as a header in the stream event.",
    "className": "co.cask.cdap.examples.myapp.plugins.DefaultStreamReader",
    "artifact": {
      "namespace": "default",
      "name": "myapp",
      "version": "1.0.0"
    }
  }
] |
...
Code Block |
---|
GET /namespaces/default/artifacts/myapp/versions/1.0.0/extensions/reader/plugins/default
[
  {
    "type": "reader",
    "name": "default",
    "description": "Writes timestamp and body as two columns and expects the row key to come as a header in the stream event.",
    "className": "co.cask.cdap.examples.myapp.plugins.DefaultStreamReader",
    "properties": {
      "rowkey": {
        "name": "rowkey",
        "description": "The header that should be used as the row key to write to. Defaults to 'rowkey'.",
        "type": "string",
        "required": false
      }
    },
    "artifact": {
      "namespace": "default",
      "name": "myapp",
      "version": "1.0.0"
    }
  }
] |
...
System artifacts are special artifacts that can be accessed from other namespaces. They cannot be deployed through the RESTful API unless a configuration setting allows it. Instead, they are placed in a directory on the CDAP master host. When CDAP starts up, it scans the directory and adds those artifacts to the system. Example uses for system artifacts are the ETLBatch and ETLRealtime applications that we want to include out of the box.
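The startup scan can be pictured with the following sketch, which lists the jar files in a directory using `java.nio.file`. It only covers the discovery step and assumes that every `*.jar` file directly inside the directory is a system artifact. The class name, the temporary directory, and the file names `etlbatch-1.0.0.jar` and `etlrealtime-1.0.0.jar` are hypothetical and chosen for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SystemArtifactScanSketch {

  // Returns the names of all *.jar files directly inside dir, sorted alphabetically.
  static List<String> findJars(Path dir) {
    List<String> names = new ArrayList<>();
    try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
      for (Path jar : jars) {
        names.add(jar.getFileName().toString());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Collections.sort(names);
    return names;
  }

  // Builds a throwaway directory with two jars and one unrelated file, then scans it.
  static List<String> demo() {
    try {
      Path dir = Files.createTempDirectory("system-artifacts");
      Files.createFile(dir.resolve("etlbatch-1.0.0.jar"));
      Files.createFile(dir.resolve("etlrealtime-1.0.0.jar"));
      Files.createFile(dir.resolve("README.txt"));
      return findJars(dir);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());  // prints [etlbatch-1.0.0.jar, etlrealtime-1.0.0.jar]
  }
}
```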
...
The programmatic API changes are all backwards compatible, so existing apps will not need to be recompiled. They will, however, need to be added to the artifact repository, either by the upgrade tool or by requiring people to redeploy their existing apps.
Any existing adapters will need to be migrated. Ideally, the upgrade tool will create matching applications based on the adapter conf, but at a minimum we will simply delete existing adapters and templates.
...