Spark SQL relation breaks on conditional plugins

Description

In a pipeline run we first try to execute the pipeline with the Spark SQL engine, and fall back to the usual Spark RDD processing if that fails.

This happens when a CONDITIONAL plugin feeds its output into a Wrangler stage:

CONDITION ---> WRANGLER

The pipeline fails with:

ERROR [Driver:o.a.s.d.y.ApplicationMaster@94] - User class threw exception: java.lang.NullPointerException
java.lang.NullPointerException: null
    at io.cdap.cdap.etl.spark.batch.SparkSQLEngine.getRelation(SparkSQLEngine.java:89)
    at io.cdap.cdap.etl.spark.batch.BatchSQLEngineAdapter.lambda$tryRelationalTransform$4(BatchSQLEngineAdapter.java:799)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at io.cdap.cdap.etl.spark.batch.BatchSQLEngineAdapter.tryRelationalTransform(BatchSQLEngineAdapter.java:797)
    at io.cdap.cdap.etl.spark.batch.SQLEngineRelationalEngine.tryRelationalTransform(SQLEngineRelationalEngine.java:72)
    at io.cdap.cdap.etl.spark.SparkPipelineRunner.tryRelationalTransform(SparkPipelineRunner.java:576)
    at io.cdap.cdap.etl.spark.SparkPipelineRunner.processOtherPluginTypes(SparkPipelineRunner.java:401)
    at io.cdap.cdap.etl.spark.SparkPipelineRunner.processStage(SparkPipelineRunner.java:349)
    at io.cdap.cdap.etl.spark.SparkPipelineRunner.processDag(SparkPipelineRunner.java:198)
    at io.cdap.cdap.etl.spark.SparkPipelineRunner.runPipeline(SparkPipelineRunner.java:183)
    at io.cdap.cdap.etl.spark.batch.BatchSparkPipelineDriver.run(BatchSparkPipelineDriver.java:270)

What is happening here is that the CDAP platform internally appends a .connector suffix to the conditional stage.

So in our DAG the code sees the stage name as conditional_name.connector,

but in the pipeline JSON it is still conditional_name.

In Spark SQL we build the collection of source inputs from the DAG, meaning we store conditional_name.connector, but while fetching the schema we look at the pipeline JSON.
So when we look up conditional_name.connector in the pipeline JSON we get nothing back.

Hence we get the NullPointerException.
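
To make the mismatch concrete, the sketch below reproduces it with a plain map lookup. The class and variable names are purely illustrative, not the actual CDAP code:

    import java.util.HashMap;
    import java.util.Map;

    public class StageNameMismatchDemo {
      public static void main(String[] args) {
        // Schemas keyed by stage names as they appear in the pipeline JSON.
        Map<String, String> schemasFromPipelineJson = new HashMap<>();
        schemasFromPipelineJson.put("conditional_name", "{...schema...}");

        // The DAG, however, reports the stage with the platform-added .connector suffix.
        String stageNameFromDag = "conditional_name.connector";

        // The lookup misses and returns null; any later dereference throws an NPE.
        String schema = schemasFromPipelineJson.get(stageNameFromDag);
        System.out.println("schema = " + schema); // prints: schema = null
        System.out.println(schema.length());      // throws java.lang.NullPointerException
      }
    }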


As a quickfix we can split the stage name on '.', take the first part, and then fetch the schema with that.
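
A minimal sketch of that quickfix, assuming the DAG hands us a name like conditional_name.connector (the helper below is hypothetical, not the actual CDAP method):

    public class StageNameQuickfix {
      // Keep only the part before the first '.', so "conditional_name.connector"
      // maps back to the "conditional_name" entry in the pipeline JSON.
      static String toPipelineJsonStageName(String dagStageName) {
        return dagStageName.split("\\.", 2)[0];
      }

      public static void main(String[] args) {
        System.out.println(toPipelineJsonStageName("conditional_name.connector")); // conditional_name
        System.out.println(toPipelineJsonStageName("wrangler"));                   // wrangler
      }
    }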

Release Notes

As of now, when the schema is unavailable we fall back to the usual RDD way of processing: https://github.com/cdapio/cdap/pull/15540
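
A rough sketch of that fallback behaviour, with hypothetical stand-ins for the real engine calls (the PR linked above is the authoritative change):

    public class SchemaFallbackSketch {
      // Hypothetical stand-ins for the real SQL-engine and RDD code paths.
      static Object runWithSqlEngine(String stage, Object schema) { return "sql:" + stage; }
      static Object runWithRdd(String stage) { return "rdd:" + stage; }

      static Object process(String stage, Object schema) {
        // When the schema cannot be resolved (e.g. conditional_name.connector),
        // skip the Spark SQL engine and fall back to the usual RDD processing.
        return schema == null ? runWithRdd(stage) : runWithSqlEngine(stage, schema);
      }

      public static void main(String[] args) {
        System.out.println(process("wrangler", new Object()));            // sql:wrangler
        System.out.println(process("conditional_name.connector", null));  // rdd:conditional_name.connector
      }
    }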


Details

Assignee

Reporter

Affects versions

Triaged

No

Size

M

Components

Fix versions

Priority

Created November 9, 2023 at 6:25 PM
Updated June 13, 2024 at 9:21 AM
Resolved February 12, 2024 at 6:01 AM