diff --git a/build-config/resources/Ballerina.toml b/build-config/resources/Ballerina.toml index ff5b8ca9..850d963f 100644 --- a/build-config/resources/Ballerina.toml +++ b/build-config/resources/Ballerina.toml @@ -2,7 +2,7 @@ org = "ballerinai" name = "transaction" version = "@toml.version@" -distribution = "2201.8.0" +distribution = "2201.8.5" [platform.java17] graalvmCompatible = true diff --git a/gradle.properties b/gradle.properties index 8315064b..6325383c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,9 +1,9 @@ org.gradle.caching=true puppycrawlCheckstyleVersion=10.12.0 group=org.ballerinalang -version=1.8.0 +version=1.8.0-SNAPSHOT -ballerinaLangVersion=2201.8.0 +ballerinaLangVersion=2201.8.5 stdlibIoVersion=1.6.0 stdlibConstraintVersion=1.4.0 stdlibOsVersion=1.8.0 diff --git a/transaction-ballerina/Ballerina.toml b/transaction-ballerina/Ballerina.toml index 78208b51..65ac94f4 100644 --- a/transaction-ballerina/Ballerina.toml +++ b/transaction-ballerina/Ballerina.toml @@ -2,13 +2,13 @@ org = "ballerinai" name = "transaction" version = "0.0.0" -distribution = "2201.8.0" +distribution = "2201.8.5" [platform.java17] graalvmCompatible = true [[platform.java17.dependency]] artifactId = "transaction" -version = "1.8.0" -path = "../transaction-native/build/libs/transaction-native-1.8.0.jar" +version = "1.8.0-SNAPSHOT" +path = "../transaction-native/build/libs/transaction-native-1.8.0-SNAPSHOT.jar" groupId = "ballerina" diff --git a/transaction-ballerina/Dependencies.toml b/transaction-ballerina/Dependencies.toml index c7a1759c..e4818992 100644 --- a/transaction-ballerina/Dependencies.toml +++ b/transaction-ballerina/Dependencies.toml @@ -5,7 +5,7 @@ [ballerina] dependencies-toml-version = "2" -distribution-version = "2201.8.0" +distribution-version = "2201.8.5" [[package]] org = "ballerina" @@ -22,7 +22,7 @@ dependencies = [ [[package]] org = "ballerina" name = "cache" -version = "3.7.0" +version = "3.7.1" dependencies = [ {org = "ballerina", name = "constraint"}, {org = "ballerina", name = "jballerina.java"}, @@ -64,7 +64,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.10.0" +version = "2.10.7" dependencies = [ {org = "ballerina", name = "auth"}, {org = "ballerina", name = "cache"}, @@ -270,7 +270,7 @@ dependencies = [ [[package]] org = "ballerina" name = "observe" -version = "1.2.0" +version = "1.2.2" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] diff --git a/transaction-ballerina/commons.bal b/transaction-ballerina/commons.bal index 906388e3..acb11e78 100644 --- a/transaction-ballerina/commons.bal +++ b/transaction-ballerina/commons.bal @@ -13,15 +13,14 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - import ballerina/cache; import ballerina/http; +import ballerina/lang.'transaction as lang_trx; +import ballerina/lang.'value as value; import ballerina/log; -import ballerina/uuid; import ballerina/task; import ballerina/time; -import ballerina/lang.'transaction as lang_trx; -import ballerina/lang.'value as value; +import ballerina/uuid; # ID of the local participant used when registering with the initiator. string localParticipantId = uuid:createType4AsString(); @@ -30,7 +29,8 @@ string localParticipantId = uuid:createType4AsString(); map initiatedTransactions = {}; # This map is used for caching transaction that are this Ballerina instance participates in. -@tainted map participatedTransactions = {}; +@tainted +map participatedTransactions = {}; # This cache is used for caching HTTP connectors against the URL, since creating connectors is expensive. cache:Cache httpClientCache = new; @@ -55,8 +55,8 @@ function cleanupTransactions() returns error? { while (i < participatedTransactionsArr.length()) { var twopcTxn = participatedTransactionsArr[i]; i += 1; - //TODO: commenting due to a caching issue - //foreach var twopcTxn in participatedTransactions { + //TODO: commenting due to a caching issue + //foreach var twopcTxn in participatedTransactions { final string participatedTxnId = getParticipatedTransactionId(twopcTxn.transactionId, twopcTxn.transactionBlockId); if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 120d) { @@ -85,20 +85,23 @@ function cleanupTransactions() returns error? { } } } - if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 600) { + if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 600) { // We don't want dead transactions hanging around removeParticipatedTransaction(participatedTxnId); } } } worker w2 returns () { - TwoPhaseCommitTransaction[] initiatedTransactionsArr = initiatedTransactions.toArray(); + TwoPhaseCommitTransaction[] initiatedTransactionsArr; + lock { + initiatedTransactionsArr = initiatedTransactions.toArray(); + } int i = 0; - while(i < initiatedTransactionsArr.length()) { + while (i < initiatedTransactionsArr.length()) { var twopcTxn = initiatedTransactionsArr[i]; i += 1; - //TODO:commenting due to a caching issue - //foreach var twopcTxn in initiatedTransactions { + //TODO:commenting due to a caching issue + //foreach var twopcTxn in initiatedTransactions { if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 120) { if (twopcTxn.state != TXN_STATE_ABORTED) { // Commit the transaction since prepare hasn't been received @@ -114,7 +117,7 @@ function cleanupTransactions() returns error? { } } } - if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 600) { + if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 600) { // We don't want dead transactions hanging around removeInitiatedTransaction(twopcTxn.transactionId); } @@ -125,7 +128,6 @@ function cleanupTransactions() returns error? { return value; } - function isRegisteredParticipant(string participantId, map participants) returns boolean { return participants.hasKey(participantId); } @@ -135,8 +137,8 @@ function isValidCoordinationType(string coordinationType) returns boolean { while (i < coordinationTypes.length()) { var coordType = coordinationTypes[i]; i += 1; - //TODO:commenting due to caching issue; - //foreach var coordType in coordinationTypes { + //TODO:commenting due to caching issue; + //foreach var coordType in coordinationTypes { if (coordinationType == coordType) { return true; } @@ -148,7 +150,7 @@ function protoName(UProtocol p) returns string { if (p is RemoteProtocol) { return p.name; } else { - return p.name; + return p.name; } } @@ -156,19 +158,19 @@ function protocolCompatible(string coordinationType, UProtocol?[] participantPro boolean participantProtocolIsValid = false; string[] validProtocols = coordinationTypeToProtocolsMap[coordinationType] ?: []; int i = 0; - while ( i < participantProtocols.length()) { + while (i < participantProtocols.length()) { var p = participantProtocols[i]; i += 1; - //TODO: commenting due to a caching issue - //foreach var p in participantProtocols { + //TODO: commenting due to a caching issue + //foreach var p in participantProtocols { if (p is UProtocol) { UProtocol participantProtocol = p; int j = 0; while (j < validProtocols.length()) { var validProtocol = validProtocols[j]; j += 1; - //TODO: commenting due to a caching issue - //foreach var validProtocol in validProtocols { + //TODO: commenting due to a caching issue + //foreach var validProtocol in validProtocols { if (protoName(participantProtocol) == validProtocol) { participantProtocolIsValid = true; break; @@ -188,11 +190,12 @@ type JsonTypedesc typedesc; function respondToBadRequest(http:Caller ep, string msg) { log:printError(msg); - http:Response res = new; res.statusCode = http:STATUS_BAD_REQUEST; - RequestError requestError = {errorMessage:msg}; + http:Response res = new; + res.statusCode = http:STATUS_BAD_REQUEST; + RequestError requestError = {errorMessage: msg}; var resPayload = requestError.cloneWithType(JsonTypedesc); if (resPayload is json) { - res.setJsonPayload(<@untainted json> resPayload); + res.setJsonPayload(<@untainted json>resPayload); var resResult = ep->respond(res); if (resResult is error) { log:printError("Could not send Bad Request error response to caller", 'error = resResult); @@ -220,7 +223,7 @@ function getParticipantProtocolAt(string protocolName, string transactionBlockId # corresponding to the coordinationType will also be created and stored as an initiated transaction. # # + coordinationType - The type of the coordination relevant to the transaction block for which this TransactionContext -# is being created for. +# is being created for. # + transactionBlockId - The ID of the transaction block. # + return - TransactionContext if the coordination type is valid or an error in case of an invalid coordination type. function createTransactionContext(string coordinationType, string transactionBlockId) returns TransactionContext|error { @@ -229,15 +232,17 @@ function createTransactionContext(string coordinationType, string transactionBlo error err = error(msg); return err; } else { - TwoPhaseCommitTransaction txn = new(uuid(), transactionBlockId, coordinationType = coordinationType); + TwoPhaseCommitTransaction txn = new (uuid(), transactionBlockId, coordinationType = coordinationType); string txnId = txn.transactionId; txn.isInitiated = true; - initiatedTransactions[txnId] = txn; + lock { + initiatedTransactions[txnId] = txn; + } TransactionContext txnContext = { - transactionId:txnId, - transactionBlockId:transactionBlockId, - coordinationType:coordinationType, - registerAtURL:"http://" + value:toString(coordinatorHost) + ":" + value:toString(coordinatorPort) + + transactionId: txnId, + transactionBlockId: transactionBlockId, + coordinationType: coordinationType, + registerAtURL: "http://" + value:toString(coordinatorHost) + ":" + value:toString(coordinatorPort) + initiatorCoordinatorBasePath + "/" + transactionBlockId + registrationPath }; return txnContext; @@ -254,20 +259,24 @@ function createTransactionContext(string coordinationType, string transactionBlo # + return - TransactionContext if the registration is successul or an error in case of a failure. function registerLocalParticipantWithInitiator(string transactionId, string transactionBlockId, string registerAtURL) returns TransactionContext|error { - final string trxId = transactionId; final string participantId = getParticipantId(transactionBlockId); //TODO: Protocol name should be passed down from the transaction statement - LocalProtocol participantProtocol = {name:PROTOCOL_DURABLE}; - var initiatedTxn = initiatedTransactions[transactionId]; + LocalProtocol participantProtocol = {name: PROTOCOL_DURABLE}; + TwoPhaseCommitTransaction? initiatedTxn; + lock { + initiatedTxn = initiatedTransactions[transactionId]; + } if (initiatedTxn is ()) { return error lang_trx:Error("Transaction-Unknown. Invalid TID:" + transactionId); } else { if (isRegisteredParticipant(participantId, initiatedTxn.participants)) { // Already-Registered log:printDebug("Already-Registered. TID:" + trxId + ", participant ID:" + participantId); TransactionContext txnCtx = { - transactionId:transactionId, transactionBlockId:transactionBlockId, - coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL + transactionId: transactionId, + transactionBlockId: transactionBlockId, + coordinationType: TWO_PHASE_COMMIT, + registerAtURL: registerAtURL }; return txnCtx; } else if (!protocolCompatible(initiatedTxn.coordinationType, [participantProtocol])) { // Invalid-Protocol @@ -275,17 +284,21 @@ function registerLocalParticipantWithInitiator(string transactionId, string tran participantId); } else { //Set initiator protocols - TwoPhaseCommitTransaction participatedTxn = new(transactionId, transactionBlockId); + TwoPhaseCommitTransaction participatedTxn = new (transactionId, transactionBlockId); //Protocol initiatorProto = {name: PROTOCOL_DURABLE, transactionBlockId:transactionBlockId}; //participatedTxn.coordinatorProtocols = [initiatorProto]; - LocalParticipant participant = new(participantId, participatedTxn, [participantProtocol]); + LocalParticipant participant = new (participantId, participatedTxn, [participantProtocol]); initiatedTxn.participants[participantId] = participant; string participatedTxnId = getParticipatedTransactionId(transactionId, transactionBlockId); participatedTransactions[participatedTxnId] = participatedTxn; - TransactionContext txnCtx = {transactionId:transactionId, transactionBlockId:transactionBlockId, - coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL}; + TransactionContext txnCtx = { + transactionId: transactionId, + transactionBlockId: transactionBlockId, + coordinationType: TWO_PHASE_COMMIT, + registerAtURL: registerAtURL + }; log:printDebug("Registered local participant: " + participantId + " for transaction:" + trxId); return txnCtx; } @@ -299,24 +312,34 @@ function removeParticipatedTransaction(string participatedTxnId) { } } +function hasInitiatedTransaction(string transactionId) returns boolean { + lock { + return initiatedTransactions.hasKey(transactionId); + } +} + function removeInitiatedTransaction(string transactionId) { - var removed = trap initiatedTransactions.remove(transactionId); - if (removed is error) { - panic error lang_trx:Error("Removing initiated transaction: " + transactionId + " failed"); + lock { + var removed = trap initiatedTransactions.remove(transactionId); + if (removed is error) { + panic error lang_trx:Error("Removing initiated transaction: " + transactionId + " failed"); + } } } function getInitiatorClient(string registerAtURL) returns InitiatorClientEP { InitiatorClientEP initiatorEP; if (httpClientCache.hasKey(registerAtURL)) { - return checkpanic httpClientCache.get(registerAtURL); + return checkpanic httpClientCache.get(registerAtURL); } else { lock { if (httpClientCache.hasKey(registerAtURL)) { - return checkpanic httpClientCache.get(registerAtURL); + return checkpanic httpClientCache.get(registerAtURL); } - initiatorEP = new({ registerAtURL: registerAtURL, timeout: 15, - retryConfig: { count: 2, interval: 5 } + initiatorEP = new ({ + registerAtURL: registerAtURL, + timeout: 15, + retryConfig: {count: 2, interval: 5} }); cache:Error? result = httpClientCache.put(registerAtURL, initiatorEP); if (result is cache:Error) { @@ -330,15 +353,17 @@ function getInitiatorClient(string registerAtURL) returns InitiatorClientEP { function getParticipant2pcClient(string participantURL) returns Participant2pcClientEP { Participant2pcClientEP participantEP; - if (httpClientCache.hasKey(<@untainted> participantURL)) { - return checkpanic httpClientCache.get(<@untainted>participantURL); + if (httpClientCache.hasKey(<@untainted>participantURL)) { + return checkpanic httpClientCache.get(<@untainted>participantURL); } else { lock { - if (httpClientCache.hasKey(<@untainted> participantURL)) { - return checkpanic httpClientCache.get(<@untainted>participantURL); + if (httpClientCache.hasKey(<@untainted>participantURL)) { + return checkpanic httpClientCache.get(<@untainted>participantURL); } - participantEP = new({ participantURL: participantURL, - timeout: 15, retryConfig: { count: 2, interval: 5 } + participantEP = new ({ + participantURL: participantURL, + timeout: 15, + retryConfig: {count: 2, interval: 5} }); cache:Error? result = httpClientCache.put(participantURL, participantEP); if (result is cache:Error) { @@ -352,13 +377,13 @@ function getParticipant2pcClient(string participantURL) returns Participant2pcCl # Registers a participant with the initiator's coordinator. This function will be called by the participant. # -# + transactionId - Global transaction ID to which this participant is registering with. +# + transactionId - Global transaction ID to which this participant is registering with. # + transactionBlockId - The local ID of the transaction block on the participant. # + registerAtURL - The URL of the coordinator. # + participantProtocols - The coordination protocals supported by the participant. # + return - TransactionContext if the registration is successful or an error in case of a failure. function registerParticipantWithRemoteInitiator(string transactionId, string transactionBlockId, - string registerAtURL, RemoteProtocol[] participantProtocols) + string registerAtURL, RemoteProtocol[] participantProtocols) returns TransactionContext|error { InitiatorClientEP initiatorEP = getInitiatorClient(registerAtURL); @@ -368,8 +393,10 @@ function registerParticipantWithRemoteInitiator(string transactionId, string tra if (participatedTransactions.hasKey(participatedTxnId)) { log:printDebug("Already registered with initiator for transaction:" + participatedTxnId); TransactionContext txnCtx = { - transactionId:transactionId, transactionBlockId:transactionBlockId, - coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL + transactionId: transactionId, + transactionBlockId: transactionBlockId, + coordinationType: TWO_PHASE_COMMIT, + registerAtURL: registerAtURL }; return txnCtx; } @@ -385,12 +412,14 @@ function registerParticipantWithRemoteInitiator(string transactionId, string tra return error lang_trx:Error(msg); } else { RemoteProtocol[] coordinatorProtocols = result.coordinatorProtocols; - TwoPhaseCommitTransaction twopcTxn = new(transactionId, transactionBlockId); + TwoPhaseCommitTransaction twopcTxn = new (transactionId, transactionBlockId); twopcTxn.coordinatorProtocols = toProtocolArray(coordinatorProtocols); participatedTransactions[participatedTxnId] = twopcTxn; TransactionContext txnCtx = { - transactionId:transactionId, transactionBlockId:transactionBlockId, - coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL + transactionId: transactionId, + transactionBlockId: transactionBlockId, + coordinationType: TWO_PHASE_COMMIT, + registerAtURL: registerAtURL }; final string trxId = transactionId; log:printDebug("Registered with coordinator for transaction: " + trxId); diff --git a/transaction-ballerina/tests/transaction_concurrency.bal b/transaction-ballerina/tests/transaction_concurrency.bal new file mode 100644 index 00000000..1eaaaa8c --- /dev/null +++ b/transaction-ballerina/tests/transaction_concurrency.bal @@ -0,0 +1,97 @@ +// Copyright (c) 2023 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import ballerina/lang.runtime; +import ballerina/task; +import ballerina/test; + +isolated map taskCounterMap = {"A": 0, "B": 0, "C": 0, "D": 0, "E": 0}; + +public isolated function performTransaction() returns error? { + transaction { + check commit; + } +} + +public isolated class ExecuteTask { + + *task:Job; + private int counter = 0; + private final string name; + + function init(string name) { + self.name = name; + } + + public isolated function execute() { + int count = 0; + lock { + count = self.counter.cloneReadOnly(); + if count >= 100 { + return; + } + self.counter += 1; + } + + var err = trap performTransaction(); + + if !(err is ()) { + test:assertFail(string`Error in task " ${self.name} : ${err.toString()}`); + } + + lock { + int i = taskCounterMap.get(self.name); + taskCounterMap[self.name] = (i + 1); + } + } + + public isolated function scheduleTaskExecution(decimal interval) { + do { + _ = check task:scheduleJobRecurByFrequency(self, interval); + } on fail error err { + test:assertFail(string`Error in scheduling task ${self.name}: ${err.toString()}`); + } + } +} + +public function scheduleTasks() returns error? { + ExecuteTask[] tasks = []; + string[] taskNames = ["A", "B", "C", "D", "E"]; + decimal interval = 0.1; // 100 milliseconds + + from string taskName in taskNames + do { + tasks.push(new ExecuteTask(taskName)); + }; + + from ExecuteTask task in tasks + do { + task.scheduleTaskExecution(interval); + }; + + runtime:sleep(10); // Sleep to allow tasks to run +} + +@test:Config { + before: scheduleTasks +} +public function testTransactionConcurrency() { + map expectedCountsMap = {"A": 100, "B": 100, "C": 100, "D": 100, "E": 100}; + map actualCountsMap; + lock { + actualCountsMap = taskCounterMap.cloneReadOnly(); + } + test:assertEquals(actualCountsMap, expectedCountsMap, "Transaction concurrency test failed."); +} diff --git a/transaction-ballerina/transaction_block.bal b/transaction-ballerina/transaction_block.bal index 8ccc922b..12c7fd31 100644 --- a/transaction-ballerina/transaction_block.bal +++ b/transaction-ballerina/transaction_block.bal @@ -13,7 +13,6 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - import ballerina/jballerina.java; import ballerina/lang.'transaction as lang_trx; import ballerina/log; @@ -47,24 +46,27 @@ function beginRemoteParticipant(string transactionBlockId) { # initiator via a network call. # # + transactionId - Globally unique transaction ID. If this is a new transaction which is initiated, then this -# will be null. -# If this is a participant in an existing transaction, then it will have a value. +# will be null. +# If this is a participant in an existing transaction, then it will have a value. # + transactionBlockId - ID of the transaction block. Each transaction block in a process has a unique ID. # + registerAtUrl - The URL of the initiator # + coordinationType - Coordination type of this transaction # + return - Newly created/existing TransactionContext for this transaction. public function beginTransaction(string? transactionId, string transactionBlockId, string registerAtUrl, - string coordinationType) returns TransactionContext|error { + string coordinationType) returns TransactionContext|error { if (transactionId is string) { - if (initiatedTransactions.hasKey(transactionId)) { // if participant & initiator are in the same process + if (hasInitiatedTransaction(transactionId)) { // if participant & initiator are in the same process // we don't need to do a network call and can simply do a local function call return registerLocalParticipantWithInitiator(transactionId, transactionBlockId, registerAtUrl); } else { //TODO: set the proper protocol string protocolName = PROTOCOL_DURABLE; - RemoteProtocol[] protocols = [{ - name:protocolName, url:getParticipantProtocolAt(protocolName, <@untainted> transactionBlockId) - }]; + RemoteProtocol[] protocols = [ + { + name: protocolName, + url: getParticipantProtocolAt(protocolName, <@untainted>transactionBlockId) + } + ]; return registerParticipantWithRemoteInitiator(transactionId, transactionBlockId, registerAtUrl, protocols); } } else { @@ -111,12 +113,13 @@ transactional function endTransaction(string transactionId, string transactionBl } string participatedTxnId = getParticipatedTransactionId(transactionId, transactionBlockId); - if (!initiatedTransactions.hasKey(transactionId) && !participatedTransactions.hasKey(participatedTxnId)) { + + if (!hasInitiatedTransaction(transactionId) && !participatedTransactions.hasKey(participatedTxnId)) { error err = error("Transaction: " + participatedTxnId + " not found"); panic err; } - var initiatedTxn = initiatedTransactions[transactionId]; + TwoPhaseCommitTransaction? initiatedTxn = initiatedTransactions[transactionId]; if (initiatedTxn is ()) { return ""; } else { @@ -134,7 +137,7 @@ transactional function endTransaction(string transactionId, string transactionBl # # + transactionBlockId - ID of the transaction block. Each transaction block in a process has a unique ID. # + return - Transaction context. -function registerRemoteParticipant(string transactionBlockId) returns TransactionContext? = @java:Method { +function registerRemoteParticipant(string transactionBlockId) returns TransactionContext? = @java:Method { 'class: "org.ballerinalang.stdlib.transaction.Utils", name: "registerRemoteParticipant" } external;