From 12bebc4cdaab4d67bd4518061b046bbaae10da79 Mon Sep 17 00:00:00 2001 From: yuanoOo Date: Thu, 28 Nov 2024 16:24:49 +0800 Subject: [PATCH 1/2] BugFix: Fix unable to connect to oracle tenant. --- docs/spark-connector-oceanbase.md | 8 +++++++- docs/spark-connector-oceanbase_cn.md | 6 ++++++ .../oceanbase/spark/cfg/ConnectionOptions.java | 2 ++ .../com/oceanbase/spark/jdbc/OBJdbcUtils.scala | 17 +++++++++++++++-- 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/docs/spark-connector-oceanbase.md b/docs/spark-connector-oceanbase.md index e02fad1..5e4b175 100644 --- a/docs/spark-connector-oceanbase.md +++ b/docs/spark-connector-oceanbase.md @@ -279,7 +279,7 @@ df.write sql-port - + 2881 Integer The SQL port. @@ -313,6 +313,12 @@ df.write String The table name. + + driver + com.mysql.cj.jdbc.Driver + String + The class name of the JDBC driver. By default, it connects to the MySQL tenant. If you need to connect to Oracle tenant, the name needs to be com.oceanbase.jdbc.Driver + diff --git a/docs/spark-connector-oceanbase_cn.md b/docs/spark-connector-oceanbase_cn.md index 7267f8a..0414e44 100644 --- a/docs/spark-connector-oceanbase_cn.md +++ b/docs/spark-connector-oceanbase_cn.md @@ -310,6 +310,12 @@ df.write String 表名。 + + driver + com.mysql.cj.jdbc.Driver + String + JDBC 驱动程序的类名。默认支持连接MySQL租户。如果需要连接到Oracle租户,请修改为 com.oceanbase.jdbc.Driver + diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/ConnectionOptions.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/ConnectionOptions.java index f3597b5..4e986a6 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/ConnectionOptions.java +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/ConnectionOptions.java @@ -24,6 +24,8 @@ public interface ConnectionOptions { String PASSWORD = "password"; String SCHEMA_NAME = "schema-name"; String TABLE_NAME = "table-name"; + String DRIVER = "driver"; + String DRIVER_DEFAULT = "com.mysql.cj.jdbc.Driver"; /* Direct-load config */ String ENABLE_DIRECT_LOAD_WRITE = "direct-load.enabled"; diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala index 5c0ce3e..1f6cc23 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala @@ -25,6 +25,10 @@ import java.sql.{Connection, DriverManager} object OBJdbcUtils { val OB_MYSQL_URL = s"jdbc:mysql://%s:%d/%s" private val OB_ORACLE_URL = s"jdbc:oceanbase://%s:%d/%s" + private val MYSQL_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver" + private val MYSQL_LEGACY_JDBC_DRIVER = "com.mysql.jdbc.Driver" + private val OB_JDBC_DRIVER = "com.oceanbase.jdbc.Driver" + private val OB_LEGACY_JDBC_DRIVER = "com.alipay.oceanbase.jdbc.Driver" def getConnection(sparkSettings: SparkSettings): Connection = { val connection = DriverManager.getConnection( @@ -41,19 +45,28 @@ object OBJdbcUtils { def getJdbcUrl(sparkSettings: SparkSettings): String = { var url: String = null - if ("MYSQL".equalsIgnoreCase(getCompatibleMode(sparkSettings))) { + val driver = + sparkSettings.getProperty(ConnectionOptions.DRIVER, 
ConnectionOptions.DRIVER_DEFAULT) + if ( + driver.equalsIgnoreCase(MYSQL_JDBC_DRIVER) || driver.equalsIgnoreCase( + MYSQL_LEGACY_JDBC_DRIVER) + ) { url = OBJdbcUtils.OB_MYSQL_URL.format( sparkSettings.getProperty(ConnectionOptions.HOST), sparkSettings.getIntegerProperty(ConnectionOptions.SQL_PORT), sparkSettings.getProperty(ConnectionOptions.SCHEMA_NAME) ) - } else { + } else if ( + driver.equalsIgnoreCase(OB_JDBC_DRIVER) || driver.equalsIgnoreCase(OB_LEGACY_JDBC_DRIVER) + ) { JdbcDialects.registerDialect(OceanBaseOracleDialect) url = OBJdbcUtils.OB_ORACLE_URL.format( sparkSettings.getProperty(ConnectionOptions.HOST), sparkSettings.getIntegerProperty(ConnectionOptions.SQL_PORT), sparkSettings.getProperty(ConnectionOptions.SCHEMA_NAME) ) + } else { + throw new RuntimeException(String.format("Unsupported driver name: %s", driver)) } url } From 428349893c296cb1c1a083187ab08bb35a6a22e2 Mon Sep 17 00:00:00 2001 From: yuanoOo Date: Tue, 10 Dec 2024 13:44:44 +0800 Subject: [PATCH 2/2] Enhancement: Refactor connector options and close #6. --- docs/spark-connector-oceanbase.md | 44 ++- docs/spark-connector-oceanbase_cn.md | 46 ++-- .../oceanbase/spark/config/ConfigBuilder.java | 26 ++ .../oceanbase/spark/OceanBaseE2eITCase.java | 16 +- .../utils/SparkContainerTestEnvironment.java | 10 + spark-connector-oceanbase/pom.xml | 6 + .../spark-connector-oceanbase-2.4/pom.xml | 1 + .../spark-connector-oceanbase-3.1/pom.xml | 1 + .../spark-connector-oceanbase-3.2/pom.xml | 1 + .../spark-connector-oceanbase-3.3/pom.xml | 1 + .../spark-connector-oceanbase-3.4/pom.xml | 1 + .../spark-connector-oceanbase-base/pom.xml | 5 + .../spark/cfg/ConnectionOptions.java | 54 ---- .../com/oceanbase/spark/cfg/Settings.java | 120 --------- .../oceanbase/spark/cfg/SparkSettings.java | 86 ------ .../spark/config/OceanBaseConfig.java | 255 ++++++++++++++++++ .../{cfg => config}/OceanBaseUserInfo.java | 6 +- .../spark/directload/DirectLoadUtils.java | 36 ++- .../spark/directload/DirectLoaderBuilder.java | 25 -- .../oceanbase/spark/jdbc/OBJdbcUtils.scala | 61 +---- .../spark/jdbc/OceanBaseOracleDialect.scala | 153 ----------- .../spark/sql/OceanBaseRelation.scala | 9 +- .../spark/sql/OceanBaseSparkSource.scala | 20 +- .../spark/writer/DirectLoadWriter.scala | 21 +- .../spark/sql/OceanBaseJDBCRelation.scala | 6 +- .../spark/sql/OceanBaseSparkDataSource.scala | 58 ++-- .../sql/jdbc/OceanBaseMySQLDialect.scala | 127 +++++++++ .../sql/jdbc/OceanBaseOracleDialect.scala | 82 ++++++ .../spark/OceanBaseMySQLConnectorITCase.scala | 164 ++++++++++- 29 files changed, 800 insertions(+), 641 deletions(-) delete mode 100644 spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/ConnectionOptions.java delete mode 100644 spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/Settings.java delete mode 100644 spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/SparkSettings.java create mode 100644 spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseConfig.java rename spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/{cfg => config}/OceanBaseUserInfo.java (93%) delete mode 100644 spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OceanBaseOracleDialect.scala create mode 100644 
spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/jdbc/OceanBaseMySQLDialect.scala create mode 100644 spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/jdbc/OceanBaseOracleDialect.scala diff --git a/docs/spark-connector-oceanbase.md b/docs/spark-connector-oceanbase.md index 5e4b175..0f44af8 100644 --- a/docs/spark-connector-oceanbase.md +++ b/docs/spark-connector-oceanbase.md @@ -103,8 +103,7 @@ mvn clean package -Dscala.version=2.11.12 -Dscala.binary.version=2.11 -DskipTest CREATE TEMPORARY VIEW spark_oceanbase USING oceanbase OPTIONS( - "host"= "localhost", - "sql-port" = "2881", + "url"= "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "schema-name"="test", "table-name"="test", "username"="root", @@ -118,8 +117,7 @@ SELECT * FROM spark_oceanbase; ```scala val oceanBaseSparkDF = spark.read.format("OceanBase") - .option("host", "localhost") - .option("sql-port", 2881) + .option("url", "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false") .option("username", "root") .option("password", "123456") .option("table-name", "test") @@ -178,8 +176,7 @@ CREATE TABLE test.orders ( CREATE TEMPORARY VIEW test_jdbc USING oceanbase OPTIONS( - "host"="localhost", - "sql-port" = "2881", + "url"= "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "schema-name"="test", "table-name"="orders", "username"="root@test", @@ -202,8 +199,7 @@ import org.apache.spark.sql.SaveMode df.write .format("oceanbase") .mode(saveMode = SaveMode.Append) - .option("host", "localhost") - .option("sql-port", 2881) + .option("url", "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false") .option("username", "root") .option("password", "123456") .option("table-name", "orders") @@ -219,13 +215,13 @@ df.write CREATE TEMPORARY VIEW test_direct USING oceanbase OPTIONS( - "host"="localhost", - "sql-port" = "2881", + "url"="jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "schema-name"="test", "table-name"="orders", "username"="root@test", - "password"="", + "password"="123456", "direct-load.enabled" = true, + "direct-load.host" = "localhost", "direct-load.rpc-port" = "2882" ); @@ -245,13 +241,13 @@ import org.apache.spark.sql.SaveMode df.write .format("oceanbase") .mode(saveMode = SaveMode.Append) - .option("host", "localhost") - .option("sql-port", 2881) + .option("url", "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false") .option("username", "root") .option("password", "123456") .option("table-name", "orders") .option("schema-name", "test") .option("direct-load.enabled", "true") + .option("direct-load.host", "localhost") .option("direct-load.rpc-port", "2882") .save() ``` @@ -272,16 +268,10 @@ df.write - host + url String - Hostname used in direct-load. - - - sql-port - 2881 - Integer - The SQL port. + The connection URL. username @@ -313,12 +303,6 @@ df.write String The table name. - - driver - com.mysql.cj.jdbc.Driver - String - The class name of the JDBC driver. By default, it connects to the MySQL tenant. If you need to connect to Oracle tenant, the name needs to be com.oceanbase.jdbc.Driver - @@ -342,6 +326,12 @@ df.write Boolean Enable direct-load writing. + + direct-load.host + + String + Hostname used in direct-load. 
+ direct-load.rpc-port 2882 diff --git a/docs/spark-connector-oceanbase_cn.md b/docs/spark-connector-oceanbase_cn.md index 0414e44..eac94ff 100644 --- a/docs/spark-connector-oceanbase_cn.md +++ b/docs/spark-connector-oceanbase_cn.md @@ -102,8 +102,7 @@ mvn clean package -Dscala.version=2.11.12 -Dscala.binary.version=2.11 -DskipTest CREATE TEMPORARY VIEW spark_oceanbase USING oceanbase OPTIONS( - "host"= "localhost", - "sql-port" = "2881", + "url"= "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "schema-name"="test", "table-name"="test", "username"="root", @@ -117,8 +116,7 @@ SELECT * FROM spark_oceanbase; ```scala val oceanBaseSparkDF = spark.read.format("OceanBase") - .option("host", "localhost") - .option("sql-port", 2881) + .option("url", "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false") .option("username", "root") .option("password", "123456") .option("table-name", "test") @@ -177,12 +175,11 @@ CREATE TABLE test.orders ( CREATE TEMPORARY VIEW test_jdbc USING oceanbase OPTIONS( - "host"="localhost", - "sql-port" = "2881", + "url"="jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "schema-name"="test", "table-name"="orders", "username"="root@test", - "password"="" + "password"="123456" ); insert into table test_jdbc @@ -203,8 +200,7 @@ import org.apache.spark.sql.SaveMode df.write .format("oceanbase") .mode(saveMode = SaveMode.Append) - .option("host", "localhost") - .option("sql-port", 2881) + .option("url", "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false") .option("username", "root") .option("password", "123456") .option("table-name", "orders") @@ -220,13 +216,13 @@ df.write CREATE TEMPORARY VIEW test_direct USING oceanbase OPTIONS( - "host"="localhost", - "sql-port" = "2881", + "url"="jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "schema-name"="test", "table-name"="orders", "username"="root@test", - "password"="", + "password"="123456", "direct-load.enabled" = true, + "direct-load.host" = "localhost", "direct-load.rpc-port" = "2882" ); @@ -248,13 +244,13 @@ import org.apache.spark.sql.SaveMode df.write .format("oceanbase") .mode(saveMode = SaveMode.Append) - .option("host", "localhost") - .option("sql-port", 2881) + .option("url", "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false") .option("username", "root") .option("password", "123456") .option("table-name", "orders") .option("schema-name", "test") .option("direct-load.enabled", "true") + .option("direct-load.host", "localhost") .option("direct-load.rpc-port", "2882") .save() ``` @@ -275,16 +271,10 @@ df.write - host + url String - 数据库的 JDBC url。 - - - sql-port - 2881 - Int - SQL 端口。 + 连接到OceanBase的 JDBC url. 
username String 用户名。 password String 密码。 @@ -310,6 +300,6 @@ df.write String 表名。 - - driver - com.mysql.cj.jdbc.Driver - String - JDBC 驱动程序的类名。默认支持连接MySQL租户。如果需要连接到Oracle租户,请修改为 com.oceanbase.jdbc.Driver - @@ -339,6 +323,12 @@ df.write Boolean 是否开启旁路导入写入。 + + direct-load.host + + String + 旁路导入用到的host地址。 + direct-load.rpc-port 2882 diff --git a/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigBuilder.java b/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigBuilder.java index 7da3009..373fc89 100644 --- a/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigBuilder.java +++ b/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigBuilder.java @@ -15,6 +15,7 @@ */ package com.oceanbase.spark.config; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -191,4 +192,29 @@ public ConfigEntry booleanConf() { return conf; } + + /** + * Creates a configuration entry for Duration data type. + * + * @return The created ConfigEntry instance for Duration data type. + */ + public ConfigEntry durationConf() { + ConfigEntry conf = + new ConfigEntry<>(key, version, doc, alternatives, isPublic, isDeprecated); + Function func = + s -> { + if (s == null || s.isEmpty()) { + return null; + } else { + return Duration.parse(s); + } + }; + conf.setValueConverter(func); + + Function stringFunc = + t -> Optional.ofNullable(t).map(String::valueOf).orElse(null); + conf.setStringConverter(stringFunc); + + return conf; + } } diff --git a/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/OceanBaseE2eITCase.java b/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/OceanBaseE2eITCase.java index c19ce7f..ac55aaf 100644 --- a/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/OceanBaseE2eITCase.java +++ b/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/OceanBaseE2eITCase.java @@ -80,20 +80,20 @@ public void testInsertValues() throws Exception { "CREATE TEMPORARY VIEW test_sink " + "USING oceanbase " + "OPTIONS( " - + " \"host\"= \"%s\"," - + " \"sql-port\" = \"%s\"," + + " \"url\"= \"%s\"," + " \"schema-name\"=\"%s\"," + " \"table-name\"=\"products\"," + " \"username\"=\"%s\"," + " \"password\"=\"%s\"," + " \"direct-load.enabled\" = \"true\"," + + " \"direct-load.host\" = \"%s\"," + " \"direct-load.rpc-port\" = \"%s\" " + ");", - getHostInContainer(), - getPortInContainer(), + getJdbcUrlInContainer(), getSchemaName(), getUsername(), getPassword(), + getHostInContainer(), getRpcPortInContainer())); sqlLines.add( @@ -163,20 +163,20 @@ public void testInsertValuesSpark2() throws Exception { + "df.write\n" + " .format(\"oceanbase\")\n" + " .mode(saveMode = SaveMode.Append)\n" - + " .option(\"host\", \"%s\")\n" - + " .option(\"sql-port\", \"%s\")\n" + + " .option(\"url\", \"%s\")\n" + " .option(\"username\", \"%s\")\n" + " .option(\"password\", \"%s\")\n" + " .option(\"table-name\", \"products\")\n" + " .option(\"schema-name\", \"%s\")\n" + " .option(\"direct-load.enabled\", value = true)\n" + + " .option(\"direct-load.host\", value = \"%s\")\n" + " .option(\"direct-load.rpc-port\", value = \"%s\")\n" + " .save()", - getHostInContainer(), - getPortInContainer(), + getJdbcUrlInContainer(), getUsername(), getPassword(), getSchemaName(), + getHostInContainer(), getRpcPortInContainer())); submitSparkShellJob( diff --git
a/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/utils/SparkContainerTestEnvironment.java b/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/utils/SparkContainerTestEnvironment.java index c505d0a..43c90ef 100644 --- a/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/utils/SparkContainerTestEnvironment.java +++ b/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/utils/SparkContainerTestEnvironment.java @@ -82,6 +82,16 @@ public void after() throws Exception { } } + public String getJdbcUrlInContainer() { + return "jdbc:mysql://" + + getHostInContainer() + + ":" + + getPortInContainer() + + "/" + + getSchemaName() + + "?useUnicode=true&characterEncoding=UTF-8&useSSL=false"; + } + public String getHostInContainer() { return getOBServerIP(); } diff --git a/spark-connector-oceanbase/pom.xml b/spark-connector-oceanbase/pom.xml index 6c315a3..ffb2d03 100644 --- a/spark-connector-oceanbase/pom.xml +++ b/spark-connector-oceanbase/pom.xml @@ -37,6 +37,12 @@ under the License. + + com.oceanbase + spark-connector-oceanbase-common + ${project.version} + + com.oceanbase obkv-table-client diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-2.4/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-2.4/pom.xml index ca49386..bb8c714 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-2.4/pom.xml +++ b/spark-connector-oceanbase/spark-connector-oceanbase-2.4/pom.xml @@ -90,6 +90,7 @@ under the License. com.alibaba:* + com.oceanbase:spark-connector-oceanbase-common com.oceanbase:spark-connector-oceanbase-base com.oceanbase:obkv-table-client com.alipay.sofa:bolt diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-3.1/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-3.1/pom.xml index 4ce1bfc..7646c60 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-3.1/pom.xml +++ b/spark-connector-oceanbase/spark-connector-oceanbase-3.1/pom.xml @@ -69,6 +69,7 @@ under the License. com.alibaba:* + com.oceanbase:spark-connector-oceanbase-common com.oceanbase:spark-connector-oceanbase-base com.oceanbase:obkv-table-client com.alipay.sofa:bolt diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-3.2/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-3.2/pom.xml index 78c8d67..02434d9 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-3.2/pom.xml +++ b/spark-connector-oceanbase/spark-connector-oceanbase-3.2/pom.xml @@ -69,6 +69,7 @@ under the License. com.alibaba:* + com.oceanbase:spark-connector-oceanbase-common com.oceanbase:spark-connector-oceanbase-base com.oceanbase:obkv-table-client com.alipay.sofa:bolt diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-3.3/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-3.3/pom.xml index f27858c..667bf86 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-3.3/pom.xml +++ b/spark-connector-oceanbase/spark-connector-oceanbase-3.3/pom.xml @@ -69,6 +69,7 @@ under the License. 
com.alibaba:* + com.oceanbase:spark-connector-oceanbase-common com.oceanbase:spark-connector-oceanbase-base com.oceanbase:obkv-table-client com.alipay.sofa:bolt diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-3.4/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-3.4/pom.xml index d96b781..cfda7e3 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-3.4/pom.xml +++ b/spark-connector-oceanbase/spark-connector-oceanbase-3.4/pom.xml @@ -69,6 +69,7 @@ under the License. com.alibaba:* + com.oceanbase:spark-connector-oceanbase-common com.oceanbase:spark-connector-oceanbase-base com.oceanbase:obkv-table-client com.alipay.sofa:bolt diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-base/pom.xml index 8ef246e..8b8110c 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/pom.xml +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/pom.xml @@ -169,6 +169,11 @@ under the License. + + com.oceanbase + oceanbase-client + test + org.apache.logging.log4j diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/ConnectionOptions.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/ConnectionOptions.java deleted file mode 100644 index 4e986a6..0000000 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/ConnectionOptions.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2024 OceanBase. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.oceanbase.spark.cfg; - -public interface ConnectionOptions { - - String HOST = "host"; - String SQL_PORT = "sql-port"; - String USERNAME = "username"; - String PASSWORD = "password"; - String SCHEMA_NAME = "schema-name"; - String TABLE_NAME = "table-name"; - String DRIVER = "driver"; - String DRIVER_DEFAULT = "com.mysql.cj.jdbc.Driver"; - - /* Direct-load config */ - String ENABLE_DIRECT_LOAD_WRITE = "direct-load.enabled"; - boolean ENABLE_DIRECT_LOAD_WRITE_DEFAULT = false; - String RPC_PORT = "direct-load.rpc-port"; - String PARALLEL = "direct-load.parallel"; - String EXECUTION_ID = "direct-load.execution-id"; - String DUP_ACTION = "direct-load.dup-action"; - String HEARTBEAT_TIMEOUT = "direct-load.heartbeat-timeout"; - String HEARTBEAT_INTERVAL = "direct-load.heartbeat-interval"; - String LOAD_METHOD = "direct-load.load-method"; - String TIMEOUT = "direct-load.timeout"; - String MAX_ERROR_ROWS = "direct-load.max-error-rows"; - String BATCH_SIZE = "direct-load-batch-size"; - int BATCH_SIZE_DEFAULT = 10240; - String SINK_TASK_PARTITION_SIZE = "direct-load.task.partition.size"; - /** - * Set direct-load task partition size. If you set a small coalesce size, and you don't have the - * action operations, this may result in the same parallelism in your computation. To avoid - * this, you can use repartition operations. 
This will add a shuffle step, but means the current - * upstream partitions will be executed in parallel. - */ - String SINK_TASK_USE_REPARTITION = "direct-load.task.use.repartition"; - - boolean SINK_TASK_USE_REPARTITION_DEFAULT = false; -} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/Settings.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/Settings.java deleted file mode 100644 index 964f65c..0000000 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/Settings.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright 2024 OceanBase. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.oceanbase.spark.cfg; - -import java.util.Enumeration; -import java.util.Map; -import java.util.Properties; - - -import org.apache.commons.lang3.StringUtils; - -public abstract class Settings { - public abstract String getProperty(String name); - - public abstract void setProperty(String name, String value); - - public abstract Properties asProperties(); - - public abstract Settings copy(); - - public String getProperty(String name, String defaultValue) { - String value = getProperty(name); - if (StringUtils.isEmpty(value)) { - return defaultValue; - } - return value; - } - - public Integer getIntegerProperty(String name) { - return getIntegerProperty(name, null); - } - - public Integer getIntegerProperty(String name, Integer defaultValue) { - if (getProperty(name) != null) { - return Integer.parseInt(getProperty(name)); - } - - return defaultValue; - } - - public Long getLongProperty(String name) { - return getLongProperty(name, null); - } - - public Long getLongProperty(String name, Long defaultValue) { - if (getProperty(name) != null) { - return Long.parseLong(getProperty(name)); - } - - return defaultValue; - } - - public Boolean getBooleanProperty(String name) { - return getBooleanProperty(name, null); - } - - public Boolean getBooleanProperty(String name, Boolean defaultValue) { - if (getProperty(name) != null) { - return Boolean.valueOf(getProperty(name)); - } - return defaultValue; - } - - public Settings merge(Properties properties) { - if (properties == null || properties.isEmpty()) { - return this; - } - - Enumeration propertyNames = properties.propertyNames(); - - for (; propertyNames.hasMoreElements(); ) { - Object prop = propertyNames.nextElement(); - if (prop instanceof String) { - Object value = properties.get(prop); - setProperty((String) prop, value.toString()); - } - } - - return this; - } - - public Settings merge(Map map) { - if (map == null || map.isEmpty()) { - return this; - } - - for (Map.Entry entry : map.entrySet()) { - setProperty(entry.getKey(), entry.getValue()); - } - - return this; - } - - @Override - public int hashCode() { - return asProperties().hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - return 
asProperties().equals(((Settings) obj).asProperties()); - } -} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/SparkSettings.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/SparkSettings.java deleted file mode 100644 index 8539c50..0000000 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/SparkSettings.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2024 OceanBase. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.oceanbase.spark.cfg; - -import java.util.Properties; - -import scala.Option; -import scala.Serializable; -import scala.Tuple2; - -import com.google.common.base.Preconditions; -import org.apache.spark.SparkConf; - -public class SparkSettings extends Settings implements Serializable { - - private final SparkConf cfg; - - public SparkSettings(SparkConf cfg) { - Preconditions.checkArgument(cfg != null, "non-null spark configuration expected."); - this.cfg = cfg; - } - - public static SparkSettings fromProperties(Properties props) { - SparkConf sparkConf = new SparkConf(); - props.forEach( - (k, v) -> { - if (k instanceof String) { - sparkConf.set((String) k, v.toString()); - } - }); - return new SparkSettings(sparkConf); - } - - public SparkSettings copy() { - return new SparkSettings(cfg.clone()); - } - - public String getProperty(String name) { - Option op = cfg.getOption(name); - if (!op.isDefined()) { - op = cfg.getOption("spark." + name); - } - return (op.isDefined() ? op.get() : null); - } - - public void setProperty(String name, String value) { - cfg.set(name, value); - } - - public Properties asProperties() { - Properties props = new Properties(); - - if (cfg != null) { - String sparkPrefix = "spark."; - for (Tuple2 tuple : cfg.getAll()) { - // spark. are special so save them without the prefix as well - // since its unlikely the other implementations will be aware of this convention - String key = tuple._1; - props.setProperty(key, tuple._2); - if (key.startsWith(sparkPrefix)) { - String simpleKey = key.substring(sparkPrefix.length()); - // double check to not override a property defined directly in the config - if (!props.containsKey(simpleKey)) { - props.setProperty(simpleKey, tuple._2); - } - } - } - } - - return props; - } -} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseConfig.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseConfig.java new file mode 100644 index 0000000..d4f1919 --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseConfig.java @@ -0,0 +1,255 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.spark.config; + +import java.io.Serializable; +import java.time.Duration; +import java.util.Map; + + +import org.apache.commons.lang3.StringUtils; + +public class OceanBaseConfig extends Config implements Serializable { + public static final ConfigEntry URL = + new ConfigBuilder("url") + .doc("The connection URL") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry USERNAME = + new ConfigBuilder("username") + .doc("The username") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry PASSWORD = + new ConfigBuilder("password") + .doc("The password") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry SCHEMA_NAME = + new ConfigBuilder("schema-name") + .doc("The schema name or database name") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry TABLE_NAME = + new ConfigBuilder("table-name") + .doc("The table name") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry DIRECT_LOAD_ENABLE = + new ConfigBuilder("direct-load.enabled") + .doc("Enable direct-load writing") + .version(ConfigConstants.VERSION_1_0_0) + .booleanConf() + .createWithDefault(false); + + public static final ConfigEntry DIRECT_LOAD_HOST = + new ConfigBuilder("direct-load.host") + .doc("The direct-load host") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry DIRECT_LOAD_RPC_PORT = + new ConfigBuilder("direct-load.rpc-port") + .doc("Rpc port number used in direct-load") + .version(ConfigConstants.VERSION_1_0_0) + .intConf() + .checkValue(port -> port > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(2882); + + public static final ConfigEntry DIRECT_LOAD_PARALLEL = + new ConfigBuilder("direct-load.parallel") + .doc( + "The parallel of the direct-load server. This parameter determines how much CPU resources the server uses to process this import task") + .version(ConfigConstants.VERSION_1_0_0) + .intConf() + .checkValue(port -> port > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(8); + + public static final ConfigEntry DIRECT_LOAD_EXECUTION_ID = + new ConfigBuilder("direct-load.execution-id") + .doc("The execution id") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .create(); + + public static final ConfigEntry DIRECT_LOAD_DUP_ACTION = + new ConfigBuilder("direct-load.dup-action") + .doc( + "Action when there is duplicated record of direct-load task. 
Can be STOP_ON_DUP, REPLACE or IGNORE") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .createWithDefault("REPLACE"); + + public static final ConfigEntry DIRECT_LOAD_TIMEOUT = + new ConfigBuilder("direct-load.timeout") + .doc("The timeout for direct-load task.") + .version(ConfigConstants.VERSION_1_0_0) + .durationConf() + .createWithDefault(Duration.ofDays(7)); + + public static final ConfigEntry DIRECT_LOAD_HEARTBEAT_TIMEOUT = + new ConfigBuilder("direct-load.heartbeat-timeout") + .doc("Client heartbeat timeout in direct-load task") + .version(ConfigConstants.VERSION_1_0_0) + .durationConf() + .createWithDefault(Duration.ofSeconds(60)); + + public static final ConfigEntry DIRECT_LOAD_HEARTBEAT_INTERVAL = + new ConfigBuilder("direct-load.heartbeat-interval") + .doc("Client heartbeat interval in direct-load task") + .version(ConfigConstants.VERSION_1_0_0) + .durationConf() + .createWithDefault(Duration.ofSeconds(10)); + + public static final ConfigEntry DIRECT_LOAD_LOAD_METHOD = + new ConfigBuilder("direct-load.load-method") + .doc("The direct-load load mode: full, inc, inc_replace") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .createWithDefault("full"); + + public static final ConfigEntry DIRECT_LOAD_MAX_ERROR_ROWS = + new ConfigBuilder("direct-load.max-error-rows") + .doc("Maximum tolerable number of error rows") + .version(ConfigConstants.VERSION_1_0_0) + .intConf() + .checkValue(size -> size >= 0, "The value must be greater than or equal to 0") + .createWithDefault(0); + + public static final ConfigEntry DIRECT_LOAD_BATCH_SIZE = + new ConfigBuilder("direct-load-batch-size") + .doc("The batch size written to OceanBase at a time") + .version(ConfigConstants.VERSION_1_0_0) + .intConf() + .checkValue(size -> size > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(10240); + + public static final ConfigEntry DIRECT_LOAD_TASK_PARTITION_SIZE = + new ConfigBuilder("direct-load.task-partition-size") + .doc("The number of partitions corresponding to the writing task") + .version(ConfigConstants.VERSION_1_0_0) + .intConf() + .create(); + + public static final ConfigEntry DIRECT_LOAD_TASK_USE_REPARTITION = + new ConfigBuilder("direct-load.task-use-repartition") + .doc( + "Whether to use repartition mode to control the number of partitions written to OceanBase") + .version(ConfigConstants.VERSION_1_0_0) + .booleanConf() + .createWithDefault(false); + + public OceanBaseConfig(Map properties) { + super(); + loadFromMap(properties, k -> true); + } + + public String getURL() { + return get(URL); + } + + public String getUsername() { + return get(USERNAME); + } + + public String getPassword() { + return get(PASSWORD); + } + + public String getSchemaName() { + return get(SCHEMA_NAME); + } + + public String getTableName() { + return get(TABLE_NAME); + } + + public Boolean getDirectLoadEnable() { + return get(DIRECT_LOAD_ENABLE); + } + + public String getDirectLoadHost() { + return get(DIRECT_LOAD_HOST); + } + + public int getDirectLoadPort() { + return get(DIRECT_LOAD_RPC_PORT); + } + + public String getDirectLoadExecutionId() { + return get(DIRECT_LOAD_EXECUTION_ID); + } + + public int getDirectLoadParallel() { + return get(DIRECT_LOAD_PARALLEL); + } + + public int getBatchSize() { + return get(DIRECT_LOAD_BATCH_SIZE); + } + + public long getDirectLoadMaxErrorRows() { + return get(DIRECT_LOAD_MAX_ERROR_ROWS); + }
+ + public String getDirectLoadDupAction() { + return get(DIRECT_LOAD_DUP_ACTION); + } + + public long getDirectLoadTimeout() { + return get(DIRECT_LOAD_TIMEOUT).toMillis(); + } + + public long getDirectLoadHeartbeatTimeout() { + return get(DIRECT_LOAD_HEARTBEAT_TIMEOUT).toMillis(); + } + + public long getDirectLoadHeartbeatInterval() { + return get(DIRECT_LOAD_HEARTBEAT_INTERVAL).toMillis(); + } + + public String getDirectLoadLoadMethod() { + return get(DIRECT_LOAD_LOAD_METHOD); + } + + public Integer getDirectLoadTaskPartitionSize() { + return get(DIRECT_LOAD_TASK_PARTITION_SIZE); + } + + public boolean getDirectLoadUseRepartition() { + return get(DIRECT_LOAD_TASK_USE_REPARTITION); + } +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/OceanBaseUserInfo.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseUserInfo.java similarity index 93% rename from spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/OceanBaseUserInfo.java rename to spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseUserInfo.java index 8de852c..01f89a0 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/OceanBaseUserInfo.java +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/config/OceanBaseUserInfo.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.oceanbase.spark.cfg; +package com.oceanbase.spark.config; import java.io.Serializable; @@ -55,8 +55,8 @@ public void setTenant(String tenant) { this.tenant = tenant; } - public static OceanBaseUserInfo parse(SparkSettings settings) { - final String username = settings.getProperty(ConnectionOptions.USERNAME); + public static OceanBaseUserInfo parse(OceanBaseConfig oceanBaseConfig) { + final String username = oceanBaseConfig.getUsername(); final String sepUserAtTenant = "@"; final String sepTenantAtCluster = "#"; final String sep = ":"; diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoadUtils.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoadUtils.java index d20c2dd..48b6353 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoadUtils.java +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoadUtils.java @@ -16,33 +16,31 @@ package com.oceanbase.spark.directload; -import com.oceanbase.spark.cfg.ConnectionOptions; -import com.oceanbase.spark.cfg.OceanBaseUserInfo; -import com.oceanbase.spark.cfg.SparkSettings; +import com.oceanbase.spark.config.OceanBaseConfig; +import com.oceanbase.spark.config.OceanBaseUserInfo; /** The utils of {@link DirectLoader} */ public class DirectLoadUtils { - public static DirectLoader buildDirectLoaderFromSetting(SparkSettings settings) { + public static DirectLoader buildDirectLoaderFromSetting(OceanBaseConfig oceanBaseConfig) { try { - OceanBaseUserInfo userInfo = OceanBaseUserInfo.parse(settings); + OceanBaseUserInfo userInfo = OceanBaseUserInfo.parse(oceanBaseConfig); return new DirectLoaderBuilder() - .host(settings.getProperty(ConnectionOptions.HOST)) - .port(settings.getIntegerProperty(ConnectionOptions.RPC_PORT)) + 
.host(oceanBaseConfig.getDirectLoadHost()) + .port(oceanBaseConfig.getDirectLoadPort()) .user(userInfo.getUser()) - .password(settings.getProperty(ConnectionOptions.PASSWORD)) + .password(oceanBaseConfig.getPassword()) .tenant(userInfo.getTenant()) - .schema(settings.getProperty(ConnectionOptions.SCHEMA_NAME)) - .table(settings.getProperty(ConnectionOptions.TABLE_NAME)) - .executionId(settings.getProperty(ConnectionOptions.EXECUTION_ID)) - .duplicateKeyAction(settings.getProperty(ConnectionOptions.DUP_ACTION)) - .maxErrorCount(settings.getLongProperty(ConnectionOptions.MAX_ERROR_ROWS)) - .timeout(settings.getLongProperty(ConnectionOptions.TIMEOUT)) - .heartBeatTimeout(settings.getLongProperty(ConnectionOptions.HEARTBEAT_TIMEOUT)) - .heartBeatInterval( - settings.getLongProperty(ConnectionOptions.HEARTBEAT_INTERVAL)) - .directLoadMethod(settings.getProperty(ConnectionOptions.LOAD_METHOD)) - .parallel(settings.getIntegerProperty(ConnectionOptions.PARALLEL)) + .schema(oceanBaseConfig.getSchemaName()) + .table(oceanBaseConfig.getTableName()) + .executionId(oceanBaseConfig.getDirectLoadExecutionId()) + .duplicateKeyAction(oceanBaseConfig.getDirectLoadDupAction()) + .maxErrorCount(oceanBaseConfig.getDirectLoadMaxErrorRows()) + .timeout(oceanBaseConfig.getDirectLoadTimeout()) + .heartBeatTimeout(oceanBaseConfig.getDirectLoadHeartbeatTimeout()) + .heartBeatInterval(oceanBaseConfig.getDirectLoadHeartbeatInterval()) + .directLoadMethod(oceanBaseConfig.getDirectLoadLoadMethod()) + .parallel(oceanBaseConfig.getDirectLoadParallel()) .build(); } catch (Exception e) { throw new RuntimeException("Fail to build DirectLoader.", e); diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoaderBuilder.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoaderBuilder.java index 5e004e1..0169de6 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoaderBuilder.java +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoaderBuilder.java @@ -17,7 +17,6 @@ package com.oceanbase.spark.directload; import java.io.Serializable; -import java.util.Objects; import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection; @@ -95,60 +94,36 @@ public DirectLoaderBuilder table(String table) { } public DirectLoaderBuilder parallel(Integer parallel) { - if (Objects.isNull(parallel)) { - return this; - } - this.parallel = parallel; return this; } public DirectLoaderBuilder maxErrorCount(Long maxErrorCount) { - if (Objects.isNull(maxErrorCount)) { - return this; - } this.maxErrorCount = maxErrorCount; return this; } public DirectLoaderBuilder duplicateKeyAction(String duplicateKeyAction) { - if (Objects.isNull(duplicateKeyAction)) { - return this; - } this.duplicateKeyAction = ObLoadDupActionType.valueOf(duplicateKeyAction); return this; } public DirectLoaderBuilder directLoadMethod(String directLoadMethod) { - if (Objects.isNull(directLoadMethod)) { - return this; - } this.directLoadMethod = directLoadMethod; return this; } public DirectLoaderBuilder timeout(Long timeout) { - if (Objects.isNull(timeout)) { - return this; - } this.timeout = timeout; return this; } public DirectLoaderBuilder heartBeatTimeout(Long heartBeatTimeout) { - if (Objects.isNull(heartBeatTimeout)) { - return this; - } - this.heartBeatTimeout = heartBeatTimeout; return this; } public 
DirectLoaderBuilder heartBeatInterval(Long heartBeatInterval) { - if (Objects.isNull(heartBeatInterval)) { - return this; - } - this.heartBeatInterval = heartBeatInterval; return this; } diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala index 1f6cc23..93ee4c8 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala @@ -16,63 +16,23 @@ package com.oceanbase.spark.jdbc -import com.oceanbase.spark.cfg.{ConnectionOptions, SparkSettings} - -import org.apache.spark.sql.jdbc.JdbcDialects +import com.oceanbase.spark.config.OceanBaseConfig import java.sql.{Connection, DriverManager} object OBJdbcUtils { - val OB_MYSQL_URL = s"jdbc:mysql://%s:%d/%s" - private val OB_ORACLE_URL = s"jdbc:oceanbase://%s:%d/%s" - private val MYSQL_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver" - private val MYSQL_LEGACY_JDBC_DRIVER = "com.mysql.jdbc.Driver" - private val OB_JDBC_DRIVER = "com.oceanbase.jdbc.Driver" - private val OB_LEGACY_JDBC_DRIVER = "com.alipay.oceanbase.jdbc.Driver" - def getConnection(sparkSettings: SparkSettings): Connection = { + def getConnection(oceanBaseConfig: OceanBaseConfig): Connection = { val connection = DriverManager.getConnection( - OB_MYSQL_URL.format( - sparkSettings.getProperty(ConnectionOptions.HOST), - sparkSettings.getIntegerProperty(ConnectionOptions.SQL_PORT), - sparkSettings.getProperty(ConnectionOptions.SCHEMA_NAME) - ), - s"${sparkSettings.getProperty(ConnectionOptions.USERNAME)}", - sparkSettings.getProperty(ConnectionOptions.PASSWORD) + oceanBaseConfig.getURL, + oceanBaseConfig.getUsername, + oceanBaseConfig.getPassword ) connection } - def getJdbcUrl(sparkSettings: SparkSettings): String = { - var url: String = null - val driver = - sparkSettings.getProperty(ConnectionOptions.DRIVER, ConnectionOptions.DRIVER_DEFAULT) - if ( - driver.equalsIgnoreCase(MYSQL_JDBC_DRIVER) || driver.equalsIgnoreCase( - MYSQL_LEGACY_JDBC_DRIVER) - ) { - url = OBJdbcUtils.OB_MYSQL_URL.format( - sparkSettings.getProperty(ConnectionOptions.HOST), - sparkSettings.getIntegerProperty(ConnectionOptions.SQL_PORT), - sparkSettings.getProperty(ConnectionOptions.SCHEMA_NAME) - ) - } else if ( - driver.equalsIgnoreCase(OB_JDBC_DRIVER) || driver.equalsIgnoreCase(OB_LEGACY_JDBC_DRIVER) - ) { - JdbcDialects.registerDialect(OceanBaseOracleDialect) - url = OBJdbcUtils.OB_ORACLE_URL.format( - sparkSettings.getProperty(ConnectionOptions.HOST), - sparkSettings.getIntegerProperty(ConnectionOptions.SQL_PORT), - sparkSettings.getProperty(ConnectionOptions.SCHEMA_NAME) - ) - } else { - throw new RuntimeException(String.format("Unsupported driver name: %s", driver)) - } - url - } - - def getCompatibleMode(sparkSettings: SparkSettings): String = { - val conn = getConnection(sparkSettings) + def getCompatibleMode(oceanBaseConfig: OceanBaseConfig): String = { + val conn = getConnection(oceanBaseConfig) val statement = conn.createStatement try { val rs = statement.executeQuery("SHOW VARIABLES LIKE 'ob_compatibility_mode'") @@ -83,13 +43,12 @@ object OBJdbcUtils { } } - def truncateTable(sparkSettings: SparkSettings): Unit = { - val conn = getConnection(sparkSettings) + def truncateTable(oceanBaseConfig: OceanBaseConfig): Unit = { + val conn = 
getConnection(oceanBaseConfig) val statement = conn.createStatement try { statement.executeUpdate( - s"truncate table ${sparkSettings.getProperty(ConnectionOptions.SCHEMA_NAME)}.${sparkSettings - .getProperty(ConnectionOptions.TABLE_NAME)}") + s"truncate table ${oceanBaseConfig.getSchemaName}.${oceanBaseConfig.getTableName}") } finally { statement.close() conn.close() diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OceanBaseOracleDialect.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OceanBaseOracleDialect.scala deleted file mode 100644 index dbdd1d3..0000000 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OceanBaseOracleDialect.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright 2024 OceanBase. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.oceanbase.spark.jdbc - -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.connector.expressions.Expression -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType} -import org.apache.spark.sql.types._ - -import java.sql.{Date, Timestamp, Types} -import java.util.{Locale, TimeZone} - -import scala.util.control.NonFatal - -private case object OceanBaseOracleDialect extends JdbcDialect { - private[jdbc] val BINARY_FLOAT = 100 - private[jdbc] val BINARY_DOUBLE = 101 - private[jdbc] val TIMESTAMPTZ = -101 - - override def canHandle(url: String): Boolean = - url.toLowerCase(Locale.ROOT).startsWith("jdbc:oceanbase") - - private val distinctUnsupportedAggregateFunctions = - Set( - "VAR_POP", - "VAR_SAMP", - "STDDEV_POP", - "STDDEV_SAMP", - "COVAR_POP", - "COVAR_SAMP", - "CORR", - "REGR_INTERCEPT", - "REGR_R2", - "REGR_SLOPE", - "REGR_SXY") - - private val supportedAggregateFunctions = - Set("MAX", "MIN", "SUM", "COUNT", "AVG") ++ distinctUnsupportedAggregateFunctions - private val supportedFunctions = supportedAggregateFunctions - - override def isSupportedFunction(funcName: String): Boolean = - supportedFunctions.contains(funcName) - - private def supportTimeZoneTypes: Boolean = { - val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone) - // TODO: support timezone types when users are not using the JVM timezone, which - // is the default value of SESSION_LOCAL_TIMEZONE - timeZone == TimeZone.getDefault - } - - override def getCatalystType( - sqlType: Int, - typeName: String, - size: Int, - md: MetadataBuilder): Option[DataType] = { - sqlType match { - case Types.NUMERIC => - val scale = if (null != md) md.build().getLong("scale") else 0L - size match { - // Handle NUMBER fields that have no precision/scale in special way - // because JDBC ResultSetMetaData converts this to 0 precision and -127 scale - // For more details, please see - // https://github.com/apache/spark/pull/8780#issuecomment-145598968 - // and - // 
https://github.com/apache/spark/pull/8780#issuecomment-144541760 - case 0 => Option(DecimalType(DecimalType.MAX_PRECISION, 10)) - // Handle FLOAT fields in a special way because JDBC ResultSetMetaData converts - // this to NUMERIC with -127 scale - // Not sure if there is a more robust way to identify the field as a float (or other - // numeric types that do not specify a scale. - case _ if scale == -127L => Option(DecimalType(DecimalType.MAX_PRECISION, 10)) - case _ => None - } - case TIMESTAMPTZ if supportTimeZoneTypes => - Some(TimestampType) // Value for Timestamp with Time Zone in Oracle mode - case BINARY_FLOAT => Some(FloatType) // Value for OracleModeTypes.BINARY_FLOAT - case BINARY_DOUBLE => Some(DoubleType) // Value for OracleModeTypes.BINARY_DOUBLE - case _ => None - } - } - - override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { - case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.BOOLEAN)) - case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.INTEGER)) - case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.BIGINT)) - case FloatType => Some(JdbcType("NUMBER(19, 4)", java.sql.Types.FLOAT)) - case DoubleType => Some(JdbcType("NUMBER(19, 4)", java.sql.Types.DOUBLE)) - case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.SMALLINT)) - case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.SMALLINT)) - case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR)) - case _ => None - } - - override def compileValue(value: Any): Any = value match { - case stringValue: String => s"'${escapeSql(stringValue)}'" - case timestampValue: Timestamp => "{ts '" + timestampValue + "'}" - case dateValue: Date => "{d '" + dateValue + "'}" - case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") - case _ => value - } - - override def isCascadingTruncateTable(): Option[Boolean] = Some(false) - - /** - * The SQL query used to truncate a table. - * @param table - * The table to truncate - * @param cascade - * Whether or not to cascade the truncation. 
Default value is the value of - * isCascadingTruncateTable() - * @return - * The SQL query to use for truncating a table - */ - override def getTruncateQuery( - table: String, - cascade: Option[Boolean] = isCascadingTruncateTable()): String = { - cascade match { - case Some(true) => s"TRUNCATE TABLE $table CASCADE" - case _ => s"TRUNCATE TABLE $table" - } - } - - override def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String = - s"ALTER TABLE $tableName ADD ${quoteIdentifier(columnName)} $dataType" - - override def getUpdateColumnTypeQuery( - tableName: String, - columnName: String, - newDataType: String): String = - s"ALTER TABLE $tableName MODIFY ${quoteIdentifier(columnName)} $newDataType" - - override def getUpdateColumnNullabilityQuery( - tableName: String, - columnName: String, - isNullable: Boolean): String = { - val nullable = if (isNullable) "NULL" else "NOT NULL" - s"ALTER TABLE $tableName MODIFY ${quoteIdentifier(columnName)} $nullable" - } -} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseRelation.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseRelation.scala index d8cdf3e..e0a80e0 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseRelation.scala +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseRelation.scala @@ -15,7 +15,7 @@ */ package com.oceanbase.spark.sql -import com.oceanbase.spark.cfg.{ConnectionOptions, SparkSettings} +import com.oceanbase.spark.config.OceanBaseConfig import com.oceanbase.spark.jdbc.OBJdbcUtils import org.apache.spark.sql @@ -34,16 +34,13 @@ private[sql] class OceanBaseRelation( with InsertableRelation { private lazy val cfg = { - val conf = new SparkSettings(sqlContext.sparkContext.getConf) - conf.merge(parameters.asJava) - conf + new OceanBaseConfig(parameters.asJava) } private lazy val lazySchema = { val conn = OBJdbcUtils.getConnection(cfg) try { - val statement = conn.prepareStatement( - s"select * from ${cfg.getProperty(ConnectionOptions.TABLE_NAME)} where 1 = 0") + val statement = conn.prepareStatement(s"select * from ${cfg.getTableName} where 1 = 0") try { val rs = statement.executeQuery() try { diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseSparkSource.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseSparkSource.scala index 02b432a..728ea63 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseSparkSource.scala +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseSparkSource.scala @@ -15,7 +15,7 @@ */ package com.oceanbase.spark.sql -import com.oceanbase.spark.cfg.{ConnectionOptions, SparkSettings} +import com.oceanbase.spark.config.OceanBaseConfig import com.oceanbase.spark.directload.DirectLoadUtils import com.oceanbase.spark.jdbc.OBJdbcUtils import com.oceanbase.spark.listener.DirectLoadListener @@ -27,7 +27,7 @@ import org.apache.spark.sql import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, Filter, RelationProvider, SchemaRelationProvider, StreamSinkProvider} -import 
scala.collection.JavaConverters._ +import scala.collection.JavaConverters.mapAsJavaMapConverter @Deprecated private[sql] class OceanBaseSparkSource @@ -49,10 +49,8 @@ private[sql] class OceanBaseSparkSource mode: SaveMode, parameters: Map[String, String], dataFrame: DataFrame): BaseRelation = { - val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf) - sparkSettings.merge(parameters.asJava) - - createDirectLoadRelation(sqlContext, mode, dataFrame, sparkSettings) + val oceanBaseConfig = new OceanBaseConfig(parameters.asJava) + createDirectLoadRelation(sqlContext, mode, dataFrame, oceanBaseConfig) createRelation(sqlContext, parameters) } @@ -66,21 +64,21 @@ object OceanBaseSparkSource { sqlContext: SQLContext, mode: SaveMode, dataFrame: DataFrame, - sparkSettings: SparkSettings): Unit = { + oceanBaseConfig: OceanBaseConfig): Unit = { mode match { case sql.SaveMode.Append => // do nothing case sql.SaveMode.Overwrite => - OBJdbcUtils.truncateTable(sparkSettings) + OBJdbcUtils.truncateTable(oceanBaseConfig) case _ => throw new NotImplementedError(s"${mode.name()} mode is not currently supported.") } // Init direct-loader. - val directLoader = DirectLoadUtils.buildDirectLoaderFromSetting(sparkSettings) + val directLoader = DirectLoadUtils.buildDirectLoaderFromSetting(oceanBaseConfig) val executionId = directLoader.begin() - sparkSettings.setProperty(ConnectionOptions.EXECUTION_ID, executionId) + oceanBaseConfig.set(OceanBaseConfig.DIRECT_LOAD_EXECUTION_ID, executionId) sqlContext.sparkContext.addSparkListener(new DirectLoadListener(directLoader)) - val writer = new DirectLoadWriter(sparkSettings) + val writer = new DirectLoadWriter(oceanBaseConfig) writer.write(dataFrame) directLoader.commit() diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/DirectLoadWriter.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/DirectLoadWriter.scala index bd9a61c..059724e 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/DirectLoadWriter.scala +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/DirectLoadWriter.scala @@ -15,7 +15,7 @@ */ package com.oceanbase.spark.writer -import com.oceanbase.spark.cfg.{ConnectionOptions, SparkSettings} +import com.oceanbase.spark.config.OceanBaseConfig import com.oceanbase.spark.directload.{DirectLoader, DirectLoadUtils} import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket @@ -27,20 +27,14 @@ import java.util.Objects import scala.collection.mutable.ArrayBuffer -class DirectLoadWriter(settings: SparkSettings) extends Serializable { +class DirectLoadWriter(oceanBaseConfig: OceanBaseConfig) extends Serializable { - private val bufferSize = - settings.getIntegerProperty(ConnectionOptions.BATCH_SIZE, ConnectionOptions.BATCH_SIZE_DEFAULT) - private val sinkTaskPartitionSize: Integer = - settings.getIntegerProperty(ConnectionOptions.SINK_TASK_PARTITION_SIZE) - private val sinkTaskUseRepartition: Boolean = settings - .getProperty( - ConnectionOptions.SINK_TASK_USE_REPARTITION, - ConnectionOptions.SINK_TASK_USE_REPARTITION_DEFAULT.toString) - .toBoolean + private val bufferSize = oceanBaseConfig.getBatchSize + private val sinkTaskPartitionSize = oceanBaseConfig.getDirectLoadTaskPartitionSize + private val sinkTaskUseRepartition: Boolean = oceanBaseConfig.getDirectLoadUseRepartition def write(dataFrame: DataFrame): Unit = { - 
assert(StringUtils.isNotBlank(settings.getProperty(ConnectionOptions.EXECUTION_ID))) + assert(StringUtils.isNotBlank(oceanBaseConfig.getDirectLoadExecutionId)) var resultDataFrame = dataFrame if (Objects.nonNull(sinkTaskPartitionSize)) { @@ -51,7 +45,8 @@ class DirectLoadWriter(settings: SparkSettings) extends Serializable { resultDataFrame.foreachPartition( (partition: Iterator[Row]) => { - val directLoader: DirectLoader = DirectLoadUtils.buildDirectLoaderFromSetting(settings) + val directLoader: DirectLoader = + DirectLoadUtils.buildDirectLoaderFromSetting(oceanBaseConfig) directLoader.begin() val buffer = ArrayBuffer[Row]() partition.foreach( diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseJDBCRelation.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseJDBCRelation.scala index a0f9d3d..1bb067d 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseJDBCRelation.scala +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseJDBCRelation.scala @@ -15,7 +15,7 @@ */ package org.apache.spark.sql -import com.oceanbase.spark.cfg.ConnectionOptions +import com.oceanbase.spark.config.OceanBaseConfig import org.apache.spark.{sql, Partition} import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCRelation} @@ -31,8 +31,8 @@ class OceanBaseJDBCRelation( if ( jdbcOptions.parameters .getOrElse( - ConnectionOptions.ENABLE_DIRECT_LOAD_WRITE, - s"${ConnectionOptions.ENABLE_DIRECT_LOAD_WRITE_DEFAULT}") + OceanBaseConfig.DIRECT_LOAD_ENABLE.getKey, + OceanBaseConfig.DIRECT_LOAD_ENABLE.getDefaultValue.toString) .toBoolean ) { data.write diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseSparkDataSource.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseSparkDataSource.scala index bfaa7cc..d2619eb 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseSparkDataSource.scala +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseSparkDataSource.scala @@ -15,12 +15,13 @@ */ package org.apache.spark.sql -import com.oceanbase.spark.cfg.{ConnectionOptions, SparkSettings} -import com.oceanbase.spark.jdbc.OBJdbcUtils.{getCompatibleMode, getJdbcUrl, OB_MYSQL_URL} +import com.oceanbase.spark.config.OceanBaseConfig +import com.oceanbase.spark.jdbc.OBJdbcUtils.getCompatibleMode import com.oceanbase.spark.sql.OceanBaseSparkSource -import OceanBaseSparkDataSource.SHORT_NAME +import OceanBaseSparkDataSource.{JDBC_TXN_ISOLATION_LEVEL, JDBC_URL, JDBC_USER, SHORT_NAME} import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCRelation, JdbcRelationProvider} +import org.apache.spark.sql.jdbc.{JdbcDialects, OceanBaseMySQLDialect, OceanBaseOracleDialect} import org.apache.spark.sql.sources._ import scala.collection.JavaConverters.mapAsJavaMapConverter @@ -32,9 +33,8 @@ class OceanBaseSparkDataSource extends JdbcRelationProvider { override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { - val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf) - sparkSettings.merge(parameters.asJava) - val jdbcOptions = buildJDBCOptions(parameters, sparkSettings)._1 + val oceanBaseConfig = 
new OceanBaseConfig(parameters.asJava) + val jdbcOptions = buildJDBCOptions(parameters, oceanBaseConfig)._1 val resolver = sqlContext.conf.resolver val timeZoneId = sqlContext.conf.sessionLocalTimeZone val schema = JDBCRelation.getSchema(resolver, jdbcOptions) @@ -47,35 +47,49 @@ class OceanBaseSparkDataSource extends JdbcRelationProvider { mode: SaveMode, parameters: Map[String, String], dataFrame: DataFrame): BaseRelation = { - val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf) - sparkSettings.merge(parameters.asJava) - val enableDirectLoadWrite = sparkSettings.getBooleanProperty( - ConnectionOptions.ENABLE_DIRECT_LOAD_WRITE, - ConnectionOptions.ENABLE_DIRECT_LOAD_WRITE_DEFAULT) + val oceanBaseConfig = new OceanBaseConfig(parameters.asJava) + val enableDirectLoadWrite = oceanBaseConfig.getDirectLoadEnable if (!enableDirectLoadWrite) { - val param = buildJDBCOptions(parameters, sparkSettings)._2 + val param = buildJDBCOptions(parameters, oceanBaseConfig)._2 super.createRelation(sqlContext, mode, param, dataFrame) } else { - OceanBaseSparkSource.createDirectLoadRelation(sqlContext, mode, dataFrame, sparkSettings) + OceanBaseSparkSource.createDirectLoadRelation(sqlContext, mode, dataFrame, oceanBaseConfig) createRelation(sqlContext, parameters) } } private def buildJDBCOptions( parameters: Map[String, String], - sparkSettings: SparkSettings): (JDBCOptions, Map[String, String]) = { - val url: String = getJdbcUrl(sparkSettings) - val table: String = parameters(ConnectionOptions.TABLE_NAME) - val paraMap = parameters ++ Map( - "url" -> url, - "dbtable" -> table, - "user" -> parameters(ConnectionOptions.USERNAME), - "isolationLevel" -> "READ_COMMITTED" + oceanBaseConfig: OceanBaseConfig): (JDBCOptions, Map[String, String]) = { + var paraMap = parameters ++ Map( + JDBC_URL -> oceanBaseConfig.getURL, + JDBC_USER -> parameters(OceanBaseConfig.USERNAME.getKey), + JDBC_TXN_ISOLATION_LEVEL -> { + if (!parameters.contains(JDBC_TXN_ISOLATION_LEVEL)) "READ_COMMITTED" + else parameters(JDBC_TXN_ISOLATION_LEVEL) + } ) - (new JDBCOptions(url, table, paraMap), paraMap) + // It is not allowed to specify dbtable and query options at the same time. + if (parameters.contains(JDBCOptions.JDBC_QUERY_STRING)) { + paraMap = + paraMap + (JDBCOptions.JDBC_QUERY_STRING -> parameters(JDBCOptions.JDBC_QUERY_STRING)) + } else { + paraMap = paraMap + (JDBCOptions.JDBC_TABLE_NAME -> oceanBaseConfig.getTableName) + } + + // Set dialect + if ("MySQL".equalsIgnoreCase(getCompatibleMode(oceanBaseConfig))) { + JdbcDialects.registerDialect(OceanBaseMySQLDialect) + } else { + JdbcDialects.registerDialect(OceanBaseOracleDialect) + } + (new JDBCOptions(paraMap), paraMap) } } object OceanBaseSparkDataSource { val SHORT_NAME: String = "oceanbase" + val JDBC_URL = "url" + val JDBC_USER = "user" + val JDBC_TXN_ISOLATION_LEVEL = "isolationLevel" } diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/jdbc/OceanBaseMySQLDialect.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/jdbc/OceanBaseMySQLDialect.scala new file mode 100644 index 0000000..6c663c7 --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/jdbc/OceanBaseMySQLDialect.scala @@ -0,0 +1,127 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
+import org.apache.spark.sql.types._
+
+import java.sql.{Connection, SQLException, Types}
+import java.util
+import java.util.Locale
+
+/**
+ * Since [[MySQLDialect]] is a case object, it cannot be extended. This dialect therefore
+ * re-declares each of its methods and delegates to the original implementation.
+ */
+case object OceanBaseMySQLDialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean =
+    url.toLowerCase(Locale.ROOT).startsWith("jdbc:oceanbase") || url
+      .toLowerCase(Locale.ROOT)
+      .startsWith("jdbc:mysql")
+
+  override def isSupportedFunction(funcName: String): Boolean =
+    MySQLDialect.isSupportedFunction(funcName)
+
+  override def compileExpression(expr: Expression): Option[String] =
+    MySQLDialect.compileExpression(expr)
+
+  override def getCatalystType(
+      sqlType: Int,
+      typeName: String,
+      size: Int,
+      md: MetadataBuilder): Option[DataType] =
+    MySQLDialect.getCatalystType(sqlType, typeName, size, md)
+
+  override def quoteIdentifier(colName: String): String = MySQLDialect.quoteIdentifier(colName)
+
+  override def schemasExists(conn: Connection, options: JDBCOptions, schema: String): Boolean =
+    MySQLDialect.schemasExists(conn, options, schema)
+
+  override def listSchemas(conn: Connection, options: JDBCOptions): Array[Array[String]] =
+    MySQLDialect.listSchemas(conn, options)
+
+  override def getTableExistsQuery(table: String): String = MySQLDialect.getTableExistsQuery(table)
+
+  override def isCascadingTruncateTable(): Option[Boolean] = MySQLDialect.isCascadingTruncateTable()
+
+  override def getUpdateColumnTypeQuery(
+      tableName: String,
+      columnName: String,
+      newDataType: String): String =
+    MySQLDialect.getUpdateColumnTypeQuery(tableName, columnName, newDataType)
+
+  override def getRenameColumnQuery(
+      tableName: String,
+      columnName: String,
+      newName: String,
+      dbMajorVersion: Int): String =
+    super.getRenameColumnQuery(tableName, columnName, newName, dbMajorVersion)
+
+  override def getUpdateColumnNullabilityQuery(
+      tableName: String,
+      columnName: String,
+      isNullable: Boolean): String =
+    MySQLDialect.getUpdateColumnNullabilityQuery(tableName, columnName, isNullable)
+
+  override def getTableCommentQuery(table: String, comment: String): String =
+    MySQLDialect.getTableCommentQuery(table, comment)
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = MySQLDialect.getJDBCType(dt)
+
+  override def getSchemaCommentQuery(schema: String, comment: String): String =
+    MySQLDialect.getSchemaCommentQuery(schema, comment)
+
+  override def removeSchemaCommentQuery(schema: String): String =
+    MySQLDialect.removeSchemaCommentQuery(schema)
+
+  override def createIndex(
+      indexName: String,
+      tableIdent: Identifier,
+      columns: Array[NamedReference],
+      columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+      properties: util.Map[String, String]): String =
+    MySQLDialect.createIndex(indexName, tableIdent, columns, columnsProperties, properties)
+
+  override def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableIdent: Identifier,
+      options: JDBCOptions): Boolean =
+    MySQLDialect.indexExists(conn, indexName, tableIdent, options)
+
+  override def dropIndex(indexName: String, tableIdent: Identifier): String =
+    MySQLDialect.dropIndex(indexName, tableIdent)
+
+  override def listIndexes(
+      conn: Connection,
+      tableIdent: Identifier,
+      options: JDBCOptions): Array[TableIndex] = MySQLDialect.listIndexes(conn, tableIdent, options)
+
+  override def classifyException(message: String, e: Throwable): AnalysisException =
+    MySQLDialect.classifyException(message, e)
+
+  override def dropSchema(schema: String, cascade: Boolean): String =
+    MySQLDialect.dropSchema(schema, cascade)
+}
diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/jdbc/OceanBaseOracleDialect.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/jdbc/OceanBaseOracleDialect.scala
new file mode 100644
index 0000000..63a71f1
--- /dev/null
+++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/jdbc/OceanBaseOracleDialect.scala
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2024 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import org.apache.spark.sql.connector.expressions.Expression
+import org.apache.spark.sql.types._
+
+import java.util.Locale
+
+/**
+ * Since [[OracleDialect]] is a case object, it cannot be extended. This dialect therefore
+ * re-declares each of its methods and delegates to the original implementation.
+ */
+case object OceanBaseOracleDialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean =
+    url.toLowerCase(Locale.ROOT).startsWith("jdbc:oceanbase")
+
+  override def isSupportedFunction(funcName: String): Boolean =
+    OracleDialect.isSupportedFunction(funcName)
+
+  override def compileExpression(expr: Expression): Option[String] =
+    OracleDialect.compileExpression(expr)
+
+  override def getCatalystType(
+      sqlType: Int,
+      typeName: String,
+      size: Int,
+      md: MetadataBuilder): Option[DataType] =
+    OracleDialect.getCatalystType(sqlType, typeName, size, md)
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = OracleDialect.getJDBCType(dt)
+
+  override def compileValue(value: Any): Any = OracleDialect.compileValue(value)
+
+  override def isCascadingTruncateTable(): Option[Boolean] =
+    OracleDialect.isCascadingTruncateTable()
+
+  /**
+   * The SQL query used to truncate a table.
+   * @param table
+   *   The table to truncate
+   * @param cascade
+   *   Whether or not to cascade the truncation. 
Default value is the value of + * isCascadingTruncateTable() + * @return + * The SQL query to use for truncating a table + */ + override def getTruncateQuery( + table: String, + cascade: Option[Boolean] = isCascadingTruncateTable()): String = + OracleDialect.getTruncateQuery(table, cascade) + + override def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String = + OracleDialect.getAddColumnQuery(tableName, columnName, dataType) + + override def getUpdateColumnTypeQuery( + tableName: String, + columnName: String, + newDataType: String): String = + OracleDialect.getUpdateColumnTypeQuery(tableName, columnName, newDataType) + + override def getUpdateColumnNullabilityQuery( + tableName: String, + columnName: String, + isNullable: Boolean): String = + OracleDialect.getUpdateColumnNullabilityQuery(tableName, columnName, isNullable) +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala index cd85866..fbf7245 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala @@ -34,8 +34,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { |CREATE TEMPORARY VIEW test_sink |USING oceanbase |OPTIONS( - | "host"= "$getHost", - | "sql-port" = "$getPort", + | "url"= "$getJdbcUrl", | "rpc-port" = "$getRpcPort", | "schema-name"="$getSchemaName", | "table-name"="products", @@ -90,14 +89,14 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { |CREATE TEMPORARY VIEW test_sink |USING oceanbase |OPTIONS( - | "host"= "$getHost", - | "sql-port" = "$getPort", + | "url"= "$getJdbcUrl", | "rpc-port" = "$getRpcPort", | "schema-name"="$getSchemaName", | "table-name"="products", | "username"="$getUsername", | "password"="$getPassword", | "direct-load.enabled"=true, + | "direct-load.host"="$getHost", | "direct-load.rpc-port"=$getRpcPort |); |""".stripMargin) @@ -165,13 +164,13 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { df.write .format("oceanbase") .mode(saveMode = SaveMode.Append) - .option("host", getHost) - .option("sql-port", getPort) + .option("url", getJdbcUrl) .option("username", getUsername) .option("password", getPassword) .option("table-name", "products") .option("schema-name", getSchemaName) .option("direct-load.enabled", value = true) + .option("direct-load.host", getHost) .option("direct-load.rpc-port", value = getRpcPort) .save() session.stop() @@ -207,8 +206,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { |CREATE TEMPORARY VIEW test_sink |USING oceanbase |OPTIONS( - | "host"= "$getHost", - | "sql-port" = "$getPort", + | "url"= "$getJdbcUrl", | "rpc-port" = "$getRpcPort", | "schema-name"="$getSchemaName", | "table-name"="products", @@ -258,6 +256,150 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { dropTables("products") } + @Test + def testSqlReadWithQuery(): Unit = { + initialize("sql/mysql/products.sql") + + val session = SparkSession.builder().master("local[*]").getOrCreate() + + session.sql(s""" + |CREATE TEMPORARY VIEW test_sink + |USING oceanbase + |OPTIONS( + | "url"= "$getJdbcUrl", + | "schema-name"="$getSchemaName", + | 
"table-name"="products", + | "direct-load.enabled" ="false", + | "username"="$getUsername", + | "password"="$getPassword" + |); + |""".stripMargin) + + session.sql( + """ + |INSERT INTO test_sink VALUES + |(101, 'scooter', 'Small 2-wheel scooter', 3.14), + |(102, 'car battery', '12V car battery', 8.1), + |(103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8), + |(104, 'hammer', '12oz carpenter\'s hammer', 0.75), + |(105, 'hammer', '14oz carpenter\'s hammer', 0.875), + |(106, 'hammer', '16oz carpenter\'s hammer', 1.0), + |(107, 'rocks', 'box of assorted rocks', 5.3), + |(108, 'jacket', 'water resistent black wind breaker', 0.1), + |(109, 'spare tire', '24 inch spare tire', 22.2); + |""".stripMargin) + + session.sql(s""" + |CREATE TEMPORARY VIEW test_read_query + |USING oceanbase + |OPTIONS( + | "url"= "$getJdbcUrl", + | "schema-name"="$getSchemaName", + | "query" = "select id, name from products", + | "direct-load.enabled" ="false", + | "username"="$getUsername", + | "password"="$getPassword" + |); + |""".stripMargin) + + val expected: util.List[String] = util.Arrays.asList( + "101,scooter", + "102,car battery", + "103,12-pack drill bits", + "104,hammer", + "105,hammer", + "106,hammer", + "107,rocks", + "108,jacket", + "109,spare tire" + ) + import scala.collection.JavaConverters._ + val actual = session + .sql("select * from test_read_query") + .collect() + .map( + _.toString().drop(1).dropRight(1) + ) + .toList + .asJava + assertEqualsInAnyOrder(expected, actual) + + session.stop() + dropTables("products") + } + + @Test + def testSqlReadWithOceanBaseDriver(): Unit = { + initialize("sql/mysql/products.sql") + + val session = SparkSession.builder().master("local[*]").getOrCreate() + + session.sql(s""" + |CREATE TEMPORARY VIEW test_sink + |USING oceanbase + |OPTIONS( + | "url"= "${getJdbcUrl.replace("mysql", "oceanbase")}", + | "schema-name"="$getSchemaName", + | "table-name"="products", + | "direct-load.enabled" ="false", + | "username"="$getUsername", + | "password"="$getPassword" + |); + |""".stripMargin) + + session.sql( + """ + |INSERT INTO test_sink VALUES + |(101, 'scooter', 'Small 2-wheel scooter', 3.14), + |(102, 'car battery', '12V car battery', 8.1), + |(103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8), + |(104, 'hammer', '12oz carpenter\'s hammer', 0.75), + |(105, 'hammer', '14oz carpenter\'s hammer', 0.875), + |(106, 'hammer', '16oz carpenter\'s hammer', 1.0), + |(107, 'rocks', 'box of assorted rocks', 5.3), + |(108, 'jacket', 'water resistent black wind breaker', 0.1), + |(109, 'spare tire', '24 inch spare tire', 22.2); + |""".stripMargin) + + session.sql(s""" + |CREATE TEMPORARY VIEW test_read_query + |USING oceanbase + |OPTIONS( + | "url"= "${getJdbcUrl.replace("mysql", "oceanbase")}", + | "schema-name"="$getSchemaName", + | "query" = "select id, name from products", + | "direct-load.enabled" ="false", + | "username"="$getUsername", + | "password"="$getPassword" + |); + |""".stripMargin) + + val expected: util.List[String] = util.Arrays.asList( + "101,scooter", + "102,car battery", + "103,12-pack drill bits", + "104,hammer", + "105,hammer", + "106,hammer", + "107,rocks", + "108,jacket", + "109,spare tire" + ) + import scala.collection.JavaConverters._ + val actual = session + .sql("select * from test_read_query") + .collect() + .map( + _.toString().drop(1).dropRight(1) + ) + .toList + .asJava + assertEqualsInAnyOrder(expected, actual) + + session.stop() + dropTables("products") + } + @Test def 
testDataFrameRead(): Unit = { initialize("sql/mysql/products.sql") @@ -269,8 +411,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { |CREATE TEMPORARY VIEW test_sink |USING oceanbase |OPTIONS( - | "host"= "$getHost", - | "sql-port" = "$getPort", + | "url"= "$getJdbcUrl", | "rpc-port" = "$getRpcPort", | "schema-name"="$getSchemaName", | "table-name"="products", @@ -296,8 +437,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { // DataFrame read val dataFrame = session.read .format("oceanbase") - .option("host", getHost) - .option("sql-port", getPort) + .option("url", getJdbcUrl) .option("username", getUsername) .option("password", getPassword) .option("table-name", "products")
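For reference, a minimal JDBC read sketch distilled from the integration tests above. This is illustrative only, not part of the diff; the JDBC URL, schema name, and credentials are placeholders for a locally reachable MySQL tenant.

import org.apache.spark.sql.SparkSession

object OceanBaseReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Table-based read: the single `url` option replaces the old `host` + `sql-port` pair.
    val byTable = spark.read
      .format("oceanbase")
      .option("url", "jdbc:mysql://localhost:2881/test") // placeholder URL
      .option("schema-name", "test")
      .option("table-name", "products")
      .option("username", "root@test") // placeholder credentials
      .option("password", "")
      .load()
    byTable.show()

    // Query-based read: `query` and `table-name` cannot be specified at the same time,
    // mirroring the dbtable/query exclusivity handled in buildJDBCOptions.
    val byQuery = spark.read
      .format("oceanbase")
      .option("url", "jdbc:mysql://localhost:2881/test") // placeholder URL
      .option("schema-name", "test")
      .option("query", "select id, name from products")
      .option("username", "root@test") // placeholder credentials
      .option("password", "")
      .load()
    byQuery.show()

    spark.stop()
  }
}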
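Likewise, a standalone sketch of the DataFrame direct-load write path exercised above, including the newly required `direct-load.host` option. Connection values are again placeholders; the sample row only illustrates the expected schema of the products table.

import org.apache.spark.sql.{SaveMode, SparkSession}

object OceanBaseDirectLoadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Illustrative row; the schema must match the target `products` table.
    val df = Seq((110, "pump", "12V bilge pump", 1.5))
      .toDF("id", "name", "description", "weight")

    df.write
      .format("oceanbase")
      .mode(SaveMode.Append)
      .option("url", "jdbc:mysql://localhost:2881/test") // placeholder URL
      .option("schema-name", "test")
      .option("table-name", "products")
      .option("username", "root@test") // placeholder credentials
      .option("password", "")
      .option("direct-load.enabled", "true")
      .option("direct-load.host", "localhost") // placeholder host
      .option("direct-load.rpc-port", "2882") // placeholder RPC port
      .save()

    spark.stop()
  }
}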