...
Code Block |
---|
//-------------- CDAP API changes -------------- public interface ApplicationContext<T extends Config> { T getConfig(); } public interface Application<T extends Config> { void configure(ApplicationConfigurer configurer, ApplicationContext<T> context); } public abstract class AbstractApplication<T extends Config> implements Application<T> { ... protected final ApplicationContext<T> getContext() { return context; } } //-------------- Example Application -------------- public class MyApp extends AbstractApplication<MyApp.MyConfig> { public static class MyConfig extends Config { @Nullable @Description("The name of the stream to read from. Defaults to 'A'.") private String stream; @Nullable @Description("The name of the table to write to. Defaults to 'X'.") private String table; @Name("flow") private MyFlow.FlowConfig flowConfig; private MyConfig() { this.stream = "A"; this.table = "X"; } } public void configure() { // ApplicationContext now has a method to get a custom config object whose fields will // be injected using the values given in the RESTful API MyConfig config = getContext().getConfig(); addStream(new Stream(config.stream)); createDataset(config.table, Table.class); addFlow(new MyFlow(config.stream, config.table, config.flowConfig)); } } public class MyFlow implements Flow { @Property private String stream; @Property private String table; @Property private FlowConfig flowConfig; public static final class FlowConfig extends Config { private ReaderConfig reader; private WriterConfig writer; } MyFlow(String stream, String table, FlowConfig flowConfig) { this.stream = stream; this.table = table; this.flowConfig = flowConfig; } @Override public FlowSpecification configure() { return FlowSpecification.Builder.with() .setName("MyFlow") .setDescription("Reads from a stream and writes to a table") .withFlowlets() .add("reader", new StreamReader(flowConfig.reader)) .add("writer", new TableWriter(flowConfig.writer)) .connect() .fromStream(stream).to("reader") 
.from("reader").to("writer") .build(); } } public class StreamReader extends AbstractFlowlet { private OutputEmitter<Put> emitter; @Property private ReaderConfig readerConfig; private Reader reader; public static class ReaderConfig extends Config { @Description("The name of the reader plugin to use.") String name; @Description("The properties needed by the chosen reader plugin.") @PluginType("reader") PluginProperties properties; } public static interface Reader { Put read(StreamEvent event); } StreamReader(ReaderConfig readerConfig) { this.readerConfig = readerConfig; } @Override public FlowletSpecification configure() { // arguments are: type, name, id, properties usePlugin("reader", readerConfig.name, "streamReader", readerConfig.properties); } @Override public void initialize(FlowletContext context) throws Exception { reader = context.newPluginInstance("streamReader"); } @ProcessInput public void process(StreamEvent event) { emitter.emit(reader.read(event)); } } @Plugin(type = "reader") @Name("default") @Description("Default stream reader. Writes timestamp and body as two columns and expects the row key to come as a header in the stream event.") public class DefaultStreamReader implements StreamReader.Reader { private DefaultConfig config; public static class DefaultConfig extends PluginConfig { @Description("The header that should be used as the row key to write to. Defaults to 'rowkey'.") @Nullable private String rowkey; private DefaultConfig() { rowkey = "rowkey"; } } public Put read(StreamEvent event) { Put put = new Put(Bytes.toBytes(event.getHeaders().get(config.rowkey))); put.add("timestamp", event.getTimestamp()); put.add("body", Bytes.toBytes(event.getBody())); return put; } } |
Use Case Walkthrough
1. Deploying an Artifact
...
Code Block |
---|
GET /namespaces/default/artifacts/myapp/versions/1.0.0 { "name": "myapp", "version": "1.0.0", "meta": { "created": "1234567890000", ... }, "classes": { "apps": [ { "className": "co.cask.cdap.examples.myapp.MyApp", "properties": { "stream": { "name": "stream", "description": "The name of the stream to read from. Defaults to 'A'.", "type": "string", "required": false }, "table": { "name": "table", "description": "The name of the table to write to. Defaults to 'X'.", "type": "string", "required": false }, "flowConfig": { "name": "flow", "description": "", "type": "config", "fields": { "reader": { "name": "reader", "description": "", "type": "config", "required": true, "fields": { "name": { "name": "name", "description": "The name of the reader plugin to use.", "type": "string", "required": true }, "properties": { "name": "properties", "description": "The properties needed by the chosen reader plugin.", "type": "plugin", "plugintype": "reader", "required": true } } }, "writer": { ... } } } } } ], "flows": [ ... ], "flowlets": [ ... ], "plugins": [ { "name": "default", "type": "reader", "description": "Writes timestamp and body as two columns and expects the row key to come as a header in the stream event.", "className": "co.cask.cdap.examples.myapp.plugins.DefaultStreamReader", "configFieldName": "config", "properties": { "rowkey": { "name": "rowkey", "description": "The header that should be used as the row key to write to. Defaults to 'rowkey'.", "type": "string", "required": false } } } ] } } |
Deploying an Application
...