Spark Programs

Apache Spark is used for in-memory cluster computing. It lets you load large sets of data into memory and query them repeatedly. This makes it suitable for both iterative and interactive programs. Similar to MapReduce, Spark can access datasets as both input and output. Spark programs in CDAP can be written in either Java or Scala.

To process data using Spark, specify addSpark() in your application specification:

public void configure() { ... addSpark(new WordCountProgram());

It is recommended to implement the Spark interface by extending the AbstractSpark class, which allows the overriding of these three methods:

  • configure()

  • initialize()

  • destroy()

You can extend from the abstract class AbstractSpark to simplify the implementation:

public class WordCountProgram extends AbstractSpark { @Override public SparkSpecification configure() { return SparkSpecification.Builder.with() .setName("WordCountProgram") .setDescription("Calculates word frequency") .setMainClassName("com.example.WordCounter") .build(); } ... }

The configure method is similar to the one found in service and MapReduce programs. It defines the name, description, and the class containing the Spark program to be executed by the Spark framework.

The initialize() method is invoked at runtime, before the Spark program is executed. Because many Spark programs do not need this method, the AbstractSpark class provides a default implementation that does nothing.

However, if your program requires it, you can override this method to obtain access to the SparkConf configuration and use it to set the Spark properties for the program:

import org.apache.spark.SparkConf; . . . @Override protected void initialize() throws Exception { getContext().setSparkConf(new SparkConf().set("spark.driver.extraJavaOptions", "-XX:MaxDirectMemorySize=1024m")); }

The destroy() method is invoked after the Spark program has finished. You could perform cleanup or send a notification of program completion, if that was required. Like initialize(), since many Spark programs do not need this method, the AbstractSpark class also provides a default implementation for this method that does nothing.

Spark and Resources

When a Spark program is configured, the resource requirements for both the Spark driver processes and the Spark executor processes can be set, both in terms of the amount of memory (in megabytes) and the number of virtual cores assigned.

If both the memory and the number of cores needs to be set, this can be done using:

In this case, 1024 MB and two cores is assigned to each executor process.

CDAP Spark Program

The main class being set through the setMainClass or setMainClassName method inside the Spark.configure() method will be executed by the Spark framework. The main class must have one of these properties:

  1. Extends from SparkMain, if written in Scala

  2. Have a def main(args: Array[String]) method, if written in Scala

  3. Implements JavaSparkMain, if written in Java

  4. Have a public static void main(String[] args) method, if written in Java

A user program is responsible for creating a SparkContext or JavaSparkContext instance, either inside the run methods of SparkMain or JavaSparkMain, or inside their main methods.

CDAP SparkExecutionContext

CDAP provides a SparkExecutionContext, which is needed to access datasets and to interact with CDAP services such as metrics and service discovery. It is only available to Spark programs that are extended from either SparkMain or JavaSparkMain.

Scala:

Java:

Spark and Datasets

Spark programs in CDAP can directly access datasets similar to the way a MapReduce can. These programs can create Spark's Resilient Distributed Dataset (RDD) by reading a dataset and can also write RDD to a dataset. In Scala, implicit objects are provided for reading and writing datasets directly through the SparkContext and RDD objects.

In order to access a dataset in Spark, both the key and value classes have to be serializable. Otherwise, Spark will fail to read or write them. For example, the Table dataset has a value type of Row, which is not serializable. An ObjectStore dataset can be used, provided its classes are serializable.

  • Creating an RDD from a dataset:

    • Scala:

    • Java

       

  • Writing an RDD to a dataset:

    • Scala

       

    • Java

Note: Spark programs can read or write to datasets in different namespaces using Cross Namespace Dataset Access by passing a String containing the namespace as an additional parameter before the dataset name parameter. (By default, if the namespace parameter is not supplied, the namespace in which the program runs is used.).

Spark and Services

Spark programs in CDAP, including worker nodes, can discover Services. Service Discovery by worker nodes ensures that if an endpoint changes during the execution of a Spark program, due to failure or another reason, worker nodes will see the most recent endpoint.

Here is an example of service discovery in a Spark program:

Spark Metrics

Spark programs in CDAP emit metrics, similar to a MapReduce program. CDAP collect system metrics emitted by Spark and display them in the CDAP UI. This helps in monitoring the progress and resources used by a Spark program. You can also emit custom user metrics from the worker nodes of your Spark program:

Spark in Workflows

Spark programs in CDAP can also be added to a workflow, similar to a MapReduce. The Spark program can get information about the workflow through the SparkExecutionContext.getWorkflowInfo method.

Transactions and Spark

When a Spark program interacts with datasets, CDAP will automatically create a long-running transaction that covers the Spark job execution. A Spark job refers to a Spark action and any tasks that need to be executed to evaluate the action (see Spark Job Scheduling for details).

You can also control the transaction scope yourself explicitly. It's useful when you want multiple Spark actions to be committed in the same transaction. For example, in Kafka Spark Streaming, you can persist the Kafka offsets together with the changes in the datasets in the same transaction to obtain exactly-once processing semantics.

When using an explicit transaction, you can access a dataset directly by calling the getDataset() method of the DatasetContext provided to the transaction. However, the dataset acquired through getDataset() cannot be used through a function closure. See the section on Using Datasets in Programs for additional information.

Here is an example of using an explicit transaction in Spark:

Scala:

Java:

Spark Versions

CDAP allows you to write Spark programs using either Spark 2 or Spark 3 with Scala 2.12.

To use it, you must add the cdap-api-spark3_2.12 Maven dependency:

 

 

Created in 2020 by Google Inc.