Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added LT Refactored #92

Merged
merged 1 commit into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.testcontainers.TestContainerResourceManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,25 +51,27 @@
*
* <p>The class is thread-safe.
*/
public class CassandraSharedResourceManager
extends TestContainerResourceManager<GenericContainer<?>> implements ResourceManager {
public class CassandraResourceManager extends TestContainerResourceManager<GenericContainer<?>>
implements ResourceManager {

private static final Logger LOG = LoggerFactory.getLogger(CassandraSharedResourceManager.class);
private static final Logger LOG = LoggerFactory.getLogger(CassandraResourceManager.class);

private static final String DEFAULT_CASSANDRA_CONTAINER_NAME = "cassandra";

// A list of available Cassandra Docker image tags can be found at
// https://hub.docker.com/_/cassandra/tags
private static final String DEFAULT_CASSANDRA_CONTAINER_TAG = "4.1.0";

private static final long DEFAULT_CASSANDRA_TIMEOUT = 2;

// 9042 is the default port that Cassandra is configured to listen on
private static final int CASSANDRA_INTERNAL_PORT = 9042;

private final CqlSession cassandraClient;
private final String keyspaceName;
private final boolean usingStaticDatabase;

private CassandraSharedResourceManager(Builder builder) {
private CassandraResourceManager(Builder builder) {
this(
null,
new CassandraContainer<>(
Expand All @@ -80,7 +81,7 @@ private CassandraSharedResourceManager(Builder builder) {

@VisibleForTesting
@SuppressWarnings("nullness")
CassandraSharedResourceManager(
CassandraResourceManager(
@Nullable CqlSession cassandraClient, CassandraContainer<?> container, Builder builder) {
super(container, builder);
// we are trying to handle userDefined KeyspaceName name without usingStatic Container
Expand Down Expand Up @@ -145,13 +146,27 @@ public synchronized String getKeyspaceName() {
*/
public synchronized ResultSet executeStatement(String statement) {
LOG.info("Executing statement: {}", statement);
return this.executeStatement(statement, DEFAULT_CASSANDRA_TIMEOUT);
}

/**
* Execute the given statement on the managed keyspace.
*
* @param statement The statement to execute.
* @param timeouts The timeout for the given statement to execute in seconds.
* @return ResultSet from Cassandra.
*/
public synchronized ResultSet executeStatement(String statement, long timeouts) {
LOG.info("Executing statement within timeouts: {} {}", statement, timeouts);

try {
return Failsafe.with(buildRetryPolicy())
.get(
() ->
cassandraClient.execute(
SimpleStatement.newInstance(statement).setKeyspace(this.keyspaceName)));
SimpleStatement.newInstance(statement)
.setKeyspace(this.keyspaceName)
.setTimeout(Duration.ofSeconds(timeouts))));
} catch (Exception e) {
throw new IllegalArgumentException("Error reading collection.", e);
}
Expand All @@ -176,19 +191,6 @@ public synchronized void execute(String statement) {
}
}

/**
* Inserts the given Document into a table.
*
* <p>A database will be created here, if one does not already exist.
*
* @param tableName The name of the table to insert the document into.
* @param document The document to insert into the table.
* @return A boolean indicating whether the Document was inserted successfully.
*/
public synchronized boolean insertDocument(String tableName, Map<String, Object> document) {
return insertDocuments(tableName, ImmutableList.of(document));
}

/**
* Inserts the given Documents into a collection.
*
Expand Down Expand Up @@ -246,7 +248,6 @@ public synchronized void cleanupAll() {

boolean producedError = false;

// First, delete the database if it was not given as a static argument
if (!usingStaticDatabase) {
try {
executeStatement(String.format("DROP KEYSPACE IF EXISTS %s", this.keyspaceName));
Expand Down Expand Up @@ -312,9 +313,9 @@ private static RetryPolicy<Object> buildRetryPolicy() {
.build();
}

/** Builder for {@link CassandraSharedResourceManager}. */
/** Builder for {@link CassandraResourceManager}. */
public static final class Builder
extends TestContainerResourceManager.Builder<CassandraSharedResourceManager> {
extends TestContainerResourceManager.Builder<CassandraResourceManager> {

private @Nullable String keyspaceName;

Expand Down Expand Up @@ -359,8 +360,8 @@ public Builder sePreGeneratedKeyspaceName(boolean preGeneratedKeyspaceName) {
}

@Override
public CassandraSharedResourceManager build() {
return new CassandraSharedResourceManager(this);
public CassandraResourceManager build() {
return new CassandraResourceManager(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
@AutoValue
public abstract class CassandraRowsCheck extends ConditionCheck {

abstract CassandraSharedResourceManager resourceManager();
abstract CassandraResourceManager resourceManager();

abstract String tableName();

Expand All @@ -46,7 +46,7 @@ public String getDescription() {

private long getRowCount(String tableName) {
String query = String.format("SELECT COUNT(*) FROM %s", tableName);
ResultSet resultSet = resourceManager().executeStatement(query);
ResultSet resultSet = resourceManager().executeStatement(query, 10);
Row row = resultSet.one();
if (row != null) {
return row.getLong(0);
Expand Down Expand Up @@ -79,7 +79,7 @@ public CheckResult check() {
true, String.format("Expected at least %d rows and found %d", minRows(), totalRows));
}

public static Builder builder(CassandraSharedResourceManager resourceManager, String tableName) {
public static Builder builder(CassandraResourceManager resourceManager, String tableName) {
return new AutoValue_CassandraRowsCheck.Builder()
.setResourceManager(resourceManager)
.setTableName(tableName);
Expand All @@ -89,7 +89,7 @@ public static Builder builder(CassandraSharedResourceManager resourceManager, St
@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setResourceManager(CassandraSharedResourceManager resourceManager);
public abstract Builder setResourceManager(CassandraResourceManager resourceManager);

public abstract Builder setTableName(String tableName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import com.google.common.base.MoreObjects;
import com.google.common.io.Resources;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -34,32 +32,21 @@
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.TemplateLoadTestBase;
import org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base class for Spanner to sourcedb Load tests. It provides helper functions related to
* environment setup and assertConditions.
*/
public class SpannerToCassandraLTBase extends TemplateLoadTestBase {

private static final Logger LOG = LoggerFactory.getLogger(SpannerToCassandraLTBase.class);
public class SpannerToCassandraLTBase extends SpannerToSourceDbLTBase {

private static final String TEMPLATE_SPEC_PATH =
MoreObjects.firstNonNull(
TestProperties.specPath(),
"gs://dataflow-templates-spanner-to-cassandra/templates/flex/Spanner_to_SourceDb");
public SpannerResourceManager spannerResourceManager;
public SpannerResourceManager spannerMetadataResourceManager;
public CassandraSharedResourceManager cassandraSharedResourceManager;
public GcsResourceManager gcsResourceManager;
private static PubsubResourceManager pubsubResourceManager;
private SubscriptionName subscriptionName;
public CassandraResourceManager cassandraSharedResourceManager;

public void setupResourceManagers(
String spannerDdlResource, String cassandraDdlResource, String artifactBucket)
Expand All @@ -81,7 +68,7 @@ public void setupResourceManagers(
.replace("gs://" + artifactBucket, ""));
}

public CassandraSharedResourceManager generateKeyspaceAndBuildCassandraResource() {
public CassandraResourceManager generateKeyspaceAndBuildCassandraResource() {
String keyspaceName =
ResourceManagerUtils.generateResourceId(
testName,
Expand All @@ -94,7 +81,7 @@ public CassandraSharedResourceManager generateKeyspaceAndBuildCassandraResource(
keyspaceName = keyspaceName.substring(0, 48);
}

return CassandraSharedResourceManager.builder(testName)
return CassandraResourceManager.builder(testName)
.setKeyspaceName(keyspaceName)
.sePreGeneratedKeyspaceName(true)
.build();
Expand All @@ -109,28 +96,6 @@ public void cleanupResourceManagers() {
cassandraSharedResourceManager);
}

public PubsubResourceManager setUpPubSubResourceManager() throws IOException {
return PubsubResourceManager.builder(testName, project, CREDENTIALS_PROVIDER)
.setMonitoringClient(monitoringClient)
.build();
}

public SubscriptionName createPubsubResources(
String identifierSuffix, PubsubResourceManager pubsubResourceManager, String gcsPrefix) {
String topicNameSuffix = "rr-load" + identifierSuffix;
String subscriptionNameSuffix = "rr-load-sub" + identifierSuffix;
TopicName topic = pubsubResourceManager.createTopic(topicNameSuffix);
SubscriptionName subscription =
pubsubResourceManager.createSubscription(topic, subscriptionNameSuffix);
String prefix = gcsPrefix;
if (prefix.startsWith("/")) {
prefix = prefix.substring(1);
}
prefix += "/retry/";
gcsResourceManager.createNotification(topic.toString(), prefix);
return subscription;
}

public SpannerResourceManager createSpannerDatabase(String spannerDdlResourceFile)
throws IOException {
SpannerResourceManager spannerResourceManager =
Expand All @@ -152,19 +117,8 @@ public SpannerResourceManager createSpannerDatabase(String spannerDdlResourceFil
return spannerResourceManager;
}

public SpannerResourceManager createSpannerMetadataDatabase() throws IOException {
SpannerResourceManager spannerMetadataResourceManager =
SpannerResourceManager.builder("rr-meta-lt-" + testName, project, region)
.maybeUseStaticInstance()
.build();
String dummy = "create table spnr_csdr_t1(id INT64 ) primary key(id)";
spannerMetadataResourceManager.executeDdlStatement(dummy);
return spannerMetadataResourceManager;
}

public void createAndUploadCassandraConfigToGcs(
GcsResourceManager gcsResourceManager,
CassandraSharedResourceManager cassandraResourceManagers)
GcsResourceManager gcsResourceManager, CassandraResourceManager cassandraResourceManagers)
throws IOException {

String host = cassandraResourceManagers.getHost();
Expand Down Expand Up @@ -193,7 +147,7 @@ public void createAndUploadCassandraConfigToGcs(
}

public void createCassandraSchema(
CassandraSharedResourceManager cassandraResourceManager, String cassandraDdlResourceFile)
CassandraResourceManager cassandraResourceManager, String cassandraDdlResourceFile)
throws IOException {
String ddl =
String.join(
Expand Down Expand Up @@ -294,8 +248,4 @@ public void exportMetrics(PipelineLauncher.LaunchInfo jobInfo, int numShards)
// export results
exportMetricsToBigQuery(jobInfo, metrics);
}

public void getResourceManagerMetrics(Map<String, Double> metrics) {
pubsubResourceManager.collectMetrics(metrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public class SpannerToSourceDbLTBase extends TemplateLoadTestBase {
public SpannerResourceManager spannerMetadataResourceManager;
public List<JDBCResourceManager> jdbcResourceManagers;
public GcsResourceManager gcsResourceManager;
private static PubsubResourceManager pubsubResourceManager;
private SubscriptionName subscriptionName;
protected static PubsubResourceManager pubsubResourceManager;
protected SubscriptionName subscriptionName;

public void setupResourceManagers(
String spannerDdlResource, String sessionFileResource, String artifactBucket)
Expand Down
Loading