CSV Parser should handle IndexOutOfBoundsException when a record has more columns than the header
Description
The code responsible for this exception is in CsvParser.toRow (CsvParser.java:177; see the stack trace below). The problem is that the header row of the CSV data has some number of columns, say n, while one or more records have more than n values after splitting by the delimiter (, for CSV).
We should handle this case and throw an exception with a more meaningful message, e.g.: “Ensure the number of columns in each record matches the number of columns in the header”.
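A minimal sketch of the proposed guard, using hypothetical names (`CsvColumnCheck`, `validate`) rather than the actual CsvParser code — the real fix would replace the unchecked `ArrayList.get` in `CsvParser.toRow`:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: validates record width against the header
// before any per-column get(), so the failure is explicit instead of
// surfacing as an IndexOutOfBoundsException deep inside toRow().
public class CsvColumnCheck {

  static void validate(List<String> header, List<String> record) {
    if (record.size() > header.size()) {
      throw new IllegalArgumentException(
          "Ensure the number of columns in each record matches the number of "
              + "columns in the header: record has " + record.size()
              + " values but header has " + header.size() + " columns");
    }
  }

  public static void main(String[] args) {
    List<String> header = Arrays.asList("name", "age");
    // A record with 3 values against a 2-column header triggers the check.
    try {
      validate(header, Arrays.asList("bob", "25", "extra"));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With this check in place, the pipeline log would carry the actionable message above instead of the bare `Index: 26, Size: 26`.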
Stack trace:
java.lang.Exception: Stage:Wrangler - Failing pipeline due to error : Index: 26, Size: 26
at io.cdap.wrangler.Wrangler.transform(Wrangler.java:402) ~[%20artifact5502482460600740819.jar:na]
at io.cdap.wrangler.Wrangler.transform(Wrangler.java:82) ~[%20artifact5502482460600740819.jar:na]
at io.cdap.cdap.etl.common.plugin.WrappedTransform.lambda$transform$5(WrappedTransform.java:90) ~[cdap-etl-core-6.6.0.jar:na]
at io.cdap.cdap.etl.common.plugin.Caller$1.call(Caller.java:30) ~[cdap-etl-core-6.6.0.jar:na]
at io.cdap.cdap.etl.common.plugin.WrappedTransform.transform(WrappedTransform.java:89) ~[cdap-etl-core-6.6.0.jar:na]
at io.cdap.cdap.etl.common.TrackedTransform.transform(TrackedTransform.java:74) ~[cdap-etl-core-6.6.0.jar:na]
at io.cdap.cdap.etl.spark.function.TransformFunction.call(TransformFunction.java:54) ~[hydrator-spark-core2_2.11-6.6.0.jar:na]
at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.14.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.14.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) ~[scala-library-2.12.14.jar:na]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.14.jar:na]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.14.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) ~[scala-library-2.12.14.jar:na]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.14.jar:na]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.14.jar:na]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.14.jar:na]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[na:na]
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[spark-sql_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755) ~[spark-sql_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:87) ~[spark-sql_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:85) ~[spark-sql_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:885) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:885) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) ~[spark-core_2.12-3.1.3.jar:na]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501) ~[spark-core_2.12-3.1.3.jar:3.1.3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_332]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_332]
at java.lang.Thread.run(Thread.java:750) [na:1.8.0_332]
Caused by: java.lang.IndexOutOfBoundsException: Index: 26, Size: 26
at java.util.ArrayList.rangeCheck(ArrayList.java:659) ~[na:1.8.0_332]
at java.util.ArrayList.get(ArrayList.java:435) ~[na:1.8.0_332]
at io.cdap.directives.parser.CsvParser.toRow(CsvParser.java:177) ~[na:na]
at io.cdap.directives.parser.CsvParser.execute(CsvParser.java:156) ~[na:na]
at io.cdap.directives.parser.CsvParser.execute(CsvParser.java:55) ~[na:na]
at io.cdap.wrangler.executor.RecipePipelineExecutor.execute(RecipePipelineExecutor.java:121) ~[wrangler-core-4.6.0.jar:na]
at io.cdap.wrangler.executor.RecipePipelineExecutor.execute(RecipePipelineExecutor.java:90) ~[wrangler-core-4.6.0.jar:na]
at io.cdap.wrangler.Wrangler.transform(Wrangler.java:377) ~[%20artifact5502482460600740819.jar:na]
... 39 common frames omitted
Steps to repro:
Create a pipeline with Wrangler and use the "Parse as CSV" directive with the header field set to true.
Supply input data to Wrangler such that the header (first row) has fewer columns than at least one record.
This exception should be seen in the pipeline logs.
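For example, illustrative input like the following reproduces the mismatch — the header declares two columns, but the last record carries three values:

```
name,age
alice,30
bob,25,extra
```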
This should be a small fix.
Release Notes
None
Activity
Arjan Bal, August 3, 2022 at 5:04 AM
Discussed the option of ignoring the extra columns in the record. Decided not to do this, as it could cause data loss when the data is not clean, without users realising it.