Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/csr retrieve entity #1246

Draft
wants to merge 19 commits into
base: feature/new-csr
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
41e4af9
wip: not tested merge entities function
thomasBousselin Oct 3, 2024
c7bff1e
wip: not tested merge entities function
thomasBousselin Oct 7, 2024
cc6de35
feat: retrieve Entity Working
thomasBousselin Oct 8, 2024
e89f92f
feat: first error handling and fix pr comments
thomasBousselin Oct 10, 2024
3de35ba
feat: test for mergeEntity
thomasBousselin Oct 11, 2024
53f0b87
fix: working Where statement for CSR / add some tests on Query CSR / …
bobeal Oct 13, 2024
c8bf170
feat: fix merging returning lists and PR comments
thomasBousselin Oct 14, 2024
e8ccf0c
feat: fix merging returning lists and PR comments
thomasBousselin Oct 15, 2024
8387130
feat: updating
thomasBousselin Oct 16, 2024
dd31576
feat: MR comment - clearer name - no warning impact in ApiException -…
thomasBousselin Oct 21, 2024
3bcf7b3
feat: get warnings from merge
thomasBousselin Oct 22, 2024
690fd59
feat: separate context source utils + work with sysAttrs
thomasBousselin Oct 23, 2024
f38c04b
feat: warning following rfc7234 + docker-compose for csr + fix tests
thomasBousselin Oct 24, 2024
129228a
feat: doc csr launch
thomasBousselin Oct 25, 2024
b07e032
feat: doc csr launch
thomasBousselin Oct 25, 2024
6d4831a
chore: slight refactoring of the docker compose context source instance
bobeal Oct 26, 2024
8740239
chore: misc typo, wording and naming
bobeal Oct 27, 2024
f74ceec
feat: always call context source for normalized representation + PR c…
thomasBousselin Oct 30, 2024
deca20c
feat: registrationName in CSR + Warning fixes
thomasBousselin Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions context-source-docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
services:
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
context-source-postgres:
image: stellio/stellio-timescale-postgis:16-2.16.0-3.3
container_name: context-source-stellio-postgres
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASS=${POSTGRES_PASS}
- POSTGRES_DBNAME=${POSTGRES_DBNAME}
- POSTGRES_MULTIPLE_EXTENSIONS=postgis,timescaledb,pgcrypto
- ACCEPT_TIMESCALE_TUNING=TRUE
ports:
- "65432:5432"
volumes:
- context-source-stellio-postgres-storage:/var/lib/postgresql
healthcheck:
test: ["CMD-SHELL", "pg_isready -h localhost -U stellio"]
interval: 10s
timeout: 5s
retries: 20
start_period: 10s
context-source-api-gateway:
container_name: context-source-stellio-api-gateway
image: stellio/stellio-api-gateway:${STELLIO_DOCKER_TAG}
environment:
- SPRING_PROFILES_ACTIVE=${ENVIRONMENT}
ports:
- "18080:8080"
context-source-search-service:
container_name: context-source-stellio-search-service
image: stellio/stellio-search-service:${STELLIO_DOCKER_TAG}
environment:
- SPRING_PROFILES_ACTIVE=${ENVIRONMENT}
- SPRING_R2DBC_URL=r2dbc:pool:postgresql://context-source-postgres:5432/${STELLIO_SEARCH_DB_DATABASE}
- SPRING_FLYWAY_URL=jdbc:postgresql://context-source-postgres:5432/${STELLIO_SEARCH_DB_DATABASE}
- SPRING_R2DBC_USERNAME=${POSTGRES_USER}
- SPRING_R2DBC_PASSWORD=${POSTGRES_PASS}
- APPLICATION_AUTHENTICATION_ENABLED=${STELLIO_AUTHENTICATION_ENABLED}
- APPLICATION_TENANTS_0_ISSUER=${APPLICATION_TENANTS_0_ISSUER}
- APPLICATION_TENANTS_0_NAME=${APPLICATION_TENANTS_0_NAME}
- APPLICATION_TENANTS_0_DBSCHEMA=${APPLICATION_TENANTS_0_DBSCHEMA}
- APPLICATION_PAGINATION_LIMIT-DEFAULT=${APPLICATION_PAGINATION_LIMIT_DEFAULT}
- APPLICATION_PAGINATION_LIMIT-MAX=${APPLICATION_PAGINATION_LIMIT_MAX}
- APPLICATION_PAGINATION_TEMPORAL-LIMIT=${APPLICATION_PAGINATION_TEMPORAL_LIMIT}

