Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
//-------------- CDAP API changes --------------
public interface ApplicationContext<T extends Config> {
  T getConfig();
}

public interface Application<T extends Config> {
  void configure(ApplicationConfigurer configurer, ApplicationContext<T> context);
}
 
public abstract class AbstractApplication<T> implements Application<T extends Config> {
  ...
  protected final ApplicationContext<T> getContext() { return context; }
}
 
//-------------- Example Application --------------
public class MyApp extends AbstractApplication<MyApp.MyConfig> {
 
  public static class MyConfig extends Config {
    @Nullable
    @Description("The name of the stream to read from. Defaults to 'A'.")
    private String stream;
 
    @Nullable
    @Description("The name of the table to write to. Defaults to 'X'.")
    private String table;
 
    @Name("flow")
    private MyFlowConfig flowConfig;
 
    private MyConfig() {
      this.stream = "A";
      this.table = "X";
    }
  }
 
  public void configure() {
    // ApplicationContext now has a method to get a custom config object whose fields will
    // be injected using the values given in the RESTful API
    MyConfig config = getContext().getConfig();
    addStream(new Stream(config.stream));
    createDataset(config.table, Table.class);
    addFlow(new MyFlow(config.stream, config.table, config.flowConfig));
  }
}
 
public class MyFlow implements Flow {
  @Property
  private String stream;
  @Property
  private String table;
  @Property
  private FlowConfig flowConfig;
 
  public static final FlowConfig extends Config {
    private ReaderConfig reader;
    private WriterConfig writer;
  }
 
  MyFlow(String stream, String table, FlowConfig flowConfig) {
    this.stream = stream;
    this.table = table;
    this.flowConfig = flowConfig;
  }
 
  @Override
  public FlowSpecification configure() {
    return FlowSpecification.Builder.with()
      .setName("MyFlow")
      .setDescription("Reads from a stream and writes to a table")
      .withFlowlets()
        .add("reader", new StreamReader(flowConfig.reader))
        .add("writer", new TableWriter(flowConfig.writer))
      .connect()
        .fromStream(stream).to("reader")
        .from("reader").to("writer")
      .build();
  }
} 
 
public class StreamReader extends AbstractFlowlet {
  private OutputEmitter<Put> emitter;
  @Property
  private ReaderConfig readerConfig;
  private Reader reader; 

  public static class ReaderConfig extends Config {
    @Description("The name of the reader plugin to use.")
    String name; 

    @Description("The properties needed by the chosen reader plugin.")
    @PluginType("reader")
    PluginProperties properties;
  }

  public static interface Reader {
    Put read(StreamEvent);
  }
 
  StreamReader(ReaderConfig readerConfig) {
    this.readerConfig = readerConfig;
  }
 
  @Override
  public FlowletSpecification configure() {
    // arguments are: type, name, id, properties
    usePlugin("reader", readerConfig.name, "streamReader", readerConfig.properties);
  }

  @Override
  public void initialize(FlowletContext context) throws Exception {
    reader = context.newPluginInstance("streamReader");
  }
  @ProcessInput
  public void process(StreamEvent event) {
    emitter.emit(reader.read(event));
  }
}
 
@Plugin(type = "reader")
@Name("default")
@Description("Default stream reader. ExpectsWrites timestamp and body as two columns and expects the row key to come as a header in the stream event.")
public class DefaultStreamReader implements StreamReader.Reader {
  private DefaultConfig config;
 
  public static class DefaultConfig extends PluginConfig {
     private String rowkey;
    
    private DefaultConfig() {
 @Description("The header that should be used as the row key to write to. Defaults to 'rowkey'.")
    @Nullable
    rowkeyprivate =String "rowkey";
    }
  }   private DefaultConfig() {
      rowkey = "rowkey";
    }
  }
 
  public Put read(StreamEvent event) {
    Put put = new Put(Bytes.toBytes(event.getHeaders().get(config.rowkey)));
    put.add("timestamp", event.getTimestamp());
    put.add("body", Bytes.toBytes(event.getBody()));
    return put;
  }
}

 

Use Case Walkthrough

1. Deploying an Artifact

...

Code Block
GET /namespaces/default/artifacts/myapp/versions/1.0.0
 
{
  "name": "purchase",
  "version": "3.1.0",
  "meta": {
    "created": "1234567890000",
    ...
  },
  "classes": {
    "apps": [
      {
        "className": "co.cask.cdap.examples.myapp.MyApp",
        "properties": {
          "stream": { 
            "name": "stream", 
            "description": "The name of the stream to read from. Defaults to 'A'.", 
            "type": "string", 
            "required": false 
          },
          "table": {
            "name": "table",
            "description": "The name of the table to write to. Defaults to 'X'.",
            "type": "string",
            "required": false,
          },
          "flowConfig": {
            "name": "flow",
            "description": "",
            "type": "config",
            "fields": {
              "reader": {
                "name": "reader",
                "description": "",
                "type": "config",
                "required": true,
                "fields": {
                  "name": {
                    "name": "name",
                    "description": "The name of the reader plugin to use.",
                    "type": "string",
                    "required": true
                  },
                  "properties": {
                    "name": "properties",
                    "description": "The properties needed by the chosen reader plugin.",
                    "type": "plugin",
                    "plugintype": "reader",
                    "required": true
                  }
                }
              },
              "writer": { ... }
            }
          }
        }
      }
    ],
    "flows": [ ... ],
    "pluginsflowlets": [ ... ],
    "plugins": [
      {
        "name": "default",
        "type": "reader",
        "description": "Writes timestamp and body as two columns and expects the row key to come as a header in the stream event.",
        "className": "co.cask.cdap.examples.myapp.plugins.DefaultStreamReader",
        "configFieldName": "config",
        "properties": {
          "rowkey": {
            "name": "rowkey",
            "description": "The header that should be used as the row key to write to. Defaults to 'rowkey'.",
            "type": "string",
            "required": false
          }
        }
      }
    ]
  }
}

 

Deploying an Application

...