Skip to content

Commit

Permalink
[16141] Integrating receiver enrichment into the UP pipeline.
Browse files Browse the repository at this point in the history
  • Loading branch information
wcutshall committed Jan 14, 2025
1 parent f1a0e3a commit e94837d
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,10 @@ class FHIRFunctions(
@FunctionName("elr-fhir-receiver-enrichment")
@StorageAccount("AzureWebJobsStorage")
fun receiverEnrichment(
@QueueTrigger(name = "receiver-enrichment", queueName = QueueMessage.elrReceiverEnrichmentQueueName)
@QueueTrigger(name = "message", queueName = QueueMessage.elrReceiverEnrichmentQueueName)
message: String,
@BindingName("DequeueCount") dequeueCount: Int = 1,
) {
// TODO Change to ReceiverEnrichment object.
process(
message,
dequeueCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class FHIRDestinationFilter(
metadata = this.metadata,
topic = queueMessage.topic,
destination = receiver,
nextAction = TaskAction.receiver_filter
nextAction = TaskAction.receiver_enrichment
)

// create item lineage
Expand All @@ -159,7 +159,7 @@ class FHIRDestinationFilter(
)

val nextEvent = ProcessEvent(
Event.EventAction.RECEIVER_FILTER,
Event.EventAction.RECEIVER_ENRICHMENT,
report.id,
Options.None,
emptyMap(),
Expand Down Expand Up @@ -209,7 +209,7 @@ class FHIRDestinationFilter(
nextEvent,
report,
blobInfo.blobUrl,
FhirReceiverFilterQueueMessage(
FhirReceiverEnrichmentQueueMessage(
report.id,
blobInfo.blobUrl,
BlobUtils.digestToString(blobInfo.digest),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,6 @@ class FHIRTranslator(
receiver: Receiver,
bundle: Bundle,
): ByteArray {
if (receiver.enrichmentSchemaNames.isNotEmpty()) {
receiver.enrichmentSchemaNames.forEach { enrichmentSchemaName ->
logger.info("Applying enrichment schema $enrichmentSchemaName")
val transformer = FhirTransformer(
enrichmentSchemaName,
)
transformer.process(bundle)
}
}
when (receiver.format) {
MimeFormat.FHIR -> {
if (receiver.schemaName.isNotEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ fun registerPrimeRouterQueueMessageSubtypes() {
QueueMessage.ObjectMapperProvider.registerSubtypes(
FhirConvertQueueMessage::class.java,
FhirDestinationFilterQueueMessage::class.java,
FhirReceiverEnrichmentQueueMessage::class.java,
FhirReceiverFilterQueueMessage::class.java,
FhirTranslateQueueMessage::class.java,
BatchEventQueueMessage::class.java,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import gov.cdc.prime.router.common.validFHIRRecord1
import gov.cdc.prime.router.db.ReportStreamTestDatabaseContainer
import gov.cdc.prime.router.db.ReportStreamTestDatabaseSetupExtension
import gov.cdc.prime.router.fhirengine.engine.FHIRDestinationFilter
import gov.cdc.prime.router.fhirengine.engine.FhirReceiverFilterQueueMessage
import gov.cdc.prime.router.fhirengine.engine.FhirReceiverEnrichmentQueueMessage
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.history.db.ReportGraph
import gov.cdc.prime.router.metadata.LookupTable
Expand Down Expand Up @@ -204,15 +204,15 @@ class FHIRDestinationFilterIntegrationTests : Logging {
ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val routedReports = fetchChildReports(report, txn, 2, 2)
with(routedReports.first()) {
assertThat(this.nextAction).isEqualTo(TaskAction.receiver_filter)
assertThat(this.nextAction).isEqualTo(TaskAction.receiver_enrichment)
assertThat(this.receivingOrg).isEqualTo("phd")
assertThat(this.receivingOrgSvc).isEqualTo("x")
assertThat(this.schemaName).isEqualTo("None")
assertThat(this.schemaTopic).isEqualTo(Topic.FULL_ELR)
assertThat(this.bodyFormat).isEqualTo("FHIR")
}
with(routedReports.last()) {
assertThat(this.nextAction).isEqualTo(TaskAction.receiver_filter)
assertThat(this.nextAction).isEqualTo(TaskAction.receiver_enrichment)
assertThat(this.receivingOrg).isEqualTo("phd")
assertThat(this.receivingOrgSvc).isEqualTo("y")
assertThat(this.schemaName).isEqualTo("None")
Expand All @@ -233,15 +233,15 @@ class FHIRDestinationFilterIntegrationTests : Logging {
// check queue message
val expectedRouteQueueMessages = routedReports.flatMap { report ->
listOf(
FhirReceiverFilterQueueMessage(
FhirReceiverEnrichmentQueueMessage(
report.reportId,
report.bodyUrl,
BlobUtils.digestToString(report.blobDigest),
"phd.fhir-elr-no-transform",
UniversalPipelineTestUtils.fhirSenderWithNoTransform.topic,
"phd.x"
),
FhirReceiverFilterQueueMessage(
FhirReceiverEnrichmentQueueMessage(
report.reportId,
report.bodyUrl,
BlobUtils.digestToString(report.blobDigest),
Expand All @@ -256,7 +256,7 @@ class FHIRDestinationFilterIntegrationTests : Logging {

verify(exactly = 2) {
QueueAccess.sendMessage(
QueueMessage.elrReceiverFilterQueueName,
QueueMessage.elrReceiverEnrichmentQueueName,
match {
expectedRouteQueueMessages.contains(it)
}
Expand Down Expand Up @@ -309,7 +309,7 @@ class FHIRDestinationFilterIntegrationTests : Logging {
// check results
ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val routedReport = fetchChildReports(report, txn, 1).single()
assertThat(routedReport.nextAction).isEqualTo(TaskAction.receiver_filter)
assertThat(routedReport.nextAction).isEqualTo(TaskAction.receiver_enrichment)
assertThat(routedReport.receivingOrg).isEqualTo("phd")
assertThat(routedReport.receivingOrgSvc).isEqualTo("x")
assertThat(routedReport.schemaName).isEqualTo("None")
Expand All @@ -324,7 +324,7 @@ class FHIRDestinationFilterIntegrationTests : Logging {
assertThat(reportContents).isEqualTo(routedBundle)

// check queue message
val expectedQueueMessage = FhirReceiverFilterQueueMessage(
val expectedQueueMessage = FhirReceiverEnrichmentQueueMessage(
routedReport.reportId,
routedReport.bodyUrl,
BlobUtils.digestToString(routedReport.blobDigest),
Expand All @@ -336,7 +336,7 @@ class FHIRDestinationFilterIntegrationTests : Logging {
// filter should permit message and should not mangle message
verify(exactly = 1) {
QueueAccess.sendMessage(
QueueMessage.elrReceiverFilterQueueName,
QueueMessage.elrReceiverEnrichmentQueueName,
expectedQueueMessage.serialize()
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,7 @@ class FHIRTranslatorIntegrationTests : Logging {
routingFilter = listOf("true"),
conditionFilter = listOf("true"),
format = MimeFormat.HL7,
enrichmentSchemaNames = listOf(
"classpath:/enrichments/testing.yml",
"classpath:/enrichments/testing2.yml"
)
enrichmentSchemaNames = emptyList()
)
)
val receivers = UniversalPipelineTestUtils.createReceivers(receiverSetupData)
Expand All @@ -277,7 +274,7 @@ class FHIRTranslatorIntegrationTests : Logging {

@Suppress("ktlint:standard:max-line-length")
val expectedOutput = "MSH|^~\\&|||||||ORU/ACK - Unsolicited transmission of an observation message|849547|P|2.5.1|||||USA\r" +
"SFT|Orange Software Vendor Name|0.2-YELLOW|Purple PRIME ReportStream|0.1-SNAPSHOT||20210622\r" +
"SFT|Centers for Disease Control and Prevention|0.1-SNAPSHOT|PRIME Data Hub|0.1-SNAPSHOT||20210622\r" +
"PID|1||||Steuber||20150707|O||^^^^^^^^Native Hawaiian or Other Pacific Islander|^^^IG^^s4fgh||~|||||||||^^^^^^^^Non Hispanic or Latino|||||||20210614\r" +
"ORC|||||||||||||||||||||Any facility USA|^^^IG||^^^IG\r" +
"OBR|1|||^^^^^^^^SARS-CoV+SARS-CoV-2 (COVID-19) Ag [Presence] in Respiratory specimen by Rapid immunoassay\r" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import assertk.assertThat
import assertk.assertions.isEqualTo
import assertk.assertions.isNotEmpty
import assertk.assertions.isTrue
import ca.uhn.hl7v2.util.Hl7InputStreamMessageIterator
import ca.uhn.hl7v2.util.Terser
import gov.cdc.prime.router.ActionLogDetail
import gov.cdc.prime.router.ActionLogger
Expand Down Expand Up @@ -513,55 +512,6 @@ class FhirTranslatorTests {
assertThat(terser.get(MSH_11_1)).isEqualTo("T")
}

/**
* When the receiver is in production mode and sender is in testing mode, output HL7 should be 'T'
*/
@Test
fun `test receiver enrichment`() {
mockkClass(BlobAccess::class)
mockkObject(BlobAccess.Companion)
every { BlobAccess.Companion.getBlobConnection(any()) } returns "testconnection"

// set up
val schemaName = ORU_R01_SCHEMA
val receiver = Receiver(
RECEIVER_NAME,
ORGANIZATION_NAME,
Topic.FULL_ELR,
CustomerStatus.ACTIVE,
schemaName,
translation = UnitTestUtils.createConfig(useTestProcessingMode = false, schemaName = schemaName),
enrichmentSchemaNames = listOf(
"classpath:/enrichments/testing.yml",
"classpath:/enrichments/testing2.yml"
)
)

val testOrg = DeepOrganization(
ORGANIZATION_NAME, "test", Organization.Jurisdiction.FEDERAL,
receivers = listOf(receiver)
)

val settings = FileSettings().loadOrganizations(testOrg)

val fhirData = File("src/test/resources/fhirengine/engine/valid_data_testing_sender.fhir").readText()
val bundle = FhirTranscoder.decode(fhirData)

val engine = makeFhirEngine(settings = settings)

// act
val byteArray = engine.getByteArrayFromBundle(receiver, bundle)
val messageIterator = Hl7InputStreamMessageIterator(byteArray.inputStream())
val message = messageIterator.next()
val terser = Terser(message)

// assert
assertThat(terser.get("SFT-1-1")).isEqualTo("Orange Software Vendor Name")
assertThat(terser.get("SFT-2")).isEqualTo("0.2-YELLOW")
// because while it will initially get set, it will then be overridden by the transform
assertThat(terser.get("SFT-3")).isEqualTo("PRIME ReportStream")
}

@Test
fun `test full elr translation hl7 translation exception`() {
mockkObject(BlobAccess)
Expand Down

0 comments on commit e94837d

Please sign in to comment.