From 8ac9a0575bda6c73eef7b1c814353411bfaa7670 Mon Sep 17 00:00:00 2001 From: "danish.amjad" Date: Wed, 15 May 2024 12:17:04 +0200 Subject: [PATCH] Add support for zeroDateTimeBehavior to CloudSQLMySQL. --- .../cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java | 3 ++- .../io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java | 7 +++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java index a5ee68787..b4b87c81b 100644 --- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java +++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java @@ -16,6 +16,7 @@ package io.cdap.plugin.cloudsql.mysql; +import com.google.common.collect.Maps; import io.cdap.cdap.api.annotation.Category; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; @@ -75,7 +76,7 @@ public StructuredRecord transform(LongWritable longWritable, MysqlDBRecord mysql @Override protected SchemaReader getSchemaReader(String sessionID) { - return new MysqlSchemaReader(sessionID); + return new MysqlSchemaReader(sessionID, Maps.fromProperties(config.getConnectionArgumentsProperties())); } @Override diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java index b8b6fbf27..b0bea9e7a 100644 --- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java +++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java @@ -31,9 +31,11 @@ import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.ConfigUtil; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig; import io.cdap.plugin.db.source.AbstractDBSource; import io.cdap.plugin.mysql.MysqlDBRecord; +import io.cdap.plugin.mysql.MysqlSchemaReader; import io.cdap.plugin.util.CloudSQLUtil; import io.cdap.plugin.util.DBUtils; import org.apache.hadoop.mapreduce.lib.db.DBWritable; @@ -120,6 +122,11 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) { return new LineageRecorder(context, assetBuilder.build()); } + @Override + protected SchemaReader getSchemaReader() { + return new MysqlSchemaReader(null, cloudsqlMysqlSourceConfig.getConnectionArguments()); + } + /** CloudSQL MySQL source config. */ public static class CloudSQLMySQLSourceConfig extends AbstractDBSpecificSourceConfig {