MySQL batch source job fails when zeroDateTimeBehavior=CONVERT_TO_NULL is set in connection arguments. #488

Closed
damjad opened this issue Mar 14, 2024 · 5 comments

damjad commented Mar 14, 2024

Context
We have a MySQL table with a TIMESTAMP column that has a NOT NULL constraint.

CREATE TABLE `test_zero_date_time_1` (
  `id` int(11) DEFAULT NULL,
  `date_col` timestamp NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

The data in the table is:

+------+---------------------+
| id   | date_col            |
+------+---------------------+
|    1 | 2020-01-01 00:00:00 |
|    2 | 2021-01-01 00:00:00 |
|    3 | 0000-00-00 00:00:00 |
+------+---------------------+
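
For anyone reproducing this: recent MySQL versions reject zero dates under the default sql_mode (NO_ZERO_DATE, NO_ZERO_IN_DATE), so the row with id=3 has to be inserted with those modes relaxed. A minimal sketch:

-- Relax strict date checking for this session so the zero timestamp is accepted.
SET SESSION sql_mode = '';

INSERT INTO `test_zero_date_time_1` VALUES
  (1, '2020-01-01 00:00:00'),
  (2, '2021-01-01 00:00:00'),
  (3, '0000-00-00 00:00:00');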

The connection arguments we use are as follows:

zeroDateTimeBehavior=CONVERT_TO_NULL
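
(The same setting can equivalently be appended to the JDBC connection URL; host and database here are placeholders:)

jdbc:mysql://<host>:3306/<database>?zeroDateTimeBehavior=CONVERT_TO_NULL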

Issue

When we try to ingest data from MySQL and push it to BigQuery, the ingestion fails with the following error:

io.cdap.cdap.api.data.format.UnexpectedFormatException: field date_col cannot be set to a null value.

So the problem is that, with zeroDateTimeBehavior=CONVERT_TO_NULL, the JDBC driver converts the zero timestamp in the row with id=3 to null, while the plugin's output schema marks date_col as non-nullable (it is derived from the NOT NULL constraint), so record validation rejects the row.
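
In CDAP terms, the failure can be sketched as follows (the schema below is illustrative of what the source derives from the table; the StructuredRecord calls are the ones that appear in the trace):

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;

public class ZeroDateTimeRepro {
  public static void main(String[] args) {
    // The source derives a non-nullable field from the NOT NULL column.
    Schema schema = Schema.recordOf("test_zero_date_time_1",
        Schema.Field.of("id", Schema.nullableOf(Schema.of(Schema.Type.INT))),
        Schema.Field.of("date_col", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)));

    // With zeroDateTimeBehavior=CONVERT_TO_NULL, ResultSet.getTimestamp("date_col")
    // returns null for the zero timestamp in the row with id=3.
    java.sql.Timestamp value = null;

    // Setting null on a non-nullable field throws the
    // UnexpectedFormatException seen in the trace.
    StructuredRecord.builder(schema)
        .set("date_col", value)
        .build();
  }
}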

The whole stack trace is as follows:

Application diagnostics message: User class threw exception: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:105)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsNewAPIHadoopDataset$1(PairRDDFunctions.scala:1077)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1075)
	at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:833)
	at io.cdap.cdap.etl.spark.batch.RDDUtils.saveHadoopDataset(RDDUtils.java:58)
	at io.cdap.cdap.etl.spark.batch.RDDUtils.saveUsingOutputFormat(RDDUtils.java:47)
	at io.cdap.cdap.etl.spark.batch.SparkBatchSinkFactory.writeFromRDD(SparkBatchSinkFactory.java:200)
	at io.cdap.cdap.etl.spark.batch.BaseRDDCollection$1.run(BaseRDDCollection.java:238)
	at io.cdap.cdap.etl.spark.SparkPipelineRunner.executeSinkRunnables(SparkPipelineRunner.java:210)
	at io.cdap.cdap.etl.spark.SparkPipelineRunner.processDag(SparkPipelineRunner.java:202)
	at io.cdap.cdap.etl.spark.SparkPipelineRunner.runPipeline(SparkPipelineRunner.java:183)
	at io.cdap.cdap.etl.spark.batch.BatchSparkPipelineDriver.run(BatchSparkPipelineDriver.java:260)
	at io.cdap.cdap.app.runtime.spark.SparkTransactional$2.run(SparkTransactional.java:236)
	at io.cdap.cdap.app.runtime.spark.SparkTransactional.execute(SparkTransactional.java:208)
	at io.cdap.cdap.app.runtime.spark.SparkTransactional.execute(SparkTransactional.java:138)
	at io.cdap.cdap.app.runtime.spark.AbstractSparkExecutionContext.execute(AbstractSparkExecutionContext.scala:231)
	at io.cdap.cdap.app.runtime.spark.SerializableSparkExecutionContext.execute(SerializableSparkExecutionContext.scala:63)
	at io.cdap.cdap.app.runtime.spark.DefaultJavaSparkExecutionContext.execute(DefaultJavaSparkExecutionContext.scala:94)
	at io.cdap.cdap.api.Transactionals.execute(Transactionals.java:63)
	at io.cdap.cdap.etl.spark.batch.BatchSparkPipelineDriver.run(BatchSparkPipelineDriver.java:189)
	at io.cdap.cdap.app.runtime.spark.SparkMainWrapper$.main(SparkMainWrapper.scala:88)
	at io.cdap.cdap.app.runtime.spark.SparkMainWrapper.main(SparkMainWrapper.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (cdap-mysqlfull-xxx.europe-west1-d.c.xxxx.internal executor 2): org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:162)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: io.cdap.cdap.api.data.format.UnexpectedFormatException: field date_col cannot be set to a null value.
	at io.cdap.cdap.api.data.format.StructuredRecord$Builder.validateAndGetField(StructuredRecord.java:675)
	at io.cdap.cdap.api.data.format.StructuredRecord$Builder.set(StructuredRecord.java:371)
	at io.cdap.plugin.db.DBRecord.setField(DBRecord.java:164)
	at io.cdap.plugin.db.DBRecord.handleField(DBRecord.java:139)
	at io.cdap.plugin.db.DBRecord.readFields(DBRecord.java:112)
	at org.apache.hadoop.mapreduce.lib.db.DBRecordReader.nextKeyValue(DBRecordReader.java:236)
	at io.cdap.plugin.db.batch.source.DataDrivenETLDBInputFormat$1.nextKeyValue(DataDrivenETLDBInputFormat.java:142)
	at io.cdap.cdap.etl.spark.io.TrackingRecordReader.nextKeyValue(TrackingRecordReader.java:47)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:135)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:134)
	... 9 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2257)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:83)
	... 30 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:162)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: io.cdap.cdap.api.data.format.UnexpectedFormatException: field date_col cannot be set to a null value.
	at io.cdap.cdap.api.data.format.StructuredRecord$Builder.validateAndGetField(StructuredRecord.java:675)
	at io.cdap.cdap.api.data.format.StructuredRecord$Builder.set(StructuredRecord.java:371)
	at io.cdap.plugin.db.DBRecord.setField(DBRecord.java:164)
	at io.cdap.plugin.db.DBRecord.handleField(DBRecord.java:139)
	at io.cdap.plugin.db.DBRecord.readFields(DBRecord.java:112)
	at org.apache.hadoop.mapreduce.lib.db.DBRecordReader.nextKeyValue(DBRecordReader.java:236)
	at io.cdap.plugin.db.batch.source.DataDrivenETLDBInputFormat$1.nextKeyValue(DataDrivenETLDBInputFormat.java:142)
	at io.cdap.cdap.etl.spark.io.TrackingRecordReader.nextKeyValue(TrackingRecordReader.java:47)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:135)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:134)
	... 9 more

damjad commented Mar 15, 2024

@itsankit-google please assign this to me. I have found the issue, and preliminary results of my change look good. I'll put up a proper fix soon.


damjad commented Mar 15, 2024

Please have a look at #489.


damjad commented May 15, 2024

Thank you so much for merging it.
Do you know when it will be available in DF? I have built a fixed version on top of an old release, so there is no rush; just curious.

itsankit-google (Member) commented:

With the current setup, it will go into the next minor release, which will land somewhere in Q3-Q4 this year. If it is urgent, we can release the MySQL plugin separately in the Hub with the fix for the latest released DF version.


damjad commented May 16, 2024

Thanks. I have built the plugin locally and uploaded the JAR for now.
It's not that urgent, so I'm closing the issue.

Thanks for all the help!

damjad closed this as completed May 16, 2024