Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/csr retrieve entity #1246

Draft
wants to merge 19 commits into
base: feature/new-csr
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
41e4af9
wip: not tested merge entities function
thomasBousselin Oct 3, 2024
c7bff1e
wip: not tested merge entities function
thomasBousselin Oct 7, 2024
cc6de35
feat: retrieve Entity Working
thomasBousselin Oct 8, 2024
e89f92f
feat: first error handling and fix pr comments
thomasBousselin Oct 10, 2024
3de35ba
feat: test for mergeEntity
thomasBousselin Oct 11, 2024
53f0b87
fix: working Where statement for CSR / add some tests on Query CSR / …
bobeal Oct 13, 2024
c8bf170
feat: fix merging returning lists and PR comments
thomasBousselin Oct 14, 2024
e8ccf0c
feat: fix merging returning lists and PR comments
thomasBousselin Oct 15, 2024
8387130
feat: updating
thomasBousselin Oct 16, 2024
dd31576
feat: MR comment - clearer name - no warning impact in ApiException -…
thomasBousselin Oct 21, 2024
3bcf7b3
feat: get warnings from merge
thomasBousselin Oct 22, 2024
690fd59
feat: separate context source utils + work with sysAttrs
thomasBousselin Oct 23, 2024
f38c04b
feat: warning following rfc7234 + docker-compose for csr + fix tests
thomasBousselin Oct 24, 2024
129228a
feat: doc csr launch
thomasBousselin Oct 25, 2024
b07e032
feat: doc csr launch
thomasBousselin Oct 25, 2024
6d4831a
chore: slight refactoring of the docker compose context source instance
bobeal Oct 26, 2024
8740239
chore: misc typo, wording and naming
bobeal Oct 27, 2024
f74ceec
feat: always call context source for normalized representation + PR c…
thomasBousselin Oct 30, 2024
deca20c
feat: registrationName in CSR + Warning fixes
thomasBousselin Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .context-source.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
API_GATEWAY_PORT=8090
KAFKA_PORT=29093
POSTGRES_PORT=5433
SEARCH_SERVICE_PORT=8093
SUBSCRIPTION_SERVICE_PORT=8094

# Used by subscription service when searching entities for recurring subscriptions
# (those defined with a timeInterval parameter)
SUBSCRIPTION_ENTITY_SERVICE_URL=http://search-service:8093

# Used as a base URL by subscription service when serving contexts for notifications
SUBSCRIPTION_STELLIO_URL=http://localhost:8090

CONTAINER_NAME_PREFIX=context-source-
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ APPLICATION_TENANTS_0_DBSCHEMA=public
APPLICATION_PAGINATION_LIMIT_DEFAULT=30
APPLICATION_PAGINATION_LIMIT_MAX=100
APPLICATION_PAGINATION_TEMPORAL_LIMIT=10000

