diff --git a/pom.xml b/pom.xml
index 4c83590..d07bb23 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
io.cdap.plugin
snowflake-plugins
- 1.1.3
+ 1.1.4-SNAPSHOT
jar
Snowflake plugins
diff --git a/src/main/java/io/cdap/plugin/snowflake/common/util/SchemaHelper.java b/src/main/java/io/cdap/plugin/snowflake/common/util/SchemaHelper.java
index fee4808..9302eae 100644
--- a/src/main/java/io/cdap/plugin/snowflake/common/util/SchemaHelper.java
+++ b/src/main/java/io/cdap/plugin/snowflake/common/util/SchemaHelper.java
@@ -24,6 +24,7 @@
import io.cdap.plugin.snowflake.common.client.SnowflakeFieldDescriptor;
import io.cdap.plugin.snowflake.common.exception.SchemaParseException;
import io.cdap.plugin.snowflake.source.batch.SnowflakeBatchSourceConfig;
+import io.cdap.plugin.snowflake.source.batch.SnowflakeInputFormatProvider;
import io.cdap.plugin.snowflake.source.batch.SnowflakeSourceAccessor;
import java.io.IOException;
import java.sql.Types;
@@ -62,7 +63,8 @@ public static Schema getSchema(SnowflakeBatchSourceConfig config, FailureCollect
return getParsedSchema(config.getSchema());
}
- SnowflakeSourceAccessor snowflakeSourceAccessor = new SnowflakeSourceAccessor(config);
+ SnowflakeSourceAccessor snowflakeSourceAccessor =
+ new SnowflakeSourceAccessor(config, SnowflakeInputFormatProvider.PROPERTY_DEFAULT_ESCAPE_CHAR);
return getSchema(snowflakeSourceAccessor, config.getSchema(), collector, config.getImportQuery());
}
diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java
index 3c6c8db..f5fde15 100644
--- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java
+++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java
@@ -16,6 +16,7 @@
package io.cdap.plugin.snowflake.source.batch;
+import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
@@ -33,6 +34,7 @@
import io.cdap.plugin.snowflake.common.util.SchemaHelper;
import org.apache.hadoop.io.NullWritable;
+import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@@ -68,7 +70,11 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
public void prepareRun(BatchSourceContext context) {
FailureCollector failureCollector = context.getFailureCollector();
config.validate(failureCollector);
-
+ Map arguments = new HashMap<>(context.getArguments().asMap());
+ String escapeChar = arguments.containsKey(SnowflakeInputFormatProvider.PROPERTY_ESCAPE_CHAR) &&
+ !Strings.isNullOrEmpty(arguments.get(SnowflakeInputFormatProvider.PROPERTY_ESCAPE_CHAR))
+ ? arguments.get(SnowflakeInputFormatProvider.PROPERTY_ESCAPE_CHAR)
+ : SnowflakeInputFormatProvider.PROPERTY_DEFAULT_ESCAPE_CHAR;
Schema schema = SchemaHelper.getSchema(config, failureCollector);
failureCollector.getOrThrowException();
@@ -81,7 +87,7 @@ public void prepareRun(BatchSourceContext context) {
.collect(Collectors.toList()));
}
- context.setInput(Input.of(config.getReferenceName(), new SnowflakeInputFormatProvider(config)));
+ context.setInput(Input.of(config.getReferenceName(), new SnowflakeInputFormatProvider(config, escapeChar)));
}
@Override
diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java
index 5b655e3..c4c1d1b 100644
--- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java
+++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java
@@ -60,6 +60,8 @@ private SnowflakeSourceAccessor getSnowflakeAccessor(Configuration configuration
SnowflakeInputFormatProvider.PROPERTY_CONFIG_JSON);
SnowflakeBatchSourceConfig config = GSON.fromJson(
configJson, SnowflakeBatchSourceConfig.class);
- return new SnowflakeSourceAccessor(config);
+ String escapeChar = configuration.get(SnowflakeInputFormatProvider.PROPERTY_ESCAPE_CHAR,
+ SnowflakeInputFormatProvider.PROPERTY_DEFAULT_ESCAPE_CHAR);
+ return new SnowflakeSourceAccessor(config, escapeChar);
}
}
diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormatProvider.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormatProvider.java
index 9020599..97f6996 100644
--- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormatProvider.java
+++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormatProvider.java
@@ -29,13 +29,17 @@
public class SnowflakeInputFormatProvider implements InputFormatProvider {
public static final String PROPERTY_CONFIG_JSON = "cdap.snowflake.source.config";
+ public static final String PROPERTY_ESCAPE_CHAR = "cdap.snowflake.source.escape";
+
+ public static final String PROPERTY_DEFAULT_ESCAPE_CHAR = "\\";
private static final Gson GSON = new Gson();
private final Map conf;
- public SnowflakeInputFormatProvider(SnowflakeBatchSourceConfig config) {
+ public SnowflakeInputFormatProvider(SnowflakeBatchSourceConfig config, String escapeChar) {
this.conf = new ImmutableMap.Builder()
.put(PROPERTY_CONFIG_JSON, GSON.toJson(config))
+ .put(PROPERTY_ESCAPE_CHAR, escapeChar)
.build();
}
diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java
index 2f75653..d8990c0 100644
--- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java
+++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java
@@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
@@ -83,7 +84,8 @@ private Object convertValue(String fieldName, String value, Schema fieldSchema)
case TIME_MICROS:
return TimeUnit.NANOSECONDS.toMicros(LocalTime.parse(value).toNanoOfDay());
case DECIMAL:
- return new BigDecimal(value).setScale(fieldSchema.getScale()).unscaledValue().toByteArray();
+ return new BigDecimal(value).setScale(fieldSchema.getScale(),
+ RoundingMode.HALF_EVEN).unscaledValue().toByteArray();
default:
throw new IllegalArgumentException(
String.format("Field '%s' is of unsupported type '%s'", fieldSchema.getDisplayName(),
diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java
index 9bda860..bc8c922 100644
--- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java
+++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java
@@ -60,10 +60,12 @@ public class SnowflakeSourceAccessor extends SnowflakeAccessor {
"OVERWRITE=TRUE HEADER=TRUE SINGLE=FALSE";
private static final String COMMAND_MAX_FILE_SIZE = " MAX_FILE_SIZE=%s";
private final SnowflakeBatchSourceConfig config;
+ private final char escapeChar;
- public SnowflakeSourceAccessor(SnowflakeBatchSourceConfig config) {
+ public SnowflakeSourceAccessor(SnowflakeBatchSourceConfig config, String escapeChar) {
super(config);
this.config = config;
+ this.escapeChar = escapeChar.charAt(0);
}
/**
@@ -116,7 +118,7 @@ public CSVReader buildCsvReader(String stageSplit) throws IOException {
InputStream downloadStream = connection.unwrap(SnowflakeConnection.class)
.downloadStream("@~", stageSplit, true);
InputStreamReader inputStreamReader = new InputStreamReader(downloadStream);
- return new CSVReader(inputStreamReader);
+ return new CSVReader(inputStreamReader, ',', '"', escapeChar);
} catch (SQLException e) {
throw new IOException(e);
}
diff --git a/src/test/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessorTest.java b/src/test/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessorTest.java
index 9df01e4..27b0154 100644
--- a/src/test/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessorTest.java
+++ b/src/test/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessorTest.java
@@ -18,6 +18,7 @@
import io.cdap.plugin.snowflake.Constants;
import io.cdap.plugin.snowflake.common.BaseSnowflakeTest;
+import io.cdap.plugin.snowflake.source.batch.SnowflakeInputFormatProvider;
import io.cdap.plugin.snowflake.source.batch.SnowflakeSourceAccessor;
import org.junit.Assert;
import org.junit.Test;
@@ -44,7 +45,8 @@
*/
public class SnowflakeAccessorTest extends BaseSnowflakeTest {
- private SnowflakeSourceAccessor snowflakeAccessor = new SnowflakeSourceAccessor(CONFIG);
+ private SnowflakeSourceAccessor snowflakeAccessor =
+ new SnowflakeSourceAccessor(CONFIG, SnowflakeInputFormatProvider.PROPERTY_DEFAULT_ESCAPE_CHAR);
@Test
public void testDescribeQuery() throws Exception {