From a2133fdc827b3a6119795473688db5e6cdcbe532 Mon Sep 17 00:00:00 2001 From: ddesilva Date: Tue, 5 Mar 2024 19:08:52 -0800 Subject: [PATCH 1/3] update --- deploy.sh | 0 pom.xml | 19 ++--------------- .../manager/StarRocksQueryPlanVisitor.java | 7 ++++++- .../StarRocksDynamicSourceFunction.java | 6 ++++-- .../source/StarRocksDynamicTableSource.java | 21 ++++++++++++------- .../source/StarRocksExpressionExtractor.java | 10 ++++----- 6 files changed, 30 insertions(+), 33 deletions(-) mode change 100644 => 100755 deploy.sh diff --git a/deploy.sh b/deploy.sh old mode 100644 new mode 100755 diff --git a/pom.xml b/pom.xml index 77422f3a..0a26fdf4 100644 --- a/pom.xml +++ b/pom.xml @@ -354,21 +354,6 @@ limitations under the License. ${file_encoding} - - maven-clean-plugin - 3.0.0 - - - - ./target/ - - ${artifactId}*.jar - - false - - - - org.apache.maven.plugins maven-jar-plugin @@ -376,7 +361,7 @@ limitations under the License. package - jar + test-jar @@ -588,7 +573,7 @@ limitations under the License. - + release diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java index 576f5cc0..aec33c1d 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java @@ -81,7 +81,7 @@ private static Map> transferQueryPlanToBeXTablet(QueryPlan que beXTablets.put(beNode, new HashSet<>()); candidateBe = beNode; break; - } + } if (beXTablets.get(beNode).size() < tabletCount) { candidateBe = beNode; tabletCount = beXTablets.get(beNode).size(); @@ -113,10 +113,15 @@ private static QueryPlan getQueryPlan(String querySQL, String httpNode, StarRock post.setHeader("Content-Type", "application/json;charset=UTF-8"); post.setHeader("Authorization", getBasicAuthHeader(sourceOptions.getUsername(), sourceOptions.getPassword())); post.setEntity(new ByteArrayEntity(body.getBytes())); + System.out.println("HELLO POST!"); +// System.out.println(post.getAllHeaders()); +// System.out.println(post.getEntity()); try (CloseableHttpResponse response = httpClient.execute(post)) { requsetCode = response.getStatusLine().getStatusCode(); HttpEntity respEntity = response.getEntity(); respString = EntityUtils.toString(respEntity, "UTF-8"); + System.out.println(respString); + System.out.println(requsetCode); } } if (200 == requsetCode || i == sourceOptions.getScanMaxRetries() - 1) { diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java index 082ead0d..92b791d9 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java @@ -46,7 +46,7 @@ public class StarRocksDynamicSourceFunction extends RichParallelSourceFunction columnRichInfos; private List dataReaderList; - + private StarRocksSourceQueryType queryType; private transient Counter counterTotalScannedRows; @@ -70,7 +70,7 @@ public StarRocksDynamicSourceFunction(TableSchema flinkSchema, StarRocksSourceOp this.dataReaderList = new ArrayList<>(); } - public StarRocksDynamicSourceFunction(StarRocksSourceOptions sourceOptions, TableSchema flinkSchema, + public StarRocksDynamicSourceFunction(StarRocksSourceOptions sourceOptions, TableSchema flinkSchema, String filter, long limit, SelectColumn[] selectColumns, String columns, StarRocksSourceQueryType queryType) { // StarRocksSourceCommonFunc.validateTableStructure(sourceOptions, flinkSchema); this.sourceOptions = sourceOptions; @@ -82,6 +82,8 @@ public StarRocksDynamicSourceFunction(StarRocksSourceOptions sourceOptions, Tabl } else { this.selectColumns = selectColumns; } + System.out.println("called!"); + System.out.println(columns); String SQL = genSQL(queryType, columns, filter, limit); if (queryType == StarRocksSourceQueryType.QueryCount) { this.dataCount = StarRocksSourceCommonFunc.getQueryCount(this.sourceOptions, SQL); diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java index 0764ed5d..27fecfdf 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java @@ -58,12 +58,13 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { StarRocksDynamicSourceFunction sourceFunction = new StarRocksDynamicSourceFunction( - options, flinkSchema, - this.pushDownHolder.getFilter(), - this.pushDownHolder.getLimit(), - this.pushDownHolder.getSelectColumns(), - this.pushDownHolder.getColumns(), + options, flinkSchema, + this.pushDownHolder.getFilter(), + this.pushDownHolder.getLimit(), + this.pushDownHolder.getSelectColumns(), + this.pushDownHolder.getColumns(), this.pushDownHolder.getQueryType()); + return SourceFunctionProvider.of(sourceFunction, true); } @@ -73,7 +74,7 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { ColumnRichInfo[] filerRichInfo = new ColumnRichInfo[projectedFields.length]; for (int i = 0; i < projectedFields.length; i ++) { ColumnRichInfo columnRichInfo = new ColumnRichInfo( - this.flinkSchema.getFieldName(projectedFields[i]).get(), + this.flinkSchema.getFieldName(projectedFields[i]).get(), projectedFields[i], this.flinkSchema.getFieldDataType(projectedFields[i]).get() ); @@ -114,9 +115,9 @@ public void applyProjection(int[][] projectedFields) { this.pushDownHolder.setQueryType(StarRocksSourceQueryType.QuerySomeColumns); ArrayList columnList = new ArrayList<>(); - ArrayList selectColumns = new ArrayList(); + ArrayList selectColumns = new ArrayList(); for (int index : curProjectedFields) { - String columnName = flinkSchema.getFieldName(index).get(); + String columnName = "`" + flinkSchema.getFieldName(index).get() + "`"; columnList.add(columnName); selectColumns.add(new SelectColumn(columnName, index)); } @@ -148,11 +149,15 @@ public Result applyFilters(List filtersExpressions) { } Optional filter = Optional.of(String.join(" and ", filters)); this.pushDownHolder.setFilter(filter.get()); + System.out.println("filter"); + System.out.println(filter.get()); return Result.of(ac, remain); } @Override public void applyLimit(long limit) { this.pushDownHolder.setLimit(limit); + System.out.println("limit"); + System.out.println(limit); } } diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksExpressionExtractor.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksExpressionExtractor.java index c8d4a4c3..8fa7d739 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksExpressionExtractor.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksExpressionExtractor.java @@ -57,7 +57,7 @@ public String visit(CallExpression call) { } if (SUPPORT_FUNC.containsKey(funcDef)) { - + List operands = new ArrayList<>(); for (Expression child : call.getChildren()) { String operand = child.accept(this); @@ -74,8 +74,8 @@ public String visit(CallExpression call) { @Override public String visit(ValueLiteralExpression valueLiteral) { LogicalTypeRoot typeRoot = valueLiteral.getOutputDataType().getLogicalType().getTypeRoot(); - if (typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) || - typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) || + if (typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) || + typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE) || typeRoot.equals(LogicalTypeRoot.DATE)) { return "'" + valueLiteral.toString() + "'"; @@ -85,7 +85,7 @@ public String visit(ValueLiteralExpression valueLiteral) { @Override public String visit(FieldReferenceExpression fieldReference) { - return fieldReference.getName(); + return "`" + fieldReference.getName() + "`"; } @Override @@ -98,4 +98,4 @@ public String visit(Expression other) { return null; } -} \ No newline at end of file +} From 87e2533604d0576c7b91552e4bf8136a0f9a51ee Mon Sep 17 00:00:00 2001 From: ddesilva Date: Tue, 5 Mar 2024 19:39:59 -0800 Subject: [PATCH 2/3] update --- .../flink/table/source/StarRocksDynamicSourceFunction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java index 92b791d9..ff5a5541 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java @@ -128,7 +128,7 @@ private String genSQL(StarRocksSourceQueryType queryType, String columns, String } if (limit > 0) { // (not support) SQL = SQL + " limit " + limit; - throw new RuntimeException("Read data from be not support limit now !"); + sqlSb.append(" limit " + limit); } return sqlSb.toString(); } From 703b546e713ff42ead91409e182c8e61203c5ec7 Mon Sep 17 00:00:00 2001 From: ddesilva Date: Wed, 6 Mar 2024 14:15:31 -0800 Subject: [PATCH 3/3] remove print statements --- build.sh | 2 +- pom.xml | 15 +++++++++++++++ .../flink/manager/StarRocksQueryPlanVisitor.java | 7 +------ .../source/StarRocksDynamicSourceFunction.java | 8 +++----- .../table/source/StarRocksDynamicTableSource.java | 4 ---- 5 files changed, 20 insertions(+), 16 deletions(-) diff --git a/build.sh b/build.sh index 21bcf4dc..d78f3230 100755 --- a/build.sh +++ b/build.sh @@ -33,7 +33,7 @@ check_flink_version_supported $flink_minor_version flink_version="$(get_flink_version $flink_minor_version)" kafka_connector_version="$(get_kafka_connector_version $flink_minor_version)" -${MVN_CMD} clean package -DskipTests \ +${MVN_CMD} clean package -DskipTests -Drat.skip=true \ -Dflink.minor.version=${flink_minor_version} \ -Dflink.version=${flink_version} \ -Dkafka.connector.version=${kafka_connector_version} diff --git a/pom.xml b/pom.xml index 0a26fdf4..8a701d41 100644 --- a/pom.xml +++ b/pom.xml @@ -354,6 +354,21 @@ limitations under the License. ${file_encoding} + + maven-clean-plugin + 3.0.0 + + + + ./target/ + + ${artifactId}*.jar + + false + + + + org.apache.maven.plugins maven-jar-plugin diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java index aec33c1d..576f5cc0 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java @@ -81,7 +81,7 @@ private static Map> transferQueryPlanToBeXTablet(QueryPlan que beXTablets.put(beNode, new HashSet<>()); candidateBe = beNode; break; - } + } if (beXTablets.get(beNode).size() < tabletCount) { candidateBe = beNode; tabletCount = beXTablets.get(beNode).size(); @@ -113,15 +113,10 @@ private static QueryPlan getQueryPlan(String querySQL, String httpNode, StarRock post.setHeader("Content-Type", "application/json;charset=UTF-8"); post.setHeader("Authorization", getBasicAuthHeader(sourceOptions.getUsername(), sourceOptions.getPassword())); post.setEntity(new ByteArrayEntity(body.getBytes())); - System.out.println("HELLO POST!"); -// System.out.println(post.getAllHeaders()); -// System.out.println(post.getEntity()); try (CloseableHttpResponse response = httpClient.execute(post)) { requsetCode = response.getStatusLine().getStatusCode(); HttpEntity respEntity = response.getEntity(); respString = EntityUtils.toString(respEntity, "UTF-8"); - System.out.println(respString); - System.out.println(requsetCode); } } if (200 == requsetCode || i == sourceOptions.getScanMaxRetries() - 1) { diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java index ff5a5541..082ead0d 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java @@ -46,7 +46,7 @@ public class StarRocksDynamicSourceFunction extends RichParallelSourceFunction columnRichInfos; private List dataReaderList; - + private StarRocksSourceQueryType queryType; private transient Counter counterTotalScannedRows; @@ -70,7 +70,7 @@ public StarRocksDynamicSourceFunction(TableSchema flinkSchema, StarRocksSourceOp this.dataReaderList = new ArrayList<>(); } - public StarRocksDynamicSourceFunction(StarRocksSourceOptions sourceOptions, TableSchema flinkSchema, + public StarRocksDynamicSourceFunction(StarRocksSourceOptions sourceOptions, TableSchema flinkSchema, String filter, long limit, SelectColumn[] selectColumns, String columns, StarRocksSourceQueryType queryType) { // StarRocksSourceCommonFunc.validateTableStructure(sourceOptions, flinkSchema); this.sourceOptions = sourceOptions; @@ -82,8 +82,6 @@ public StarRocksDynamicSourceFunction(StarRocksSourceOptions sourceOptions, Tabl } else { this.selectColumns = selectColumns; } - System.out.println("called!"); - System.out.println(columns); String SQL = genSQL(queryType, columns, filter, limit); if (queryType == StarRocksSourceQueryType.QueryCount) { this.dataCount = StarRocksSourceCommonFunc.getQueryCount(this.sourceOptions, SQL); @@ -128,7 +126,7 @@ private String genSQL(StarRocksSourceQueryType queryType, String columns, String } if (limit > 0) { // (not support) SQL = SQL + " limit " + limit; - sqlSb.append(" limit " + limit); + throw new RuntimeException("Read data from be not support limit now !"); } return sqlSb.toString(); } diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java index 27fecfdf..5b79f77c 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java @@ -149,15 +149,11 @@ public Result applyFilters(List filtersExpressions) { } Optional filter = Optional.of(String.join(" and ", filters)); this.pushDownHolder.setFilter(filter.get()); - System.out.println("filter"); - System.out.println(filter.get()); return Result.of(ac, remain); } @Override public void applyLimit(long limit) { this.pushDownHolder.setLimit(limit); - System.out.println("limit"); - System.out.println(limit); } }