ports:
- "18083:8083"
depends_on:
context-source-postgres:
condition: service_healthy



volumes:
context-source-stellio-postgres-storage:
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ class ContextSourceRegistrationHandler(
applicationProperties.pagination.limitMax
).bind()
val contextSourceRegistrations = contextSourceRegistrationService.getContextSourceRegistrations(
paginationQuery.limit,
paginationQuery.offset,
sub
limit = paginationQuery.limit,
offset = paginationQuery.offset,
sub = sub,
).serialize(contexts, mediaType, includeSysAttrs)
val contextSourceRegistrationsCount = contextSourceRegistrationService.getContextSourceRegistrationsCount(
sub
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.egm.stellio.search.csr.model

import java.net.URI

data class CSRFilters( // todo use a combination of EntitiesQuery TemporalQuery (when we implement all operations)
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
val ids: Set<URI> = emptySet(),

) {
fun buildWHEREStatement(): String {
val idsMatcher = if (ids.isNotEmpty()) """"(
entity_info.id is null
OR entity_info.id in ('${ids.joinToString("', '")}')
) AND (
entity_info.\"idPattern\" is null OR
${ids.joinToString(" OR ") { "'$it' ~ entity_info.\"idPattern\"" }}
)
""".trimIndent()
else "true"
return idsMatcher
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.egm.stellio.search.csr.service
import arrow.core.*
import arrow.core.raise.either
import com.egm.stellio.search.common.util.*
import com.egm.stellio.search.csr.model.CSRFilters
import com.egm.stellio.search.csr.model.ContextSourceRegistration
import com.egm.stellio.search.csr.model.ContextSourceRegistration.RegistrationInfo
import com.egm.stellio.search.csr.model.ContextSourceRegistration.TimeInterval
Expand Down Expand Up @@ -154,13 +155,16 @@ class ContextSourceRegistrationService(
}

suspend fun getContextSourceRegistrations(
limit: Int,
offset: Int,
sub: Option<Sub>
sub: Option<Sub>,
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
filters: CSRFilters = CSRFilters(),
limit: Int = Int.MAX_VALUE,
offset: Int = 0,
): List<ContextSourceRegistration> {
val filterQuery = filters.buildWHEREStatement()

val selectStatement =
"""
SELECT id,
SELECT csr.id,
endpoint,
mode,
information,
Expand All @@ -171,11 +175,16 @@ class ContextSourceRegistrationService(
management_interval_end,
created_at,
modified_at
FROM context_source_registration
WHERE sub = :sub
ORDER BY id
FROM context_source_registration as csr
LEFT JOIN jsonb_to_recordset(information)
as information(entities jsonb,propertyNames text[],relationshipNames text[] ) on true
LEFT JOIN jsonb_to_recordset(entities)
as entity_info(id text,"idPattern" text,"type" text) on true
WHERE sub = :sub AND $filterQuery
GROUP BY csr.id
ORDER BY csr.id
LIMIT :limit
OFFSET :offset)
OFFSET :offset
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
""".trimIndent()
return databaseClient.sql(selectStatement)
.bind("limit", limit)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package com.egm.stellio.search.csr.service

import arrow.core.Either
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.search.csr.model.ContextSourceRegistration
import com.egm.stellio.search.csr.model.Mode
import com.egm.stellio.shared.model.*
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_CONTEXT
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_ID_TERM
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_TYPE_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_DATASET_ID_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_MODIFIED_AT_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_OBSERVED_AT_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SCOPE_TERM
import com.egm.stellio.shared.util.JsonLdUtils.logger
import com.egm.stellio.shared.util.isDateTime
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBody
import org.springframework.web.reactive.function.client.awaitExchange
import java.time.ZonedDateTime
import java.util.stream.Collectors.toSet
import kotlin.random.Random.Default.nextBoolean

typealias SingleAttribute = Map<String, Any> // todo maybe use the actual attribute type
typealias CompactedAttribute = List<SingleAttribute>
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
typealias CompactedEntityWithMode = Pair<CompactedEntity, Mode>

object ContextSourceUtils {

suspend fun call(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
method: HttpMethod,
path: String,
body: String? = null
): Either<APIException, CompactedEntity> = either {
val uri = "${csr.endpoint}$path"
val request = WebClient.create(uri)
.method(method)
.headers { newHeader -> "Link" to httpHeaders["Link"] }
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
body?.let { request.bodyValue(it) }
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
val (statusCode, response) = request
.awaitExchange { response -> response.statusCode() to response.awaitBody<CompactedEntity>() }
return if (statusCode.is2xxSuccessful) {
logger.info("Successfully received Informations from CSR at : $uri")
response.right()
} else {
logger.info("Error contacting CSR at : $uri")
logger.info("Error contacting CSR at : $response")
ContextSourceRequestException(
response.toString(),
HttpStatus.valueOf(statusCode.value())
).left()
}
}

fun mergeEntity(
localEntity: CompactedEntity?,
entitiesWithMode: List<CompactedEntityWithMode>
): CompactedEntity? {
if (localEntity == null && entitiesWithMode.isEmpty()) return null

val mergedEntity = localEntity?.toMutableMap() ?: mutableMapOf()

entitiesWithMode.sortedBy { (_, mode) -> mode == Mode.AUXILIARY }
.forEach { (entity, mode) ->
entity.entries.forEach {
(key, value) ->
when {
!mergedEntity.containsKey(key) -> mergedEntity[key] = value
key == JSONLD_ID_TERM || key == JSONLD_CONTEXT -> {}
key == JSONLD_TYPE_TERM || key == NGSILD_SCOPE_TERM ->
mergedEntity[key] = mergeTypeOrScope(mergedEntity[key]!!, value)
else -> mergedEntity[key] = mergeAttribute(
mergedEntity[key]!!,
value,
mode == Mode.AUXILIARY
)
}
}
}
return mergedEntity
}

fun mergeTypeOrScope(
type1: Any, // String || List<String> || Set<String>
type2: Any
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
) = when {
type1 is List<*> && type2 is List<*> -> type1.toSet() + type2.toSet()
type1 is List<*> -> type1.toSet() + type2
type2 is List<*> -> type2.toSet() + type1
type1 == type2 -> setOf(type1)
else -> setOf(type1, type2)
}.toList()

/**
* Implements 4.5.5 - Multi-Attribute Support
*/
fun mergeAttribute(
attribute1: Any,
attribute2: Any,
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
isAuxiliary: Boolean = false
): CompactedAttribute {
val mergeMap = attributeToDatasetIdMap(attribute1).toMutableMap()
val attribute2Map = attributeToDatasetIdMap(attribute2)
attribute2Map.entries.forEach { (datasetId, value) ->
when {
mergeMap[datasetId] == null -> mergeMap[datasetId] = value
isAuxiliary -> {}
mergeMap[datasetId]!!.isBefore(value, NGSILD_OBSERVED_AT_TERM) -> mergeMap[datasetId] = value
value.isBefore(mergeMap[datasetId]!!, NGSILD_OBSERVED_AT_TERM) -> {}
mergeMap[datasetId]!!.isBefore(value, NGSILD_MODIFIED_AT_TERM) -> mergeMap[datasetId] = value
value.isBefore(mergeMap[datasetId]!!, NGSILD_MODIFIED_AT_TERM) -> {}
nextBoolean() -> mergeMap[datasetId] = value
else -> {}
}
}
return mergeMap.values.toList()
}

private fun attributeToDatasetIdMap(attribute: Any): Map<String?, Map<String, Any>> = when (attribute) {
is Map<*, *> -> {
attribute as SingleAttribute
mapOf(attribute[NGSILD_DATASET_ID_TERM] as? String to attribute)
}
is List<*> -> {
attribute as CompactedAttribute
attribute.associate {
it[NGSILD_DATASET_ID_TERM] as? String to it
}
}
else -> throw InternalErrorException(
"the attribute is nor a list nor a map, check that you have excluded the CORE Members"
)
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
}

private fun SingleAttribute.isBefore(
attr: SingleAttribute,
property: String
): Boolean = (
(this[property] as? String)?.isDateTime() == true &&
(attr[property] as? String)?.isDateTime() == true &&
ZonedDateTime.parse(this[property] as String) < ZonedDateTime.parse(attr[property] as String)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import arrow.core.getOrElse
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.search.csr.model.CSRFilters
import com.egm.stellio.search.csr.service.ContextSourceRegistrationService
import com.egm.stellio.search.csr.service.ContextSourceUtils
import com.egm.stellio.search.entity.service.EntityQueryService
import com.egm.stellio.search.entity.service.EntityService
import com.egm.stellio.search.entity.util.composeEntitiesQuery
Expand All @@ -20,6 +23,7 @@ import com.egm.stellio.shared.util.JsonLdUtils.expandJsonLdTerm
import com.egm.stellio.shared.util.JsonUtils.serializeObject
import com.egm.stellio.shared.web.BaseHandler
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType.APPLICATION_JSON_VALUE
import org.springframework.http.ResponseEntity
Expand All @@ -34,7 +38,8 @@ import java.util.Optional
class EntityHandler(
private val applicationProperties: ApplicationProperties,
private val entityService: EntityService,
private val entityQueryService: EntityQueryService
private val entityQueryService: EntityQueryService,
private val contextSourceRegistrationService: ContextSourceRegistrationService
) : BaseHandler() {

/**
Expand Down Expand Up @@ -182,7 +187,7 @@ class EntityHandler(
suspend fun getByURI(
@RequestHeader httpHeaders: HttpHeaders,
@PathVariable entityId: URI,
@RequestParam params: MultiValueMap<String, String>
@RequestParam params: MultiValueMap<String, String>,
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
): ResponseEntity<*> = either {
val mediaType = getApplicableMediaType(httpHeaders).bind()
val sub = getSubFromSecurityContext()
Expand All @@ -194,18 +199,46 @@ class EntityHandler(
contexts
).bind()

val expandedEntity = entityQueryService.queryEntity(entityId, sub.getOrNull()).bind()

expandedEntity.checkContainsAnyOf(queryParams.attrs).bind()
val csrFilters = CSRFilters(setOf(entityId))

val filteredExpandedEntity = ExpandedEntity(
expandedEntity.filterAttributes(queryParams.attrs, queryParams.datasetId)
val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(
bobeal marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here you also have to filter CSRs that match the current operation (so retrieveEntity in the operations or federationOps in the operation groups)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filtering the results from the call to the service is not a clean way to do it. it should be part of the call to the service.

sub,
csrFilters
)
val compactedEntity = compactEntity(filteredExpandedEntity, contexts)

// todo local parameter (6.3.18)
// todo parrallelize calls
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
val localEntity = either {
val expandedEntity = entityQueryService.queryEntity(entityId, sub.getOrNull()).bind()
expandedEntity.checkContainsAnyOf(queryParams.attrs).bind()

val filteredExpandedEntity = ExpandedEntity(
expandedEntity.filterAttributes(queryParams.attrs, queryParams.datasetId)
)
compactEntity(filteredExpandedEntity, contexts)
}

val entitiesWithMode = matchingCSR.map { csr ->
ContextSourceUtils.call(
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
httpHeaders,
csr,
HttpMethod.GET,
"/ngsi-ld/v1/entities/$entityId"
) to csr.mode
}.filter { it.first.isRight() } // ignore all errors
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
.map { (response, mode) ->
response.getOrNull()!! to mode
}

if (localEntity.isLeft() && entitiesWithMode.isEmpty()) localEntity.bind()
val mergeEntity = ContextSourceUtils.mergeEntity(localEntity.getOrNull(), entitiesWithMode)
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
?: throw ResourceNotFoundException("No entity with id: $entityId found")
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved

val ngsiLdDataRepresentation = parseRepresentations(params, mediaType)
prepareGetSuccessResponseHeaders(mediaType, contexts)
.body(serializeObject(compactedEntity.toFinalRepresentation(ngsiLdDataRepresentation)))
.body(
serializeObject(mergeEntity.toFinalRepresentation(ngsiLdDataRepresentation))
)
}.fold(
{ it.toErrorResponse() },
{ it }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.egm.stellio.search.authorization.web

import com.egm.stellio.search.authorization.service.AuthorizationService
import com.egm.stellio.search.common.config.SearchProperties
import com.egm.stellio.search.csr.service.ContextSourceRegistrationService
import com.egm.stellio.search.entity.service.EntityEventService
import com.egm.stellio.search.entity.service.EntityQueryService
import com.egm.stellio.search.entity.service.EntityService
Expand Down Expand Up @@ -39,6 +40,9 @@ class AnonymousUserHandlerTests {
@MockkBean
private lateinit var entityQueryService: EntityQueryService

@MockkBean
private lateinit var contextSourceRegistrationService: ContextSourceRegistrationService

@Test
@WithAnonymousUser
fun `it should not authorize an anonymous to call the API`() {
Expand Down
Loading
Loading