Spark Sink
This is a sub-task (single user story) of Cask Hydrator++.
User Story:
A user should be able to run a Spark job on the output of a transform stage in an ETL batch scenario.
API Design:
Below is the SparkSink class that users can implement to define their SparkSink plugin.
Because it extends BatchConfigurable, users will be able to override:
- configurePipeline - add any datasets and streams that are needed
- prepareRun - configure the job before the run starts
- onRunFinish - perform any end-of-run logic
The SparkSink also exposes a run method, which gives access to an RDD representing data from the previous stages of the pipeline.
- run - perform the core of the computation. The user is responsible for persisting the output as desired (saving the model to the path of a FileSet, for instance); an illustrative sketch follows the class definition below.
SparkSink
/**
* SparkSink composes a final, optional stage of a Batch ETL Pipeline. In addition to configuring the Batch run, it
* can also perform RDD operations on the key-value pairs provided by the Batch run.
*
* The {@link SparkSink#run} method is called inside the Batch run, while {@link SparkSink#prepareRun} and
* {@link SparkSink#onRunFinish} are called on the client side, which launches the Batch run, before the
* Batch run starts and after it finishes, respectively.
*
* @param <IN> The type of input record to the SparkSink.
*/
@Beta
public abstract class SparkSink<IN> extends BatchConfigurable<SparkPluginContext> implements Serializable {
public static final String PLUGIN_TYPE = "sparksink";
private static final long serialVersionUID = -8600555200583639593L;
/**
* The user's Spark job, which will be executed
*
* @param context {@link SparkPluginContext} for this job
* @param input the input from previous stages of the Batch run.
*/
public abstract void run(SparkPluginContext context, JavaRDD<IN> input) throws Exception;
}
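For illustration, below is a minimal sketch of a SparkSink implementation. The plugin name, the "word" field read from each StructuredRecord, and the hard-coded output path are assumptions made for this example, not something prescribed by the API.
Example SparkSink (illustrative)
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

@Plugin(type = SparkSink.PLUGIN_TYPE)
@Name("WordCountSink")
public class WordCountSink extends SparkSink<StructuredRecord> {
  private static final long serialVersionUID = 1L;

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // Nothing to prepare for this simple example.
  }

  @Override
  public void run(SparkPluginContext context, JavaRDD<StructuredRecord> input) throws Exception {
    // Count occurrences of the (hypothetical) "word" field across all input records ...
    JavaPairRDD<String, Long> counts = input
        .mapToPair(record -> new Tuple2<>((String) record.get("word"), 1L))
        .reduceByKey(Long::sum);
    // ... and persist the result; the sink is responsible for saving its own output.
    // The path here is an illustrative placeholder; a FileSet path would be typical.
    counts.saveAsTextFile("/tmp/word-counts-" + context.getLogicalStartTime());
  }
}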
Users will have access to a SparkPluginContext, which exposes the functionality that would be available to a regular Spark program via SparkContext, except for the following methods:
- getMetrics
- getWorkflowToken
- getTaskLocalizationContext
- getSpecification
- getServiceDiscoverer
- setExecutorResources
SparkPluginContext
/**
* Context passed to spark plugins.
*/
public interface SparkPluginContext extends BatchContext {
/**
* Returns the logical start time of this Spark job. The logical start time is the time this job was
* supposed to start if it was started by the scheduler; otherwise it is the current time when the
* job runs.
*
* @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC).
*/
long getLogicalStartTime();
/**
* Create a Spark RDD that uses a {@link Dataset} as an input source.
*
* @param datasetName the name of the {@link Dataset} to be read as an RDD
* @param kClass the key class
* @param vClass the value class
* @param <T> type of RDD
* @return the RDD created from Dataset
* @throws UnsupportedOperationException if the SparkContext is not yet initialized
*/
<T> T readFromDataset(String datasetName, Class<?> kClass, Class<?> vClass);
/**
* Create a Spark RDD that uses a {@link Dataset}, instantiated using the provided arguments, as an input source.
*
* @param datasetName the name of the {@link Dataset} to be read as an RDD
* @param kClass the key class
* @param vClass the value class
* @param datasetArgs arguments for the dataset
* @param <T> type of RDD
* @return the RDD created from Dataset
* @throws UnsupportedOperationException if the SparkContext is not yet initialized
*/
<T> T readFromDataset(String datasetName, Class<?> kClass, Class<?> vClass, Map<String, String> datasetArgs);
/**
* Writes a Spark RDD to a {@link Dataset}.
*
* @param rdd the rdd to be stored
* @param datasetName the name of the {@link Dataset} where the RDD should be stored
* @param kClass the key class
* @param vClass the value class
* @param <T> type of RDD
* @throws UnsupportedOperationException if the SparkContext is not yet initialized
*/
<T> void writeToDataset(T rdd, String datasetName, Class<?> kClass, Class<?> vClass);
/**
* Writes a Spark RDD to a {@link Dataset} instantiated using the provided arguments.
*
* @param rdd the rdd to be stored
* @param datasetName the name of the {@link Dataset} where the RDD should be stored
* @param kClass the key class
* @param vClass the value class
* @param datasetArgs arguments for the dataset
* @param <T> type of RDD
* @throws UnsupportedOperationException if the SparkContext is not yet initialized
*/
<T> void writeToDataset(T rdd, String datasetName, Class<?> kClass, Class<?> vClass, Map<String, String> datasetArgs);
/**
* Create a Spark RDD that uses the complete {@link Stream} as an input source.
*
* @param streamName the name of the {@link Stream} to be read as an RDD
* @param vClass the value class
* @param <T> type of RDD
* @return the RDD created from {@link Stream}
*/
<T> T readFromStream(String streamName, Class<?> vClass);
/**
* Create a Spark RDD that uses a {@link Stream} as an input source.
*
* @param streamName the name of the {@link Stream} to be read as an RDD
* @param vClass the value class
* @param startTime the starting time of the stream to be read, in milliseconds. To read from the beginning of the
* stream, set this to 0
* @param endTime the ending time of the stream to be read, in milliseconds. To read up to the end of the stream,
* set this to Long.MAX_VALUE
* @param <T> type of RDD
* @return the RDD created from {@link Stream}
*/
<T> T readFromStream(String streamName, Class<?> vClass, long startTime, long endTime);
/**
* Create a Spark RDD that uses a {@link Stream} as an input source, decoded with the given {@link StreamEventDecoder}.
*
* @param streamName the name of the {@link Stream} to be read as an RDD
* @param vClass the value class
* @param startTime the starting time of the stream to be read, in milliseconds. To read from the beginning of the
* stream, set this to 0
* @param endTime the ending time of the stream to be read, in milliseconds. To read up to the end of the stream,
* set this to Long.MAX_VALUE
* @param decoderType the decoder to use while reading streams
* @param <T> type of RDD
* @return the RDD created from {@link Stream}
*/
<T> T readFromStream(String streamName, Class<?> vClass, long startTime, long endTime,
Class<? extends StreamEventDecoder> decoderType);
/**
* Create a Spark RDD that uses a {@link Stream} as an input source, according to the given {@link StreamBatchReadable}.
*
* @param stream a {@link StreamBatchReadable} containing information on the stream to read from
* @param vClass the value class
* @param <T> type of RDD
* @return the RDD created from {@link Stream}
*/
<T> T readFromStream(StreamBatchReadable stream, Class<?> vClass);
/**
* Returns
* <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.api.java.JavaSparkContext">
* JavaSparkContext</a> or
* <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext">SparkContext</a>
depending on the user's job type.
*
* @param <T> the type of Spark Context
* @return the Spark Context
*/
<T> T getOriginalSparkContext();
/**
* Returns a {@link Serializable} {@link PluginContext} which can be used to request plugin instances. The
* instance returned can also be used in a Spark program's closures.
*
* @return A {@link Serializable} {@link PluginContext}.
*/
PluginContext getPluginContext();
/**
* Sets a
* <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkConf">SparkConf</a>
* to be used for the Spark execution. Only configurations set inside the
* {@link SparkSink#prepareRun} call will affect the Spark execution.
*
* @param <T> the SparkConf type
*/
<T> void setSparkConf(T sparkConf);
}
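To make the contract concrete, below is a hedged sketch of how a SparkSink might use this context. The dataset name "totals", the byte[] key/value classes (as a KeyValueTable-style dataset would store), the record fields, and the executor-memory setting are all assumptions for the example, not requirements of the interface.
Example context usage (illustrative)
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.format.StructuredRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public class TotalsSink extends SparkSink<StructuredRecord> {
  private static final long serialVersionUID = 1L;

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // Per the setSparkConf contract above, only configurations set here
    // affect the Spark execution. The memory setting is an arbitrary example.
    context.setSparkConf(new SparkConf().set("spark.executor.memory", "2g"));
  }

  @Override
  public void run(SparkPluginContext context, JavaRDD<StructuredRecord> input) throws Exception {
    // Sum the (hypothetical) "amount" field per (hypothetical) "key" field ...
    JavaPairRDD<byte[], byte[]> totals = input
        .mapToPair(record -> new Tuple2<>((String) record.get("key"), (Long) record.get("amount")))
        .reduceByKey(Long::sum)
        .mapToPair(t -> new Tuple2<>(Bytes.toBytes(t._1()), Bytes.toBytes(t._2())));
    // ... and write the pairs to a dataset; the RDD type parameter of
    // writeToDataset is inferred from the argument.
    context.writeToDataset(totals, "totals", byte[].class, byte[].class);
  }
}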
Implementation Summary:
- SmartWorkflow will check whether there are any plugins of type SparkSink. If there are, it will execute an ETLSpark instead of an ETLMapReduce for that phase (see the sketch after this list).
- Because SparkSink extends BatchConfigurable, its prepareRun and onRunFinish will automatically be handled by ETLSpark, though we will need to special-case the type of context passed in.
- There will be special casing for the SparkSink plugin type in ETLSparkProgram, to transform the RDD output by the TransformExecutor and call the user plugin's run method.
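A hypothetical sketch of the engine-selection check from the first point, assuming the phase's plugin types are available as a set; the helper name and the program names returned are illustrative, not actual SmartWorkflow code.
Engine selection (hypothetical sketch)
import java.util.Set;

// Hypothetical helper, not actual SmartWorkflow code: choose the program type
// for a pipeline phase from the set of plugin types used by its stages.
static String selectProgramType(Set<String> pluginTypesInPhase) {
  // A SparkSink's run method needs an RDD, so any SparkSink stage forces the
  // whole phase onto the Spark engine.
  return pluginTypesInPhase.contains(SparkSink.PLUGIN_TYPE) ? "ETLSpark" : "ETLMapReduce";
}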