From 2e3aca70a7656a0c5e23c92253b4b8ae958e5e3f Mon Sep 17 00:00:00 2001 From: yuanoOo Date: Mon, 6 Jan 2025 10:21:46 +0800 Subject: [PATCH] BugFix: Fix the dbtable parameter creation error under jdbc sink. --- .../scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala | 11 ++++++++++- .../apache/spark/sql/OceanBaseSparkDataSource.scala | 9 +++++---- .../spark/OceanBaseMySQLConnectorITCase.scala | 12 ++++++++---- 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala index 93ee4c8..8d788e2 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala @@ -36,13 +36,22 @@ object OBJdbcUtils { val statement = conn.createStatement try { val rs = statement.executeQuery("SHOW VARIABLES LIKE 'ob_compatibility_mode'") - if (rs.next) rs.getString("VALUE") else null + if (rs.next) rs.getString("VALUE") + else throw new RuntimeException("Failed to obtain compatible mode of OceanBase.") } finally { statement.close() conn.close() } } + def getDbTable(oceanBaseConfig: OceanBaseConfig): String = { + if ("MySQL".equalsIgnoreCase(getCompatibleMode(oceanBaseConfig))) { + s"`${oceanBaseConfig.getSchemaName}`.`${oceanBaseConfig.getTableName}`" + } else { + s""""${oceanBaseConfig.getSchemaName}"."${oceanBaseConfig.getTableName}"""" + } + } + def truncateTable(oceanBaseConfig: OceanBaseConfig): Unit = { val conn = getConnection(oceanBaseConfig) val statement = conn.createStatement diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseSparkDataSource.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseSparkDataSource.scala index d2619eb..5559365 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseSparkDataSource.scala +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseSparkDataSource.scala @@ -16,10 +16,10 @@ package org.apache.spark.sql import com.oceanbase.spark.config.OceanBaseConfig -import com.oceanbase.spark.jdbc.OBJdbcUtils.getCompatibleMode +import com.oceanbase.spark.jdbc.OBJdbcUtils.{getCompatibleMode, getDbTable} import com.oceanbase.spark.sql.OceanBaseSparkSource -import OceanBaseSparkDataSource.{JDBC_TXN_ISOLATION_LEVEL, JDBC_URL, JDBC_USER, SHORT_NAME} +import OceanBaseSparkDataSource.{JDBC_TXN_ISOLATION_LEVEL, JDBC_URL, JDBC_USER, OCEANBASE_DEFAULT_ISOLATION_LEVEL, SHORT_NAME} import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCRelation, JdbcRelationProvider} import org.apache.spark.sql.jdbc.{JdbcDialects, OceanBaseMySQLDialect, OceanBaseOracleDialect} import org.apache.spark.sql.sources._ @@ -65,7 +65,7 @@ class OceanBaseSparkDataSource extends JdbcRelationProvider { JDBC_URL -> oceanBaseConfig.getURL, JDBC_USER -> parameters(OceanBaseConfig.USERNAME.getKey), JDBC_TXN_ISOLATION_LEVEL -> { - if (!parameters.contains(JDBC_TXN_ISOLATION_LEVEL)) "READ_COMMITTED" + if (!parameters.contains(JDBC_TXN_ISOLATION_LEVEL)) OCEANBASE_DEFAULT_ISOLATION_LEVEL else parameters(JDBC_TXN_ISOLATION_LEVEL) } ) @@ -74,7 +74,7 @@ class OceanBaseSparkDataSource extends JdbcRelationProvider { paraMap = paraMap + (JDBCOptions.JDBC_QUERY_STRING -> parameters(JDBCOptions.JDBC_QUERY_STRING)) } else { - paraMap = paraMap + (JDBCOptions.JDBC_TABLE_NAME -> oceanBaseConfig.getTableName) + paraMap = paraMap + (JDBCOptions.JDBC_TABLE_NAME -> getDbTable(oceanBaseConfig)) } // Set dialect @@ -92,4 +92,5 @@ object OceanBaseSparkDataSource { val JDBC_URL = "url" val JDBC_USER = "user" val JDBC_TXN_ISOLATION_LEVEL = "isolationLevel" + val OCEANBASE_DEFAULT_ISOLATION_LEVEL = "READ_COMMITTED" } diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala index fbf7245..d32b41e 100644 --- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala @@ -34,7 +34,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { |CREATE TEMPORARY VIEW test_sink |USING oceanbase |OPTIONS( - | "url"= "$getJdbcUrl", + | "url"= "$getJdbcUrlWithoutDB", | "rpc-port" = "$getRpcPort", | "schema-name"="$getSchemaName", | "table-name"="products", @@ -89,7 +89,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { |CREATE TEMPORARY VIEW test_sink |USING oceanbase |OPTIONS( - | "url"= "$getJdbcUrl", + | "url"= "$getJdbcUrlWithoutDB", | "rpc-port" = "$getRpcPort", | "schema-name"="$getSchemaName", | "table-name"="products", @@ -206,7 +206,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { |CREATE TEMPORARY VIEW test_sink |USING oceanbase |OPTIONS( - | "url"= "$getJdbcUrl", + | "url"= "$getJdbcUrlWithoutDB", | "rpc-port" = "$getRpcPort", | "schema-name"="$getSchemaName", | "table-name"="products", @@ -437,7 +437,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { // DataFrame read val dataFrame = session.read .format("oceanbase") - .option("url", getJdbcUrl) + .option("url", getJdbcUrlWithoutDB) .option("username", getUsername) .option("password", getPassword) .option("table-name", "products") @@ -469,6 +469,10 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { session.stop() dropTables("products") } + + def getJdbcUrlWithoutDB: String = + s"jdbc:mysql://$getHost:$getPort?useUnicode=true&characterEncoding=UTF-8&useSSL=false" + } object OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {