From 38879d2a37c6438ed1fda5af24176780f886fe9a Mon Sep 17 00:00:00 2001
From: xlfjcg <103733994+xlfjcg@users.noreply.github.com>
Date: Fri, 27 May 2022 19:18:34 +0800
Subject: [PATCH] release v1.2.2
release v1.2.2
---
pom.xml | 37 ++++++++++++-------
.../flink/manager/StarRocksSinkManager.java | 27 +++++++-------
.../row/sink/StarRocksDelimiterParser.java | 4 +-
.../sink/StarRocksTableRowTransformer.java | 6 +--
.../sink/StarRocksDynamicSinkFunction.java | 3 +-
.../table/sink/StarRocksSinkOptions.java | 3 +-
.../StarRocksDynamicSourceFunction.java | 2 +-
.../connector/flink/tools/ExecuteSQL.java | 2 +-
.../sink/StarRocksSinkManagerTest.java | 7 +---
.../StarRocksGenericRowTransformerTest.java | 5 +--
.../StarRocksTableRowTransformerTest.java | 4 +-
.../sink/StarRocksStreamLoadVisitorTest.java | 7 ----
12 files changed, 52 insertions(+), 55 deletions(-)
diff --git a/pom.xml b/pom.xml
index c2766c12..790db4e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@ limitations under the License.
4.0.0
com.starrocks
flink-connector-starrocks
- 1.2.1_flink-1.14_${scala.version}
+ 1.2.2_flink-1.14_${scala.version}
1.8
1.8
@@ -86,6 +86,11 @@ limitations under the License.
provided
+
+ com.google.guava
+ guava
+ 31.1-jre
+
commons-codec
commons-codec
@@ -126,18 +131,6 @@ limitations under the License.
mysql-connector-java
5.1.49
-
- org.jmockit
- jmockit
- 1.48
- test
-
-
- junit
- junit
- 4.12
- test
-
com.starrocks
starrocks-thrift-sdk
@@ -173,6 +166,19 @@ limitations under the License.
+
+
+ org.jmockit
+ jmockit
+ 1.48
+ test
+
+
+ junit
+ junit
+ 4.12
+ test
+
@@ -266,6 +272,10 @@ limitations under the License.
com.google.flatbuffers
com.starrocks.shade.com.google.flatbuffers
+
+ com.google.common
+ com.starrocks.shade.com.google.common
+
com.fasterxml.jackson.core
com.starrocks.shade.com.fasterxml.jackson.core
@@ -288,6 +298,7 @@ limitations under the License.
io.netty:netty-buffer
io.netty:netty-common
com.google.flatbuffers:flatbuffers-java
+ com.google.guava:guava
com.fasterxml.jackson.core:jackson-annotations
com.fasterxml.jackson.core:jackson-core
com.fasterxml.jackson.core:jackson-databind
diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java
index 8c9e5fcb..bbdb8d8f 100644
--- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java
+++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java
@@ -46,7 +46,6 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.constraints.UniqueConstraint;
@@ -113,19 +112,19 @@ public StarRocksSinkManager(StarRocksSinkOptions sinkOptions, TableSchema flinkS
this.starrocksQueryVisitor = new StarRocksQueryVisitor(jdbcConnProvider, sinkOptions.getDatabaseName(), sinkOptions.getTableName());
// validate table structure
typesMap = new HashMap<>();
- typesMap.put("bigint", Lists.newArrayList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
- typesMap.put("largeint", Lists.newArrayList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
- typesMap.put("char", Lists.newArrayList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR));
- typesMap.put("date", Lists.newArrayList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR));
- typesMap.put("datetime", Lists.newArrayList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.VARCHAR));
- typesMap.put("decimal", Lists.newArrayList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT));
- typesMap.put("double", Lists.newArrayList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER));
- typesMap.put("float", Lists.newArrayList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER));
- typesMap.put("int", Lists.newArrayList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
- typesMap.put("tinyint", Lists.newArrayList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY, LogicalTypeRoot.BOOLEAN));
- typesMap.put("smallint", Lists.newArrayList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
- typesMap.put("varchar", Lists.newArrayList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
- typesMap.put("string", Lists.newArrayList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
+ typesMap.put("bigint", Arrays.asList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
+ typesMap.put("largeint", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
+ typesMap.put("char", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR));
+ typesMap.put("date", Arrays.asList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR));
+ typesMap.put("datetime", Arrays.asList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.VARCHAR));
+ typesMap.put("decimal", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT));
+ typesMap.put("double", Arrays.asList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER));
+ typesMap.put("float", Arrays.asList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER));
+ typesMap.put("int", Arrays.asList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
+ typesMap.put("tinyint", Arrays.asList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY, LogicalTypeRoot.BOOLEAN));
+ typesMap.put("smallint", Arrays.asList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
+ typesMap.put("varchar", Arrays.asList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
+ typesMap.put("string", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
validateTableStructure(flinkSchema);
String version = this.starrocksQueryVisitor.getStarRocksVersion();
this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(
diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksDelimiterParser.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksDelimiterParser.java
index 2a43316b..27784c58 100644
--- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksDelimiterParser.java
+++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksDelimiterParser.java
@@ -16,14 +16,12 @@
import java.io.StringWriter;
-import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
-
public class StarRocksDelimiterParser {
private static final String HEX_STRING = "0123456789ABCDEF";
public static String parse(String sp, String dSp) throws RuntimeException {
- if (Strings.isNullOrEmpty(sp)) {
+ if (sp == null || sp.length() == 0) {
return dSp;
}
if (!sp.toUpperCase().startsWith("\\X")) {
diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java
index b204e16c..adc30116 100644
--- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java
+++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java
@@ -32,11 +32,11 @@
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.alibaba.fastjson.serializer.SerializeWriter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
@@ -145,7 +145,7 @@ private Object typeConvertion(LogicalType type, RowData record, int pos) {
return convertNestedMap(record.getMap(pos), type);
case ROW:
RowType rType = (RowType)type;
- Map m = Maps.newHashMap();
+ Map m = new HashMap<>();
RowData row = record.getRow(pos, rType.getFieldCount());
rType.getFields().parallelStream().forEach(f -> m.put(f.getName(), typeConvertion(f.getType(), row, rType.getFieldIndex(f.getName()))));
return m;
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
index 20ad8e17..4a40f991 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
@@ -14,6 +14,7 @@
package com.starrocks.connector.flink.table.sink;
+import com.google.common.base.Strings;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
@@ -29,9 +30,9 @@
import org.apache.flink.table.data.binary.NestedRowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.InstantiationUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.alter.Alter;
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java
index a9ee6f06..44715d20 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java
@@ -92,7 +92,8 @@ public enum StreamLoadFormat {
public StarRocksSinkOptions(ReadableConfig options, Map optionsMap) {
this.tableOptions = options;
- this.tableOptionsMap = optionsMap;
+ // Can not promise the input parameter optionsMap is serializable. Use the HashMap to copy the data.
+ this.tableOptionsMap = new HashMap<>(optionsMap);
parseSinkStreamLoadProperties();
this.validate();
}
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java
index 997ee8e0..92a65fb5 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java
@@ -14,6 +14,7 @@
package com.starrocks.connector.flink.table.source;
+import com.google.common.base.Strings;
import com.starrocks.connector.flink.table.source.struct.ColunmRichInfo;
import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets;
import com.starrocks.connector.flink.table.source.struct.QueryInfo;
@@ -22,7 +23,6 @@
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
diff --git a/src/main/java/com/starrocks/connector/flink/tools/ExecuteSQL.java b/src/main/java/com/starrocks/connector/flink/tools/ExecuteSQL.java
index 99ca2884..dfde47bf 100644
--- a/src/main/java/com/starrocks/connector/flink/tools/ExecuteSQL.java
+++ b/src/main/java/com/starrocks/connector/flink/tools/ExecuteSQL.java
@@ -17,8 +17,8 @@
import java.nio.file.Path;
import java.nio.file.Paths;
+import com.google.common.base.Strings;
import org.apache.flink.api.java.utils.MultipleParameterTool;
-import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
diff --git a/src/test/java/com/starrocks/connector/flink/manager/sink/StarRocksSinkManagerTest.java b/src/test/java/com/starrocks/connector/flink/manager/sink/StarRocksSinkManagerTest.java
index 5573d4a0..bafbeaa1 100644
--- a/src/test/java/com/starrocks/connector/flink/manager/sink/StarRocksSinkManagerTest.java
+++ b/src/test/java/com/starrocks/connector/flink/manager/sink/StarRocksSinkManagerTest.java
@@ -14,10 +14,6 @@
package com.starrocks.connector.flink.manager.sink;
-import java.util.ArrayList;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema.Builder;
@@ -28,6 +24,7 @@
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -58,7 +55,7 @@ public void testValidateTableStructure() {
new Expectations(){
{
v.getTableColumnsMetaData();
- result = Lists.newArrayList();
+ result = new ArrayList<>();
}
};
String exMsg = "";
diff --git a/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformerTest.java b/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformerTest.java
index dde6ff57..78e341d6 100644
--- a/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformerTest.java
+++ b/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformerTest.java
@@ -14,8 +14,9 @@
package com.starrocks.connector.flink.row.sink;
+import com.google.common.base.Strings;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
+
import org.junit.Test;
import mockit.Injectable;
@@ -29,8 +30,6 @@
import com.alibaba.fastjson.JSON;
import com.starrocks.connector.flink.StarRocksSinkBaseTest;
-import com.starrocks.connector.flink.row.sink.StarRocksGenericRowTransformer;
-import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
public class StarRocksGenericRowTransformerTest extends StarRocksSinkBaseTest {
diff --git a/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformerTest.java b/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformerTest.java
index 5d916b9d..aa100d9f 100644
--- a/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformerTest.java
+++ b/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformerTest.java
@@ -14,9 +14,9 @@
package com.starrocks.connector.flink.row.sink;
+import com.google.common.base.Strings;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.junit.Test;
import mockit.Injectable;
@@ -32,8 +32,6 @@
import com.alibaba.fastjson.JSON;
import com.starrocks.connector.flink.StarRocksSinkBaseTest;
-import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
-import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
diff --git a/src/test/java/com/starrocks/connector/flink/table/sink/StarRocksStreamLoadVisitorTest.java b/src/test/java/com/starrocks/connector/flink/table/sink/StarRocksStreamLoadVisitorTest.java
index 2d0c81e1..c95f04d5 100644
--- a/src/test/java/com/starrocks/connector/flink/table/sink/StarRocksStreamLoadVisitorTest.java
+++ b/src/test/java/com/starrocks/connector/flink/table/sink/StarRocksStreamLoadVisitorTest.java
@@ -14,19 +14,12 @@
package com.starrocks.connector.flink.table.sink;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
-import org.apache.http.HttpEntity;
-import org.apache.http.entity.ByteArrayEntity;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import com.starrocks.connector.flink.StarRocksSinkBaseTest;
import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;