CONTAINER_NAME_PREFIX=
23 changes: 13 additions & 10 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# you can launch a second instance of Stellio with (for instance to use it as a context source)
# docker compose --env-file .env --env-file .context-source.env -p stellio-context-source up -d
services:
kafka:
image: confluentinc/cp-kafka:7.6.0
container_name: stellio-kafka
container_name: "${CONTAINER_NAME_PREFIX}stellio-kafka"
hostname: stellio-kafka
ports:
- "29092:29092"
- "${KAFKA_PORT:-29092}:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
Expand All @@ -20,15 +22,15 @@ services:
CLUSTER_ID: ZGE2MTQ4NDk4NGU3NDE2Mm
postgres:
image: stellio/stellio-timescale-postgis:16-2.16.0-3.3
container_name: stellio-postgres
container_name: "${CONTAINER_NAME_PREFIX}stellio-postgres"
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASS=${POSTGRES_PASS}
- POSTGRES_DBNAME=${POSTGRES_DBNAME}
- POSTGRES_MULTIPLE_EXTENSIONS=postgis,timescaledb,pgcrypto
- ACCEPT_TIMESCALE_TUNING=TRUE
ports:
- "5432:5432"
- "${POSTGRES_PORT:-5432}:5432"
volumes:
- stellio-postgres-storage:/var/lib/postgresql
healthcheck:
Expand All @@ -38,14 +40,14 @@ services:
retries: 20
start_period: 10s
api-gateway:
container_name: stellio-api-gateway
container_name: "${CONTAINER_NAME_PREFIX}stellio-api-gateway"
image: stellio/stellio-api-gateway:${STELLIO_DOCKER_TAG}
environment:
- SPRING_PROFILES_ACTIVE=${ENVIRONMENT}
ports:
- "8080:8080"
- "${API_GATEWAY_PORT:-8080}:8080"
search-service:
container_name: stellio-search-service
container_name: "${CONTAINER_NAME_PREFIX}stellio-search-service"
image: stellio/stellio-search-service:${STELLIO_DOCKER_TAG}
environment:
- SPRING_PROFILES_ACTIVE=${ENVIRONMENT}
Expand All @@ -62,14 +64,14 @@ services:
- APPLICATION_PAGINATION_TEMPORAL-LIMIT=${APPLICATION_PAGINATION_TEMPORAL_LIMIT}

ports:
- "8083:8083"
- "${SEARCH_SERVICE_PORT:-8083}:8083"
depends_on:
postgres:
condition: service_healthy
kafka:
condition: service_started
subscription-service:
container_name: stellio-subscription-service
container_name: "${CONTAINER_NAME_PREFIX}stellio-subscription-service"
image: stellio/stellio-subscription-service:${STELLIO_DOCKER_TAG}
environment:
- SPRING_PROFILES_ACTIVE=${ENVIRONMENT}
Expand All @@ -86,7 +88,7 @@ services:
- APPLICATION_PAGINATION_LIMIT-DEFAULT=${APPLICATION_PAGINATION_LIMIT_DEFAULT}
- APPLICATION_PAGINATION_LIMIT-MAX=${APPLICATION_PAGINATION_LIMIT_MAX}
ports:
- "8084:8084"
- "${SUBSCRIPTION_SERVICE_PORT:-8084}:8084"
depends_on:
postgres:
condition: service_healthy
Expand All @@ -95,3 +97,4 @@ services:

volumes:
stellio-postgres-storage:
name: "${CONTAINER_NAME_PREFIX}stellio-postgres-storage"
1 change: 1 addition & 0 deletions search-service/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
runtimeOnly("org.postgresql:postgresql")
runtimeOnly("io.r2dbc:r2dbc-pool")

