
Property: Scala
Macro Enabled?: Yes
Description:

Required. Spark code in Scala defining how to transform the input data into the output data. The code must implement a function called transform, whose signature must be one of:

Code Block
def transform(df: DataFrame) : DataFrame
def transform(df: DataFrame, context: SparkExecutionPluginContext) : DataFrame

The input DataFrame has the same schema as the input schema of this stage, and the transform method should return a DataFrame whose schema matches the output schema set up for this stage. Through the SparkExecutionPluginContext, you can access CDAP entities such as Streams and Datasets, as well as the underlying SparkContext in use.
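
For example, a minimal sketch of the DataFrame form, assuming the input schema contains a nullable string field named 'body' (the field name is illustrative) and the output schema is identical:

Code Block
def transform(df: DataFrame) : DataFrame = {
  // Drop records whose 'body' field is null; the schema is unchanged,
  // so the result still matches the output schema of this stage.
  df.filter(df("body").isNotNull)
}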

Operating on the lower-level RDD is also possible by using one of the following forms of the transform method:

Code Block
def transform(rdd: RDD[StructuredRecord]) : RDD[StructuredRecord]
def transform(rdd: RDD[StructuredRecord], context: SparkExecutionPluginContext) : RDD[StructuredRecord]

For example:

Code Block
def transform(rdd: RDD[StructuredRecord], context: SparkExecutionPluginContext) : RDD[StructuredRecord] = {
  // Look up the output schema configured for this stage
  val outputSchema = context.getOutputSchema
  rdd
    // Split the 'body' field of each record into words
    .flatMap(_.get[String]("body").split("\\s+"))
    // Pair each word with a count of 1, then sum the counts per word
    .map(s => (s, 1))
    .reduceByKey(_ + _)
    // Build one output record per word, matching the output schema
    .map(t => StructuredRecord.builder(outputSchema).set("word", t._1).set("count", t._2).build)
}

This will perform a word count on the input field 'body' and produce records with two fields, 'word' and 'count'.

The following imports are included automatically and are ready for the user code to use:

Code Block
  import io.cdap.cdap.api.data.format._
  import io.cdap.cdap.api.data.schema._
  import io.cdap.cdap.etl.api.batch._
  import org.apache.spark._
  import org.apache.spark.api.java._
  import org.apache.spark.rdd._
  import org.apache.spark.sql._
  import org.apache.spark.SparkContext._
  import scala.collection.JavaConversions._

Property: Dependencies
Macro Enabled?: Yes
Description:

Optional. Extra dependencies for the Spark program, given as a comma-separated list of URIs pointing to dependency JARs. A path can end with an asterisk '*' as a wildcard, in which case all files with the '.jar' extension under the parent path will be included.
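
For example, a dependency list might look like the following (the paths shown are purely illustrative):

Code Block
hdfs://namenode/cdap/libs/my-udfs-1.0.jar,file:///opt/spark/extra-jars/*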

Property: Compile at Deployment Time
Macro Enabled?: No
Description:

Optional. Whether to compile the code at deployment time. Turning this off is useful when some library classes are available only at run time, not at deployment time.

Default is true.