Spark Programs
Apache Spark is used for in-memory cluster computing. It lets you load large sets of data into memory and query them repeatedly. This makes it suitable for both iterative and interactive programs. Similar to MapReduce, Spark can access datasets as both input and output. Spark programs in CDAP can be written in either Java or Scala.
To process data using Spark, specify addSpark()
in your application specification:
public void configure() {
...
addSpark(new WordCountProgram());
It is recommended to implement the Spark
interface by extending the AbstractSpark
class, which allows the overriding of these three methods:
configure()
initialize()
destroy()
You can extend from the abstract class AbstractSpark
to simplify the implementation:
public class WordCountProgram extends AbstractSpark {
@Override
public SparkSpecification configure() {
return SparkSpecification.Builder.with()
.setName("WordCountProgram")
.setDescription("Calculates word frequency")
.setMainClassName("com.example.WordCounter")
.build();
}
...
}
The configure method is similar to the one found in service and MapReduce programs. It defines the name, description, and the class containing the Spark program to be executed by the Spark framework.
The initialize()
method is invoked at runtime, before the Spark program is executed. Because many Spark programs do not need this method, the AbstractSpark
class provides a default implementation that does nothing.
However, if your program requires it, you can override this method to obtain access to the SparkConf
configuration and use it to set the Spark properties for the program:
import org.apache.spark.SparkConf;
. . .
@Override
protected void initialize() throws Exception {
getContext().setSparkConf(new SparkConf().set("spark.driver.extraJavaOptions", "-XX:MaxDirectMemorySize=1024m"));
}
The destroy()
method is invoked after the Spark program has finished. You could perform cleanup or send a notification of program completion, if that was required. Like initialize()
, since many Spark programs do not need this method, the AbstractSpark
class also provides a default implementation for this method that does nothing.
Spark and Resources
When a Spark program is configured, the resource requirements for both the Spark driver processes and the Spark executor processes can be set, both in terms of the amount of memory (in megabytes) and the number of virtual cores assigned.
If both the memory and the number of cores needs to be set, this can be done using:
In this case, 1024 MB and two cores is assigned to each executor process.
CDAP Spark Program
The main class being set through the setMainClass
or setMainClassName
method inside the Spark.configure()
method will be executed by the Spark framework. The main class must have one of these properties:
Extends from
SparkMain
, if written in ScalaHave a
def main(args: Array[String])
method, if written in ScalaImplements
JavaSparkMain
, if written in JavaHave a
public static void main(String[] args)
method, if written in Java
A user program is responsible for creating a SparkContext
or JavaSparkContext
instance, either inside the run
methods of SparkMain
or JavaSparkMain
, or inside their main
methods.
CDAP SparkExecutionContext
CDAP provides a SparkExecutionContext
, which is needed to access datasets and to interact with CDAP services such as metrics and service discovery. It is only available to Spark programs that are extended from either SparkMain
or JavaSparkMain
.
Scala:
Java:
Spark and Datasets
Spark programs in CDAP can directly access datasets similar to the way a MapReduce can. These programs can create Spark's Resilient Distributed Dataset (RDD) by reading a dataset and can also write RDD to a dataset. In Scala, implicit objects are provided for reading and writing datasets directly through the SparkContext
and RDD
objects.
In order to access a dataset in Spark, both the key and value classes have to be serializable. Otherwise, Spark will fail to read or write them. For example, the Table dataset has a value type of Row, which is not serializable. An ObjectStore
dataset can be used, provided its classes are serializable.
Creating an RDD from a dataset:
Scala:
Java
Writing an RDD to a dataset:
Scala
Java
Note: Spark programs can read or write to datasets in different namespaces using Cross Namespace Dataset Access by passing a String
containing the namespace as an additional parameter before the dataset name parameter. (By default, if the namespace parameter is not supplied, the namespace in which the program runs is used.).
Spark and Services
Spark programs in CDAP, including worker nodes, can discover Services. Service Discovery by worker nodes ensures that if an endpoint changes during the execution of a Spark program, due to failure or another reason, worker nodes will see the most recent endpoint.
Here is an example of service discovery in a Spark program:
Spark Metrics
Spark programs in CDAP emit metrics, similar to a MapReduce program. CDAP collect system metrics emitted by Spark and display them in the CDAP UI. This helps in monitoring the progress and resources used by a Spark program. You can also emit custom user metrics from the worker nodes of your Spark program:
Spark in Workflows
Spark programs in CDAP can also be added to a workflow, similar to a MapReduce. The Spark program can get information about the workflow through the SparkExecutionContext.getWorkflowInfo
method.
Transactions and Spark
When a Spark program interacts with datasets, CDAP will automatically create a long-running transaction that covers the Spark job execution. A Spark job refers to a Spark action and any tasks that need to be executed to evaluate the action (see Spark Job Scheduling for details).
You can also control the transaction scope yourself explicitly. It's useful when you want multiple Spark actions to be committed in the same transaction. For example, in Kafka Spark Streaming, you can persist the Kafka offsets together with the changes in the datasets in the same transaction to obtain exactly-once processing semantics.
When using an explicit transaction, you can access a dataset directly by calling the getDataset()
method of the DatasetContext
provided to the transaction. However, the dataset acquired through getDataset()
cannot be used through a function closure. See the section on Using Datasets in Programs for additional information.
Here is an example of using an explicit transaction in Spark:
Scala:
Java:
Spark Versions
CDAP allows you to write Spark programs using either Spark 2 or Spark 3 with Scala 2.12.
To use it, you must add the cdap-api-spark3_2.12
Maven dependency:
Created in 2020 by Google Inc.