Skip to content

Commit

Permalink
feat: separate context source utils + work with sysAttrs
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasBousselin committed Oct 23, 2024
1 parent 3bcf7b3 commit 690fd59
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 96 deletions.
1 change: 1 addition & 0 deletions search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@
<ID>LongParameterList:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$( entityId: URI, attributeName: ExpandedTerm, datasetId: URI?, attributePayload: ExpandedAttributeInstance, ngsiLdAttributeInstance: NgsiLdAttributeInstance, defaultCreatedAt: ZonedDateTime )</ID>
<ID>NestedBlockDepth:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$override fun migrate(context: Context)</ID>
<ID>SwallowedException:TemporalQueryUtils.kt$e: IllegalArgumentException</ID>
<ID>TooGenericExceptionCaught:ContextSourceCaller.kt$ContextSourceCaller$e: Exception</ID>
</CurrentIssues>
</SmellBaseline>
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.egm.stellio.search.csr.service

import arrow.core.*
import arrow.core.raise.either
import com.egm.stellio.search.csr.model.ContextSourceRegistration
import com.egm.stellio.shared.model.CompactedEntity
import com.egm.stellio.shared.util.*
import org.springframework.core.codec.DecodingException
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.util.MultiValueMap
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBodyOrNull
import org.springframework.web.reactive.function.client.awaitExchange
import java.net.URI

