Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for jsonldContext member in subscriptions #1230

Merged
merged 7 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ fun Throwable.toAPIException(specificMessage: String? = null): APIException =
when (this) {
is APIException -> this
is JsonLdError ->
if (this.code == JsonLdErrorCode.LOADING_REMOTE_CONTEXT_FAILED)
if (this.code == JsonLdErrorCode.LOADING_REMOTE_CONTEXT_FAILED ||
this.code == JsonLdErrorCode.LOADING_DOCUMENT_FAILED
)
LdContextNotAvailableException(specificMessage ?: "Unable to load remote context (cause was: $this)")
else BadRequestDataException("Unexpected error while parsing payload (cause was: $this)")
else -> BadRequestDataException(specificMessage ?: this.localizedMessage)
Expand Down
16 changes: 16 additions & 0 deletions shared/src/main/kotlin/com/egm/stellio/shared/util/JsonLdUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package com.egm.stellio.shared.util

import arrow.core.Either
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.apicatalog.jsonld.JsonLd
import com.apicatalog.jsonld.JsonLdError
import com.apicatalog.jsonld.JsonLdOptions
import com.apicatalog.jsonld.context.cache.LruCache
import com.apicatalog.jsonld.document.JsonDocument
import com.apicatalog.jsonld.http.DefaultHttpClient
import com.apicatalog.jsonld.loader.DocumentLoaderOptions
import com.apicatalog.jsonld.loader.HttpLoader
import com.egm.stellio.shared.model.*
import com.egm.stellio.shared.util.JsonUtils.deserializeAs
import com.egm.stellio.shared.util.JsonUtils.deserializeAsList
Expand Down Expand Up @@ -121,6 +125,7 @@ object JsonLdUtils {
contextCache = LruCache(CONTEXT_CACHE_CAPACITY)
documentCache = LruCache(DOCUMENT_CACHE_CAPACITY)
}
private val loader = HttpLoader(DefaultHttpClient.defaultInstance())

private fun buildContextDocument(contexts: List<String>): JsonStructure {
val contextsArray = Json.createArrayBuilder()
Expand Down Expand Up @@ -244,6 +249,17 @@ object JsonLdUtils {
}
}

fun checkJsonldContext(context: URI): Either<APIException, Unit> = either {
return try {
loader.loadDocument(context, DocumentLoaderOptions())
Unit.right()
} catch (e: JsonLdError) {
e.toAPIException(e.cause?.cause?.message).left()
} catch (e: IllegalArgumentException) {
BadRequestDataException(e.cause?.message ?: "Provided context is invalid: $context").left()
}
}

private fun transformGeoPropertyToWKT(): (Map.Entry<String, Any>) -> Any = {
if (NGSILD_GEO_PROPERTIES_TERMS.contains(it.key)) {
when (it.value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ data class Subscription(
val throttling: Int? = null,
val lang: String? = null,
@JsonInclude(value = JsonInclude.Include.NON_EMPTY)
val datasetId: List<String>? = null
val datasetId: List<String>? = null,
@JsonInclude(value = JsonInclude.Include.NON_EMPTY)
val jsonldContext: URI? = null
) {

@Transient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ class NotificationService(
AttributeRepresentation.SIMPLIFIED
else AttributeRepresentation.NORMALIZED

val contexts = it.jsonldContext?.let { listOf(it.toString()) } ?: it.contexts

val compactedEntity = compactEntity(
ExpandedEntity(filteredEntity),
it.contexts
contexts
).toFinalRepresentation(
NgsiLdDataRepresentation(
entityRepresentation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.egm.stellio.shared.util.*
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_TYPE_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_LOCATION_PROPERTY
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SUBSCRIPTION_TERM
import com.egm.stellio.shared.util.JsonLdUtils.checkJsonldContext
import com.egm.stellio.shared.util.JsonLdUtils.expandJsonLdTerm
import com.egm.stellio.subscription.config.SubscriptionProperties
import com.egm.stellio.subscription.model.*
Expand Down Expand Up @@ -56,6 +57,7 @@ class SubscriptionService(
checkExpiresAtInTheFuture(subscription).bind()
checkIdPatternIsValid(subscription).bind()
checkNotificationTriggersAreValid(subscription).bind()
checkJsonLdContextIsValid(subscription).bind()
}

private fun checkTypeIsSubscription(subscription: Subscription): Either<APIException, Unit> =
Expand Down Expand Up @@ -134,6 +136,14 @@ class SubscriptionService(
else BadRequestDataException("Unknown notification trigger in ${subscription.notificationTrigger}").left()
}

suspend fun checkJsonLdContextIsValid(subscription: Subscription): Either<APIException, Unit> = either {
val jsonldContext = subscription.jsonldContext

if (jsonldContext != null) {
checkJsonldContext(jsonldContext).bind()
}
}

@Transactional
suspend fun create(subscription: Subscription, sub: Option<Sub>): Either<APIException, Unit> = either {
validateNewSubscription(subscription).bind()
Expand All @@ -149,11 +159,11 @@ class SubscriptionService(
INSERT INTO subscription(id, type, subscription_name, created_at, description, watched_attributes,
notification_trigger, time_interval, q, scope_q, notif_attributes, notif_format, endpoint_uri,
endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, times_sent, is_active,
expires_at, sub, contexts, throttling, sys_attrs, lang, datasetId)
expires_at, sub, contexts, throttling, sys_attrs, lang, datasetId, jsonld_context)
VALUES(:id, :type, :subscription_name, :created_at, :description, :watched_attributes,
:notification_trigger, :time_interval, :q, :scope_q, :notif_attributes, :notif_format, :endpoint_uri,
:endpoint_accept, :endpoint_receiver_info, :endpoint_notifier_info, :times_sent, :is_active,
:expires_at, :sub, :contexts, :throttling, :sys_attrs, :lang, :datasetId)
:expires_at, :sub, :contexts, :throttling, :sys_attrs, :lang, :datasetId, :jsonld_context)
""".trimIndent()

databaseClient.sql(insertStatement)
Expand Down Expand Up @@ -182,6 +192,7 @@ class SubscriptionService(
.bind("sys_attrs", subscription.notification.sysAttrs)
.bind("lang", subscription.lang)
.bind("datasetId", subscription.datasetId?.toTypedArray())
.bind("jsonld_context", subscription.jsonldContext)
.execute().bind()

geoQuery?.let {
Expand Down Expand Up @@ -252,7 +263,8 @@ class SubscriptionService(
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling, sys_attrs, lang, datasetId
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling, sys_attrs, lang,
datasetId, jsonld_context
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = :id
LEFT JOIN geometry_query ON geometry_query.subscription_id = :id
Expand All @@ -269,25 +281,30 @@ class SubscriptionService(
suspend fun getContextsForSubscription(id: URI): Either<APIException, List<String>> {
val selectStatement =
"""
SELECT contexts
SELECT contexts, jsonld_context
FROM subscription
WHERE id = :id
""".trimIndent()

return databaseClient.sql(selectStatement)
.bind("id", id)
.oneToResult {
toList(it["contexts"]!!)
it["jsonld_context"]?.let { listOf(it as String) } ?: toList(it["contexts"]!!)
}
}

fun getContextsLink(subscription: Subscription): String =
if (subscription.contexts.size > 1) {
val linkToRetrieveContexts = subscriptionProperties.stellioUrl +
"/ngsi-ld/v1/subscriptions/${subscription.id}/context"
buildContextLinkHeader(linkToRetrieveContexts)
} else
buildContextLinkHeader(subscription.contexts[0])
fun getContextsLink(subscription: Subscription): String {
val contextLink = when {
subscription.jsonldContext != null -> subscription.jsonldContext.toString()
subscription.contexts.size > 1 -> {
val linkToRetrieveContexts = subscriptionProperties.stellioUrl +
"/ngsi-ld/v1/subscriptions/${subscription.id}/context"
linkToRetrieveContexts
}
else -> subscription.contexts[0]
}
return buildContextLinkHeader(contextLink)
}

suspend fun isCreatorOf(subscriptionId: URI, sub: Option<Sub>): Either<APIException, Boolean> {
val selectStatement =
Expand Down Expand Up @@ -359,7 +376,8 @@ class SubscriptionService(
"modifiedAt",
"throttling",
"lang",
"datasetId"
"datasetId",
"jsonldContext"
).contains(it.key) -> {
val columnName = it.key.toSqlColumnName()
val value = it.value.toSqlValue(it.key)
Expand Down Expand Up @@ -502,7 +520,8 @@ class SubscriptionService(
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling, sys_attrs, lang, datasetId
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling, sys_attrs, lang,
datasetId, jsonld_context
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
Expand Down Expand Up @@ -544,7 +563,8 @@ class SubscriptionService(
entity_selector.id as entity_id, entity_selector.id_pattern as id_pattern,
entity_selector.type_selection as type_selection, georel, geometry, coordinates, pgis_geometry,
geoproperty, scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, times_sent,
endpoint_receiver_info, endpoint_notifier_info, contexts, throttling, sys_attrs, lang, datasetId
endpoint_receiver_info, endpoint_notifier_info, contexts, throttling, sys_attrs, lang,
datasetId, jsonld_context
FROM subscription
LEFT JOIN entity_selector on subscription.id = entity_selector.subscription_id
LEFT JOIN geometry_query on subscription.id = geometry_query.subscription_id
Expand Down Expand Up @@ -699,7 +719,8 @@ class SubscriptionService(
contexts = toList(row["contexts"]!!),
throttling = toNullableInt(row["throttling"]),
lang = row["lang"] as? String,
datasetId = toNullableList(row["datasetId"])
datasetId = toNullableList(row["datasetId"]),
jsonldContext = toNullableUri(row["jsonld_context"])
)
}

Expand Down Expand Up @@ -732,7 +753,8 @@ class SubscriptionService(
contexts = toList(row["contexts"]!!),
throttling = toNullableInt(row["throttling"]),
lang = row["lang"] as? String,
datasetId = toNullableList(row["datasetId"])
datasetId = toNullableList(row["datasetId"]),
jsonldContext = toNullableUri(row["jsonld_context"])
)
}

Expand Down Expand Up @@ -768,7 +790,8 @@ class SubscriptionService(
scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info,
endpoint_notifier_info, status, times_sent, last_notification, last_failure, last_success, is_active,
entity_selector.id as entity_id, id_pattern, entity_selector.type_selection as type_selection, georel,
geometry, coordinates, pgis_geometry, geoproperty, contexts, throttling, sys_attrs, lang, datasetId
geometry, coordinates, pgis_geometry, geoproperty, contexts, throttling, sys_attrs, lang,
datasetId, jsonld_context
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE subscription
ADD jsonld_context text;
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class NotificationServiceTests {
}

@Test
fun `it should notify the subscriber and use the contexts of the subscription to compact`() = runTest {
fun `it should notify the subscriber and use subscription contexts to compact`() = runTest {
val subscription = gimmeRawSubscription().copy(
notification = NotificationParams(
attributes = emptyList(),
Expand Down Expand Up @@ -215,6 +215,43 @@ class NotificationServiceTests {
}
}

@Test
fun `it should notify the subscriber and use jsonldContext to compact when it is provided`() = runTest {
val subscription = gimmeRawSubscription().copy(
notification = NotificationParams(
attributes = emptyList(),
endpoint = Endpoint(
uri = "http://localhost:8089/notification".toUri(),
accept = Endpoint.AcceptType.JSONLD
)
),
contexts = listOf(NGSILD_TEST_CORE_CONTEXT),
jsonldContext = APIC_COMPOUND_CONTEXT.toUri()
)
val expandedEntity = expandJsonLdEntity(rawEntity)

coEvery {
subscriptionService.getMatchingSubscriptions(any(), any(), any())
} returns listOf(subscription).right()
coEvery { subscriptionService.updateSubscriptionNotification(any(), any(), any()) } returns 1

stubFor(
post(urlMatching("/notification"))
.willReturn(ok())
)

notificationService.notifyMatchingSubscribers(
expandedEntity,
setOf(NGSILD_NAME_TERM),
ATTRIBUTE_UPDATED
).shouldSucceedWith { notificationResults ->
val notificationResult = notificationResults[0]
assertTrue(notificationResult.second.data[0].containsKey(NGSILD_NAME_TERM))
assertTrue(notificationResult.second.data[0].containsKey(MANAGED_BY_COMPACT_RELATIONSHIP))
assertEquals(APIC_COMPOUND_CONTEXT, notificationResult.second.data[0][JsonLdUtils.JSONLD_CONTEXT])
}
}

@Test
fun `it should send a simplified payload when format is keyValues and include only the specified attributes`() =
runTest {
Expand Down Expand Up @@ -372,6 +409,37 @@ class NotificationServiceTests {
)
}

@Test
fun `it should add a Link header containing the jsonldContext of the subscription when provided`() = runTest {
val subscription = gimmeRawSubscription().copy(
notification = NotificationParams(
attributes = emptyList(),
endpoint = Endpoint(
uri = "http://localhost:8089/notification".toUri(),
accept = Endpoint.AcceptType.JSON
)
),
jsonldContext = APIC_COMPOUND_CONTEXT.toUri()
)

coEvery { subscriptionService.getContextsLink(any()) } returns buildContextLinkHeader(APIC_COMPOUND_CONTEXT)
coEvery { subscriptionService.updateSubscriptionNotification(any(), any(), any()) } returns 1

stubFor(
post(urlMatching("/notification"))
.willReturn(ok())
)

notificationService.callSubscriber(subscription, rawEntity.deserializeAsMap())

val link = buildContextLinkHeader(subscription.jsonldContext.toString())
verify(
1,
postRequestedFor(urlPathEqualTo("/notification"))
.withHeader(HttpHeaders.LINK, equalTo(link))
)
}

@Test
fun `it should add an NGSILD-Tenant header if the subscription is not from the default context`() = runTest {
val subscription = gimmeRawSubscription().copy(
Expand Down Expand Up @@ -517,7 +585,8 @@ class NotificationServiceTests {
)
),
lang = "fr",
contexts = APIC_COMPOUND_CONTEXTS
contexts = APIC_COMPOUND_CONTEXTS,
jsonldContext = APIC_COMPOUND_CONTEXT.toUri()
)

val expandedEntity = expandJsonLdEntity(
Expand Down
Loading
Loading