Skip to content

Commit

Permalink
feat: Add logging to EventUploader
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziodemaria committed Mar 26, 2024
1 parent 97e9862 commit 7a92761
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 7 deletions.
27 changes: 24 additions & 3 deletions src/main/java/com/spotify/confidence/GrpcEventUploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import com.spotify.confidence.events.v1.EventsServiceGrpc;
import com.spotify.confidence.events.v1.PublishEventsRequest;
import io.grpc.ManagedChannel;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;

class GrpcEventUploader implements EventUploader {

Expand All @@ -18,6 +20,8 @@ class GrpcEventUploader implements EventUploader {
private final EventsServiceGrpc.EventsServiceFutureStub stub;
private final Clock clock;

private static final Logger log = org.slf4j.LoggerFactory.getLogger(GrpcEventUploader.class);

GrpcEventUploader(String clientSecret, Clock clock, ManagedChannel managedChannel) {
this.clientSecret = clientSecret;
this.managedChannel = managedChannel;
Expand Down Expand Up @@ -48,11 +52,28 @@ public CompletableFuture<Boolean> upload(EventBatch batch) {

return GrpcUtil.toCompletableFuture(
stub.withDeadlineAfter(5, TimeUnit.SECONDS).publishEvents(request))
.thenApply(publishEventsResponse -> publishEventsResponse.getErrorsCount() == 0)
.thenApply(
publishEventsResponse -> {
final List<Event> eventsInRequest = request.getEventsList();
if (publishEventsResponse.getErrorsCount() == 0) {
log.debug(
String.format("Successfully published %d events", eventsInRequest.size()));
return true;
}
log.error(
String.format(
"Published batch with %d events, of which %d failed. Failed events are of type: %s",
eventsInRequest.size(),
publishEventsResponse.getErrorsCount(),
publishEventsResponse.getErrorsList().stream()
.map(e -> eventsInRequest.get(e.getIndex()).getEventDefinition())
.collect(Collectors.toSet())));
return false;
})
.exceptionally(
(throwable -> {
// TODO update to use some user-configurable logging
throwable.printStackTrace();
log.error(
String.format("Publishing batch failed with reason: %s", throwable.getMessage()));
return false;
}));
}
Expand Down
64 changes: 60 additions & 4 deletions src/test/java/com/spotify/confidence/GrpcEventUploaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableMap;
import com.spotify.confidence.events.v1.EventError;
import com.spotify.confidence.events.v1.EventError.Reason;
import com.spotify.confidence.events.v1.EventsServiceGrpc;
import com.spotify.confidence.events.v1.PublishEventsRequest;
import com.spotify.confidence.events.v1.PublishEventsResponse;
Expand Down Expand Up @@ -132,9 +134,46 @@ public void testMapsMultiEventBatchToProtobuf() {
}
}

@Test
public void testMapsMultiEventBatchToProtobufSparseErrors()
throws ExecutionException, InterruptedException {
fakedEventsService.resultType = ResultType.FIRST_EVENT_ERROR;
final EventBatch batch =
new EventBatch(
List.of(
new Event("event1", messageStruct("m1"), contextStruct("c1"), 1337),
new Event("event2", messageStruct("m2"), contextStruct("c2"), 1338),
new Event("event3", messageStruct("m3"), contextStruct("c3"), 1339),
new Event("event4", messageStruct("m4"), contextStruct("c4"), 1340)));
final CompletableFuture<Boolean> completableFuture = uploader.upload(batch);
assertThat(fakedEventsService.requests).hasSize(1);

final PublishEventsRequest request = fakedEventsService.requests.get(0);
assertThat(request.getEventsList()).hasSize(4);

for (int i = 0; i < batch.events().size(); i++) {
final com.spotify.confidence.events.v1.Event protoEvent = request.getEvents(i);
assertThat(protoEvent.getEventDefinition()).isEqualTo("event" + (i + 1));

final Map<String, com.google.protobuf.Value> fieldsMap =
protoEvent.getPayload().getFieldsMap();
assertThat(fieldsMap.get("messageKey").getStringValue()).isEqualTo("value_m" + (i + 1));
assertThat(
fieldsMap
.get(CONTEXT)
.getStructValue()
.getFieldsMap()
.get("contextKey")
.getStringValue())
.isEqualTo("value_c" + (i + 1));
}
final Boolean result = completableFuture.get();
assertThat(result).isFalse();
}

@Test
public void testServiceThrows() throws ExecutionException, InterruptedException {
fakedEventsService.shouldError = true;
fakedEventsService.resultType = ResultType.REQUEST_ERROR;
final EventBatch batch =
new EventBatch(List.of(new Event("event1", messageStruct("1"), contextStruct("1"), 1337)));
final CompletableFuture<Boolean> completableFuture = uploader.upload(batch);
Expand All @@ -151,21 +190,38 @@ private ConfidenceValue.Struct messageStruct(String s) {
return ConfidenceValue.of(ImmutableMap.of("messageKey", ConfidenceValue.of("value_" + s)));
}

private enum ResultType {
REQUEST_ERROR,
FIRST_EVENT_ERROR,
SUCCESS
}

private static class FakedEventsService extends EventsServiceGrpc.EventsServiceImplBase {
public boolean shouldError;

public ResultType resultType;
final List<PublishEventsRequest> requests = new ArrayList<>();

public void clear() {
requests.clear();
shouldError = false;
resultType = ResultType.SUCCESS;
}

@Override
public void publishEvents(
PublishEventsRequest request, StreamObserver<PublishEventsResponse> responseObserver) {
requests.add(request);
if (shouldError) {
if (resultType == ResultType.REQUEST_ERROR) {
responseObserver.onError(new RuntimeException("error"));
} else if (resultType == ResultType.FIRST_EVENT_ERROR) {
responseObserver.onNext(
PublishEventsResponse.newBuilder()
.addErrors(
0,
EventError.newBuilder()
.setReason(Reason.EVENT_SCHEMA_VALIDATION_FAILED)
.build())
.build());
responseObserver.onCompleted();
} else {
responseObserver.onNext(PublishEventsResponse.newBuilder().build());
responseObserver.onCompleted();
Expand Down

0 comments on commit 7a92761

Please sign in to comment.