Skip to content

Commit

Permalink
Added limit of messages (#4)
Browse files Browse the repository at this point in the history
Co-authored-by: bartlomiej.zylinski <[email protected]>
  • Loading branch information
Pask423 and bartlomiej.zylinski authored Oct 9, 2024
1 parent eee18fb commit 06f5fa3
Show file tree
Hide file tree
Showing 15 changed files with 969 additions and 212 deletions.
598 changes: 598 additions & 0 deletions intellij-java-google-style.xml

Large diffs are not rendered by default.

37 changes: 20 additions & 17 deletions src/main/java/otter/jet/reader/ReaderConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,32 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import otter.jet.store.MessageStore;

@Configuration
@EnableConfigurationProperties(ReaderConfigurationProperties.class)
class ReaderConfiguration {

private final ReaderConfigurationProperties readerConfigurationProperties;
private final ReaderConfigurationProperties readerConfigurationProperties;

ReaderConfiguration(ReaderConfigurationProperties readerConfigurationProperties) {
this.readerConfigurationProperties = readerConfigurationProperties;
}
ReaderConfiguration(ReaderConfigurationProperties readerConfigurationProperties) {
this.readerConfigurationProperties = readerConfigurationProperties;
}

@Bean
public ReaderService readerService(
@Value("${nats.server.host}") String natsServerHost,
@Value("${nats.server.port}") String natsServerPort,
MessageDeserializer messageDeserializer) {
return new ReaderService(
createNatsServerUrl(natsServerHost, natsServerPort),
messageDeserializer,
readerConfigurationProperties.getSubject());
}
@Bean
public ReaderService readerService(
@Value("${nats.server.host}") String natsServerHost,
@Value("${nats.server.port}") String natsServerPort,
MessageDeserializer messageDeserializer,
MessageStore messageStore) {
return new ReaderService(
createNatsServerUrl(natsServerHost, natsServerPort),
messageDeserializer,
readerConfigurationProperties.getSubject(),
messageStore);
}

private String createNatsServerUrl(String natsServerHost, String natsServerPort) {
return "nats://" + natsServerHost + ":" + natsServerPort;
}
private String createNatsServerUrl(String natsServerHost, String natsServerPort) {
return "nats://" + natsServerHost + ":" + natsServerPort;
}
}
194 changes: 82 additions & 112 deletions src/main/java/otter/jet/reader/ReaderService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,138 +6,108 @@
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Subscription;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import otter.jet.store.MessageStore;

public class ReaderService {

private static final Logger LOG = LoggerFactory.getLogger(ReaderService.class);

private final String natsServerUrl;
private final MessageDeserializer messageDeserializer;
private final String subject;

private final Executor executorService = Executors.newSingleThreadExecutor();
private static final Logger LOG = LoggerFactory.getLogger(ReaderService.class);
private static final String NO_MATCHING_STREAM_CODE = "SUB-90007";

ArrayDeque<ReadMessage> msgs = new ArrayDeque<>();
private final String natsServerUrl;
private final MessageDeserializer messageDeserializer;
private final String subject;
private final MessageStore messageStore;

public ReaderService(
String natsServerUrl, MessageDeserializer messageDeserializer, String subject) {
this.natsServerUrl = natsServerUrl;
this.messageDeserializer = messageDeserializer;
this.subject = subject;
}
private final Executor executorService = Executors.newSingleThreadExecutor();

@EventListener(ApplicationReadyEvent.class)
public void startReadingMessages() {
// This method will be invoked after the service is initialized
startMessageListener();
}
public ReaderService(String natsServerUrl,
MessageDeserializer messageDeserializer,
String subject,
MessageStore messageStore) {
this.natsServerUrl = natsServerUrl;
this.messageDeserializer = messageDeserializer;
this.subject = subject;
this.messageStore = messageStore;
}

private void startMessageListener() {
executorService.execute(
() -> {
try {
// Connect to NATS server
try (Connection natsConnection = Nats.connect(natsServerUrl)) {
LOG.info("Connected to NATS server at: {}", natsServerUrl);
@EventListener(ApplicationReadyEvent.class)
public void startReadingMessages() {
// This method will be invoked after the service is initialized
startMessageListener();
}

JetStream jetStream = natsConnection.jetStream();
LOG.info("Connected to JetStream server at: {}", natsServerUrl);
// Subscribe to the subject
private void startMessageListener() {
executorService.execute(
() -> {
// Connect to NATS server
try (Connection natsConnection = Nats.connect(natsServerUrl)) {
LOG.info("Connected to NATS server at: {}", natsServerUrl);

JetStream jetStream = natsConnection.jetStream();
LOG.info("Connected to JetStream server at: {}", natsServerUrl);
// Subscribe to the subject

Subscription subscription = tryToSubscribe(jetStream);
LOG.info("Subscribed to subject: {}", natsServerUrl);

continuouslyReadMessages(subscription, messageDeserializer);
} catch (Exception e) {
LOG.error("Error during message reading: ", e);
}
});
}

Subscription subscription = tryToSubscribe(jetStream);
LOG.info("Subscribed to subject: {}", natsServerUrl);
private Subscription tryToSubscribe(JetStream jetStream)
throws IOException, JetStreamApiException, InterruptedException {

continuouslyReadMessages(subscription, messageDeserializer);
}

} catch (Exception e) {
LOG.error("Error during message reading: ", e);
}
});
}

private Subscription tryToSubscribe(JetStream jetStream)
throws IOException, JetStreamApiException, InterruptedException {

Subscription subscription;
try {
subscription = jetStream.subscribe(subject);
} catch (IllegalStateException e) {
if (e.getMessage().contains("SUB-90007")) { // No matching streams for subject
// try again after 5 seconds
LOG.warn(
"Unable to subscribe to subject: "
+ subject
+ " . No matching streams. Trying again in 5sec...");
Thread.sleep(5000);
return tryToSubscribe(jetStream);
}
throw new RuntimeException(e);
}
return subscription;
}

private void continuouslyReadMessages(
Subscription subscription, MessageDeserializer messageDeserializer)
throws InterruptedException {
while (true) {
// Wait for a message
Message message = subscription.nextMessage(100);
// Print the message
if (message != null) {
try {
DeserializedMessage deserializedMessage =
messageDeserializer.deserializeMessage(ByteBuffer.wrap(message.getData()));
ReadMessage msg =
new ReadMessage(
message.getSubject(),
deserializedMessage.name(),
deserializedMessage.content(),
message.metaData().timestamp().toLocalDateTime());
msgs.addFirst(msg);
message.ack();
} catch (Exception e) {
LOG.warn("Unable to deserialize message", e);
return jetStream.subscribe(subject);
} catch (IllegalStateException e) {
if (e.getMessage().contains(NO_MATCHING_STREAM_CODE)) { // No matching streams for subject
// try again after 5 seconds
LOG.warn(
"Unable to subscribe to subject: "
+ subject
+ " . No matching streams. Trying again in 5sec...");
Thread.sleep(5000);
return tryToSubscribe(jetStream);
}
throw new RuntimeException(e);
}
}
}
}

public List<ReadMessage> filter(String subject, String type, int page, int size, String bodyContent) {
return msgs.stream()
.filter(
m -> {
if (!subject.isBlank()) {
return m.subject().contains(subject);
}
return true;
})
.filter(
m -> {
if (!type.isBlank()) {
return m.name().contains(type);
}
return true;
})
.filter(
m -> {
if (!bodyContent.isBlank()) {
return m.body().contains(bodyContent);

private void continuouslyReadMessages(
Subscription subscription, MessageDeserializer messageDeserializer) throws InterruptedException {
while (true) {
// Wait for a message
Message message = subscription.nextMessage(100);
// Print the message
if (message != null) {
try {
DeserializedMessage deserializedMessage =
messageDeserializer.deserializeMessage(ByteBuffer.wrap(message.getData()));
ReadMessage msg =
new ReadMessage(
message.getSubject(),
deserializedMessage.name(),
deserializedMessage.content(),
message.metaData().timestamp().toLocalDateTime());
messageStore.add(msg);
message.ack();
} catch (Exception e) {
LOG.warn("Unable to deserialize message", e);
}
return true;
})
.skip((long) page * size)
.limit(size)
.toList();
}
}
}
}
}
28 changes: 13 additions & 15 deletions src/main/java/otter/jet/rest/MsgsController.java
Original file line number Diff line number Diff line change
@@ -1,46 +1,44 @@
package otter.jet.rest;

import java.util.List;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import otter.jet.store.MessageStore;
import otter.jet.reader.ReadMessage;
import otter.jet.reader.ReaderService;
import otter.jet.store.Filters;

@Controller
public class MsgsController {

private static final String TEMPLATE_NAME = "msgs-page";
private static final Logger LOG = LoggerFactory.getLogger(MsgsController.class);

private final ReaderService readerService;
private final MessageStore messageStore;

public MsgsController(ReaderService readerService) {
this.readerService = readerService;
public MsgsController(MessageStore messageStore) {
this.messageStore = messageStore;
}

@GetMapping("/msgs")
public String page(
@RequestParam(value = "subject", required = false) String subject,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "bodyContent", required = false) String bodyContent,
@RequestParam(value = "subject", required = false, defaultValue = "") String subject,
@RequestParam(value = "type", required = false, defaultValue = "") String type,
@RequestParam(value = "bodyContent", required = false, defaultValue = "") String bodyContent,
@RequestParam(value = "page", defaultValue = "0") int page,
@RequestParam(value = "size", defaultValue = "10") int size,
Model model) {
String subjectFilter = Optional.ofNullable(subject).orElse("");
String typeFilter = Optional.ofNullable(type).orElse("");
String bodyContentFilter = Optional.ofNullable(bodyContent).orElse("");
List<ReadMessage> filteredMessages = readerService.filter(subjectFilter, typeFilter, page, size, bodyContentFilter);
Filters filters = Filters.of(subject, type, bodyContent);
List<ReadMessage> filteredMessages = messageStore.filter(filters, page, size);
LOG.info("amount of read messages: " + filteredMessages.size());
model.addAttribute("messages", filteredMessages);
model.addAttribute("subject", subjectFilter);
model.addAttribute("type", typeFilter);
model.addAttribute("bodyContent", bodyContentFilter);
model.addAttribute("subject", subject);
model.addAttribute("type", type);
model.addAttribute("bodyContent", bodyContent);
model.addAttribute("page", page);
model.addAttribute("size", size);
return TEMPLATE_NAME;
Expand Down
41 changes: 41 additions & 0 deletions src/main/java/otter/jet/store/DefaultMessageStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package otter.jet.store;

import org.springframework.beans.factory.annotation.Value;
import otter.jet.reader.ReadMessage;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

public class DefaultMessageStore implements MessageStore {

private final Deque<ReadMessage> messages = new ArrayDeque<>();
private final int limit;

public DefaultMessageStore(@Value("${read.store.limit:1000}") int limit) {
this.limit = limit;
}

public void add(ReadMessage message) {
if (messages.size() >= limit) {
messages.removeLast();
}
messages.addFirst(message);
}

public List<ReadMessage> filter(Filters filters, int page, int size) {
return filter(filters.toPredicate(), page, size);
}

private List<ReadMessage> filter(Predicate<ReadMessage> predicate, int page, int size) {
return messages.stream()
.filter(predicate)
.skip((long) page * size)
.limit(size)
.toList();
}



}
Loading

0 comments on commit 06f5fa3

Please sign in to comment.