Skip to content

Commit

Permalink
Report exception for unmatched batch request
Browse files Browse the repository at this point in the history
When getting a response for a batch request, return an
UnmatchedRequestException for unmatched requests
  • Loading branch information
RustedBones committed Nov 28, 2024
1 parent cc1e584 commit 5db23e2
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.spotify.scio.transforms;

import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;

import com.google.common.cache.Cache;
import com.spotify.scio.transforms.BaseAsyncLookupDoFn.CacheSupplier;
Expand All @@ -26,6 +27,7 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -274,48 +276,53 @@ private void createRequest() throws InterruptedException {
}

private FutureType handleOutput(FutureType future, List<Input> batchInput, UUID key) {
final Map<String, Input> keyedInputs =
batchInput.stream().collect(Collectors.toMap(idExtractorFn::apply, identity()));
return addCallback(
future,
response -> {
batchResponseFn
.apply(response)
.forEach(
pair -> {
final String id = pair.getLeft();
final Output output = pair.getRight();
final List<ValueInSingleWindow<Input>> processInputs = inputs.remove(id);
if (processInputs == null) {
// no need to fail future here as we're only interested in its completion
// finishBundle will fail the checkState as we do not produce any result
LOG.error(
"The ID '{}' received in the gRPC batch response does not "
+ "match any IDs extracted via the idExtractorFn for the requested "
+ "batch sent to the gRPC endpoint. Please ensure that the IDs returned "
+ "from the gRPC endpoints match the IDs extracted using the provided"
+ "idExtractorFn for the same input.",
id);
} else {
final List<ValueInSingleWindow<KV<Input, TryWrapper>>> batchResult =
processInputs.stream()
.map(
processInput -> {
final Input i = processInput.getValue();
final TryWrapper o = success(output);
final Instant ts = processInput.getTimestamp();
final BoundedWindow w = processInput.getWindow();
final PaneInfo p = processInput.getPane();
return ValueInSingleWindow.of(KV.of(i, o), ts, w, p);
})
.collect(Collectors.toList());
results.add(Pair.of(key, batchResult));
}
});
final Map<String, Output> keyedOutput =
batchResponseFn.apply(response).stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

keyedInputs.forEach(
(id, input) -> {
final List<ValueInSingleWindow<Input>> processInputs = inputs.remove(id);
if (processInputs == null) {
// no need to fail future here as we're only interested in its completion
// finishBundle will fail the checkState as we do not produce any result
LOG.error(
"The ID '{}' received in the gRPC batch response does not "
+ "match any IDs extracted via the idExtractorFn for the requested "
+ "batch sent to the gRPC endpoint. Please ensure that the IDs returned "
+ "from the gRPC endpoints match the IDs extracted using the provided"
+ "idExtractorFn for the same input.",
id);
} else {
List<ValueInSingleWindow<KV<Input, TryWrapper>>> batchResult =
processInputs.stream()
.map(
processInput -> {
final Input i = processInput.getValue();
final Output output = keyedOutput.get(id);
final TryWrapper o =
output == null
? failure(new UnmatchedRequestException(id))
: success(output);
final Instant ts = processInput.getTimestamp();
final BoundedWindow w = processInput.getWindow();
final PaneInfo p = processInput.getPane();
return ValueInSingleWindow.of(KV.of(i, o), ts, w, p);
})
.collect(Collectors.toList());
results.add(Pair.of(key, batchResult));
}
});
return null;
},
throwable -> {
batchInput.forEach(
element -> {
final String id = idExtractorFn.apply(element);
keyedInputs.forEach(
(id, element) -> {
final List<ValueInSingleWindow<KV<Input, TryWrapper>>> batchResult =
inputs.remove(id).stream()
.map(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.transforms;

/**
 * Unchecked exception whose message identifies a batch request ID that was left unmatched.
 *
 * <p>The exception carries no cause; the offending ID is only available through the message.
 */
public class UnmatchedRequestException extends RuntimeException {

  /** Fixed text placed in front of the request ID when building the exception message. */
  private static final String MESSAGE_PREFIX = "Unmatched batch request for ID: ";

  /**
   * Creates the exception for the given request ID.
   *
   * @param id the ID of the unmatched request; it is appended verbatim to the message prefix
   */
  public UnmatchedRequestException(String id) {
    super(MESSAGE_PREFIX + id);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ class AsyncBatchLookupDoFnTest extends PipelineSpec {
doFn: BaseAsyncBatchLookupDoFn[Int, List[Int], List[String], String, AsyncBatchClient, F, T]
)(tryFn: T => Try[String]): Unit = {
// batches of size 4 and size 3
val output = runWithData(Seq[Seq[Int]](1 to 4, 8 to 10))(_.flatten.parDo(doFn)).map { kv =>
val output = runWithData(
Seq[Seq[Int]](
1 to 4, // 1 and 3 are unmatched
8 to 10 // failure
)
)(_.flatten.parDo(doFn)).map { kv =>
val r = tryFn(kv.getValue) match {
case Success(v) => v
case Failure(e: CompletionException) => e.getCause.getMessage
Expand All @@ -70,8 +75,9 @@ class AsyncBatchLookupDoFnTest extends PipelineSpec {
(kv.getKey, r)
}
output should contain theSameElementsAs (
(1 to 4).map(x => x -> x.toString) ++
(8 to 10).map(x => x -> "failure for 8,9,10")
Seq(1, 3).map(x => x -> s"Unmatched batch request for ID: $x") ++
Seq(2, 4).map(x => x -> x.toString) ++
Seq(8, 9, 10).map(x => x -> "failure for 8,9,10")
)
}

Expand Down Expand Up @@ -229,7 +235,7 @@ class FailingGuavaBatchLookupDoFn extends AbstractGuavaAsyncBatchLookupDoFn() {
input: List[Int]
): ListenableFuture[List[String]] =
if (input.size % 2 == 0) {
Futures.immediateFuture(input.map(_.toString))
Futures.immediateFuture(input.filter(_ % 2 == 0).map(_.toString))
} else {
Futures.immediateFailedFuture(new RuntimeException("failure for " + input.mkString(",")))
}
Expand Down Expand Up @@ -299,7 +305,7 @@ class FailingJavaBatchLookupDoFn extends AbstractJavaAsyncBatchLookupDoFn() {
input: List[Int]
): CompletableFuture[List[String]] =
if (input.size % 2 == 0) {
CompletableFuture.supplyAsync(() => input.map(_.toString))
CompletableFuture.supplyAsync(() => input.filter(_ % 2 == 0).map(_.toString))
} else {
val f = new CompletableFuture[List[String]]()
f.completeExceptionally(new RuntimeException("failure for " + input.mkString(",")))
Expand Down Expand Up @@ -347,7 +353,7 @@ class FailingScalaBatchLookupDoFn extends AbstractScalaAsyncBatchLookupDoFn() {
override protected def newClient(): AsyncBatchClient = null
override def asyncLookup(session: AsyncBatchClient, input: List[Int]): Future[List[String]] =
if (input.size % 2 == 0) {
Future.successful(input.map(_.toString))
Future.successful(input.filter(_ % 2 == 0).map(_.toString))
} else {
Future.failed(new RuntimeException("failure for " + input.mkString(",")))
}
Expand Down

0 comments on commit 5db23e2

Please sign in to comment.