In a pipeline run, we first try to execute it with the Spark SQL engine and then fall back to the usual Spark RDD path. The failure occurs when a Wrangler stage takes its input from a CONDITIONAL plugin:
CONDITION ---> WRANGLER
The pipeline fails with:
ERROR [Driver:o.a.s.d.y.ApplicationMaster@94] - User class threw exception: java.lang.NullPointerException
java.lang.NullPointerException: null
at io.cdap.cdap.etl.spark.batch.SparkSQLEngine.getRelation(SparkSQLEngine.java:89)
at io.cdap.cdap.etl.spark.batch.BatchSQLEngineAdapter.lambda$tryRelationalTransform$4(BatchSQLEngineAdapter.java:799)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at io.cdap.cdap.etl.spark.batch.BatchSQLEngineAdapter.tryRelationalTransform(BatchSQLEngineAdapter.java:797)
at io.cdap.cdap.etl.spark.batch.SQLEngineRelationalEngine.tryRelationalTransform(SQLEngineRelationalEngine.java:72)
at io.cdap.cdap.etl.spark.SparkPipelineRunner.tryRelationalTransform(SparkPipelineRunner.java:576)
at io.cdap.cdap.etl.spark.SparkPipelineRunner.processOtherPluginTypes(SparkPipelineRunner.java:401)
at io.cdap.cdap.etl.spark.SparkPipelineRunner.processStage(SparkPipelineRunner.java:349)
at io.cdap.cdap.etl.spark.SparkPipelineRunner.processDag(SparkPipelineRunner.java:198)
at io.cdap.cdap.etl.spark.SparkPipelineRunner.runPipeline(SparkPipelineRunner.java:183)
at io.cdap.cdap.etl.spark.batch.BatchSparkPipelineDriver.run(BatchSparkPipelineDriver.java:270)
What is happening here is that the CDAP platform internally appends a .connector suffix to the conditional stage's name. So in the DAG, our code sees the stage name as conditional_name.connector, but in the pipeline JSON it is still conditional_name. In the Spark SQL path we build the collection of source inputs from the DAG, meaning we store conditional_name.connector; but while fetching the schema we look at the pipeline JSON. So when we look up conditional_name.connector in the pipeline JSON, we find nothing.
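The mismatch can be reduced to a minimal sketch (the map contents and stage names here are illustrative, not the actual CDAP data structures): the schema lookup is keyed by the original stage name from the pipeline JSON, while the caller asks for the suffixed DAG name, so the lookup returns null and the subsequent dereference throws.

```java
import java.util.HashMap;
import java.util.Map;

public class StageNameMismatch {
    public static void main(String[] args) {
        // Schemas as recorded in the pipeline JSON, keyed by the original stage name.
        Map<String, String> pipelineJsonSchemas = new HashMap<>();
        pipelineJsonSchemas.put("conditional_name", "<schema>");

        // The DAG, however, carries the platform-added ".connector" suffix.
        String dagStageName = "conditional_name.connector";

        // Looking up the DAG name in the pipeline JSON map finds nothing...
        String schema = pipelineJsonSchemas.get(dagStageName);
        System.out.println(schema); // prints "null"

        // ...and dereferencing that null result is what surfaces as the
        // NullPointerException in SparkSQLEngine.getRelation.
    }
}
```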
Hence the null, and the resulting NullPointerException. As a quick fix, we can split the stage name on "." and take the first token, then use that to fetch the schema.
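The quickfix can be sketched as follows; baseStageName is a hypothetical helper name, not the actual CDAP method, and it simply strips everything from the first "." onward so the lookup key matches what the pipeline JSON stores.

```java
public class ConnectorSuffixFix {
    // Hypothetical helper: take the first token on splitting by ".",
    // so "conditional_name.connector" maps back to "conditional_name".
    static String baseStageName(String dagStageName) {
        return dagStageName.split("\\.")[0];
    }

    public static void main(String[] args) {
        System.out.println(baseStageName("conditional_name.connector")); // conditional_name
        System.out.println(baseStageName("plain_stage"));                // plain_stage
    }
}
```

Note that split takes a regex, so the dot must be escaped; a stage name without a suffix passes through unchanged.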