diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index 3f84278c..6b5a3557 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -1,6 +1,6 @@ @Suppress("ConstPropertyName", "MemberVisibilityCanBePrivate") object Versions { - const val project = "0.5.3-SNAPSHOT" + const val project = "0.5.3" const val java = 17 const val kotlin = "1.9.22" diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java index 293a792f..39ad35e7 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java @@ -63,6 +63,7 @@ public class OuraSourceTask extends SourceTask { private AvroData avroData = new AvroData(20); private KafkaOffsetManager offsetManager; String TIMESTAMP_OFFSET_KEY = "timestamp"; + long TIMEOUT = 60000L; public void initialize(OuraRestSourceConnectorConfig config, OffsetStorageReader offsetStorageReader) { OuraRestSourceConnectorConfig ouraConfig = (OuraRestSourceConnectorConfig) config; @@ -144,6 +145,8 @@ public List poll() throws InterruptedException { List sourceRecords = Collections.emptyList(); do { + Thread.sleep(TIMEOUT); + Map configs = context.configs(); Iterator requestIterator = this.requests() .iterator(); diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index d948d874..6d35132e 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -89,10 +89,9 @@ constructor( logger.info("Offsets found in persistence: " + offsetTime.toString()) offsetTime.coerceAtLeast(startDate) } - val endDate = if (user.endDate >= Instant.now()) Instant.now() else user.endDate + val endDate = user.endDate?.coerceAtMost(Instant.now()) ?: Instant.now() if (Duration.between(startOffset, endDate) <= ONE_DAY) { - logger.info("Interval between dates is too short. Backing off..") - userNextRequest[user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME) + logger.info("Interval between dates is too short. Not requesting..") return emptySequence() } val endTime = (startOffset + defaultQueryRange).coerceAtMost(endDate) @@ -130,19 +129,12 @@ constructor( ouraOffsetManager.updateOffsets( request.route, request.user, - Instant.ofEpochSecond(offset).plus(Duration.ofMillis(500)), + Instant.ofEpochSecond(offset).plus(ONE_DAY), ) - val currentNextRequestTime = userNextRequest[request.user.versionedId] val nextRequestTime = Instant.now().plus(SUCCESS_BACK_OFF_TIME) - userNextRequest[request.user.versionedId] = - currentNextRequestTime?.let { - if (currentNextRequestTime > nextRequestTime) { - currentNextRequestTime - } else { - nextRequestTime - } - } - ?: nextRequestTime + userNextRequest[request.user.versionedId] = userNextRequest[request.user.versionedId]?.let { + if (it > nextRequestTime) it else nextRequestTime + } ?: nextRequestTime } else { if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) { logger.info("No records found, updating offsets to end date..") @@ -152,6 +144,8 @@ constructor( request.endDate, ) userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME) + } else { + userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME) } } return records diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt b/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt index 2b33c3d8..57905f8e 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt @@ -16,12 +16,13 @@ data class OuraUser( @JsonProperty("externalId") override val externalId: String?, @JsonProperty("isAuthorized") override val isAuthorized: Boolean, @JsonProperty("startDate") override val startDate: Instant, - @JsonProperty("endDate") override val endDate: Instant, + @JsonProperty("endDate") override val endDate: Instant? = null, @JsonProperty("version") override val version: String? = null, @JsonProperty("serviceUserId") override val serviceUserId: String? = null, ) : User { override val observationKey: ObservationKey = ObservationKey(projectId, userId, sourceId) override val versionedId: String = "$id${version?.let { "#$it" } ?: ""}" - fun isComplete() = isAuthorized && startDate.isBefore(endDate) && serviceUserId != null + fun isComplete() = + isAuthorized && (endDate == null || startDate.isBefore(endDate)) && serviceUserId != null } diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/user/User.kt b/oura-library/src/main/kotlin/org/radarbase/oura/user/User.kt index ee6fd496..b2247354 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/user/User.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/user/User.kt @@ -10,7 +10,7 @@ interface User { val sourceId: String val externalId: String? val startDate: Instant - val endDate: Instant + val endDate: Instant? val createdAt: Instant val humanReadableUserId: String? val serviceUserId: String?