Skip to content

Commit

Permalink
test: added test for offset based sse
Browse files Browse the repository at this point in the history
  • Loading branch information
mherwig committed Nov 27, 2024
1 parent 40df23f commit 778c637
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.telekom.eni.pandora.horizon.kafka.event.EventWriter;
import de.telekom.eni.pandora.horizon.model.db.State;
import de.telekom.eni.pandora.horizon.model.event.DeliveryType;
import de.telekom.eni.pandora.horizon.model.event.Status;
import de.telekom.eni.pandora.horizon.model.event.StatusMessage;
Expand All @@ -28,6 +29,8 @@
import org.springframework.test.util.ReflectionTestUtils;

import java.time.Duration;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Optional;

Expand All @@ -48,24 +51,20 @@ void setupEventMessageSupplierTest() {
}

@ParameterizedTest
@ValueSource(ints = {10})
void testGetEventMessageContext(int polls) {
@ValueSource(booleans = {false, true})
void testGetEventMessageContext(boolean isOffsetMode) {
final var objectMapper = new ObjectMapper();

var pageableCaptor = ArgumentCaptor.forClass(Pageable.class);

var messagesCount = 100;
// We create a list of some test state documents similar as we would get from MongoDB
var states = MockHelper.createMessageStateMongoDocumentsForTesting(MockHelper.pulsarConfig.getSseBatchSize(), MockHelper.TEST_ENVIRONMENT, Status.PROCESSED, false);
var states = MockHelper.createMessageStateMongoDocumentsForTesting(messagesCount, MockHelper.TEST_ENVIRONMENT, Status.PROCESSED, false);

// We mock the request to MongoDB and return our dummy state documents instead
// We also capture the pageable argument to check whether is has been used correctly, later
when(MockHelper.messageStateMongoRepo.findByStatusInAndDeliveryTypeAndSubscriptionIdAsc(eq(List.of(Status.PROCESSED)),
eq(DeliveryType.SERVER_SENT_EVENT),
eq(MockHelper.TEST_SUBSCRIPTION_ID),
pageableCaptor.capture())).thenReturn(new SliceImpl<>(states));
var offsetIndex = 0;

// We create a new SubscriptionEventMessage for testing
var subscriptionEventMessage = MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.CALLBACK, false);
var subscriptionEventMessage = MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.CALLBACK);

// We mock the picked message from Kafka
ConsumerRecord<String, String> record = Mockito.mock(ConsumerRecord.class);
Expand All @@ -78,21 +77,58 @@ void testGetEventMessageContext(int polls) {
fail(e);
}

// EventWriter ha private access and will be created in the constructor of the SseTaskFactory
// We mock the request to MongoDB and return our dummy state documents instead
// We also capture the pageable argument to check whether it has been used correctly, later
if (isOffsetMode) {
offsetIndex = 4;
var offsetMessage = states.get(offsetIndex);

eventMessageSupplier = new EventMessageSupplier(MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.sseTaskFactory, false, offsetMessage.getUuid(), new StreamLimit());

when(MockHelper.messageStateMongoRepo.findById(anyString())).thenAnswer(invocation -> {
String uuid = invocation.getArgument(0);
return states.stream().filter(s -> s.getUuid().equals(uuid)).findAny();
});
when(MockHelper.messageStateMongoRepo.findByDeliveryTypeAndSubscriptionIdAndTimestampGreaterThanAsc(eq(DeliveryType.SERVER_SENT_EVENT),
eq(MockHelper.TEST_SUBSCRIPTION_ID),
any(Date.class),
pageableCaptor.capture())).thenAnswer(invocation -> {
Date timestamp = invocation.getArgument(2);
var slice = states.stream().filter(s -> s.getTimestamp().toInstant().isAfter(timestamp.toInstant())).sorted(Comparator.comparing(State::getTimestamp)).limit(MockHelper.pulsarConfig.getSseBatchSize()).toList();
return new SliceImpl<>(slice);
});
} else {
when(MockHelper.messageStateMongoRepo.findByStatusInAndDeliveryTypeAndSubscriptionIdAsc(eq(List.of(Status.PROCESSED)),
eq(DeliveryType.SERVER_SENT_EVENT),
eq(MockHelper.TEST_SUBSCRIPTION_ID),
pageableCaptor.capture())).thenAnswer(invocation -> {
var slice = states.stream().filter(a -> a.getStatus() == Status.PROCESSED).sorted(Comparator.comparing(State::getTimestamp)).limit(MockHelper.pulsarConfig.getSseBatchSize()).toList();
return new SliceImpl<>(slice);
});
}

// EventWriter has private access and will be created in the constructor of the SseTaskFactory
// Since we want to check invocations with it, we will overwrite the EventWriter in EventMessageSupplier via reflections
var eventWriterMock = mock(EventWriter.class);
ReflectionTestUtils.setField(eventMessageSupplier, "eventWriter", eventWriterMock, EventWriter.class);

var startAt = 0;
if (isOffsetMode) {
startAt = offsetIndex +1;
}
// We do multiple calls to EventMessageSupplier.get() in order to test
// that each call will fetch the next event message in the queue
for (int i = 0; i < polls; i++) {
for(int i = startAt; i < states.size(); i++) {
// We mock the actual picking of a message from Kafka here
when(MockHelper.kafkaTemplate.receive(eq(MockHelper.TEST_TOPIC), eq(states.get(i).getCoordinates().partition()), eq(states.get(i).getCoordinates().offset()), eq(Duration.ofMillis(30000)))).thenReturn(record);

// PUBLIC METHOD WE WANT TO TEST
var result = eventMessageSupplier.get();
assertNotNull(result);

// mock Vortex setting event to DELIVERED
states.get(i).setStatus(Status.DELIVERED);

// Check that we fetch batches from MongoDB correctly
var pageable = pageableCaptor.getValue();
assertEquals(0, pageable.getOffset());
Expand Down Expand Up @@ -126,7 +162,7 @@ void testGetEventMessageContextWithSubscriberDoesNotMatchSubscriptionException()
// We create a new SubscriptionEventMessage for testing,
// and we overwrite the subscriptionId so that it doesn't match the standard testing subscriptionId
// in the state messages anymore
var subscriptionEventMessage = MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.CALLBACK, false);
var subscriptionEventMessage = MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.CALLBACK);
subscriptionEventMessage.setSubscriptionId("something different");

