Pipeline execution fails when the output schema is overridden for any of the database plugins

Description

When a custom output schema is set, pipeline execution fails. The cause is a logic error that assumes the field index in the ResultSetMetadata matches the position of the corresponding field in the output schema.
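
For illustration, here is a minimal sketch of the faulty index-based pairing. The class and method names are assumptions for this sketch; the real logic lives in the plugins' record building around AbstractDBSource.transform and is only simplified here.

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;

// Hypothetical helper showing the faulty assumption: result-set column i
// is paired with output-schema field i, which only holds when the output
// schema lists every query column in query order.
public class IndexBasedRecordBuilder {
  static StructuredRecord toRecord(ResultSet resultSet, Schema outputSchema) throws SQLException {
    StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
    ResultSetMetaData metadata = resultSet.getMetaData();
    List<Schema.Field> fields = outputSchema.getFields();
    for (int i = 1; i <= metadata.getColumnCount(); i++) {
      // BUG: with an overridden (reordered or subsetted) schema, field i-1
      // no longer corresponds to result-set column i, so a String value
      // can land on a field whose schema expects a decimal.
      Schema.Field field = fields.get(i - 1);
      builder.set(field.getName(), resultSet.getObject(i));
    }
    return builder.build();
  }
}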

A repro pipeline is attached.

Error in pipeline execution:

2023-02-16 07:38:24,172 - ERROR [Executor task launch worker for task 0.0 in stage 0.0 (TID 0):o.a.s.u.Utils@94] - Aborting task
java.lang.IllegalArgumentException: A value for field 'COL12' is of type 'java.lang.String', which does not match schema '[{"type":"bytes","logicalType":"decimal","precision":4,"scale":0},"null"]'.
	at io.cdap.cdap.format.io.JsonStructuredRecordDatumWriter.encodeRecordField(JsonStructuredRecordDatumWriter.java:110)
	at io.cdap.cdap.format.io.StructuredRecordDatumWriter.encodeRecord(StructuredRecordDatumWriter.java:325)
	at io.cdap.cdap.format.io.StructuredRecordDatumWriter.encode(StructuredRecordDatumWriter.java:106)
	at io.cdap.cdap.format.io.JsonStructuredRecordDatumWriter.encode(JsonStructuredRecordDatumWriter.java:160)
	at io.cdap.cdap.format.io.StructuredRecordDatumWriter.encode(StructuredRecordDatumWriter.java:58)
	at io.cdap.cdap.format.io.JsonStructuredRecordDatumWriter.encode(JsonStructuredRecordDatumWriter.java:58)
	at io.cdap.cdap.internal.app.store.preview.PreviewJsonSerializer.serialize(PreviewJsonSerializer.java:51)
	at io.cdap.cdap.internal.app.store.preview.PreviewJsonSerializer.serialize(PreviewJsonSerializer.java:39)
	at com.google.gson.TreeTypeAdapter.write(TreeTypeAdapter.java:70)
	at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:68)
	at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.write(ReflectiveTypeAdapterFactory.java:99)
	at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.write(ReflectiveTypeAdapterFactory.java:219)
	at com.google.gson.Gson.toJson(Gson.java:600)
	at com.google.gson.Gson.toJsonTree(Gson.java:493)
	at com.google.gson.Gson.toJsonTree(Gson.java:472)
	at io.cdap.cdap.internal.app.preview.DefaultDataTracer.info(DefaultDataTracer.java:57)
	at io.cdap.cdap.app.runtime.spark.preview.SparkDataTracer.info(SparkDataTracer.java:49)
	at io.cdap.cdap.etl.common.TrackedEmitter.emit(TrackedEmitter.java:54)
	at io.cdap.cdap.etl.common.plugin.UntimedEmitter.emit(UntimedEmitter.java:64)
	at io.cdap.plugin.db.batch.source.AbstractDBSource.transform(AbstractDBSource.java:313)
	at io.cdap.cdap.etl.common.plugin.WrappedBatchSource.lambda$transform$2(WrappedBatchSource.java:71)
	at io.cdap.cdap.etl.common.plugin.Caller$1.call(Caller.java:30)
	at io.cdap.cdap.etl.common.plugin.WrappedBatchSource.transform(WrappedBatchSource.java:70)
	at io.cdap.cdap.etl.common.plugin.WrappedBatchSource.transform(WrappedBatchSource.java:36)
	at io.cdap.cdap.etl.common.TrackedTransform.transform(TrackedTransform.java:74)
	at io.cdap.cdap.etl.spark.function.BatchSourceFunction.call(BatchSourceFunction.java:57)
	at io.cdap.cdap.etl.spark.function.BatchSourceFunction.call(BatchSourceFunction.java:34)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:135)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:134)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
	Suppressed: java.io.IOException: Incomplete document
		at com.google.gson.stream.JsonWriter.close(JsonWriter.java:527)
		at io.cdap.cdap.internal.app.store.preview.PreviewJsonSerializer.serialize(PreviewJsonSerializer.java:54)
		... 37 common frames omitted

In the failure above, COL12 is of type DECIMAL in the database and sits at position 2 in the output schema. In the database record, however, the field at position 2 is COL1, which is of type STRING. The index-based pairing therefore reads COL1's String value against COL12's decimal schema, which fails the pipeline.

Expected behavior:

Only the ID (string) and COL12 (decimal) fields should be passed through, and the pipeline should succeed.

Release Notes

PLUGIN-1512: Fixed an issue where pipelines failed when the output schema was overridden in certain source plugins because the output schema did not match the order of the fields returned by the query. This happened when the pipeline included any of the following batch sources:

  • Database
  • Oracle
  • MySQL
  • SQL Server
  • PostgreSQL
  • DB2
  • MariaDB
  • Netezza
  • CloudSQL PostgreSQL
  • CloudSQL MySQL
  • Teradata

Pipelines no longer fail when you override the output schema in these source plugins. CDAP now uses the field name to match the field's schema in the result set with the field in the output schema.
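
A minimal sketch of the name-based matching described above; the helper structure and names are assumed for illustration, not the actual patch:

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;

// Hypothetical helper: result-set columns are looked up by name, so the
// order and subset of fields in the overridden output schema no longer
// matter.
public class NameBasedRecordBuilder {
  static StructuredRecord toRecord(ResultSet resultSet, Schema outputSchema) throws SQLException {
    ResultSetMetaData metadata = resultSet.getMetaData();
    Map<String, Integer> columnIndexByName = new HashMap<>();
    for (int i = 1; i <= metadata.getColumnCount(); i++) {
      columnIndexByName.put(metadata.getColumnName(i), i);
    }
    StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
    for (Schema.Field field : outputSchema.getFields()) {
      Integer column = columnIndexByName.get(field.getName());
      if (column != null) {
        // Each output field is matched to its result-set column by name,
        // so ID stays a string and COL12 stays a decimal regardless of
        // their positions in the query.
        builder.set(field.getName(), resultSet.getObject(column));
      }
    }
    return builder.build();
  }
}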

Attachments

  • 1 attachment, added 16 Feb 2023, 07:47 AM

Details

Created February 16, 2023 at 7:52 AM
Updated April 11, 2023 at 4:38 AM