The Spark sink in Scala plugin is available in the Hub.
...
This plugin can be used when you want complete control over the Spark computation. For example, you may want to join the input RDD with another Dataset and select a subset of the join result using Spark SQL before writing the results out to files in Parquet format.
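As a rough illustration of that use case, the sketch below is one possible value for the Scala property, using the DataFrame form of `sink`. It assumes Spark 2.x (for `createOrReplaceTempView`) and relies on the imports the plugin includes automatically. To avoid assuming any particular CDAP Dataset API, it reads the reference data from a Parquet file rather than a CDAP Dataset. The paths, the view names `orders` and `customers`, and the columns `id`, `customer_id`, `amount`, and `name` are all hypothetical.

```scala
// Sketch only: paths, view names, and column names are hypothetical placeholders.
def sink(df: DataFrame) : Unit = {
  val sqlContext = df.sqlContext

  // Reference data to join against; a Parquet file stands in for a CDAP Dataset here
  val customers = sqlContext.read.parquet("/data/reference/customers")

  // Register both inputs so they can be queried with Spark SQL (Spark 2.x API)
  df.createOrReplaceTempView("orders")
  customers.createOrReplaceTempView("customers")

  // Join the two inputs and keep only a subset of the joined rows
  val result = sqlContext.sql(
    """SELECT o.id, o.amount, c.name
      |FROM orders o JOIN customers c ON o.customer_id = c.id
      |WHERE o.amount > 100""".stripMargin)

  // Write the selected rows out in Parquet format, replacing any earlier output
  result.write.mode(SaveMode.Overwrite).parquet("/data/output/large_orders")
}
```

Registering temporary views keeps the selection logic in SQL; the same query could equally be expressed with the DataFrame API (`join`, `filter`, `select`).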
Configuration
Property | Macro Enabled? | Description |
---|
Scala | Yes | Required. Spark code in Scala defining how to process the input and write it out. The code must implement a function called `sink`, whose signature should be one of:
```scala
def sink(df: DataFrame) : Unit
def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit
```
The input DataFrame has the same schema as the input schema to this stage. Through the SparkExecutionPluginContext, you can access CDAP entities such as Datasets, as well as the underlying SparkContext in use. You can also operate on the lower-level RDD by using one of the following forms of the `sink` method:
```scala
def sink(rdd: RDD[StructuredRecord]) : Unit
def sink(rdd: RDD[StructuredRecord], context: SparkExecutionPluginContext) : Unit
```
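For example:
```scala
def sink(rdd: RDD[StructuredRecord], context: SparkExecutionPluginContext) : Unit = {
  rdd
    // Split the "body" field of each record into words
    .flatMap(_.get[String]("body").split("\\s+"))
    // Pair each word with a count of 1, then sum the counts per word
    .map(s => (s, 1))
    .reduceByKey(_ + _)
    // Write the (word, count) pairs as text files under the "output" directory
    .saveAsTextFile("output")
}
```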
This performs a word count on the input field `body`, then writes out the results as text files. The following imports are included automatically and are ready for the user code to use:
```scala
import co.cask.cdap.api.data.format._
import co.cask.cdap.api.data.schema._
import co.cask.cdap.etl.api.batch._
import org.apache.spark._
import org.apache.spark.api.java._
import org.apache.spark.rdd._
import org.apache.spark.sql._
import org.apache.spark.SparkContext._
import scala.collection.JavaConversions._
```
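To sanity-check what the word-count example computes without a cluster, the same split, pair, and sum steps can be run on a plain Scala collection. This is a local sketch, not part of the plugin: `WordCountCheck` and `wordCount` are hypothetical names, and the input is a list of `body` strings rather than `StructuredRecord`s. `groupBy` followed by a per-key sum stands in for `reduceByKey`.

```scala
object WordCountCheck {
  // Mirrors the RDD pipeline: split on whitespace, pair each word with 1,
  // then sum the counts per word (the local analogue of reduceByKey).
  def wordCount(bodies: Seq[String]): Map[String, Int] =
    bodies
      .flatMap(_.split("\\s+"))
      .map(s => (s, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val counts = wordCount(Seq("to be or", "not to be"))
    // Print in a stable order for readability
    println(counts.toSeq.sortBy(_._1).mkString(", "))
  }
}
```

As in the Spark version, a body that begins with whitespace yields an empty-string token, because `split` keeps the leading empty string before a whitespace match at position 0.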
Dependencies | Yes | Optional. Extra dependencies for the Spark program, given as a comma-separated list of URIs pointing to the dependency JARs. A path can end with an asterisk (*) as a wildcard, in which case all files with the .jar extension under the parent path are included. |
Compile at Deployment Time | No | Optional. Whether to validate the code when the pipeline is created. Setting this to false skips validation. Default is true. |
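The wildcard rule for the Dependencies property can be illustrated with a small expansion function. This is a hypothetical sketch of the documented behavior, not CDAP's implementation: `DependencySpec.expand` and its `listFiles` parameter (a stand-in for listing a directory on whatever file system the URI points to) are invented for illustration, and the sketch assumes the wildcard matches only files directly under the parent path.

```scala
object DependencySpec {
  // Hypothetical illustration of the Dependencies property; not CDAP code.
  // listFiles(parent) is assumed to return the file names directly under parent.
  def expand(spec: String, listFiles: String => Seq[String]): Seq[String] =
    spec.split(",").toSeq
      .map(_.trim)
      .filter(_.nonEmpty)
      .flatMap { path =>
        if (path.endsWith("*")) {
          // "hdfs:///libs/*" -> parent "hdfs:///libs"; keep only .jar files
          val parent = path.stripSuffix("*").stripSuffix("/")
          listFiles(parent).filter(_.endsWith(".jar")).map(name => s"$parent/$name")
        } else {
          // A plain URI is used as-is
          Seq(path)
        }
      }
}
```

For example, with a lister that reports `a.jar`, `notes.txt`, and `b.jar` under `hdfs:///libs`, the value `hdfs:///libs/*, file:///opt/extra/util.jar` expands to the two JARs under `hdfs:///libs` followed by `file:///opt/extra/util.jar`; `notes.txt` is skipped because it lacks the .jar extension.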
...