Skip to content

Commit

Permalink
Update test got GrpcBatchDoFn
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Nov 28, 2024
1 parent 5db23e2 commit 4427ece
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,30 @@

package com.spotify.scio.transforms;

import java.util.Objects;

public class UnmatchedRequestException extends RuntimeException {

private final String id;

public UnmatchedRequestException(String id) {
super("Unmatched batch request for ID: " + id);
this.id = id;
}

public String getId() {
return id;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
UnmatchedRequestException that = (UnmatchedRequestException) o;
return Objects.equals(id, that.id);
}

@Override
public int hashCode() {
return Objects.hashCode(id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import com.spotify.concat.v1.ConcatServiceGrpc.{ConcatServiceFutureStub, ConcatS
import com.spotify.concat.v1._
import com.spotify.scio.testing.PipelineSpec
import com.spotify.scio.transforms.BaseAsyncLookupDoFn.CacheSupplier
import com.spotify.scio.transforms.UnmatchedRequestException
import io.grpc.netty.NettyChannelBuilder
import io.grpc.stub.StreamObserver
import io.grpc.{Server, ServerBuilder}
import org.apache.beam.sdk.Pipeline.PipelineExecutionException
import org.scalatest.BeforeAndAfterAll

import java.net.ServerSocket
import java.util.stream.Collectors
import scala.jdk.CollectionConverters._
import scala.util.{Success, Try}
import scala.util.{Failure, Success, Try}

object GrpcBatchDoFnTest {

Expand Down Expand Up @@ -230,7 +230,7 @@ class GrpcBatchDoFnTest extends PipelineSpec with BeforeAndAfterAll {
}
}

it should "throw an IllegalStateException if gRPC response contains unknown ids" in {
it should "fail unmatched inputs" in {
val input = (0 to 1).map { i =>
ConcatRequestWithID
.newBuilder()
Expand All @@ -240,30 +240,29 @@ class GrpcBatchDoFnTest extends PipelineSpec with BeforeAndAfterAll {
.build()
}

assertThrows[IllegalStateException] {
try {
runWithContext { sc =>
sc.parallelize(input)
.grpcBatchLookup[
BatchRequest,
BatchResponse,
ConcatResponseWithID,
ConcatServiceFutureStub
](
() => NettyChannelBuilder.forTarget(ServiceUri).usePlaintext().build(),
ConcatServiceGrpc.newFutureStub,
2,
concatBatchRequest,
r => r.getResponseList.asScala.toSeq.map(e => ("WrongID-" + e.getRequestId, e)),
idExtractor,
2
)(_.batchConcat)
}
} catch {
case e: PipelineExecutionException =>
e.getMessage should include("Expected requestCount == responseCount")
throw e.getCause
}
val expected: Seq[(ConcatRequestWithID, Try[ConcatResponseWithID])] = input.map { req =>
req -> Failure(new UnmatchedRequestException(req.getRequestId))
}

runWithContext { sc =>
val result = sc
.parallelize(input)
.grpcBatchLookup[
BatchRequest,
BatchResponse,
ConcatResponseWithID,
ConcatServiceFutureStub
](
() => NettyChannelBuilder.forTarget(ServiceUri).usePlaintext().build(),
ConcatServiceGrpc.newFutureStub,
2,
concatBatchRequest,
r => r.getResponseList.asScala.toSeq.map(e => ("WrongID-" + e.getRequestId, e)),
idExtractor,
2
)(_.batchConcat)

result should containInAnyOrder(expected)
}
}
}

0 comments on commit 4427ece

Please sign in to comment.