// We mock the picked message from Kafka
Expand Down
18 changes: 9 additions & 9 deletions src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@ void setupSseServiceTest() {
}

@Test
void testRun() throws IOException, InterruptedException {
void testRun() throws IOException {
final var o = new ObjectMapper();
// For this test we set the timeout to 10s to simulate the automatic timeout if there are no new events flowing
final var timeout = 10000L;
when(MockHelper.pulsarConfig.getSseTimeout()).thenReturn(timeout);

// We create a new SubscriptionEventMessage queue and add some test messages
var itemQueue = new ConcurrentLinkedQueue<SubscriptionEventMessage>();
itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false));
itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false));
itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT));
itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT));

final var itemQueueInitialSize = itemQueue.size();

Expand Down Expand Up @@ -191,7 +191,7 @@ void testTerminateConnection() throws InterruptedException, IOException {
// an endless stream
when(eventMessageSupplierMock.get()).thenAnswer(i -> {
await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true));
return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, streamLimit, false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class));
return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT), false, streamLimit, false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class));
});
when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID);
// The following checks that we track a DELIVERED event with the de-duplication service
Expand Down Expand Up @@ -249,7 +249,7 @@ void testTerminateConnectionThroughMaxNumberStreamLimit() throws IOException {
// an endless stream
when(eventMessageSupplierMock.get()).thenAnswer(i -> {
await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true));
return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, streamLimit, false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class));
return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT), false, streamLimit, false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class));
});
when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID);
// The following checks that we track a DELIVERED event with the de-duplication service
Expand Down Expand Up @@ -304,7 +304,7 @@ void testTerminateConnectionThroughMaxByteStreamLimit() throws IOException {
// an endless stream
when(eventMessageSupplierMock.get()).thenAnswer(i -> {
await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true));
return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, streamLimit, false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class));
return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT), false, streamLimit, false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class));
});
when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID);
// The following checks that we track a DELIVERED event with the de-duplication service
Expand Down Expand Up @@ -366,7 +366,7 @@ void testTerminateConnectionThroughMaxMinutesStreamLimit() throws IOException {
when(eventMessageSupplierMock.get()).thenAnswer(i -> {
await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true));

return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, streamLimit, false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class));
return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT), false, streamLimit, false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class));
});
when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID);
// The following checks that we track a DELIVERED event with the de-duplication service
Expand Down Expand Up @@ -397,8 +397,8 @@ void testTerminateConnectionThroughMaxMinutesStreamLimit() throws IOException {
void testSendEventFailed() throws IOException {
// We create a new SubscriptionEventMessage queue and add some test messages
var itemQueue = new ConcurrentLinkedQueue<SubscriptionEventMessage>();
itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false));
itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false));
itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT));
itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT));

final var itemQueueInitialSize = itemQueue.size();

Expand Down
19 changes: 8 additions & 11 deletions src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -64,12 +61,7 @@ public class MockHelper {
public static ResponseBodyEmitter emitter;
public static TokenService tokenService;
public static HorizonMetricsHelper metricsHelper;

public static DeDuplicationService deDuplicationService;



public static String TEST_EVENT_ID = "abc123-def456-ghi789";
public static String TEST_ENVIRONMENT = "bond";
public static String TEST_SUBSCRIPTION_ID = "1-2-3";
public static String TEST_SUBSCRIBER_ID = "eni-pan-dora";
Expand Down Expand Up @@ -105,14 +97,14 @@ public static void init() {
sseTaskFactory = new SseTaskFactory(pulsarConfig, connectionCache, connectionGaugeCache, eventWriter, kafkaPicker, messageStateMongoRepo, deDuplicationService, metricsHelper, tracingHelper);
}

public static SubscriptionEventMessage createSubscriptionEventMessageForTesting(DeliveryType deliveryType, boolean withAdditionalFields) {
public static SubscriptionEventMessage createSubscriptionEventMessageForTesting(DeliveryType deliveryType) {
var subscriptionEventMessageForTesting = new SubscriptionEventMessage();

var event = new Event();
event.setId(RandomStringUtils.random(12, true, true));
event.setData(Map.of("message", "foobar"));

subscriptionEventMessageForTesting.setUuid(TEST_EVENT_ID);
subscriptionEventMessageForTesting.setUuid(UUID.randomUUID().toString());
subscriptionEventMessageForTesting.setEvent(event);
subscriptionEventMessageForTesting.setEnvironment(TEST_ENVIRONMENT);
subscriptionEventMessageForTesting.setSubscriptionId(TEST_SUBSCRIPTION_ID);
Expand Down Expand Up @@ -173,6 +165,11 @@ private static State createStateForTesting(String environment, Status status, bo
public static List<State> createStatesForTesting (int count, String environment, Status status, boolean randomSubscriptionId) {
var states = new ArrayList<State>();
for (int i = 0; i < count; i++) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
states.add(createStateForTesting(environment, status, randomSubscriptionId));
}
return states;
Expand Down

0 comments on commit 778c637

Please sign in to comment.