BigQuery Sink fails if the Dedupe By conditions are missing an ordering key
Description
This PR introduced a change to the BigQuery Sink that makes the Dedupe By ordering key required whenever these conditions are specified.
By default, BigQuery uses ASC ordering if no ordering direction is specified (meaning you only specify a column or columns when ordering). See the BigQuery documentation for details.
We should update this line so that the absence of an ordering key does not cause an ArrayIndexOutOfBoundsException.
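For illustration, here is a minimal sketch of a defensive version of the ordering-clause builder. The helper name buildOrderByClause and the entry format are assumptions made for this example, not the actual plugin code; the point is only that indexing the second token unconditionally is what throws, and that a bare column name can simply be passed through since BigQuery orders ASC by default.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DedupeOrderBySketch {

  // Hypothetical defensive builder. Each entry is expected to look like
  // "columnName ASC|DESC", but a bare "columnName" must also be accepted,
  // in which case no direction keyword is emitted (BigQuery defaults to ASC).
  static String buildOrderByClause(List<String> dedupeBy) {
    return dedupeBy.stream()
        .map(String::trim)
        .map(entry -> {
          String[] parts = entry.split("\\s+");
          // Guard against entries with no ordering key: indexing parts[1]
          // unconditionally is what raises ArrayIndexOutOfBoundsException.
          if (parts.length > 1) {
            return parts[0] + " " + parts[1];
          }
          return parts[0];
        })
        .collect(Collectors.joining(", "));
  }

  public static void main(String[] args) {
    // "updated_at" has no ordering key; the guarded version handles it.
    System.out.println(buildOrderByClause(Arrays.asList("updated_at", "id DESC")));
    // prints: updated_at, id DESC
  }
}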
An example stack trace of this bug is below.
ERROR [spark-submitter-phase-3-a086fed1-da94-11ed-9e9f-42010a0a004a:o.a.s.d.y.Client@73] - Application diagnostics message: User class threw exception: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:105)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsNewAPIHadoopDataset$1(PairRDDFunctions.scala:1077)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1075)
at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:833)
at io.cdap.cdap.etl.spark.batch.RDDUtils.saveHadoopDataset(RDDUtils.java:58)
at io.cdap.cdap.etl.spark.batch.RDDUtils.saveUsingOutputFormat(RDDUtils.java:47)
at io.cdap.cdap.etl.spark.batch.SparkBatchSinkFactory.writeFromRDD(SparkBatchSinkFactory.java:200)
at io.cdap.cdap.etl.spark.batch.BaseRDDCollection$1.run(BaseRDDCollection.java:238)
at io.cdap.cdap.etl.spark.SparkPipelineRunner.executeSinkRunnables(SparkPipelineRunner.java:210)
at io.cdap.cdap.etl.spark.SparkPipelineRunner.processDag(SparkPipelineRunner.java:202)
at io.cdap.cdap.etl.spark.SparkPipelineRunner.runPipeline(SparkPipelineRunner.java:183)
at io.cdap.cdap.etl.spark.batch.BatchSparkPipelineDriver.run(BatchSparkPipelineDriver.java:260)
at io.cdap.cdap.app.runtime.spark.SparkTransactional$2.run(SparkTransactional.java:236)
at io.cdap.cdap.app.runtime.spark.SparkTransactional.execute(SparkTransactional.java:208)
at io.cdap.cdap.app.runtime.spark.SparkTransactional.execute(SparkTransactional.java:138)
at io.cdap.cdap.app.runtime.spark.AbstractSparkExecutionContext.execute(AbstractSparkExecutionContext.scala:231)
at io.cdap.cdap.app.runtime.spark.SerializableSparkExecutionContext.execute(SerializableSparkExecutionContext.scala:63)
at io.cdap.cdap.app.runtime.spark.DefaultJavaSparkExecutionContext.execute(DefaultJavaSparkExecutionContext.scala:94)
at io.cdap.cdap.api.Transactionals.execute(Transactionals.java:63)
at io.cdap.cdap.etl.spark.batch.BatchSparkPipelineDriver.run(BatchSparkPipelineDriver.java:189)
at io.cdap.cdap.app.runtime.spark.SparkMainWrapper$.main(SparkMainWrapper.scala:88)
at io.cdap.cdap.app.runtime.spark.SparkMainWrapper.main(SparkMainWrapper.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
Caused by: java.io.IOException: Failed to import GCS into BigQuery.
at io.cdap.plugin.gcp.bigquery.sink.BigQueryOutputFormat$BigQueryOutputCommitter.commitJob(BigQueryOutputFormat.java:217)
at io.cdap.cdap.etl.spark.io.TrackingOutputCommitter.commitJob(TrackingOutputCommitter.java:51)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:184)
at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:99)
... 30 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils.lambda$generateUpdateUpsertQuery$12(BigQuerySinkUtils.java:616)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils.generateUpdateUpsertQuery(BigQuerySinkUtils.java:617)
at io.cdap.plugin.gcp.bigquery.sink.BigQueryOutputFormat$BigQueryOutputCommitter.handleUpdateUpsertOperation(BigQueryOutputFormat.java:608)
at io.cdap.plugin.gcp.bigquery.sink.BigQueryOutputFormat$BigQueryOutputCommitter.importFromGcs(BigQueryOutputFormat.java:391)
at io.cdap.plugin.gcp.bigquery.sink.BigQueryOutputFormat$BigQueryOutputCommitter.commitJob(BigQueryOutputFormat.java:213)
... 33 more
Release Notes
None