Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync back release PR with dev #144

Closed
wants to merge 10 commits into from
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@Suppress("ConstPropertyName", "MemberVisibilityCanBePrivate")
object Versions {
const val project = "0.5.3-SNAPSHOT"
const val project = "0.5.3"

const val java = 17
const val kotlin = "1.9.22"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class OuraSourceTask extends SourceTask {
private AvroData avroData = new AvroData(20);
private KafkaOffsetManager offsetManager;
String TIMESTAMP_OFFSET_KEY = "timestamp";
long TIMEOUT = 60000L;

public void initialize(OuraRestSourceConnectorConfig config, OffsetStorageReader offsetStorageReader) {
OuraRestSourceConnectorConfig ouraConfig = (OuraRestSourceConnectorConfig) config;
Expand Down Expand Up @@ -144,6 +145,8 @@ public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> sourceRecords = Collections.emptyList();

do {
Thread.sleep(TIMEOUT);

Map<String, String> configs = context.configs();
Iterator<? extends RestRequest> requestIterator = this.requests()
.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,9 @@ constructor(
logger.info("Offsets found in persistence: " + offsetTime.toString())
offsetTime.coerceAtLeast(startDate)
}
val endDate = if (user.endDate >= Instant.now()) Instant.now() else user.endDate
val endDate = user.endDate?.coerceAtMost(Instant.now()) ?: Instant.now()
if (Duration.between(startOffset, endDate) <= ONE_DAY) {
logger.info("Interval between dates is too short. Backing off..")
userNextRequest[user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME)
logger.info("Interval between dates is too short. Not requesting..")
return emptySequence()
}
val endTime = (startOffset + defaultQueryRange).coerceAtMost(endDate)
Expand Down Expand Up @@ -130,19 +129,12 @@ constructor(
ouraOffsetManager.updateOffsets(
request.route,
request.user,
Instant.ofEpochSecond(offset).plus(Duration.ofMillis(500)),
Instant.ofEpochSecond(offset).plus(ONE_DAY),
)
val currentNextRequestTime = userNextRequest[request.user.versionedId]
val nextRequestTime = Instant.now().plus(SUCCESS_BACK_OFF_TIME)
userNextRequest[request.user.versionedId] =
currentNextRequestTime?.let {
if (currentNextRequestTime > nextRequestTime) {
currentNextRequestTime
} else {
nextRequestTime
}
}
?: nextRequestTime
userNextRequest[request.user.versionedId] = userNextRequest[request.user.versionedId]?.let {
if (it > nextRequestTime) it else nextRequestTime
} ?: nextRequestTime
} else {
if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) {
logger.info("No records found, updating offsets to end date..")
Expand All @@ -152,6 +144,8 @@ constructor(
request.endDate,
)
userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME)
} else {
userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME)
}
}
return records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ data class OuraUser(
@JsonProperty("externalId") override val externalId: String?,
@JsonProperty("isAuthorized") override val isAuthorized: Boolean,
@JsonProperty("startDate") override val startDate: Instant,
@JsonProperty("endDate") override val endDate: Instant,
@JsonProperty("endDate") override val endDate: Instant? = null,
@JsonProperty("version") override val version: String? = null,
@JsonProperty("serviceUserId") override val serviceUserId: String? = null,
) : User {
override val observationKey: ObservationKey = ObservationKey(projectId, userId, sourceId)
override val versionedId: String = "$id${version?.let { "#$it" } ?: ""}"

fun isComplete() = isAuthorized && startDate.isBefore(endDate) && serviceUserId != null
fun isComplete() =
isAuthorized && (endDate == null || startDate.isBefore(endDate)) && serviceUserId != null
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface User {
val sourceId: String
val externalId: String?
val startDate: Instant
val endDate: Instant
val endDate: Instant?
val createdAt: Instant
val humanReadableUserId: String?
val serviceUserId: String?
Expand Down
Loading