Spark Programs
Apache Spark is used for in-memory cluster computing. It lets you load large sets of data into memory and query them repeatedly. This makes it suitable for both iterative and interactive programs. Similar to MapReduce, Spark can access datasets as both input and output. Spark programs in CDAP can be written in either Java or Scala.
To process data using Spark, specify addSpark() in your application specification:
public void configure() {
  ...
  addSpark(new WordCountProgram());
}
It is recommended to implement the Spark interface by extending the AbstractSpark class, which allows you to override these three methods:
configure()
initialize()
destroy()
For example, extending the abstract class AbstractSpark simplifies the implementation:
public class WordCountProgram extends AbstractSpark {
  @Override
  public SparkSpecification configure() {
    return SparkSpecification.Builder.with()
      .setName("WordCountProgram")
      .setDescription("Calculates word frequency")
      .setMainClassName("com.example.WordCounter")
      .build();
  }
  ...
}
The configure method is similar to the one found in service and MapReduce programs. It defines the name, description, and the class containing the Spark program to be executed by the Spark framework.
The initialize() method is invoked at runtime, before the Spark program is executed. Because many Spark programs do not need this method, the AbstractSpark class provides a default implementation that does nothing.
However, if your program requires it, you can override this method to obtain access to the SparkConf configuration and use it to set the Spark properties for the program:
import org.apache.spark.SparkConf;
...
@Override
protected void initialize() throws Exception {
  // Set Spark properties for this program before it runs.
  getContext().setSparkConf(new SparkConf().set("spark.driver.extraJavaOptions", "-XX:MaxDirectMemorySize=1024m"));
}
The destroy() method is invoked after the Spark program has finished. You can use it to perform cleanup or to send a notification of program completion. As with initialize(), many Spark programs do not need this method, so the AbstractSpark class provides a default implementation that does nothing.
Spark and Resources
When a Spark program is configured, the resource requirements for both the Spark driver processes and the Spark executor processes can be set, both in terms of the amount of memory (in megabytes) and the number of virtual cores assigned.
If both the memory and the number of cores need to be set, both values can be supplied together; for example, 1024 MB and two cores can be assigned to each executor process.
CDAP Spark Program
The main class set through the setMainClass or setMainClassName method inside the Spark.configure() method is executed by the Spark framework. The main class must satisfy one of these conditions:
Extends from SparkMain, if written in Scala
Has a def main(args: Array[String]) method, if written in Scala
Implements JavaSparkMain, if written in Java
Has a public static void main(String[] args) method, if written in Java
A user program is responsible for creating a SparkContext or JavaSparkContext instance, either inside the run methods of SparkMain or JavaSparkMain, or inside their main methods.
CDAP SparkExecutionContext
CDAP provides a SparkExecutionContext, which is needed to access datasets and to interact with CDAP services such as metrics and service discovery. It is only available to Spark programs that extend SparkMain or implement JavaSparkMain.
Scala:
Java:
Spark and Datasets
Spark programs in CDAP can directly access datasets, similar to the way a MapReduce program can. These programs can create a Spark Resilient Distributed Dataset (RDD) by reading a dataset and can also write an RDD to a dataset. In Scala, implicit objects are provided for reading and writing datasets directly through the SparkContext and RDD objects.
In order to access a dataset in Spark, both the key and value classes have to be serializable. Otherwise, Spark will fail to read or write them. For example, the Table dataset has a value type of Row, which is not serializable. An ObjectStore dataset can be used, provided its classes are serializable.
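The serializability requirement above can be checked locally before a program is deployed. The following is a minimal sketch, assuming Spark's default Java serialization is in use. WordCount and SerializationCheck are hypothetical names chosen for illustration and are not part of the CDAP or Spark APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical value class for an ObjectStore-style dataset. It is
// serializable because it implements java.io.Serializable and all of
// its fields are themselves serializable.
final class WordCount implements Serializable {
  private static final long serialVersionUID = 1L;
  final String word;
  final long count;

  WordCount(String word, long count) {
    this.word = word;
    this.count = count;
  }
}

// Hypothetical helper (not a CDAP or Spark class) for a quick local check.
final class SerializationCheck {
  // Round-trips a value through Java serialization. A value whose class
  // is not serializable causes a NotSerializableException (an IOException).
  static Object roundTrip(Object value) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
             new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return in.readObject();
    }
  }

  // Returns true when the value survives a Java serialization round trip.
  static boolean isSerializable(Object value) {
    try {
      roundTrip(value);
      return true;
    } catch (IOException | ClassNotFoundException e) {
      return false;
    }
  }
}
```

With these definitions, SerializationCheck.isSerializable(new WordCount("spark", 3L)) returns true, while a plain java.lang.Object, which does not implement Serializable, returns false.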
Creating an RDD from a dataset:
Scala:
Java:
Writing an RDD to a dataset:
Scala:
Java:
Note: Spark programs can read from or write to datasets in different namespaces using Cross Namespace Dataset Access by passing a String containing the namespace as an additional parameter before the dataset name parameter. (By default, if the namespace parameter is not supplied, the namespace in which the program runs is used.)
Spark and Services
Spark programs in CDAP, including worker nodes, can discover Services. Service Discovery by worker nodes ensures that if an endpoint changes during the execution of a Spark program, due to failure or another reason, worker nodes will see the most recent endpoint.
Here is an example of service discovery in a Spark program:
Spark Metrics
Spark programs in CDAP emit metrics, similar to a MapReduce program. CDAP collects the system metrics emitted by Spark and displays them in the CDAP UI, which helps in monitoring the progress of a Spark program and the resources it uses. You can also emit custom user metrics from the worker nodes of your Spark program.
Spark in Workflows
Spark programs in CDAP can also be added to a workflow, similar to a MapReduce program. The Spark program can get information about the workflow through the SparkExecutionContext.getWorkflowInfo method.
Transactions and Spark
When a Spark program interacts with datasets, CDAP will automatically create a long-running transaction that covers the Spark job execution. A Spark job refers to a Spark action and any tasks that need to be executed to evaluate the action (see Spark Job Scheduling for details).
You can also control the transaction scope yourself explicitly. It's useful when you want multiple Spark actions to be committed in the same transaction. For example, in Kafka Spark Streaming, you can persist the Kafka offsets together with the changes in the datasets in the same transaction to obtain exactly-once processing semantics.
When using an explicit transaction, you can access a dataset directly by calling the getDataset() method of the DatasetContext provided to the transaction. However, the dataset acquired through getDataset() cannot be used through a function closure. See the section on Using Datasets in Programs for additional information.
Here is an example of using an explicit transaction in Spark:
Scala:
Java:
Spark Versions
CDAP allows you to write Spark programs using either Spark 2 or Spark 3 with Scala 2.12.
To use Spark 3 with Scala 2.12, you must add the cdap-api-spark3_2.12 Maven dependency.
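As a sketch, the dependency declaration might look like the following. Only the artifactId comes from the text above. The groupId io.cdap.cdap, the ${cdap.version} property, and the provided scope are assumptions that should be checked against your CDAP release and build setup.

```xml
<!-- artifactId is from the documentation; groupId, version property,
     and scope are assumptions for illustration. -->
<dependency>
  <groupId>io.cdap.cdap</groupId>
  <artifactId>cdap-api-spark3_2.12</artifactId>
  <version>${cdap.version}</version>
  <scope>provided</scope>
</dependency>
```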
Created in 2020 by Google Inc.