Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor bigtable API to use v2 client #5444

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ val beamVersion = "2.58.0"
val autoServiceVersion = "1.0.1"
val autoValueVersion = "1.9"
val bigdataossVersion = "2.2.16"
val bigtableClientVersion = "1.28.0"
val commonsCodecVersion = "1.17.0"
val commonsCompressVersion = "1.26.2"
val commonsIoVersion = "2.16.1"
Expand Down Expand Up @@ -736,6 +735,7 @@ lazy val `scio-core` = project
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion,
"com.github.luben" % "zstd-jni" % zstdJniVersion,
"com.google.api" % "api-common" % gcpBom.key.value,
"com.google.api" % "gax" % gcpBom.key.value,
"com.google.api-client" % "google-api-client" % gcpBom.key.value,
"com.google.auto.service" % "auto-service-annotations" % autoServiceVersion,
Expand Down Expand Up @@ -962,12 +962,12 @@ lazy val `scio-google-cloud-platform` = project
libraryDependencies ++= Seq(
// compile
"com.esotericsoftware" % "kryo-shaded" % kryoVersion,
"com.google.api" % "api-common" % gcpBom.key.value,
"com.google.api" % "gax" % gcpBom.key.value,
"com.google.api" % "gax-grpc" % gcpBom.key.value,
"com.google.api-client" % "google-api-client" % gcpBom.key.value,
"com.google.api.grpc" % "grpc-google-cloud-pubsub-v1" % gcpBom.key.value,
"com.google.api.grpc" % "proto-google-cloud-bigquerystorage-v1beta1" % gcpBom.key.value,
"com.google.api.grpc" % "proto-google-cloud-bigtable-admin-v2" % gcpBom.key.value,
"com.google.api.grpc" % "proto-google-cloud-bigtable-v2" % gcpBom.key.value,
"com.google.api.grpc" % "proto-google-cloud-datastore-v1" % gcpBom.key.value,
"com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % gcpBom.key.value,
Expand All @@ -979,9 +979,6 @@ lazy val `scio-google-cloud-platform` = project
"com.google.cloud" % "google-cloud-core" % gcpBom.key.value,
"com.google.cloud" % "google-cloud-spanner" % gcpBom.key.value,
"com.google.cloud.bigdataoss" % "util" % bigdataossVersion,
"com.google.cloud.bigtable" % "bigtable-client-core" % bigtableClientVersion,
"com.google.cloud.bigtable" % "bigtable-client-core-config" % bigtableClientVersion,
"com.google.guava" % "guava" % guavaVersion,
"com.google.http-client" % "google-http-client" % gcpBom.key.value,
"com.google.http-client" % "google-http-client-gson" % gcpBom.key.value,
"com.google.protobuf" % "protobuf-java" % gcpBom.key.value,
Expand Down
120 changes: 52 additions & 68 deletions integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
package com.spotify.scio.bigtable

import java.util.UUID

import com.google.bigtable.admin.v2.{DeleteTableRequest, GetTableRequest, ListTablesRequest}
import com.google.bigtable.v2.{Mutation, Row, RowFilter}
import com.google.cloud.bigtable.config.BigtableOptions
import com.google.cloud.bigtable.grpc._
import com.google.cloud.bigtable.admin.v2.{BigtableInstanceAdminClient, BigtableTableAdminClient}
import com.google.protobuf.ByteString
import com.spotify.scio._
import com.spotify.scio.testing._
Expand All @@ -41,12 +38,6 @@ object BigtableIT {
def testData(id: String): Seq[(String, Long)] =
Seq((s"$id-key1", 1L), (s"$id-key2", 2L), (s"$id-key3", 3L))

val bigtableOptions: BigtableOptions = BigtableOptions
.builder()
.setProjectId(projectId)
.setInstanceId(instanceId)
.build

val FAMILY_NAME: String = "count"
val COLUMN_QUALIFIER: ByteString = ByteString.copyFromUtf8("long")

Expand All @@ -67,31 +58,29 @@ object BigtableIT {

def fromRow(r: Row): (String, Long) =
(r.getKey.toStringUtf8, r.getValue(FAMILY_NAME, COLUMN_QUALIFIER).get.toStringUtf8.toLong)

def listTables(client: BigtableTableAdminGrpcClient): Set[String] = {
val instancePath = s"projects/$projectId/instances/$instanceId"
val tables = client.listTables(ListTablesRequest.newBuilder().setParent(instancePath).build)
tables.getTablesList.asScala.map(t => new BigtableTableName(t.getName).getTableId).toSet
}
}

class BigtableIT extends PipelineSpec {
import BigtableIT._

// "Update number of bigtable nodes" should "work" in {
ignore should "update number of bigtable nodes" in {
val bt = new BigtableClusterUtilities(bigtableOptions)
val sc = ScioContext()
sc.updateNumberOfBigtableNodes(projectId, instanceId, 4, Duration.standardSeconds(10))
sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 4
bt.getClusterNodeCount(clusterId, zoneId) shouldBe 4
sc.updateNumberOfBigtableNodes(projectId, instanceId, 3, Duration.standardSeconds(10))
sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 3
bt.getClusterNodeCount(clusterId, zoneId) shouldBe 3
val client = BigtableInstanceAdminClient.create(projectId)
try {
val sc = ScioContext()
sc.updateNumberOfBigtableNodes(projectId, instanceId, 4, Duration.standardSeconds(10))
sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 4
client.getCluster(clusterId, zoneId).getServeNodes shouldBe 4
sc.updateNumberOfBigtableNodes(projectId, instanceId, 3, Duration.standardSeconds(10))
sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 3
client.getCluster(clusterId, zoneId).getServeNodes shouldBe 3
} finally {
client.close()
}
}

"BigtableIO" should "work in default mode" in {
TableAdmin.ensureTables(bigtableOptions, Map(tableId -> List(FAMILY_NAME)))
Admin.Table.ensureTable(projectId, instanceId, tableId, List(FAMILY_NAME))
val id = testId()
val data = testData(id)
try {
Expand Down Expand Up @@ -126,12 +115,7 @@ class BigtableIT extends PipelineSpec {
}

it should "work in bulk mode" in {
TableAdmin.ensureTables(bigtableOptions, Map(tableId -> List(FAMILY_NAME)))
val options = BigtableOptions
.builder()
.setProjectId(projectId)
.setInstanceId(instanceId)
.build()
Admin.Table.ensureTable(projectId, instanceId, tableId, List(FAMILY_NAME))
val id = testId()
val data = testData(id)

Expand All @@ -140,7 +124,7 @@ class BigtableIT extends PipelineSpec {
runWithRealContext() { sc =>
sc
.parallelize(data.map(kv => toWriteMutation(kv._1, kv._2)))
.saveAsBigtable(options, tableId, 1)
.saveAsBigtable(projectId, instanceId, tableId)
}.waitUntilDone()

// Read rows back
Expand All @@ -166,48 +150,48 @@ class BigtableIT extends PipelineSpec {
}.waitUntilFinish()
}

"TableAdmin" should "work" in {
"Admin.Table" should "work" in {
val id = testId()
val tables = Map(
s"scio-bigtable-empty-table-$id" -> List(),
s"scio-bigtable-one-cf-table-$id" -> List("colfam1"),
s"scio-bigtable-two-cf-table-$id" -> List("colfam1", "colfam2")
)
val channel = ChannelPoolCreator.createPool(bigtableOptions)
val executorService = BigtableSessionSharedThreadPools.getInstance().getRetryExecutor
val client = new BigtableTableAdminGrpcClient(channel, executorService, bigtableOptions)
val instancePath = s"projects/$projectId/instances/$instanceId"
val tableIds = tables.keys.toSet
def tablePath(table: String): String = s"$instancePath/tables/$table"
def deleteTable(table: String): Unit =
client.deleteTable(DeleteTableRequest.newBuilder().setName(tablePath(table)).build)

// Delete any tables that could be left around from previous IT run.
val oldTables = listTables(client).intersect(tableIds)
oldTables.foreach(deleteTable)

// Ensure that the tables don't exist now
listTables(client).intersect(tableIds) shouldBe empty

// Run UUT
TableAdmin.ensureTables(bigtableOptions, tables)

// Tables must exist
listTables(client).intersect(tableIds) shouldEqual tableIds

// Assert Column families exist
for ((table, columnFamilies) <- tables) {
val tableInfo = client.getTable(
GetTableRequest
.newBuilder()
.setName(tablePath(table))
.build
)
val actualColumnFamilies = tableInfo.getColumnFamiliesMap.asScala.keys
actualColumnFamilies should contain theSameElementsAs columnFamilies
}

// Clean up and delete
tables.keys.foreach(deleteTable)
val client = BigtableTableAdminClient.create(projectId, instanceId)
try {
val tableIds = tables.keys.toSet

// Delete any tables that could be left around from previous IT run.
client
.listTables()
.asScala
.filterNot(tableIds.contains)
.foreach(client.deleteTable)

// Ensure that the tables don't exist now
client.listTables().asScala.toSet.intersect(tableIds) shouldBe empty

// Run UUT
tables.foreach { case (tableId, cfs) =>
Admin.Table.ensureTable(projectId, instanceId, tableId, cfs)
}

// Tables must exist
client.listTables().asScala should contain allElementsOf tableIds

// Assert Column families exist
tables.foreach { case (id, columnFamilies) =>
val table = client.getTable(id)
val actualFamilies = table.getColumnFamilies.asScala.map(_.getId)

actualFamilies should contain theSameElementsAs columnFamilies
}

// Clean up and delete
tableIds.foreach(client.deleteTable)
} finally {
client.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package com.spotify.scio.transforms;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.common.util.concurrent.*;
import java.util.concurrent.*;
import java.util.function.Function;
Expand Down Expand Up @@ -137,4 +141,78 @@ default CompletableFuture<V> addCallback(
});
}
}

/**
* A {@link Base} implementation for Google API {@link ApiFuture}. Similar to Guava's
* ListenableFuture, but redeclared so that Guava could be shaded.
*/
public interface GoogleApi<V> extends Base<ApiFuture<V>, V> {
/**
* Executor used for callbacks. Default is {@link ForkJoinPool#commonPool()}. Consider
* overriding this method if callbacks are blocking.
*
* @return Executor for callbacks.
*/
default Executor getCallbackExecutor() {
return ForkJoinPool.commonPool();
}

@Override
default void waitForFutures(Iterable<ApiFuture<V>> futures)
throws InterruptedException, ExecutionException {
// use Future#successfulAsList instead of Futures#allAsList which only works if all
// futures succeed
ApiFutures.successfulAsList(futures).get();
}

@Override
default ApiFuture<V> addCallback(
ApiFuture<V> future, Function<V, Void> onSuccess, Function<Throwable, Void> onFailure) {
// Futures#transform doesn't allow onFailure callback while Futures#addCallback doesn't
// guarantee that callbacks are called before ListenableFuture#get() unblocks
SettableApiFuture<V> f = SettableApiFuture.create();
// if executor rejects the callback, we have to fail the future
Executor rejectPropagationExecutor =
command -> {
try {
getCallbackExecutor().execute(command);
} catch (RejectedExecutionException e) {
f.setException(e);
}
};
ApiFutures.addCallback(
future,
new ApiFutureCallback<V>() {
@Override
public void onSuccess(@Nullable V result) {
try {
onSuccess.apply(result);
f.set(result);
} catch (Throwable e) {
f.setException(e);
}
}

@Override
public void onFailure(Throwable t) {
Throwable callbackException = null;
try {
onFailure.apply(t);
} catch (Throwable e) {
// do not fail executing thread if callback fails
// record exception and propagate as suppressed
callbackException = e;
} finally {
if (callbackException != null) {
t.addSuppressed(callbackException);
}
f.setException(t);
}
}
},
rejectPropagationExecutor);

return f;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,12 @@ object BigtableWriteExample {
// before the ingestion starts.
sc.updateNumberOfBigtableNodes(btProjectId, btInstanceId, 15)

// Ensure that destination tables and column families exist
sc.ensureTables(
// Ensure that destination table and column families exist
sc.ensureTable(
btProjectId,
btInstanceId,
Map(
btTableId -> List(BigtableExample.FAMILY_NAME)
)
btTableId,
List(BigtableExample.FAMILY_NAME)
)

sc.textFile(args.getOrElse("input", ExampleData.KING_LEAR))
Expand Down
Loading
Loading