...

Code Block (Scala)
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

class SimpleSpark extends SparkProgram {
  override def run(implicit sec: SparkExecutionContext) {

    // Create a DStream with the direct Kafka API in Spark. Copied from the Kafka example in Spark
    val directKafkaStream = KafkaUtils.createDirectStream[
      [key class], [value class], [key decoder class], [value decoder class] ](
      streamingContext, [map of Kafka parameters], [set of topics to consume])

    // Hold a reference to the current offset ranges, so they can be used downstream
    var offsetRanges = Array[OffsetRange]()

    directKafkaStream.transform { rdd =>
      // Capture the offset ranges of this batch before any other transformation
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map {
      ...
    }.foreachRDD { rdd =>
      sec.execute(new TxRunnable {
        override def run(dsContext: DatasetContext) {
          // Operate on the RDD
          rdd.map(...).saveAsDataset(...)
          // This happens in the driver: save the consumed offsets in the same transaction
          for (o <- offsetRanges) {
            dsContext.getDataset("kafkaOffsets").save(o)
          }
        }
      })
    }
  }
}
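
Because the offsets are persisted in the same transaction as the data, a restarted run can resume from exactly where the last committed batch ended. Below is a minimal sketch of the resume path. It assumes the saved offsets have already been read back from the "kafkaOffsets" dataset (that read is CDAP-specific and elided here); the createDirectStream overload taking fromOffsets is standard Spark.

Code Block (Scala)
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Recreate the direct stream from previously committed offsets. The
// `savedOffsets` map is assumed to have been loaded from the "kafkaOffsets"
// dataset in a transaction before the streaming context starts.
def resumeStream(ssc: StreamingContext,
                 kafkaParams: Map[String, String],
                 savedOffsets: Map[TopicAndPartition, Long]): InputDStream[(String, String)] = {
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, savedOffsets,
    (mm: MessageAndMetadata[String, String]) => (mm.key, mm.message))
}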


Runtime

Spark Classes Isolation

We need to have the Spark classes (and their dependencies) isolated from the system classloader, meaning they cannot be on the system classpath, whether in the SDK, the master, or a container. This is already the case for the master process in cluster mode. The isolation is needed for the following reasons (a sketch of one possible isolation approach follows the list):

...
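
One way to realize this isolation (a sketch only; the class and method names here are hypothetical, not CDAP's actual implementation) is to load the Spark jars through a dedicated URLClassLoader whose parent refuses to delegate Spark classes, so they can never resolve from the system classpath:

Code Block (Scala)
import java.io.File
import java.net.{URL, URLClassLoader}

// Parent classloader that hides Spark packages from delegation, forcing
// Spark (and its dependencies) to resolve only from the isolated classpath.
// Hypothetical sketch; not CDAP's actual classloader hierarchy.
class SparkFilteringClassLoader(parent: ClassLoader) extends ClassLoader(parent) {
  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    if (name.startsWith("org.apache.spark.")) {
      // Refuse delegation so the child classloader must supply the class
      throw new ClassNotFoundException(name)
    }
    super.loadClass(name, resolve)
  }
}

object SparkClassLoaders {
  // Loads Spark classes only from the given jars; everything else still
  // resolves through the original classloader.
  def isolated(sparkJars: Seq[File], base: ClassLoader): ClassLoader = {
    val urls: Array[URL] = sparkJars.map(_.toURI.toURL).toArray
    new URLClassLoader(urls, new SparkFilteringClassLoader(base))
  }
}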