testImplementation("org.wiremock:wiremock-standalone:3.3.1")
testImplementation("org.testcontainers:postgresql")
testImplementation("org.testcontainers:kafka")
testImplementation("org.testcontainers:r2dbc")
Expand Down
1 change: 1 addition & 0 deletions search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@
<ID>LongParameterList:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$( entityId: URI, attributeName: ExpandedTerm, datasetId: URI?, attributePayload: ExpandedAttributeInstance, ngsiLdAttributeInstance: NgsiLdAttributeInstance, defaultCreatedAt: ZonedDateTime )</ID>
<ID>NestedBlockDepth:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$override fun migrate(context: Context)</ID>
<ID>SwallowedException:TemporalQueryUtils.kt$e: IllegalArgumentException</ID>
<ID>TooGenericExceptionCaught:ContextSourceCaller.kt$ContextSourceCaller$e: Exception</ID>
</CurrentIssues>
</SmellBaseline>
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,8 @@ class ContextSourceRegistrationHandler(
applicationProperties.pagination.limitMax
).bind()
val contextSourceRegistrations = contextSourceRegistrationService.getContextSourceRegistrations(
paginationQuery.limit,
paginationQuery.offset,
sub
limit = paginationQuery.limit,
offset = paginationQuery.offset,
).serialize(contexts, mediaType, includeSysAttrs)
val contextSourceRegistrationsCount = contextSourceRegistrationService.getContextSourceRegistrationsCount(
sub
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.egm.stellio.search.csr.model

import java.net.URI

data class CSRFilters( // we should use a combination of EntitiesQuery TemporalQuery (when we implement all operations)
val ids: Set<URI> = emptySet(),
val csf: String? = null
) {
fun buildWhereStatement(): String {
val idFilter = if (ids.isNotEmpty())
"""
(
entity_info.id is null OR
entity_info.id in ('${ids.joinToString("', '")}')
) AND
(
entity_info.idPattern is null OR
${ids.joinToString(" OR ") { "'$it' ~ entity_info.idPattern" }}
)
""".trimIndent()
else "true"
return idFilter
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import arrow.core.Either
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.shared.model.*
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.BadRequestDataException
import com.egm.stellio.shared.model.toAPIException
import com.egm.stellio.shared.util.*
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_CONTEXT
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_CSR_TERM
Expand All @@ -13,26 +15,36 @@ import com.egm.stellio.shared.util.JsonLdUtils.expandJsonLdTerm
import com.egm.stellio.shared.util.JsonUtils.deserializeAs
import com.egm.stellio.shared.util.JsonUtils.serializeObject
import com.fasterxml.jackson.annotation.JsonFormat
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.module.kotlin.convertValue
import org.springframework.http.MediaType
import java.net.URI
import java.time.ZonedDateTime
import java.util.*
import java.util.UUID
import java.util.regex.Pattern

data class ContextSourceRegistration(
val id: URI = "urn:ngsi-ld:ContextSourceRegistration:${UUID.randomUUID()}".toUri(),
val endpoint: URI,
val registrationName: String? = null,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't forget to "bind" in CSRService (create and get operations)

val type: String = NGSILD_CSR_TERM,
val mode: Mode = Mode.INCLUSIVE,
val information: List<RegistrationInfo> = emptyList(),
val operations: List<Operation> = listOf(Operation.FEDERATION_OPS),
val createdAt: ZonedDateTime = ngsiLdDateTime(),
val modifiedAt: ZonedDateTime? = null,
val observationInterval: TimeInterval? = null,
val managementInterval: TimeInterval? = null
val managementInterval: TimeInterval? = null,

val status: StatusType? = null,
val timesSent: Int = 0,
val timesFailed: Int = 0,
val lastFailure: ZonedDateTime? = null,
val lastSuccess: ZonedDateTime? = null,
) {

fun isAuxiliary(): Boolean = mode == Mode.AUXILIARY

data class TimeInterval(
val start: ZonedDateTime,
val end: ZonedDateTime? = null
Expand Down Expand Up @@ -156,6 +168,14 @@ data class ContextSourceRegistration(
fun alreadyExistsMessage(id: URI) = "A CSourceRegistration with id $id already exists"
fun unauthorizedMessage(id: URI) = "User is not authorized to access CSourceRegistration $id"
}

enum class StatusType(val status: String) {
@JsonProperty("ok")
OK("ok"),

@JsonProperty("failed")
FAILED("failed")
}
}

fun List<ContextSourceRegistration>.serialize(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.egm.stellio.search.csr.model

import org.springframework.http.HttpHeaders
import org.springframework.http.ResponseEntity

/**
* Implements NGSILD-Warning as defined in 6.3.17
*/
open class NGSILDWarning(
private val code: Int,
open val message: String,
open val csr: ContextSourceRegistration
) {
// follow rfc7234 https://www.rfc-editor.org/rfc/rfc7234.html#section-5.5
fun getHeaderMessage(): String = "$code ${getWarnAgent()} \"${getWarnText()}\""

// new line are forbidden in headers
private fun getWarnText(): String = message.replace("\n", " ")
private fun getWarnAgent(): String = csr.registrationName ?: csr.id.toString()

companion object {
const val HEADER_NAME = "NGSILD-Warning"
const val RESPONSE_IS_STALE_WARNING_CODE = 110
const val REVALIDATION_FAILED_WARNING_CODE = 111
const val MISCELLANEOUS_WARNING_CODE = 199
const val MISCELLANEOUS_PERSISTENT_WARNING_CODE = 299
}
}

data class ResponseIsStaleWarning(
override val message: String,
override val csr: ContextSourceRegistration
) : NGSILDWarning(RESPONSE_IS_STALE_WARNING_CODE, message, csr)

data class RevalidationFailedWarning(
override val message: String,
override val csr: ContextSourceRegistration
) : NGSILDWarning(REVALIDATION_FAILED_WARNING_CODE, message, csr)

data class MiscellaneousWarning(
override val message: String,
override val csr: ContextSourceRegistration
) : NGSILDWarning(MISCELLANEOUS_WARNING_CODE, message, csr)

data class MiscellaneousPersistentWarning(
override val message: String,
override val csr: ContextSourceRegistration
) : NGSILDWarning(MISCELLANEOUS_PERSISTENT_WARNING_CODE, message, csr)

fun ResponseEntity<*>.addWarnings(warnings: List<NGSILDWarning>?): ResponseEntity<*> {
val headers = HttpHeaders.writableHttpHeaders(this.headers)
if (!warnings.isNullOrEmpty())
headers.addAll(NGSILDWarning.HEADER_NAME, warnings.map { it.getHeaderMessage() })

return ResponseEntity.status(this.statusCode)
.headers(headers)
.body(this.body)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.egm.stellio.search.csr.service

import arrow.core.Either
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.search.csr.model.*
import com.egm.stellio.shared.model.CompactedEntity
import com.egm.stellio.shared.util.QUERY_PARAM_GEOMETRY_PROPERTY
import com.egm.stellio.shared.util.QUERY_PARAM_LANG
import com.egm.stellio.shared.util.QUERY_PARAM_OPTIONS
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.core.codec.DecodingException
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.util.MultiValueMap
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBodyOrNull
import org.springframework.web.reactive.function.client.awaitExchange
import java.net.URI

object ContextSourceCaller {
val logger: Logger = LoggerFactory.getLogger(javaClass)

suspend fun getDistributedInformation(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
path: String,
params: MultiValueMap<String, String>
): Either<NGSILDWarning, CompactedEntity?> = either {
val uri = URI("${csr.endpoint}$path")
params.remove(QUERY_PARAM_GEOMETRY_PROPERTY)
params.remove(QUERY_PARAM_OPTIONS) // only request normalized
params.remove(QUERY_PARAM_LANG) // todo not sure its needed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is handled when doing the final representation, so yes, it should be removed.

val request = WebClient.create()
.method(HttpMethod.GET)
.uri { uriBuilder ->
uriBuilder.scheme(uri.scheme)
.host(uri.host)
.port(uri.port)
.path(uri.path)
.queryParams(params)
.build()
}
.header(HttpHeaders.LINK, httpHeaders.getFirst(HttpHeaders.LINK))
return runCatching {
val (statusCode, response) = request
.awaitExchange { response ->
response.statusCode() to response.awaitBodyOrNull<CompactedEntity>()
}
when {
statusCode.is2xxSuccessful -> {
logger.info("Successfully received data from CSR ${csr.id} at $uri")
response.right()
}

statusCode.isSameCodeAs(HttpStatus.NOT_FOUND) -> {
logger.info("CSR returned 404 at $uri: $response")
null.right()
}

else -> {
logger.warn("Error contacting CSR at $uri: $response")
MiscellaneousPersistentWarning(
"$uri returned an error $statusCode with response: $response",
csr
).left()
}
}
}.fold(
onSuccess = { it },
onFailure = { e ->
logger.warn("Error contacting CSR at $uri: ${e.message}")
logger.warn(e.stackTraceToString())
when (e) {
is DecodingException -> RevalidationFailedWarning(
"$uri returned badly formed data message: \"${e.cause}:${e.message}\"",
csr
)

else -> MiscellaneousWarning(
"Error connecting to $uri message : \"${e.cause}:${e.message}\"",
csr
)
}.left()
}
)
}
}
Loading
Loading