diff --git a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt index 5a01f375d3b..83e23240bae 100644 --- a/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt +++ b/prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt @@ -161,11 +161,10 @@ class FHIRFunctions( @FunctionName("elr-fhir-receiver-enrichment") @StorageAccount("AzureWebJobsStorage") fun receiverEnrichment( - @QueueTrigger(name = "receiver-enrichment", queueName = QueueMessage.elrReceiverEnrichmentQueueName) + @QueueTrigger(name = "message", queueName = QueueMessage.elrReceiverEnrichmentQueueName) message: String, @BindingName("DequeueCount") dequeueCount: Int = 1, ) { - // TODO Change to ReceiverEnrichment object. process( message, dequeueCount, diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt index 5be29b4626e..41aa62215ba 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt @@ -140,7 +140,7 @@ class FHIRDestinationFilter( metadata = this.metadata, topic = queueMessage.topic, destination = receiver, - nextAction = TaskAction.receiver_filter + nextAction = TaskAction.receiver_enrichment ) // create item lineage @@ -159,7 +159,7 @@ class FHIRDestinationFilter( ) val nextEvent = ProcessEvent( - Event.EventAction.RECEIVER_FILTER, + Event.EventAction.RECEIVER_ENRICHMENT, report.id, Options.None, emptyMap(), @@ -209,7 +209,7 @@ class FHIRDestinationFilter( nextEvent, report, blobInfo.blobUrl, - FhirReceiverFilterQueueMessage( + FhirReceiverEnrichmentQueueMessage( report.id, blobInfo.blobUrl, BlobUtils.digestToString(blobInfo.digest), diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt index a35c533a2b7..46c45e937e6 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt @@ -218,15 +218,6 @@ class FHIRTranslator( receiver: Receiver, bundle: Bundle, ): ByteArray { - if (receiver.enrichmentSchemaNames.isNotEmpty()) { - receiver.enrichmentSchemaNames.forEach { enrichmentSchemaName -> - logger.info("Applying enrichment schema $enrichmentSchemaName") - val transformer = FhirTransformer( - enrichmentSchemaName, - ) - transformer.process(bundle) - } - } when (receiver.format) { MimeFormat.FHIR -> { if (receiver.schemaName.isNotEmpty()) { diff --git a/prime-router/src/main/kotlin/fhirengine/engine/PrimeRouterQueueMessage.kt b/prime-router/src/main/kotlin/fhirengine/engine/PrimeRouterQueueMessage.kt index b6c8654c924..e916d35cdbf 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/PrimeRouterQueueMessage.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/PrimeRouterQueueMessage.kt @@ -148,6 +148,7 @@ fun registerPrimeRouterQueueMessageSubtypes() { QueueMessage.ObjectMapperProvider.registerSubtypes( FhirConvertQueueMessage::class.java, FhirDestinationFilterQueueMessage::class.java, + FhirReceiverEnrichmentQueueMessage::class.java, FhirReceiverFilterQueueMessage::class.java, FhirTranslateQueueMessage::class.java, BatchEventQueueMessage::class.java, diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt index 2c8f314fbe6..a0d37def0f0 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt @@ -42,7 +42,7 @@ import gov.cdc.prime.router.common.validFHIRRecord1 import gov.cdc.prime.router.db.ReportStreamTestDatabaseContainer import gov.cdc.prime.router.db.ReportStreamTestDatabaseSetupExtension import gov.cdc.prime.router.fhirengine.engine.FHIRDestinationFilter -import gov.cdc.prime.router.fhirengine.engine.FhirReceiverFilterQueueMessage +import gov.cdc.prime.router.fhirengine.engine.FhirReceiverEnrichmentQueueMessage import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder import gov.cdc.prime.router.history.db.ReportGraph import gov.cdc.prime.router.metadata.LookupTable @@ -204,7 +204,7 @@ class FHIRDestinationFilterIntegrationTests : Logging { ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> val routedReports = fetchChildReports(report, txn, 2, 2) with(routedReports.first()) { - assertThat(this.nextAction).isEqualTo(TaskAction.receiver_filter) + assertThat(this.nextAction).isEqualTo(TaskAction.receiver_enrichment) assertThat(this.receivingOrg).isEqualTo("phd") assertThat(this.receivingOrgSvc).isEqualTo("x") assertThat(this.schemaName).isEqualTo("None") @@ -212,7 +212,7 @@ class FHIRDestinationFilterIntegrationTests : Logging { assertThat(this.bodyFormat).isEqualTo("FHIR") } with(routedReports.last()) { - assertThat(this.nextAction).isEqualTo(TaskAction.receiver_filter) + assertThat(this.nextAction).isEqualTo(TaskAction.receiver_enrichment) assertThat(this.receivingOrg).isEqualTo("phd") assertThat(this.receivingOrgSvc).isEqualTo("y") assertThat(this.schemaName).isEqualTo("None") @@ -233,7 +233,7 @@ class FHIRDestinationFilterIntegrationTests : Logging { // check queue message val expectedRouteQueueMessages = routedReports.flatMap { report -> listOf( - FhirReceiverFilterQueueMessage( + FhirReceiverEnrichmentQueueMessage( report.reportId, report.bodyUrl, BlobUtils.digestToString(report.blobDigest), @@ -241,7 +241,7 @@ class FHIRDestinationFilterIntegrationTests : Logging { UniversalPipelineTestUtils.fhirSenderWithNoTransform.topic, "phd.x" ), - FhirReceiverFilterQueueMessage( + FhirReceiverEnrichmentQueueMessage( report.reportId, report.bodyUrl, BlobUtils.digestToString(report.blobDigest), @@ -256,7 +256,7 @@ class FHIRDestinationFilterIntegrationTests : Logging { verify(exactly = 2) { QueueAccess.sendMessage( - QueueMessage.elrReceiverFilterQueueName, + QueueMessage.elrReceiverEnrichmentQueueName, match { expectedRouteQueueMessages.contains(it) } @@ -309,7 +309,7 @@ class FHIRDestinationFilterIntegrationTests : Logging { // check results ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> val routedReport = fetchChildReports(report, txn, 1).single() - assertThat(routedReport.nextAction).isEqualTo(TaskAction.receiver_filter) + assertThat(routedReport.nextAction).isEqualTo(TaskAction.receiver_enrichment) assertThat(routedReport.receivingOrg).isEqualTo("phd") assertThat(routedReport.receivingOrgSvc).isEqualTo("x") assertThat(routedReport.schemaName).isEqualTo("None") @@ -324,7 +324,7 @@ class FHIRDestinationFilterIntegrationTests : Logging { assertThat(reportContents).isEqualTo(routedBundle) // check queue message - val expectedQueueMessage = FhirReceiverFilterQueueMessage( + val expectedQueueMessage = FhirReceiverEnrichmentQueueMessage( routedReport.reportId, routedReport.bodyUrl, BlobUtils.digestToString(routedReport.blobDigest), @@ -336,7 +336,7 @@ class FHIRDestinationFilterIntegrationTests : Logging { // filter should permit message and should not mangle message verify(exactly = 1) { QueueAccess.sendMessage( - QueueMessage.elrReceiverFilterQueueName, + QueueMessage.elrReceiverEnrichmentQueueName, expectedQueueMessage.serialize() ) } diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt index 99eada67991..1605fc676d4 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt @@ -258,10 +258,7 @@ class FHIRTranslatorIntegrationTests : Logging { routingFilter = listOf("true"), conditionFilter = listOf("true"), format = MimeFormat.HL7, - enrichmentSchemaNames = listOf( - "classpath:/enrichments/testing.yml", - "classpath:/enrichments/testing2.yml" - ) + enrichmentSchemaNames = emptyList() ) ) val receivers = UniversalPipelineTestUtils.createReceivers(receiverSetupData) @@ -277,7 +274,7 @@ class FHIRTranslatorIntegrationTests : Logging { @Suppress("ktlint:standard:max-line-length") val expectedOutput = "MSH|^~\\&|||||||ORU/ACK - Unsolicited transmission of an observation message|849547|P|2.5.1|||||USA\r" + - "SFT|Orange Software Vendor Name|0.2-YELLOW|Purple PRIME ReportStream|0.1-SNAPSHOT||20210622\r" + + "SFT|Centers for Disease Control and Prevention|0.1-SNAPSHOT|PRIME Data Hub|0.1-SNAPSHOT||20210622\r" + "PID|1||||Steuber||20150707|O||^^^^^^^^Native Hawaiian or Other Pacific Islander|^^^IG^^s4fgh||~|||||||||^^^^^^^^Non Hispanic or Latino|||||||20210614\r" + "ORC|||||||||||||||||||||Any facility USA|^^^IG||^^^IG\r" + "OBR|1|||^^^^^^^^SARS-CoV+SARS-CoV-2 (COVID-19) Ag [Presence] in Respiratory specimen by Rapid immunoassay\r" + diff --git a/prime-router/src/test/kotlin/fhirengine/engine/FhirTranslatorTests.kt b/prime-router/src/test/kotlin/fhirengine/engine/FhirTranslatorTests.kt index 3f0ad2531aa..f77669d3f29 100644 --- a/prime-router/src/test/kotlin/fhirengine/engine/FhirTranslatorTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/engine/FhirTranslatorTests.kt @@ -5,7 +5,6 @@ import assertk.assertThat import assertk.assertions.isEqualTo import assertk.assertions.isNotEmpty import assertk.assertions.isTrue -import ca.uhn.hl7v2.util.Hl7InputStreamMessageIterator import ca.uhn.hl7v2.util.Terser import gov.cdc.prime.router.ActionLogDetail import gov.cdc.prime.router.ActionLogger @@ -513,55 +512,6 @@ class FhirTranslatorTests { assertThat(terser.get(MSH_11_1)).isEqualTo("T") } - /** - * When the receiver is in production mode and sender is in testing mode, output HL7 should be 'T' - */ - @Test - fun `test receiver enrichment`() { - mockkClass(BlobAccess::class) - mockkObject(BlobAccess.Companion) - every { BlobAccess.Companion.getBlobConnection(any()) } returns "testconnection" - - // set up - val schemaName = ORU_R01_SCHEMA - val receiver = Receiver( - RECEIVER_NAME, - ORGANIZATION_NAME, - Topic.FULL_ELR, - CustomerStatus.ACTIVE, - schemaName, - translation = UnitTestUtils.createConfig(useTestProcessingMode = false, schemaName = schemaName), - enrichmentSchemaNames = listOf( - "classpath:/enrichments/testing.yml", - "classpath:/enrichments/testing2.yml" - ) - ) - - val testOrg = DeepOrganization( - ORGANIZATION_NAME, "test", Organization.Jurisdiction.FEDERAL, - receivers = listOf(receiver) - ) - - val settings = FileSettings().loadOrganizations(testOrg) - - val fhirData = File("src/test/resources/fhirengine/engine/valid_data_testing_sender.fhir").readText() - val bundle = FhirTranscoder.decode(fhirData) - - val engine = makeFhirEngine(settings = settings) - - // act - val byteArray = engine.getByteArrayFromBundle(receiver, bundle) - val messageIterator = Hl7InputStreamMessageIterator(byteArray.inputStream()) - val message = messageIterator.next() - val terser = Terser(message) - - // assert - assertThat(terser.get("SFT-1-1")).isEqualTo("Orange Software Vendor Name") - assertThat(terser.get("SFT-2")).isEqualTo("0.2-YELLOW") - // because while it will initially get set, it will then be overridden by the transform - assertThat(terser.get("SFT-3")).isEqualTo("PRIME ReportStream") - } - @Test fun `test full elr translation hl7 translation exception`() { mockkObject(BlobAccess)