Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kavitha|add dynamic properties based on route desc #21

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static class RouteDescription {
private Source source;
private List<Destination> destinations;
private ErrorDestination errorDestination;
private LinkedHashMap<String, String> additionalProperties = new LinkedHashMap<>(0);
private List<AdditionalProperty> additionalProperties;
private LinkedHashMap<String, String> derivedProperties = new LinkedHashMap<>(0);
private Destination healthCheckDestination;
private FilterBy filterBy;
Expand Down Expand Up @@ -115,6 +115,19 @@ public static class ErrorDestination extends Destination {
private String cronExpressionForRetryStop;
}

@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@NoArgsConstructor
@AllArgsConstructor
public static class AdditionalProperty {
private String parentPath;
private String filterKeyPath;
private String filterValue;
private LinkedHashMap<String, String> staticProperties = new LinkedHashMap<>(0);
private LinkedHashMap<String, String> dynamicProperties = new LinkedHashMap<>(0);
}

public enum BahmniEventType {
BAHMNI_PATIENT_CREATED, BAHMNI_PATIENT_UPDATED, BAHMNI_APPOINTMENT_CREATED, BAHMNI_APPOINTMENT_UPDATED, BAHMNI_ENCOUNTER_CREATED, BAHMNI_ENCOUNTER_UPDATED, BAHMNI_VISIT_CREATED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void configure() {

EventPropertiesFilter eventPropertiesFilter = new EventPropertiesFilter(objectMapper, routeDescription);
PatientPropertiesFilter patientPropertiesFilter = new PatientPropertiesFilter(objectMapper, routeDescription, bahmniAPIGateway);
EventProcessor eventProcessor = new EventProcessor(objectMapper, routeDescription);
EventProcessor eventProcessor = new EventProcessor(routeDescription);
DerivedPropertiesGenerator derivedPropertiesGenerator = new DerivedPropertiesGenerator(routeDescription);

String sourceTopic = routeDescription.getErrorDestination().getQueue().getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void configure() {

EventPropertiesFilter eventPropertiesFilter = new EventPropertiesFilter(objectMapper, routeDescription);
PatientPropertiesFilter patientPropertiesFilter = new PatientPropertiesFilter(objectMapper, routeDescription, bahmniAPIGateway);
EventProcessor eventProcessor = new EventProcessor(objectMapper, routeDescription);
EventProcessor eventProcessor = new EventProcessor(routeDescription);
DerivedPropertiesGenerator derivedPropertiesGenerator = new DerivedPropertiesGenerator(routeDescription);

String sourceTopic = routeDescription.getSource().getTopic().getName();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package org.bahmni.eventrouterservice.route;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.bahmni.eventrouterservice.configuration.RouteDescriptionLoader.RouteDescription;
import org.bahmni.eventrouterservice.configuration.RouteDescriptionLoader.AdditionalProperty;

import java.util.LinkedHashMap;
import java.util.List;

@Slf4j
class EventProcessor implements Processor {

private final ObjectMapper objectMapper;
private final RouteDescription routeDescription;

public EventProcessor(ObjectMapper objectMapper, RouteDescription routeDescription) {
this.objectMapper = objectMapper;
public EventProcessor(RouteDescription routeDescription) {
this.routeDescription = routeDescription;
}

Expand All @@ -28,7 +28,7 @@ public void process(Exchange exchange) {
return;
}
String payloadAsJsonString = exchange.getIn().getBody(String.class);
String updatedPayloadAsJson = addStaticData(payloadAsJsonString, routeDescription.getAdditionalProperties());
String updatedPayloadAsJson = addAdditionalProperties(payloadAsJsonString, routeDescription.getAdditionalProperties());
exchange.getIn().setBody(updatedPayloadAsJson);

String destinationTopic = getDestination(exchange.getIn().getHeader("eventType"));
Expand All @@ -40,14 +40,38 @@ private String getDestination(Object eventType) {
return routeDescription.getDestinationBasedOn(eventTypeAsString).getTopic().getName();
}

private String addStaticData(String jsonBodyAsString, LinkedHashMap<String, String> additionalProperties) {
private String addAdditionalProperties(String payloadAsJsonString, List<AdditionalProperty> additionalProperties) {
try {
ObjectNode objectNode = objectMapper.readValue(jsonBodyAsString, ObjectNode.class);
additionalProperties.forEach(objectNode::put);
log.info("Added additional properties to payload for uuid : "+objectNode.get("uuid"));
return objectMapper.writeValueAsString(objectNode);
} catch (JsonProcessingException exception) {
log.info("Failed to process payload : " + exception.getMessage());
DocumentContext[] contextRef = {JsonPath.parse(payloadAsJsonString)};

for (AdditionalProperty obj : additionalProperties) {
String parentPath = obj.getParentPath();
String filterKeyPath = obj.getFilterKeyPath();
String filterValue = obj.getFilterValue();

if (obj.getStaticProperties() != null && obj.getStaticProperties().size() > 0) {
obj.getStaticProperties().entrySet().forEach(entry -> {
contextRef[0].put(JsonPath.compile(parentPath), entry.getKey(), entry.getValue());
});
}

if (obj.getDynamicProperties() != null && obj.getDynamicProperties().size() > 0) {
obj.getDynamicProperties().entrySet().forEach(entry -> {
contextRef[0] = contextRef[0].map(parentPath, (currentValue, configuration) -> {
Object filterObj = JsonPath.read(currentValue, filterKeyPath);
Object valueObj = JsonPath.read(currentValue, entry.getValue());
DocumentContext obsContext = JsonPath.parse(currentValue);
if ((filterKeyPath == null && filterValue == null) || (filterObj != null && filterObj.toString().contains(filterValue))) {
obsContext.put(JsonPath.compile("$"), entry.getKey(), valueObj);
}
return obsContext.json();
});
});
}
}
return contextRef[0].jsonString();
} catch (Exception exception) {
log.info("Failed to process payload for additional properties : " + exception.getMessage());
throw new RuntimeException(exception);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import org.apache.camel.Message;
import org.bahmni.eventrouterservice.configuration.RouteDescriptionLoader.Destination;
import org.bahmni.eventrouterservice.configuration.RouteDescriptionLoader.RouteDescription;
import org.bahmni.eventrouterservice.configuration.RouteDescriptionLoader.AdditionalProperty;
import org.bahmni.eventrouterservice.model.Topic;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.ArrayList;

import static org.bahmni.eventrouterservice.configuration.RouteDescriptionLoader.BahmniEventType.BAHMNI_PATIENT_UPDATED;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
Expand All @@ -25,13 +28,16 @@ public void givenAdditionalPropertyKeyWithValue_whenStartProcessing_thenShouldAd

RouteDescription routeDescription = mock(RouteDescription.class);
Destination destination = new Destination(BAHMNI_PATIENT_UPDATED, new Topic("topicName", null), null);
LinkedHashMap<String, String> additionalProperties = new LinkedHashMap<>();
additionalProperties.put("facility", "Ethopia");
LinkedHashMap<String, String> staticProperties = new LinkedHashMap<>();
staticProperties.put("facility", "Bahmni");
AdditionalProperty additionalProperty = new AdditionalProperty("$", null, null, staticProperties, null);
List<AdditionalProperty> additionalProperties = new ArrayList();
additionalProperties.add(additionalProperty);
when(routeDescription.getAdditionalProperties()).thenReturn(additionalProperties);
when(routeDescription.getDestinationBasedOn("BAHMNI_PATIENT_UPDATED")).thenReturn(destination);


EventProcessor eventProcessor = new EventProcessor(new ObjectMapper(), routeDescription);
EventProcessor eventProcessor = new EventProcessor(routeDescription);

Exchange exchange = mock(Exchange.class);
Message message = mock(Message.class);
Expand All @@ -41,20 +47,23 @@ public void givenAdditionalPropertyKeyWithValue_whenStartProcessing_thenShouldAd

assertDoesNotThrow(() -> eventProcessor.process(exchange));

verify(message, times(1)).setBody("{\"uuid\":\"patientUuid\",\"facility\":\"Ethopia\"}");
verify(message, times(1)).setBody("{\"uuid\":\"patientUuid\",\"facility\":\"Bahmni\"}");
}

@Test
public void givenDestinationBasedOnEventType_whenStartProcessing_thenShouldChangeDestinationAsPerEventType() {

RouteDescription routeDescription = mock(RouteDescription.class);
Destination destination = new Destination(BAHMNI_PATIENT_UPDATED, new Topic("topicName", null), null);
LinkedHashMap<String, String> additionalProperties = new LinkedHashMap<>();
additionalProperties.put("facility", "Ethopia");
LinkedHashMap<String, String> staticProperties = new LinkedHashMap<>();
staticProperties.put("facility", "Bahmni");
AdditionalProperty additionalProperty = new AdditionalProperty("$", null, null, staticProperties, null);
List<AdditionalProperty> additionalProperties = new ArrayList();
additionalProperties.add(additionalProperty);
when(routeDescription.getAdditionalProperties()).thenReturn(additionalProperties);
when(routeDescription.getDestinationBasedOn("BAHMNI_PATIENT_UPDATED")).thenReturn(destination);

EventProcessor eventProcessor = new EventProcessor(new ObjectMapper(), routeDescription);
EventProcessor eventProcessor = new EventProcessor(routeDescription);

Exchange exchange = mock(Exchange.class);
Message message = mock(Message.class);
Expand All @@ -71,10 +80,9 @@ public void givenDestinationBasedOnEventType_whenStartProcessing_thenShouldChang
public void givenAdditionalPropertiesAreEmpty_whenStartProcessing_thenShouldNotAddInPayload() {

RouteDescription routeDescription = mock(RouteDescription.class);
LinkedHashMap<String, String> additionalProperties = new LinkedHashMap<>();
when(routeDescription.getAdditionalProperties()).thenReturn(additionalProperties);
when(routeDescription.getAdditionalProperties()).thenReturn(new ArrayList());

EventProcessor eventProcessor = new EventProcessor(new ObjectMapper(), routeDescription);
EventProcessor eventProcessor = new EventProcessor(routeDescription);

Exchange exchange = mock(Exchange.class);

Expand All @@ -87,11 +95,14 @@ public void givenAdditionalPropertiesAreEmpty_whenStartProcessing_thenShouldNotA
public void givenAdditionalPropertyKeyWithValueWithInvalidPayload_whenStartProcessing_thenShouldThrowRuntimeException() {

RouteDescription routeDescription = mock(RouteDescription.class);
LinkedHashMap<String, String> additionalProperties = new LinkedHashMap<>();
additionalProperties.put("facility", "Ethopia");
LinkedHashMap<String, String> staticProperties = new LinkedHashMap<>();
staticProperties.put("facility", "Bahmni");
AdditionalProperty additionalProperty = new AdditionalProperty("$", null, null, staticProperties, null);
List<AdditionalProperty> additionalProperties = new ArrayList();
additionalProperties.add(additionalProperty);
when(routeDescription.getAdditionalProperties()).thenReturn(additionalProperties);

EventProcessor eventProcessor = new EventProcessor(new ObjectMapper(), routeDescription);
EventProcessor eventProcessor = new EventProcessor(routeDescription);

Exchange exchange = mock(Exchange.class);
Message message = mock(Message.class);
Expand Down
9 changes: 6 additions & 3 deletions src/test/resources/route-descriptions.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
"cronExpressionForRetryStart": "* 5 * * * ?",
"cronExpressionForRetryStop": "* 7 * * * ?"
},
"additionalProperties": {
"facility": "Ethopia"
},
"additionalProperties": [{
"parentPath": "$",
"staticProperties": {
"facility": "Bahmni"
}
}],
"derivedProperties": {
"patientUuid": "$.patient.uuid"
},
Expand Down
Loading