BigQuery Sink fails if the Dedupe By conditions are missing an ordering key

Description

This PR introduced a change to the BigQuery Sink that makes the ordering key required whenever Dedupe By conditions are specified.

By default, BigQuery uses ASC ordering when no ordering direction is specified (that is, when you only specify a column or columns in the ordering). See the BigQuery documentation for details.

We should update this line so that the absence of an ordering key does not cause an ArrayIndexOutOfBoundsException.
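For illustration, here is a minimal sketch of the intended behavior. It is not the actual plugin code: the helper name buildOrderByClause and the "column [direction]" criterion format are assumptions. The idea is that a criterion without an explicit direction falls back to BigQuery's ASC default instead of unconditionally indexing a second token, which is what produces the ArrayIndexOutOfBoundsException: 1 seen below.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DedupeOrderingSketch {

  // Builds an ORDER BY fragment from Dedupe By criteria such as
  // "updated_at DESC" or just "updated_at" (hypothetical format).
  static String buildOrderByClause(List<String> dedupeBy) {
    return dedupeBy.stream()
        .map(criterion -> {
          // Split into at most two tokens: column name and optional direction.
          String[] parts = criterion.trim().split("\\s+", 2);
          // Indexing parts[1] unconditionally would throw
          // ArrayIndexOutOfBoundsException: 1 when only a column is given;
          // instead, fall back to BigQuery's default ASC ordering.
          String direction = parts.length > 1 ? parts[1].toUpperCase() : "ASC";
          return parts[0] + " " + direction;
        })
        .collect(Collectors.joining(", "));
  }

  public static void main(String[] args) {
    // "updated_at" has no explicit direction, so it falls back to ASC.
    System.out.println(buildOrderByClause(Arrays.asList("updated_at", "id DESC")));
    // Prints: updated_at ASC, id DESC
  }
}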

An example stack trace of this bug is below.

ERROR [spark-submitter-phase-3-a086fed1-da94-11ed-9e9f-42010a0a004a:o.a.s.d.y.Client@73] - Application diagnostics message: User class threw exception: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:105)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsNewAPIHadoopDataset$1(PairRDDFunctions.scala:1077)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1075)
    at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:833)
    at io.cdap.cdap.etl.spark.batch.RDDUtils.saveHadoopDataset(RDDUtils.java:58)
    at io.cdap.cdap.etl.spark.batch.RDDUtils.saveUsingOutputFormat(RDDUtils.java:47)
    at io.cdap.cdap.etl.spark.batch.SparkBatchSinkFactory.writeFromRDD(SparkBatchSinkFactory.java:200)
    at io.cdap.cdap.etl.spark.batch.BaseRDDCollection$1.run(BaseRDDCollection.java:238)
    at io.cdap.cdap.etl.spark.SparkPipelineRunner.executeSinkRunnables(SparkPipelineRunner.java:210)
    at io.cdap.cdap.etl.spark.SparkPipelineRunner.processDag(SparkPipelineRunner.java:202)
    at io.cdap.cdap.etl.spark.SparkPipelineRunner.runPipeline(SparkPipelineRunner.java:183)
    at io.cdap.cdap.etl.spark.batch.BatchSparkPipelineDriver.run(BatchSparkPipelineDriver.java:260)
    at io.cdap.cdap.app.runtime.spark.SparkTransactional$2.run(SparkTransactional.java:236)
    at io.cdap.cdap.app.runtime.spark.SparkTransactional.execute(SparkTransactional.java:208)
    at io.cdap.cdap.app.runtime.spark.SparkTransactional.execute(SparkTransactional.java:138)
    at io.cdap.cdap.app.runtime.spark.AbstractSparkExecutionContext.execute(AbstractSparkExecutionContext.scala:231)
    at io.cdap.cdap.app.runtime.spark.SerializableSparkExecutionContext.execute(SerializableSparkExecutionContext.scala:63)
    at io.cdap.cdap.app.runtime.spark.DefaultJavaSparkExecutionContext.execute(DefaultJavaSparkExecutionContext.scala:94)
    at io.cdap.cdap.api.Transactionals.execute(Transactionals.java:63)
    at io.cdap.cdap.etl.spark.batch.BatchSparkPipelineDriver.run(BatchSparkPipelineDriver.java:189)
    at io.cdap.cdap.app.runtime.spark.SparkMainWrapper$.main(SparkMainWrapper.scala:88)
    at io.cdap.cdap.app.runtime.spark.SparkMainWrapper.main(SparkMainWrapper.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
Caused by: java.io.IOException: Failed to import GCS into BigQuery.
    at io.cdap.plugin.gcp.bigquery.sink.BigQueryOutputFormat$BigQueryOutputCommitter.commitJob(BigQueryOutputFormat.java:217)
    at io.cdap.cdap.etl.spark.io.TrackingOutputCommitter.commitJob(TrackingOutputCommitter.java:51)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:184)
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:99)
    ... 30 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils.lambda$generateUpdateUpsertQuery$12(BigQuerySinkUtils.java:616)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils.generateUpdateUpsertQuery(BigQuerySinkUtils.java:617)
    at io.cdap.plugin.gcp.bigquery.sink.BigQueryOutputFormat$BigQueryOutputCommitter.handleUpdateUpsertOperation(BigQueryOutputFormat.java:608)
    at io.cdap.plugin.gcp.bigquery.sink.BigQueryOutputFormat$BigQueryOutputCommitter.importFromGcs(BigQueryOutputFormat.java:391)
    at io.cdap.plugin.gcp.bigquery.sink.BigQueryOutputFormat$BigQueryOutputCommitter.commitJob(BigQueryOutputFormat.java:213)
    ... 33 more

Release Notes

None

Details

Created April 18, 2023 at 2:23 PM
Updated May 25, 2023 at 6:37 AM