Skip to content

Commit

Permalink
BugFix: Fix the dbtable parameter creation error under jdbc sink.
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanoOo committed Jan 6, 2025
1 parent 9402e77 commit 2e3aca7
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,22 @@ object OBJdbcUtils {
val statement = conn.createStatement
try {
val rs = statement.executeQuery("SHOW VARIABLES LIKE 'ob_compatibility_mode'")
if (rs.next) rs.getString("VALUE") else null
if (rs.next) rs.getString("VALUE")
else throw new RuntimeException("Failed to obtain compatible mode of OceanBase.")
} finally {
statement.close()
conn.close()
}
}

def getDbTable(oceanBaseConfig: OceanBaseConfig): String = {
if ("MySQL".equalsIgnoreCase(getCompatibleMode(oceanBaseConfig))) {
s"`${oceanBaseConfig.getSchemaName}`.`${oceanBaseConfig.getTableName}`"
} else {
s""""${oceanBaseConfig.getSchemaName}"."${oceanBaseConfig.getTableName}""""
}
}

def truncateTable(oceanBaseConfig: OceanBaseConfig): Unit = {
val conn = getConnection(oceanBaseConfig)
val statement = conn.createStatement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
package org.apache.spark.sql

import com.oceanbase.spark.config.OceanBaseConfig
import com.oceanbase.spark.jdbc.OBJdbcUtils.getCompatibleMode
import com.oceanbase.spark.jdbc.OBJdbcUtils.{getCompatibleMode, getDbTable}
import com.oceanbase.spark.sql.OceanBaseSparkSource

import OceanBaseSparkDataSource.{JDBC_TXN_ISOLATION_LEVEL, JDBC_URL, JDBC_USER, SHORT_NAME}
import OceanBaseSparkDataSource.{JDBC_TXN_ISOLATION_LEVEL, JDBC_URL, JDBC_USER, OCEANBASE_DEFAULT_ISOLATION_LEVEL, SHORT_NAME}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCRelation, JdbcRelationProvider}
import org.apache.spark.sql.jdbc.{JdbcDialects, OceanBaseMySQLDialect, OceanBaseOracleDialect}
import org.apache.spark.sql.sources._
Expand Down Expand Up @@ -65,7 +65,7 @@ class OceanBaseSparkDataSource extends JdbcRelationProvider {
JDBC_URL -> oceanBaseConfig.getURL,
JDBC_USER -> parameters(OceanBaseConfig.USERNAME.getKey),
JDBC_TXN_ISOLATION_LEVEL -> {
if (!parameters.contains(JDBC_TXN_ISOLATION_LEVEL)) "READ_COMMITTED"
if (!parameters.contains(JDBC_TXN_ISOLATION_LEVEL)) OCEANBASE_DEFAULT_ISOLATION_LEVEL
else parameters(JDBC_TXN_ISOLATION_LEVEL)
}
)
Expand All @@ -74,7 +74,7 @@ class OceanBaseSparkDataSource extends JdbcRelationProvider {
paraMap =
paraMap + (JDBCOptions.JDBC_QUERY_STRING -> parameters(JDBCOptions.JDBC_QUERY_STRING))
} else {
paraMap = paraMap + (JDBCOptions.JDBC_TABLE_NAME -> oceanBaseConfig.getTableName)
paraMap = paraMap + (JDBCOptions.JDBC_TABLE_NAME -> getDbTable(oceanBaseConfig))
}

// Set dialect
Expand All @@ -92,4 +92,5 @@ object OceanBaseSparkDataSource {
val JDBC_URL = "url"
val JDBC_USER = "user"
val JDBC_TXN_ISOLATION_LEVEL = "isolationLevel"
val OCEANBASE_DEFAULT_ISOLATION_LEVEL = "READ_COMMITTED"
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
|CREATE TEMPORARY VIEW test_sink
|USING oceanbase
|OPTIONS(
| "url"= "$getJdbcUrl",
| "url"= "$getJdbcUrlWithoutDB",
| "rpc-port" = "$getRpcPort",
| "schema-name"="$getSchemaName",
| "table-name"="products",
Expand Down Expand Up @@ -89,7 +89,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
|CREATE TEMPORARY VIEW test_sink
|USING oceanbase
|OPTIONS(
| "url"= "$getJdbcUrl",
| "url"= "$getJdbcUrlWithoutDB",
| "rpc-port" = "$getRpcPort",
| "schema-name"="$getSchemaName",
| "table-name"="products",
Expand Down Expand Up @@ -206,7 +206,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
|CREATE TEMPORARY VIEW test_sink
|USING oceanbase
|OPTIONS(
| "url"= "$getJdbcUrl",
| "url"= "$getJdbcUrlWithoutDB",
| "rpc-port" = "$getRpcPort",
| "schema-name"="$getSchemaName",
| "table-name"="products",
Expand Down Expand Up @@ -437,7 +437,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
// DataFrame read
val dataFrame = session.read
.format("oceanbase")
.option("url", getJdbcUrl)
.option("url", getJdbcUrlWithoutDB)
.option("username", getUsername)
.option("password", getPassword)
.option("table-name", "products")
Expand Down Expand Up @@ -469,6 +469,10 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
session.stop()
dropTables("products")
}

def getJdbcUrlWithoutDB: String =
s"jdbc:mysql://$getHost:$getPort?useUnicode=true&characterEncoding=UTF-8&useSSL=false"

}

object OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
Expand Down

0 comments on commit 2e3aca7

Please sign in to comment.