Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make use of ryw_delay to minimize retries on IAM fetch #2207

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class IamFetchReadyCondition(
override val id: String
get() = ID

override fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String>>): Boolean {
override fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, RywData>>): Boolean {
val tokenMap = indexedTokens[key] ?: return false
val userUpdateTokenSet = tokenMap[IamFetchRywTokenKey.USER] != null

Expand All @@ -33,9 +33,16 @@ class IamFetchReadyCondition(
return userUpdateTokenSet
}

override fun getNewestToken(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String?>>): String? {
override fun getRywData(indexedTokens: Map<String, Map<IConsistencyKeyEnum, RywData?>>): RywData? {
val tokenMap = indexedTokens[key] ?: return null
// maxOrNull compares lexicographically
return listOfNotNull(tokenMap[IamFetchRywTokenKey.USER], tokenMap[IamFetchRywTokenKey.SUBSCRIPTION]).maxOrNull()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this code again I believe there was a bug here before this PR. maxOrNull() is going to give the highest values, but this is different for strings vs numbers. Example with string, is the byte value is compared left to right, so "9" > 50". To avoid this we could either convert the strings into numbers or left pad.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxOrNull compares lexicographically & we're generating the token already with a buffer so they're all equal length


/**
* Collect non-null RywData objects and find the one with the largest rywToken lexicographically
* Note: this works because RYW tokens are always the same length
*/
return listOfNotNull(
tokenMap[IamFetchRywTokenKey.USER],
tokenMap[IamFetchRywTokenKey.SUBSCRIPTION],
).maxByOrNull { it.rywToken ?: "" }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.onesignal.common.consistency

data class RywData(val rywToken: String, val rywDelay: Long?)
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.onesignal.common.consistency.impl

import com.onesignal.common.consistency.RywData
import com.onesignal.common.consistency.models.ICondition
import com.onesignal.common.consistency.models.IConsistencyKeyEnum
import com.onesignal.common.consistency.models.IConsistencyManager
Expand All @@ -18,8 +19,8 @@ import kotlinx.coroutines.sync.withLock
*/
class ConsistencyManager : IConsistencyManager {
private val mutex = Mutex()
private val indexedTokens: MutableMap<String, MutableMap<IConsistencyKeyEnum, String>> = mutableMapOf()
private val conditions: MutableList<Pair<ICondition, CompletableDeferred<String?>>> =
private val indexedTokens: MutableMap<String, MutableMap<IConsistencyKeyEnum, RywData>> = mutableMapOf()
private val conditions: MutableList<Pair<ICondition, CompletableDeferred<RywData?>>> =
mutableListOf()

/**
Expand All @@ -29,10 +30,10 @@ class ConsistencyManager : IConsistencyManager {
* key: K - corresponds to the operation for which we have a read-your-write token
* value: String? - the token (read-your-write token)
*/
override suspend fun setRywToken(
override suspend fun setRywData(
id: String,
key: IConsistencyKeyEnum,
value: String,
value: RywData,
) {
mutex.withLock {
val rywTokens = indexedTokens.getOrPut(id) { mutableMapOf() }
Expand All @@ -44,9 +45,9 @@ class ConsistencyManager : IConsistencyManager {
/**
* Register a condition with its corresponding deferred action. Returns a deferred condition.
*/
override suspend fun registerCondition(condition: ICondition): CompletableDeferred<String?> {
override suspend fun getRywDataFromAwaitableCondition(condition: ICondition): CompletableDeferred<RywData?> {
mutex.withLock {
val deferred = CompletableDeferred<String?>()
val deferred = CompletableDeferred<RywData?>()
val pair = Pair(condition, deferred)
conditions.add(pair)
checkConditionsAndComplete()
Expand All @@ -55,7 +56,7 @@ class ConsistencyManager : IConsistencyManager {
}

override suspend fun resolveConditionsWithID(id: String) {
val completedConditions = mutableListOf<Pair<ICondition, CompletableDeferred<String?>>>()
val completedConditions = mutableListOf<Pair<ICondition, CompletableDeferred<RywData?>>>()

for ((condition, deferred) in conditions) {
if (condition.id == id) {
Expand All @@ -74,13 +75,13 @@ class ConsistencyManager : IConsistencyManager {
* IMPORTANT: calling code should be protected by mutex to avoid potential inconsistencies
*/
private fun checkConditionsAndComplete() {
val completedConditions = mutableListOf<Pair<ICondition, CompletableDeferred<String?>>>()
val completedConditions = mutableListOf<Pair<ICondition, CompletableDeferred<RywData?>>>()

for ((condition, deferred) in conditions) {
if (condition.isMet(indexedTokens)) {
val newestToken = condition.getNewestToken(indexedTokens)
val rywDataForNewestToken = condition.getRywData(indexedTokens)
if (!deferred.isCompleted) {
deferred.complete(newestToken)
deferred.complete(rywDataForNewestToken)
}
completedConditions.add(Pair(condition, deferred))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.onesignal.common.consistency.models

import com.onesignal.common.consistency.RywData

interface ICondition {
/**
* Every implementation should define a unique ID & make available via a companion object for
Expand All @@ -11,11 +13,11 @@ interface ICondition {
* Define a condition that "unblocks" execution
* e.g. we have token (A && B) || A
*/
fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String>>): Boolean
fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, RywData>>): Boolean

/**
* Used to process tokens according to their format & return the newest token.
* e.g. numeric strings would be compared differently from JWT tokens
*/
fun getNewestToken(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String?>>): String?
fun getRywData(indexedTokens: Map<String, Map<IConsistencyKeyEnum, RywData?>>): RywData?
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.onesignal.common.consistency.models

import com.onesignal.common.consistency.RywData
import kotlinx.coroutines.CompletableDeferred

interface IConsistencyManager {
Expand All @@ -10,10 +11,10 @@ interface IConsistencyManager {
* key: IConsistencyKeyEnum - corresponds to the operation for which we have a read-your-write token
* value: String? - the read-your-write token
*/
suspend fun setRywToken(
suspend fun setRywData(
id: String,
key: IConsistencyKeyEnum,
value: String,
value: RywData,
)

/**
Expand All @@ -22,7 +23,7 @@ interface IConsistencyManager {
* condition: ICondition - the condition to be registered
* Returns: CompletableDeferred<String?> - a deferred action that completes when the condition is met
*/
suspend fun registerCondition(condition: ICondition): CompletableDeferred<String?>
suspend fun getRywDataFromAwaitableCondition(condition: ICondition): CompletableDeferred<RywData?>

/**
* Resolve all conditions with a specific ID
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.onesignal.user.internal.backend

import com.onesignal.common.consistency.RywData
import com.onesignal.common.exceptions.BackendException

interface ISubscriptionBackendService {
Expand All @@ -21,7 +22,7 @@ interface ISubscriptionBackendService {
aliasLabel: String,
aliasValue: String,
subscription: SubscriptionObject,
): Pair<String, String?>?
): Pair<String, RywData?>?

/**
* Update an existing subscription with the properties provided.
Expand All @@ -34,7 +35,7 @@ interface ISubscriptionBackendService {
appId: String,
subscriptionId: String,
subscription: SubscriptionObject,
): String?
): RywData?

/**
* Delete an existing subscription.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.onesignal.user.internal.backend

import com.onesignal.common.consistency.RywData
import com.onesignal.common.exceptions.BackendException

interface IUserBackendService {
Expand Down Expand Up @@ -47,7 +48,7 @@ interface IUserBackendService {
properties: PropertiesObject,
refreshDeviceMetadata: Boolean,
propertyiesDelta: PropertiesDeltasObject,
): String?
): RywData?

/**
* Retrieve a user from the backend.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.onesignal.user.internal.backend.impl

import com.onesignal.common.consistency.RywData
import com.onesignal.common.exceptions.BackendException
import com.onesignal.common.safeJSONObject
import com.onesignal.common.toMap
Expand All @@ -16,7 +17,7 @@ internal class SubscriptionBackendService(
aliasLabel: String,
aliasValue: String,
subscription: SubscriptionObject,
): Pair<String, String?>? {
): Pair<String, RywData?>? {
val jsonSubscription = JSONConverter.convertToJSON(subscription)
jsonSubscription.remove("id")
val requestJSON = JSONObject().put("subscription", jsonSubscription)
Expand All @@ -27,25 +28,32 @@ internal class SubscriptionBackendService(
throw BackendException(response.statusCode, response.payload, response.retryAfterSeconds)
}

val responseJSON = JSONObject(response.payload!!)
val subscriptionJSON = responseJSON.safeJSONObject("subscription")
val responseJSON = response.payload?.let { JSONObject(it) }
val subscriptionJSON = responseJSON?.safeJSONObject("subscription")

if (subscriptionJSON == null || !subscriptionJSON.has("id")) {
return null
}

var rywToken: String? = null
if (responseJSON.has("ryw_token")) {
rywToken = responseJSON.getString("ryw_token")
fun JSONObject.safeString(key: String): String? = if (this.has(key)) this.getString(key) else null

fun JSONObject.safeLong(key: String): Long? = if (this.has(key)) this.getLong(key) else null
val rywToken = responseJSON.safeString("ryw_token")
val rywDelay = responseJSON.safeLong("ryw_delay")
var rywData: RywData? = null

if (rywToken != null) {
rywData = RywData(rywToken, rywDelay)
}

return Pair(subscriptionJSON.getString("id"), rywToken)
return Pair(subscriptionJSON.getString("id"), rywData)
}

override suspend fun updateSubscription(
appId: String,
subscriptionId: String,
subscription: SubscriptionObject,
): String? {
): RywData? {
val requestJSON =
JSONObject()
.put("subscription", JSONConverter.convertToJSON(subscription))
Expand All @@ -56,9 +64,17 @@ internal class SubscriptionBackendService(
throw BackendException(response.statusCode, response.payload, response.retryAfterSeconds)
}

val responseBody = JSONObject(response.payload)
return if (responseBody.has("ryw_token")) {
responseBody.getString("ryw_token")
fun JSONObject.safeString(key: String): String? = if (this.has(key)) this.getString(key) else null

fun JSONObject.safeLong(key: String): Long? = if (this.has(key)) this.getLong(key) else null

val responseJSON = response.payload?.let { JSONObject(it) }

val rywToken = responseJSON?.safeString("ryw_token")
val rywDelay = responseJSON?.safeLong("ryw_delay")

return if (rywToken !== null) {
RywData(rywToken, rywDelay)
} else {
null
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.onesignal.user.internal.backend.impl

import com.onesignal.common.consistency.RywData
import com.onesignal.common.exceptions.BackendException
import com.onesignal.common.putMap
import com.onesignal.core.internal.http.IHttpClient
Expand Down Expand Up @@ -52,7 +53,7 @@ internal class UserBackendService(
properties: PropertiesObject,
refreshDeviceMetadata: Boolean,
propertyiesDelta: PropertiesDeltasObject,
): String? {
): RywData? {
val jsonObject =
JSONObject()
.put("refresh_device_metadata", refreshDeviceMetadata)
Expand All @@ -71,9 +72,17 @@ internal class UserBackendService(
throw BackendException(response.statusCode, response.payload, response.retryAfterSeconds)
}

val responseBody = JSONObject(response.payload)
return if (responseBody.has("ryw_token")) {
responseBody.getString("ryw_token")
fun JSONObject.safeString(key: String): String? = if (this.has(key)) this.getString(key) else null

fun JSONObject.safeLong(key: String): Long? = if (this.has(key)) this.getLong(key) else null

val responseJSON = response.payload?.let { JSONObject(it) }

val rywToken = responseJSON?.safeString("ryw_token")
val rywDelay = responseJSON?.safeLong("ryw_delay")

return if (rywToken != null) {
RywData(rywToken, rywDelay)
} else {
null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ internal class SubscriptionOperationExecutor(
) ?: return ExecutionResponse(ExecutionResult.SUCCESS)

val backendSubscriptionId = result.first
val rywToken = result.second
val rywData = result.second

if (rywToken != null) {
_consistencyManager.setRywToken(createOperation.onesignalId, IamFetchRywTokenKey.SUBSCRIPTION, rywToken)
if (rywData?.rywToken != null) {
_consistencyManager.setRywData(createOperation.onesignalId, IamFetchRywTokenKey.SUBSCRIPTION, rywData)
} else {
_consistencyManager.resolveConditionsWithID(IamFetchReadyCondition.ID)
}
Expand Down Expand Up @@ -188,10 +188,10 @@ internal class SubscriptionOperationExecutor(
AndroidUtils.getAppVersion(_applicationService.appContext),
)

val rywToken = _subscriptionBackend.updateSubscription(lastOperation.appId, lastOperation.subscriptionId, subscription)
val rywData = _subscriptionBackend.updateSubscription(lastOperation.appId, lastOperation.subscriptionId, subscription)

if (rywToken != null) {
_consistencyManager.setRywToken(startingOperation.onesignalId, IamFetchRywTokenKey.SUBSCRIPTION, rywToken)
if (rywData?.rywToken != null) {
_consistencyManager.setRywData(startingOperation.onesignalId, IamFetchRywTokenKey.SUBSCRIPTION, rywData)
} else {
_consistencyManager.resolveConditionsWithID(IamFetchReadyCondition.ID)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ internal class UpdateUserOperationExecutor(

if (appId != null && onesignalId != null) {
try {
val rywToken =
val rywData =
_userBackend.updateUser(
appId,
IdentityConstants.ONESIGNAL_ID,
Expand All @@ -148,8 +148,8 @@ internal class UpdateUserOperationExecutor(
deltasObject,
)

if (rywToken != null) {
_consistencyManager.setRywToken(onesignalId, IamFetchRywTokenKey.USER, rywToken)
if (rywData?.rywToken != null) {
_consistencyManager.setRywData(onesignalId, IamFetchRywTokenKey.USER, rywData)
} else {
_consistencyManager.resolveConditionsWithID(IamFetchReadyCondition.ID)
}
Expand Down
Loading
Loading