Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Hierarchical Actor support in actor bundle #79

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>io.appform.dropwizard.actors</groupId>
<artifactId>dropwizard-rabbitmq-actors</artifactId>
<version>2.0.28-12</version>
<version>2.0.28-12-MULTI-TENANT</version>
<name>Dropwizard RabbitMQ Bundle</name>
<url>https://github.com/santanusinha/dropwizard-rabbitmq-actors</url>
<description>Provides actor abstraction on RabbitMQ for dropwizard based projects.</description>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.appform.dropwizard.actors.base;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.rabbitmq.client.Channel;
import io.appform.dropwizard.actors.actor.ActorConfig;
Expand Down Expand Up @@ -113,4 +114,9 @@ private String getConsumerTag(int consumerIndex) {
.map(tagPrefix -> tagPrefix + "_" + consumerIndex)
.orElse(StringUtils.EMPTY);
}

@VisibleForTesting
public String queueName() {
return queueName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.appform.dropwizard.actors.common.Constants.MESSAGE_PUBLISHED_TEXT;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
Expand Down Expand Up @@ -271,4 +272,9 @@ protected final ObjectMapper mapper() {
private String getRoutingKey() {
return config.isSharded() ? NamingUtils.getShardedQueueName(queueName, getShardId()) : queueName;
}

@VisibleForTesting
public String queueName() {
return queueName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.appform.dropwizard.actors.router;

import io.appform.dropwizard.actors.router.tree.key.HierarchicalRoutingKey;

public interface HierarchicalMessageRouter<Message> {

void start() throws Exception;

void stop() throws Exception;

void submit(final HierarchicalRoutingKey<String> routingKey, final Message message);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package io.appform.dropwizard.actors.router;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.appform.dropwizard.actors.ConnectionRegistry;
import io.appform.dropwizard.actors.actor.ActorConfig;
import io.appform.dropwizard.actors.common.ErrorCode;
import io.appform.dropwizard.actors.common.RabbitmqActorException;
import io.appform.dropwizard.actors.exceptionhandler.ExceptionHandlingFactory;
import io.appform.dropwizard.actors.retry.RetryStrategyFactory;
import io.appform.dropwizard.actors.router.config.HierarchicalOperationWorkerConfig;
import io.appform.dropwizard.actors.router.tree.HierarchicalDataStoreSupplierTree;
import io.appform.dropwizard.actors.router.tree.HierarchicalTreeConfig;
import io.appform.dropwizard.actors.router.tree.key.HierarchicalRoutingKey;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.util.Set;

@Slf4j
@SuppressWarnings({"java:S119"})
public abstract class HierarchicalOperationRouter<MessageType extends Enum<MessageType>, Message> implements HierarchicalMessageRouter<Message> {

private final MessageType messageType;
private final HierarchicalTreeConfig<ActorConfig, String, HierarchicalOperationWorkerConfig> hierarchicalTreeConfig;
private final ConnectionRegistry connectionRegistry;
private final ObjectMapper mapper;
private final RetryStrategyFactory retryStrategyFactory;
private final ExceptionHandlingFactory exceptionHandlingFactory;
private final Class<? extends Message> clazz;
private final Set<Class<?>> droppedExceptionTypes;

@Getter
private HierarchicalDataStoreSupplierTree<
HierarchicalOperationWorkerConfig,
ActorConfig,
MessageType,
HierarchicalOperationWorker<MessageType, ? extends Message>> worker;

protected HierarchicalOperationRouter(final MessageType messageType,
final HierarchicalTreeConfig<ActorConfig, String, HierarchicalOperationWorkerConfig> hierarchicalTreeConfig,
final ConnectionRegistry connectionRegistry,
final ObjectMapper mapper,
final RetryStrategyFactory retryStrategyFactory,
final ExceptionHandlingFactory exceptionHandlingFactory,
final Class<? extends Message> clazz,
final Set<Class<?>> droppedExceptionTypes) {
this.messageType = messageType;
this.hierarchicalTreeConfig = hierarchicalTreeConfig;
this.connectionRegistry = connectionRegistry;
this.mapper = mapper;
this.retryStrategyFactory = retryStrategyFactory;
this.exceptionHandlingFactory = exceptionHandlingFactory;
this.clazz = clazz;
this.droppedExceptionTypes = droppedExceptionTypes;
}

public void initializeRouter() {
this.worker = new HierarchicalDataStoreSupplierTree<>(
messageType,
hierarchicalTreeConfig,
HierarchicalRouterUtils.actorConfigToWorkerConfigFunc,
(routingKey, messageTypeKey, workerConfig) ->
new HierarchicalOperationWorker<MessageType, Message>(
messageType,
workerConfig,
hierarchicalTreeConfig.getDefaultData(),
routingKey,
connectionRegistry,
mapper,
retryStrategyFactory,
exceptionHandlingFactory,
clazz,
droppedExceptionTypes,
this::process)
);
}


@Override
public void start() throws Exception {
log.info("Initializing Router");
initializeRouter();
log.info("Staring all workers");
worker.traverse(hierarchicalOperationWorker -> {
try {
log.info("Starting worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey());
hierarchicalOperationWorker.start();
} catch (Exception e) {
log.error("Unable to start worker: {}", hierarchicalOperationWorker);
val errorMessage = "Unable to start worker: " + hierarchicalOperationWorker.getType();
throw new RabbitmqActorException(ErrorCode.INTERNAL_ERROR, errorMessage, e);
}
});
}

@Override
public void stop() throws Exception {
log.info("Stopping all workers");
worker.traverse(hierarchicalOperationWorker -> {
try {
log.info("Stopping worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey());
hierarchicalOperationWorker.stop();
} catch (Exception e) {
log.error("Unable to stop worker: {}", hierarchicalOperationWorker);
val errorMessage = "Unable to stop worker: " + hierarchicalOperationWorker.getType();
throw new RabbitmqActorException(ErrorCode.INTERNAL_ERROR, errorMessage, e);
}
});
}

@Override
public void submit(final HierarchicalRoutingKey<String> routingKey,
final Message message) {
try {
HierarchicalOperationWorker<MessageType, Message> worker = (HierarchicalOperationWorker<MessageType, Message>) this.worker.get(messageType, routingKey);
log.info("Publishing message:{} to worker: {} ({})", message,
worker.getClass().getSimpleName(), worker.getRoutingKey().getRoutingKey());
worker.publish(message);
} catch (Exception e) {
log.error("Unable to submit message to worker : {} {}", routingKey, message);
val errorMessage = "Unable to submit message to worker " + routingKey;
throw new RabbitmqActorException(ErrorCode.INTERNAL_ERROR, errorMessage, e);
}
}

public abstract boolean process(final Message message);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.appform.dropwizard.actors.router;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.appform.dropwizard.actors.ConnectionRegistry;
import io.appform.dropwizard.actors.actor.Actor;
import io.appform.dropwizard.actors.actor.ActorConfig;
import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.exceptionhandler.ExceptionHandlingFactory;
import io.appform.dropwizard.actors.retry.RetryStrategyFactory;
import io.appform.dropwizard.actors.router.config.HierarchicalOperationWorkerConfig;
import io.appform.dropwizard.actors.router.tree.key.RoutingKey;
import lombok.Getter;

import java.util.Set;
import java.util.function.Consumer;

@Getter
@SuppressWarnings({"java:S119"})
public class HierarchicalOperationWorker<MessageType extends Enum<MessageType>, Message>
extends Actor<MessageType, Message> {

private final RoutingKey routingKey;
private final Consumer<Message> messageConsumer;

protected HierarchicalOperationWorker(final MessageType messageType,
final HierarchicalOperationWorkerConfig workerConfig,
final ActorConfig actorConfig,
final RoutingKey routingKey,
final ConnectionRegistry connectionRegistry,
final ObjectMapper mapper,
final RetryStrategyFactory retryStrategyFactory,
final ExceptionHandlingFactory exceptionHandlingFactory,
final Class<? extends Message> clazz,
final Set<Class<?>> droppedExceptionTypes,
final Consumer<Message> messageConsumer) {
super(messageType,
HierarchicalRouterUtils.toActorConfig(messageType, routingKey, workerConfig, actorConfig),
connectionRegistry,
mapper,
retryStrategyFactory,
exceptionHandlingFactory,
clazz,
droppedExceptionTypes);
this.routingKey = routingKey;
this.messageConsumer = messageConsumer;
}

@Override
protected boolean handle(Message message, MessageMetadata messageMetadata) {
return process(message);
}

@Override
protected boolean handle(Message message) {
return process(message);
}

private boolean process(Message message) {
try {
messageConsumer.accept(message);
return true;
} catch (Exception e) {
return false;
}
}

}
Loading