From b2acbc7c0c76a3827e7ea0f69221040e0ed27200 Mon Sep 17 00:00:00 2001 From: Ondrej Lukas Date: Thu, 26 Mar 2020 12:49:51 +0100 Subject: [PATCH] Added test for transactional JDBC sink --- pom.xml | 1 + snapshot-jdbc-test/pom.xml | 89 ++++++++++ .../jet/tests/snapshot/jdbc/JdbcSinkTest.java | 164 ++++++++++++++++++ .../tests/snapshot/jdbc/JdbcSinkVerifier.java | 146 ++++++++++++++++ 4 files changed, 400 insertions(+) create mode 100644 snapshot-jdbc-test/pom.xml create mode 100644 snapshot-jdbc-test/src/main/java/com/hazelcast/jet/tests/snapshot/jdbc/JdbcSinkTest.java create mode 100644 snapshot-jdbc-test/src/main/java/com/hazelcast/jet/tests/snapshot/jdbc/JdbcSinkVerifier.java diff --git a/pom.xml b/pom.xml index cb412d9c..2dbb29fa 100644 --- a/pom.xml +++ b/pom.xml @@ -26,6 +26,7 @@ return-result-to-caller-test rolling-aggregate-test snapshot-file-test + snapshot-jdbc-test snapshot-jms-sink-test snapshot-jms-source-test snapshot-kafka-test diff --git a/snapshot-jdbc-test/pom.xml b/snapshot-jdbc-test/pom.xml new file mode 100644 index 00000000..d5684a00 --- /dev/null +++ b/snapshot-jdbc-test/pom.xml @@ -0,0 +1,89 @@ + + + 4.0.0 + + snapshot-jdbc-test + + + com.hazelcast.jet.tests + hazelcast-jet-ansible-tests + 4.1-SNAPSHOT + + + + ${project.parent.basedir} + + + + + mysql + mysql-connector-java + 8.0.12 + + + com.hazelcast.jet.tests + soak-tests-common + ${jet.version} + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + com.hazelcast.jet.tests.snapshot.jdbc.JdbcSinkTest + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven.shade.plugin.version} + + + package + + shade + + + + + + + com.hazelcast.jet.tests.snapshot.jdbc.JdbcSinkTest + + + + false + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + com.hazelcast.jet.tests:* + com.hazelcast.jet.tests.snapshot.jdbc:* + + + + + + + + + diff --git a/snapshot-jdbc-test/src/main/java/com/hazelcast/jet/tests/snapshot/jdbc/JdbcSinkTest.java b/snapshot-jdbc-test/src/main/java/com/hazelcast/jet/tests/snapshot/jdbc/JdbcSinkTest.java new file mode 100644 index 00000000..40527322 --- /dev/null +++ b/snapshot-jdbc-test/src/main/java/com/hazelcast/jet/tests/snapshot/jdbc/JdbcSinkTest.java @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.jet.tests.snapshot.jdbc; + +import com.hazelcast.function.SupplierEx; +import com.hazelcast.jet.JetInstance; +import com.hazelcast.jet.Job; +import com.hazelcast.jet.config.JobConfig; +import com.hazelcast.jet.config.ProcessingGuarantee; +import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.Sink; +import com.hazelcast.jet.pipeline.Sinks; +import com.hazelcast.jet.pipeline.SourceBuilder; +import com.hazelcast.jet.pipeline.StreamSource; +import com.hazelcast.jet.tests.common.AbstractSoakTest; +import com.mysql.cj.jdbc.MysqlXADataSource; +import java.sql.Connection; +import javax.sql.CommonDataSource; +import javax.sql.DataSource; + +import static com.hazelcast.function.FunctionEx.identity; +import static com.hazelcast.jet.core.JobStatus.FAILED; +import static com.hazelcast.jet.pipeline.ServiceFactories.sharedService; +import static com.hazelcast.jet.tests.common.Util.getJobStatusWithRetry; +import static com.hazelcast.jet.tests.common.Util.sleepMillis; +import static com.hazelcast.jet.tests.common.Util.sleepMinutes; +import static com.hazelcast.jet.tests.snapshot.jdbc.JdbcSinkTest.DataSourceSupplier.getDataSourceSupplier; + +public class JdbcSinkTest extends AbstractSoakTest { + + public static final String TABLE_PREFIX = "JdbcSinkTest"; + private static final String DATABASE_NAME = "snapshot-jdbc-test-db"; + + private static final String DEFAULT_DATABASE_URL = "jdbc:mysql://localhost"; + private static final int DEFAULT_SLEEP_MS_BETWEEN_ITEM = 50; + private static final int DEFAULT_SNAPSHOT_INTERVAL = 5000; + + private String connectionUrl; + + private int sleepMsBetweenItem; + private long snapshotIntervalMs; + + public static void main(String[] args) throws Exception { + new JdbcSinkTest().run(args); + } + + @Override + public void init(JetInstance client) throws Exception { + connectionUrl = property("connectionUrl", DEFAULT_DATABASE_URL) + "/" + DATABASE_NAME + "?useSSL=false"; + + sleepMsBetweenItem = propertyInt("sleepMsBetweenItem", DEFAULT_SLEEP_MS_BETWEEN_ITEM); + snapshotIntervalMs = propertyInt("snapshotIntervalMs", DEFAULT_SNAPSHOT_INTERVAL); + } + + @Override + protected boolean runOnBothClusters() { + return true; + } + + @Override + public void test(JetInstance client, String name) throws Exception { + String tableName = (TABLE_PREFIX + name).replaceAll("-", "_"); + try (Connection connection = ((DataSource) getDataSourceSupplier(connectionUrl).get()).getConnection()) { + connection.createStatement().execute("DROP TABLE IF EXISTS " + tableName); + connection.createStatement().execute("CREATE TABLE " + tableName + + "(id int PRIMARY KEY AUTO_INCREMENT, value int)"); + } + JdbcSinkVerifier verifier = new JdbcSinkVerifier(name, logger, connectionUrl); + verifier.start(); + + try { + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(name); + jobConfig.setSnapshotIntervalMillis(snapshotIntervalMs); + jobConfig.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE); + if (name.startsWith(STABLE_CLUSTER)) { + jobConfig.addClass(JdbcSinkTest.class, JdbcSinkVerifier.class); + } + Job job = client.newJob(pipeline(tableName), jobConfig); + + try { + long begin = System.currentTimeMillis(); + while (System.currentTimeMillis() - begin < durationInMillis) { + if (getJobStatusWithRetry(job) == FAILED) { + job.join(); + } + verifier.checkStatus(); + sleepMinutes(1); + } + } finally { + job.cancel(); + } + } finally { + verifier.finish(); + } + } + + @Override + protected void teardown(Throwable t) throws Exception { + } + + private Pipeline pipeline(String tableName) { + int sleep = sleepMsBetweenItem; + + Pipeline pipeline = Pipeline.create(); + + StreamSource source = SourceBuilder + .stream("srcForJmsSink", procCtx -> new int[1]) + .fillBufferFn((ctx, buf) -> { + buf.add(ctx[0]++); + sleepMillis(sleep); + }) + .createSnapshotFn(ctx -> ctx[0]) + .restoreSnapshotFn((ctx, state) -> ctx[0] = state.get(0)) + .build(); + + Sink sink = Sinks.jdbcBuilder() + .dataSourceSupplier(getDataSourceSupplier(connectionUrl)) + .updateQuery("INSERT INTO " + tableName + "(value) VALUES(?)") + .bindFn( + (stmt, item) -> { + stmt.setInt(1, item); + }) + .exactlyOnce(true) + .build(); + + pipeline.readFrom(source) + .withoutTimestamps() + .groupingKey(identity()) + .filterUsingService(sharedService(ctx -> null), (s, k, v) -> true) + .writeTo(sink); + + return pipeline; + } + + static class DataSourceSupplier { + + public static SupplierEx getDataSourceSupplier(String connectionUrl) { + return () -> { + MysqlXADataSource ds = new MysqlXADataSource(); + ds.setUrl(connectionUrl); + ds.setUser("root"); + ds.setPassword("soak-test"); + ds.setDatabaseName(DATABASE_NAME); + return ds; + }; + } + } + +} diff --git a/snapshot-jdbc-test/src/main/java/com/hazelcast/jet/tests/snapshot/jdbc/JdbcSinkVerifier.java b/snapshot-jdbc-test/src/main/java/com/hazelcast/jet/tests/snapshot/jdbc/JdbcSinkVerifier.java new file mode 100644 index 00000000..c079f2dc --- /dev/null +++ b/snapshot-jdbc-test/src/main/java/com/hazelcast/jet/tests/snapshot/jdbc/JdbcSinkVerifier.java @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.jet.tests.snapshot.jdbc; + +import com.hazelcast.logging.ILogger; +import java.sql.Connection; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.List; +import java.util.PriorityQueue; +import javax.sql.DataSource; + +import static com.hazelcast.jet.impl.util.Util.uncheckRun; +import static com.hazelcast.jet.tests.snapshot.jdbc.JdbcSinkTest.DataSourceSupplier.getDataSourceSupplier; +import static com.hazelcast.jet.tests.snapshot.jdbc.JdbcSinkTest.TABLE_PREFIX; + +public class JdbcSinkVerifier { + + private static final int SLEEP_AFTER_VERIFICATION_CYCLE_MS = 1000; + private static final int ALLOWED_NO_INPUT_MS = 600000; + private static final int QUEUE_SIZE_LIMIT = 20_000; + private static final int PRINT_LOG_ITEMS = 10_000; + + private final String tableName; + + private final Thread consumerThread; + private final String name; + private final ILogger logger; + private final String connectionUrl; + + private volatile boolean finished; + private volatile Throwable error; + private long counter; + private final PriorityQueue verificationQueue = new PriorityQueue<>(); + + public JdbcSinkVerifier(String name, ILogger logger, String connectionUrl) { + this.consumerThread = new Thread(() -> uncheckRun(this::run)); + this.name = name; + this.tableName = (TABLE_PREFIX + name).replaceAll("-", "_"); + this.logger = logger; + this.connectionUrl = connectionUrl; + } + + private void run() throws Exception { + long lastInputTime = System.currentTimeMillis(); + while (!finished) { + try { + List ids = new ArrayList<>(); + try (Connection connection + = ((DataSource) getDataSourceSupplier(connectionUrl).get()).getConnection()) { + ResultSet resultSet = connection.createStatement() + .executeQuery("SELECT * FROM " + tableName); + while (resultSet.next()) { + ids.add(resultSet.getInt(1)); + verificationQueue.add(resultSet.getInt(2)); + } + } + + long now = System.currentTimeMillis(); + if (ids.isEmpty()) { + if (now - lastInputTime > ALLOWED_NO_INPUT_MS) { + throw new AssertionError( + String.format("[%s] No new data was added during last %s", name, ALLOWED_NO_INPUT_MS)); + } + } else { + verifyQueue(); + removeLoaded(ids); + lastInputTime = now; + } + Thread.sleep(SLEEP_AFTER_VERIFICATION_CYCLE_MS); + } catch (Throwable e) { + logger.severe("[" + name + "] Exception thrown during processing files.", e); + error = e; + finished = true; + } + } + } + + private void verifyQueue() { + // try to verify head of verification queue + for (Integer peeked; (peeked = verificationQueue.peek()) != null;) { + if (peeked > counter) { + // the item might arrive later + break; + } else if (peeked == counter) { + if (counter % PRINT_LOG_ITEMS == 0) { + logger.info(String.format("[%s] Processed correctly item %d", name, counter)); + } + // correct head of queue + verificationQueue.remove(); + counter++; + } else if (peeked < counter) { + // duplicate key + throw new AssertionError( + String.format("Duplicate key %d, but counter was %d", peeked, counter)); + } + } + if (verificationQueue.size() >= QUEUE_SIZE_LIMIT) { + throw new AssertionError(String.format("[%s] Queue size exceeded while waiting for the next " + + "item. Limit=%d, expected next=%d, next in queue: %s, %s, %s, %s, ...", + name, QUEUE_SIZE_LIMIT, counter, verificationQueue.poll(), verificationQueue.poll(), + verificationQueue.poll(), verificationQueue.poll())); + } + } + + private void removeLoaded(List ids) throws Exception { + try (Connection connection = ((DataSource) getDataSourceSupplier(connectionUrl).get()).getConnection()) { + for (Integer id : ids) { + connection.createStatement().execute("DELETE FROM " + tableName + " WHERE id=" + id); + } + } + } + + void start() { + consumerThread.start(); + } + + public void finish() throws InterruptedException { + finished = true; + consumerThread.join(); + } + + public void checkStatus() { + if (error != null) { + throw new RuntimeException(error); + } + if (finished) { + throw new RuntimeException("[" + name + "] Verifier is not running"); + } + } + +}