Skip to content

Commit

Permalink
feat: allow setting a specific timezone for aggregates (#1245)
Browse files Browse the repository at this point in the history
By default, bucket start times are aligned at 00:00:00UTC (see https://docs.timescale.com/use-timescale/latest/time-buckets/about-time-buckets/#timezones). This property allows to align them different timezones which is useful for weekly or monthly aggregates for instance.
  • Loading branch information
bobeal authored Sep 30, 2024
1 parent b60ffe7 commit 811cd3e
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties
@ConfigurationProperties("search")
data class SearchProperties(
val payloadMaxBodySize: Int,
var onOwnerDeleteCascadeEntities: Boolean
val onOwnerDeleteCascadeEntities: Boolean,
val timezoneForTimeBuckets: String = "GMT"
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import arrow.core.Either
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.search.common.config.SearchProperties
import com.egm.stellio.search.common.util.*
import com.egm.stellio.search.entity.model.*
import com.egm.stellio.search.entity.model.Attribute.AttributeValueType
Expand All @@ -29,7 +30,8 @@ import java.time.ZonedDateTime

@Service
class ScopeService(
private val databaseClient: DatabaseClient
private val databaseClient: DatabaseClient,
private val searchProperties: SearchProperties
) {

@Transactional
Expand Down Expand Up @@ -150,7 +152,7 @@ class ScopeService(
val computedOrigin = origin ?: temporalQuery.timeAt
"""
SELECT entity_id,
public.time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as start,
public.time_bucket('$aggrPeriodDuration', time, '${searchProperties.timezoneForTimeBuckets}', TIMESTAMPTZ '${computedOrigin!!}') as start,
$allAggregates
"""
} else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import arrow.fx.coroutines.parMap
import com.egm.stellio.search.common.config.SearchProperties
import com.egm.stellio.search.common.util.*
import com.egm.stellio.search.entity.model.Attribute
import com.egm.stellio.search.entity.model.AttributeMetadata
Expand All @@ -28,7 +29,8 @@ import java.util.UUID

@Service
class AttributeInstanceService(
private val databaseClient: DatabaseClient
private val databaseClient: DatabaseClient,
private val searchProperties: SearchProperties
) {

private val attributesInstancesTables = listOf("attribute_instance", "attribute_instance_audit")
Expand Down Expand Up @@ -223,7 +225,7 @@ class AttributeInstanceService(
val computedOrigin = origin ?: temporalQuery.timeAt
"""
SELECT temporal_entity_attribute,
public.time_bucket('$aggrPeriodDuration', time, TIMESTAMPTZ '${computedOrigin!!}') as start,
public.time_bucket('$aggrPeriodDuration', time, '${searchProperties.timezoneForTimeBuckets}', TIMESTAMPTZ '${computedOrigin!!}') as start,
$allAggregates
""".trimIndent()
} else
Expand Down
4 changes: 4 additions & 0 deletions search-service/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ server.port = 8083
search.payload-max-body-size = 2048000

search.on-owner-delete-cascade-entities = false
# by default, bucket start times are aligned at 00:00:00UTC
# https://docs.timescale.com/use-timescale/latest/time-buckets/about-time-buckets/#timezones
# this property allows to align them different timezones which is useful for weekly or monthly aggregates for instance
search.timezone-for-time-buckets = GMT
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package com.egm.stellio.search.temporal.service

import com.egm.stellio.search.common.config.SearchProperties
import com.egm.stellio.search.entity.model.Attribute
import com.egm.stellio.search.entity.service.EntityAttributeService
import com.egm.stellio.search.support.*
import com.egm.stellio.search.temporal.model.*
import com.egm.stellio.shared.model.OperationNotSupportedException
import com.egm.stellio.shared.util.*
import com.ninjasquad.springmockk.MockkBean
import io.mockk.every
import kotlinx.coroutines.test.runTest
import org.assertj.core.api.AbstractObjectAssert
import org.assertj.core.api.Assertions
import org.assertj.core.api.InstanceOfAssertFactories
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertInstanceOf
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
Expand All @@ -39,10 +43,18 @@ class AggregatedTemporalQueryServiceTests : WithTimescaleContainer, WithKafkaCon
@Autowired
private lateinit var r2dbcEntityTemplate: R2dbcEntityTemplate

@MockkBean(relaxed = true)
private lateinit var searchProperties: SearchProperties

private val now = ngsiLdDateTime()
private val attributeUuid = UUID.randomUUID()
private val entityId = "urn:ngsi-ld:BeeHive:${UUID.randomUUID()}".toUri()

@BeforeEach
fun mockSearchProperties() {
every { searchProperties.timezoneForTimeBuckets } returns "GMT"
}

@AfterEach
fun clearAttributesInstances() {
r2dbcEntityTemplate.delete(AttributeInstance::class.java)
Expand Down Expand Up @@ -428,6 +440,36 @@ class AggregatedTemporalQueryServiceTests : WithTimescaleContainer, WithKafkaCon
}
}

@Test
fun `it should aggregate using the specified timezone`() = runTest {
// set the timezone to Europe/Paris to have all the results aggregated on January 2024
every { searchProperties.timezoneForTimeBuckets } returns "Europe/Paris"

val attribute = createAttribute(Attribute.AttributeValueType.NUMBER)
val startTimestamp = ZonedDateTime.parse("2023-12-31T23:00:00Z")
(0..9).forEach { i ->
val attributeInstance = gimmeNumericPropertyAttributeInstance(attributeUuid)
.copy(
time = startTimestamp.plusHours(i.toLong()),
measuredValue = 1.0
)
attributeInstanceService.create(attributeInstance)
}

val temporalEntitiesQuery = createTemporalEntitiesQuery("sum", "P1M")
attributeInstanceService.search(
temporalEntitiesQuery.copy(
temporalQuery = temporalEntitiesQuery.temporalQuery.copy(timeAt = startTimestamp)
),
attribute,
startTimestamp
)
.shouldSucceedWith { results ->
assertEquals(1, results.size)
assertEquals(10.0, (results[0] as AggregatedAttributeInstanceResult).values[0].value)
}
}

private suspend fun createAttribute(
attributeValueType: Attribute.AttributeValueType
): Attribute {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.egm.stellio.search.temporal.service

import com.egm.stellio.search.common.config.SearchProperties
import com.egm.stellio.search.entity.model.Attribute
import com.egm.stellio.search.entity.model.AttributeMetadata
import com.egm.stellio.search.entity.service.EntityAttributeService
Expand Down Expand Up @@ -61,6 +62,9 @@ class AttributeInstanceServiceTests : WithTimescaleContainer, WithKafkaContainer
@Autowired
private lateinit var databaseClient: DatabaseClient

@Autowired
private lateinit var searchProperties: SearchProperties

@Autowired
private lateinit var r2dbcEntityTemplate: R2dbcEntityTemplate

Expand Down Expand Up @@ -562,7 +566,10 @@ class AttributeInstanceServiceTests : WithTimescaleContainer, WithKafkaContainer

@Test
fun `it should create an attribute instance if it has a non null value`() = runTest {
val attributeInstanceService = spyk(AttributeInstanceService(databaseClient), recordPrivateCalls = true)
val attributeInstanceService = spyk(
AttributeInstanceService(databaseClient, searchProperties),
recordPrivateCalls = true
)
val attributeMetadata = AttributeMetadata(
measuredValue = 550.0,
value = null,
Expand Down Expand Up @@ -621,7 +628,10 @@ class AttributeInstanceServiceTests : WithTimescaleContainer, WithKafkaContainer

@Test
fun `it should create an attribute instance with boolean value`() = runTest {
val attributeInstanceService = spyk(AttributeInstanceService(databaseClient), recordPrivateCalls = true)
val attributeInstanceService = spyk(
AttributeInstanceService(databaseClient, searchProperties),
recordPrivateCalls = true
)
val attributeMetadata = AttributeMetadata(
measuredValue = null,
value = false.toString(),
Expand Down

0 comments on commit 811cd3e

Please sign in to comment.