From 58ef7a11e06c91967c26765c66935c0bc2c370fa Mon Sep 17 00:00:00 2001 From: Nan Date: Thu, 20 Jun 2024 09:40:39 -0700 Subject: [PATCH 1/8] [tests] make Mock Client use a concurrent DispatchQueue * When testing concurrency, the Mock Client actually was synchronizing the requests, so crashes were not reproducible * By changing to a concurrent DispatchQueue, this is more reflective of the real Client's behavior --- .../OneSignalCoreMocks/MockOneSignalClient.swift | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/MockOneSignalClient.swift b/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/MockOneSignalClient.swift index f2f7ce65e..6b8a72e0f 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/MockOneSignalClient.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/MockOneSignalClient.swift @@ -28,7 +28,8 @@ import OneSignalCore */ @objc public class MockOneSignalClient: NSObject, IOneSignalClient { - public let executionQueue: DispatchQueue = DispatchQueue(label: "com.onesignal.execution") + public let executionQueue: DispatchQueue = DispatchQueue(label: "com.onesignal.execution", attributes: .concurrent) + let lock = NSLock() var mockResponses: [String: [String: Any]] = [:] var mockFailureResponses: [String: NSError] = [:] @@ -90,7 +91,9 @@ public class MockOneSignalClient: NSObject, IOneSignalClient { public func execute(_ request: OneSignalRequest, onSuccess successBlock: @escaping OSResultSuccessBlock, onFailure failureBlock: @escaping OSFailureBlock) { print("🧪 MockOneSignalClient execute called") - executedRequests.append(request) + lock.withLock { + executedRequests.append(request) + } if executeInstantaneously { finishExecutingRequest(request, onSuccess: successBlock, onFailure: failureBlock) From f2869a84e8c3156d2d1ced2b1779e9e7c17de3c9 Mon Sep 17 00:00:00 2001 From: Nan Date: Fri, 21 Jun 2024 11:53:32 -0700 Subject: [PATCH 2/8] [tests] Mock Client has flag to let all requests through * Add a flag called `fireSuccessForAllRequests` that will fire the success callback for all requests so that they don't need to be manually set up using mock responses beforehand. * This is useful for tests that want the client to return success but don't need the details of particular requests. --- .../OneSignalCoreMocks/MockOneSignalClient.swift | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/MockOneSignalClient.swift b/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/MockOneSignalClient.swift index 6b8a72e0f..0f8df1b02 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/MockOneSignalClient.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/MockOneSignalClient.swift @@ -37,6 +37,8 @@ public class MockOneSignalClient: NSObject, IOneSignalClient { public var networkRequestCount = 0 public var executedRequests: [OneSignalRequest] = [] public var executeInstantaneously = false + /// Set to true to make it unnecessary to setup mock responses for every request possible + public var fireSuccessForAllRequests = false var remoteParamsResponse: [String: Any]? var shouldUseProvisionalAuthorization = false // new in iOS 12 (aka Direct to History) @@ -138,6 +140,9 @@ public class MockOneSignalClient: NSObject, IOneSignalClient { successBlock(mockResponses[stringifiedRequest]) } else if (mockFailureResponses[stringifiedRequest]) != nil { failureBlock(mockFailureResponses[stringifiedRequest]) + } else if fireSuccessForAllRequests { + allRequestsHandled = false + successBlock([:]) } else { allRequestsHandled = false print("🧪 cannot find a mock response for request: \(stringifiedRequest)") From 63140d4eedb7669ff410ff5410b1a886e51b50d7 Mon Sep 17 00:00:00 2001 From: Nan Date: Wed, 19 Jun 2024 15:13:55 -0700 Subject: [PATCH 3/8] Add dispatch queue to Subscription Executor * Synchronize access to the delta queue and request queues * Add test `testSubscriptionExecutorConcurrency` that reproduced crash that is then fixed by the changes here. --- .../OSSubscriptionOperationExecutor.swift | 234 ++++++++++-------- .../OSRequestDeleteSubscription.swift | 4 +- .../OneSignalUserTests.swift | 41 +++ 3 files changed, 172 insertions(+), 107 deletions(-) diff --git a/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSSubscriptionOperationExecutor.swift b/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSSubscriptionOperationExecutor.swift index 4718de69f..4fd23f296 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSSubscriptionOperationExecutor.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSSubscriptionOperationExecutor.swift @@ -37,6 +37,9 @@ class OSSubscriptionOperationExecutor: OSOperationExecutor { var updateRequestQueue: [OSRequestUpdateSubscription] = [] var subscriptionModels: [String: OSSubscriptionModel] = [:] + // The Subscription executor dispatch queue, serial. This synchronizes access to the delta and request queues. + private let dispatchQueue = DispatchQueue(label: "OneSignal.OSSubscriptionOperationExecutor", target: .global()) + init() { // Read unfinished deltas from cache, if any... if var deltaQueue = OneSignalUserDefaults.initShared().getSavedCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_DELTA_QUEUE_KEY, defaultValue: []) as? [OSDelta] { @@ -152,75 +155,84 @@ class OSSubscriptionOperationExecutor: OSOperationExecutor { } func enqueueDelta(_ delta: OSDelta) { - OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSSubscriptionOperationExecutor enqueueDelta: \(delta)") - deltaQueue.append(delta) + self.dispatchQueue.async { + OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSSubscriptionOperationExecutor enqueueDelta: \(delta)") + self.deltaQueue.append(delta) + } } func cacheDeltaQueue() { - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) + self.dispatchQueue.async { + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) + } } func processDeltaQueue(inBackground: Bool) { - if !deltaQueue.isEmpty { - OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSSubscriptionOperationExecutor processDeltaQueue with queue: \(deltaQueue)") - } - for delta in deltaQueue { - guard let subModel = delta.model as? OSSubscriptionModel - else { - OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor.processDeltaQueue dropped \(delta)") - continue + self.dispatchQueue.async { + if !self.deltaQueue.isEmpty { + OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSSubscriptionOperationExecutor processDeltaQueue with queue: \(self.deltaQueue)") } + for delta in self.deltaQueue { + guard let subModel = delta.model as? OSSubscriptionModel + else { + OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor.processDeltaQueue dropped \(delta)") + continue + } - switch delta.name { - case OS_ADD_SUBSCRIPTION_DELTA: - // Only create the request if the identity model exists - if let identityModel = OneSignalUserManagerImpl.sharedInstance.getIdentityModel(delta.identityModelId) { - let request = OSRequestCreateSubscription( - subscriptionModel: subModel, - identityModel: identityModel + switch delta.name { + case OS_ADD_SUBSCRIPTION_DELTA: + // Only create the request if the identity model exists + if let identityModel = OneSignalUserManagerImpl.sharedInstance.getIdentityModel(delta.identityModelId) { + let request = OSRequestCreateSubscription( + subscriptionModel: subModel, + identityModel: identityModel + ) + self.addRequestQueue.append(request) + } else { + OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor.processDeltaQueue dropped \(delta)") + } + case OS_REMOVE_SUBSCRIPTION_DELTA: + let request = OSRequestDeleteSubscription( + subscriptionModel: subModel ) - addRequestQueue.append(request) - } else { - OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor.processDeltaQueue dropped \(delta)") + self.removeRequestQueue.append(request) + + case OS_UPDATE_SUBSCRIPTION_DELTA: + let request = OSRequestUpdateSubscription( + subscriptionObject: [delta.property: delta.value], + subscriptionModel: subModel + ) + self.updateRequestQueue.append(request) + + default: + // Log error + OneSignalLog.onesignalLog(.LL_DEBUG, message: "OSSubscriptionOperationExecutor met incompatible OSDelta type: \(delta).") } - case OS_REMOVE_SUBSCRIPTION_DELTA: - let request = OSRequestDeleteSubscription( - subscriptionModel: subModel - ) - removeRequestQueue.append(request) - - case OS_UPDATE_SUBSCRIPTION_DELTA: - let request = OSRequestUpdateSubscription( - subscriptionObject: [delta.property: delta.value], - subscriptionModel: subModel - ) - updateRequestQueue.append(request) - - default: - // Log error - OneSignalLog.onesignalLog(.LL_DEBUG, message: "OSSubscriptionOperationExecutor met incompatible OSDelta type: \(delta).") } - } - self.deltaQueue = [] // TODO: Check that we can simply clear all the deltas in the deltaQueue + self.deltaQueue = [] // TODO: Check that we can simply clear all the deltas in the deltaQueue - // persist executor's requests (including new request) to storage - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) + // persist executor's requests (including new request) to storage + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) // This should be empty, can remove instead? + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) // This should be empty, can remove instead? - processRequestQueue(inBackground: inBackground) + self.processRequestQueue(inBackground: inBackground) + } } // Bypasses the operation repo to create a push subscription request func createPushSubscription(subscriptionModel: OSSubscriptionModel, identityModel: OSIdentityModel) { let request = OSRequestCreateSubscription(subscriptionModel: subscriptionModel, identityModel: identityModel) - addRequestQueue.append(request) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) + self.dispatchQueue.async { + self.addRequestQueue.append(request) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) + } } + /// This method is called by `processDeltaQueue` only and does not need to be added to the dispatchQueue. func processRequestQueue(inBackground: Bool) { let requestQueue: [OneSignalRequest] = addRequestQueue + removeRequestQueue + updateRequestQueue @@ -261,46 +273,50 @@ class OSSubscriptionOperationExecutor: OSOperationExecutor { OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSSubscriptionOperationExecutor: executeCreateSubscriptionRequest making request: \(request)") OneSignalCoreImpl.sharedClient().execute(request) { result in // On success, remove request from cache (even if not hydrating model), and hydrate model - self.addRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) - - guard let response = result?["subscription"] as? [String: Any] else { - OneSignalLog.onesignalLog(.LL_ERROR, message: "Unabled to parse response to create subscription request") + self.dispatchQueue.async { + self.addRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) + + guard let response = result?["subscription"] as? [String: Any] else { + OneSignalLog.onesignalLog(.LL_ERROR, message: "Unabled to parse response to create subscription request") + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + } + return + } + request.subscriptionModel.hydrate(response) if inBackground { OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) } - return - } - request.subscriptionModel.hydrate(response) - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) } } onFailure: { error in OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor create subscription request failed with error: \(error.debugDescription)") - if let nsError = error as? NSError { - let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code) - if responseType == .missing { - self.addRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) - // Logout if the user in the SDK is the same - guard OneSignalUserManagerImpl.sharedInstance.isCurrentUser(request.identityModel) - else { - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + self.dispatchQueue.async { + if let nsError = error as? NSError { + let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code) + if responseType == .missing { + self.addRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) + // Logout if the user in the SDK is the same + guard OneSignalUserManagerImpl.sharedInstance.isCurrentUser(request.identityModel) + else { + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + } + return } - return + // The subscription has been deleted along with the user, so remove the subscription_id but keep the same push subscription model + OneSignalUserManagerImpl.sharedInstance.pushSubscriptionModel?.subscriptionId = nil + OneSignalUserManagerImpl.sharedInstance._logout() + } else if responseType != .retryable { + // Fail, no retry, remove from cache and queue + self.addRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) } - // The subscription has been deleted along with the user, so remove the subscription_id but keep the same push subscription model - OneSignalUserManagerImpl.sharedInstance.pushSubscriptionModel?.subscriptionId = nil - OneSignalUserManagerImpl.sharedInstance._logout() - } else if responseType != .retryable { - // Fail, no retry, remove from cache and queue - self.addRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) } - } - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + } } } } @@ -324,24 +340,28 @@ class OSSubscriptionOperationExecutor: OSOperationExecutor { OneSignalCoreImpl.sharedClient().execute(request) { _ in // On success, remove request from cache. No model hydration occurs. // For example, if app restarts and we read in operations between sending this off and getting the response - self.removeRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue) - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + self.dispatchQueue.async { + self.removeRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue) + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + } } } onFailure: { error in OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor delete subscription request failed with error: \(error.debugDescription)") - if let nsError = error as? NSError { - let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code) - if responseType != .retryable { - // Fail, no retry, remove from cache and queue - // If this request returns a missing status, that is ok as this is a delete request - self.removeRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue) + self.dispatchQueue.async { + if let nsError = error as? NSError { + let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code) + if responseType != .retryable { + // Fail, no retry, remove from cache and queue + // If this request returns a missing status, that is ok as this is a delete request + self.removeRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue) + } + } + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) } - } - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) } } } @@ -363,23 +383,27 @@ class OSSubscriptionOperationExecutor: OSOperationExecutor { OneSignalCoreImpl.sharedClient().execute(request) { _ in // On success, remove request from cache. No model hydration occurs. // For example, if app restarts and we read in operations between sending this off and getting the response - self.updateRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + self.dispatchQueue.async { + self.updateRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + } } } onFailure: { error in OneSignalLog.onesignalLog(.LL_ERROR, message: "OSSubscriptionOperationExecutor update subscription request failed with error: \(error.debugDescription)") - if let nsError = error as? NSError { - let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code) - if responseType != .retryable { - // Fail, no retry, remove from cache and queue - self.updateRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) + self.dispatchQueue.async { + if let nsError = error as? NSError { + let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code) + if responseType != .retryable { + // Fail, no retry, remove from cache and queue + self.updateRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_SUBSCRIPTION_EXECUTOR_UPDATE_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue) + } + } + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) } - } - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) } } } diff --git a/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Requests/OSRequestDeleteSubscription.swift b/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Requests/OSRequestDeleteSubscription.swift index 11ed44c86..83c7275b1 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Requests/OSRequestDeleteSubscription.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Requests/OSRequestDeleteSubscription.swift @@ -55,7 +55,7 @@ class OSRequestDeleteSubscription: OneSignalRequest, OSUserRequest { init(subscriptionModel: OSSubscriptionModel) { self.subscriptionModel = subscriptionModel - self.stringDescription = "OSRequestDeleteSubscription with subscriptionModel: \(subscriptionModel.address ?? "nil")" + self.stringDescription = "" super.init() self.method = DELETE _ = prepareForExecution() // sets the path property @@ -77,7 +77,7 @@ class OSRequestDeleteSubscription: OneSignalRequest, OSUserRequest { return nil } self.subscriptionModel = subscriptionModel - self.stringDescription = "OSRequestDeleteSubscription with subscriptionModel: \(subscriptionModel.address ?? "nil")" + self.stringDescription = "" super.init() self.method = HTTPMethod(rawValue: rawMethod) self.timestamp = timestamp diff --git a/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift b/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift index c37b3d22b..9706f67ae 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift @@ -95,6 +95,47 @@ final class OneSignalUserTests: XCTestCase { // 2. OSPropertyOperationExecutor: `deltaQueue.append(delta)` EXC_BAD_ACCESS } + /** + This test reproduces a crash in the Subscription Executor. + It is possible for two threads to modify and cache queues concurrently. + */ + func testSubscriptionExecutorConcurrency() throws { + /* Setup */ + + let client = MockOneSignalClient() + client.setMockResponseForRequest( + request: "", + response: [:] + ) + OneSignalCoreImpl.setSharedClient(client) + + let executor = OSSubscriptionOperationExecutor() + OSOperationRepo.sharedInstance.addExecutor(executor) + + /* When */ + + DispatchQueue.concurrentPerform(iterations: 50) { _ in + // 1. Enqueue Remove Subscription Deltas to the Operation Repo + OSOperationRepo.sharedInstance.enqueueDelta(OSDelta(name: OS_REMOVE_SUBSCRIPTION_DELTA, identityModelId: UUID().uuidString, model: OSSubscriptionModel(type: .email, address: nil, subscriptionId: UUID().uuidString, reachable: true, isDisabled: false, changeNotifier: OSEventProducer()), property: "email", value: "email")) + OSOperationRepo.sharedInstance.enqueueDelta(OSDelta(name: OS_REMOVE_SUBSCRIPTION_DELTA, identityModelId: UUID().uuidString, model: OSSubscriptionModel(type: .email, address: nil, subscriptionId: UUID().uuidString, reachable: true, isDisabled: false, changeNotifier: OSEventProducer()), property: "email", value: "email")) + + // 2. Flush Operation Repo + OSOperationRepo.sharedInstance.addFlushDeltaQueueToDispatchQueue() + + // 3. Simulate updating the executor's request queue from a network response + executor.executeDeleteSubscriptionRequest(OSRequestDeleteSubscription(subscriptionModel: OSSubscriptionModel(type: .email, address: nil, subscriptionId: UUID().uuidString, reachable: true, isDisabled: false, changeNotifier: OSEventProducer())), inBackground: false) + executor.executeDeleteSubscriptionRequest(OSRequestDeleteSubscription(subscriptionModel: OSSubscriptionModel(type: .email, address: nil, subscriptionId: UUID().uuidString, reachable: true, isDisabled: false, changeNotifier: OSEventProducer())), inBackground: false) + } + + // 4. Run background threads + OneSignalCoreMocks.waitForBackgroundThreads(seconds: 0.5) + + /* Then */ + // Previously caused crash: signal SIGABRT - malloc: double free for ptr + // Assert that every request SDK makes has a response set, and is handled + XCTAssertTrue(client.allRequestsHandled) + } + /** This test reproduced a crash when the property model is being encoded. */ From 6535737c95d6abc33b19a075701f679aec25b3ff Mon Sep 17 00:00:00 2001 From: Nan Date: Wed, 19 Jun 2024 14:13:54 -0700 Subject: [PATCH 4/8] Add dispatch queue to Identity Executor * Synchronize access to the delta queue and request queues * Add test `testIdentityExecutorConcurrency` that reproduced crash that is then fixed by the changes here. --- .../OSIdentityOperationExecutor.swift | 159 ++++++++++-------- .../OneSignalUserTests.swift | 39 +++++ 2 files changed, 127 insertions(+), 71 deletions(-) diff --git a/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSIdentityOperationExecutor.swift b/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSIdentityOperationExecutor.swift index 3811c0077..d195f390d 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSIdentityOperationExecutor.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSIdentityOperationExecutor.swift @@ -35,6 +35,9 @@ class OSIdentityOperationExecutor: OSOperationExecutor { var addRequestQueue: [OSRequestAddAliases] = [] var removeRequestQueue: [OSRequestRemoveAlias] = [] + // The Identity executor dispatch queue, serial. This synchronizes access to the delta and request queues. + private let dispatchQueue = DispatchQueue(label: "OneSignal.OSIdentityOperationExecutor", target: .global()) + init() { // Read unfinished deltas from cache, if any... if var deltaQueue = OneSignalUserDefaults.initShared().getSavedCodeableData(forKey: OS_IDENTITY_EXECUTOR_DELTA_QUEUE_KEY, defaultValue: []) as? [OSDelta] { @@ -101,53 +104,60 @@ class OSIdentityOperationExecutor: OSOperationExecutor { } func enqueueDelta(_ delta: OSDelta) { - OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSIdentityOperationExecutor enqueueDelta: \(delta)") - deltaQueue.append(delta) + self.dispatchQueue.async { + OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSIdentityOperationExecutor enqueueDelta: \(delta)") + self.deltaQueue.append(delta) + } } func cacheDeltaQueue() { - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) + self.dispatchQueue.async { + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) + } } func processDeltaQueue(inBackground: Bool) { - if !deltaQueue.isEmpty { - OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSIdentityOperationExecutor processDeltaQueue with queue: \(deltaQueue)") - } - for delta in deltaQueue { - guard let model = delta.model as? OSIdentityModel, - let aliases = delta.value as? [String: String] - else { - // Log error - continue + self.dispatchQueue.async { + if !self.deltaQueue.isEmpty { + OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSIdentityOperationExecutor processDeltaQueue with queue: \(self.deltaQueue)") } + for delta in self.deltaQueue { + guard let model = delta.model as? OSIdentityModel, + let aliases = delta.value as? [String: String] + else { + // Log error + continue + } - switch delta.name { - case OS_ADD_ALIAS_DELTA: - let request = OSRequestAddAliases(aliases: aliases, identityModel: model) - addRequestQueue.append(request) + switch delta.name { + case OS_ADD_ALIAS_DELTA: + let request = OSRequestAddAliases(aliases: aliases, identityModel: model) + self.addRequestQueue.append(request) - case OS_REMOVE_ALIAS_DELTA: - for (label, _) in aliases { - let request = OSRequestRemoveAlias(labelToRemove: label, identityModel: model) - removeRequestQueue.append(request) - } + case OS_REMOVE_ALIAS_DELTA: + for (label, _) in aliases { + let request = OSRequestRemoveAlias(labelToRemove: label, identityModel: model) + self.removeRequestQueue.append(request) + } - default: - OneSignalLog.onesignalLog(.LL_DEBUG, message: "OSIdentityOperationExecutor met incompatible OSDelta type: \(delta)") + default: + OneSignalLog.onesignalLog(.LL_DEBUG, message: "OSIdentityOperationExecutor met incompatible OSDelta type: \(delta)") + } } - } - self.deltaQueue = [] // TODO: Check that we can simply clear all the deltas in the deltaQueue + self.deltaQueue = [] // TODO: Check that we can simply clear all the deltas in the deltaQueue - // persist executor's requests (including new request) to storage - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue) + // persist executor's requests (including new request) to storage + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) // This should be empty, can remove instead? + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_DELTA_QUEUE_KEY, withValue: self.deltaQueue) // This should be empty, can remove instead? - processRequestQueue(inBackground: inBackground) + self.processRequestQueue(inBackground: inBackground) + } } + /// This method is called by `processDeltaQueue` only and does not need to be added to the dispatchQueue. func processRequestQueue(inBackground: Bool) { let requestQueue: [OneSignalRequest] = addRequestQueue + removeRequestQueue @@ -188,38 +198,42 @@ class OSIdentityOperationExecutor: OSOperationExecutor { OneSignalCoreImpl.sharedClient().execute(request) { _ in // No hydration from response // On success, remove request from cache - self.addRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + self.dispatchQueue.async { + self.addRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + } } } onFailure: { error in OneSignalLog.onesignalLog(.LL_ERROR, message: "OSIdentityOperationExecutor add aliases request failed with error: \(error.debugDescription)") - if let nsError = error as? NSError { - let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code) - if responseType == .missing { - // Remove from cache and queue - self.addRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) - // Logout if the user in the SDK is the same - guard OneSignalUserManagerImpl.sharedInstance.isCurrentUser(request.identityModel) - else { - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + self.dispatchQueue.async { + if let nsError = error as? NSError { + let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code) + if responseType == .missing { + // Remove from cache and queue + self.addRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) + // Logout if the user in the SDK is the same + guard OneSignalUserManagerImpl.sharedInstance.isCurrentUser(request.identityModel) + else { + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + } + return } - return + // The subscription has been deleted along with the user, so remove the subscription_id but keep the same push subscription model + OneSignalUserManagerImpl.sharedInstance.pushSubscriptionModel?.subscriptionId = nil + OneSignalUserManagerImpl.sharedInstance._logout() + } else if responseType != .retryable { + // Fail, no retry, remove from cache and queue + self.addRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) } - // The subscription has been deleted along with the user, so remove the subscription_id but keep the same push subscription model - OneSignalUserManagerImpl.sharedInstance.pushSubscriptionModel?.subscriptionId = nil - OneSignalUserManagerImpl.sharedInstance._logout() - } else if responseType != .retryable { - // Fail, no retry, remove from cache and queue - self.addRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_ADD_REQUEST_QUEUE_KEY, withValue: self.addRequestQueue) } - } - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + } } } } @@ -243,25 +257,28 @@ class OSIdentityOperationExecutor: OSOperationExecutor { OneSignalCoreImpl.sharedClient().execute(request) { _ in // There is nothing to hydrate // On success, remove request from cache - self.removeRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue) - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + self.dispatchQueue.async { + self.removeRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue) + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) + } } } onFailure: { error in OneSignalLog.onesignalLog(.LL_ERROR, message: "OSIdentityOperationExecutor remove alias request failed with error: \(error.debugDescription)") - - if let nsError = error as? NSError { - let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code) - if responseType != .retryable { - // Fail, no retry, remove from cache and queue - // A response of .missing could mean the alias doesn't exist on this user OR this user has been deleted - self.removeRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue) + self.dispatchQueue.async { + if let nsError = error as? NSError { + let responseType = OSNetworkingUtils.getResponseStatusType(nsError.code) + if responseType != .retryable { + // Fail, no retry, remove from cache and queue + // A response of .missing could mean the alias doesn't exist on this user OR this user has been deleted + self.removeRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_IDENTITY_EXECUTOR_REMOVE_REQUEST_QUEUE_KEY, withValue: self.removeRequestQueue) + } + } + if inBackground { + OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) } - } - if inBackground { - OSBackgroundTaskManager.endBackgroundTask(backgroundTaskIdentifier) } } } diff --git a/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift b/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift index 9706f67ae..cfb70164a 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift @@ -136,6 +136,45 @@ final class OneSignalUserTests: XCTestCase { XCTAssertTrue(client.allRequestsHandled) } + /** + This test reproduces a crash in the Identity Executor. + It is possible for two threads to modify and cache queues concurrently. + */ + func testIdentityExecutorConcurrency() throws { + /* Setup */ + let client = MockOneSignalClient() + let aliases = [UUID().uuidString: "id"] + + OneSignalCoreImpl.setSharedClient(client) + MockUserRequests.setAddAliasesResponse(with: client, aliases: aliases) + + let executor = OSIdentityOperationExecutor() + OSOperationRepo.sharedInstance.addExecutor(executor) + + /* When */ + + DispatchQueue.concurrentPerform(iterations: 50) { _ in + // 1. Enqueue Add Alias Deltas to the Operation Repo + OSOperationRepo.sharedInstance.enqueueDelta(OSDelta(name: OS_ADD_ALIAS_DELTA, identityModelId: UUID().uuidString, model: OSIdentityModel(aliases: [OS_ONESIGNAL_ID: UUID().uuidString], changeNotifier: OSEventProducer()), property: "aliases", value: aliases)) + OSOperationRepo.sharedInstance.enqueueDelta(OSDelta(name: OS_ADD_ALIAS_DELTA, identityModelId: UUID().uuidString, model: OSIdentityModel(aliases: [OS_ONESIGNAL_ID: UUID().uuidString], changeNotifier: OSEventProducer()), property: "aliases", value: aliases)) + + // 2. Flush Operation Repo + OSOperationRepo.sharedInstance.addFlushDeltaQueueToDispatchQueue() + + // 3. Simulate updating the executor's request queue from a network response + executor.executeAddAliasesRequest(OSRequestAddAliases(aliases: aliases, identityModel: OSIdentityModel(aliases: [OS_ONESIGNAL_ID: UUID().uuidString], changeNotifier: OSEventProducer())), inBackground: false) + executor.executeAddAliasesRequest(OSRequestAddAliases(aliases: aliases, identityModel: OSIdentityModel(aliases: [OS_ONESIGNAL_ID: UUID().uuidString], changeNotifier: OSEventProducer())), inBackground: false) + } + + // 4. Run background threads + OneSignalCoreMocks.waitForBackgroundThreads(seconds: 0.5) + + /* Then */ + // Previously caused crash: signal SIGABRT - malloc: double free for ptr + // Assert that every request SDK makes has a response set, and is handled + XCTAssertTrue(client.allRequestsHandled) + } + /** This test reproduced a crash when the property model is being encoded. */ From ec7d7e947f2c41206d713684bf7875f492727248 Mon Sep 17 00:00:00 2001 From: Nan Date: Thu, 20 Jun 2024 09:15:41 -0700 Subject: [PATCH 5/8] Add dispatch queue to User Executor * Synchronize access to the request queues * Add test `testUserExecutorConcurrency` that reproduced crash that is then fixed by the changes here. * Note that concurrent access in this executor should be mitigated already by the executor only sending one request at a time. --- .../Source/Executors/OSUserExecutor.swift | 236 +++++++++--------- .../OneSignalUserTests.swift | 39 +++ 2 files changed, 163 insertions(+), 112 deletions(-) diff --git a/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSUserExecutor.swift b/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSUserExecutor.swift index a65ed16d0..c9983ca6a 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSUserExecutor.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSUserExecutor.swift @@ -37,84 +37,90 @@ class OSUserExecutor { static var userRequestQueue: [OSUserRequest] = [] static var transferSubscriptionRequestQueue: [OSRequestTransferSubscription] = [] + // The User executor dispatch queue, serial. This synchronizes access to the request queues. + private static let dispatchQueue = DispatchQueue(label: "OneSignal.OSUserExecutor", target: .global()) + // Read in requests from the cache, do not read in FetchUser requests as this is not needed. static func start() { - var userRequestQueue: [OSUserRequest] = [] - - // Read unfinished Create User + Identify User + Get Identity By Subscription requests from cache, if any... - if let cachedRequestQueue = OneSignalUserDefaults.initShared().getSavedCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, defaultValue: []) as? [OSUserRequest] { - // Hook each uncached Request to the right model reference - for request in cachedRequestQueue { - if request.isKind(of: OSRequestFetchIdentityBySubscription.self), let req = request as? OSRequestFetchIdentityBySubscription { - if let identityModel = getIdentityModel(req.identityModel.modelId) { - // 1. The model exist in the repo, set it to be the Request's model - // It is the current user or the model has already been processed - req.identityModel = identityModel - } else { - // 2. The model do not exist, use the model on the request, and add to repo. - addIdentityModel(req.identityModel) - } - userRequestQueue.append(req) - - } else if request.isKind(of: OSRequestCreateUser.self), let req = request as? OSRequestCreateUser { - if let identityModel = getIdentityModel(req.identityModel.modelId) { - // 1. The model exist in the repo, set it to be the Request's model - req.identityModel = identityModel - } else { - // 2. The models do not exist, use the model on the request, and add to repo. - addIdentityModel(req.identityModel) - } - userRequestQueue.append(req) - - } else if request.isKind(of: OSRequestIdentifyUser.self), let req = request as? OSRequestIdentifyUser { - - if let identityModelToIdentify = getIdentityModel(req.identityModelToIdentify.modelId), - let identityModelToUpdate = getIdentityModel(req.identityModelToUpdate.modelId) { - // 1. Both models exist in the repo, set it to be the Request's models - req.identityModelToIdentify = identityModelToIdentify - req.identityModelToUpdate = identityModelToUpdate - } else if let identityModelToIdentify = getIdentityModel(req.identityModelToIdentify.modelId), - getIdentityModel(req.identityModelToUpdate.modelId) == nil { - // 2. A model is in the repo, the other model does not exist - req.identityModelToIdentify = identityModelToIdentify - addIdentityModel(req.identityModelToUpdate) - } else { - // 3. Both models don't exist yet - // Drop the request if the identityModelToIdentify does not already exist AND the request is missing OSID - // Otherwise, this request will forever fail `prepareForExecution` and block pending requests such as recovery calls to `logout` or `login` - guard request.prepareForExecution() else { - OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor.start() dropped: \(request)") - continue + self.dispatchQueue.async { + var userRequestQueue: [OSUserRequest] = [] + + // Read unfinished Create User + Identify User + Get Identity By Subscription requests from cache, if any... + if let cachedRequestQueue = OneSignalUserDefaults.initShared().getSavedCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, defaultValue: []) as? [OSUserRequest] { + // Hook each uncached Request to the right model reference + for request in cachedRequestQueue { + if request.isKind(of: OSRequestFetchIdentityBySubscription.self), let req = request as? OSRequestFetchIdentityBySubscription { + if let identityModel = getIdentityModel(req.identityModel.modelId) { + // 1. The model exist in the repo, set it to be the Request's model + // It is the current user or the model has already been processed + req.identityModel = identityModel + } else { + // 2. The model do not exist, use the model on the request, and add to repo. + addIdentityModel(req.identityModel) + } + userRequestQueue.append(req) + + } else if request.isKind(of: OSRequestCreateUser.self), let req = request as? OSRequestCreateUser { + if let identityModel = getIdentityModel(req.identityModel.modelId) { + // 1. The model exist in the repo, set it to be the Request's model + req.identityModel = identityModel + } else { + // 2. The models do not exist, use the model on the request, and add to repo. + addIdentityModel(req.identityModel) + } + userRequestQueue.append(req) + + } else if request.isKind(of: OSRequestIdentifyUser.self), let req = request as? OSRequestIdentifyUser { + + if let identityModelToIdentify = getIdentityModel(req.identityModelToIdentify.modelId), + let identityModelToUpdate = getIdentityModel(req.identityModelToUpdate.modelId) { + // 1. Both models exist in the repo, set it to be the Request's models + req.identityModelToIdentify = identityModelToIdentify + req.identityModelToUpdate = identityModelToUpdate + } else if let identityModelToIdentify = getIdentityModel(req.identityModelToIdentify.modelId), + getIdentityModel(req.identityModelToUpdate.modelId) == nil { + // 2. A model is in the repo, the other model does not exist + req.identityModelToIdentify = identityModelToIdentify + addIdentityModel(req.identityModelToUpdate) + } else { + // 3. Both models don't exist yet + // Drop the request if the identityModelToIdentify does not already exist AND the request is missing OSID + // Otherwise, this request will forever fail `prepareForExecution` and block pending requests such as recovery calls to `logout` or `login` + guard request.prepareForExecution() else { + OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor.start() dropped: \(request)") + continue + } + addIdentityModel(req.identityModelToIdentify) + addIdentityModel(req.identityModelToUpdate) } - addIdentityModel(req.identityModelToIdentify) - addIdentityModel(req.identityModelToUpdate) + userRequestQueue.append(req) } - userRequestQueue.append(req) } } - } - self.userRequestQueue = userRequestQueue - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.userRequestQueue) - // Read unfinished Transfer Subscription requests from cache, if any... - if let transferSubscriptionRequestQueue = OneSignalUserDefaults.initShared().getSavedCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, defaultValue: []) as? [OSRequestTransferSubscription] { - // We only care about the last transfer subscription request - if let request = transferSubscriptionRequestQueue.last { - // Hook the uncached Request to the model in the store - if request.subscriptionModel.modelId == OneSignalUserManagerImpl.sharedInstance.user.pushSubscriptionModel.modelId { - // The model exist, set it to be the Request's model - request.subscriptionModel = OneSignalUserManagerImpl.sharedInstance.user.pushSubscriptionModel - self.transferSubscriptionRequestQueue = [request] - } else if !request.prepareForExecution() { - // The model do not exist AND this request cannot be sent, drop this Request - OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor.start() dropped: \(request)") - self.transferSubscriptionRequestQueue = [] + self.userRequestQueue = userRequestQueue + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.userRequestQueue) + // Read unfinished Transfer Subscription requests from cache, if any... + if let transferSubscriptionRequestQueue = OneSignalUserDefaults.initShared().getSavedCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, defaultValue: []) as? [OSRequestTransferSubscription] { + // We only care about the last transfer subscription request + if let request = transferSubscriptionRequestQueue.last { + // Hook the uncached Request to the model in the store + if request.subscriptionModel.modelId == OneSignalUserManagerImpl.sharedInstance.user.pushSubscriptionModel.modelId { + // The model exist, set it to be the Request's model + request.subscriptionModel = OneSignalUserManagerImpl.sharedInstance.user.pushSubscriptionModel + self.transferSubscriptionRequestQueue = [request] + } else if !request.prepareForExecution() { + // The model do not exist AND this request cannot be sent, drop this Request + OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor.start() dropped: \(request)") + self.transferSubscriptionRequestQueue = [] + } } + } else { + OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor error encountered reading from cache for \(OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY)") } - } else { - OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor error encountered reading from cache for \(OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY)") + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, withValue: self.transferSubscriptionRequestQueue) + + executePendingRequests() } - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, withValue: self.transferSubscriptionRequestQueue) - executePendingRequests() } static private func getIdentityModel(_ modelId: String) -> OSIdentityModel? { @@ -126,61 +132,67 @@ class OSUserExecutor { } static func appendToQueue(_ request: OSUserRequest) { - if request.isKind(of: OSRequestTransferSubscription.self), let req = request as? OSRequestTransferSubscription { - self.transferSubscriptionRequestQueue.append(req) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, withValue: self.transferSubscriptionRequestQueue) - } else { - self.userRequestQueue.append(request) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.userRequestQueue) + self.dispatchQueue.async { + if request.isKind(of: OSRequestTransferSubscription.self), let req = request as? OSRequestTransferSubscription { + self.transferSubscriptionRequestQueue.append(req) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, withValue: self.transferSubscriptionRequestQueue) + } else { + self.userRequestQueue.append(request) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.userRequestQueue) + } } } static func removeFromQueue(_ request: OSUserRequest) { - if request.isKind(of: OSRequestTransferSubscription.self), let req = request as? OSRequestTransferSubscription { - transferSubscriptionRequestQueue.removeAll(where: { $0 == req}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, withValue: self.transferSubscriptionRequestQueue) - } else { - userRequestQueue.removeAll(where: { $0 == request}) - OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.userRequestQueue) + self.dispatchQueue.async { + if request.isKind(of: OSRequestTransferSubscription.self), let req = request as? OSRequestTransferSubscription { + transferSubscriptionRequestQueue.removeAll(where: { $0 == req}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_TRANSFER_SUBSCRIPTION_REQUEST_QUEUE_KEY, withValue: self.transferSubscriptionRequestQueue) + } else { + userRequestQueue.removeAll(where: { $0 == request}) + OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.userRequestQueue) + } } } static func executePendingRequests() { - let requestQueue: [OSUserRequest] = userRequestQueue + transferSubscriptionRequestQueue - OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSUserExecutor.executePendingRequests called with queue \(requestQueue)") - - if requestQueue.isEmpty { - return - } + self.dispatchQueue.async { + let requestQueue: [OSUserRequest] = userRequestQueue + transferSubscriptionRequestQueue + OneSignalLog.onesignalLog(.LL_VERBOSE, message: "OSUserExecutor.executePendingRequests called with queue \(requestQueue)") - // Sort the requestQueue by timestamp - for request in requestQueue.sorted(by: { first, second in - return first.timestamp < second.timestamp - }) { - // Return as soon as we reach an un-executable request - if !request.prepareForExecution() { - OneSignalLog.onesignalLog(.LL_WARN, message: "OSUserExecutor.executePendingRequests() is blocked by unexecutable request \(request)") + if requestQueue.isEmpty { return } - if request.isKind(of: OSRequestFetchIdentityBySubscription.self), let fetchIdentityRequest = request as? OSRequestFetchIdentityBySubscription { - executeFetchIdentityBySubscriptionRequest(fetchIdentityRequest) - return - } else if request.isKind(of: OSRequestCreateUser.self), let createUserRequest = request as? OSRequestCreateUser { - executeCreateUserRequest(createUserRequest) - return - } else if request.isKind(of: OSRequestIdentifyUser.self), let identifyUserRequest = request as? OSRequestIdentifyUser { - executeIdentifyUserRequest(identifyUserRequest) - return - } else if request.isKind(of: OSRequestTransferSubscription.self), let transferSubscriptionRequest = request as? OSRequestTransferSubscription { - executeTransferPushSubscriptionRequest(transferSubscriptionRequest) - return - } else if request.isKind(of: OSRequestFetchUser.self), let fetchUserRequest = request as? OSRequestFetchUser { - executeFetchUserRequest(fetchUserRequest) - return - } else { - // Log Error - OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor met incompatible Request type that cannot be executed.") + // Sort the requestQueue by timestamp + for request in requestQueue.sorted(by: { first, second in + return first.timestamp < second.timestamp + }) { + // Return as soon as we reach an un-executable request + if !request.prepareForExecution() { + OneSignalLog.onesignalLog(.LL_WARN, message: "OSUserExecutor.executePendingRequests() is blocked by unexecutable request \(request)") + return + } + + if request.isKind(of: OSRequestFetchIdentityBySubscription.self), let fetchIdentityRequest = request as? OSRequestFetchIdentityBySubscription { + executeFetchIdentityBySubscriptionRequest(fetchIdentityRequest) + return + } else if request.isKind(of: OSRequestCreateUser.self), let createUserRequest = request as? OSRequestCreateUser { + executeCreateUserRequest(createUserRequest) + return + } else if request.isKind(of: OSRequestIdentifyUser.self), let identifyUserRequest = request as? OSRequestIdentifyUser { + executeIdentifyUserRequest(identifyUserRequest) + return + } else if request.isKind(of: OSRequestTransferSubscription.self), let transferSubscriptionRequest = request as? OSRequestTransferSubscription { + executeTransferPushSubscriptionRequest(transferSubscriptionRequest) + return + } else if request.isKind(of: OSRequestFetchUser.self), let fetchUserRequest = request as? OSRequestFetchUser { + executeFetchUserRequest(fetchUserRequest) + return + } else { + // Log Error + OneSignalLog.onesignalLog(.LL_ERROR, message: "OSUserExecutor met incompatible Request type that cannot be executed.") + } } } } diff --git a/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift b/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift index cfb70164a..3bf05db47 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift @@ -175,6 +175,45 @@ final class OneSignalUserTests: XCTestCase { XCTAssertTrue(client.allRequestsHandled) } + /** + This test aims to ensure concurrency safety in the User Executor. + It is possible for two threads to modify and cache queues concurrently. + Currently, this executor only allows one request to send at a time, which should prevent concurrent access. + But out of caution and future-proofing, this test is added. + */ + func testUserExecutorConcurrency() throws { + /* Setup */ + + let client = MockOneSignalClient() + // Ensure all requests fire the executor's callback so it will modify queues and cache it + client.fireSuccessForAllRequests = true + OneSignalCoreImpl.setSharedClient(client) + + let identityModel1 = OSIdentityModel(aliases: [OS_ONESIGNAL_ID: UUID().uuidString], changeNotifier: OSEventProducer()) + let identityModel2 = OSIdentityModel(aliases: [OS_ONESIGNAL_ID: UUID().uuidString], changeNotifier: OSEventProducer()) + + OSUserExecutor.start() + + /* When */ + + DispatchQueue.concurrentPerform(iterations: 50) { _ in + let identifyRequest = OSRequestIdentifyUser(aliasLabel: OS_EXTERNAL_ID, aliasId: UUID().uuidString, identityModelToIdentify: identityModel1, identityModelToUpdate: identityModel2) + let fetchRequest = OSRequestFetchUser(identityModel: identityModel1, aliasLabel: OS_ONESIGNAL_ID, aliasId: UUID().uuidString, onNewSession: false) + + // Append and execute requests simultaneously + OSUserExecutor.appendToQueue(identifyRequest) + OSUserExecutor.appendToQueue(fetchRequest) + OSUserExecutor.executeIdentifyUserRequest(identifyRequest) + OSUserExecutor.executeFetchUserRequest(fetchRequest) + } + + // Run background threads + OneSignalCoreMocks.waitForBackgroundThreads(seconds: 0.5) + + /* Then */ + // No crash + } + /** This test reproduced a crash when the property model is being encoded. */ From ca7fafadee9751c482eb53c151e5e69a9060758b Mon Sep 17 00:00:00 2001 From: Nan Date: Fri, 21 Jun 2024 11:51:39 -0700 Subject: [PATCH 6/8] [tests] add a test for Property Executor Concurrency * Add test `testPropertyExecutorConcurrency` --- .../OneSignalUserTests.swift | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift b/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift index 3bf05db47..7d2cd01a7 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalUserTests/OneSignalUserTests.swift @@ -175,6 +175,44 @@ final class OneSignalUserTests: XCTestCase { XCTAssertTrue(client.allRequestsHandled) } + /** + This test aims to ensure concurrency safety in the Property Executor. + It is possible for two threads to modify and cache queues concurrently. + */ + func testPropertyExecutorConcurrency() throws { + /* Setup */ + let client = MockOneSignalClient() + // Ensure all requests fire the executor's callback so it will modify queues and cache it + client.fireSuccessForAllRequests = true + OneSignalCoreImpl.setSharedClient(client) + + let identityModel = OSIdentityModel(aliases: [OS_ONESIGNAL_ID: UUID().uuidString], changeNotifier: OSEventProducer()) + OneSignalUserManagerImpl.sharedInstance.addIdentityModelToRepo(identityModel) + + let executor = OSPropertyOperationExecutor() + OSOperationRepo.sharedInstance.addExecutor(executor) + + /* When */ + + DispatchQueue.concurrentPerform(iterations: 50) { _ in + // 1. Enqueue Deltas to the Operation Repo + OSOperationRepo.sharedInstance.enqueueDelta(OSDelta(name: OS_UPDATE_PROPERTIES_DELTA, identityModelId: identityModel.modelId, model: OSPropertiesModel(changeNotifier: OSEventProducer()), property: "language", value: UUID().uuidString)) + OSOperationRepo.sharedInstance.enqueueDelta(OSDelta(name: OS_UPDATE_PROPERTIES_DELTA, identityModelId: identityModel.modelId, model: OSPropertiesModel(changeNotifier: OSEventProducer()), property: "language", value: UUID().uuidString)) + + // 2. Flush Operation Repo + OSOperationRepo.sharedInstance.addFlushDeltaQueueToDispatchQueue() + + // 3. Simulate updating the executor's request queue from a network response + executor.executeUpdatePropertiesRequest(OSRequestUpdateProperties(params: ["properties": ["language": UUID().uuidString], "refresh_device_metadata": false], identityModel: identityModel), inBackground: false) + } + + // 4. Run background threads + OneSignalCoreMocks.waitForBackgroundThreads(seconds: 0.5) + + /* Then */ + // No crash + } + /** This test aims to ensure concurrency safety in the User Executor. It is possible for two threads to modify and cache queues concurrently. From a90478c611685cff2d3e885f6088e1435cca62c7 Mon Sep 17 00:00:00 2001 From: Nan Date: Fri, 21 Jun 2024 12:06:22 -0700 Subject: [PATCH 7/8] [nit] Break up User Executor code * swiftlint error: Type Body Length Violation: Type body should span 350 lines or less excluding comments and whitespace: currently spans 354 lines (type_body_length) * No logic changes in this commit, only split up executing requests code into an extension --- .../OneSignalUser/Source/Executors/OSUserExecutor.swift | 3 +++ 1 file changed, 3 insertions(+) diff --git a/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSUserExecutor.swift b/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSUserExecutor.swift index c9983ca6a..c5eb2a5c4 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSUserExecutor.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalUser/Source/Executors/OSUserExecutor.swift @@ -196,7 +196,10 @@ class OSUserExecutor { } } } +} +// MARK: - Execution +extension OSUserExecutor { // We will pass minimal properties to this request static func createUser(_ user: OSUserInternal) { let originalPushToken = user.pushSubscriptionModel.address From 8248762b85a270532312b0bf8c0e0a76e9bdc6f6 Mon Sep 17 00:00:00 2001 From: Nan Date: Tue, 25 Jun 2024 10:48:04 -0700 Subject: [PATCH 8/8] [nit] run swiflint --- .../MockOSDispatchQueue.swift | 6 +++--- .../OneSignalCoreMocks/OneSignalCoreMocks.swift | 16 ++++++++-------- .../OneSignalNotificationsTests.swift | 17 ++++++++--------- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/MockOSDispatchQueue.swift b/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/MockOSDispatchQueue.swift index b02525282..842dd1be8 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/MockOSDispatchQueue.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/MockOSDispatchQueue.swift @@ -30,7 +30,7 @@ import OneSignalOSCore public class MockDispatchQueue: OSDispatchQueue { let requestDispatch = DispatchQueue(label: "MockDispatchQueue") var numDispatches = 0 - + public init() {} public func async(execute work: @escaping @convention(block) () -> Void) { @@ -46,8 +46,8 @@ public class MockDispatchQueue: OSDispatchQueue { self.numDispatches += 1 } } - - public func waitForDispatches(_ numDispatches: Int) -> Void { + + public func waitForDispatches(_ numDispatches: Int) { while self.numDispatches < numDispatches { requestDispatch.sync { Thread.sleep(forTimeInterval: TimeInterval(1)) diff --git a/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/OneSignalCoreMocks.swift b/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/OneSignalCoreMocks.swift index 4e452adeb..75276eb68 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/OneSignalCoreMocks.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalCoreMocks/OneSignalCoreMocks.swift @@ -51,9 +51,9 @@ public class OneSignalCoreMocks: NSObject { let expectation = XCTestExpectation(description: "Wait for \(seconds) seconds") _ = XCTWaiter.wait(for: [expectation], timeout: seconds) } - + @objc public static func backgroundApp() { - if (OSBundleUtils.isAppUsingUIScene()) { + if OSBundleUtils.isAppUsingUIScene() { if #available(iOS 13.0, *) { NotificationCenter.default.post(name: UIScene.willDeactivateNotification, object: nil) NotificationCenter.default.post(name: UIScene.didEnterBackgroundNotification, object: nil) @@ -63,9 +63,9 @@ public class OneSignalCoreMocks: NSObject { NotificationCenter.default.post(name: UIApplication.didEnterBackgroundNotification, object: nil) } } - + @objc public static func foregroundApp() { - if (OSBundleUtils.isAppUsingUIScene()) { + if OSBundleUtils.isAppUsingUIScene() { if #available(iOS 13.0, *) { NotificationCenter.default.post(name: UIScene.willEnterForegroundNotification, object: nil) NotificationCenter.default.post(name: UIScene.didActivateNotification, object: nil) @@ -75,9 +75,9 @@ public class OneSignalCoreMocks: NSObject { NotificationCenter.default.post(name: UIApplication.didBecomeActiveNotification, object: nil) } } - + @objc public static func resignActive() { - if (OSBundleUtils.isAppUsingUIScene()) { + if OSBundleUtils.isAppUsingUIScene() { if #available(iOS 13.0, *) { NotificationCenter.default.post(name: UIScene.willDeactivateNotification, object: nil) } @@ -85,9 +85,9 @@ public class OneSignalCoreMocks: NSObject { NotificationCenter.default.post(name: UIApplication.willResignActiveNotification, object: nil) } } - + @objc public static func becomeActive() { - if (OSBundleUtils.isAppUsingUIScene()) { + if OSBundleUtils.isAppUsingUIScene() { if #available(iOS 13.0, *) { NotificationCenter.default.post(name: UIScene.didActivateNotification, object: nil) } diff --git a/iOS_SDK/OneSignalSDK/OneSignalNotificationsTests/OneSignalNotificationsTests.swift b/iOS_SDK/OneSignalSDK/OneSignalNotificationsTests/OneSignalNotificationsTests.swift index e452ae22e..e2f84df9d 100644 --- a/iOS_SDK/OneSignalSDK/OneSignalNotificationsTests/OneSignalNotificationsTests.swift +++ b/iOS_SDK/OneSignalSDK/OneSignalNotificationsTests/OneSignalNotificationsTests.swift @@ -12,7 +12,7 @@ import OneSignalCoreMocks import UIKit final class OneSignalNotificationsTests: XCTestCase { - + var notifTypes: Int32 = 0 var token: String = "" @@ -38,7 +38,7 @@ final class OneSignalNotificationsTests: XCTestCase { // Ensure that badge count == 0 XCTAssertEqual(UIApplication.shared.applicationIconBadgeNumber, 0) } - + func testDontclearBadgesWhenAppBecomesActive() throws { // NotificationManager Start to register lifecycle listener OSNotificationsManager.start() @@ -51,25 +51,24 @@ final class OneSignalNotificationsTests: XCTestCase { // Ensure that badge count == 0 XCTAssertEqual(UIApplication.shared.applicationIconBadgeNumber, 1) } - + func testUpdateNotificationTypesOnAppEntersForeground() throws { // NotificationManager Start to register lifecycle listener OSNotificationsManager.start() - + OSNotificationsManager.delegate = self - + XCTAssertEqual(self.notifTypes, 0) - + // Then background the app OneSignalCoreMocks.backgroundApp() - + // Foreground the app for within 30 seconds OneSignalCoreMocks.foregroundApp() - + // Ensure that the delegate is updated with the new notification type XCTAssertEqual(self.notifTypes, ERROR_PUSH_NEVER_PROMPTED) } - }