Skip to content

Commit

Permalink
refactor: Add message container to payload (#119)
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziodemaria authored Apr 23, 2024
1 parent 394e377 commit 176dec1
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 19 deletions.
8 changes: 4 additions & 4 deletions src/main/java/com/spotify/confidence/Confidence.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ public Confidence withContext(Map<String, ConfidenceValue> context) {
}

@Override
public void send(String name) {
public void send(String eventName) {
try {
client().send(name, getContext(), Optional.empty());
client().send(eventName, getContext(), Optional.empty());
} catch (IllegalStateException e) {
// swallow this exception
}
}

@Override
public void send(String name, ConfidenceValue.Struct message) {
public void send(String eventName, ConfidenceValue.Struct message) {
try {
client().send(name, getContext(), Optional.of(message));
client().send(eventName, getContext(), Optional.of(message));
} catch (IllegalStateException e) {
// swallow this exception
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/spotify/confidence/EventSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

@Beta
public interface EventSender extends Contextual {
public void send(String name, ConfidenceValue.Struct message);
public void send(String eventName, ConfidenceValue.Struct message);

public void send(String name);
public void send(String eventName);

@Override
EventSender withContext(ConfidenceValue.Struct context);
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/com/spotify/confidence/EventUploader.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.spotify.confidence;

import static com.spotify.confidence.GrpcEventUploader.CONTEXT;

import com.google.protobuf.Struct;
import com.spotify.confidence.events.v1.Event;
import java.util.List;
Expand All @@ -15,8 +13,8 @@ static Event.Builder event(
.setEventDefinition(EventSenderEngineImpl.EVENT_NAME_PREFIX + name)
.setPayload(
Struct.newBuilder()
.putAllFields(message.orElse(ConfidenceValue.Struct.EMPTY).asProtoMap())
.putFields(CONTEXT, context.toProto()));
.putAllFields(context.asProtoMap())
.putFields("message", message.orElse(ConfidenceValue.Struct.EMPTY).toProto()));
}

CompletableFuture<Boolean> upload(List<Event> events);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class GrpcEventUploader implements EventUploader {
Status.Code.ABORTED,
Status.Code.INTERNAL,
Status.Code.DATA_LOSS);
static final String CONTEXT = "context";
private final String clientSecret;
private final Sdk sdk;
private final ManagedChannel managedChannel;
Expand Down
53 changes: 52 additions & 1 deletion src/test/java/com/spotify/confidence/EventSenderEngineTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -81,6 +82,48 @@ public void testEngineUploads() throws IOException {
assertThat(uploadCallsCount - fullBatchCount).isEqualTo(additionalBatch);
}

@Test
public void testOverlappingKeysInPayload() throws InterruptedException {
final FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
final EventSenderEngine engine =
new EventSenderEngineImpl(
1,
alwaysSucceedUploader,
clock,
DEFAULT_MAX_FLUSH_INTERVAL,
DEFAULT_MAX_MEMORY_CONSUMPTION);
// wait for the flush timeout to trigger the upload
engine.send(
"my_event",
ConfidenceValue.of(
ImmutableMap.of(
"a", ConfidenceValue.of(2),
"message", ConfidenceValue.of(3))),
Optional.of(
ConfidenceValue.Struct.of(
Map.of(
"a", ConfidenceValue.of(0),
"message", ConfidenceValue.of(1)))));
Thread.sleep(300);
assertThat(alwaysSucceedUploader.uploadCalls.peek().get(0).getPayload())
.isEqualTo(
Struct.newBuilder()
.putAllFields(
Map.of(
"message",
Value.newBuilder()
.setStructValue(
Struct.newBuilder()
.putFields(
"a", Value.newBuilder().setNumberValue(0).build())
.putFields(
"message",
Value.newBuilder().setNumberValue(1).build()))
.build(),
"a", Value.newBuilder().setNumberValue(2).build()))
.build());
}

@Test
public void testEngineCloseSuccessfullyWithoutEventsQueued() throws IOException {
final FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
Expand Down Expand Up @@ -156,7 +199,15 @@ public void testEngineUploadsWhenIntermittentErrorWillRetry() throws IOException
final Set<Value> uniqueEventIds =
fakeUploader.uploadCalls.stream()
.flatMap(Collection::stream)
.map(event -> event.getPayload().getFieldsMap().get("id"))
.map(
event ->
event
.getPayload()
.getFieldsMap()
.get("message")
.getStructValue()
.getFieldsMap()
.get("id"))
.collect(Collectors.toSet());
// Verify all events reached the uploader
assertThat(uniqueEventIds.size()).isEqualTo(numEvents);
Expand Down
14 changes: 7 additions & 7 deletions src/test/java/com/spotify/confidence/GrpcEventUploaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ public void testMapsSingleEventBatchToProtobuf() throws ExecutionException, Inte
assertThat(protoEvent.getEventDefinition()).isEqualTo("eventDefinitions/event1");

final Map<String, com.google.protobuf.Value> fieldsMap = protoEvent.getPayload().getFieldsMap();
assertThat(fieldsMap.get("messageKey").getStringValue()).isEqualTo("value_1");
assertThat(
fieldsMap
.get(CONTEXT)
.get("message")
.getStructValue()
.getFieldsMap()
.get("contextKey")
.get("messageKey")
.getStringValue())
.isEqualTo("value_1");
assertThat(fieldsMap.get("contextKey").getStringValue()).isEqualTo("value_1");
}

@Test
Expand Down Expand Up @@ -136,15 +136,15 @@ public void testMapsMultiEventBatchToProtobuf() {

final Map<String, com.google.protobuf.Value> fieldsMap =
protoEvent.getPayload().getFieldsMap();
assertThat(fieldsMap.get("messageKey").getStringValue()).isEqualTo("value_m" + (i + 1));
assertThat(
fieldsMap
.get(CONTEXT)
.get("message")
.getStructValue()
.getFieldsMap()
.get("contextKey")
.get("messageKey")
.getStringValue())
.isEqualTo("value_c" + (i + 1));
.isEqualTo("value_m" + (i + 1));
assertThat(fieldsMap.get("contextKey").getStringValue()).isEqualTo("value_c" + (i + 1));
}
}

Expand Down

0 comments on commit 176dec1

Please sign in to comment.