Skip to content

Commit

Permalink
Added LT Refectored (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
pawankashyapollion authored Feb 5, 2025
1 parent f4c9811 commit 30f1c7c
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.testcontainers.TestContainerResourceManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,25 +51,27 @@
*
* <p>The class is thread-safe.
*/
public class CassandraSharedResourceManager
extends TestContainerResourceManager<GenericContainer<?>> implements ResourceManager {
public class CassandraResourceManager extends TestContainerResourceManager<GenericContainer<?>>
implements ResourceManager {

private static final Logger LOG = LoggerFactory.getLogger(CassandraSharedResourceManager.class);
private static final Logger LOG = LoggerFactory.getLogger(CassandraResourceManager.class);

private static final String DEFAULT_CASSANDRA_CONTAINER_NAME = "cassandra";

// A list of available Cassandra Docker image tags can be found at
// https://hub.docker.com/_/cassandra/tags
private static final String DEFAULT_CASSANDRA_CONTAINER_TAG = "4.1.0";

private static final long DEFAULT_CASSANDRA_TIMEOUT = 2;

// 9042 is the default port that Cassandra is configured to listen on
private static final int CASSANDRA_INTERNAL_PORT = 9042;

private final CqlSession cassandraClient;
private final String keyspaceName;
private final boolean usingStaticDatabase;

private CassandraSharedResourceManager(Builder builder) {
private CassandraResourceManager(Builder builder) {
this(
null,
new CassandraContainer<>(
Expand All @@ -80,7 +81,7 @@ private CassandraSharedResourceManager(Builder builder) {

@VisibleForTesting
@SuppressWarnings("nullness")
CassandraSharedResourceManager(
CassandraResourceManager(
@Nullable CqlSession cassandraClient, CassandraContainer<?> container, Builder builder) {
super(container, builder);
// we are trying to handle userDefined KeyspaceName name without usingStatic Container
Expand Down Expand Up @@ -145,13 +146,27 @@ public synchronized String getKeyspaceName() {
*/
public synchronized ResultSet executeStatement(String statement) {
LOG.info("Executing statement: {}", statement);
return this.executeStatement(statement, DEFAULT_CASSANDRA_TIMEOUT);
}

/**
* Execute the given statement on the managed keyspace.
*
* @param statement The statement to execute.
* @param timeouts The timeout for the given statement to execute in seconds.
* @return ResultSet from Cassandra.
*/
public synchronized ResultSet executeStatement(String statement, long timeouts) {
LOG.info("Executing statement within timeouts: {} {}", statement, timeouts);

try {
return Failsafe.with(buildRetryPolicy())
.get(
() ->
cassandraClient.execute(
SimpleStatement.newInstance(statement).setKeyspace(this.keyspaceName)));
SimpleStatement.newInstance(statement)
.setKeyspace(this.keyspaceName)
.setTimeout(Duration.ofSeconds(timeouts))));
} catch (Exception e) {
throw new IllegalArgumentException("Error reading collection.", e);
}
Expand All @@ -176,19 +191,6 @@ public synchronized void execute(String statement) {
}
}

/**
* Inserts the given Document into a table.
*
* <p>A database will be created here, if one does not already exist.
*
* @param tableName The name of the table to insert the document into.
* @param document The document to insert into the table.
* @return A boolean indicating whether the Document was inserted successfully.
*/
public synchronized boolean insertDocument(String tableName, Map<String, Object> document) {
return insertDocuments(tableName, ImmutableList.of(document));
}

/**
* Inserts the given Documents into a collection.
*
Expand Down Expand Up @@ -246,7 +248,6 @@ public synchronized void cleanupAll() {

boolean producedError = false;

// First, delete the database if it was not given as a static argument
if (!usingStaticDatabase) {
try {
executeStatement(String.format("DROP KEYSPACE IF EXISTS %s", this.keyspaceName));
Expand Down Expand Up @@ -312,9 +313,9 @@ private static RetryPolicy<Object> buildRetryPolicy() {
.build();
}

/** Builder for {@link CassandraSharedResourceManager}. */
/** Builder for {@link CassandraResourceManager}. */
public static final class Builder
extends TestContainerResourceManager.Builder<CassandraSharedResourceManager> {
extends TestContainerResourceManager.Builder<CassandraResourceManager> {

private @Nullable String keyspaceName;

Expand Down Expand Up @@ -359,8 +360,8 @@ public Builder sePreGeneratedKeyspaceName(boolean preGeneratedKeyspaceName) {
}

@Override
public CassandraSharedResourceManager build() {
return new CassandraSharedResourceManager(this);
public CassandraResourceManager build() {
return new CassandraResourceManager(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
@AutoValue
public abstract class CassandraRowsCheck extends ConditionCheck {

abstract CassandraSharedResourceManager resourceManager();
abstract CassandraResourceManager resourceManager();

abstract String tableName();

Expand All @@ -46,7 +46,7 @@ public String getDescription() {

private long getRowCount(String tableName) {
String query = String.format("SELECT COUNT(*) FROM %s", tableName);
ResultSet resultSet = resourceManager().executeStatement(query);
ResultSet resultSet = resourceManager().executeStatement(query, 10);
Row row = resultSet.one();
if (row != null) {
return row.getLong(0);
Expand Down Expand Up @@ -79,7 +79,7 @@ public CheckResult check() {
true, String.format("Expected at least %d rows and found %d", minRows(), totalRows));
}

public static Builder builder(CassandraSharedResourceManager resourceManager, String tableName) {
public static Builder builder(CassandraResourceManager resourceManager, String tableName) {
return new AutoValue_CassandraRowsCheck.Builder()
.setResourceManager(resourceManager)
.setTableName(tableName);
Expand All @@ -89,7 +89,7 @@ public static Builder builder(CassandraSharedResourceManager resourceManager, St
@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setResourceManager(CassandraSharedResourceManager resourceManager);
public abstract Builder setResourceManager(CassandraResourceManager resourceManager);

public abstract Builder setTableName(String tableName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import com.google.common.base.MoreObjects;
import com.google.common.io.Resources;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -34,32 +32,21 @@
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.TemplateLoadTestBase;
import org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base class for Spanner to sourcedb Load tests. It provides helper functions related to
* environment setup and assertConditions.
*/
public class SpannerToCassandraLTBase extends TemplateLoadTestBase {

private static final Logger LOG = LoggerFactory.getLogger(SpannerToCassandraLTBase.class);
public class SpannerToCassandraLTBase extends SpannerToSourceDbLTBase {

private static final String TEMPLATE_SPEC_PATH =
MoreObjects.firstNonNull(
TestProperties.specPath(),
"gs://dataflow-templates-spanner-to-cassandra/templates/flex/Spanner_to_SourceDb");
public SpannerResourceManager spannerResourceManager;
public SpannerResourceManager spannerMetadataResourceManager;
public CassandraSharedResourceManager cassandraSharedResourceManager;
public GcsResourceManager gcsResourceManager;
private static PubsubResourceManager pubsubResourceManager;
private SubscriptionName subscriptionName;
public CassandraResourceManager cassandraSharedResourceManager;

public void setupResourceManagers(
String spannerDdlResource, String cassandraDdlResource, String artifactBucket)
Expand All @@ -81,7 +68,7 @@ public void setupResourceManagers(
.replace("gs://" + artifactBucket, ""));
}

public CassandraSharedResourceManager generateKeyspaceAndBuildCassandraResource() {
public CassandraResourceManager generateKeyspaceAndBuildCassandraResource() {
String keyspaceName =
ResourceManagerUtils.generateResourceId(
testName,
Expand All @@ -94,7 +81,7 @@ public CassandraSharedResourceManager generateKeyspaceAndBuildCassandraResource(
keyspaceName = keyspaceName.substring(0, 48);
}

return CassandraSharedResourceManager.builder(testName)
return CassandraResourceManager.builder(testName)
.setKeyspaceName(keyspaceName)
.sePreGeneratedKeyspaceName(true)
.build();
Expand All @@ -109,28 +96,6 @@ public void cleanupResourceManagers() {
cassandraSharedResourceManager);
}

public PubsubResourceManager setUpPubSubResourceManager() throws IOException {
return PubsubResourceManager.builder(testName, project, CREDENTIALS_PROVIDER)
.setMonitoringClient(monitoringClient)
.build();
}

public SubscriptionName createPubsubResources(
String identifierSuffix, PubsubResourceManager pubsubResourceManager, String gcsPrefix) {
String topicNameSuffix = "rr-load" + identifierSuffix;
String subscriptionNameSuffix = "rr-load-sub" + identifierSuffix;
TopicName topic = pubsubResourceManager.createTopic(topicNameSuffix);
SubscriptionName subscription =
pubsubResourceManager.createSubscription(topic, subscriptionNameSuffix);
String prefix = gcsPrefix;
if (prefix.startsWith("/")) {
prefix = prefix.substring(1);
}
prefix += "/retry/";
gcsResourceManager.createNotification(topic.toString(), prefix);
return subscription;
}

public SpannerResourceManager createSpannerDatabase(String spannerDdlResourceFile)
throws IOException {
SpannerResourceManager spannerResourceManager =
Expand All @@ -152,19 +117,8 @@ public SpannerResourceManager createSpannerDatabase(String spannerDdlResourceFil
return spannerResourceManager;
}

public SpannerResourceManager createSpannerMetadataDatabase() throws IOException {
SpannerResourceManager spannerMetadataResourceManager =
SpannerResourceManager.builder("rr-meta-lt-" + testName, project, region)
.maybeUseStaticInstance()
.build();
String dummy = "create table spnr_csdr_t1(id INT64 ) primary key(id)";
spannerMetadataResourceManager.executeDdlStatement(dummy);
return spannerMetadataResourceManager;
}

public void createAndUploadCassandraConfigToGcs(
GcsResourceManager gcsResourceManager,
CassandraSharedResourceManager cassandraResourceManagers)
GcsResourceManager gcsResourceManager, CassandraResourceManager cassandraResourceManagers)
throws IOException {

String host = cassandraResourceManagers.getHost();
Expand Down Expand Up @@ -193,7 +147,7 @@ public void createAndUploadCassandraConfigToGcs(
}

public void createCassandraSchema(
CassandraSharedResourceManager cassandraResourceManager, String cassandraDdlResourceFile)
CassandraResourceManager cassandraResourceManager, String cassandraDdlResourceFile)
throws IOException {
String ddl =
String.join(
Expand Down Expand Up @@ -294,8 +248,4 @@ public void exportMetrics(PipelineLauncher.LaunchInfo jobInfo, int numShards)
// export results
exportMetricsToBigQuery(jobInfo, metrics);
}

public void getResourceManagerMetrics(Map<String, Double> metrics) {
pubsubResourceManager.collectMetrics(metrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public class SpannerToSourceDbLTBase extends TemplateLoadTestBase {
public SpannerResourceManager spannerMetadataResourceManager;
public List<JDBCResourceManager> jdbcResourceManagers;
public GcsResourceManager gcsResourceManager;
private static PubsubResourceManager pubsubResourceManager;
private SubscriptionName subscriptionName;
protected static PubsubResourceManager pubsubResourceManager;
protected SubscriptionName subscriptionName;

public void setupResourceManagers(
String spannerDdlResource, String sessionFileResource, String artifactBucket)
Expand Down

0 comments on commit 30f1c7c

Please sign in to comment.