Info

The Spark sink in Scala plugin is available in the Hub.

...

This plugin can be used when you want complete control over the Spark computation. For example, you may want to join the input RDD with another Dataset and select a subset of the join result using Spark SQL before writing the results out to files in Parquet format.
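
As an illustrative sketch of that scenario (the table names, column names, and paths below are hypothetical, and the sink function signature is described under Configuration), the user code could look something like this:

Code Block
def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {
  val sqlContext = df.sqlContext

  // Expose the incoming records and some hypothetical reference data to Spark SQL
  df.registerTempTable("events")
  sqlContext.read.parquet("/data/users").registerTempTable("users")

  // Select a subset of the join result using Spark SQL
  val joined = sqlContext.sql(
    """SELECT e.user_id, u.name, e.amount
      |FROM events e JOIN users u ON e.user_id = u.user_id
      |WHERE e.amount > 0""".stripMargin)

  // Write the results out as Parquet files
  joined.write.parquet("/output/joined-events")
}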

Configuration

Property

Macro Enabled?

Description

Scala

Yes

Required. Spark code in Scala defining how the input data is consumed and written out to the sink. The code must implement a function called sink, whose signature should be one of:

Code Block
def sink(df: DataFrame) : Unit
def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit

The input DataFrame has the same schema as the input schema to this stage. Using the SparkExecutionPluginContext, you can access CDAP entities such as Datasets, as well as the underlying SparkContext in use.
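
For example, the following sketch (the output path is hypothetical, and it assumes the stage has an output schema configured) keeps only the columns declared in the output schema and writes them out as Parquet files:

Code Block
def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {
  // Columns declared in the stage's output schema
  val columns = context.getOutputSchema.getFields.map(_.getName).toSeq

  // Keep only those columns and write them out as Parquet files
  df.select(columns.head, columns.tail: _*)
    .write
    .parquet("/tmp/sink-output")
}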

Operating on the lower-level RDD is also possible by using one of the following forms of the sink method:

Code Block
def sink(rdd: RDD[StructuredRecord]) : Unit
def sink(rdd: RDD[StructuredRecord], context: SparkExecutionPluginContext) : Unit

For example:

Code Block
def sink(rdd: RDD[StructuredRecord], context: SparkExecutionPluginContext) : Unit = {
  val outputSchema = context.getOutputSchema
  rdd
    .flatMap(_.get[String]("body").split("\\s+"))
    .map(s => (s, 1))
    .reduceByKey(_ + _)
    .saveAsTextFile("output")
}

This will perform a word count on the input field 'body', then write out the results as a text file.

The following imports are included automatically and are ready for the user code to use:

Code Block
  import co.cask.cdap.api.data.format._
  import co.cask.cdap.api.data.schema._
  import co.cask.cdap.etl.api.batch._
  import org.apache.spark._
  import org.apache.spark.api.java._
  import org.apache.spark.rdd._
  import org.apache.spark.sql._
  import org.apache.spark.SparkContext._
  import scala.collection.JavaConversions._

Dependencies

Yes

Optional. Extra dependencies for the Spark program. This is a comma-separated list of URIs pointing to the locations of the dependency JARs. A path can end with an asterisk (*) as a wildcard, in which case all files with the .jar extension under the parent path will be included.
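
For example, a value for this property could look like the following (both paths are placeholders):

Code Block
hdfs://namenode/libs/my-udfs-1.0.jar,/opt/extra/jars/*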

Compile at Deployment Time

No

Optional. Specifies whether the code is validated at pipeline creation time. Setting this to false skips the validation.

Default is true.

...