Skip to content

Commit

Permalink
Merge pull request #489 from damjad/develop
Browse files Browse the repository at this point in the history
Fix non-nullable datetime when zeroDateTimeBehavior is CONVERT_TO_NULL.
  • Loading branch information
itsankit-google authored May 15, 2024
2 parents 2b47611 + 84b1f93 commit 3ab084b
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.plugin.cloudsql.mysql;

import com.google.common.collect.Maps;
import io.cdap.cdap.api.annotation.Category;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
Expand Down Expand Up @@ -75,7 +76,7 @@ public StructuredRecord transform(LongWritable longWritable, MysqlDBRecord mysql

@Override
protected SchemaReader getSchemaReader(String sessionID) {
return new MysqlSchemaReader(sessionID);
return new MysqlSchemaReader(sessionID, Maps.fromProperties(config.getConnectionArgumentsProperties()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig;
import io.cdap.plugin.db.source.AbstractDBSource;
import io.cdap.plugin.mysql.MysqlDBRecord;
import io.cdap.plugin.mysql.MysqlSchemaReader;
import io.cdap.plugin.util.CloudSQLUtil;
import io.cdap.plugin.util.DBUtils;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
Expand Down Expand Up @@ -120,6 +122,11 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
return new LineageRecorder(context, assetBuilder.build());
}

@Override
protected SchemaReader getSchemaReader() {
return new MysqlSchemaReader(null, cloudsqlMysqlSourceConfig.getConnectionArguments());
}

/** CloudSQL MySQL source config. */
public static class CloudSQLMySQLSourceConfig extends AbstractDBSpecificSourceConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.plugin.mysql;

import com.google.common.collect.Maps;
import io.cdap.cdap.api.annotation.Category;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
Expand Down Expand Up @@ -62,7 +63,7 @@ public boolean supportSchema() {

@Override
protected SchemaReader getSchemaReader(String sessionID) {
return new MysqlSchemaReader(sessionID);
return new MysqlSchemaReader(sessionID, Maps.fromProperties(config.getConnectionArgumentsProperties()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ private MysqlConstants() {
public static final String TRUST_CERT_KEYSTORE_PASSWORD = "trustCertificateKeyStorePassword";
public static final String MYSQL_CONNECTION_STRING_FORMAT = "jdbc:mysql://%s:%s/%s";
public static final String USE_CURSOR_FETCH = "useCursorFetch";
public static final String ZERO_DATE_TIME_BEHAVIOR = "zeroDateTimeBehavior";

/**
* Query to set SQL_MODE system variable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

package io.cdap.plugin.mysql;

import com.google.common.collect.Lists;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.db.CommonSchemaReader;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import java.util.Map;

/**
* Schema reader for mapping Mysql DB type
Expand All @@ -31,12 +35,42 @@ public class MysqlSchemaReader extends CommonSchemaReader {
public static final String YEAR_TYPE_NAME = "YEAR";
public static final String MEDIUMINT_UNSIGNED_TYPE_NAME = "MEDIUMINT UNSIGNED";
private final String sessionID;
private boolean zeroDateTimeToNull;

public MysqlSchemaReader(String sessionID) {
super();
this.sessionID = sessionID;
}

public MysqlSchemaReader(String sessionID, Map<String, String> connectionArguments) {
super();
this.sessionID = sessionID;
this.zeroDateTimeToNull = MysqlUtil.isZeroDateTimeToNull(connectionArguments);
}

@Override
public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException {
List<Schema.Field> schemaFields = Lists.newArrayList();
ResultSetMetaData metadata = resultSet.getMetaData();
// ResultSetMetadata columns are numbered starting with 1
for (int i = 1; i <= metadata.getColumnCount(); i++) {
if (shouldIgnoreColumn(metadata, i)) {
continue;
}

String columnName = metadata.getColumnName(i);
Schema columnSchema = getSchema(metadata, i);

if (ResultSetMetaData.columnNullable == metadata.isNullable(i)
|| (zeroDateTimeToNull && MysqlUtil.isDateTimeLikeType(metadata.getColumnType(i)))) {
columnSchema = Schema.nullableOf(columnSchema);
}
Schema.Field field = Schema.Field.of(columnName, columnSchema);
schemaFields.add(field);
}
return schemaFields;
}

@Override
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
return metadata.getColumnName(index).equals("c_" + sessionID) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {

@Override
protected SchemaReader getSchemaReader() {
return new MysqlSchemaReader(null);
return new MysqlSchemaReader(null, mysqlSourceConfig.getConnectionArguments());
}

/**
Expand Down
17 changes: 17 additions & 0 deletions mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.collect.ImmutableMap;

import java.sql.Types;
import java.util.Map;

/**
Expand Down Expand Up @@ -91,4 +92,20 @@ public static Map<String, String> composeDbSpecificArgumentsMap(Boolean autoReco
public static String getConnectionString(String host, Integer port, String database) {
return String.format(MysqlConstants.MYSQL_CONNECTION_STRING_FORMAT, host, port, database);
}

public static boolean isDateTimeLikeType(int columnType) {
int[] dateTimeLikeTypes = new int[]{Types.TIMESTAMP, Types.TIMESTAMP_WITH_TIMEZONE, Types.DATE};

for (int dttType : dateTimeLikeTypes) {
if (dttType == columnType) {
return true;
}
}
return false;
}

public static boolean isZeroDateTimeToNull(Map<String, String> connectionArguments) {
String argValue = connectionArguments.getOrDefault(MysqlConstants.ZERO_DATE_TIME_BEHAVIOR, "");
return argValue.equals("CONVERT_TO_NULL") || argValue.equals("convertToNull");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import org.junit.Test;
import org.mockito.Mockito;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MysqlSchemaReaderUnitTest {

Expand All @@ -37,4 +41,33 @@ public void validateYearTypeToStringTypeConversion() throws SQLException {
Schema schema = schemaReader.getSchema(metadata, 1);
Assert.assertTrue(Schema.of(Schema.Type.INT).equals(schema));
}

@Test
public void validateZeroDateTimeBehavior() throws SQLException {
ResultSet resultSet = Mockito.mock(ResultSet.class);
ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
Mockito.when(resultSet.getMetaData()).thenReturn(metadata);

Mockito.when(metadata.getColumnCount()).thenReturn(1);
Mockito.when(metadata.getColumnName(Mockito.eq(1))).thenReturn("some_date");

Mockito.when(metadata.getColumnType(Mockito.eq(1))).thenReturn(Types.DATE);
Mockito.when(metadata.getColumnTypeName(Mockito.eq(1))).thenReturn(MysqlSchemaReader.YEAR_TYPE_NAME);

// non-nullable column
Mockito.when(metadata.isNullable(Mockito.eq(1))).thenReturn(0);

// test that non-nullable date remains non-nullable when no conn arg is present
MysqlSchemaReader schemaReader = new MysqlSchemaReader(null);
List<Schema.Field> schemaFields = schemaReader.getSchemaFields(resultSet);
Assert.assertFalse(schemaFields.get(0).getSchema().isNullable());

// test that it converts non-nullable date column to nullable when zeroDateTimeBehavior is convert to null
Map<String, String> connectionArguments = new HashMap<>();
connectionArguments.put("zeroDateTimeBehavior", "CONVERT_TO_NULL");

schemaReader = new MysqlSchemaReader(null, connectionArguments);
schemaFields = schemaReader.getSchemaFields(resultSet);
Assert.assertTrue(schemaFields.get(0).getSchema().isNullable());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@

/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.mysql;

import org.junit.Test;

import java.sql.Types;
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class MysqlUtilUnitTest {

@Test
public void testIsZeroDateTimeToNull() {
Map<String, String> connArgsMap = new HashMap<>(1);

connArgsMap.put("zeroDateTimeBehavior", "");
assertFalse(MysqlUtil.isZeroDateTimeToNull(connArgsMap));

connArgsMap.put("zeroDateTimeBehavior", "ROUND");
assertFalse(MysqlUtil.isZeroDateTimeToNull(connArgsMap));

connArgsMap.put("zeroDateTimeBehavior", "CONVERT_TO_NULL");
assertTrue(MysqlUtil.isZeroDateTimeToNull(connArgsMap));

connArgsMap.put("zeroDateTimeBehavior", "convertToNull");
assertTrue(MysqlUtil.isZeroDateTimeToNull(connArgsMap));
}

@Test
public void testIsDateTimeLikeType() {
int dateType = Types.DATE;
int timestampType = Types.TIMESTAMP;
int timestampWithTimezoneType = Types.TIMESTAMP_WITH_TIMEZONE;
int timeType = Types.TIME;
int stringType = Types.VARCHAR;

assertTrue(MysqlUtil.isDateTimeLikeType(dateType));
assertTrue(MysqlUtil.isDateTimeLikeType(timestampType));
assertTrue(MysqlUtil.isDateTimeLikeType(timestampWithTimezoneType));
assertFalse(MysqlUtil.isDateTimeLikeType(timeType));
assertFalse(MysqlUtil.isDateTimeLikeType(stringType));
}
}

0 comments on commit 3ab084b

Please sign in to comment.