From 8894ff3937f35aba939703f1a8521cb8000185fd Mon Sep 17 00:00:00 2001 From: yuanoOo Date: Wed, 20 Nov 2024 14:14:03 +0800 Subject: [PATCH] Enhancement: Add e2e test module. (#5) --- .github/workflows/build.yml | 38 --- .github/workflows/push_pr.yml | 40 ++++ .github/workflows/test.yml | 42 ++++ .github/workflows/test_scala2_11.yml | 42 ++++ pom.xml | 19 ++ .../spark/OceanBaseMySQLTestBase.java | 20 +- spark-connector-oceanbase-e2e-tests/pom.xml | 173 ++++++++++++++ .../oceanbase/spark/OceanBaseE2eITCase.java | 202 ++++++++++++++++ .../utils/SparkContainerTestEnvironment.java | 220 ++++++++++++++++++ .../src/test/resources/log4j2-test.properties | 24 ++ .../src/test/resources/sql/mysql/products.sql | 20 ++ .../spark-connector-oceanbase-3.1/pom.xml | 21 +- .../log/log4j/log-conf.xml | 88 +++++++ .../spark-connector-oceanbase-3.2/pom.xml | 21 +- .../log/log4j/log-conf.xml | 88 +++++++ 15 files changed, 1005 insertions(+), 53 deletions(-) delete mode 100644 .github/workflows/build.yml create mode 100644 .github/workflows/push_pr.yml create mode 100644 .github/workflows/test.yml create mode 100644 .github/workflows/test_scala2_11.yml create mode 100644 spark-connector-oceanbase-e2e-tests/pom.xml create mode 100644 spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/OceanBaseE2eITCase.java create mode 100644 spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/utils/SparkContainerTestEnvironment.java create mode 100644 spark-connector-oceanbase-e2e-tests/src/test/resources/log4j2-test.properties create mode 100644 spark-connector-oceanbase-e2e-tests/src/test/resources/sql/mysql/products.sql create mode 100644 spark-connector-oceanbase/spark-connector-oceanbase-3.1/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml create mode 100644 spark-connector-oceanbase/spark-connector-oceanbase-3.2/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml deleted file mode 100644 index 1615505..0000000 --- a/.github/workflows/build.yml +++ /dev/null @@ -1,38 +0,0 @@ -name: build - -on: - pull_request: - paths-ignore: - - 'docs/**' - - '**.md' - - '.*' - push: - branches: - - main - - 'release-*' - -env: - JDK_VERSION: 8 - -concurrency: - group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }} - cancel-in-progress: true - -jobs: - build: - runs-on: ubuntu-latest - - steps: - - name: Checkout code - uses: actions/checkout@v4 - - - name: Set up JDK ${{ env.JDK_VERSION }} - uses: actions/setup-java@v4 - with: - java-version: ${{ env.JDK_VERSION }} - distribution: 'adopt' - cache: 'maven' - - - name: Build and Test - timeout-minutes: 60 - run: mvn clean package diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml new file mode 100644 index 0000000..9104431 --- /dev/null +++ b/.github/workflows/push_pr.yml @@ -0,0 +1,40 @@ +name: CI + +on: + pull_request: + paths-ignore: + - "docs/**" + - "**.md" + - ".*" + push: + branches: + - main + - 'release-*' + +concurrency: + group: ci-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + spark-connector-oceanbase: + uses: ./.github/workflows/test.yml + with: + module: spark-connector-oceanbase/spark-connector-oceanbase-base + + e2e-tests: + strategy: + matrix: + spark_version: ["3.1","3.2","3.3","3.4"] + uses: ./.github/workflows/test.yml + with: + module: spark-connector-oceanbase-e2e-tests + maven_opts: "-D spark_version=${{ 
matrix.spark_version }}" + + e2e-tests-scala-2_11: + strategy: + matrix: + spark_version: ["2.4.6"] + uses: ./.github/workflows/test_scala2_11.yml + with: + module: spark-connector-oceanbase-e2e-tests + maven_opts: "-D spark_version=${{ matrix.spark_version }}" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..dec4409 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,42 @@ +name: Test Module + +on: + workflow_call: + inputs: + module: + required: true + type: string + maven_opts: + required: false + type: string + +jobs: + test: + name: Test + runs-on: ubuntu-latest + steps: + - name: Free disk space on Ubuntu runner + uses: kfir4444/free-disk-space@main + with: + tool-cache: false + android: true + dotnet: true + haskell: true + large-packages: true + swap-storage: true + + - name: Check out repository code + uses: actions/checkout@v4 + + - name: Set up Java + uses: actions/setup-java@v4 + with: + java-version: '8' + distribution: 'zulu' + cache: 'maven' + + - name: Maven build + run: mvn install -DskipTests=true + + - name: Maven test + run: mvn verify -pl ${{ inputs.module }} -am ${{ inputs.maven_opts }} diff --git a/.github/workflows/test_scala2_11.yml b/.github/workflows/test_scala2_11.yml new file mode 100644 index 0000000..e5485d9 --- /dev/null +++ b/.github/workflows/test_scala2_11.yml @@ -0,0 +1,42 @@ +name: Test Module + +on: + workflow_call: + inputs: + module: + required: true + type: string + maven_opts: + required: false + type: string + +jobs: + test: + name: Test + runs-on: ubuntu-latest + steps: + - name: Free disk space on Ubuntu runner + uses: kfir4444/free-disk-space@main + with: + tool-cache: false + android: true + dotnet: true + haskell: true + large-packages: true + swap-storage: true + + - name: Check out repository code + uses: actions/checkout@v4 + + - name: Set up Java + uses: actions/setup-java@v4 + with: + java-version: '8' + distribution: 'zulu' + cache: 'maven' + + - name: Maven build + run: mvn clean install -D scala.version=2.11.12 -D scala.binary.version=2.11 -D skipTests=true + + - name: Maven test + run: mvn verify -pl ${{ inputs.module }} -am ${{ inputs.maven_opts }} -D scala.binary.version=2.11 diff --git a/pom.xml b/pom.xml index a1972be..06469af 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,7 @@ under the License. spark-connector-oceanbase spark-connector-oceanbase-common + spark-connector-oceanbase-e2e-tests @@ -67,6 +68,24 @@ under the License. 
pom import + + + mysql + mysql-connector-java + 8.0.28 + + + com.google.protobuf + protobuf-java + + + + + + com.oceanbase + oceanbase-client + 2.4.12 + diff --git a/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseMySQLTestBase.java b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseMySQLTestBase.java index 53bc92c..d924be0 100644 --- a/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseMySQLTestBase.java +++ b/spark-connector-oceanbase-common/src/test/java/com/oceanbase/spark/OceanBaseMySQLTestBase.java @@ -47,7 +47,7 @@ public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase { private static final String SYS_PASSWORD = "123456"; private static final String TEST_PASSWORD = "654321"; - private static final Network NETWORK = Network.newNetwork(); + public static final Network NETWORK = Network.newNetwork(); @SuppressWarnings("resource") public static final GenericContainer CONFIG_SERVER = @@ -133,6 +133,24 @@ public static void createSysUser(String user, String password) throws SQLExcepti } } + private static String configServerAddress; + private static String obServerIP; + + public static String getConfigServerAddress() { + if (configServerAddress == null) { + String ip = getContainerIP(CONFIG_SERVER); + configServerAddress = "http://" + ip + ":" + CONFIG_SERVER_PORT; + } + return configServerAddress; + } + + public static String getOBServerIP() { + if (obServerIP == null) { + obServerIP = getContainerIP(CONTAINER); + } + return obServerIP; + } + @Override public String getHost() { return CONTAINER.getHost(); diff --git a/spark-connector-oceanbase-e2e-tests/pom.xml b/spark-connector-oceanbase-e2e-tests/pom.xml new file mode 100644 index 0000000..544efc0 --- /dev/null +++ b/spark-connector-oceanbase-e2e-tests/pom.xml @@ -0,0 +1,173 @@ + + + + 4.0.0 + + com.oceanbase + spark-connector-oceanbase-parent + ${revision} + + + spark-connector-oceanbase-e2e-tests + + + + com.oceanbase + spark-connector-oceanbase-common + ${revision} + test-jar + test + + + + mysql + mysql-connector-java + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + test + + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + test + + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + default-test + none + + + integration-tests + none + + + end-to-end-tests + + test + + integration-test + + + **/*.* + + 1 + false + + ${project.basedir} + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + com.oceanbase + spark-connector-oceanbase-2.4_${scala.binary.version} + ${project.version} + spark-connector-oceanbase-2.4_${scala.binary.version}-${project.version}.jar + jar + ${project.build.directory}/dependencies + + + com.oceanbase + spark-connector-oceanbase-3.1_${scala.binary.version} + ${project.version} + spark-connector-oceanbase-3.1_${scala.binary.version}-${project.version}.jar + jar + ${project.build.directory}/dependencies + + + com.oceanbase + spark-connector-oceanbase-3.2_${scala.binary.version} + ${project.version} + spark-connector-oceanbase-3.2_${scala.binary.version}-${project.version}.jar + jar + ${project.build.directory}/dependencies + + + com.oceanbase + spark-connector-oceanbase-3.3_${scala.binary.version} + ${project.version} + spark-connector-oceanbase-3.3_${scala.binary.version}-${project.version}.jar + jar + ${project.build.directory}/dependencies + + + com.oceanbase + 
spark-connector-oceanbase-3.4_${scala.binary.version} + ${project.version} + spark-connector-oceanbase-3.4_${scala.binary.version}-${project.version}.jar + jar + ${project.build.directory}/dependencies + + + + + + copy-jars + + copy + + process-resources + + + copy-mysql-connector + + copy + + package + + + + mysql + mysql-connector-java + 8.0.26 + ${project.build.directory}/dependencies + mysql-connector-java.jar + + + + + + + + + diff --git a/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/OceanBaseE2eITCase.java b/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/OceanBaseE2eITCase.java new file mode 100644 index 0000000..c19ce7f --- /dev/null +++ b/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/OceanBaseE2eITCase.java @@ -0,0 +1,202 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.spark; + +import com.oceanbase.spark.utils.SparkContainerTestEnvironment; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +public class OceanBaseE2eITCase extends SparkContainerTestEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseE2eITCase.class); + + private static final String SINK_CONNECTOR_NAME = + "^.*spark-connector-oceanbase-\\d+\\.\\d+_\\d+\\.\\d+-[\\d\\.]+(?:-SNAPSHOT)?\\.jar$"; + private static final String MYSQL_CONNECTOR_JAVA = "mysql-connector-java.jar"; + + @BeforeAll + public static void setup() { + CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)).start(); + } + + @AfterAll + public static void tearDown() { + CONTAINER.stop(); + } + + @BeforeEach + public void before() throws Exception { + super.before(); + + initialize("sql/mysql/products.sql"); + } + + @AfterEach + public void after() throws Exception { + super.after(); + + dropTables("products"); + } + + @Test + @DisabledIfSystemProperty( + named = "spark_version", + matches = "^(2\\.4\\.[0-9])$", + disabledReason = + "This is because the spark 2.x docker image fails to execute the spark-sql command.") + public void testInsertValues() throws Exception { + List sqlLines = new ArrayList<>(); + + sqlLines.add( + String.format( + "CREATE TEMPORARY VIEW test_sink " + + "USING oceanbase " + + "OPTIONS( " + + " \"host\"= \"%s\"," + + " \"sql-port\" = \"%s\"," + + " \"schema-name\"=\"%s\"," + + " \"table-name\"=\"products\"," + + " \"username\"=\"%s\"," + + " \"password\"=\"%s\"," + + " \"direct-load.enabled\" = \"true\"," + + " \"direct-load.rpc-port\" = \"%s\" " + + ");", + 
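+                        // These getters resolve to container-network addresses: spark-sql runs
+                        // inside the Spark container, which shares the Docker network with the
+                        // OceanBase container, so host ports are not used here.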
getHostInContainer(), + getPortInContainer(), + getSchemaName(), + getUsername(), + getPassword(), + getRpcPortInContainer())); + + sqlLines.add( + "INSERT INTO test_sink " + + "VALUES (101, 'scooter', 'Small 2-wheel scooter', 3.14)," + + " (102, 'car battery', '12V car battery', 8.1)," + + " (103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8)," + + " (104, 'hammer', '12oz carpenter\\'s hammer', 0.75)," + + " (105, 'hammer', '14oz carpenter\\'s hammer', 0.875)," + + " (106, 'hammer', '16oz carpenter\\'s hammer', 1.0)," + + " (107, 'rocks', 'box of assorted rocks', 5.3)," + + " (108, 'jacket', 'water resistent black wind breaker', 0.1)," + + " (109, 'spare tire', '24 inch spare tire', 22.2);"); + + submitSQLJob(sqlLines, getResource(SINK_CONNECTOR_NAME), getResource(MYSQL_CONNECTOR_JAVA)); + + List expected = + Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.1400000000", + "102,car battery,12V car battery,8.1000000000", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000", + "104,hammer,12oz carpenter's hammer,0.7500000000", + "105,hammer,14oz carpenter's hammer,0.8750000000", + "106,hammer,16oz carpenter's hammer,1.0000000000", + "107,rocks,box of assorted rocks,5.3000000000", + "108,jacket,water resistent black wind breaker,0.1000000000", + "109,spare tire,24 inch spare tire,22.2000000000"); + + waitingAndAssertTableCount("products", expected.size()); + + List actual = queryTable("products"); + assertEqualsInAnyOrder(expected, actual); + } + + @Test + @EnabledIfSystemProperty( + named = "spark_version", + matches = "^(2\\.4\\.[0-9])$", + disabledReason = + "This is because the spark 2.x docker image fails to execute the spark-sql command.") + public void testInsertValuesSpark2() throws Exception { + List shellLines = new ArrayList<>(); + + shellLines.add( + "val df = spark\n" + + " .createDataFrame(\n" + + " Seq(\n" + + " (101, \"scooter\", \"Small 2-wheel scooter\", 3.14),\n" + + " (102, \"car battery\", \"12V car battery\", 8.1),\n" + + " (\n" + + " 103,\n" + + " \"12-pack drill bits\",\n" + + " \"12-pack of drill bits with sizes ranging from #40 to #3\",\n" + + " 0.8),\n" + + " (104, \"hammer\", \"12oz carpenter's hammer\", 0.75),\n" + + " (105, \"hammer\", \"14oz carpenter's hammer\", 0.875),\n" + + " (106, \"hammer\", \"16oz carpenter's hammer\", 1.0),\n" + + " (107, \"rocks\", \"box of assorted rocks\", 5.3),\n" + + " (108, \"jacket\", \"water resistent black wind breaker\", 0.1),\n" + + " (109, \"spare tire\", \"24 inch spare tire\", 22.2)\n" + + " ))\n" + + " .toDF(\"id\", \"name\", \"description\", \"weight\")"); + + shellLines.add( + String.format( + "import org.apache.spark.sql.SaveMode\n" + + "df.write\n" + + " .format(\"oceanbase\")\n" + + " .mode(saveMode = SaveMode.Append)\n" + + " .option(\"host\", \"%s\")\n" + + " .option(\"sql-port\", \"%s\")\n" + + " .option(\"username\", \"%s\")\n" + + " .option(\"password\", \"%s\")\n" + + " .option(\"table-name\", \"products\")\n" + + " .option(\"schema-name\", \"%s\")\n" + + " .option(\"direct-load.enabled\", value = true)\n" + + " .option(\"direct-load.rpc-port\", value = \"%s\")\n" + + " .save()", + getHostInContainer(), + getPortInContainer(), + getUsername(), + getPassword(), + getSchemaName(), + getRpcPortInContainer())); + + submitSparkShellJob( + shellLines, getResource(SINK_CONNECTOR_NAME), getResource(MYSQL_CONNECTOR_JAVA)); + + List expected = + Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.1400000000", + "102,car battery,12V 
car battery,8.1000000000", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000", + "104,hammer,12oz carpenter's hammer,0.7500000000", + "105,hammer,14oz carpenter's hammer,0.8750000000", + "106,hammer,16oz carpenter's hammer,1.0000000000", + "107,rocks,box of assorted rocks,5.3000000000", + "108,jacket,water resistent black wind breaker,0.1000000000", + "109,spare tire,24 inch spare tire,22.2000000000"); + + waitingAndAssertTableCount("products", expected.size()); + + List actual = queryTable("products"); + assertEqualsInAnyOrder(expected, actual); + } +} diff --git a/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/utils/SparkContainerTestEnvironment.java b/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/utils/SparkContainerTestEnvironment.java new file mode 100644 index 0000000..c505d0a --- /dev/null +++ b/spark-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/spark/utils/SparkContainerTestEnvironment.java @@ -0,0 +1,220 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.spark.utils; + +import com.oceanbase.spark.OceanBaseMySQLTestBase; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.MountableFile; + +public abstract class SparkContainerTestEnvironment extends OceanBaseMySQLTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(SparkContainerTestEnvironment.class); + + private static final String SPARK_VERSION = System.getProperty("spark_version"); + private static final String MODULE_DIRECTORY = System.getProperty("moduleDir", ""); + + private static final String INTER_CONTAINER_JM_ALIAS = "spark"; + + protected String getSparkDockerImageTag() { + return String.format("bitnami/spark:%s", SPARK_VERSION); + } + + @TempDir public java.nio.file.Path temporaryFolder; + + public GenericContainer sparkContainer; + + @SuppressWarnings("resource") + @BeforeEach + public void before() throws Exception { + LOG.info("Starting Spark containers..."); + sparkContainer = + new GenericContainer<>(getSparkDockerImageTag()) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + 
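+        // Startables.deepStart() starts the container and its declared dependencies in
+        // parallel and returns a CompletableFuture; join() blocks until startup completes.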
Startables.deepStart(Stream.of(sparkContainer)).join();
+        LOG.info("Spark containers started");
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        if (sparkContainer != null) {
+            sparkContainer.stop();
+        }
+    }
+
+    public String getHostInContainer() {
+        return getOBServerIP();
+    }
+
+    public int getPortInContainer() {
+        return 2881;
+    }
+
+    public int getRpcPortInContainer() {
+        return 2882;
+    }
+
+    /**
+     * Searches for a resource file matching the given regex in the given directory. This method is
+     * primarily intended to be used for the initialization of static {@link Path} fields for
+     * resource files (i.e. jars, config files). If resolvePaths is empty, this method searches for
+     * the file under the module's {@code target} directory. If resolvePaths is not empty, it
+     * searches for the file under the given resolve paths of the current project.
+     *
+     * @param resourceNameRegex regex pattern to match against
+     * @param resolvePaths an array of resolve paths of the current project
+     * @return Path pointing to the matching file
+     * @throws RuntimeException if none or multiple resource files could be found
+     */
+    public static Path getResource(final String resourceNameRegex, String... resolvePaths) {
+        Path path = Paths.get(MODULE_DIRECTORY).toAbsolutePath();
+
+        if (resolvePaths != null && resolvePaths.length > 0) {
+            path = path.getParent().getParent();
+            for (String resolvePath : resolvePaths) {
+                path = path.resolve(resolvePath);
+            }
+        }
+
+        try (Stream<Path> dependencyResources = Files.walk(path)) {
+            final List<Path> matchingResources =
+                    dependencyResources
+                            .filter(
+                                    file ->
+                                            file.toAbsolutePath()
+                                                            .toString()
+                                                            .contains(SPARK_VERSION.substring(0, 3))
+                                                    || file.toAbsolutePath()
+                                                            .toString()
+                                                            .contains("mysql"))
+                            .filter(
+                                    jar ->
+                                            Pattern.compile(resourceNameRegex)
+                                                    .matcher(jar.toAbsolutePath().toString())
+                                                    .find())
+                            .collect(Collectors.toList());
+            switch (matchingResources.size()) {
+                case 0:
+                    throw new RuntimeException(
+                            new FileNotFoundException(
+                                    String.format(
+                                            "No resource file could be found that matches the pattern %s. "
+                                                    + "This could mean that the test module must be rebuilt via maven.",
+                                            resourceNameRegex)));
+                case 1:
+                    return matchingResources.get(0);
+                default:
+                    throw new RuntimeException(
+                            new IOException(
+                                    String.format(
+                                            "Multiple resource files were found matching the pattern %s. Matches=%s",
+                                            resourceNameRegex, matchingResources)));
+            }
+        } catch (final IOException ioe) {
+            throw new RuntimeException("Could not search for resource files.", ioe);
+        }
+    }
+
+    /**
+     * Submits a SQL job to the running cluster.
+     *
+     * <p>
NOTE: You should not use {@code '\t'}. + */ + public void submitSQLJob(List sqlLines, Path... jars) + throws IOException, InterruptedException { + final List commands = new ArrayList<>(); + Path script = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toPath(); + Files.write(script, sqlLines); + sparkContainer.copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql"); + commands.add("--jars"); + String jarStr = + Arrays.stream(jars) + .map( + jar -> + copyAndGetContainerPath( + sparkContainer, jar.toAbsolutePath().toString())) + .collect(Collectors.joining(",")); + commands.add(jarStr); + commands.add("-f /tmp/script.sql"); + + String command = String.format("spark-sql %s", String.join(" ", commands)); + LOG.info(command); + Container.ExecResult execResult = sparkContainer.execInContainer("bash", "-c", command); + LOG.info(execResult.getStdout()); + LOG.error(execResult.getStderr()); + if (execResult.getExitCode() != 0) { + throw new AssertionError("Failed when submitting the SQL job."); + } + } + + public void submitSparkShellJob(List shellLines, Path... jars) + throws IOException, InterruptedException { + final List commands = new ArrayList<>(); + Path script = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toPath(); + Files.write(script, shellLines); + sparkContainer.copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.scala"); + commands.add("--jars"); + String jarStr = + Arrays.stream(jars) + .map( + jar -> + copyAndGetContainerPath( + sparkContainer, jar.toAbsolutePath().toString())) + .collect(Collectors.joining(",")); + commands.add(jarStr); + commands.add("-i /tmp/script.scala"); + + String command = String.format("timeout 2m spark-shell %s", String.join(" ", commands)); + LOG.info(command); + Container.ExecResult execResult = sparkContainer.execInContainer("bash", "-c", command); + LOG.info(execResult.getStdout()); + LOG.error(execResult.getStderr()); + } + + private String copyAndGetContainerPath(GenericContainer container, String filePath) { + Path path = Paths.get(filePath); + String containerPath = "/opt/bitnami/spark/jars/" + path.getFileName(); + container.copyFileToContainer(MountableFile.forHostPath(path), containerPath); + return containerPath; + } +} diff --git a/spark-connector-oceanbase-e2e-tests/src/test/resources/log4j2-test.properties b/spark-connector-oceanbase-e2e-tests/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000..bef06db --- /dev/null +++ b/spark-connector-oceanbase-e2e-tests/src/test/resources/log4j2-test.properties @@ -0,0 +1,24 @@ +# Copyright 2024 OceanBase. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
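+#
+# Log4j2 picks up log4j2-test.properties from src/test/resources ahead of any main
+# configuration, so these settings only apply to test runs of this module.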
+
+# Set root logger level to OFF to not flood build logs;
+# it is kept at INFO here for debugging purposes
+rootLogger.level = INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/spark-connector-oceanbase-e2e-tests/src/test/resources/sql/mysql/products.sql b/spark-connector-oceanbase-e2e-tests/src/test/resources/sql/mysql/products.sql
new file mode 100644
index 0000000..e5b318c
--- /dev/null
+++ b/spark-connector-oceanbase-e2e-tests/src/test/resources/sql/mysql/products.sql
@@ -0,0 +1,20 @@
+-- Copyright 2024 OceanBase.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE TABLE products
+(
+    id          INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+    name        VARCHAR(255) NOT NULL DEFAULT 'flink',
+    description VARCHAR(512),
+    weight      DECIMAL(20, 10)
+);
diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-3.1/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-3.1/pom.xml
index 5c10af3..4ce1bfc 100644
--- a/spark-connector-oceanbase/spark-connector-oceanbase-3.1/pom.xml
+++ b/spark-connector-oceanbase/spark-connector-oceanbase-3.1/pom.xml
@@ -76,13 +76,20 @@ under the License.
                                     <include>com.lmax:disruptor</include>
                                 </includes>
                             </artifactSet>
-                            <relocations>
-                                <relocation>
-                                    <pattern>org.apache.logging</pattern>
-                                    <shadedPattern>shade.org.apache.logging</shadedPattern>
-                                </relocation>
-                            </relocations>
+                            <filters>
+                                <filter>
+                                    <artifact>com.oceanbase:obkv-table-client</artifact>
+                                    <excludes>
+                                        <exclude>**/log4j/log-conf.xml</exclude>
+                                    </excludes>
+                                </filter>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>**/Log4j2Plugins.dat</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-3.1/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml b/spark-connector-oceanbase/spark-connector-oceanbase-3.1/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml
new file mode 100644
index 0000000..989e8c7
--- /dev/null
+++ b/spark-connector-oceanbase/spark-connector-oceanbase-3.1/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml
@@ -0,0 +1,88 @@
+<!-- Log4j configuration for the obkv-table-client logs (88 lines; element markup not recoverable from this rendering) -->
diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-3.2/pom.xml b/spark-connector-oceanbase/spark-connector-oceanbase-3.2/pom.xml
index 1894824..78c8d67 100644
--- a/spark-connector-oceanbase/spark-connector-oceanbase-3.2/pom.xml
+++ b/spark-connector-oceanbase/spark-connector-oceanbase-3.2/pom.xml
@@ -76,13 +76,20 @@ under the License.
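<!-- Same change as in the 3.1 module: the org.apache.logging relocation is replaced by
     filters that exclude the obkv-table-client's bundled log-conf.xml and the shaded-in
     Log4j2Plugins.dat caches, presumably to avoid Log4j2 plugin conflicts in the shaded jar. -->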
                                     <include>com.lmax:disruptor</include>
                                 </includes>
                             </artifactSet>
-                            <relocations>
-                                <relocation>
-                                    <pattern>org.apache.logging</pattern>
-                                    <shadedPattern>shade.org.apache.logging</shadedPattern>
-                                </relocation>
-                            </relocations>
+                            <filters>
+                                <filter>
+                                    <artifact>com.oceanbase:obkv-table-client</artifact>
+                                    <excludes>
+                                        <exclude>**/log4j/log-conf.xml</exclude>
+                                    </excludes>
+                                </filter>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>**/Log4j2Plugins.dat</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-3.2/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml b/spark-connector-oceanbase/spark-connector-oceanbase-3.2/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml
new file mode 100644
index 0000000..989e8c7
--- /dev/null
+++ b/spark-connector-oceanbase/spark-connector-oceanbase-3.2/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml
@@ -0,0 +1,88 @@
+<!-- Log4j configuration for the obkv-table-client logs (88 lines; element markup not recoverable from this rendering) -->
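
To run the e2e tests locally, mirror the CI workflows above: a local Docker daemon is
required for the Testcontainers-based Spark and OceanBase environment, and the
spark_version property is read by SparkContainerTestEnvironment via
System.getProperty("spark_version"). The version 3.4 below is only an example; any
version staged by the maven-dependency-plugin works:

    mvn install -DskipTests=true
    mvn verify -pl spark-connector-oceanbase-e2e-tests -am -D spark_version=3.4

For Spark 2.4 with Scala 2.11, first build with -D scala.version=2.11.12
-D scala.binary.version=2.11 and pass -D scala.binary.version=2.11 to the verify
step, as test_scala2_11.yml does.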