object ContextSourceCaller {

suspend fun getDistributedInformation(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
path: String,
params: MultiValueMap<String, String>
): Either<NGSILDWarning, CompactedEntity?> = either {
val uri = URI("${csr.endpoint}$path")

val request = WebClient.create()
.method(HttpMethod.GET)
.uri { uriBuilder ->
uriBuilder.scheme(uri.scheme)
.host(uri.host)
.port(uri.port)
.path(uri.path)
.queryParams(params)
.build()
}
.header(HttpHeaders.LINK, httpHeaders.getFirst(HttpHeaders.LINK))
return try {
val (statusCode, response) = request
.awaitExchange { response ->
response.statusCode() to response.awaitBodyOrNull<CompactedEntity>()
}
when {
statusCode.is2xxSuccessful -> {
JsonLdUtils.logger.info("Successfully received response from CSR at $uri")
response.right()
}

statusCode.isSameCodeAs(HttpStatus.NOT_FOUND) -> {
JsonLdUtils.logger.info("CSR returned 404 at $uri: $response")
null.right()
}

else -> {
JsonLdUtils.logger.warn("Error contacting CSR at $uri: $response")

MiscellaneousPersistentWarning(
"the CSR ${csr.id} returned an error $statusCode at : $uri response: \"$response\""
).left()
}
}
} catch (e: Exception) {
when (e) {
is DecodingException -> RevalidationFailedWarning(
"the CSR ${csr.id} as : $uri returned badly formed data message: \"${e.cause}:${e.message}\""
)

else -> MiscellaneousWarning(
"Error connecting to csr ${csr.id} at : $uri message : \"${e.cause}:${e.message}\""
)
}.left()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,11 @@ import com.egm.stellio.shared.util.*
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_CONTEXT
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_ID_TERM
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_TYPE_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_CREATED_AT_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_DATASET_ID_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_MODIFIED_AT_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_OBSERVED_AT_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SCOPE_TERM
import com.egm.stellio.shared.util.JsonLdUtils.logger
import com.fasterxml.jackson.core.JacksonException
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.util.MultiValueMap
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.WebClientException
import org.springframework.web.reactive.function.client.awaitBody
import org.springframework.web.reactive.function.client.awaitExchange
import java.net.URI
import java.time.ZonedDateTime
import kotlin.random.Random.Default.nextBoolean

Expand All @@ -34,58 +24,6 @@ typealias DataSetId = String?
typealias AttributeByDataSetId = Map<DataSetId, CompactedAttributeInstance>
object ContextSourceUtils {

suspend fun getDistributedInformation(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
path: String,
params: MultiValueMap<String, String>
): Either<NGSILDWarning, CompactedEntity?> = either {
val uri = URI("${csr.endpoint}$path")

val request = WebClient.create()
.method(HttpMethod.GET)
.uri { uriBuilder ->
uriBuilder.scheme(uri.scheme)
.host(uri.host)
.port(uri.port)
.path(uri.path)
.queryParams(params)
.build()
}
.header(HttpHeaders.LINK, httpHeaders.getFirst(HttpHeaders.LINK))
return try {
val (statusCode, response) = request
.awaitExchange { response ->
response.statusCode() to response.awaitBody<CompactedEntity>()
}
when {
statusCode.is2xxSuccessful -> {
logger.info("Successfully received response from CSR at $uri")
response.right()
}
statusCode.isSameCodeAs(HttpStatus.NOT_FOUND) -> {
logger.info("CSR returned 404 at $uri: $response")
null.right()
}
else -> {
logger.warn("Error contacting CSR at $uri: $response")

MiscellaneousPersistentWarning(
"the CSR ${csr.id} returned an error $statusCode at : $uri response: \"$response\""
).left()
}
}
} catch (e: WebClientException) {
MiscellaneousWarning(
"Error connecting to csr ${csr.id} at : $uri message : \"${e.message}\""
).left()
} catch (e: JacksonException) { // todo get the good exception for invalid payload
RevalidationFailedWarning(
"the CSR ${csr.id} as : $uri returned badly formed data message: \"${e.message}\""
).left()
}
}

fun mergeEntities(
localEntity: CompactedEntity?,
remoteEntitiesWithMode: List<CompactedEntityWithMode>
Expand All @@ -111,12 +49,17 @@ object ContextSourceUtils {
): Either<NGSILDWarning, CompactedEntity> = either {
remoteEntity.mapValues { (key, value) ->
val localValue = localEntity[key]
when { // todo sysAttrs
when {
localValue == null -> value
key == JSONLD_ID_TERM || key == JSONLD_CONTEXT -> localValue
key == JSONLD_TYPE_TERM || key == NGSILD_SCOPE_TERM ->
mergeTypeOrScope(localValue, value)

key == NGSILD_CREATED_AT_TERM ->
if ((value as String?).isBefore(localValue as String?)) value
else localValue
key == NGSILD_MODIFIED_AT_TERM ->
if ((localValue as String?).isBefore(value as String?)) value
else localValue
else -> mergeAttribute(
localValue,
value,
Expand Down Expand Up @@ -188,9 +131,10 @@ object ContextSourceUtils {
private fun CompactedAttributeInstance.isBefore(
attr: CompactedAttributeInstance,
property: String
): Boolean = (
(this[property] as? String)?.isDateTime() == true &&
(attr[property] as? String)?.isDateTime() == true &&
ZonedDateTime.parse(this[property] as String) < ZonedDateTime.parse(attr[property] as String)
)
): Boolean = (this[property] as String?)?.isBefore(attr[property] as String?) == true

private fun String?.isBefore(date: String?) =
this?.isDateTime() == true &&
date?.isDateTime() == true &&
ZonedDateTime.parse(this) < ZonedDateTime.parse(date)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import arrow.core.right
import arrow.fx.coroutines.parMap
import com.egm.stellio.search.csr.model.CSRFilters
import com.egm.stellio.search.csr.model.Operation
import com.egm.stellio.search.csr.service.ContextSourceCaller
import com.egm.stellio.search.csr.service.ContextSourceRegistrationService
import com.egm.stellio.search.csr.service.ContextSourceUtils
import com.egm.stellio.search.entity.service.EntityQueryService
Expand Down Expand Up @@ -218,7 +219,7 @@ class EntityHandler(

// we can add parMap(concurrency = X) if this trigger too much http connexion at the same time
val (remoteEntitiesWithMode, warnings) = matchingCSR.parMap { csr ->
val response = ContextSourceUtils.getDistributedInformation(
val response = ContextSourceCaller.getDistributedInformation(
httpHeaders,
csr,
"/ngsi-ld/v1/entities/$entityId",
Expand All @@ -232,6 +233,7 @@ class EntityHandler(
warnings.mapNotNull { (warning, _) -> warning.leftOrNull() }.toMutableList()
}

// we could simplify the code if we check the JsonPayload beforehand
val (mergeWarnings, mergedEntity) = ContextSourceUtils.mergeEntities(
localEntity.getOrNull(),
remoteEntitiesWithMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.egm.stellio.shared.util.JsonUtils.serializeObject
import com.github.tomakehurst.wiremock.client.WireMock.*
import com.github.tomakehurst.wiremock.junit5.WireMockTest
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.springframework.http.HttpHeaders
import org.springframework.http.MediaType.APPLICATION_JSON_VALUE
Expand All @@ -20,15 +21,15 @@ class ContextSourceCallerTests {

private val apiaryId = "urn:ngsi-ld:Apiary:TEST"

fun gimmeRawCSR() = ContextSourceRegistration(
private fun gimmeRawCSR() = ContextSourceRegistration(
id = "urn:ngsi-ld:ContextSourceRegistration:test".toUri(),
endpoint = "http://localhost:8089".toUri(),
information = emptyList(),
operations = listOf(Operation.FEDERATION_OPS),
createdAt = ngsiLdDateTime(),

)
val emptyParams = LinkedMultiValueMap<String, String>()
private val emptyParams = LinkedMultiValueMap<String, String>()
private val entityWithSysAttrs =
"""
{
Expand All @@ -46,6 +47,19 @@ class ContextSourceCallerTests {
}
""".trimIndent()

private val entityWithBadPayload =
"""
{
"id":"$apiaryId",
"type":"Apiary",
"name": {
"type":"Property",
"value":"ApiarySophia",
,
"@context":[ "$APIC_COMPOUND_CONTEXT" ]
}
""".trimIndent()

@Test
fun `getDistributedInformation should return the entity when the request succeed`() = runTest {
val csr = gimmeRawCSR()
Expand All @@ -58,23 +72,66 @@ class ContextSourceCallerTests {
)
)

val response = ContextSourceUtils.getDistributedInformation(HttpHeaders.EMPTY, csr, path, emptyParams)
val response = ContextSourceCaller.getDistributedInformation(HttpHeaders.EMPTY, csr, path, emptyParams)
assertJsonPayloadsAreEqual(entityWithSysAttrs, serializeObject(response.getOrNull()!!))
}

@Test
fun `getDistributedInformation should fail with a `() = runTest {
fun `getDistributedInformation should return a MiscellaneousWarning if it receive no answer`() = runTest {
val csr = gimmeRawCSR()
val path = "/ngsi-ld/v1/entities/$apiaryId"

val response = ContextSourceCaller.getDistributedInformation(HttpHeaders.EMPTY, csr, path, emptyParams)

assertTrue(response.isLeft())
assertInstanceOf(MiscellaneousWarning::class.java, response.leftOrNull())
}

@Test
fun `getDistributedInformation should return a RevalidationFailedWarning when receiving a bad payload`() = runTest {
val csr = gimmeRawCSR()
val path = "/ngsi-ld/v1/entities/$apiaryId"
stubFor(
get(urlMatching(path))
.willReturn(
ok()
.withHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE).withBody(entityWithSysAttrs)
.withHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE).withBody(entityWithBadPayload)
)
)

val response = ContextSourceUtils.getDistributedInformation(HttpHeaders.EMPTY, csr, path, emptyParams)
assertJsonPayloadsAreEqual(entityWithSysAttrs, serializeObject(response.getOrNull()!!))
val response = ContextSourceCaller.getDistributedInformation(HttpHeaders.EMPTY, csr, path, emptyParams)

assertTrue(response.isLeft())
assertInstanceOf(RevalidationFailedWarning::class.java, response.leftOrNull())
}

@Test
fun `getDistributedInformation should return MiscellaneousPersistentWarning when receiving error 500`() = runTest {
val csr = gimmeRawCSR()
val path = "/ngsi-ld/v1/entities/$apiaryId"
stubFor(
get(urlMatching(path))
.willReturn(unauthorized())
)

val response = ContextSourceCaller.getDistributedInformation(HttpHeaders.EMPTY, csr, path, emptyParams)

assertTrue(response.isLeft())
assertInstanceOf(MiscellaneousPersistentWarning::class.java, response.leftOrNull())
}

@Test
fun `getDistributedInformation should return null when receiving an error 404`() = runTest {
val csr = gimmeRawCSR()
val path = "/ngsi-ld/v1/entities/$apiaryId"
stubFor(
get(urlMatching(path))
.willReturn(notFound())
)

val response = ContextSourceCaller.getDistributedInformation(HttpHeaders.EMPTY, csr, path, emptyParams)

assertTrue(response.isRight())
assertNull(response.leftOrNull())
}
}
Loading

0 comments on commit 690fd59

Please sign in to comment.