diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..1615505 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,38 @@ +name: build + +on: + pull_request: + paths-ignore: + - 'docs/**' + - '**.md' + - '.*' + push: + branches: + - main + - 'release-*' + +env: + JDK_VERSION: 8 + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }} + cancel-in-progress: true + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up JDK ${{ env.JDK_VERSION }} + uses: actions/setup-java@v4 + with: + java-version: ${{ env.JDK_VERSION }} + distribution: 'adopt' + cache: 'maven' + + - name: Build and Test + timeout-minutes: 60 + run: mvn clean package diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..60d22c1 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,70 @@ +runner.dialect = scala212 + +# Version is required to make sure IntelliJ picks the right version +version = 3.4.3 +preset = default + +# Max column +maxColumn = 100 + +# This parameter simply says the .stripMargin method was not redefined by the user to assign +# special meaning to indentation preceding the | character. Hence, that indentation can be modified. +assumeStandardLibraryStripMargin = true +align.stripMargin = true + +# Align settings +align.preset = none +align.closeParenSite = false +align.openParenCallSite = false +danglingParentheses.defnSite = false +danglingParentheses.callSite = false +danglingParentheses.ctrlSite = true +danglingParentheses.tupleSite = false +align.openParenCallSite = false +align.openParenDefnSite = false +align.openParenTupleSite = false + +# Newlines +newlines.alwaysBeforeElseAfterCurlyIf = false +newlines.beforeCurlyLambdaParams = multiline # Newline before lambda params +newlines.afterCurlyLambdaParams = squash # No newline after lambda params +newlines.inInterpolation = "avoid" +newlines.avoidInResultType = true +optIn.annotationNewlines = true + +# Scaladoc +docstrings.style = Asterisk # Javadoc style +docstrings.removeEmpty = true +docstrings.oneline = fold +docstrings.forceBlankLineBefore = true + +# Indentation +indent.extendSite = 2 # This makes sure extend is not indented as the ctor parameters + +# Rewrites +rewrite.rules = [AvoidInfix, Imports, RedundantBraces, SortModifiers] + +# Imports +rewrite.imports.sort = scalastyle +rewrite.imports.groups = [ + ["com.oceanbase\\..*"], + ["com.oceanbase.shade\\..*"], + [".*"], + ["javax\\..*"], + ["java\\..*"], + ["scala\\..*"] +] +rewrite.imports.contiguousGroups = no +importSelectors = singleline # Imports in a single line, like IntelliJ + +# Remove redundant braces in string interpolation. +rewrite.redundantBraces.stringInterpolation = true +rewrite.redundantBraces.defnBodies = false +rewrite.redundantBraces.generalExpressions = false +rewrite.redundantBraces.ifElseExpressions = false +rewrite.redundantBraces.methodBodies = false +rewrite.redundantBraces.includeUnitMethods = false +rewrite.redundantBraces.maxBreaks = 1 + +# Remove trailing commas +rewrite.trailingCommas.style = "never" diff --git a/README.md b/README.md index 87d3908..b28ab7c 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,11 @@ English | [简体中文](README_CN.md) ## Features -Coming soon... 
+This repository contains the following connectors:
+
+| Connector                  | Description                                                                                                                                                          | Document                                  |
+|----------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------|
+| Spark Connector: OceanBase | This connector uses the JDBC driver or the [direct load](https://en.oceanbase.com/docs/common-oceanbase-database-10000000001375568) API to write data to OceanBase. | [Sink](docs/spark-connector-oceanbase.md) |
 
 ## Community
diff --git a/README_CN.md b/README_CN.md
index 3adf060..52fde47 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -6,7 +6,11 @@
 
 ## 功能
 
-即将推出...
+本仓库提供了如下 Connector:
+
+| Connector                  | 描述                                                                                                                          | 使用文档                                     |
+|----------------------------|-------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------|
+| Spark Connector: OceanBase | 该Connector可以通过JDBC驱动或[旁路导入](https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000001428636)API将数据写入OceanBase。 | [Sink](docs/spark-connector-oceanbase_cn.md) |
 
 ## 社区
diff --git a/docs/spark-connector-oceanbase.md b/docs/spark-connector-oceanbase.md
new file mode 100644
index 0000000..e02fad1
--- /dev/null
+++ b/docs/spark-connector-oceanbase.md
@@ -0,0 +1,410 @@
+# Spark Connector OceanBase
+
+English | [简体中文](spark-connector-oceanbase_cn.md)
+
+The Spark OceanBase Connector supports reading data stored in OceanBase through Spark, and also supports writing data to OceanBase through Spark.
+
+|           | Read | Write             |
+|-----------|------|-------------------|
+| DataFrame | JDBC | JDBC, Direct Load |
+| SQL       | JDBC | JDBC, Direct Load |
+
+## Version compatibility
+
+| Connector | Spark          | OceanBase                                            | Java | Scala |
+|-----------|----------------|------------------------------------------------------|------|-------|
+| 1.0       | 2.4, 3.1 ~ 3.4 | JDBC: 3.x, 4.x; Direct Load: 4.2.x or later versions | 8    | 2.12  |
+
+- Note: If you need a package built based on other Scala versions, you can get the package by building it from source code.
+
+## Get the package
+
+You can get the release packages at [Releases Page](https://github.com/oceanbase/spark-connector-oceanbase/releases) or [Maven Central](https://central.sonatype.com/artifact/com.oceanbase/spark-connector-oceanbase).
+
+```xml
+<dependency>
+    <groupId>com.oceanbase</groupId>
+    <artifactId>spark-connector-oceanbase-3.4_2.12</artifactId>
+    <version>${project.version}</version>
+</dependency>
+```
+
+If you'd rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.
+
+```xml
+<dependency>
+    <groupId>com.oceanbase</groupId>
+    <artifactId>spark-connector-oceanbase-3.4_2.12</artifactId>
+    <version>${project.version}</version>
+</dependency>
+
+<repositories>
+    <repository>
+        <id>sonatype-snapshots</id>
+        <name>Sonatype Snapshot Repository</name>
+        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
+        <snapshots>
+            <enabled>true</enabled>
+        </snapshots>
+    </repository>
+</repositories>
+```
+
+Of course, you can also get the package by building from source code.
+
+- By default, it is built with Scala version 2.12.
+- After successful compilation, the target jar package will be generated in the target directory under the module corresponding to each version, such as: spark-connector-oceanbase-3.4_2.12-1.0-SNAPSHOT.jar. Copy this file to Spark's ClassPath to use spark-connector-oceanbase.
+
+```shell
+git clone https://github.com/oceanbase/spark-connector-oceanbase.git
+cd spark-connector-oceanbase
+mvn clean package -DskipTests
+```
+
+- If you need a package built based on other Scala versions, refer to the command below to build based on Scala 2.11.
+
+```shell
+git clone https://github.com/oceanbase/spark-connector-oceanbase.git
+cd spark-connector-oceanbase
+mvn clean package -Dscala.version=2.11.12 -Dscala.binary.version=2.11 -DskipTests
+```
+
+## Usage Examples
+
+### Read
+
+#### Use Spark-SQL
+
+```sql
+CREATE TEMPORARY VIEW spark_oceanbase
+USING oceanbase
+OPTIONS(
+  "host"= "localhost",
+  "sql-port" = "2881",
+  "schema-name"="test",
+  "table-name"="test",
+  "username"="root",
+  "password"="123456"
+);
+
+SELECT * FROM spark_oceanbase;
+```
+
+#### Use DataFrame
+
+```scala
+val oceanBaseSparkDF = spark.read.format("OceanBase")
+  .option("host", "localhost")
+  .option("sql-port", 2881)
+  .option("username", "root")
+  .option("password", "123456")
+  .option("table-name", "test")
+  .option("schema-name", "test")
+  .load()
+
+oceanBaseSparkDF.show(5)
+```
+
+### Write
+
+Take synchronizing data from Hive to OceanBase as an example.
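+
+Both write paths below follow the same pattern: read the Hive table into a DataFrame (or register a temporary view) and write it out with the `oceanbase` format. As a minimal sketch of the JDBC path (connection details are placeholders; the detailed Spark-SQL and DataFrame variants follow below):
+
+```scala
+// Minimal sketch: read the Hive table and append it to OceanBase over JDBC.
+// Host, port and credentials are placeholders for your own cluster.
+import org.apache.spark.sql.SaveMode
+
+val orders = spark.table("test.orders")
+orders.write
+  .format("oceanbase")
+  .mode(SaveMode.Append)
+  .option("host", "localhost")
+  .option("sql-port", 2881)
+  .option("username", "root@test")
+  .option("password", "")
+  .option("schema-name", "test")
+  .option("table-name", "orders")
+  .save()
+```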
+ +#### Preparation + +Create corresponding Hive tables and OceanBase tables to prepare for data synchronization + +- Start spark-sql by running `${SPARK_HOME}/bin/spark-sql` + +```sql +CREATE TABLE test.orders ( + order_id INT, + order_date TIMESTAMP, + customer_name string, + price double, + product_id INT, + order_status BOOLEAN +) using parquet; + +insert into orders values +(1, now(), 'zs', 12.2, 12, true), +(2, now(), 'ls', 121.2, 12, true), +(3, now(), 'xx', 123.2, 12, true), +(4, now(), 'jac', 124.2, 12, false), +(5, now(), 'dot', 111.25, 12, true); +``` + +- Connect to OceanBase + +```sql +CREATE TABLE test.orders ( + order_id INT PRIMARY KEY, + order_date TIMESTAMP, + customer_name VARCHAR(225), + price double, + product_id INT, + order_status BOOLEAN +); +``` + +#### Based on JDBC + +##### Spark-SQL + +```sql +CREATE TEMPORARY VIEW test_jdbc +USING oceanbase +OPTIONS( + "host"="localhost", + "sql-port" = "2881", + "schema-name"="test", + "table-name"="orders", + "username"="root@test", + "password"="" +); + +insert into table test_jdbc +select * from test.orders; + +insert overwrite table test_jdbc +select * from test.orders; +``` + +##### DataFrame + +```scala +val df = spark.sql("select * from test.orders") + +import org.apache.spark.sql.SaveMode +df.write + .format("oceanbase") + .mode(saveMode = SaveMode.Append) + .option("host", "localhost") + .option("sql-port", 2881) + .option("username", "root") + .option("password", "123456") + .option("table-name", "orders") + .option("schema-name", "test") + .save() +``` + +#### Based on direct-load + +##### Spark-SQL + +```sql +CREATE TEMPORARY VIEW test_direct +USING oceanbase +OPTIONS( + "host"="localhost", + "sql-port" = "2881", + "schema-name"="test", + "table-name"="orders", + "username"="root@test", + "password"="", + "direct-load.enabled" = true, + "direct-load.rpc-port" = "2882" +); + +insert into table test_direct +select * from test.orders; + +insert overwrite table test_direct +select * from test.orders; +``` + +##### DataFrame + +```scala +val df = spark.sql("select * from test.orders") + +import org.apache.spark.sql.SaveMode +df.write + .format("oceanbase") + .mode(saveMode = SaveMode.Append) + .option("host", "localhost") + .option("sql-port", 2881) + .option("username", "root") + .option("password", "123456") + .option("table-name", "orders") + .option("schema-name", "test") + .option("direct-load.enabled", "true") + .option("direct-load.rpc-port", "2882") + .save() +``` + +## Configuration + +### General configuration + +
+| Option      | Default | Type    | Description                               |
+|-------------|---------|---------|-------------------------------------------|
+| host        |         | String  | Hostname used in direct-load.             |
+| sql-port    |         | Integer | The SQL port.                             |
+| username    |         | String  | The connection username like 'root@sys'.  |
+| tenant-name |         | String  | The tenant name.                          |
+| password    |         | String  | The password.                             |
+| schema-name |         | String  | The schema name or database name.         |
+| table-name  |         | String  | The table name.                           |
+ +### Direct load configuration + +
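+
+These options take effect only when direct load is enabled. As an illustrative sketch (values are placeholders, not tuning advice), a direct-load write with an explicit load method looks roughly as follows; each option is described in the table below:
+
+```scala
+// Illustrative sketch: DataFrame write using direct load with an explicit load method.
+// "inc_replace" skips the primary key conflict check and overwrites old data;
+// per the table below it requires observer 4.3.2 or later.
+import org.apache.spark.sql.SaveMode
+
+val df = spark.sql("select * from test.orders")
+df.write
+  .format("oceanbase")
+  .mode(SaveMode.Append)
+  .option("host", "localhost")
+  .option("sql-port", 2881)
+  .option("username", "root@test")
+  .option("password", "")
+  .option("schema-name", "test")
+  .option("table-name", "orders")
+  .option("direct-load.enabled", "true")
+  .option("direct-load.rpc-port", "2882")
+  .option("direct-load.load-method", "inc_replace")
+  .save()
+```
+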
+| Option                         | Default | Type     | Description |
+|--------------------------------|---------|----------|-------------|
+| direct-load.enabled            | false   | Boolean  | Enable direct-load writing. |
+| direct-load.rpc-port           | 2882    | Integer  | RPC port used by direct load. |
+| direct-load.parallel           | 8       | Integer  | Parallelism of the direct-load task on the server side. This parameter determines how much CPU the server uses to process this import task. |
+| direct-load.batch-size         | 10240   | Integer  | The size of each batch written to OceanBase at one time. |
+| direct-load.max-error-rows     | 0       | Long     | Maximum tolerable number of error rows. |
+| direct-load.dup-action         | REPLACE | String   | Action to take when the direct-load task encounters duplicate records. Can be STOP_ON_DUP, REPLACE or IGNORE. |
+| direct-load.timeout            | 7d      | Duration | The timeout for the direct-load task. |
+| direct-load.heartbeat-timeout  | 60s     | Duration | Client heartbeat timeout in the direct-load task. |
+| direct-load.heartbeat-interval | 10s     | Duration | Client heartbeat interval in the direct-load task. |
+| direct-load.load-method        | full    | String   | The direct-load load mode: full, inc or inc_replace.<br>• full: full direct load, the default.<br>• inc: normal incremental direct load; a primary key conflict check is performed; supported by observer 4.3.2 and later; direct-load.dup-action REPLACE is not supported yet.<br>• inc_replace: special replace-mode incremental direct load; no primary key conflict check is performed and old data is overwritten directly (equivalent to replace); the direct-load.dup-action parameter is ignored; supported by observer 4.3.2 and later. |
+ +### JDBC configuration + +- This Connector is implemented based on [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html). + - For more configuration items, see: [JDBC To Other Databases#Data Source Option](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option) +- Support OceanBase MySQL and Oracle modes: + - For MySQL mode, you need to add the `MySQL Connector/J` driver to Spark's CLASSPATH + - For Oracle mode, you need to add the `OceanBase Connector/J` driver to Spark's CLASSPATH + diff --git a/docs/spark-connector-oceanbase_cn.md b/docs/spark-connector-oceanbase_cn.md new file mode 100644 index 0000000..7267f8a --- /dev/null +++ b/docs/spark-connector-oceanbase_cn.md @@ -0,0 +1,407 @@ +# Spark Connector OceanBase + +[English](spark-connector-oceanbase.md) | 简体中文 + +Spark OceanBase Connector 可以支持通过 Spark 读取 OceanBase 中存储的数据,也支持通过 Spark 写入数据到 OceanBase。 + +| | Read | Write | +|-----------|------|-----------| +| DataFrame | JDBC | JDBC、旁路导入 | +| SQL | JDBC | JDBC、旁路导入 | + +## 版本兼容 + +
+| Connector | Spark          | OceanBase                                  | Java | Scala |
+|-----------|----------------|--------------------------------------------|------|-------|
+| 1.0       | 2.4, 3.1 ~ 3.4 | JDBC: 3.x, 4.x;旁路导入: 4.2.x及以后的版本 | 8    | 2.12  |
+ +- 注意:如果需要基于其他 scala 版本构建的程序包, 您可以通过源码构建的方式获得程序包 + +## 获取程序包 + +您可以在 [Releases 页面](https://github.com/oceanbase/spark-connector-oceanbase/releases) 或者 [Maven 中央仓库](https://central.sonatype.com/artifact/com.oceanbase/spark-connector-oceanbase) 找到正式的发布版本。 + +```xml + + com.oceanbase + spark-connector-oceanbase-3.4_2.12 + ${project.version} + +``` + +如果你想要使用最新的快照版本,可以通过配置 Maven 快照仓库来指定: + +```xml + + com.oceanbase + spark-connector-oceanbase-3.4_2.12 + ${project.version} + + + + + sonatype-snapshots + Sonatype Snapshot Repository + https://s01.oss.sonatype.org/content/repositories/snapshots/ + + true + + + +``` + +当然您也可以通过源码构建的方式获得程序包。 +- 默认以scala 2.12版本进行构建 +- 编译成功后,会在各个版本对应的模块下的target目录生成目标 jar 包,如:spark-connector-oceanbase-3.4_2.12-1.0-SNAPSHOT.jar。 将此文件复制到 Spark 的 ClassPath 中即可使用 spark-connector-oceanbase。 + +```shell +git clone https://github.com/oceanbase/spark-connector-oceanbase.git +cd spark-connector-oceanbase +mvn clean package -DskipTests +``` + +- 如果需要其他 scala 版本,请参考下面以 scala 2.11版本构建命令 + +```shell +git clone https://github.com/oceanbase/spark-connector-oceanbase.git +cd spark-connector-oceanbase +mvn clean package -Dscala.version=2.11.12 -Dscala.binary.version=2.11 -DskipTests +``` + +## 使用示例 + +### 读取 + +#### 通过Spark-SQL + +```sql +CREATE TEMPORARY VIEW spark_oceanbase +USING oceanbase +OPTIONS( + "host"= "localhost", + "sql-port" = "2881", + "schema-name"="test", + "table-name"="test", + "username"="root", + "password"="123456" +); + +SELECT * FROM spark_oceanbase; +``` + +#### 通过DataFrame + +```scala +val oceanBaseSparkDF = spark.read.format("OceanBase") + .option("host", "localhost") + .option("sql-port", 2881) + .option("username", "root") + .option("password", "123456") + .option("table-name", "test") + .option("schema-name", "test") + .load() + +oceanBaseSparkDF.show(5) +``` + +### 写入 + +以从Hive同步数据到OceanBase为例 + +#### 准备工作 + +创建对应的Hive表和OceanBase表,为数据同步做准备 + +- 通过${SPARK_HOME}/bin/spark-sql命令,开启spark-sql + +```sql +CREATE TABLE test.orders ( + order_id INT, + order_date TIMESTAMP, + customer_name string, + price double, + product_id INT, + order_status BOOLEAN +) using parquet; + +insert into orders values +(1, now(), 'zs', 12.2, 12, true), +(2, now(), 'ls', 121.2, 12, true), +(3, now(), 'xx', 123.2, 12, true), +(4, now(), 'jac', 124.2, 12, false), +(5, now(), 'dot', 111.25, 12, true); +``` + +- 连接到OceanBase + +```sql +CREATE TABLE test.orders ( + order_id INT PRIMARY KEY, + order_date TIMESTAMP, + customer_name VARCHAR(225), + price double, + product_id INT, + order_status BOOLEAN +); +``` + +#### 通过JDBC方式 + +##### Spark-SQL + +```sql +CREATE TEMPORARY VIEW test_jdbc +USING oceanbase +OPTIONS( + "host"="localhost", + "sql-port" = "2881", + "schema-name"="test", + "table-name"="orders", + "username"="root@test", + "password"="" +); + +insert into table test_jdbc +select * from test.orders; + +insert overwrite table test_jdbc +select * from test.orders; +``` + +##### DataFrame + +```scala +// 读取hive表test.orders +val df = spark.sql("select * from test.orders") + +// 写入到OceanBase使用DataFrame +import org.apache.spark.sql.SaveMode +df.write + .format("oceanbase") + .mode(saveMode = SaveMode.Append) + .option("host", "localhost") + .option("sql-port", 2881) + .option("username", "root") + .option("password", "123456") + .option("table-name", "orders") + .option("schema-name", "test") + .save() +``` + +#### 通过旁路导入方式 + +##### Spark-SQL + +```sql +CREATE TEMPORARY VIEW test_direct +USING oceanbase +OPTIONS( + "host"="localhost", + "sql-port" = "2881", + "schema-name"="test", + 
"table-name"="orders", + "username"="root@test", + "password"="", + "direct-load.enabled" = true, + "direct-load.rpc-port" = "2882" +); + +insert into table test_direct +select * from test.orders; + +insert overwrite table test_direct +select * from test.orders; +``` + +##### DataFrame + +```scala +// 读取hive表test.orders +val df = spark.sql("select * from test.orders") + +// 写入到OceanBase使用DataFrame +import org.apache.spark.sql.SaveMode +df.write + .format("oceanbase") + .mode(saveMode = SaveMode.Append) + .option("host", "localhost") + .option("sql-port", 2881) + .option("username", "root") + .option("password", "123456") + .option("table-name", "orders") + .option("schema-name", "test") + .option("direct-load.enabled", "true") + .option("direct-load.rpc-port", "2882") + .save() +``` + +## 配置 + +### 通用配置项 + +
+| 参数名      | 默认值 | 类型   | 描述                       |
+|-------------|--------|--------|----------------------------|
+| host        |        | String | 数据库的 JDBC url。        |
+| sql-port    | 2881   | Int    | SQL 端口。                 |
+| username    |        | String | 连接用户名。               |
+| password    |        | String | 连接密码。                 |
+| schema-name |        | String | 连接的 schema 名或 db 名。 |
+| table-name  |        | String | 表名。                     |
+ +### 旁路导入配置项 + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
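+
+下面的配置项仅在开启旁路导入(`direct-load.enabled` 为 true)时生效。作为示意(取值仅为示例,并非调优建议),一次显式指定导入模式的旁路导入写入大致如下,各配置项的含义见下表:
+
+```scala
+// 示意代码:通过 DataFrame 使用旁路导入写入,并显式指定导入模式
+// inc_replace 模式不会进行主键冲突检查,直接覆盖旧数据,需要 observer-4.3.2 及以上版本(见下表)
+import org.apache.spark.sql.SaveMode
+
+val df = spark.sql("select * from test.orders")
+df.write
+  .format("oceanbase")
+  .mode(SaveMode.Append)
+  .option("host", "localhost")
+  .option("sql-port", 2881)
+  .option("username", "root@test")
+  .option("password", "")
+  .option("schema-name", "test")
+  .option("table-name", "orders")
+  .option("direct-load.enabled", "true")
+  .option("direct-load.rpc-port", "2882")
+  .option("direct-load.load-method", "inc_replace")
+  .save()
+```
+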
参数名默认值类型描述
direct-load.enabledfalseBoolean是否开启旁路导入写入。
direct-load.rpc-port2882Integer旁路导入用到的rpc端口。
direct-load.parallel8Integer旁路导入服务端的并发度。该参数决定了服务端使用多少cpu资源来处理本次导入任务。
direct-load.batch-size10240Integer一次写入OceanBase的批大小。
direct-load.max-error-rows0Long旁路导入任务最大可容忍的错误行数目。
direct-load.dup-actionREPLACEString旁路导入任务中主键重复时的处理策略。可以是 STOP_ON_DUP(本次导入失败),REPLACE(替换)或 IGNORE(忽略)。
direct-load.timeout7dDuration旁路导入任务的超时时间。
direct-load.heartbeat-timeout60sDuration旁路导入任务客户端的心跳超时时间。
direct-load.heartbeat-interval10sDuration旁路导入任务客户端的心跳间隔时间。
direct-load.load-methodfullString旁路导入导入模式:full, inc, inc_replace。 +
    +
  • full:全量旁路导入,默认值。
  • +
  • inc:普通增量旁路导入,会进行主键冲突检查,observer-4.3.2及以上支持,暂时不支持direct-load.dup-action为REPLACE。
  • +
  • inc_replace: 特殊replace模式的增量旁路导入,不会进行主键冲突检查,直接覆盖旧数据(相当于replace的效果),direct-load.dup-action参数会被忽略,observer-4.3.2及以上支持。
  • +
+
+
+ +### JDBC配置项 + +- 此Connector在[JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)的基础上实现。 + - 更多配置项见:[JDBC To Other Databases#Data Source Option](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option) +- 支持OceanBase的MySQL和Oracle模式: + - 对于MySQL模式需要添加`MySQL Connector/J`驱动到Spark的CLASSPATH + - 对于Oracle模式需要添加`OceanBase Connector/J`驱动到Spark的CLASSPATH + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..62ef94b --- /dev/null +++ b/pom.xml @@ -0,0 +1,487 @@ + + + + 4.0.0 + + com.oceanbase + spark-connector-oceanbase-parent + ${rversion} + pom + + + spark-connector-oceanbase + spark-connector-oceanbase-common + + + + 1.0-SNAPSHOT + 2.12.15 + 2.12 + 1.7.32 + 2.17.1 + 4.13.2 + 5.8.1 + 2.27.1 + 1.8 + 3.23.1 + 1.20.1 + 3.2.2 + + + + + + org.junit + junit-bom + ${junit5.version} + pom + import + + + + junit + junit + ${junit4.version} + + + + + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + + org.junit.jupiter + junit-jupiter + test + + + + org.assertj + assertj-core + ${assertj.version} + test + + + + org.testcontainers + testcontainers-bom + ${testcontainers.version} + pom + import + + + + org.testcontainers + oceanbase + ${testcontainers.version} + test + + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.0 + + ${target.java.version} + ${target.java.version} + + false + + + -Xpkginfo:always + + + + + + com.diffplug.spotless + spotless-maven-plugin + ${spotless.version} + + + + 1.7 + + + + + + com.oceanbase,javax,java,scala,\# + + + + + + + + + + UTF-8 + 4 + true + false + false + true + false + false + custom_1 + false + false + + + Leading blank line + project + project + + + + + *.md + docs/**/*.md + + + + + true + + + + + spotless-check + + check + + validate + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.9.1 + + + true + false + + -Xdoclint:none + + + + + + org.apache.maven.plugins + maven-enforcer-plugin + 3.0.0-M1 + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.4.1 + + + + + org.scalastyle + scalastyle-maven-plugin + 1.0.0 + + false + true + true + false + ${basedir}/src/main/scala + ${basedir}/src/test/scala + ${project.basedir}/target/scalastyle-output.xml + UTF-8 + UTF-8 + + + + + check + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M5 + + + + true + + + + + default-test + + test + + test + + + **/*.* + **/*Test.* + + + + + + + + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + true + + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + + true + true + + + + + + test-jar + + test-jar + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + org.apache.rat + apache-rat-plugin + 0.15 + false + + false + 0 + + + AL2 + Apache License 2.0 + + + Licensed to the Apache Software Foundation (ASF) under one + + + + + + Apache License 2.0 + + + + + **/.*/** + **/*.prefs + **/*.log + + **/*.md + .github/** + + **/*.iml + **/.idea/** + + **/target/** + **/scalastyle-output.xml + **/*.svg + + **/LICENSE* + + release/** + **/dependency-reduced-pom.xml + + + + + + check + + compile + + + + + + com.diffplug.spotless + spotless-maven-plugin + + + + org.apache.maven.plugins + maven-clean-plugin + + + + ${project.basedir} + + dependency-reduced-pom.xml + + + + + + + org.codehaus.mojo + flatten-maven-plugin + 1.5.0 + + true + 
resolveCiFriendliesOnly + + + + flatten + + flatten + + process-resources + + + flatten.clean + + clean + + clean + + + + + + + 2024 + + + OceanBase + https://open.oceanbase.com + + + + + Apache-2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + + oceanbase + OceanBase Developers + open_contact@oceanbase.com + + Developer + + +8 + + + + + scm:git:https://github.com/oceanbase/spark-connector-oceanbase.git + scm:git:https://github.com/oceanbase/spark-connector-oceanbase.git + https://github.com/oceanbase/spark-connector-oceanbase + HEAD + + + + + + true + + sonatype-snapshots + Sonatype Snapshot Repository + https://s01.oss.sonatype.org/content/repositories/snapshots/ + + + + + + sonatype-nexus-staging + Nexus Release Repository + https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/ + + + sonatype-nexus-snapshots + Sonatype Nexus Snapshots + https://s01.oss.sonatype.org/content/repositories/snapshots/ + + + diff --git a/spark-connector-oceanbase-common/pom.xml b/spark-connector-oceanbase-common/pom.xml new file mode 100644 index 0000000..06a756b --- /dev/null +++ b/spark-connector-oceanbase-common/pom.xml @@ -0,0 +1,26 @@ + + + + 4.0.0 + + com.oceanbase + spark-connector-oceanbase-parent + ${rversion} + + + spark-connector-oceanbase-common + ${rversion} + diff --git a/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/Config.java b/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/Config.java new file mode 100644 index 0000000..080cc14 --- /dev/null +++ b/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/Config.java @@ -0,0 +1,160 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.spark.config; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Predicate; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** The Config class is responsible for managing configuration settings. */ +public abstract class Config implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(Config.class); + + private final ConcurrentMap configMap; + + private final Map deprecatedConfigMap; + + private final DeprecatedConfig[] deprecatedConfigs = {}; + + /** Constructs a Config instance and loads default configurations. */ + public Config() { + deprecatedConfigMap = new HashMap<>(); + for (DeprecatedConfig config : deprecatedConfigs) { + deprecatedConfigMap.put(config.key, config); + } + + configMap = new ConcurrentHashMap<>(); + } + + /** + * Gets the value of a configuration entry. + * + * @param entry The configuration entry to retrieve. + * @param The type of the configuration value. + * @return The value of the configuration entry. + * @throws NoSuchElementException If the configuration entry is not found. 
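+     *     <p>Illustrative usage (the entry constant is hypothetical):
+     *     {@code Integer sqlPort = config.get(SQL_PORT_ENTRY);} where the raw string value is converted
+     *     by the entry's own value converter, e.g. {@code Integer.parseInt} for an entry built with
+     *     {@code ConfigBuilder.intConf()}.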
+ */ + public T get(ConfigEntry entry) throws NoSuchElementException { + return entry.readFrom(configMap); + } + + /** + * Retrieves the raw string value associated with the specified configuration key. + * + * @param key The configuration key for which the raw string value is requested. + * @return The raw string value associated with the given configuration key, or null if the key + * is not found. + */ + public String getRawString(String key) { + return configMap.get(key); + } + + /** + * Retrieves the raw string value associated with the specified configuration key, providing a + * default value if the key is not found. + * + * @param key The configuration key for which the raw string value is requested. + * @param defaultValue The default value to be returned if the key is not found. + * @return The raw string value associated with the given configuration key, or the provided + * default value if the key is not found. + */ + public String getRawString(String key, String defaultValue) { + return configMap.getOrDefault(key, defaultValue); + } + + /** + * Retrieves a map containing all configuration entries. + * + * @return An unmodifiable map containing all configuration entries. + */ + public Map getAllConfig() { + return new HashMap<>(configMap); + } + + /** + * Sets the value of a configuration entry. + * + * @param entry The configuration entry for which the value needs to be set. + * @param value The new value to be assigned to the configuration entry. + * @param The type of the configuration value. + */ + public void set(ConfigEntry entry, T value) { + if (entry.isDeprecated() + && deprecatedConfigMap.containsKey(entry.getKey()) + && LOG.isWarnEnabled()) { + LOG.warn( + "Config {} is deprecated since {}. {}", + entry.getKey(), + deprecatedConfigMap.get(entry.getKey()).version, + deprecatedConfigMap.get(entry.getKey()).deprecationMessage); + } + + if (value == null && LOG.isWarnEnabled()) { + LOG.warn("Config {} value to set is null, ignore setting to Config.", entry.getKey()); + return; + } + + entry.writeTo(configMap, value); + } + + /** + * Loads configurations from a map. + * + * @param map The map containing configuration key-value pairs. + * @param predicate The keys only match the predicate will be loaded to configMap + */ + public void loadFromMap(Map map, Predicate predicate) { + map.forEach( + (k, v) -> { + String trimmedK = k.trim(); + String trimmedV = v.trim(); + if (!trimmedK.isEmpty() && !trimmedV.isEmpty()) { + if (predicate.test(trimmedK)) { + configMap.put(trimmedK, trimmedV); + } + } + }); + } + + /** The DeprecatedConfig class represents a configuration entry that has been deprecated. */ + private static class DeprecatedConfig { + private final String key; + private final String version; + private final String deprecationMessage; + + /** + * Constructs a DeprecatedConfig instance. + * + * @param key The key of the deprecated configuration. + * @param version The version in which the configuration was deprecated. + * @param deprecationMessage Message to indicate the deprecation warning. 
+ */ + private DeprecatedConfig(String key, String version, String deprecationMessage) { + this.key = key; + this.version = version; + this.deprecationMessage = deprecationMessage; + } + } +} diff --git a/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigBuilder.java b/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigBuilder.java new file mode 100644 index 0000000..7da3009 --- /dev/null +++ b/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigBuilder.java @@ -0,0 +1,194 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.spark.config; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +/** Builder class for creating configuration entries. */ +public class ConfigBuilder { + + private String key; + + private List alternatives; + + private String doc; + + private String version; + + private boolean isPublic; + + private boolean isDeprecated; + + /** + * Constructs a ConfigBuilder with the given key. + * + * @param key The key for the configuration. + */ + public ConfigBuilder(String key) { + this.key = key; + + this.alternatives = Collections.emptyList(); + this.doc = ""; + this.version = "0.1.0"; + this.isPublic = true; + this.isDeprecated = false; + } + + /** + * Sets the alternatives for the configuration. + * + * @param alternatives The list of alternative keys. + * @return The current ConfigBuilder instance. + */ + public ConfigBuilder alternatives(List alternatives) { + this.alternatives = alternatives; + return this; + } + + /** + * Sets the documentation for the configuration. + * + * @param doc The documentation string. + * @return The current ConfigBuilder instance. + */ + public ConfigBuilder doc(String doc) { + this.doc = doc; + return this; + } + + /** + * Sets the version for the configuration. + * + * @param version The version string. + * @return The current ConfigBuilder instance. + */ + public ConfigBuilder version(String version) { + this.version = version; + return this; + } + + /** + * Marks the configuration entry as internal (non-public). + * + * @return The current ConfigBuilder instance. + */ + public ConfigBuilder internal() { + this.isPublic = false; + return this; + } + + /** + * Marks the configuration entry as deprecated. + * + * @return The current ConfigBuilder instance. + */ + public ConfigBuilder deprecated() { + this.isDeprecated = true; + return this; + } + + /** + * Creates a configuration entry for String data type. + * + * @return The created ConfigEntry instance for String data type. + */ + public ConfigEntry stringConf() { + ConfigEntry conf = + new ConfigEntry<>(key, version, doc, alternatives, isPublic, isDeprecated); + conf.setValueConverter(s -> s); + conf.setStringConverter(s -> s); + + return conf; + } + + /** + * Creates a configuration entry for Integer data type. 
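+     * For example (illustrative), {@code new ConfigBuilder("direct-load.rpc-port").intConf().createWithDefault(2882)}
+     *     yields an entry whose raw string values are parsed with {@link Integer#parseInt(String)}.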
+ * + * @return The created ConfigEntry instance for Integer data type. + */ + public ConfigEntry intConf() { + ConfigEntry conf = + new ConfigEntry<>(key, version, doc, alternatives, isPublic, isDeprecated); + Function func = + s -> { + if (s == null || s.isEmpty()) { + return null; + } else { + return Integer.parseInt(s); + } + }; + conf.setValueConverter(func); + + Function stringFunc = + t -> Optional.ofNullable(t).map(String::valueOf).orElse(null); + conf.setStringConverter(stringFunc); + + return conf; + } + + /** + * Creates a configuration entry for Long data type. + * + * @return The created ConfigEntry instance for Long data type. + */ + public ConfigEntry longConf() { + ConfigEntry conf = + new ConfigEntry<>(key, version, doc, alternatives, isPublic, isDeprecated); + Function func = + s -> { + if (s == null || s.isEmpty()) { + return null; + } else { + return Long.parseLong(s); + } + }; + conf.setValueConverter(func); + + Function stringFunc = + t -> Optional.ofNullable(t).map(String::valueOf).orElse(null); + conf.setStringConverter(stringFunc); + + return conf; + } + + /** + * Creates a configuration entry for Boolean data type. + * + * @return The created ConfigEntry instance for Boolean data type. + */ + public ConfigEntry booleanConf() { + ConfigEntry conf = + new ConfigEntry<>(key, version, doc, alternatives, isPublic, isDeprecated); + Function func = + s -> { + if (s == null || s.isEmpty()) { + return null; + } else { + return Boolean.parseBoolean(s); + } + }; + conf.setValueConverter(func); + + Function stringFunc = + t -> Optional.ofNullable(t).map(String::valueOf).orElse(null); + conf.setStringConverter(stringFunc); + + return conf; + } +} diff --git a/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigConstants.java b/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigConstants.java new file mode 100644 index 0000000..ea420d0 --- /dev/null +++ b/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigConstants.java @@ -0,0 +1,31 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.spark.config; + +/** Constants used for configuration. */ +public interface ConfigConstants { + + /** The value of messages used to indicate that the configuration is not set. */ + String NOT_BLANK_ERROR_MSG = "The value can't be blank"; + + /** + * The value of messages used to indicate that the configuration should be a positive number. 
+ */ + String POSITIVE_NUMBER_ERROR_MSG = "The value must be a positive number"; + + String VERSION_1_0_0 = "1.0"; +} diff --git a/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigEntry.java b/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigEntry.java new file mode 100644 index 0000000..4295607 --- /dev/null +++ b/spark-connector-oceanbase-common/src/main/java/com/oceanbase/spark/config/ConfigEntry.java @@ -0,0 +1,350 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.spark.config; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Defines configuration properties. */ +public class ConfigEntry { + + private static final Logger LOG = LoggerFactory.getLogger(ConfigEntry.class); + + private String key; + + private List alternatives; + + private T defaultValue; + + private Function valueConverter; + + private Function stringConverter; + + private String doc; + + private String version; + + private boolean isPublic; + + private boolean isDeprecated; + + private boolean isOptional; + private boolean hasNoDefault; + private Consumer validator; + + /** + * Creates a new ConfigEntry instance. + * + * @param key The key of the configuration. + * @param version The version that introduces this configuration. + * @param doc The documentation of the configuration. + * @param alternatives Alternative keys for the configuration. + * @param isPublic Whether the configuration is public. + * @param isDeprecated Whether the configuration is deprecated. + */ + ConfigEntry( + String key, + String version, + String doc, + List alternatives, + boolean isPublic, + boolean isDeprecated) { + this.key = key; + this.version = version; + this.doc = doc; + this.alternatives = alternatives; + this.isPublic = isPublic; + this.isDeprecated = isDeprecated; + this.isOptional = false; + } + + /** + * Sets a custom value converter function for this configuration. + * + * @param valueConverter The function that converts a configuration value string to the desired + * type. + */ + void setValueConverter(Function valueConverter) { + this.valueConverter = valueConverter; + } + + /** + * Sets a custom string converter function for this configuration. + * + * @param stringConverter The function that converts a configuration value to its string + * representation. + */ + void setStringConverter(Function stringConverter) { + this.stringConverter = stringConverter; + } + + /** + * Sets the default value for this configuration. + * + * @param t The default value to be used when no value is provided. 
+ */ + void setDefaultValue(T t) { + this.defaultValue = t; + } + + /** + * Marks this configuration as optional. An optional entry can be absent in the configuration + * properties without raising an exception. + */ + void setOptional() { + this.isOptional = true; + } + + /** Marks this configuration as no default value. */ + void setHasNoDefault() { + this.hasNoDefault = true; + } + + /** Set the validator value. */ + void setValidator(Consumer validator) { + this.validator = validator; + } + + /** + * Checks if the user-provided value for the config matches the validator. + * + * @param checkValueFunc The validator of the configuration option + * @param errorMsg The thrown error message if the value is invalid + * @return The current ConfigEntry instance + */ + public ConfigEntry checkValue(Function checkValueFunc, String errorMsg) { + setValidator( + value -> { + if (!checkValueFunc.apply(value)) { + throw new IllegalArgumentException( + String.format( + "%s in %s is invalid. %s", + stringConverter.apply(value), key, errorMsg)); + } + }); + return this; + } + + /** + * Split the string to a list, then map each string element to its converted form. + * + * @param str The string form of the value list from the conf entry. + * @param converter The original ConfigEntry valueConverter. + * @return The list of converted type. + */ + public List strToSeq(String str, Function converter) { + List strList = Arrays.asList(str.split(",")); + List valList = strList.stream().map(converter).collect(Collectors.toList()); + + return valList; + } + + /** + * Reduce the values then join them as a string. + * + * @param seq The sequence of the value list from the conf entry. + * @param converter The original ConfigEntry stringConverter. + * @return The converted string. + */ + public String seqToStr(List seq, Function converter) { + List valList = seq.stream().map(converter).collect(Collectors.toList()); + String str = String.join(",", valList); + return str; + } + + /** + * Converts the configuration value to value list. + * + * @return The ConfigEntry instance. + */ + public ConfigEntry> toSequence() { + ConfigEntry> conf = + new ConfigEntry<>(key, version, doc, alternatives, isPublic, isDeprecated); + + Function elementConverter; + if (validator == null) { + elementConverter = valueConverter; + } else { + elementConverter = + value -> { + if (value == null) { + validator.accept(null); + } + T convertedValue = valueConverter.apply(value); + validator.accept(convertedValue); + return convertedValue; + }; + } + + conf.setValueConverter((String str) -> strToSeq(str, elementConverter)); + conf.setStringConverter( + (List val) -> val == null ? null : seqToStr(val, stringConverter)); + + return conf; + } + + /** + * Creates a new ConfigEntry instance based on this configuration entry with a default value. + * + * @param t The default value to be used when no value is provided. + * @return A new ConfigEntry instance with the specified default value. + */ + public ConfigEntry createWithDefault(T t) { + ConfigEntry conf = + new ConfigEntry<>(key, version, doc, alternatives, isPublic, isDeprecated); + conf.setValueConverter(valueConverter); + conf.setStringConverter(stringConverter); + conf.setDefaultValue(t); + conf.setValidator(validator); + + return conf; + } + + /** + * Creates a new ConfigEntry instance based on this configuration entry with optional value + * handling. + * + * @return A new ConfigEntry instance that works with optional values. 
+ */ + public ConfigEntry> createWithOptional() { + ConfigEntry> conf = + new ConfigEntry<>(key, version, doc, alternatives, isPublic, isDeprecated); + conf.setValueConverter(s -> Optional.ofNullable(valueConverter.apply(s))); + // Unless explicitly set by the user, null values are not expected to occur. + conf.setStringConverter(t -> t.map(stringConverter).orElse(null)); + conf.setOptional(); + conf.setValidator( + optionValue -> { + if (Stream.of(Optional.ofNullable(validator), optionValue) + .allMatch(Optional::isPresent)) { + validator.accept(optionValue.get()); + } + }); + + return conf; + } + + /** + * Creates a new ConfigEntry instance based on this configuration entry with no default value. + * + * @return A new ConfigEntry instance with no default value. + */ + public ConfigEntry create() { + ConfigEntry conf = + new ConfigEntry<>(key, version, doc, alternatives, isPublic, isDeprecated); + conf.setValueConverter(valueConverter); + conf.setStringConverter(stringConverter); + conf.setHasNoDefault(); + conf.setValidator(validator); + return conf; + } + + /** + * Reads the configuration value. If the configuration value is not found, it will try to find + * the value from the alternatives,which means that newer configurations have higher priority + * over deprecated ones. + * + * @param properties The map containing the configuration properties. + * @return The value of the configuration entry. + * @throws NoSuchElementException If the configuration value is not found. + */ + public T readFrom(Map properties) throws NoSuchElementException { + String value = properties.get(key); + if (value == null) { + for (String alternative : alternatives) { + value = properties.get(alternative); + if (value != null) { + break; + } + } + } + + if (value == null) { + if (defaultValue != null) { + return defaultValue; + } else if (hasNoDefault) { + if (validator != null) { + validator.accept(null); + } + return null; + } else if (!isOptional) { + throw new NoSuchElementException("No configuration found for key " + key); + } + } + + T convertedValue = valueConverter.apply(value); + if (validator != null) { + validator.accept(convertedValue); + } + return convertedValue; + } + + /** + * Writes the provided value to the specified properties map. + * + * @param properties The map to write the configuration property to. + * @param value The value of the configuration entry. + */ + public void writeTo(Map properties, T value) { + String stringValue = stringConverter.apply(value); + if (stringValue == null) { + // To ensure that a null value is not set in the configuration + LOG.warn("Config {} value to set is null, ignore setting to Config.", stringValue); + return; + } + + properties.put(key, stringValue); + } + + public String getKey() { + return key; + } + + public List getAlternatives() { + return alternatives; + } + + public T getDefaultValue() { + return defaultValue; + } + + public String getDoc() { + return doc; + } + + public String getVersion() { + return version; + } + + public boolean isPublic() { + return isPublic; + } + + public boolean isDeprecated() { + return isDeprecated; + } +} diff --git a/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseMetadata.java b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseMetadata.java new file mode 100644 index 0000000..1abde4e --- /dev/null +++ b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseMetadata.java @@ -0,0 +1,40 @@ +/* + * Copyright 2024 OceanBase. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.spark; + +public interface OceanBaseMetadata { + + String getHost(); + + int getPort(); + + int getRpcPort(); + + String getJdbcUrl(); + + String getClusterName(); + + String getSchemaName(); + + String getSysUsername(); + + String getSysPassword(); + + String getUsername(); + + String getPassword(); +} diff --git a/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseMySQLTestBase.java b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseMySQLTestBase.java new file mode 100644 index 0000000..53bc92c --- /dev/null +++ b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseMySQLTestBase.java @@ -0,0 +1,191 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.oceanbase.spark; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; + + +import com.github.dockerjava.api.model.ContainerNetwork; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.oceanbase.OceanBaseCEContainer; + +public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseMySQLTestBase.class); + + private static final int SQL_PORT = 2881; + private static final int RPC_PORT = 2882; + private static final int CONFIG_SERVER_PORT = 8080; + private static final String CONFIG_URL_PATH = "/services?Action=GetObProxyConfig"; + + private static final String CLUSTER_NAME = "flink-oceanbase-ci"; + private static final String TEST_TENANT = "flink"; + private static final String SYS_PASSWORD = "123456"; + private static final String TEST_PASSWORD = "654321"; + + private static final Network NETWORK = Network.newNetwork(); + + @SuppressWarnings("resource") + public static final GenericContainer CONFIG_SERVER = + new GenericContainer<>("oceanbase/ob-configserver:1.0.0-2") + .withNetwork(NETWORK) + .withExposedPorts(CONFIG_SERVER_PORT) + .waitingFor( + new HttpWaitStrategy() + .forPort(CONFIG_SERVER_PORT) + .forPath(CONFIG_URL_PATH)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + public static final OceanBaseCEContainer CONTAINER = + new OceanBaseCEContainer("oceanbase/oceanbase-ce:latest") + .withNetwork(NETWORK) + .withMode(OceanBaseCEContainer.Mode.MINI) + .withTenantName(TEST_TENANT) + .withPassword(TEST_PASSWORD) + .withEnv("OB_CLUSTER_NAME", CLUSTER_NAME) + .withEnv("OB_SYS_PASSWORD", SYS_PASSWORD) + .withEnv("OB_DATAFILE_SIZE", "2G") + .withEnv("OB_LOG_DISK_SIZE", "4G") + .withStartupTimeout(Duration.ofMinutes(4)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + public static final OceanBaseProxyContainer ODP = + new OceanBaseProxyContainer("4.3.1.0-4") + .withNetwork(NETWORK) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + public static String getContainerIP(GenericContainer container) { + String ip = + container.getContainerInfo().getNetworkSettings().getNetworks().values().stream() + .findFirst() + .map(ContainerNetwork::getIpAddress) + .orElseThrow( + () -> + new RuntimeException( + "Can't get IP address of container: " + container)); + LOG.info("Docker image: {}, container IP: {}", container.getDockerImageName(), ip); + return ip; + } + + public static String getConfigServerAddress(GenericContainer container) { + String ip = getContainerIP(container); + return "http://" + ip + ":" + CONFIG_SERVER_PORT; + } + + public static String constructConfigUrlForODP(String address) { + return address + CONFIG_URL_PATH; + } + + public static Connection getSysJdbcConnection() throws SQLException { + String jdbcUrl = + "jdbc:mysql://" + + CONTAINER.getHost() + + ":" + + CONTAINER.getMappedPort(SQL_PORT) + + "/?useUnicode=true&characterEncoding=UTF-8&useSSL=false"; + return DriverManager.getConnection(jdbcUrl, "root", SYS_PASSWORD); + } + + public static String getSysParameter(String parameter) { + try (Connection connection = getSysJdbcConnection(); + Statement statement = connection.createStatement()) 
{ + String sql = String.format("SHOW PARAMETERS LIKE '%s'", parameter); + ResultSet rs = statement.executeQuery(sql); + if (rs.next()) { + return rs.getString("VALUE"); + } + throw new RuntimeException("Parameter '" + parameter + "' not found"); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static void createSysUser(String user, String password) throws SQLException { + assert user != null && password != null; + try (Connection connection = getSysJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("CREATE USER '" + user + "' IDENTIFIED BY '" + password + "'"); + statement.execute("GRANT ALL PRIVILEGES ON *.* TO '" + user + "'@'%'"); + } + } + + @Override + public String getHost() { + return CONTAINER.getHost(); + } + + @Override + public int getPort() { + return CONTAINER.getMappedPort(SQL_PORT); + } + + @Override + public int getRpcPort() { + return CONTAINER.getMappedPort(RPC_PORT); + } + + @Override + public String getJdbcUrl() { + return "jdbc:mysql://" + + getHost() + + ":" + + getPort() + + "/" + + getSchemaName() + + "?useUnicode=true&characterEncoding=UTF-8&useSSL=false"; + } + + @Override + public String getClusterName() { + return CLUSTER_NAME; + } + + @Override + public String getSchemaName() { + return CONTAINER.getDatabaseName(); + } + + @Override + public String getSysUsername() { + return "root"; + } + + @Override + public String getSysPassword() { + return SYS_PASSWORD; + } + + @Override + public String getUsername() { + return CONTAINER.getUsername(); + } + + @Override + public String getPassword() { + return CONTAINER.getPassword(); + } +} diff --git a/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseOracleTestBase.java b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseOracleTestBase.java new file mode 100644 index 0000000..7964f83 --- /dev/null +++ b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseOracleTestBase.java @@ -0,0 +1,79 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.oceanbase.spark; + +import java.util.Map; + +public abstract class OceanBaseOracleTestBase extends OceanBaseTestBase { + + @Override + public Map getOptions() { + Map options = super.getOptions(); + options.put("driver-class-name", "com.oceanbase.jdbc.Driver"); + return options; + } + + @Override + public String getHost() { + return System.getenv("HOST"); + } + + @Override + public int getPort() { + return Integer.parseInt(System.getenv("PORT")); + } + + @Override + public int getRpcPort() { + return Integer.parseInt(System.getenv("RPC_PORT")); + } + + @Override + public String getJdbcUrl() { + return String.format("jdbc:oceanbase://%s:%d/%s", getHost(), getPort(), getSchemaName()); + } + + @Override + public String getClusterName() { + return System.getenv("CLUSTER_NAME"); + } + + @Override + public String getSchemaName() { + return System.getenv("SCHEMA_NAME"); + } + + @Override + public String getSysUsername() { + return System.getenv("SYS_USERNAME"); + } + + @Override + public String getSysPassword() { + return System.getenv("SYS_PASSWORD"); + } + + @Override + public String getUsername() { + return System.getenv("USERNAME"); + } + + @Override + public String getPassword() { + return System.getenv("PASSWORD"); + } +} diff --git a/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseProxyContainer.java b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseProxyContainer.java new file mode 100644 index 0000000..872ad58 --- /dev/null +++ b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseProxyContainer.java @@ -0,0 +1,65 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.oceanbase.spark; + +import java.util.Objects; + + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +public class OceanBaseProxyContainer extends GenericContainer { + + private static final String IMAGE = "oceanbase/obproxy-ce"; + + private static final int SQL_PORT = 2883; + private static final int RPC_PORT = 2885; + private static final String APP_NAME = "flink_oceanbase_test"; + + private String configUrl; + private String password; + + public OceanBaseProxyContainer(String version) { + super(DockerImageName.parse(IMAGE + ":" + version)); + addExposedPorts(SQL_PORT, RPC_PORT); + } + + @Override + protected void configure() { + addEnv("APP_NAME", APP_NAME); + addEnv("CONFIG_URL", Objects.requireNonNull(configUrl)); + addEnv("PROXYRO_PASSWORD", Objects.requireNonNull(password)); + } + + public OceanBaseProxyContainer withConfigUrl(String configUrl) { + this.configUrl = configUrl; + return this; + } + + public OceanBaseProxyContainer withPassword(String password) { + this.password = password; + return this; + } + + public int getSqlPort() { + return getMappedPort(SQL_PORT); + } + + public int getRpcPort() { + return getMappedPort(RPC_PORT); + } +} diff --git a/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseTestBase.java b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseTestBase.java new file mode 100644 index 0000000..7cfbb95 --- /dev/null +++ b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseTestBase.java @@ -0,0 +1,192 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.oceanbase.spark; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public abstract class OceanBaseTestBase implements OceanBaseMetadata { + + private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + + public static String getOptionsString(Map<String, String> options) { + assertNotNull(options); + return options.entrySet().stream() + .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue())) + .collect(Collectors.joining(",")); + } + + public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) { + assertTrue(expected != null && actual != null); + assertEqualsInOrder( + expected.stream().sorted().collect(Collectors.toList()), + actual.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List<String> expected, List<String> actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + } + + public Map<String, String> getBaseOptions() { + Map<String, String> options = new HashMap<>(); + options.put("url", getJdbcUrl()); + options.put("username", getUsername()); + options.put("password", getPassword()); + return options; + } + + public Map<String, String> getOptions() { + Map<String, String> options = getBaseOptions(); + options.put("schema-name", getSchemaName()); + return options; + } + + public String getOptionsString() { + return getOptionsString(getOptions()); + } + + public Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection(getJdbcUrl(), getUsername(), getPassword()); + } + + public void initialize(String sqlFile) { + final URL file = getClass().getClassLoader().getResource(sqlFile); + assertNotNull("Cannot locate " + sqlFile, file); + + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + final List<String> statements = + Arrays.stream( + Files.readAllLines(Paths.get(file.toURI())).stream() + .map(String::trim) + .filter(x -> !x.startsWith("--") && !x.isEmpty()) + .map( + x -> { + final Matcher m = + COMMENT_PATTERN.matcher(x); + return m.matches() ?
m.group(1) : x; + }) + .collect(Collectors.joining("\n")) + .split(";")) + .collect(Collectors.toList()); + for (String stmt : statements) { + statement.execute(stmt); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void waitingAndAssertTableCount(String tableName, int expectedCount) + throws InterruptedException { + int tableRowsCount = 0; + for (int i = 0; i < 100; ++i) { + try { + tableRowsCount = getTableRowsCount(tableName); + } catch (Exception e) { + throw new RuntimeException( + "Failed to get table rows count for table " + tableName, e); + } + + if (tableRowsCount < expectedCount) { + Thread.sleep(100); + } + } + assertEquals(expectedCount, tableRowsCount); + } + + public int getTableRowsCount(String tableName) throws SQLException { + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery("SELECT COUNT(1) FROM " + tableName); + return rs.next() ? rs.getInt(1) : 0; + } + } + + public List<String> queryTable(String tableName) throws SQLException { + return queryTable(tableName, Collections.singletonList("*")); + } + + public List<String> queryTable(String tableName, List<String> fields) throws SQLException { + return queryTable(tableName, fields, this::getRowString); + } + + public List<String> queryTable(String tableName, List<String> fields, RowConverter rowConverter) + throws SQLException { + String sql = String.format("SELECT %s FROM %s", String.join(", ", fields), tableName); + List<String> result = new ArrayList<>(); + + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery(sql); + ResultSetMetaData metaData = rs.getMetaData(); + + while (rs.next()) { + result.add(rowConverter.convert(rs, metaData.getColumnCount())); + } + } + return result; + } + + protected String getRowString(ResultSet rs, int columnCount) throws SQLException { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < columnCount; i++) { + if (i != 0) { + sb.append(","); + } + sb.append(rs.getObject(i + 1)); + } + return sb.toString(); + } + + public void dropTables(String... tableNames) throws SQLException { + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + for (String tableName : tableNames) { + statement.execute("DROP TABLE " + tableName); + } + } + } + + @FunctionalInterface + public interface RowConverter { + String convert(ResultSet rs, int columnCount) throws SQLException; + } +} diff --git a/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/TestLoggerExtension.java b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/TestLoggerExtension.java new file mode 100644 index 0000000..2f996de --- /dev/null +++ b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/TestLoggerExtension.java @@ -0,0 +1,82 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
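
A hypothetical test flow built on the `OceanBaseTestBase` helpers above; the concrete subclass, SQL resource path, table name and test annotation are illustrative only and not part of this diff.

```java
import java.util.List;

import org.junit.jupiter.api.Test; // assumed JUnit 5 tests; the base class itself uses JUnit 4 asserts

// Assumes some concrete subclass of OceanBaseTestBase (here OceanBaseMySQLTestBase, hypothetical)
// supplies the connection metadata required by OceanBaseMetadata.
public class ProductsSinkSketchTest extends OceanBaseMySQLTestBase {

    @Test
    public void testSinkFlow() throws Exception {
        initialize("sql/products.sql");              // runs DDL/DML from src/test/resources (hypothetical file)
        // ... run the Spark write job under test here ...
        waitingAndAssertTableCount("products", 5);   // polls up to ~10 s (100 x 100 ms) for the expected rows
        List<String> rows = queryTable("products");  // each row rendered as a comma-joined string
        dropTables("products");
    }
}
```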
+ */ + +package com.oceanbase.spark; + +import java.io.PrintWriter; +import java.io.StringWriter; + + +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.TestWatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A JUnit-5-style test logger. */ +public class TestLoggerExtension implements TestWatcher, BeforeEachCallback { + private static final Logger LOG = LoggerFactory.getLogger(TestLoggerExtension.class); + + @Override + public void beforeEach(ExtensionContext context) { + LOG.info( + "\n================================================================================" + + "\nTest {}.{}[{}] is running." + + "\n--------------------------------------------------------------------------------", + context.getRequiredTestClass().getCanonicalName(), + context.getRequiredTestMethod().getName(), + context.getDisplayName()); + } + + @Override + public void testSuccessful(ExtensionContext context) { + LOG.info( + "\n--------------------------------------------------------------------------------" + + "\nTest {}.{}[{}] successfully run." + + "\n================================================================================", + context.getRequiredTestClass().getCanonicalName(), + context.getRequiredTestMethod().getName(), + context.getDisplayName()); + } + + @Override + public void testFailed(ExtensionContext context, Throwable cause) { + LOG.error( + "\n--------------------------------------------------------------------------------" + + "\nTest {}.{}[{}] failed with:\n{}." + + "\n================================================================================", + context.getRequiredTestClass().getCanonicalName(), + context.getRequiredTestMethod().getName(), + context.getDisplayName(), + exceptionToString(cause)); + } + + private static String exceptionToString(Throwable t) { + if (t == null) { + return "(null)"; + } + + try { + StringWriter stm = new StringWriter(); + PrintWriter wrt = new PrintWriter(stm); + t.printStackTrace(wrt); + wrt.close(); + return stm.toString(); + } catch (Throwable ignored) { + return t.getClass().getName() + " (error while printing stack trace)"; + } + } +} diff --git a/spark-connector-oceanbase-common/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/spark-connector-oceanbase-common/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension new file mode 100644 index 0000000..3cb50b4 --- /dev/null +++ b/spark-connector-oceanbase-common/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension @@ -0,0 +1,14 @@ +# Copyright 2024 OceanBase. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +com.oceanbase.spark.TestLoggerExtension diff --git a/spark-connector-oceanbase/pom.xml b/spark-connector-oceanbase/pom.xml new file mode 100644 index 0000000..65d010e --- /dev/null +++ b/spark-connector-oceanbase/pom.xml @@ -0,0 +1,46 @@ + + + + 4.0.0 + + + com.oceanbase + spark-connector-oceanbase-parent + ${rversion} + + + spark-connector-oceanbase + + pom + spark-connector-oceanbase + + + spark-connector-oceanbase-base + spark-connector-oceanbase-2.4 + spark-connector-oceanbase-3.1 + spark-connector-oceanbase-3.2 + spark-connector-oceanbase-3.3 + spark-connector-oceanbase-3.4 + + + + + com.oceanbase + obkv-table-client + 1.2.14-SNAPSHOT + + + diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-2.4/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-2.4/pom.xml new file mode 100644 index 0000000..217cd41 --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-2.4/pom.xml @@ -0,0 +1,121 @@ + + + + 4.0.0 + + + com.oceanbase + spark-connector-oceanbase + ${rversion} + + + spark-connector-oceanbase-2.4_${scala.binary.version} + spark-connector-oceanbase-2.4 + + + 2.4.8 + + + + + com.oceanbase + spark-connector-oceanbase-base + ${rversion} + + + + org.apache.spark + spark-sql_2.11 + ${spark2.version} + + + org.apache.avro + avro + + + + com.fasterxml.jackson.core + * + + + com.fasterxml.jackson.module + * + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.xbean + xbean-asm6-shaded + + + org.glassfish.jersey.media + jersey-media-json-jackson + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-oceanbase + + shade + + package + + + + com.alibaba:* + com.oceanbase:spark-connector-oceanbase-base + com.oceanbase:obkv-table-client + com.alipay.sofa:bolt + com.alipay.sofa.common:sofa-common-tools + com.lmax:disruptor + + + false + + + com.oceanbase:obkv-table-client + + **/log4j/log-conf.xml + + + + *:* + + **/Log4j2Plugins.dat + + + + + + + + + + diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-2.4/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml b/spark-connector-oceanbase/spark-connector-oceanbase-2.4/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml new file mode 100644 index 0000000..989e8c7 --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-2.4/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml @@ -0,0 +1,88 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-3.1/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-3.1/pom.xml new file mode 100644 index 0000000..374a8ed --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-3.1/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + + + com.oceanbase + spark-connector-oceanbase + ${rversion} + + + spark-connector-oceanbase-3.1_${scala.binary.version} + spark-connector-oceanbase-3.1 + + + 3.1.3 + + + + + com.oceanbase + spark-connector-oceanbase-base + ${rversion} + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-oceanbase + + shade + + package + + + + com.alibaba:* + com.oceanbase:spark-connector-oceanbase-base + com.oceanbase:obkv-table-client + com.alipay.sofa:bolt + com.alipay.sofa.common:sofa-common-tools + com.lmax:disruptor + + + + + + org.apache.logging + shade.org.apache.logging 
+ + + + + + + + + diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-3.2/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-3.2/pom.xml new file mode 100644 index 0000000..06249b5 --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-3.2/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + + + com.oceanbase + spark-connector-oceanbase + ${rversion} + + + spark-connector-oceanbase-3.2_${scala.binary.version} + spark-connector-oceanbase-3.2 + + + 3.2.2 + + + + + com.oceanbase + spark-connector-oceanbase-base + ${rversion} + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-oceanbase + + shade + + package + + + + com.alibaba:* + com.oceanbase:spark-connector-oceanbase-base + com.oceanbase:obkv-table-client + com.alipay.sofa:bolt + com.alipay.sofa.common:sofa-common-tools + com.lmax:disruptor + + + + + + org.apache.logging + shade.org.apache.logging + + + + + + + + + diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-3.3/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-3.3/pom.xml new file mode 100644 index 0000000..3a556a9 --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-3.3/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + + + com.oceanbase + spark-connector-oceanbase + ${rversion} + + + spark-connector-oceanbase-3.3_${scala.binary.version} + spark-connector-oceanbase-3.3 + + + 3.3.2 + + + + + com.oceanbase + spark-connector-oceanbase-base + ${rversion} + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-oceanbase + + shade + + package + + + + com.alibaba:* + com.oceanbase:spark-connector-oceanbase-base + com.oceanbase:obkv-table-client + com.alipay.sofa:bolt + com.alipay.sofa.common:sofa-common-tools + com.lmax:disruptor + + + + + + org.apache.logging + shade.org.apache.logging + + + + + + + + + diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-3.4/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-3.4/pom.xml new file mode 100644 index 0000000..544efdf --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-3.4/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + + + com.oceanbase + spark-connector-oceanbase + ${rversion} + + + spark-connector-oceanbase-3.4_${scala.binary.version} + spark-connector-oceanbase-3.4 + + + 3.4.0 + + + + + com.oceanbase + spark-connector-oceanbase-base + ${rversion} + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-oceanbase + + shade + + package + + + + com.alibaba:* + com.oceanbase:spark-connector-oceanbase-base + com.oceanbase:obkv-table-client + com.alipay.sofa:bolt + com.alipay.sofa.common:sofa-common-tools + com.lmax:disruptor + + + + + + org.apache.logging + shade.org.apache.logging + + + + + + + + + diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-base/pom.xml new file mode 100644 index 0000000..7008d6f --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/pom.xml @@ -0,0 +1,272 @@ + + + + 4.0.0 + + + com.oceanbase + spark-connector-oceanbase + ${rversion} + + + spark-connector-oceanbase-base + + jar + spark-connector-oceanbase-base + + + 3.4.0 
+ + + + + org.scala-lang + scala-library + ${scala.version} + + + org.scala-lang + scala-reflect + ${scala.version} + + + org.scala-lang + scala-compiler + ${scala.version} + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + org.apache.orc + orc-core + + + + + + + com.oceanbase + spark-connector-oceanbase-common + ${project.version} + test-jar + test + + + org.apache.spark + spark-hive_2.12 + ${spark.version} + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.orc + orc-core + + + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + tests + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.orc + orc-core + + + + + org.apache.spark + spark-core_2.12 + ${spark.version} + tests + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + org.apache.orc + orc-core + + + + + org.apache.spark + spark-catalyst_2.12 + ${spark.version} + tests + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.orc + orc-core + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + test + + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + test + + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + test + + + + + com.lmax + disruptor + 3.4.2 + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + -nobootcp + -target:jvm-${target.java.version} + + false + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + + scala-compile-first + + add-source + compile + + process-resources + + + + + scala-test-compile + + testCompile + + process-test-resources + + + + + com.diffplug.spotless + spotless-maven-plugin + ${spotless.version} + + + + 3.4.3 + + ${project.basedir}/../../.scalafmt.conf + + + + + + + diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/ConnectionOptions.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/ConnectionOptions.java new file mode 100644 index 0000000..f3597b5 --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/ConnectionOptions.java @@ -0,0 +1,52 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.oceanbase.spark.cfg; + +public interface ConnectionOptions { + + String HOST = "host"; + String SQL_PORT = "sql-port"; + String USERNAME = "username"; + String PASSWORD = "password"; + String SCHEMA_NAME = "schema-name"; + String TABLE_NAME = "table-name"; + + /* Direct-load config */ + String ENABLE_DIRECT_LOAD_WRITE = "direct-load.enabled"; + boolean ENABLE_DIRECT_LOAD_WRITE_DEFAULT = false; + String RPC_PORT = "direct-load.rpc-port"; + String PARALLEL = "direct-load.parallel"; + String EXECUTION_ID = "direct-load.execution-id"; + String DUP_ACTION = "direct-load.dup-action"; + String HEARTBEAT_TIMEOUT = "direct-load.heartbeat-timeout"; + String HEARTBEAT_INTERVAL = "direct-load.heartbeat-interval"; + String LOAD_METHOD = "direct-load.load-method"; + String TIMEOUT = "direct-load.timeout"; + String MAX_ERROR_ROWS = "direct-load.max-error-rows"; + String BATCH_SIZE = "direct-load-batch-size"; + int BATCH_SIZE_DEFAULT = 10240; + String SINK_TASK_PARTITION_SIZE = "direct-load.task.partition.size"; + /** + * Whether to use repartition instead of coalesce when adjusting the DataFrame to the + * configured partition size. Coalesce avoids a shuffle, but a small partition count can also + * shrink the parallelism of the upstream computation. Repartition adds a shuffle step while + * keeping the upstream stages at their original parallelism. + */ + String SINK_TASK_USE_REPARTITION = "direct-load.task.use.repartition"; + + boolean SINK_TASK_USE_REPARTITION_DEFAULT = false; +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/OceanBaseUserInfo.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/OceanBaseUserInfo.java new file mode 100644 index 0000000..8de852c --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/OceanBaseUserInfo.java @@ -0,0 +1,82 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
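
A sketch of how the keys in `ConnectionOptions` are typically supplied from the Spark side (Java API). The data source short name comes from `OceanBaseSparkSource.SHORT_NAME`, whose value is not visible in this diff, so `"oceanbase"` below is only an assumption; the connection values are placeholders.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public final class DirectLoadWriteSketch {

    /** Writes a DataFrame through the connector using the option keys defined above. */
    public static void write(Dataset<Row> df) {
        df.write()
                .format("oceanbase")                    // assumed short name, see note above
                .option("host", "localhost")            // ConnectionOptions.HOST
                .option("sql-port", "2881")             // ConnectionOptions.SQL_PORT
                .option("username", "user@tenant")      // ConnectionOptions.USERNAME
                .option("password", "password")         // ConnectionOptions.PASSWORD
                .option("schema-name", "test")          // ConnectionOptions.SCHEMA_NAME
                .option("table-name", "target_table")   // ConnectionOptions.TABLE_NAME
                .option("direct-load.enabled", "true")  // use direct load instead of plain JDBC
                .option("direct-load.rpc-port", "2882") // placeholder observer RPC port
                .mode(SaveMode.Append)
                .save();
    }
}
```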
+ */ + +package com.oceanbase.spark.cfg; + +import java.io.Serializable; + + +import org.apache.commons.lang3.StringUtils; + +public class OceanBaseUserInfo implements Serializable { + + private static final long serialVersionUID = 1L; + + private String cluster; + private String tenant; + private final String user; + + public OceanBaseUserInfo(String cluster, String tenant, String user) { + this.cluster = cluster; + this.tenant = tenant; + this.user = user; + } + + public String getCluster() { + return cluster; + } + + public String getTenant() { + return tenant; + } + + public String getUser() { + return user; + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + + public void setTenant(String tenant) { + this.tenant = tenant; + } + + public static OceanBaseUserInfo parse(SparkSettings settings) { + final String username = settings.getProperty(ConnectionOptions.USERNAME); + final String sepUserAtTenant = "@"; + final String sepTenantAtCluster = "#"; + final String sep = ":"; + final int expectedSepCount = 2; + if (username.contains(sepTenantAtCluster) && username.contains(sepUserAtTenant)) { + // user@tenant#cluster + String[] parts = username.split(sepTenantAtCluster); + String[] userAndTenant = parts[0].split(sepUserAtTenant); + return new OceanBaseUserInfo(parts[1], userAndTenant[1], userAndTenant[0]); + } else if (StringUtils.countMatches(username, sep) == expectedSepCount) { + // cluster:tenant:user + String[] parts = username.split(sep); + return new OceanBaseUserInfo(parts[0], parts[1], parts[2]); + } else if (username.contains(sepUserAtTenant) && !username.contains(sepTenantAtCluster)) { + // user@tenant + String[] parts = username.split(sepUserAtTenant); + return new OceanBaseUserInfo(null, parts[1], parts[0]); + } else { + // only user + return new OceanBaseUserInfo(null, null, username); + } + } +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/Settings.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/Settings.java new file mode 100644 index 0000000..964f65c --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/Settings.java @@ -0,0 +1,120 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
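
The three username layouts accepted by `OceanBaseUserInfo.parse`, shown as a small check that uses `SparkSettings.fromProperties` from later in this diff; the names are placeholders.

```java
import java.util.Properties;

import com.oceanbase.spark.cfg.OceanBaseUserInfo;
import com.oceanbase.spark.cfg.SparkSettings;

public final class UserInfoParseSketch {
    public static void main(String[] args) {
        // "user@tenant#cluster" -> cluster="cluster", tenant="tenant", user="user"
        // "cluster:tenant:user" -> cluster="cluster", tenant="tenant", user="user"
        // "user@tenant"         -> cluster=null,      tenant="tenant", user="user"
        // "user"                -> cluster=null,      tenant=null,     user="user"
        Properties props = new Properties();
        props.setProperty("username", "user@tenant#cluster");
        OceanBaseUserInfo info = OceanBaseUserInfo.parse(SparkSettings.fromProperties(props));
        System.out.println(info.getCluster() + " / " + info.getTenant() + " / " + info.getUser());
    }
}
```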
+ */ + +package com.oceanbase.spark.cfg; + +import java.util.Enumeration; +import java.util.Map; +import java.util.Properties; + + +import org.apache.commons.lang3.StringUtils; + +public abstract class Settings { + public abstract String getProperty(String name); + + public abstract void setProperty(String name, String value); + + public abstract Properties asProperties(); + + public abstract Settings copy(); + + public String getProperty(String name, String defaultValue) { + String value = getProperty(name); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + return value; + } + + public Integer getIntegerProperty(String name) { + return getIntegerProperty(name, null); + } + + public Integer getIntegerProperty(String name, Integer defaultValue) { + if (getProperty(name) != null) { + return Integer.parseInt(getProperty(name)); + } + + return defaultValue; + } + + public Long getLongProperty(String name) { + return getLongProperty(name, null); + } + + public Long getLongProperty(String name, Long defaultValue) { + if (getProperty(name) != null) { + return Long.parseLong(getProperty(name)); + } + + return defaultValue; + } + + public Boolean getBooleanProperty(String name) { + return getBooleanProperty(name, null); + } + + public Boolean getBooleanProperty(String name, Boolean defaultValue) { + if (getProperty(name) != null) { + return Boolean.valueOf(getProperty(name)); + } + return defaultValue; + } + + public Settings merge(Properties properties) { + if (properties == null || properties.isEmpty()) { + return this; + } + + Enumeration propertyNames = properties.propertyNames(); + + for (; propertyNames.hasMoreElements(); ) { + Object prop = propertyNames.nextElement(); + if (prop instanceof String) { + Object value = properties.get(prop); + setProperty((String) prop, value.toString()); + } + } + + return this; + } + + public Settings merge(Map map) { + if (map == null || map.isEmpty()) { + return this; + } + + for (Map.Entry entry : map.entrySet()) { + setProperty(entry.getKey(), entry.getValue()); + } + + return this; + } + + @Override + public int hashCode() { + return asProperties().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + return asProperties().equals(((Settings) obj).asProperties()); + } +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/SparkSettings.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/SparkSettings.java new file mode 100644 index 0000000..8539c50 --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/cfg/SparkSettings.java @@ -0,0 +1,86 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.oceanbase.spark.cfg; + +import java.util.Properties; + +import scala.Option; +import scala.Serializable; +import scala.Tuple2; + +import com.google.common.base.Preconditions; +import org.apache.spark.SparkConf; + +public class SparkSettings extends Settings implements Serializable { + + private final SparkConf cfg; + + public SparkSettings(SparkConf cfg) { + Preconditions.checkArgument(cfg != null, "non-null spark configuration expected."); + this.cfg = cfg; + } + + public static SparkSettings fromProperties(Properties props) { + SparkConf sparkConf = new SparkConf(); + props.forEach( + (k, v) -> { + if (k instanceof String) { + sparkConf.set((String) k, v.toString()); + } + }); + return new SparkSettings(sparkConf); + } + + public SparkSettings copy() { + return new SparkSettings(cfg.clone()); + } + + public String getProperty(String name) { + Option op = cfg.getOption(name); + if (!op.isDefined()) { + op = cfg.getOption("spark." + name); + } + return (op.isDefined() ? op.get() : null); + } + + public void setProperty(String name, String value) { + cfg.set(name, value); + } + + public Properties asProperties() { + Properties props = new Properties(); + + if (cfg != null) { + String sparkPrefix = "spark."; + for (Tuple2 tuple : cfg.getAll()) { + // spark. are special so save them without the prefix as well + // since its unlikely the other implementations will be aware of this convention + String key = tuple._1; + props.setProperty(key, tuple._2); + if (key.startsWith(sparkPrefix)) { + String simpleKey = key.substring(sparkPrefix.length()); + // double check to not override a property defined directly in the config + if (!props.containsKey(simpleKey)) { + props.setProperty(simpleKey, tuple._2); + } + } + } + } + + return props; + } +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoadUtils.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoadUtils.java new file mode 100644 index 0000000..d20c2dd --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoadUtils.java @@ -0,0 +1,51 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.oceanbase.spark.directload; + +import com.oceanbase.spark.cfg.ConnectionOptions; +import com.oceanbase.spark.cfg.OceanBaseUserInfo; +import com.oceanbase.spark.cfg.SparkSettings; + +/** The utils of {@link DirectLoader} */ +public class DirectLoadUtils { + + public static DirectLoader buildDirectLoaderFromSetting(SparkSettings settings) { + try { + OceanBaseUserInfo userInfo = OceanBaseUserInfo.parse(settings); + return new DirectLoaderBuilder() + .host(settings.getProperty(ConnectionOptions.HOST)) + .port(settings.getIntegerProperty(ConnectionOptions.RPC_PORT)) + .user(userInfo.getUser()) + .password(settings.getProperty(ConnectionOptions.PASSWORD)) + .tenant(userInfo.getTenant()) + .schema(settings.getProperty(ConnectionOptions.SCHEMA_NAME)) + .table(settings.getProperty(ConnectionOptions.TABLE_NAME)) + .executionId(settings.getProperty(ConnectionOptions.EXECUTION_ID)) + .duplicateKeyAction(settings.getProperty(ConnectionOptions.DUP_ACTION)) + .maxErrorCount(settings.getLongProperty(ConnectionOptions.MAX_ERROR_ROWS)) + .timeout(settings.getLongProperty(ConnectionOptions.TIMEOUT)) + .heartBeatTimeout(settings.getLongProperty(ConnectionOptions.HEARTBEAT_TIMEOUT)) + .heartBeatInterval( + settings.getLongProperty(ConnectionOptions.HEARTBEAT_INTERVAL)) + .directLoadMethod(settings.getProperty(ConnectionOptions.LOAD_METHOD)) + .parallel(settings.getIntegerProperty(ConnectionOptions.PARALLEL)) + .build(); + } catch (Exception e) { + throw new RuntimeException("Fail to build DirectLoader.", e); + } + } +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoader.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoader.java new file mode 100644 index 0000000..b34efcc --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoader.java @@ -0,0 +1,247 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.oceanbase.spark.directload; + +import java.io.InputStream; +import java.io.Reader; +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.ZonedDateTime; +import java.util.Base64; +import java.util.List; +import java.util.Objects; + + +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket; +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection; +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement; +import com.alipay.oceanbase.rpc.direct_load.execution.ObDirectLoadStatementExecutionId; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType; +import com.alipay.oceanbase.rpc.util.ObVString; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Wrapper of the direct-load api. */ +public class DirectLoader implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(DirectLoader.class); + + private final DirectLoaderBuilder builder; + private final String schemaTableName; + private final ObDirectLoadStatement statement; + private final ObDirectLoadConnection connection; + + private String executionId; + + public DirectLoader( + DirectLoaderBuilder builder, + String schemaTableName, + ObDirectLoadStatement statement, + ObDirectLoadConnection connection) { + this.builder = builder; + this.schemaTableName = schemaTableName; + this.statement = statement; + this.connection = connection; + } + + public DirectLoader( + DirectLoaderBuilder builder, + String schemaTableName, + ObDirectLoadStatement statement, + ObDirectLoadConnection connection, + String executionId) { + this(builder, schemaTableName, statement, connection); + this.executionId = executionId; + } + + public String begin() throws SQLException { + try { + LOG.info("{} direct load beginning ......", schemaTableName); + if (Objects.isNull(executionId)) { + statement.begin(); + ObDirectLoadStatementExecutionId statementExecutionId = statement.getExecutionId(); + byte[] executionIdBytes = statementExecutionId.encode(); + this.executionId = Base64.getEncoder().encodeToString(executionIdBytes); + LOG.info("{} direct load execution id : {}", schemaTableName, this.executionId); + builder.executionId(executionId); + } else { + ObDirectLoadStatementExecutionId statementExecutionId = + new ObDirectLoadStatementExecutionId(); + byte[] executionIdBytes = Base64.getDecoder().decode(executionId); + statementExecutionId.decode(executionIdBytes); + statement.resume(statementExecutionId); + LOG.info( + "{} direct load resume from execution id : {} success", + schemaTableName, + this.executionId); + } + return executionId; + } catch (Exception ex) { + throw new SQLException(ex); + } + } + + public void write(ObDirectLoadBucket bucket) throws SQLException { + try { + this.statement.write(bucket); + } catch (Exception e) { + throw new SQLException( + String.format( + "Failed to write to table: %s, execution id: %s", + schemaTableName, executionId), + e); + } + } + + public void commit() throws SQLException { + try { + statement.commit(); + LOG.info( + "Success to commit, 
table: {}, execution id: {}", schemaTableName, executionId); + } catch (Exception e) { + throw new SQLException( + String.format( + "Failed to commit, table: %s, execution id: %s", + schemaTableName, executionId), + e); + } + } + + public void close() { + if (Objects.nonNull(statement)) { + this.statement.close(); + } + if (Objects.nonNull(connection)) { + this.connection.close(); + } + } + + public DirectLoaderBuilder getBuilder() { + return builder; + } + + public String getSchemaTableName() { + return schemaTableName; + } + + public ObDirectLoadStatement getStatement() { + return statement; + } + + public static ObObj[] createObObjArray(List values) { + if (values == null) { + return null; + } + ObObj[] array = new ObObj[values.size()]; + for (int i = 0; i < values.size(); i++) { + array[i] = createObObj(values.get(i)); + } + return array; + } + + public static ObObj createObObj(Object value) { + try { + // Only used for strongly typed declared variables + Object convertedValue = value == null ? null : convertValue(value); + return new ObObj(ObObjType.defaultObjMeta(convertedValue), convertedValue); + } catch (Exception ex) { + throw new IllegalArgumentException(ex); + } + } + + /** + * Some values with data type is unsupported by ObObjType#valueOfType. We should convert the + * input value to supported value data type. + */ + public static Object convertValue(Object pairValue) throws Exception { + if (pairValue == null) { + return null; + } + Object value = pairValue; + + if (value instanceof BigDecimal) { + return value.toString(); + } else if (value instanceof BigInteger) { + return value.toString(); + } else if (value instanceof Instant) { + return Timestamp.from(((Instant) value)); + } else if (value instanceof LocalDate) { + LocalDateTime ldt = ((LocalDate) value).atTime(0, 0); + return Timestamp.valueOf(ldt); + } else if (value instanceof LocalTime) { + // Warn: java.sql.Time.valueOf() is deprecated. + Time t = Time.valueOf((LocalTime) value); + return new Timestamp(t.getTime()); + } else if (value instanceof LocalDateTime) { + return Timestamp.valueOf(((LocalDateTime) value)); + } else if (value instanceof OffsetDateTime) { + return Timestamp.from(((OffsetDateTime) value).toInstant()); + } else if (value instanceof Time) { + return new Timestamp(((Time) value).getTime()); + } else if (value instanceof ZonedDateTime) { + // Note: Be care of time zone!!! + return Timestamp.from(((ZonedDateTime) value).toInstant()); + } else if (value instanceof OffsetTime) { + LocalTime lt = ((OffsetTime) value).toLocalTime(); + // Warn: java.sql.Time.valueOf() is deprecated. + return new Timestamp(Time.valueOf(lt).getTime()); + } else if (value instanceof InputStream) { + try (InputStream is = ((InputStream) value)) { + // Note: Be care of character set!!! + return new ObVString(IOUtils.toString(is, Charset.defaultCharset())); + } + } else if (value instanceof Blob) { + Blob b = (Blob) value; + try (InputStream is = b.getBinaryStream()) { + // Note: Be care of character set!!! + if (is == null) { + return null; + } + return new ObVString(IOUtils.toString(is, Charset.defaultCharset())); + } finally { + b.free(); + } + } else if (value instanceof Reader) { + try (Reader r = ((Reader) value)) { + return IOUtils.toString(r); + } + } else if (value instanceof Clob) { + Clob c = (Clob) value; + try (Reader r = c.getCharacterStream()) { + return r == null ? 
null : IOUtils.toString(r); + } finally { + c.free(); + } + } else { + return value; + } + } +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoaderBuilder.java b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoaderBuilder.java new file mode 100644 index 0000000..5e004e1 --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/java/com/oceanbase/spark/directload/DirectLoaderBuilder.java @@ -0,0 +1,206 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.spark.directload; + +import java.io.Serializable; +import java.util.Objects; + + +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection; +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadManager; +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement; +import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType; +import org.apache.commons.lang3.StringUtils; + +/** The builder for {@link DirectLoader}. */ +public class DirectLoaderBuilder implements Serializable { + + private String host; + private int port; + + private String user; + private String tenant; + private String password; + + private String schema; + private String table; + + /** Server-side parallelism. */ + private int parallel = 8; + + private long maxErrorCount = 0L; + + private ObLoadDupActionType duplicateKeyAction = ObLoadDupActionType.REPLACE; + + /** The overall timeout of the direct load task */ + private long timeout = 1000L * 1000 * 1000; + + private long heartBeatTimeout = 60 * 1000; + + private long heartBeatInterval = 10 * 1000; + + /** Direct load mode: full, inc, inc_replace. 
*/ + private String directLoadMethod = "full"; + + private String executionId; + + public DirectLoaderBuilder host(String host) { + this.host = host; + return this; + } + + public DirectLoaderBuilder port(int port) { + this.port = port; + return this; + } + + public DirectLoaderBuilder user(String user) { + this.user = user; + return this; + } + + public DirectLoaderBuilder tenant(String tenant) { + this.tenant = tenant; + return this; + } + + public DirectLoaderBuilder password(String password) { + this.password = password; + return this; + } + + public DirectLoaderBuilder schema(String schema) { + this.schema = schema; + return this; + } + + public DirectLoaderBuilder table(String table) { + this.table = table; + return this; + } + + public DirectLoaderBuilder parallel(Integer parallel) { + if (Objects.isNull(parallel)) { + return this; + } + + this.parallel = parallel; + return this; + } + + public DirectLoaderBuilder maxErrorCount(Long maxErrorCount) { + if (Objects.isNull(maxErrorCount)) { + return this; + } + this.maxErrorCount = maxErrorCount; + return this; + } + + public DirectLoaderBuilder duplicateKeyAction(String duplicateKeyAction) { + if (Objects.isNull(duplicateKeyAction)) { + return this; + } + this.duplicateKeyAction = ObLoadDupActionType.valueOf(duplicateKeyAction); + return this; + } + + public DirectLoaderBuilder directLoadMethod(String directLoadMethod) { + if (Objects.isNull(directLoadMethod)) { + return this; + } + this.directLoadMethod = directLoadMethod; + return this; + } + + public DirectLoaderBuilder timeout(Long timeout) { + if (Objects.isNull(timeout)) { + return this; + } + this.timeout = timeout; + return this; + } + + public DirectLoaderBuilder heartBeatTimeout(Long heartBeatTimeout) { + if (Objects.isNull(heartBeatTimeout)) { + return this; + } + + this.heartBeatTimeout = heartBeatTimeout; + return this; + } + + public DirectLoaderBuilder heartBeatInterval(Long heartBeatInterval) { + if (Objects.isNull(heartBeatInterval)) { + return this; + } + + this.heartBeatInterval = heartBeatInterval; + return this; + } + + public DirectLoaderBuilder executionId(String executionId) { + this.executionId = executionId; + return this; + } + + public DirectLoader build() { + try { + ObDirectLoadConnection obDirectLoadConnection = buildConnection(parallel); + ObDirectLoadStatement obDirectLoadStatement = buildStatement(obDirectLoadConnection); + if (StringUtils.isNotBlank(executionId)) { + return new DirectLoader( + this, + String.format("%s.%s", schema, table), + obDirectLoadStatement, + obDirectLoadConnection, + executionId); + } else { + return new DirectLoader( + this, + String.format("%s.%s", schema, table), + obDirectLoadStatement, + obDirectLoadConnection); + } + } catch (ObDirectLoadException e) { + throw new RuntimeException("Fail to obtain direct-load connection.", e); + } + } + + private ObDirectLoadConnection buildConnection(int writeThreadNum) + throws ObDirectLoadException { + return ObDirectLoadManager.getConnectionBuilder() + .setServerInfo(host, port) + .setLoginInfo(tenant, user, password, schema) + .setHeartBeatInfo(heartBeatTimeout, heartBeatInterval) + .enableParallelWrite(writeThreadNum) + .build(); + } + + private ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection) + throws ObDirectLoadException { + return connection + .getStatementBuilder() + .setTableName(table) + .setDupAction(duplicateKeyAction) + .setParallel(parallel) + .setQueryTimeout(timeout) + .setMaxErrorRowCount(maxErrorCount) + .setLoadMethod(directLoadMethod) + 
.build(); + } +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 0000000..eb40e6f --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1,14 @@ +# Copyright 2024 OceanBase. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.spark.sql.OceanBaseSparkDataSource diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala new file mode 100644 index 0000000..5c0ce3e --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala @@ -0,0 +1,86 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
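
An end-to-end sketch of the direct-load client flow defined above (`DirectLoaderBuilder` plus `DirectLoader`). The `ObDirectLoadBucket.addRow(ObObj[])` call is assumed from the obkv-table-client API and should be checked against the client version on the classpath; connection values are placeholders.

```java
import java.util.Arrays;

import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket;
import com.oceanbase.spark.directload.DirectLoader;
import com.oceanbase.spark.directload.DirectLoaderBuilder;

public final class DirectLoadSketch {
    public static void main(String[] args) throws Exception {
        DirectLoader loader =
                new DirectLoaderBuilder()
                        .host("127.0.0.1")
                        .port(2882)               // direct-load.rpc-port
                        .user("user")
                        .tenant("tenant")
                        .password("password")
                        .schema("test")
                        .table("target_table")
                        .build();
        try {
            // begin() starts a new load, or resumes one when an execution id was supplied.
            String executionId = loader.begin();
            ObDirectLoadBucket bucket = new ObDirectLoadBucket();
            // addRow(ObObj[]) is assumed here; createObObjArray converts plain Java values.
            bucket.addRow(DirectLoader.createObObjArray(Arrays.<Object>asList(1, "a")));
            loader.write(bucket);
            loader.commit();
            System.out.println("direct load finished, execution id: " + executionId);
        } finally {
            loader.close();
        }
    }
}
```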
+ */ + +package com.oceanbase.spark.jdbc + +import com.oceanbase.spark.cfg.{ConnectionOptions, SparkSettings} + +import org.apache.spark.sql.jdbc.JdbcDialects + +import java.sql.{Connection, DriverManager} + +object OBJdbcUtils { + val OB_MYSQL_URL = s"jdbc:mysql://%s:%d/%s" + private val OB_ORACLE_URL = s"jdbc:oceanbase://%s:%d/%s" + + def getConnection(sparkSettings: SparkSettings): Connection = { + val connection = DriverManager.getConnection( + OB_MYSQL_URL.format( + sparkSettings.getProperty(ConnectionOptions.HOST), + sparkSettings.getIntegerProperty(ConnectionOptions.SQL_PORT), + sparkSettings.getProperty(ConnectionOptions.SCHEMA_NAME) + ), + s"${sparkSettings.getProperty(ConnectionOptions.USERNAME)}", + sparkSettings.getProperty(ConnectionOptions.PASSWORD) + ) + connection + } + + def getJdbcUrl(sparkSettings: SparkSettings): String = { + var url: String = null + if ("MYSQL".equalsIgnoreCase(getCompatibleMode(sparkSettings))) { + url = OBJdbcUtils.OB_MYSQL_URL.format( + sparkSettings.getProperty(ConnectionOptions.HOST), + sparkSettings.getIntegerProperty(ConnectionOptions.SQL_PORT), + sparkSettings.getProperty(ConnectionOptions.SCHEMA_NAME) + ) + } else { + JdbcDialects.registerDialect(OceanBaseOracleDialect) + url = OBJdbcUtils.OB_ORACLE_URL.format( + sparkSettings.getProperty(ConnectionOptions.HOST), + sparkSettings.getIntegerProperty(ConnectionOptions.SQL_PORT), + sparkSettings.getProperty(ConnectionOptions.SCHEMA_NAME) + ) + } + url + } + + def getCompatibleMode(sparkSettings: SparkSettings): String = { + val conn = getConnection(sparkSettings) + val statement = conn.createStatement + try { + val rs = statement.executeQuery("SHOW VARIABLES LIKE 'ob_compatibility_mode'") + if (rs.next) rs.getString("VALUE") else null + } finally { + statement.close() + conn.close() + } + } + + def truncateTable(sparkSettings: SparkSettings): Unit = { + val conn = getConnection(sparkSettings) + val statement = conn.createStatement + try { + statement.executeUpdate( + s"truncate table ${sparkSettings.getProperty(ConnectionOptions.SCHEMA_NAME)}.${sparkSettings + .getProperty(ConnectionOptions.TABLE_NAME)}") + } finally { + statement.close() + conn.close() + } + } + +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OceanBaseOracleDialect.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OceanBaseOracleDialect.scala new file mode 100644 index 0000000..dbdd1d3 --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OceanBaseOracleDialect.scala @@ -0,0 +1,153 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
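
A small sketch of how the `OBJdbcUtils` helpers above resolve the JDBC URL from connection settings. Calling the Scala object from Java relies on its generated static forwarders, it needs a reachable OceanBase instance because the compatibility mode is queried over JDBC, and the values are placeholders.

```java
import java.util.Properties;

import com.oceanbase.spark.cfg.SparkSettings;
import com.oceanbase.spark.jdbc.OBJdbcUtils;

public final class JdbcUrlSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("host", "localhost");
        props.setProperty("sql-port", "2881");
        props.setProperty("schema-name", "test");
        props.setProperty("username", "user@tenant");
        props.setProperty("password", "password");
        SparkSettings settings = SparkSettings.fromProperties(props);

        // Reads ob_compatibility_mode, then formats either the MySQL-mode or the
        // Oracle-mode URL accordingly.
        String url = OBJdbcUtils.getJdbcUrl(settings);
        System.out.println(url);
    }
}
```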
+ */ +package com.oceanbase.spark.jdbc + +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.connector.expressions.Expression +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType} +import org.apache.spark.sql.types._ + +import java.sql.{Date, Timestamp, Types} +import java.util.{Locale, TimeZone} + +import scala.util.control.NonFatal + +private case object OceanBaseOracleDialect extends JdbcDialect { + private[jdbc] val BINARY_FLOAT = 100 + private[jdbc] val BINARY_DOUBLE = 101 + private[jdbc] val TIMESTAMPTZ = -101 + + override def canHandle(url: String): Boolean = + url.toLowerCase(Locale.ROOT).startsWith("jdbc:oceanbase") + + private val distinctUnsupportedAggregateFunctions = + Set( + "VAR_POP", + "VAR_SAMP", + "STDDEV_POP", + "STDDEV_SAMP", + "COVAR_POP", + "COVAR_SAMP", + "CORR", + "REGR_INTERCEPT", + "REGR_R2", + "REGR_SLOPE", + "REGR_SXY") + + private val supportedAggregateFunctions = + Set("MAX", "MIN", "SUM", "COUNT", "AVG") ++ distinctUnsupportedAggregateFunctions + private val supportedFunctions = supportedAggregateFunctions + + override def isSupportedFunction(funcName: String): Boolean = + supportedFunctions.contains(funcName) + + private def supportTimeZoneTypes: Boolean = { + val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone) + // TODO: support timezone types when users are not using the JVM timezone, which + // is the default value of SESSION_LOCAL_TIMEZONE + timeZone == TimeZone.getDefault + } + + override def getCatalystType( + sqlType: Int, + typeName: String, + size: Int, + md: MetadataBuilder): Option[DataType] = { + sqlType match { + case Types.NUMERIC => + val scale = if (null != md) md.build().getLong("scale") else 0L + size match { + // Handle NUMBER fields that have no precision/scale in special way + // because JDBC ResultSetMetaData converts this to 0 precision and -127 scale + // For more details, please see + // https://github.com/apache/spark/pull/8780#issuecomment-145598968 + // and + // https://github.com/apache/spark/pull/8780#issuecomment-144541760 + case 0 => Option(DecimalType(DecimalType.MAX_PRECISION, 10)) + // Handle FLOAT fields in a special way because JDBC ResultSetMetaData converts + // this to NUMERIC with -127 scale + // Not sure if there is a more robust way to identify the field as a float (or other + // numeric types that do not specify a scale. 
+ case _ if scale == -127L => Option(DecimalType(DecimalType.MAX_PRECISION, 10)) + case _ => None + } + case TIMESTAMPTZ if supportTimeZoneTypes => + Some(TimestampType) // Value for Timestamp with Time Zone in Oracle mode + case BINARY_FLOAT => Some(FloatType) // Value for OracleModeTypes.BINARY_FLOAT + case BINARY_DOUBLE => Some(DoubleType) // Value for OracleModeTypes.BINARY_DOUBLE + case _ => None + } + } + + override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { + case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.BOOLEAN)) + case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.INTEGER)) + case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.BIGINT)) + case FloatType => Some(JdbcType("NUMBER(19, 4)", java.sql.Types.FLOAT)) + case DoubleType => Some(JdbcType("NUMBER(19, 4)", java.sql.Types.DOUBLE)) + case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.SMALLINT)) + case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.SMALLINT)) + case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR)) + case _ => None + } + + override def compileValue(value: Any): Any = value match { + case stringValue: String => s"'${escapeSql(stringValue)}'" + case timestampValue: Timestamp => "{ts '" + timestampValue + "'}" + case dateValue: Date => "{d '" + dateValue + "'}" + case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") + case _ => value + } + + override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + + /** + * The SQL query used to truncate a table. + * @param table + * The table to truncate + * @param cascade + * Whether or not to cascade the truncation. Default value is the value of + * isCascadingTruncateTable() + * @return + * The SQL query to use for truncating a table + */ + override def getTruncateQuery( + table: String, + cascade: Option[Boolean] = isCascadingTruncateTable()): String = { + cascade match { + case Some(true) => s"TRUNCATE TABLE $table CASCADE" + case _ => s"TRUNCATE TABLE $table" + } + } + + override def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String = + s"ALTER TABLE $tableName ADD ${quoteIdentifier(columnName)} $dataType" + + override def getUpdateColumnTypeQuery( + tableName: String, + columnName: String, + newDataType: String): String = + s"ALTER TABLE $tableName MODIFY ${quoteIdentifier(columnName)} $newDataType" + + override def getUpdateColumnNullabilityQuery( + tableName: String, + columnName: String, + isNullable: Boolean): String = { + val nullable = if (isNullable) "NULL" else "NOT NULL" + s"ALTER TABLE $tableName MODIFY ${quoteIdentifier(columnName)} $nullable" + } +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/listener/DirectLoadListener.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/listener/DirectLoadListener.scala new file mode 100644 index 0000000..6cfb5c9 --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/listener/DirectLoadListener.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.oceanbase.spark.listener
+
+import com.oceanbase.spark.directload.DirectLoader
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerJobEnd, SparkListenerJobStart}
+
+/**
+ * Listener attached to a direct-load write. The callbacks below are intentionally no-ops and
+ * serve as extension points for reacting to job and application lifecycle events.
+ */
+class DirectLoadListener(directLoader: DirectLoader) extends SparkListener {
+
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {}
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {}
+
+  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {}
+
+  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {}
+}
diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseRelation.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseRelation.scala
new file mode 100644
index 0000000..d8cdf3e
--- /dev/null
+++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseRelation.scala
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2024 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.oceanbase.spark.sql
+
+import com.oceanbase.spark.cfg.{ConnectionOptions, SparkSettings}
+import com.oceanbase.spark.jdbc.OBJdbcUtils
+
+import org.apache.spark.sql
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.jdbc.JdbcDialects
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+private[sql] class OceanBaseRelation(
+    val sqlContext: SQLContext = null,
+    parameters: Map[String, String])
+  extends BaseRelation
+  with InsertableRelation {
+
+  private lazy val cfg = {
+    val conf = new SparkSettings(sqlContext.sparkContext.getConf)
+    conf.merge(parameters.asJava)
+    conf
+  }
+
+  private lazy val lazySchema = {
+    val conn = OBJdbcUtils.getConnection(cfg)
+    try {
+      val statement = conn.prepareStatement(
+        s"select * from ${cfg.getProperty(ConnectionOptions.TABLE_NAME)} where 1 = 0")
+      try {
+        val rs = statement.executeQuery()
+        try {
+          JdbcUtils.getSchema(rs, dialect)
+        } finally {
+          rs.close()
+        }
+      } finally {
+        statement.close()
+      }
+    } finally {
+      conn.close()
+    }
+  }
+
+  private lazy val dialect = JdbcDialects.get("")
+
+  override def schema: StructType = lazySchema
+
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+    // Filter pushdown is not supported yet; all filters are left for Spark to evaluate.
+    filters
+  }
+
+  // Insert data by writing the DataFrame back through the OceanBase data source.
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    data.write
+      .format(OceanBaseSparkSource.SHORT_NAME)
+      .options(parameters)
+      .mode(if (overwrite) sql.SaveMode.Overwrite else sql.SaveMode.Append)
+      .save()
+  }
+}
diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseSparkSource.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseSparkSource.scala
new file mode 100644
index 0000000..02b432a
--- /dev/null
+++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/sql/OceanBaseSparkSource.scala
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2024 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package com.oceanbase.spark.sql + +import com.oceanbase.spark.cfg.{ConnectionOptions, SparkSettings} +import com.oceanbase.spark.directload.DirectLoadUtils +import com.oceanbase.spark.jdbc.OBJdbcUtils +import com.oceanbase.spark.listener.DirectLoadListener +import com.oceanbase.spark.writer.DirectLoadWriter + +import OceanBaseSparkSource.{createDirectLoadRelation, SHORT_NAME} +import org.apache.spark.internal.Logging +import org.apache.spark.sql +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, Filter, RelationProvider, SchemaRelationProvider, StreamSinkProvider} + +import scala.collection.JavaConverters._ + +@Deprecated +private[sql] class OceanBaseSparkSource + extends DataSourceRegister + with RelationProvider + with CreatableRelationProvider + with Logging { + + override def shortName(): String = SHORT_NAME + + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + new OceanBaseRelation(sqlContext, parameters) + } + + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + dataFrame: DataFrame): BaseRelation = { + val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf) + sparkSettings.merge(parameters.asJava) + + createDirectLoadRelation(sqlContext, mode, dataFrame, sparkSettings) + + createRelation(sqlContext, parameters) + } + +} + +object OceanBaseSparkSource { + val SHORT_NAME: String = "oceanbase" + + def createDirectLoadRelation( + sqlContext: SQLContext, + mode: SaveMode, + dataFrame: DataFrame, + sparkSettings: SparkSettings): Unit = { + mode match { + case sql.SaveMode.Append => // do nothing + case sql.SaveMode.Overwrite => + OBJdbcUtils.truncateTable(sparkSettings) + case _ => + throw new NotImplementedError(s"${mode.name()} mode is not currently supported.") + } + // Init direct-loader. + val directLoader = DirectLoadUtils.buildDirectLoaderFromSetting(sparkSettings) + val executionId = directLoader.begin() + sparkSettings.setProperty(ConnectionOptions.EXECUTION_ID, executionId) + + sqlContext.sparkContext.addSparkListener(new DirectLoadListener(directLoader)) + val writer = new DirectLoadWriter(sparkSettings) + writer.write(dataFrame) + + directLoader.commit() + directLoader.close() + } +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/DirectLoadWriter.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/DirectLoadWriter.scala new file mode 100644 index 0000000..bd9a61c --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/DirectLoadWriter.scala @@ -0,0 +1,82 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.oceanbase.spark.writer + +import com.oceanbase.spark.cfg.{ConnectionOptions, SparkSettings} +import com.oceanbase.spark.directload.{DirectLoader, DirectLoadUtils} + +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj +import org.apache.commons.lang.StringUtils +import org.apache.spark.sql.{DataFrame, Row} + +import java.util.Objects + +import scala.collection.mutable.ArrayBuffer + +class DirectLoadWriter(settings: SparkSettings) extends Serializable { + + private val bufferSize = + settings.getIntegerProperty(ConnectionOptions.BATCH_SIZE, ConnectionOptions.BATCH_SIZE_DEFAULT) + private val sinkTaskPartitionSize: Integer = + settings.getIntegerProperty(ConnectionOptions.SINK_TASK_PARTITION_SIZE) + private val sinkTaskUseRepartition: Boolean = settings + .getProperty( + ConnectionOptions.SINK_TASK_USE_REPARTITION, + ConnectionOptions.SINK_TASK_USE_REPARTITION_DEFAULT.toString) + .toBoolean + + def write(dataFrame: DataFrame): Unit = { + assert(StringUtils.isNotBlank(settings.getProperty(ConnectionOptions.EXECUTION_ID))) + + var resultDataFrame = dataFrame + if (Objects.nonNull(sinkTaskPartitionSize)) { + resultDataFrame = + if (sinkTaskUseRepartition) dataFrame.repartition(sinkTaskPartitionSize) + else dataFrame.coalesce(sinkTaskPartitionSize) + } + + resultDataFrame.foreachPartition( + (partition: Iterator[Row]) => { + val directLoader: DirectLoader = DirectLoadUtils.buildDirectLoaderFromSetting(settings) + directLoader.begin() + val buffer = ArrayBuffer[Row]() + partition.foreach( + row => { + buffer += row + if (buffer.length >= bufferSize) { + flush(buffer, directLoader) + } + }) + flush(buffer, directLoader) + }) + } + + private def flush(buffer: ArrayBuffer[Row], directLoader: DirectLoader): Unit = { + val bucket = new ObDirectLoadBucket() + buffer.foreach( + row => { + val array = new Array[ObObj](row.size) + for (i <- 0 until (row.size)) { + array(i) = DirectLoader.createObObj(row.get(i)) + } + bucket.addRow(array) + }) + + directLoader.write(bucket) + buffer.clear() + } +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseJDBCRelation.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseJDBCRelation.scala new file mode 100644 index 0000000..a0f9d3d --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseJDBCRelation.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql + +import com.oceanbase.spark.cfg.ConnectionOptions + +import org.apache.spark.{sql, Partition} +import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCRelation} +import org.apache.spark.sql.types.StructType + +class OceanBaseJDBCRelation( + override val schema: StructType, + override val parts: Array[Partition], + override val jdbcOptions: JDBCOptions)(@transient override val sparkSession: SparkSession) + extends JDBCRelation(schema, parts, jdbcOptions)(sparkSession) { + + override def insert(data: DataFrame, overwrite: Boolean): Unit = { + if ( + jdbcOptions.parameters + .getOrElse( + ConnectionOptions.ENABLE_DIRECT_LOAD_WRITE, + s"${ConnectionOptions.ENABLE_DIRECT_LOAD_WRITE_DEFAULT}") + .toBoolean + ) { + data.write + .format(OceanBaseSparkDataSource.SHORT_NAME) + .options(jdbcOptions.parameters) + .mode(if (overwrite) sql.SaveMode.Overwrite else sql.SaveMode.Append) + .save() + } else { + super.insert(data, overwrite) + } + + } +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseSparkDataSource.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseSparkDataSource.scala new file mode 100644 index 0000000..bfaa7cc --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseSparkDataSource.scala @@ -0,0 +1,81 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql + +import com.oceanbase.spark.cfg.{ConnectionOptions, SparkSettings} +import com.oceanbase.spark.jdbc.OBJdbcUtils.{getCompatibleMode, getJdbcUrl, OB_MYSQL_URL} +import com.oceanbase.spark.sql.OceanBaseSparkSource + +import OceanBaseSparkDataSource.SHORT_NAME +import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCRelation, JdbcRelationProvider} +import org.apache.spark.sql.sources._ + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +class OceanBaseSparkDataSource extends JdbcRelationProvider { + + override def shortName(): String = SHORT_NAME + + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf) + sparkSettings.merge(parameters.asJava) + val jdbcOptions = buildJDBCOptions(parameters, sparkSettings)._1 + val resolver = sqlContext.conf.resolver + val timeZoneId = sqlContext.conf.sessionLocalTimeZone + val schema = JDBCRelation.getSchema(resolver, jdbcOptions) + val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions) + new OceanBaseJDBCRelation(schema, parts, jdbcOptions)(sqlContext.sparkSession) + } + + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + dataFrame: DataFrame): BaseRelation = { + val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf) + sparkSettings.merge(parameters.asJava) + val enableDirectLoadWrite = sparkSettings.getBooleanProperty( + ConnectionOptions.ENABLE_DIRECT_LOAD_WRITE, + ConnectionOptions.ENABLE_DIRECT_LOAD_WRITE_DEFAULT) + if (!enableDirectLoadWrite) { + val param = buildJDBCOptions(parameters, sparkSettings)._2 + super.createRelation(sqlContext, mode, param, dataFrame) + } else { + OceanBaseSparkSource.createDirectLoadRelation(sqlContext, mode, dataFrame, sparkSettings) + createRelation(sqlContext, parameters) + } + } + + private def buildJDBCOptions( + parameters: Map[String, String], + sparkSettings: SparkSettings): (JDBCOptions, Map[String, String]) = { + val url: String = getJdbcUrl(sparkSettings) + val table: String = parameters(ConnectionOptions.TABLE_NAME) + val paraMap = parameters ++ Map( + "url" -> url, + "dbtable" -> table, + "user" -> parameters(ConnectionOptions.USERNAME), + "isolationLevel" -> "READ_COMMITTED" + ) + (new JDBCOptions(url, table, paraMap), paraMap) + } +} + +object OceanBaseSparkDataSource { + val SHORT_NAME: String = "oceanbase" +} diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/resources/log4j2-test.properties b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000..072c7c9 --- /dev/null +++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/resources/log4j2-test.properties @@ -0,0 +1,23 @@ +# Copyright 2024 OceanBase. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+# Root logger level is set to INFO so test output is visible while debugging;
+# set it to OFF to avoid flooding build logs.
+rootLogger.level = INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/resources/sql/mysql/gis_types.sql b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/resources/sql/mysql/gis_types.sql
new file mode 100644
index 0000000..f22be67
--- /dev/null
+++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/resources/sql/mysql/gis_types.sql
@@ -0,0 +1,25 @@
+-- Copyright 2024 OceanBase.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE TABLE gis_types
+(
+    id                   INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+    point_c              POINT,
+    geometry_c           GEOMETRY,
+    linestring_c         LINESTRING,
+    polygon_c            POLYGON,
+    multipoint_c         MULTIPOINT,
+    multiline_c          MULTILINESTRING,
+    multipolygon_c       MULTIPOLYGON,
+    geometrycollection_c GEOMETRYCOLLECTION
+)
diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/resources/sql/mysql/products.sql b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/resources/sql/mysql/products.sql
new file mode 100644
index 0000000..e5b318c
--- /dev/null
+++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/resources/sql/mysql/products.sql
@@ -0,0 +1,20 @@
+-- Copyright 2024 OceanBase.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE TABLE products
+(
+    id          INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+    name        VARCHAR(255) NOT NULL DEFAULT 'flink',
+    description VARCHAR(512),
+    weight      DECIMAL(20, 10)
+);
diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala
new file mode 100644
index 0000000..cd85866
--- /dev/null
+++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2024 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.spark + +import com.oceanbase.spark.OceanBaseTestBase.assertEqualsInAnyOrder + +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.junit.jupiter.api.{AfterAll, BeforeAll, Disabled, Test} + +import java.util + +class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { + + @Test + def testSqlJDBCWrite(): Unit = { + initialize("sql/mysql/products.sql") + + val session = SparkSession.builder().master("local[*]").getOrCreate() + + session.sql(s""" + |CREATE TEMPORARY VIEW test_sink + |USING oceanbase + |OPTIONS( + | "host"= "$getHost", + | "sql-port" = "$getPort", + | "rpc-port" = "$getRpcPort", + | "schema-name"="$getSchemaName", + | "table-name"="products", + | "username"="$getUsername", + | "password"="$getPassword" + |); + |""".stripMargin) + + session.sql( + """ + |INSERT INTO test_sink VALUES + |(101, 'scooter', 'Small 2-wheel scooter', 3.14), + |(102, 'car battery', '12V car battery', 8.1), + |(103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8), + |(104, 'hammer', '12oz carpenter\'s hammer', 0.75), + |(105, 'hammer', '14oz carpenter\'s hammer', 0.875), + |(106, 'hammer', '16oz carpenter\'s hammer', 1.0), + |(107, 'rocks', 'box of assorted rocks', 5.3), + |(108, 'jacket', 'water resistent black wind breaker', 0.1), + |(109, 'spare tire', '24 inch spare tire', 22.2); + |""".stripMargin) + + val expected: util.List[String] = util.Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.1400000000", + "102,car battery,12V car battery,8.1000000000", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000", + "104,hammer,12oz carpenter's hammer,0.7500000000", + "105,hammer,14oz carpenter's hammer,0.8750000000", + "106,hammer,16oz carpenter's hammer,1.0000000000", + "107,rocks,box of assorted rocks,5.3000000000", + "108,jacket,water resistent black wind breaker,0.1000000000", + "109,spare tire,24 inch spare tire,22.2000000000" + ) + session.stop() + + waitingAndAssertTableCount("products", expected.size) + + val actual: util.List[String] = queryTable("products") + + assertEqualsInAnyOrder(expected, actual) + + dropTables("products") + } + + @Test + def testSqlDirectLoadWrite(): Unit = { + initialize("sql/mysql/products.sql") + + val session = SparkSession.builder().master("local[*]").getOrCreate() + + session.sql(s""" + |CREATE TEMPORARY VIEW test_sink + |USING oceanbase + |OPTIONS( + | "host"= "$getHost", + | "sql-port" = "$getPort", + | "rpc-port" = "$getRpcPort", + | "schema-name"="$getSchemaName", + | "table-name"="products", + | "username"="$getUsername", + | "password"="$getPassword", + | "direct-load.enabled"=true, + | "direct-load.rpc-port"=$getRpcPort + |); + |""".stripMargin) + + session.sql( + """ + |INSERT INTO test_sink VALUES + |(101, 'scooter', 'Small 2-wheel scooter', 3.14), + |(102, 'car battery', '12V car battery', 8.1), + |(103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8), + |(104, 'hammer', '12oz carpenter\'s hammer', 0.75), + |(105, 'hammer', '14oz carpenter\'s hammer', 
0.875), + |(106, 'hammer', '16oz carpenter\'s hammer', 1.0), + |(107, 'rocks', 'box of assorted rocks', 5.3), + |(108, 'jacket', 'water resistent black wind breaker', 0.1), + |(109, 'spare tire', '24 inch spare tire', 22.2); + |""".stripMargin) + + val expected: util.List[String] = util.Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.1400000000", + "102,car battery,12V car battery,8.1000000000", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000", + "104,hammer,12oz carpenter's hammer,0.7500000000", + "105,hammer,14oz carpenter's hammer,0.8750000000", + "106,hammer,16oz carpenter's hammer,1.0000000000", + "107,rocks,box of assorted rocks,5.3000000000", + "108,jacket,water resistent black wind breaker,0.1000000000", + "109,spare tire,24 inch spare tire,22.2000000000" + ) + session.stop() + + waitingAndAssertTableCount("products", expected.size) + + val actual: util.List[String] = queryTable("products") + + assertEqualsInAnyOrder(expected, actual) + + dropTables("products") + } + + @Test + def testDataFrameDirectLoadWrite(): Unit = { + initialize("sql/mysql/products.sql") + + val session = SparkSession.builder().master("local[*]").getOrCreate() + val df = session + .createDataFrame( + Seq( + (101, "scooter", "Small 2-wheel scooter", 3.14), + (102, "car battery", "12V car battery", 8.1), + ( + 103, + "12-pack drill bits", + "12-pack of drill bits with sizes ranging from #40 to #3", + 0.8), + (104, "hammer", "12oz carpenter's hammer", 0.75), + (105, "hammer", "14oz carpenter's hammer", 0.875), + (106, "hammer", "16oz carpenter's hammer", 1.0), + (107, "rocks", "box of assorted rocks", 5.3), + (108, "jacket", "water resistent black wind breaker", 0.1), + (109, "spare tire", "24 inch spare tire", 22.2) + )) + .toDF("id", "name", "description", "weight") + + df.write + .format("oceanbase") + .mode(saveMode = SaveMode.Append) + .option("host", getHost) + .option("sql-port", getPort) + .option("username", getUsername) + .option("password", getPassword) + .option("table-name", "products") + .option("schema-name", getSchemaName) + .option("direct-load.enabled", value = true) + .option("direct-load.rpc-port", value = getRpcPort) + .save() + session.stop() + + val expected: util.List[String] = util.Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.1400000000", + "102,car battery,12V car battery,8.1000000000", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000", + "104,hammer,12oz carpenter's hammer,0.7500000000", + "105,hammer,14oz carpenter's hammer,0.8750000000", + "106,hammer,16oz carpenter's hammer,1.0000000000", + "107,rocks,box of assorted rocks,5.3000000000", + "108,jacket,water resistent black wind breaker,0.1000000000", + "109,spare tire,24 inch spare tire,22.2000000000" + ) + + waitingAndAssertTableCount("products", expected.size) + + val actual: util.List[String] = queryTable("products") + + assertEqualsInAnyOrder(expected, actual) + + dropTables("products") + } + + @Test + def testSqlRead(): Unit = { + initialize("sql/mysql/products.sql") + + val session = SparkSession.builder().master("local[*]").getOrCreate() + + session.sql(s""" + |CREATE TEMPORARY VIEW test_sink + |USING oceanbase + |OPTIONS( + | "host"= "$getHost", + | "sql-port" = "$getPort", + | "rpc-port" = "$getRpcPort", + | "schema-name"="$getSchemaName", + | "table-name"="products", + | "direct-load.enabled" ="false", + | "username"="$getUsername", + | "password"="$getPassword" + |); + |""".stripMargin) + + session.sql( + """ + 
|INSERT INTO test_sink VALUES + |(101, 'scooter', 'Small 2-wheel scooter', 3.14), + |(102, 'car battery', '12V car battery', 8.1), + |(103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8), + |(104, 'hammer', '12oz carpenter\'s hammer', 0.75), + |(105, 'hammer', '14oz carpenter\'s hammer', 0.875), + |(106, 'hammer', '16oz carpenter\'s hammer', 1.0), + |(107, 'rocks', 'box of assorted rocks', 5.3), + |(108, 'jacket', 'water resistent black wind breaker', 0.1), + |(109, 'spare tire', '24 inch spare tire', 22.2); + |""".stripMargin) + + val expected: util.List[String] = util.Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.1400000000", + "102,car battery,12V car battery,8.1000000000", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000", + "104,hammer,12oz carpenter's hammer,0.7500000000", + "105,hammer,14oz carpenter's hammer,0.8750000000", + "106,hammer,16oz carpenter's hammer,1.0000000000", + "107,rocks,box of assorted rocks,5.3000000000", + "108,jacket,water resistent black wind breaker,0.1000000000", + "109,spare tire,24 inch spare tire,22.2000000000" + ) + import scala.collection.JavaConverters._ + val actual = session + .sql("select * from test_sink") + .collect() + .map( + _.toString().drop(1).dropRight(1) + ) + .toList + .asJava + assertEqualsInAnyOrder(expected, actual) + + session.stop() + dropTables("products") + } + + @Test + def testDataFrameRead(): Unit = { + initialize("sql/mysql/products.sql") + + val session = SparkSession.builder().master("local[*]").getOrCreate() + + // Sql write + session.sql(s""" + |CREATE TEMPORARY VIEW test_sink + |USING oceanbase + |OPTIONS( + | "host"= "$getHost", + | "sql-port" = "$getPort", + | "rpc-port" = "$getRpcPort", + | "schema-name"="$getSchemaName", + | "table-name"="products", + | "username"="$getUsername", + | "password"="$getPassword" + |); + |""".stripMargin) + + session.sql( + """ + |INSERT INTO test_sink VALUES + |(101, 'scooter', 'Small 2-wheel scooter', 3.14), + |(102, 'car battery', '12V car battery', 8.1), + |(103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8), + |(104, 'hammer', '12oz carpenter\'s hammer', 0.75), + |(105, 'hammer', '14oz carpenter\'s hammer', 0.875), + |(106, 'hammer', '16oz carpenter\'s hammer', 1.0), + |(107, 'rocks', 'box of assorted rocks', 5.3), + |(108, 'jacket', 'water resistent black wind breaker', 0.1), + |(109, 'spare tire', '24 inch spare tire', 22.2); + |""".stripMargin) + + // DataFrame read + val dataFrame = session.read + .format("oceanbase") + .option("host", getHost) + .option("sql-port", getPort) + .option("username", getUsername) + .option("password", getPassword) + .option("table-name", "products") + .option("schema-name", getSchemaName) + .load() + + val expected: util.List[String] = util.Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.1400000000", + "102,car battery,12V car battery,8.1000000000", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000", + "104,hammer,12oz carpenter's hammer,0.7500000000", + "105,hammer,14oz carpenter's hammer,0.8750000000", + "106,hammer,16oz carpenter's hammer,1.0000000000", + "107,rocks,box of assorted rocks,5.3000000000", + "108,jacket,water resistent black wind breaker,0.1000000000", + "109,spare tire,24 inch spare tire,22.2000000000" + ) + + import scala.collection.JavaConverters._ + val actual = dataFrame + .collect() + .map( + _.toString().drop(1).dropRight(1) + ) + .toList + .asJava 
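+    // Row.toString renders each row as "[v1,v2,...]"; the brackets are stripped above so rows
+    // can be compared against the expected comma-separated strings in any order.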
+ assertEqualsInAnyOrder(expected, actual) + + session.stop() + dropTables("products") + } +} + +object OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { + @BeforeAll + def setup(): Unit = { + OceanBaseMySQLTestBase.CONTAINER.start() + } + + @AfterAll + def tearDown(): Unit = { + OceanBaseMySQLTestBase.CONTAINER.stop() + } +}