Preview Mode
Goals
Checklist
Terminology
Pipeline Developer - Somebody who creates pipelines through the UI or REST API. May be a business analyst, researcher, plugin developer, etc.
Stage - a node in the pipeline. Corresponds to a plugin instance.
Phase - one or more stages in the pipeline. Corresponds to a node in the physical workflow.
Validation - static checks for plugin configuration
Debug - actually running plugin code on example input
Use Cases
A pipeline developer is configuring a JavaScript transform and wants to make sure the logic is correct. The developer fills in all the configuration settings, provides some input, and wants to see the corresponding output, or any errors that happened.
A pipeline developer has configured several stages and connected them together. The developer wants to see the input and output (or errors) at each stage.
A pipeline developer is using the database plugin and does not know if the given JDBC connection string is correct. The syntax might be wrong, or the host, port, or credentials might be wrong, and the developer wants to check these things before publishing and running the pipeline.
User Stories
As a pipeline developer, I want to be able to see what a pipeline stage would output given the config for the stage and input to the stage
As a pipeline developer, I want to be able to debug a single stage by providing config and input to see the output
As a pipeline developer, I want to be able to see a message describing the cause of any errors encountered while debugging
As a pipeline developer, I want to be able to validate specific fields of a plugin, so that I can debug without publishing the pipeline
As a pipeline developer, I want to be able to manually provide the input records for my pipeline while debugging
Approach (WIP)
PREVIEW APPLICATION DEPLOYMENT:
Preview application runtime:
Applications also write to the user datasets.
Meeting notes from Friday, June 3 2016
In preview mode we write the records to the preview dataset. If the mapper dies for some reason, records already written to the preview dataset will not get cleaned up.
We should make it clear to the user that in preview mode it is not advisable to write to the actual production sink.
When would the preview datasets be cleaned up? If we decide to clean them up after publishing the pipeline, what happens if the user decides not to publish the pipeline?
What if multiple users are running previews on the same SDK? To isolate writes to the preview dataset for the same pipeline from multiple users, we can use some sort of session id. Ideally, however, a single SDK should not be shared by multiple users.
Think about how the platform will expose a way to view preview data.
How should creating streams and datasets be handled in preview?
For logs we will need a different appender.
A pipeline preview may write to the actual user datasets used in the pipeline as well as to preview datasets. Should lineage information be written for both the actual datasets and the preview datasets?
Is it possible to use a separate in-memory injector for the preview process, so that preview data is stored in memory and does not affect the actual metadata? Is it possible to write the data to a different directory while in preview mode? In that case, how will it be able to access the datasets stored in the directory configured by "local.data.dir"?
How does the application communicate back to the UI - something like a preview collector interface?
REST endpoints to start the preview and to collect preview data.
The design should be easy to extend for cluster mode and for adding more features, such as schema propagation, where we would simply call the pipeline's configure method and return the data.
Action Items:
Try using a separate injector for preview mode.
Come up with the REST endpoints and a mechanism for preview and preview data collection.
REST Endpoints:
To start the preview for an application:
Request Method and Endpoint: POST /v3/namespaces/{namespace-id}/preview, where namespace-id is the name of the namespace. The response will contain a CDAP-generated unique preview-id, which can be used later to get the preview data. The request body will contain the application configuration along with a few additional configs for the preview section.
{ "artifact":{ "name":"cdap-data-pipeline", "version":"3.5.0-SNAPSHOT", "scope":"SYSTEM" }, "name":"MyPipeline", "config":{ "connections":[ { "from":"FTP", "to":"CSVParser" }, { "from":"CSVParser", "to":"Table" } ], "stages":[ { "name":"FTP", "plugin":{ "name":"FTP", "type":"batchsource", "label":"FTP", "artifact":{ "name":"core-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" }, "properties":{ "referenceName":"myfile", "path":"/tmp/myfile" } }, "outputSchema":"{\"fields\":[{\"name\":\"offset\",\"type\":\"long\"},{\"name\":\"body\",\"type\":\"string\"}]}" }, { "name":"MyCSVParser", "plugin":{ "name":"CSVParser", "type":"transform", "label":"CSVParser", "artifact":{ "name":"transform-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" }, "properties":{ "format":"DEFAULT", "schema":"{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}", "field":"body" } }, "outputSchema":"{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}" }, { "name":"MyTable", "plugin":{ "name":"Table", "type":"batchsink", "label":"Table", "artifact":{ "name":"core-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" }, "properties":{ "schema":"{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}", "name":"mytable", "schema.row.field":"id" } }, "outputSchema":"{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}", "inputSchema":[ { "name":"id", "type":"int", "nullable":false }, { "name":"name", "type":"string", "nullable":false } ] } ], "preview": { "startStages": ["MyCSVParser"], "endStages": ["MyTable"], "useSinks": ["MyTable"], "outputs": { "FTP": { "data": [ {"offset": 1, "body": "100,bob"}, {"offset": 2, "body": "200,rob"}, {"offset": 3, "body": 
"300,tom"} ], "schema": { "type" : "record", "fields": [ {"name":"offset","type":"long"}, {"name":"body","type":"string"} ] } } } } } }a. Simple pipeline
Consider a simple pipeline represented by the following connections:

(FTP)-------->(CSV Parser)-------->(Table)

CASE 1: To preview the entire pipeline:

"preview": { "startStages": ["FTP"], "endStages": ["Table"], "useSinks": ["Table"], "numOfRecords": 10, "programName": "SmartWorkflow", // The program to be previewed "programType": "workflow" // The program type to be previewed }

CASE 2: To preview a section of the pipeline: (CSV Parser)-------->(Table)

"preview": { "startStages": ["CSVParser"], "endStages": ["Table"], "useSinks": ["Table"], "outputs": { "FTP": { "data": [ {"offset": 1, "body": "100,bob"}, {"offset": 2, "body": "200,rob"}, {"offset": 3, "body": "300,tom"} ], "schema": { "type" : "record", "fields": [ {"name":"offset","type":"long"}, {"name":"body","type":"string"} ] } } } }

CASE 3: To preview only a single stage (CSV Parser) in the pipeline:

"preview": { "startStages": ["CSV Parser"], "endStages": ["CSV Parser"], "outputs": { "FTP": { "data": [ {"offset": 1, "body": "100,bob"}, {"offset": 2, "body": "200,rob"}, {"offset": 3, "body": "300,tom"} ], "schema": { "type" : "record", "fields": [ {"name":"offset","type":"long"}, {"name":"body","type":"string"} ] } } } }

CASE 4: To verify that records are read correctly from FTP:

"preview": { "startStages": ["FTP"], "endStages": ["FTP"], "numOfRecords": 10 }

CASE 5: To verify that data is written to the Table sink properly:

"preview": { "startStages": ["Table"], "endStages": ["Table"], "useSinks": ["Table"], "outputs": { "CSV Parser": { "data": [ {"id": 1, "name": "bob"}, {"id": 2, "name": "rob"}, {"id": 3, "name": "tom"} ], "schema": { "type" : "record", "fields": [ {"name":"id","type":"long"}, {"name":"name","type":"string"} ] } } } }

b. Fork in the pipeline (multiple sinks)
Consider the following pipeline:

(S3 Source)--------->(Log Parser)--------->(Group By Aggregator)--------->(Python Evaluator)--------->(Aggregated Result)
                           |
                           --------->(Javascript Transform)--------->(Raw Data)

CASE 1: To preview the entire pipeline:

"preview": { "startStages": ["S3 Source"], "endStages": ["Aggregated Result", "Raw Data"], "useSinks": ["Aggregated Result", "Raw Data"], "numOfRecords": 10 }

Note: useSinks seems redundant, since endStages already controls up to which point the pipeline needs to run.

CASE 2: To mock the source:

"preview": { "startStages": ["Log Parser", "Javascript Transform"], "endStages": ["Aggregated Result", "Raw Data"], "useSinks": ["Aggregated Result", "Raw Data"], "outputs": { "S3 Source": { "data": [ "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0800] GET /apache_pb.gif HTTP/1.0 200 2326", "127.0.0.1 - bob [10/Oct/2000:14:55:36 -0710] GET /apache_pb.gif HTTP/1.0 200 2326", "127.0.0.1 - tom [10/Oct/2000:23:55:36 -0920] GET /apache_pb.gif HTTP/1.0 200 2326" ], "schema": { "type" : "record", "fields": [ {"name":"log_line","type":"string"} ] } } } }

CASE 3: To preview a section of the pipeline: (Log Parser)--------->(Group By Aggregator)--------->(Python Evaluator)

"preview": { "startStages": ["Log Parser"], "endStages": ["Aggregated Result"], "useSinks": ["Aggregated Result"], "outputs": { "S3 Source": { "data": [ "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0800] GET /apache_pb.gif HTTP/1.0 200 2326", "127.0.0.1 - bob [10/Oct/2000:14:55:36 -0710] GET /apache_pb.gif HTTP/1.0 200 2326", "127.0.0.1 - tom [10/Oct/2000:23:55:36 -0920] GET /apache_pb.gif HTTP/1.0 200 2326" ], "schema": { "type" : "record", "fields": [ {"name":"log_line","type":"string"} ] } } } }

CASE 4: To preview the single stage Python Evaluator:

"preview": { "startStages": ["Python Evaluator"], "endStages": ["Python Evaluator"], "outputs": { "Group By Aggregator": { "data": [ {"ip":"127.0.0.1", "counts":3}, {"ip":"127.0.0.2", "counts":4}, {"ip":"127.0.0.3", "counts":5}, {"ip":"127.0.0.4", "counts":6} ], "schema": { "type" : "record", "fields": [ {"name":"ip","type":"string"}, {"name":"counts","type":"long"} ] } } } }

c. Join in the pipeline (multiple sources)
Consider the following pipeline:

(Database)--------->(Python Evaluator)--------->|
                                                |------------>(Join)-------->(Projection)------->(HBase Sink)
     (FTP)--------->(CSV Parser)--------------->|

CASE 1: To preview the entire pipeline:

"preview": { "startStages": ["Database", "FTP"], "endStages": ["HBase Sink"], "useSinks": ["HBase Sink"], "numOfRecords": 10 }

CASE 2: To mock both sources:

"preview": { "startStages": ["Python Evaluator", "CSV Parser"], "endStages": ["HBase Sink"], "useSinks": ["HBase Sink"], "outputs": { "Database": { "data": [ {"name":"tom", "counts":3}, {"name":"bob", "counts":4}, {"name":"rob", "counts":5}, {"name":"milo", "counts":6} ], "schema": { "type" : "record", "fields": [ {"name":"name","type":"string"}, {"name":"counts","type":"long"} ] } }, "FTP": { "data": [ {"offset":1, "body":"tom,100"}, {"offset":2, "body":"bob,200"}, {"offset":3, "body":"rob,300"}, {"offset":4, "body":"milo,400"} ], "schema": { "type" : "record", "fields": [ {"name":"offset","type":"long"}, {"name":"body","type":"string"} ] } } } }

CASE 3: To preview the Join transform only:

"preview": { "startStages": ["Join"], "endStages": ["Join"], "outputs": { "Python Evaluator": { "data": [ {"name":"tom", "counts":3}, {"name":"bob", "counts":4}, {"name":"rob", "counts":5}, {"name":"milo", "counts":6} ], "schema": { "type" : "record", "fields": [ {"name":"name","type":"string"}, {"name":"counts","type":"long"} ] } }, "CSV Parser": { "data": [ {"offset":1, "body":"tom,100"}, {"offset":2, "body":"bob,200"}, {"offset":3, "body":"rob,300"}, {"offset":4, "body":"milo,400"} ], "schema": { "type" : "record", "fields": [ {"name":"offset","type":"long"}, {"name":"body","type":"string"} ] } } } }

d. Preview for a single stage (TBD)
Consider a pipeline containing only one transform, which has no connections yet:

(Javascript Transform)

The preview configuration can be provided as:

"preview": { "startStages": ["Javascript Transform"], "endStages": ["Javascript Transform"], "outputs": { "MockSource": { "data": [ {"name":"tom", "id":3}, {"name":"bob", "id":4}, {"name":"rob", "id":5}, {"name":"milo", "id":6} ], "schema": { "type" : "record", "fields": [ {"name":"name","type":"string"}, {"name":"id","type":"long"} ] } } } }

How to specify the input data:

The user can specify the input data for preview by inserting the data directly, in table format, in the UI, or by uploading a file containing the records.

When the data is inserted in table format, the UI will convert the data into the appropriate JSON records.

When the user decides to upload a file, they can upload a JSON file conforming to the schema of the next stage. Ideally we should allow uploading a CSV file as well; however, how the data is interpreted will be plugin dependent. For example, consider a list of CSV records: for the CSVParser plugin, the entire record will be treated as the body field, whereas for the Table sink, we will have to split the record on commas to create the multiple fields specified by the next stage's input schema.

Once the preview is started, a unique preview-id will be generated for it. The runtime information (preview-id, STATUS) for the preview will be generated and stored (in memory or on disk).
Once the preview execution is complete, its runtime information will be updated with the status of the preview (COMPLETED or FAILED).
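The table/CSV-to-records conversion described above can be sketched as follows. This is illustrative only: the helper names (rows_to_records, mock_stage_output) are not part of the proposed API, and the type handling is limited to the primitive types used in the examples.

```python
import json

def rows_to_records(rows, schema):
    """Convert comma-separated text rows into JSON records matching an
    Avro-style schema. Assumes one value per schema field per row and
    field types limited to "long", "int", and "string"."""
    casts = {"long": int, "int": int, "string": str}
    fields = [(f["name"], casts[f["type"]]) for f in schema["fields"]]
    records = []
    for row in rows:
        values = row.split(",")
        records.append({name: cast(value) for (name, cast), value in zip(fields, values)})
    return records

def mock_stage_output(stage, rows, schema):
    """Build the "outputs" entry used to mock an upstream stage in a preview request."""
    return {stage: {"data": rows_to_records(rows, schema), "schema": schema}}

# Example: mock the CSV Parser output to preview only the Table sink
# (the CASE 5 shape from the simple pipeline above).
schema = {"type": "record",
          "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}]}
preview = {
    "startStages": ["Table"],
    "endStages": ["Table"],
    "useSinks": ["Table"],
    "outputs": mock_stage_output("CSV Parser", ["1,bob", "2,rob"], schema),
}
print(json.dumps(preview, indent=2))
```

The UI would splice the resulting "preview" object into the application configuration before POSTing it to the preview endpoint.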
To get the status of the preview
Request Method and Endpoint: GET /v3/namespaces/{namespace-id}/previews/{preview-id}/status, where namespace-id is the name of the namespace and preview-id is the id of the preview whose status is requested. The response body will contain the JSON-encoded preview status and an optional message if the preview failed.
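A client polling this endpoint needs to know which statuses are terminal. Assuming the status values listed in the sample responses (RUNNING, COMPLETED, DEPLOY_FAILED, RUNTIME_FAILED), a minimal sketch of that check:

```python
import json

# Status values taken from the sample responses; this set is an assumption
# and may grow as the design evolves.
TERMINAL_STATUSES = {"COMPLETED", "DEPLOY_FAILED", "RUNTIME_FAILED"}

def is_preview_done(status_response_json):
    """Return (done, failure_message) for a /status response body."""
    body = json.loads(status_response_json)
    done = body["status"] in TERMINAL_STATUSES
    return done, body.get("failureMessage")

print(is_preview_done('{"status": "RUNNING"}'))  # (False, None)
print(is_preview_done('{"status": "DEPLOY_FAILED", "failureMessage": "bad config"}'))
```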
1. If the preview is RUNNING:

{ "status": "RUNNING" }

2. If the preview is COMPLETED:

{ "status": "COMPLETED" }

3. If the preview application deployment FAILED:

{ "status": "DEPLOY_FAILED", "failureMessage": "Exception message explaining the failure" }

4. If the preview application FAILED during execution of the stages:

{ "status": "RUNTIME_FAILED", "failureMessage": "Failure message" /* "stages": { "stage_1": { "numOfInputRecords": 10, "numOfOutputRecords": 10 }, "stage_2": { "numOfInputRecords": 10, "numOfOutputRecords": 7 }, "stage_3": { "numOfInputRecords": 7, "numOfOutputRecords": 4, "errorMessage": "Failure reason for the stage" } } */ }

To get the preview data for a stage:
Request Method and Endpoint: GET /v3/namespaces/{namespace-id}/previews/{preview-id}/stages/{stage-id}, where namespace-id is the name of the namespace, preview-id is the id of the preview for which data is requested, and stage-id is the unique name used to identify the stage. The response body will contain the JSON-encoded input and output data for the stage, as well as the input and output schemas and any error records.
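Given a response in the shape shown below, a client (for example the UI) could summarize per-stage record counts. The field names follow the sample response; the helper itself is purely illustrative.

```python
import json

def summarize_stage(stage_response_json):
    """Count input, output, and error records in a stage-data response
    (field names per the sample response body)."""
    body = json.loads(stage_response_json)
    return {
        "numOfInputRecords": len(body.get("inputData", [])),
        "numOfOutputRecords": len(body.get("outputData", [])),
        "numOfErrorRecords": len(body.get("errorRecords", [])),
    }

sample = json.dumps({
    "inputData": [{"first_name": "rob"}, {"first_name": "bob"}, {"first_name": "tom"}],
    "outputData": [{"name": "rob"}, {"name": "bob"}, {"name": "tom"}],
    "errorRecords": [{"errCode": 12, "errMsg": "Invalid record"}],
})
print(summarize_stage(sample))
```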
{ "inputData": [ {"first_name": "rob", "zipcode": 95131}, {"first_name": "bob", "zipcode": 95054}, {"first_name": "tom", "zipcode": 94306} ], "outputData":[ {"name": "rob", "zipcode": 95131, "age": 21}, {"name": "bob", "zipcode": 95054, "age": 22}, {"name": "tom", "zipcode": 94306, "age": 23} ], "inputSchema": { "type":"record", "name":"etlSchemaBody", "fields":[ {"name":"first_name", "type":"string"}, {"name":"zipcode", "type":"int"} ] }, "outputSchema": { "type":"record", "name":"etlSchemaBody", "fields":[ {"name":"name", "type":"string"}, {"name":"zipcode", "type":"int"}, {"name":"age", "type":"int"} ] }, "errorRecordSchema": { "type":"record", "name":"schemaBody", "fields":[ {"name":"errCode", "type":"int"}, {"name":"errMsg", "type":"string"}, {"name":"invalidRecord", "type":"string"} ] }, "errorRecords": [ { "errCode":12, "errMsg":"Invalid record", "invalidRecord":"{\"offset\":3,\"body\":\"This error record is not comma separated!\"}" } ], // Logs per stage may not make sense; we will need to think more about this. "logs" : { "stage level logs" } }

To get the logs/metrics for the preview:
Request Method and Endpoint: GET /v3/namespaces/{namespace-id}/previews/{preview-id}/logs, where namespace-id is the name of the namespace and preview-id is the id of the preview for which data is requested. The logs endpoint returns the entire preview log. Sample response for the logs endpoint (note that it is similar to that of a regular app):

[ { "log":"This is sample log - 0", "offset":"0.1466470037680" }, { "log":"This is sample log - 1", "offset":"1.1466470037680" }, { "log":"This is sample log - 2", "offset":"2.1466470037680" }, { "log":"This is sample log - 3", "offset":"3.1466470037680" }, { "log":"This is sample log - 4", "offset":"4.1466470037680" } ]

Request Method and Endpoint: GET /v3/namespaces/{namespace-id}/previews/{preview-id}/metrics, where namespace-id is the name of the namespace and preview-id is the id of the preview for which data is requested. Sample response for the metrics (note that it is similar to that of a regular app):

{ "endTime": 1466469538, "resolution": "2147483647s", "series": [ { "data": [ { "time": 0, "value": 4 } ], "grouping": {}, "metricName": "user.Projection.records.out" }, { "data": [ { "time": 0, "value": 4 } ], "grouping": {}, "metricName": "user.Projection.records.in" }, { "data": [ { "time": 0, "value": 4 } ], "grouping": {}, "metricName": "user.Stream.records.out" }, { "data": [ { "time": 0, "value": 4 } ], "grouping": {}, "metricName": "user.JavaScript.records.in" }, { "data": [ { "time": 0, "value": 4 } ], "grouping": {}, "metricName": "user.Stream.records.in" } ], "startTime": 0 }
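As a sketch of how the UI might turn the metrics response above into per-stage record counts, the metric names can be parsed as user.<stage>.records.<in|out>. This naming is taken from the sample response and is an assumption about the final design.

```python
import json
from collections import defaultdict

def records_per_stage(metrics_response_json):
    """Aggregate user.<stage>.records.<in|out> series into
    {stage: {"in": n, "out": n}}, summing data points per series."""
    body = json.loads(metrics_response_json)
    counts = defaultdict(dict)
    for series in body["series"]:
        parts = series["metricName"].split(".")  # e.g. ["user", "Projection", "records", "out"]
        if len(parts) == 4 and parts[0] == "user" and parts[2] == "records":
            stage, direction = parts[1], parts[3]
            counts[stage][direction] = sum(point["value"] for point in series["data"])
    return dict(counts)

sample = json.dumps({
    "series": [
        {"metricName": "user.Projection.records.out", "data": [{"time": 0, "value": 4}]},
        {"metricName": "user.Projection.records.in", "data": [{"time": 0, "value": 4}]},
        {"metricName": "user.Stream.records.out", "data": [{"time": 0, "value": 4}]},
    ]
})
print(records_per_stage(sample))
```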
Open Questions:
How to make it easy for the user to upload the CSV file?
Lookup data is a user dataset. Should we allow mocking of the lookup dataset as well?
Created in 2020 by Google Inc.