extends Managed {
+
+ /**
+ * This method is used to publish message with provided delay in milliseconds
+ *
+ * @param message data to be published in queue
+ * @param delayMilliseconds param to provide delay value
+ * @throws Exception
+ */
+ void publishWithDelay(final Message message,
+ final long delayMilliseconds) throws Exception;
+
+ /**
+ * This method is used to publish message with provided expiry in milliseconds, message will be auto-expired post expiryMs crosses
+ *
+ * @param message data to be published in queue
+ * @param expiryInMs param to provide expiration time of message
+ * @throws Exception
+ */
+ void publishWithExpiry(final Message message,
+ final long expiryInMs) throws Exception;
+
+ /**
+ * This method is used to publish message in queue
+ *
+ * @param message data to be published in queue
+ * @throws Exception
+ */
+ void publish(final Message message) throws Exception;
+
+ /**
+ * This method is used to publish message in queue with additional properties of AMQP
+ *
+ * @param message data to be published in queue
+ * @param properties map of amqp properties
+ * @throws Exception
+ */
+ void publish(final Message message,
+ final AMQP.BasicProperties properties) throws Exception;
+
+ /**
+ * This method provides count of pending messages in queue
+ *
+ * @return count of message pending in main queue
+ */
+ long pendingMessagesCount();
+
+ /**
+ * This method provides count of pending messages in sidelined queue
+ *
+ * @return count of message pending in sidelined queue
+ */
+ long pendingSidelineMessagesCount();
+}
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalActor.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalActor.java
new file mode 100644
index 00000000..10e5ac9b
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalActor.java
@@ -0,0 +1,36 @@
+package io.appform.dropwizard.actors.actor.hierarchical;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.appform.dropwizard.actors.ConnectionRegistry;
+import io.appform.dropwizard.actors.exceptionhandler.ExceptionHandlingFactory;
+import io.appform.dropwizard.actors.retry.RetryStrategyFactory;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Set;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+@Slf4j
+public abstract class HierarchicalActor, Message> extends HierarchicalBaseActor {
+
+ private MessageType type;
+
+ protected HierarchicalActor(
+ MessageType type,
+ HierarchicalActorConfig config,
+ ConnectionRegistry connectionRegistry,
+ ObjectMapper mapper,
+ RetryStrategyFactory retryStrategyFactory,
+ ExceptionHandlingFactory exceptionHandlingFactory,
+ Class extends Message> clazz,
+ Set> droppedExceptionTypes) {
+ super(type, config, connectionRegistry, mapper, retryStrategyFactory, exceptionHandlingFactory,
+ clazz, droppedExceptionTypes);
+ this.type = type;
+ }
+
+}
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalActorConfig.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalActorConfig.java
new file mode 100644
index 00000000..197a524a
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalActorConfig.java
@@ -0,0 +1,27 @@
+package io.appform.dropwizard.actors.actor.hierarchical;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonUnwrapped;
+import io.appform.dropwizard.actors.actor.ActorConfig;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.HierarchicalDataStoreTreeNode;
+import lombok.*;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+@EqualsAndHashCode
+@ToString
+@AllArgsConstructor
+@NoArgsConstructor
+public class HierarchicalActorConfig extends ActorConfig {
+
+ /**
+ * This param will reused all Parent Level ActorConfig while creating all child actors,
+ * if marked as false, every children will need tp provide Actor config specific to child
+ *
+ */
+ private boolean useParentConfigInWorker = true;
+
+ @JsonUnwrapped
+ private HierarchicalDataStoreTreeNode children;
+
+}
\ No newline at end of file
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalBaseActor.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalBaseActor.java
new file mode 100644
index 00000000..8c4464ed
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalBaseActor.java
@@ -0,0 +1,142 @@
+package io.appform.dropwizard.actors.actor.hierarchical;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.AMQP;
+import io.appform.dropwizard.actors.ConnectionRegistry;
+import io.appform.dropwizard.actors.actor.MessageMetadata;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.key.HierarchicalRoutingKey;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.key.RoutingKey;
+import io.appform.dropwizard.actors.exceptionhandler.ExceptionHandlingFactory;
+import io.appform.dropwizard.actors.retry.RetryStrategyFactory;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import lombok.val;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.Set;
+
+/**
+ * This is a managed wrapper for {@link HierarchicalUnmanagedBaseActor} this is managed and therefore started by D/W.
+ * *
+ *
+ * @param
+ * @param
+ */
+@Data
+@EqualsAndHashCode
+@ToString
+@Slf4j
+public abstract class HierarchicalBaseActor, Message> implements IHierarchicalBaseActor {
+
+ public static final RoutingKey EMPTY_ROUTING_KEY = RoutingKey.builder().build();
+ private final HierarchicalUnmanagedBaseActor actorImpl;
+
+ protected HierarchicalBaseActor(
+ MessageType messageType,
+ HierarchicalActorConfig hierarchicalActorConfig,
+ ConnectionRegistry connectionRegistry,
+ ObjectMapper mapper,
+ RetryStrategyFactory retryStrategyFactory,
+ ExceptionHandlingFactory exceptionHandlingFactory,
+ Class extends Message> clazz,
+ Set> droppedExceptionTypes) {
+ Set> droppedExceptionTypeSet = null == droppedExceptionTypes
+ ? Collections.emptySet() : droppedExceptionTypes;
+ actorImpl = new HierarchicalUnmanagedBaseActor<>(messageType, hierarchicalActorConfig, connectionRegistry, mapper, retryStrategyFactory,
+ exceptionHandlingFactory, clazz, droppedExceptionTypeSet,
+ this::handle,
+ this::handleExpiredMessages);
+ }
+
+ /*
+ Override this method in your code for custom implementation.
+ */
+ protected boolean handle(Message message, MessageMetadata messageMetadata) throws Exception {
+ throw new UnsupportedOperationException("Implement this method");
+ }
+
+ /*
+ Override this method in your code in case you want to handle the expired messages separately
+ */
+ protected boolean handleExpiredMessages(Message message, MessageMetadata messageMetadata) throws Exception {
+ return true;
+ }
+
+
+ @Override
+ public final void publishWithDelay(final Message message,
+ final long delayMilliseconds) throws Exception {
+ publishWithDelay(EMPTY_ROUTING_KEY, message, delayMilliseconds);
+ }
+
+ @Override
+ public final void publishWithDelay(final HierarchicalRoutingKey routingKey,
+ final Message message,
+ final long delayMilliseconds) throws Exception {
+ actorImpl.publishWithDelay(routingKey, message, delayMilliseconds);
+ }
+
+ @Override
+ public final void publishWithExpiry(final Message message, final long expiryInMs) throws Exception {
+ publishWithExpiry(EMPTY_ROUTING_KEY, message, expiryInMs);
+ }
+
+ @Override
+ public final void publishWithExpiry(final HierarchicalRoutingKey routingKey,
+ final Message message,
+ final long expiryInMs) throws Exception {
+ actorImpl.publishWithExpiry(routingKey, message, expiryInMs);
+ }
+
+ @Override
+ public final void publish(final Message message) throws Exception {
+ publish(EMPTY_ROUTING_KEY, message);
+ }
+
+ @Override
+ public final void publish(final HierarchicalRoutingKey routingKey,
+ final Message message) throws Exception {
+ val properties = new AMQP.BasicProperties.Builder()
+ .deliveryMode(2)
+ .timestamp(new Date())
+ .build();
+ publish(routingKey, message, properties);
+ }
+
+ @Override
+ public final void publish(final Message message,
+ final AMQP.BasicProperties properties) throws Exception {
+ publish(EMPTY_ROUTING_KEY, message, properties);
+ }
+
+ @Override
+ public final void publish(final HierarchicalRoutingKey routingKey,
+ final Message message,
+ final AMQP.BasicProperties properties) throws Exception {
+ actorImpl.publish(routingKey, message, properties);
+ }
+
+ @Override
+ public final long pendingMessagesCount() {
+ return actorImpl.pendingMessagesCount();
+ }
+
+
+ @Override
+ public final long pendingSidelineMessagesCount() {
+ return actorImpl.pendingSidelineMessagesCount();
+ }
+
+ @Override
+ public void start() throws Exception {
+ actorImpl.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ actorImpl.stop();
+ }
+}
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalOperationWorker.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalOperationWorker.java
new file mode 100644
index 00000000..72a84dd8
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalOperationWorker.java
@@ -0,0 +1,61 @@
+package io.appform.dropwizard.actors.actor.hierarchical;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.appform.dropwizard.actors.ConnectionRegistry;
+import io.appform.dropwizard.actors.actor.Actor;
+import io.appform.dropwizard.actors.actor.MessageHandlingFunction;
+import io.appform.dropwizard.actors.actor.MessageMetadata;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.key.RoutingKey;
+import io.appform.dropwizard.actors.exceptionhandler.ExceptionHandlingFactory;
+import io.appform.dropwizard.actors.retry.RetryStrategyFactory;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import java.util.Set;
+
+@Getter
+@EqualsAndHashCode
+@SuppressWarnings({"java:S119", "java:S107"})
+public class HierarchicalOperationWorker, Message>
+ extends Actor {
+
+ private final RoutingKey routingKey;
+ private final MessageHandlingFunction handlerFunction;
+ private final MessageHandlingFunction expiredMessageHandlingFunction;
+
+ public HierarchicalOperationWorker(final MessageType messageType,
+ final HierarchicalOperationWorkerConfig workerConfig,
+ final HierarchicalActorConfig hierarchicalActorConfig,
+ final RoutingKey routingKey,
+ final ConnectionRegistry connectionRegistry,
+ final ObjectMapper mapper,
+ final RetryStrategyFactory retryStrategyFactory,
+ final ExceptionHandlingFactory exceptionHandlingFactory,
+ final Class extends Message> clazz,
+ final Set> droppedExceptionTypes,
+ final MessageHandlingFunction handlerFunction,
+ final MessageHandlingFunction expiredMessageHandlingFunction) {
+ super(messageType,
+ HierarchicalRouterUtils.toActorConfig(messageType, routingKey, workerConfig, hierarchicalActorConfig),
+ connectionRegistry,
+ mapper,
+ retryStrategyFactory,
+ exceptionHandlingFactory,
+ clazz,
+ droppedExceptionTypes);
+ this.routingKey = routingKey;
+ this.handlerFunction = handlerFunction;
+ this.expiredMessageHandlingFunction = expiredMessageHandlingFunction;
+ }
+
+ @Override
+ protected boolean handle(Message message, MessageMetadata messageMetadata) throws Exception {
+ return handlerFunction.apply(message, messageMetadata);
+ }
+
+ @Override
+ protected boolean handleExpiredMessages(Message message, MessageMetadata messageMetadata) throws Exception {
+ return expiredMessageHandlingFunction.apply(message, messageMetadata);
+ }
+
+}
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalOperationWorkerConfig.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalOperationWorkerConfig.java
new file mode 100644
index 00000000..6185960d
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalOperationWorkerConfig.java
@@ -0,0 +1,82 @@
+package io.appform.dropwizard.actors.actor.hierarchical;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.appform.dropwizard.actors.TtlConfig;
+import io.appform.dropwizard.actors.actor.*;
+import io.appform.dropwizard.actors.exceptionhandler.config.ExceptionHandlerConfig;
+import io.appform.dropwizard.actors.retry.config.NoRetryConfig;
+import io.appform.dropwizard.actors.retry.config.RetryConfig;
+import lombok.*;
+
+import javax.validation.Valid;
+import javax.validation.constraints.Max;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+@EqualsAndHashCode
+@ToString
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+public class HierarchicalOperationWorkerConfig {
+
+ @Min(1)
+ @Max(100)
+ @Builder.Default
+ private int concurrency = 3;
+
+ @Min(1)
+ @Max(100)
+ @Builder.Default
+ private int prefetchCount = 1;
+
+ @Min(2)
+ @Max(32)
+ private Integer shardCount;
+
+ @Valid
+ private ProducerConfig producer;
+
+ @Valid
+ private ConsumerConfig consumer;
+
+ @Builder.Default
+ private boolean delayed = false;
+
+ @Builder.Default
+ private DelayType delayType = DelayType.DELAYED;
+
+ @Builder.Default
+ private boolean priorityQueue = false;
+
+ @Builder.Default
+ private QueueType queueType = QueueType.CLASSIC;
+
+ @Builder.Default
+ private int quorumInitialGroupSize = 3;
+
+ @Builder.Default
+ private HaMode haMode = HaMode.ALL;
+
+ @Builder.Default
+ private String haParams = "";
+
+ @Builder.Default
+ private int maxPriority = 10;
+
+ @Builder.Default
+ private boolean lazyMode = false;
+
+ @NotNull
+ @Valid
+ @Builder.Default
+ private RetryConfig retryConfig = new NoRetryConfig();
+
+ private ExceptionHandlerConfig exceptionHandlerConfig;
+
+ @Valid
+ private TtlConfig ttlConfig;
+
+}
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalRouterUtils.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalRouterUtils.java
new file mode 100644
index 00000000..39ee2cb4
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalRouterUtils.java
@@ -0,0 +1,166 @@
+package io.appform.dropwizard.actors.actor.hierarchical;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.appform.dropwizard.actors.actor.ActorConfig;
+import io.appform.dropwizard.actors.actor.ConsumerConfig;
+import io.appform.dropwizard.actors.actor.ProducerConfig;
+import io.appform.dropwizard.actors.connectivity.strategy.ConnectionIsolationStrategy;
+import io.appform.dropwizard.actors.connectivity.strategy.ConnectionIsolationStrategyVisitor;
+import io.appform.dropwizard.actors.connectivity.strategy.DefaultConnectionStrategy;
+import io.appform.dropwizard.actors.connectivity.strategy.SharedConnectionStrategy;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.key.RoutingKey;
+import io.appform.dropwizard.actors.utils.MapperUtils;
+import lombok.SneakyThrows;
+import lombok.experimental.UtilityClass;
+import lombok.val;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@UtilityClass
+public class HierarchicalRouterUtils {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+ private static final String EXCHANGES = "exchanges";
+ private static final String ACTORS = "actors";
+
+ private static final BiFunction, String, String> beautifierFunction = (stream, delimiter) -> stream
+ .filter(e -> !StringUtils.isEmpty(e))
+ .collect(Collectors.joining(delimiter));
+
+ public static final Function actorConfigToWorkerConfigFunc =
+ actorConfig -> HierarchicalOperationWorkerConfig.builder()
+ .concurrency(actorConfig.getConcurrency())
+ .prefetchCount(actorConfig.getPrefetchCount())
+ .shardCount(actorConfig.getShardCount())
+ .consumer(actorConfig.getConsumer())
+ .producer(actorConfig.getProducer())
+ .delayed(actorConfig.isDelayed())
+ .delayType(actorConfig.getDelayType())
+ .priorityQueue(actorConfig.isPriorityQueue())
+ .queueType(actorConfig.getQueueType())
+ .quorumInitialGroupSize(actorConfig.getQuorumInitialGroupSize())
+ .haMode(actorConfig.getHaMode())
+ .haParams(actorConfig.getHaParams())
+ .maxPriority(actorConfig.getMaxPriority())
+ .maxPriority(actorConfig.getMaxPriority())
+ .lazyMode(actorConfig.isLazyMode())
+ .retryConfig(actorConfig.getRetryConfig())
+ .exceptionHandlerConfig(actorConfig.getExceptionHandlerConfig())
+ .ttlConfig(actorConfig.getTtlConfig())
+ .build();
+
+ @SneakyThrows
+ public static > ActorConfig toActorConfig(final MessageType messageType,
+ final RoutingKey routingKeyData,
+ final HierarchicalOperationWorkerConfig workerConfig,
+ final HierarchicalActorConfig mainActorConfig) {
+ val useParentConfigInWorker = mainActorConfig.isUseParentConfigInWorker();
+ return ActorConfig.builder()
+ // Custom fields
+ .exchange(exchangeName(mainActorConfig.getExchange(), messageType, routingKeyData))
+ .prefix(prefix(mainActorConfig.getPrefix(), routingKeyData))
+ .consumer(consumerConfig(workerConfig.getConsumer(), mainActorConfig.getConsumer(), routingKeyData))
+ .producer(producerConfig(workerConfig.getProducer(), mainActorConfig.getProducer(), routingKeyData))
+
+ // Copy parent data
+ .concurrency(useParentConfigInWorker ? mainActorConfig.getConcurrency() : workerConfig.getConcurrency())
+ .shardCount(useParentConfigInWorker ? mainActorConfig.getShardCount() :
+ Objects.nonNull(workerConfig.getShardCount()) && workerConfig.getShardCount() >= 1 ? workerConfig.getShardCount() : null)
+ .priorityQueue(mainActorConfig.isPriorityQueue())
+ .maxPriority(useParentConfigInWorker ? mainActorConfig.getMaxPriority() : workerConfig.getMaxPriority())
+ .delayed(useParentConfigInWorker ? mainActorConfig.isDelayed() : workerConfig.isDelayed())
+ .delayType(useParentConfigInWorker ? mainActorConfig.getDelayType() : workerConfig.getDelayType())
+ .queueType(useParentConfigInWorker ? mainActorConfig.getQueueType() : workerConfig.getQueueType())
+ .haMode(useParentConfigInWorker ? mainActorConfig.getHaMode() : workerConfig.getHaMode())
+ .haParams(useParentConfigInWorker ? mainActorConfig.getHaParams() : workerConfig.getHaParams())
+ .lazyMode(useParentConfigInWorker ? mainActorConfig.isLazyMode() : workerConfig.isLazyMode())
+ .quorumInitialGroupSize(useParentConfigInWorker ? mainActorConfig.getQuorumInitialGroupSize() : workerConfig.getQuorumInitialGroupSize())
+ .retryConfig(useParentConfigInWorker ? mainActorConfig.getRetryConfig() : workerConfig.getRetryConfig())
+ .exceptionHandlerConfig(useParentConfigInWorker ? mainActorConfig.getExceptionHandlerConfig() : workerConfig.getExceptionHandlerConfig())
+ .ttlConfig(useParentConfigInWorker ? mainActorConfig.getTtlConfig(): workerConfig.getTtlConfig())
+ .build();
+ }
+
+ public static > String exchangeName(final String parentExchangeName,
+ final MessageType messageType,
+ final RoutingKey routingKeyData) {
+ val routingKey = routingKeyData.getRoutingKey();
+
+ if (!StringUtils.isEmpty(parentExchangeName)) {
+ // For backward compatibility
+ if(routingKey.isEmpty()) {
+ return parentExchangeName;
+ }
+
+ return beautifierFunction.apply(Stream.of(parentExchangeName, String.join(".", routingKey)), ".");
+ }
+
+ return beautifierFunction.apply(Stream.of(EXCHANGES, String.join(".", routingKey), messageType.name()), ".");
+ }
+
+ private static String prefix(final String parentPrefixName,
+ final RoutingKey routingKeyData) {
+ val routingKey = routingKeyData.getRoutingKey();
+ if (!StringUtils.isEmpty(parentPrefixName)) {
+ return beautifierFunction.apply(Stream.of(parentPrefixName, String.join(".", routingKey)), ".");
+ }
+ return beautifierFunction.apply(Stream.of(ACTORS, String.join(".", routingKey)), ".");
+ }
+
+
+ private static ProducerConfig producerConfig(final ProducerConfig workerConfig,
+ final ProducerConfig parentProducerConfig,
+ final RoutingKey routingKeyData) {
+ if (Objects.isNull(workerConfig)) {
+ return MapperUtils.clone(parentProducerConfig, ProducerConfig.class);
+ }
+
+ if (Objects.isNull(workerConfig.getConnectionIsolationStrategy())) {
+ val isolationStrtegy = MapperUtils.clone(parentProducerConfig.getConnectionIsolationStrategy(), ConnectionIsolationStrategy.class);
+ workerConfig.setConnectionIsolationStrategy(isolationStrtegy);
+ }
+ updateIsolationStrategyConfig(workerConfig.getConnectionIsolationStrategy(), routingKeyData.getRoutingKey());
+ return workerConfig;
+ }
+
+ private static ConsumerConfig consumerConfig(final ConsumerConfig workerConfig,
+ final ConsumerConfig parentConsumerConfig,
+ final RoutingKey routingKeyData) {
+ if (Objects.isNull(workerConfig)) {
+ return MapperUtils.clone(parentConsumerConfig, ConsumerConfig.class);
+ }
+
+ if (Objects.isNull(workerConfig.getConnectionIsolationStrategy())) {
+ val isolationStrtegy = MapperUtils.clone(parentConsumerConfig.getConnectionIsolationStrategy(), ConnectionIsolationStrategy.class);
+ workerConfig.setConnectionIsolationStrategy(isolationStrtegy);
+ }
+ updateIsolationStrategyConfig(workerConfig.getConnectionIsolationStrategy(), routingKeyData.getRoutingKey());
+ return workerConfig;
+ }
+
+
+ private static void updateIsolationStrategyConfig(final ConnectionIsolationStrategy isolationStrategy,
+ final List routingKeyData) {
+ isolationStrategy.accept(new ConnectionIsolationStrategyVisitor() {
+ @Override
+ public Void visit(SharedConnectionStrategy strategy) {
+ val producerIsolationStrategyName = beautifierFunction.apply(Stream.of("p",
+ strategy.getName(), String.join("_", routingKeyData)), "_");
+ strategy.setName(producerIsolationStrategyName);
+ return null;
+ }
+
+ @Override
+ public Void visit(DefaultConnectionStrategy strategy) {
+ return null;
+ }
+ });
+ }
+
+}
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalUnmanagedBaseActor.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalUnmanagedBaseActor.java
new file mode 100644
index 00000000..ab5ff969
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/HierarchicalUnmanagedBaseActor.java
@@ -0,0 +1,167 @@
+package io.appform.dropwizard.actors.actor.hierarchical;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.AMQP;
+import io.appform.dropwizard.actors.ConnectionRegistry;
+import io.appform.dropwizard.actors.actor.MessageHandlingFunction;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.HierarchicalDataStoreSupplierTree;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.HierarchicalTreeConfig;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.key.HierarchicalRoutingKey;
+import io.appform.dropwizard.actors.common.ErrorCode;
+import io.appform.dropwizard.actors.common.RabbitmqActorException;
+import io.appform.dropwizard.actors.exceptionhandler.ExceptionHandlingFactory;
+import io.appform.dropwizard.actors.retry.RetryStrategyFactory;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import lombok.val;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * This actor maintain all actors hierarchy as per provided hierarchical actor config.
+ * This is not Managed and will not be automatically started by dropwizard.
+ */
+@Data
+@EqualsAndHashCode
+@ToString
+@Slf4j
+public class HierarchicalUnmanagedBaseActor, Message> {
+
+ private final HierarchicalTreeConfig hierarchicalTreeConfig;
+ private final ConnectionRegistry connectionRegistry;
+ private final ObjectMapper mapper;
+ private final RetryStrategyFactory retryStrategyFactory;
+ private final ExceptionHandlingFactory exceptionHandlingFactory;
+ private final Class extends Message> clazz;
+ private final Set> droppedExceptionTypes;
+ private final MessageType messageType;
+ private final MessageHandlingFunction handlerFunction;
+ private final MessageHandlingFunction expiredMessageHandlingFunction;
+
+ private HierarchicalDataStoreSupplierTree<
+ HierarchicalOperationWorkerConfig,
+ HierarchicalActorConfig,
+ MessageType,
+ HierarchicalOperationWorker> worker;
+
+
+ public HierarchicalUnmanagedBaseActor(MessageType messageType,
+ HierarchicalActorConfig hierarchicalActorConfig,
+ ConnectionRegistry connectionRegistry,
+ ObjectMapper mapper,
+ RetryStrategyFactory retryStrategyFactory,
+ ExceptionHandlingFactory exceptionHandlingFactory,
+ Class extends Message> clazz,
+ Set> droppedExceptionTypes,
+ MessageHandlingFunction handlerFunction,
+ MessageHandlingFunction expiredMessageHandlingFunction) {
+ this.messageType = messageType;
+ this.hierarchicalTreeConfig = new HierarchicalTreeConfig<>(hierarchicalActorConfig, hierarchicalActorConfig.getChildren());
+ this.connectionRegistry = connectionRegistry;
+ this.mapper = mapper;
+ this.retryStrategyFactory = retryStrategyFactory;
+ this.exceptionHandlingFactory = exceptionHandlingFactory;
+ this.clazz = clazz;
+ this.droppedExceptionTypes = droppedExceptionTypes;
+ this.handlerFunction = handlerFunction;
+ this.expiredMessageHandlingFunction = expiredMessageHandlingFunction;
+ }
+
+ public void start() throws Exception {
+ log.info("Initializing Router");
+ this.initializeRouter();
+ log.info("Staring all workers");
+ worker.traverse(hierarchicalOperationWorker -> {
+ try {
+ log.info("Starting worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey());
+ hierarchicalOperationWorker.start();
+ } catch (Exception e) {
+ log.error("Unable to start worker: {}", hierarchicalOperationWorker);
+ val errorMessage = "Unable to start worker: " + hierarchicalOperationWorker.getType();
+ throw new RabbitmqActorException(ErrorCode.INTERNAL_ERROR, errorMessage, e);
+ }
+ });
+ }
+
+ public void stop() throws Exception {
+ log.info("Stopping all workers");
+ worker.traverse(hierarchicalOperationWorker -> {
+ try {
+ log.info("Stopping worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey());
+ hierarchicalOperationWorker.stop();
+ } catch (Exception e) {
+ log.error("Unable to stop worker: {}", hierarchicalOperationWorker);
+ val errorMessage = "Unable to stop worker: " + hierarchicalOperationWorker.getType();
+ throw new RabbitmqActorException(ErrorCode.INTERNAL_ERROR, errorMessage, e);
+ }
+ });
+ }
+
+ public final long pendingMessagesCount() {
+ val atomicLong = new AtomicLong(0l);
+ worker.traverse(hierarchicalOperationWorker -> {
+ atomicLong.getAndAdd(hierarchicalOperationWorker.pendingMessagesCount());
+ });
+ return atomicLong.get();
+ }
+
+ public final long pendingSidelineMessagesCount() {
+ val atomicLong = new AtomicLong(0l);
+ worker.traverse(hierarchicalOperationWorker -> {
+ atomicLong.getAndAdd(hierarchicalOperationWorker.pendingSidelineMessagesCount());
+ });
+ return atomicLong.get();
+ }
+
+ public final void publishWithDelay(final HierarchicalRoutingKey routingKey,
+ final Message message,
+ final long delayMilliseconds) throws Exception {
+ publishActor(routingKey).publishWithDelay(message, delayMilliseconds);
+ }
+
+ public final void publishWithExpiry(final HierarchicalRoutingKey routingKey,
+ final Message message,
+ final long expiryInMs) throws Exception {
+ publishActor(routingKey).publishWithExpiry(message, expiryInMs);
+ }
+
+ public final void publish(final HierarchicalRoutingKey routingKey,
+ final Message message) throws Exception {
+ publishActor(routingKey).publish(message);
+ }
+
+ public final void publish(final HierarchicalRoutingKey routingKey,
+ final Message message,
+ final AMQP.BasicProperties properties) throws Exception {
+ publishActor(routingKey).publish(message, properties);
+ }
+
+ private HierarchicalOperationWorker publishActor(final HierarchicalRoutingKey routingKey) {
+ return (HierarchicalOperationWorker) this.worker.get(messageType, routingKey);
+ }
+
+ private void initializeRouter() {
+ this.worker = new HierarchicalDataStoreSupplierTree<>(
+ messageType,
+ hierarchicalTreeConfig,
+ HierarchicalRouterUtils.actorConfigToWorkerConfigFunc,
+ (routingKey, messageTypeKey, workerConfig) -> new HierarchicalOperationWorker<>(
+ messageType,
+ workerConfig,
+ hierarchicalTreeConfig.getDefaultData(),
+ routingKey,
+ connectionRegistry,
+ mapper,
+ retryStrategyFactory,
+ exceptionHandlingFactory,
+ clazz,
+ droppedExceptionTypes,
+ handlerFunction,
+ expiredMessageHandlingFunction)
+ );
+ }
+}
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/IHierarchicalBaseActor.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/IHierarchicalBaseActor.java
new file mode 100644
index 00000000..cd3159e9
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/IHierarchicalBaseActor.java
@@ -0,0 +1,62 @@
+package io.appform.dropwizard.actors.actor.hierarchical;
+
+import com.rabbitmq.client.AMQP;
+import io.appform.dropwizard.actors.actor.IBaseActor;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.key.HierarchicalRoutingKey;
+
+/**
+ * This interface is used to implement any actor which have support of hierarchical message processing. RoutingKey param will drive queue selection from hierarchy
+ *
+ * @param
+ */
+@SuppressWarnings({"java:S112", "java:S119"})
+public interface IHierarchicalBaseActor extends IBaseActor {
+
+ /**
+ * This method is used to publish message with provided delay in milliseconds on queue matching to provided routingKey
+ *
+ * @param routingKey param used to select queue from hierarchy
+ * @param message data to be published in queue
+ * @param delayMilliseconds param to provide delay value
+ * @throws Exception
+ */
+ void publishWithDelay(final HierarchicalRoutingKey routingKey,
+ final Message message,
+ final long delayMilliseconds) throws Exception;
+
+ /**
+ * This method is used to publish message with provided expiry in milliseconds,
+ * message will be auto-expired post expiryMs crosses on queue matching to provided routingKey
+ *
+ * @param routingKey param used to select queue from hierarchy
+ * @param message data to be published in queue
+ * @param expiryInMs param to provide expiration time of message
+ * @throws Exception
+ */
+ void publishWithExpiry(final HierarchicalRoutingKey routingKey,
+ final Message message,
+ final long expiryInMs) throws Exception;
+
+ /**
+ * This method is used to publish message in queue
+ *
+ * @param routingKey param used to select queue from hierarchy on queue matching to provided routingKey
+ * @param message data to be published in queue
+ * @throws Exception
+ */
+ void publish(final HierarchicalRoutingKey routingKey,
+ final Message message) throws Exception;
+
+ /**
+ * This method is used to publish message in queue with additional properties of AMQP on queue matching to provided routingKey
+ *
+ * @param routingKey param used to select queue from hierarchy
+ * @param message data to be published in queue
+ * @param properties map of amqp properties
+ * @throws Exception
+ */
+ void publish(final HierarchicalRoutingKey routingKey,
+ final Message message,
+ final AMQP.BasicProperties properties) throws Exception;
+
+}
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/HierarchicalDataStoreSupplierTree.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/HierarchicalDataStoreSupplierTree.java
new file mode 100644
index 00000000..bab2d0c5
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/HierarchicalDataStoreSupplierTree.java
@@ -0,0 +1,63 @@
+package io.appform.dropwizard.actors.actor.hierarchical.tree;
+
+import com.google.common.collect.Lists;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.key.RoutingKey;
+import lombok.val;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+@SuppressWarnings("java:S119")
+public class HierarchicalDataStoreSupplierTree extends HierarchicalDataStoreTree {
+
+ private static final Function, RoutingKey> routingKeyGenerator = (list) -> RoutingKey.builder()
+ .list(list)
+ .build();
+
+ public HierarchicalDataStoreSupplierTree(final NODE_KEY_TYPE key,
+ final HierarchicalTreeConfig treeConfig,
+ final Function rootNodeConverterSupplier,
+ final TriConsumerSupplier supplier) {
+ super(supplier.get(
+ routingKeyGenerator.apply(List.of()),
+ key,
+ rootNodeConverterSupplier.apply(treeConfig.getDefaultData())
+ ));
+ buildTree(key, treeConfig.getChildrenData(), supplier);
+ }
+
+ private void buildTree(final NODE_KEY_TYPE key,
+ final HierarchicalDataStoreTreeNode childrenList,
+ final TriConsumerSupplier supplier) {
+ val tokenList = Lists.newArrayList();
+ buildTreeHelper(key, childrenList, tokenList, supplier);
+ }
+
+ private void buildTreeHelper(final NODE_KEY_TYPE key,
+ final HierarchicalDataStoreTreeNode rootChildrenData,
+ final List tokenList,
+ final TriConsumerSupplier supplier) {
+ val childrenList = rootChildrenData.getChildren();
+ if (childrenList.isEmpty()) {
+ add(routingKeyGenerator.apply(tokenList), key, null);
+ return;
+ }
+
+ for (String childrenToken : childrenList.keySet()) {
+ val currentChildrenData = childrenList.get(childrenToken);
+
+ tokenList.add(childrenToken);
+
+ val routingKey = routingKeyGenerator.apply(tokenList.stream().map(String::valueOf).toList());
+ val currentChildrenDefaultData = Objects.nonNull(currentChildrenData.getNodeData()) ?
+ currentChildrenData.getNodeData() : rootChildrenData.getNodeData();
+
+ add(routingKey, key, supplier.get(routingKey, key, currentChildrenDefaultData));
+ buildTreeHelper(key, currentChildrenData, tokenList, supplier);
+
+ tokenList.remove(childrenToken);
+ }
+ }
+
+}
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/HierarchicalDataStoreTree.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/HierarchicalDataStoreTree.java
new file mode 100644
index 00000000..f6529711
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/HierarchicalDataStoreTree.java
@@ -0,0 +1,65 @@
+package io.appform.dropwizard.actors.actor.hierarchical.tree;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonUnwrapped;
+import com.google.common.collect.Maps;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.key.HierarchicalRoutingKey;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import lombok.val;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+@Slf4j
+@ToString
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
+@SuppressWarnings("java:S119")
+public class HierarchicalDataStoreTree {
+
+ private final NODE_TYPE defaultData;
+ @JsonUnwrapped
+ private final Map> rootNodes = Maps.newConcurrentMap();
+
+ public HierarchicalDataStoreTree() {
+ this.defaultData = null;
+ }
+
+ public HierarchicalDataStoreTree(NODE_TYPE defaultData) {
+ this.defaultData = defaultData;
+ }
+
+ public void add(final HierarchicalRoutingKey routingKey, final NODE_KEY_TYPE key, final NODE_TYPE data) {
+ rootNodes.computeIfAbsent(key, t -> new HierarchicalDataStoreTreeNode<>(0, String.valueOf(key), defaultData));
+ if (Objects.isNull(data)) {
+ return;
+ }
+ rootNodes.get(key)
+ .add(routingKey, data);
+ }
+
+ public void traverse(final Consumer consumer) {
+ rootNodes.forEach((NODEKEYTYPE, vHierarchicalStoreNode) -> {
+ if (vHierarchicalStoreNode != null) {
+ vHierarchicalStoreNode.traverse(consumer);
+ }
+ });
+ }
+
+ public NODE_TYPE get(final NODE_KEY_TYPE key, final HierarchicalRoutingKey routingKey) {
+ if (!rootNodes.containsKey(key)) {
+ log.warn("Key {} not found in {} keys {}. Using default {}", key, rootNodes.keySet(), defaultData);
+ return defaultData;
+ }
+
+ val routingKeyToken = routingKey.getRoutingKey();
+ if (routingKeyToken== null || routingKeyToken.isEmpty()) {
+ log.warn("keys are empty {}. Using default {}", key, rootNodes.keySet(), defaultData);
+ return defaultData;
+ }
+
+ return rootNodes.get(key)
+ .find(routingKey);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/HierarchicalDataStoreTreeNode.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/HierarchicalDataStoreTreeNode.java
new file mode 100644
index 00000000..84c0f755
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/HierarchicalDataStoreTreeNode.java
@@ -0,0 +1,107 @@
+package io.appform.dropwizard.actors.actor.hierarchical.tree;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.google.common.collect.Maps;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.key.HierarchicalRoutingKey;
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+@Slf4j
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
+@Data
+public class HierarchicalDataStoreTreeNode {
+
+ @JsonIgnore
+ private final int depth;
+ @JsonIgnore
+ private final K token;
+
+ private V nodeData;
+ private Map> children = Maps.newConcurrentMap();
+
+
+ public HierarchicalDataStoreTreeNode() {
+ this.depth = 0;
+ this.token = null;
+ this.nodeData = null;
+ }
+
+ public HierarchicalDataStoreTreeNode(K token) {
+ this.depth = 0;
+ this.token = token;
+ this.nodeData = null;
+ }
+
+
+ @Builder
+ public HierarchicalDataStoreTreeNode(final int depth, final K token, final V nodeData) {
+ this.depth = depth;
+ this.token = token;
+ this.nodeData = nodeData;
+ }
+
+ void traverse(final Consumer consumer) {
+ if (nodeData != null) {
+ consumer.accept(nodeData);
+ }
+ children.forEach((k, kvHierarchicalStoreNode) -> {
+ if (kvHierarchicalStoreNode != null) {
+ kvHierarchicalStoreNode.traverse(consumer);
+ }
+ });
+ }
+
+ void addChild(final List tokens, final V defaultData) {
+ final K key = tokens.get(depth);
+
+ log.debug("depth: {} name: {} key: {} tokens: {} defaultData: {}", depth, token, key, tokens, defaultData);
+
+ if (tokens.size() > depth + 1) {
+ children.computeIfAbsent(key, k -> new HierarchicalDataStoreTreeNode<>(depth + 1, tokens.get(depth), null));
+ children.get(key).addChild(tokens, defaultData);
+ } else {
+ if (!children.containsKey(key)) {
+ children.put(key, new HierarchicalDataStoreTreeNode(depth + 1, tokens.get(depth), defaultData));
+ } else {
+ if (children.get(key)
+ .getNodeData() == null) {
+ children.get(key)
+ .setNodeData(defaultData);
+ } else {
+ log.error("Request to overwrite at {} existing defaultData: {} new defaultData {}", tokens, children.get(key)
+ .getNodeData(), defaultData);
+ }
+ }
+ }
+ }
+
+ V findNode(final List tokens) {
+ if (tokens.size() == depth) {
+ return nodeData;
+ }
+
+ if (!children.containsKey(tokens.get(depth))) {
+ return nodeData;
+ }
+
+ V load = children.get(tokens.get(depth))
+ .findNode(tokens);
+ return load == null
+ ? nodeData
+ : load;
+ }
+
+ public void add(final HierarchicalRoutingKey routingKey, final V payload) {
+ addChild(routingKey.getRoutingKey(), payload);
+ }
+
+ public V find(final HierarchicalRoutingKey routingKey) {
+ return findNode(routingKey.getRoutingKey());
+ }
+}
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/HierarchicalTreeConfig.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/HierarchicalTreeConfig.java
new file mode 100644
index 00000000..03591fb0
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/HierarchicalTreeConfig.java
@@ -0,0 +1,18 @@
+package io.appform.dropwizard.actors.actor.hierarchical.tree;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonUnwrapped;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@SuppressWarnings("java:S119")
+public class HierarchicalTreeConfig {
+ private ROOT_TYPE defaultData;
+ @JsonUnwrapped
+ private HierarchicalDataStoreTreeNode childrenData;
+}
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/TriConsumerSupplier.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/TriConsumerSupplier.java
new file mode 100644
index 00000000..66bb6b12
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/TriConsumerSupplier.java
@@ -0,0 +1,6 @@
+package io.appform.dropwizard.actors.actor.hierarchical.tree;
+
+@FunctionalInterface
+public interface TriConsumerSupplier {
+ public S get(R routingKey, K key, V value);
+}
\ No newline at end of file
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/key/HierarchicalRoutingKey.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/key/HierarchicalRoutingKey.java
new file mode 100644
index 00000000..0bf94fcf
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/key/HierarchicalRoutingKey.java
@@ -0,0 +1,7 @@
+package io.appform.dropwizard.actors.actor.hierarchical.tree.key;
+
+import java.util.List;
+
+public interface HierarchicalRoutingKey {
+ List getRoutingKey();
+}
diff --git a/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/key/RoutingKey.java b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/key/RoutingKey.java
new file mode 100644
index 00000000..add23eb9
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/actor/hierarchical/tree/key/RoutingKey.java
@@ -0,0 +1,20 @@
+package io.appform.dropwizard.actors.actor.hierarchical.tree.key;
+
+
+import lombok.Builder;
+
+import java.util.List;
+
+public class RoutingKey implements HierarchicalRoutingKey {
+ private final List list;
+
+ @Builder
+ public RoutingKey(final List list) {
+ this.list = list;
+ }
+
+ @Override
+ public List getRoutingKey() {
+ return list;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/appform/dropwizard/actors/base/UnmanagedConsumer.java b/src/main/java/io/appform/dropwizard/actors/base/UnmanagedConsumer.java
index 09b82edc..9f734721 100644
--- a/src/main/java/io/appform/dropwizard/actors/base/UnmanagedConsumer.java
+++ b/src/main/java/io/appform/dropwizard/actors/base/UnmanagedConsumer.java
@@ -1,6 +1,7 @@
package io.appform.dropwizard.actors.base;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.rabbitmq.client.Channel;
import io.appform.dropwizard.actors.actor.ActorConfig;
@@ -113,4 +114,9 @@ private String getConsumerTag(int consumerIndex) {
.map(tagPrefix -> tagPrefix + "_" + consumerIndex)
.orElse(StringUtils.EMPTY);
}
+
+ @VisibleForTesting
+ public String queueName() {
+ return queueName;
+ }
}
diff --git a/src/main/java/io/appform/dropwizard/actors/base/UnmanagedPublisher.java b/src/main/java/io/appform/dropwizard/actors/base/UnmanagedPublisher.java
index 5f7ad20d..2d950d84 100644
--- a/src/main/java/io/appform/dropwizard/actors/base/UnmanagedPublisher.java
+++ b/src/main/java/io/appform/dropwizard/actors/base/UnmanagedPublisher.java
@@ -4,6 +4,7 @@
import static io.appform.dropwizard.actors.common.Constants.MESSAGE_PUBLISHED_TEXT;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
@@ -271,4 +272,9 @@ protected final ObjectMapper mapper() {
private String getRoutingKey() {
return config.isSharded() ? NamingUtils.getShardedQueueName(queueName, getShardId()) : queueName;
}
+
+ @VisibleForTesting
+ public String queueName() {
+ return queueName;
+ }
}
diff --git a/src/main/java/io/appform/dropwizard/actors/utils/MapperUtils.java b/src/main/java/io/appform/dropwizard/actors/utils/MapperUtils.java
new file mode 100644
index 00000000..cb7faeb7
--- /dev/null
+++ b/src/main/java/io/appform/dropwizard/actors/utils/MapperUtils.java
@@ -0,0 +1,42 @@
+package io.appform.dropwizard.actors.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+public class MapperUtils {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ public T deserialize(final byte[] data,
+ final Class valueType) {
+ if (data == null) {
+ return null;
+ }
+ try {
+ return mapper.readValue(data, valueType);
+ } catch (Exception e) {
+ throw new RuntimeException("ERROR in Deserilizing", e);
+ }
+ }
+
+
+ @SuppressWarnings({"java:S1168"})
+ public byte[] serialize(final Object data) {
+ if (data == null) {
+ return null;
+ }
+ try {
+ return mapper.writeValueAsBytes(data);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("ERROR in serilizing", e);
+ }
+ }
+
+ public T clone(final T object,
+ final Class clazz) {
+ return deserialize(serialize(object), clazz);
+ }
+
+}
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/RMQIntegrationTestHelper.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/RMQIntegrationTestHelper.java
new file mode 100644
index 00000000..5f281af9
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/RMQIntegrationTestHelper.java
@@ -0,0 +1,105 @@
+package io.appform.dropwizard.actors.actor.integration;
+
+import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.Channel;
+import io.appform.dropwizard.actors.ConnectionRegistry;
+import io.appform.dropwizard.actors.RabbitmqActorBundle;
+import io.appform.dropwizard.actors.TtlConfig;
+import io.appform.dropwizard.actors.actor.integration.data.ActionMessage;
+import io.appform.dropwizard.actors.config.Broker;
+import io.appform.dropwizard.actors.config.MetricConfig;
+import io.appform.dropwizard.actors.config.RMQConfig;
+import io.appform.dropwizard.actors.connectivity.RMQConnection;
+import io.appform.dropwizard.actors.connectivity.actor.RabbitMQBundleTestAppConfiguration;
+import io.appform.dropwizard.actors.exceptionhandler.ExceptionHandlingFactory;
+import io.appform.dropwizard.actors.observers.TerminalRMQObserver;
+import io.appform.dropwizard.actors.retry.RetryStrategyFactory;
+import io.dropwizard.lifecycle.setup.LifecycleEnvironment;
+import io.dropwizard.setup.Environment;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.val;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+
+@Getter
+public class RMQIntegrationTestHelper {
+
+ private final ObjectMapper mapper = new ObjectMapper();
+ private final RetryStrategyFactory retryStrategyFactory = new RetryStrategyFactory();
+ private final ExceptionHandlingFactory exceptionHandlingFactory = new ExceptionHandlingFactory();
+ public final Set> droppedExceptionTypes = Set.of();
+
+ @SneakyThrows
+ private RMQConnection getRmqConnections() {
+ RMQConnection rmqConnection = mock(RMQConnection.class);
+ doReturn(new RMQConfig()).when(rmqConnection).getConfig();
+ val mockChannel = mock(Channel.class);
+ doReturn(mockChannel).when(rmqConnection).newChannel();
+ doReturn(mockChannel).when(rmqConnection).channel();
+ doNothing().when(rmqConnection).ensure(anyString(), anyString(), anyMap());
+ doNothing().when(rmqConnection).ensure(anyString(), anyString(), anyString(), anyMap());
+ doNothing().when(rmqConnection).ensure(anyString(), anyString(), any(Map.class));
+ doAnswer(invocation -> {
+ byte[] string = invocation.getArgument(3, byte[].class);
+ val exchangeName = invocation.getArgument(0, String.class);
+ val actionMessage = new ObjectMapper().readValue(string, ActionMessage.class);
+ actionMessage.setExchangeName(exchangeName);
+ return null;
+ }).when(mockChannel).basicPublish(anyString(), anyString(), any(), any());
+ doReturn(new TerminalRMQObserver()).when(rmqConnection).getRootObserver();
+ return rmqConnection;
+ }
+
+ public ConnectionRegistry getConnectionRegistry() {
+ val rmqConnection = getRmqConnections();
+ val connectionRegistry = Mockito.mock(ConnectionRegistry.class);
+ when(connectionRegistry.createOrGet(any())).thenReturn(rmqConnection);
+ return connectionRegistry;
+//
+// RMQConfig config = RMQConfig.builder()
+// .brokers(List.of(new Broker("localhost", 5672)))
+// .userName("guest")
+// .threadPoolSize(1)
+// .password("guest")
+// .secure(false)
+// .startupGracePeriodSeconds(1)
+// .metricConfig(MetricConfig.builder().enabledForAll(true).build())
+// .build();
+// val actorBundleImpl = new RabbitmqActorBundle() {
+// @Override
+// protected TtlConfig ttlConfig() {
+// return TtlConfig.builder()
+// .ttl(Duration.ofMinutes(30))
+// .ttlEnabled(true)
+// .build();
+// }
+// @Override
+// protected RMQConfig getConfig(RabbitMQBundleTestAppConfiguration rabbitMQBundleTestAppConfiguration) {
+// return config;
+// }
+// };
+//
+// Environment environment = Mockito.mock(Environment.class);
+// LifecycleEnvironment lifecycle = Mockito.mock(LifecycleEnvironment.class);
+// Mockito.doReturn(new MetricRegistry()).when(environment).metrics();
+// Mockito.doReturn(lifecycle).when(environment).lifecycle();
+// Mockito.doNothing().when(lifecycle).manage(ArgumentMatchers.any(ConnectionRegistry.class));
+//
+// actorBundleImpl.run(new RabbitMQBundleTestAppConfiguration(), new Environment("TESTAPP"));
+// return actorBundleImpl.getConnectionRegistry();
+ }
+
+
+}
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/data/ActionMessage.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/data/ActionMessage.java
new file mode 100644
index 00000000..fb388b0b
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/data/ActionMessage.java
@@ -0,0 +1,32 @@
+package io.appform.dropwizard.actors.actor.integration.data;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Setter;
+import lombok.ToString;
+
+import javax.validation.constraints.NotNull;
+
+@Data
+@EqualsAndHashCode
+@ToString
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(name = FlowType.C2M_AUTH_FLOW_TEXT, value = C2MDataActionMessage.class),
+ @JsonSubTypes.Type(name = FlowType.C2C_AUTH_FLOW_TEXT, value = C2CDataActionMessage.class)
+})
+public abstract class ActionMessage {
+
+ @NotNull
+ private final FlowType type;
+
+ @Setter
+ private String exchangeName;
+
+ protected ActionMessage(FlowType type) {
+ this.type = type;
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/data/C2CDataActionMessage.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/data/C2CDataActionMessage.java
new file mode 100644
index 00000000..d7efbe90
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/data/C2CDataActionMessage.java
@@ -0,0 +1,23 @@
+package io.appform.dropwizard.actors.actor.integration.data;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class C2CDataActionMessage extends ActionMessage {
+ private String data;
+
+ public C2CDataActionMessage() {
+ super(FlowType.C2C_AUTH_FLOW);
+ }
+
+ @Builder
+ public C2CDataActionMessage(String data) {
+ this();
+ this.data = data;
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/data/C2MDataActionMessage.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/data/C2MDataActionMessage.java
new file mode 100644
index 00000000..dccc9808
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/data/C2MDataActionMessage.java
@@ -0,0 +1,23 @@
+package io.appform.dropwizard.actors.actor.integration.data;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class C2MDataActionMessage extends ActionMessage {
+ private String data;
+
+ public C2MDataActionMessage() {
+ super(FlowType.C2M_AUTH_FLOW);
+ }
+
+ @Builder
+ public C2MDataActionMessage(String data) {
+ this();
+ this.data = data;
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/data/FlowType.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/data/FlowType.java
new file mode 100644
index 00000000..a8175038
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/data/FlowType.java
@@ -0,0 +1,25 @@
+package io.appform.dropwizard.actors.actor.integration.data;
+
+public enum FlowType {
+ C2M_AUTH_FLOW {
+ @Override
+ public T accept(FlowTypeVisitor visitor) {
+ return visitor.visitC2M();
+ }
+ },
+ C2C_AUTH_FLOW {
+ @Override
+ public T accept(FlowTypeVisitor visitor) {
+ return visitor.visitC2C();
+ }
+ };
+
+ public static final String C2M_AUTH_FLOW_TEXT = "C2M_AUTH_FLOW";
+ public static final String C2C_AUTH_FLOW_TEXT = "C2C_AUTH_FLOW";
+
+ public abstract T accept(FlowTypeVisitor visitor);
+ public interface FlowTypeVisitor {
+ T visitC2M();
+ T visitC2C();
+ }
+}
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/C2CDataActionMessageHierarchicalActor.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/C2CDataActionMessageHierarchicalActor.java
new file mode 100644
index 00000000..1c8ad92e
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/C2CDataActionMessageHierarchicalActor.java
@@ -0,0 +1,33 @@
+package io.appform.dropwizard.actors.actor.integration.hierarchical;
+
+import io.appform.dropwizard.actors.actor.MessageMetadata;
+import io.appform.dropwizard.actors.actor.hierarchical.HierarchicalActor;
+import io.appform.dropwizard.actors.actor.hierarchical.HierarchicalActorConfig;
+import io.appform.dropwizard.actors.actor.integration.RMQIntegrationTestHelper;
+import io.appform.dropwizard.actors.actor.integration.data.ActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.C2CDataActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.FlowType;
+
+
+public class C2CDataActionMessageHierarchicalActor extends HierarchicalActor {
+
+
+ public C2CDataActionMessageHierarchicalActor(final FlowType flowType,
+ final HierarchicalActorConfig hierarchicalTreeConfig,
+ final RMQIntegrationTestHelper routerTestHelper) {
+ super(flowType,
+ hierarchicalTreeConfig,
+ routerTestHelper.getConnectionRegistry(),
+ routerTestHelper.getMapper(),
+ routerTestHelper.getRetryStrategyFactory(),
+ routerTestHelper.getExceptionHandlingFactory(),
+ C2CDataActionMessage.class,
+ routerTestHelper.getDroppedExceptionTypes());
+ }
+
+ @Override
+ protected boolean handle(ActionMessage actionMessage, MessageMetadata messageMetadata) throws Exception {
+ System.out.println("C2C : " + actionMessage);
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/C2MDataActionMessageHierarchicalActor.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/C2MDataActionMessageHierarchicalActor.java
new file mode 100644
index 00000000..e1c25b30
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/C2MDataActionMessageHierarchicalActor.java
@@ -0,0 +1,33 @@
+package io.appform.dropwizard.actors.actor.integration.hierarchical;
+
+import io.appform.dropwizard.actors.actor.MessageMetadata;
+import io.appform.dropwizard.actors.actor.hierarchical.HierarchicalActor;
+import io.appform.dropwizard.actors.actor.hierarchical.HierarchicalActorConfig;
+import io.appform.dropwizard.actors.actor.integration.RMQIntegrationTestHelper;
+import io.appform.dropwizard.actors.actor.integration.data.ActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.C2MDataActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.FlowType;
+
+
+public class C2MDataActionMessageHierarchicalActor extends HierarchicalActor {
+
+
+ public C2MDataActionMessageHierarchicalActor(final FlowType flowType,
+ final HierarchicalActorConfig hierarchicalTreeConfig,
+ final RMQIntegrationTestHelper routerTestHelper) {
+ super(flowType,
+ hierarchicalTreeConfig,
+ routerTestHelper.getConnectionRegistry(),
+ routerTestHelper.getMapper(),
+ routerTestHelper.getRetryStrategyFactory(),
+ routerTestHelper.getExceptionHandlingFactory(),
+ C2MDataActionMessage.class,
+ routerTestHelper.getDroppedExceptionTypes());
+ }
+
+ @Override
+ protected boolean handle(ActionMessage actionMessage, MessageMetadata messageMetadata) throws Exception {
+ System.out.println("C2M : " + actionMessage);
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/FlowHierarchicalActorConfig.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/FlowHierarchicalActorConfig.java
new file mode 100644
index 00000000..7835aeb3
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/FlowHierarchicalActorConfig.java
@@ -0,0 +1,17 @@
+package io.appform.dropwizard.actors.actor.integration.hierarchical;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.appform.dropwizard.actors.actor.hierarchical.HierarchicalActorConfig;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class FlowHierarchicalActorConfig> {
+ private Map workers;
+}
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/FlowHierarchicalActorTest.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/FlowHierarchicalActorTest.java
new file mode 100644
index 00000000..42f60228
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/FlowHierarchicalActorTest.java
@@ -0,0 +1,120 @@
+package io.appform.dropwizard.actors.actor.integration.hierarchical;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.appform.dropwizard.actors.actor.hierarchical.HierarchicalBaseActor;
+import io.appform.dropwizard.actors.actor.integration.RMQIntegrationTestHelper;
+import io.appform.dropwizard.actors.actor.integration.data.ActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.C2CDataActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.C2MDataActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.FlowType;
+import io.appform.dropwizard.actors.actor.hierarchical.tree.key.RoutingKey;
+import io.appform.dropwizard.actors.utils.YamlReader;
+import lombok.SneakyThrows;
+import lombok.val;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+class FlowHierarchicalActorTest {
+
+ private final static RMQIntegrationTestHelper ROUTER_TEST_HELPER = new RMQIntegrationTestHelper();
+ private final static FlowHierarchicalActorConfig RMQ_CONFIG = YamlReader.loadConfig("rmqHierarchical.yaml", new TypeReference<>() {
+ });
+ private Map> actorActors;
+
+ @SneakyThrows
+ public void createActors() {
+ actorActors = RMQ_CONFIG.getWorkers()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getKey().accept(new FlowTypeHierarchicalActorBuilder(e.getValue(), ROUTER_TEST_HELPER))));
+ Assertions.assertNotNull(actorActors);
+ Assertions.assertEquals(actorActors.size(), RMQ_CONFIG.getWorkers().size());
+
+ for (val entry : actorActors.entrySet()) {
+ val routerActor = entry.getValue();
+ routerActor.start();
+ }
+ }
+
+ @SneakyThrows
+ public void cleanUp() {
+ for (val entry : actorActors.entrySet()) {
+ entry.getValue().stop();
+ }
+ }
+
+
+ @Test
+ void testRouter() {
+ createActors();
+ val messages = Map.of(
+ RoutingKey.builder().list(List.of("")).build(),
+ C2MDataActionMessage.builder()
+ .data("C2M")
+ .build(),
+
+ RoutingKey.builder().list(List.of("REGULAR", "JAR")).build(),
+ C2MDataActionMessage.builder()
+ .data("C2M-REGULAR-JAR-SOME")
+ .build(),
+
+ RoutingKey.builder().list(List.of("REGULAR")).build(),
+ C2CDataActionMessage.builder()
+ .data("C2C-REGULAR")
+ .build(),
+
+ RoutingKey.builder().list(List.of("C2C_AUTH_FLOW")).build(),
+ C2CDataActionMessage.builder()
+ .data("C2C")
+ .build(),
+
+ RoutingKey.builder().list(List.of("FULL_AUTH", "JAR")).build(),
+ C2MDataActionMessage.builder()
+ .data("C2M-FULL_AUTH-JAR-SOME")
+ .build()
+ );
+
+ messages.forEach((routingKey, message) -> {
+ val flowType = message.getType();
+
+ if (actorActors.containsKey(flowType)) {
+ val router = actorActors.get(flowType);
+ val flowLevelPrefix = Arrays.asList(RMQ_CONFIG.getWorkers().get(flowType).getPrefix().split("\\."));
+
+ Assertions.assertNotNull(router);
+ val worker = router.getActorImpl().getWorker().get(flowType, routingKey);
+ Assertions.assertNotNull(worker);
+ val publisherQueueName = worker.getActorImpl().getPublishActor().queueName();
+ Assertions.assertNotNull(publisherQueueName);
+ val publisherQueueNameTokens = new LinkedHashSet<>(Arrays.stream(worker
+ .getActorImpl()
+ .getPublishActor()
+ .queueName()
+ .split("\\."))
+ .filter(e -> !e.isBlank() && !flowLevelPrefix.contains(e))
+ .toList());
+
+ val expectedElementsInQueueName = new LinkedHashSet<>(routingKey.getRoutingKey().stream().filter(e -> !e.isBlank()).toList());
+ expectedElementsInQueueName.add(flowType.name());
+
+ publisherQueueNameTokens.forEach(ele -> Assertions.assertTrue(expectedElementsInQueueName.contains(ele)));
+ message.setExchangeName(String.join("-", expectedElementsInQueueName));
+ try {
+ router.publish(routingKey, message);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ cleanUp();
+ }
+
+
+}
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/FlowTypeHierarchicalActorBuilder.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/FlowTypeHierarchicalActorBuilder.java
new file mode 100644
index 00000000..7902251f
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/hierarchical/FlowTypeHierarchicalActorBuilder.java
@@ -0,0 +1,29 @@
+package io.appform.dropwizard.actors.actor.integration.hierarchical;
+
+import io.appform.dropwizard.actors.actor.hierarchical.HierarchicalActor;
+import io.appform.dropwizard.actors.actor.hierarchical.HierarchicalActorConfig;
+import io.appform.dropwizard.actors.actor.integration.RMQIntegrationTestHelper;
+import io.appform.dropwizard.actors.actor.integration.data.ActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.FlowType;
+
+public class FlowTypeHierarchicalActorBuilder implements FlowType.FlowTypeVisitor> {
+
+ private final HierarchicalActorConfig hierarchicalTreeConfig;
+ private final RMQIntegrationTestHelper routerTestHelper;
+
+ public FlowTypeHierarchicalActorBuilder(final HierarchicalActorConfig hierarchicalTreeConfig,
+ final RMQIntegrationTestHelper routerTestHelper) {
+ this.hierarchicalTreeConfig = hierarchicalTreeConfig;
+ this.routerTestHelper = routerTestHelper;
+ }
+
+ @Override
+ public HierarchicalActor visitC2M() {
+ return new C2MDataActionMessageHierarchicalActor(FlowType.C2M_AUTH_FLOW, hierarchicalTreeConfig, routerTestHelper);
+ }
+
+ @Override
+ public HierarchicalActor visitC2C() {
+ return new C2CDataActionMessageHierarchicalActor(FlowType.C2C_AUTH_FLOW, hierarchicalTreeConfig, routerTestHelper);
+ }
+}
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/C2CDataActionMessageActor.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/C2CDataActionMessageActor.java
new file mode 100644
index 00000000..043620ea
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/C2CDataActionMessageActor.java
@@ -0,0 +1,33 @@
+package io.appform.dropwizard.actors.actor.integration.normal;
+
+import io.appform.dropwizard.actors.actor.Actor;
+import io.appform.dropwizard.actors.actor.ActorConfig;
+import io.appform.dropwizard.actors.actor.MessageMetadata;
+import io.appform.dropwizard.actors.actor.integration.RMQIntegrationTestHelper;
+import io.appform.dropwizard.actors.actor.integration.data.ActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.C2CDataActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.FlowType;
+
+
+public class C2CDataActionMessageActor extends Actor {
+
+
+ public C2CDataActionMessageActor(final FlowType flowType,
+ final ActorConfig config,
+ final RMQIntegrationTestHelper routerTestHelper) {
+ super(flowType,
+ config,
+ routerTestHelper.getConnectionRegistry(),
+ routerTestHelper.getMapper(),
+ routerTestHelper.getRetryStrategyFactory(),
+ routerTestHelper.getExceptionHandlingFactory(),
+ C2CDataActionMessage.class,
+ routerTestHelper.getDroppedExceptionTypes());
+ }
+
+ @Override
+ protected boolean handle(ActionMessage actionMessage, MessageMetadata messageMetadata) throws Exception {
+ System.out.println("C2C : " + actionMessage);
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/C2MDataActionMessageActor.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/C2MDataActionMessageActor.java
new file mode 100644
index 00000000..dc682513
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/C2MDataActionMessageActor.java
@@ -0,0 +1,33 @@
+package io.appform.dropwizard.actors.actor.integration.normal;
+
+import io.appform.dropwizard.actors.actor.Actor;
+import io.appform.dropwizard.actors.actor.ActorConfig;
+import io.appform.dropwizard.actors.actor.MessageMetadata;
+import io.appform.dropwizard.actors.actor.integration.RMQIntegrationTestHelper;
+import io.appform.dropwizard.actors.actor.integration.data.ActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.C2MDataActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.FlowType;
+
+
+public class C2MDataActionMessageActor extends Actor {
+
+
+ public C2MDataActionMessageActor(final FlowType flowType,
+ final ActorConfig config,
+ final RMQIntegrationTestHelper routerTestHelper) {
+ super(flowType,
+ config,
+ routerTestHelper.getConnectionRegistry(),
+ routerTestHelper.getMapper(),
+ routerTestHelper.getRetryStrategyFactory(),
+ routerTestHelper.getExceptionHandlingFactory(),
+ C2MDataActionMessage.class,
+ routerTestHelper.getDroppedExceptionTypes());
+ }
+
+ @Override
+ protected boolean handle(ActionMessage actionMessage, MessageMetadata messageMetadata) throws Exception {
+ System.out.println("C2M : " + actionMessage);
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/FlowActorConfig.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/FlowActorConfig.java
new file mode 100644
index 00000000..59097ecf
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/FlowActorConfig.java
@@ -0,0 +1,18 @@
+package io.appform.dropwizard.actors.actor.integration.normal;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.appform.dropwizard.actors.actor.ActorConfig;
+import io.appform.dropwizard.actors.actor.hierarchical.HierarchicalActorConfig;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class FlowActorConfig> {
+ private Map workers;
+}
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/FlowActorTest.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/FlowActorTest.java
new file mode 100644
index 00000000..6b89af5b
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/FlowActorTest.java
@@ -0,0 +1,99 @@
+package io.appform.dropwizard.actors.actor.integration.normal;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.appform.dropwizard.actors.actor.Actor;
+import io.appform.dropwizard.actors.actor.integration.RMQIntegrationTestHelper;
+import io.appform.dropwizard.actors.actor.integration.data.ActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.C2CDataActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.C2MDataActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.FlowType;
+import io.appform.dropwizard.actors.utils.YamlReader;
+import lombok.SneakyThrows;
+import lombok.val;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+class FlowActorTest {
+
+ private final static RMQIntegrationTestHelper ROUTER_TEST_HELPER = new RMQIntegrationTestHelper();
+ private final static FlowActorConfig RMQ_CONFIG = YamlReader.loadConfig("rmq.yaml", new TypeReference<>() {
+ });
+ private Map> actorActors;
+
+ @SneakyThrows
+ public void createActors() {
+ actorActors = RMQ_CONFIG.getWorkers()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getKey().accept(new FlowTypeActorBuilder(e.getValue(), ROUTER_TEST_HELPER))));
+ Assertions.assertNotNull(actorActors);
+ Assertions.assertEquals(actorActors.size(), RMQ_CONFIG.getWorkers().size());
+
+ for (val entry : actorActors.entrySet()) {
+ val routerActor = entry.getValue();
+ routerActor.start();
+ }
+ }
+
+ @SneakyThrows
+ public void cleanUp() {
+ for (val entry : actorActors.entrySet()) {
+ entry.getValue().stop();
+ }
+ }
+
+
+ @Test
+ void testRouter() {
+ createActors();
+ val messages = List.of(
+ C2MDataActionMessage.builder()
+ .data("C2M")
+ .build(),
+
+ C2MDataActionMessage.builder()
+ .data("C2M-REGULAR-JAR-SOME")
+ .build(),
+
+ C2CDataActionMessage.builder()
+ .data("C2C-REGULAR")
+ .build(),
+
+ C2CDataActionMessage.builder()
+ .data("C2C")
+ .build(),
+
+ C2MDataActionMessage.builder()
+ .data("C2M-FULL_AUTH-JAR-SOME")
+ .build()
+ );
+
+ messages.forEach(message -> {
+ val flowType = message.getType();
+
+ if (actorActors.containsKey(flowType)) {
+ val router = actorActors.get(flowType);
+ val flowLevelPrefix = Arrays.asList(RMQ_CONFIG.getWorkers().get(flowType).getPrefix().split("\\."));
+
+ Assertions.assertNotNull(router);
+ val publisherQueueName = router.getActorImpl().getPublishActor().queueName();
+ Assertions.assertNotNull(publisherQueueName);
+ message.setExchangeName(String.join("-", publisherQueueName));
+ try {
+ router.publish(message);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ cleanUp();
+ }
+
+
+}
diff --git a/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/FlowTypeActorBuilder.java b/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/FlowTypeActorBuilder.java
new file mode 100644
index 00000000..a37d14b6
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/actor/integration/normal/FlowTypeActorBuilder.java
@@ -0,0 +1,29 @@
+package io.appform.dropwizard.actors.actor.integration.normal;
+
+import io.appform.dropwizard.actors.actor.Actor;
+import io.appform.dropwizard.actors.actor.ActorConfig;
+import io.appform.dropwizard.actors.actor.integration.RMQIntegrationTestHelper;
+import io.appform.dropwizard.actors.actor.integration.data.ActionMessage;
+import io.appform.dropwizard.actors.actor.integration.data.FlowType;
+
+public class FlowTypeActorBuilder implements FlowType.FlowTypeVisitor> {
+
+ private final ActorConfig config;
+ private final RMQIntegrationTestHelper routerTestHelper;
+
+ public FlowTypeActorBuilder(final ActorConfig config,
+ final RMQIntegrationTestHelper routerTestHelper) {
+ this.config = config;
+ this.routerTestHelper = routerTestHelper;
+ }
+
+ @Override
+ public Actor visitC2M() {
+ return new C2MDataActionMessageActor(FlowType.C2M_AUTH_FLOW, config, routerTestHelper);
+ }
+
+ @Override
+ public Actor visitC2C() {
+ return new C2CDataActionMessageActor(FlowType.C2C_AUTH_FLOW, config, routerTestHelper);
+ }
+}
diff --git a/src/test/java/io/appform/dropwizard/actors/utils/YamlReader.java b/src/test/java/io/appform/dropwizard/actors/utils/YamlReader.java
new file mode 100644
index 00000000..0edcc929
--- /dev/null
+++ b/src/test/java/io/appform/dropwizard/actors/utils/YamlReader.java
@@ -0,0 +1,48 @@
+package io.appform.dropwizard.actors.utils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.io.Resources;
+import lombok.SneakyThrows;
+import lombok.experimental.UtilityClass;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+@UtilityClass
+public class YamlReader {
+
+ // Config reader
+ private final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
+
+ @SneakyThrows
+ public T readYAML(final String data, final TypeReference typeReference) {
+ return yamlMapper.readValue(data, typeReference);
+ }
+
+ @SneakyThrows
+ public T loadConfig(final String filePath, final TypeReference typeReference) {
+ String data = fixture(filePath);
+ return readYAML(data, typeReference);
+ }
+
+ public static String fixture(String filename) {
+ return fixture(filename, StandardCharsets.UTF_8);
+ }
+
+ private static String fixture(String filename, Charset charset) {
+ URL resource = Resources.getResource(filename);
+
+ try {
+ return Resources.toString(resource, charset).trim();
+ } catch (IOException var4) {
+ IOException e = var4;
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+
+}
diff --git a/src/test/resources/rmq.yaml b/src/test/resources/rmq.yaml
new file mode 100644
index 00000000..11edf151
--- /dev/null
+++ b/src/test/resources/rmq.yaml
@@ -0,0 +1,13 @@
+workers:
+ C2M_AUTH_FLOW:
+ exchange: prod.mandate.actors.c2m
+ delayed: false
+ prefix: prod.mandate.actors.c2m
+ concurrency: 1
+ shardCount: 1
+ C2C_AUTH_FLOW:
+ exchange: prod.mandate.actors.c2c
+ delayed: false
+ prefix: prod.mandate.actors.c2c
+ concurrency: 1
+ shardCount: 1
diff --git a/src/test/resources/rmqHierarchical.yaml b/src/test/resources/rmqHierarchical.yaml
new file mode 100644
index 00000000..7bfa630b
--- /dev/null
+++ b/src/test/resources/rmqHierarchical.yaml
@@ -0,0 +1,34 @@
+workers:
+ C2M_AUTH_FLOW:
+ exchange: prod.mandate.actors
+ delayed: false
+ prefix: prod.mandate.actors
+ concurrency: 1
+ shardCount: 1
+ children:
+ REGULAR:
+ nodeData:
+ concurrency: 1
+ shardCount: 2
+ children:
+ HOTSTAR:
+ nodeData:
+ concurrency: 1
+ shardCount: 1
+ JAR:
+ nodeData:
+ concurrency: 1
+ shardCount: 1
+ FULL_AUTH:
+ nodeData:
+ concurrency: 1
+ shardCount: 1
+ C2C_AUTH_FLOW:
+ concurrency: 1
+ shardCount: 1
+ children:
+ REGULAR:
+ nodeData:
+ concurrency: 1
+ shardCount: 1
+