Skip to content

Commit

Permalink
Merge pull request #41754 from lochana-chathura/worker_new
Browse files Browse the repository at this point in the history
Implement new worker semantics - Part I
  • Loading branch information
lochana-chathura authored Nov 23, 2023
2 parents a0469f6 + b9cf39a commit 71e46db
Show file tree
Hide file tree
Showing 24 changed files with 337 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckPanickedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCollectContextInvocation;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCombinedWorkerReceive;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCommitExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstRef;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstant;
Expand Down Expand Up @@ -914,6 +915,14 @@ public static WorkerReceiveNode createWorkerReceiveNode() {
return new BLangWorkerReceive();
}

public static BLangCombinedWorkerReceive createAlternateWorkerReceiveNode() {
return new BLangCombinedWorkerReceive(NodeKind.ALTERNATE_WORKER_RECEIVE);
}

public static BLangCombinedWorkerReceive createMultipleWorkerReceiveNode() {
return new BLangCombinedWorkerReceive(NodeKind.MULTIPLE_WORKER_RECEIVE);
}

public static WorkerSendExpressionNode createWorkerSendNode() {
return new BLangWorkerAsyncSendExpr();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ public enum NodeKind {
WHILE,
LOCK,
WORKER_RECEIVE,
ALTERNATE_WORKER_RECEIVE,
MULTIPLE_WORKER_RECEIVE,
WORKER_ASYNC_SEND,
WORKER_SYNC_SEND,
WORKER_FLUSH,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
import org.wso2.ballerinalang.compiler.tree.BLangClassDefinition;
import org.wso2.ballerinalang.compiler.tree.BLangExternalFunctionBody;
import org.wso2.ballerinalang.compiler.tree.BLangFunction;
import org.wso2.ballerinalang.compiler.tree.BLangIdentifier;
import org.wso2.ballerinalang.compiler.tree.BLangImportPackage;
import org.wso2.ballerinalang.compiler.tree.BLangNodeVisitor;
import org.wso2.ballerinalang.compiler.tree.BLangPackage;
Expand All @@ -104,6 +103,7 @@
import org.wso2.ballerinalang.compiler.tree.BLangXMLNS.BLangLocalXMLNS;
import org.wso2.ballerinalang.compiler.tree.BLangXMLNS.BLangPackageXMLNS;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangBinaryExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCombinedWorkerReceive;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstant;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangDynamicArgExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangErrorConstructorExpr;
Expand Down Expand Up @@ -163,6 +163,7 @@
import org.wso2.ballerinalang.compiler.tree.expressions.BLangWorkerAsyncSendExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangWorkerFlushExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangWorkerReceive;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangWorkerSendReceiveExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangWorkerSyncSendExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangXMLAttribute;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangXMLCommentLiteral;
Expand Down Expand Up @@ -598,9 +599,10 @@ public void visit(BLangFunction astFunc) {

//create channelDetails array
int i = 0;
for (String channelName : astFunc.sendsToThis) {
birFunc.workerChannels[i] = new BIRNode.ChannelDetails(channelName, astFunc.defaultWorkerName.value
.equals(DEFAULT_WORKER_NAME), isWorkerSend(channelName, astFunc.defaultWorkerName.value));
for (BLangWorkerSendReceiveExpr.Channel channel : astFunc.sendsToThis) {
String channelId = channel.channelId();
birFunc.workerChannels[i] = new BIRNode.ChannelDetails(channelId, astFunc.defaultWorkerName.value
.equals(DEFAULT_WORKER_NAME), isWorkerSend(channelId, astFunc.defaultWorkerName.value));
i++;
}

Expand Down Expand Up @@ -1141,11 +1143,16 @@ public void visit(BLangForkJoin forkJoin) {
forkJoin.workers.forEach(worker -> worker.accept(this));
}

@Override
public void visit(BLangCombinedWorkerReceive combinedWorkerReceive) {
// TODO: 22/11/23 implement
throw new AssertionError("alternate/multiple receive not yet implemented");
}

@Override
public void visit(BLangWorkerReceive workerReceive) {
BIRBasicBlock thenBB = new BIRBasicBlock(this.env.nextBBId());
addToTrapStack(thenBB);
String channel = workerReceive.workerIdentifier.value + "->" + env.enclFunc.workerName.value;

BIRVariableDcl tempVarDcl = new BIRVariableDcl(workerReceive.getBType(), this.env.nextLocalVarId(names),
VarScope.FUNCTION, VarKind.TEMP);
Expand All @@ -1155,8 +1162,9 @@ public void visit(BLangWorkerReceive workerReceive) {

boolean isOnSameStrand = DEFAULT_WORKER_NAME.equals(this.env.enclFunc.workerName.value);

this.env.enclBB.terminator = new BIRTerminator.WorkerReceive(workerReceive.pos, names.fromString(channel),
lhsOp, isOnSameStrand, thenBB, this.currentScope);
this.env.enclBB.terminator = new BIRTerminator.WorkerReceive(workerReceive.pos,
names.fromString(workerReceive.getChannel().channelId()), lhsOp, isOnSameStrand, thenBB,
this.currentScope);

this.env.enclBasicBlocks.add(thenBB);
this.env.enclBB = thenBB;
Expand All @@ -1174,13 +1182,11 @@ public void visit(BLangWorkerAsyncSendExpr asyncSendExpr) {
this.env.enclFunc.localVars.add(tempVarDcl);
BIROperand lhsOp = new BIROperand(tempVarDcl);
this.env.targetOperand = lhsOp;

String channelName = this.env.enclFunc.workerName.value + "->" + asyncSendExpr.workerIdentifier.value;
boolean isOnSameStrand = DEFAULT_WORKER_NAME.equals(this.env.enclFunc.workerName.value);

this.env.enclBB.terminator = new BIRTerminator.WorkerSend(
asyncSendExpr.pos, names.fromString(channelName), dataOp, isOnSameStrand, false, lhsOp,
thenBB, this.currentScope);
asyncSendExpr.pos, names.fromString(asyncSendExpr.getChannel().channelId()), dataOp, isOnSameStrand,
false, lhsOp, thenBB, this.currentScope);

this.env.enclBasicBlocks.add(thenBB);
this.env.enclBB = thenBB;
Expand All @@ -1199,11 +1205,10 @@ public void visit(BLangWorkerSyncSendExpr syncSend) {
BIROperand lhsOp = new BIROperand(tempVarDcl);
this.env.targetOperand = lhsOp;

String channelName = this.env.enclFunc.workerName.value + "->" + syncSend.workerIdentifier.value;
boolean isOnSameStrand = DEFAULT_WORKER_NAME.equals(this.env.enclFunc.workerName.value);

this.env.enclBB.terminator = new BIRTerminator.WorkerSend(
syncSend.pos, names.fromString(channelName), dataOp, isOnSameStrand, true, lhsOp,
syncSend.pos, names.fromString(syncSend.getChannel().channelId()), dataOp, isOnSameStrand, true, lhsOp,
thenBB, this.currentScope);

this.env.enclBasicBlocks.add(thenBB);
Expand All @@ -1216,15 +1221,13 @@ public void visit(BLangWorkerFlushExpr flushExpr) {
addToTrapStack(thenBB);

//create channelDetails array
BIRNode.ChannelDetails[] channels = new BIRNode.ChannelDetails[flushExpr.workerIdentifierList.size()];
BIRNode.ChannelDetails[] channels = new BIRNode.ChannelDetails[flushExpr.cachedWorkerSendStmts.size()];
int i = 0;
for (BLangIdentifier workerIdentifier : flushExpr.workerIdentifierList) {
String channelName = this.env.enclFunc.workerName.value + "->" + workerIdentifier.value;
boolean isOnSameStrand = DEFAULT_WORKER_NAME.equals(this.env.enclFunc.workerName.value);
channels[i] = new BIRNode.ChannelDetails(channelName, isOnSameStrand, true);
boolean isOnSameStrand = DEFAULT_WORKER_NAME.equals(this.env.enclFunc.workerName.value);
for (BLangWorkerAsyncSendExpr sendStmt : flushExpr.cachedWorkerSendStmts) {
channels[i] = new BIRNode.ChannelDetails(sendStmt.getChannel().channelId(), isOnSameStrand, true);
i++;
}

BIRVariableDcl tempVarDcl = new BIRVariableDcl(flushExpr.getBType(), this.env.nextLocalVarId(names),
VarScope.FUNCTION, VarKind.TEMP);
this.env.enclFunc.localVars.add(tempVarDcl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.wso2.ballerinalang.compiler.tree.expressions.BLangArrowFunction;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangBinaryExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCombinedWorkerReceive;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCommitExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstRef;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstant;
Expand Down Expand Up @@ -1515,6 +1516,11 @@ public void visit(BLangWorkerSyncSendExpr syncSendExpr) {
result = syncSendExpr;
}

@Override
public void visit(BLangCombinedWorkerReceive combinedWorkerReceive) {
result = combinedWorkerReceive;
}

@Override
public void visit(BLangWorkerReceive workerReceiveNode) {
result = workerReceiveNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.wso2.ballerinalang.compiler.tree.expressions.BLangBinaryExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckPanickedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCombinedWorkerReceive;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCommitExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstRef;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstant;
Expand Down Expand Up @@ -1189,6 +1190,11 @@ public void visit(BLangWorkerSyncSendExpr syncSendExpr) {
result = syncSendExpr;
}

@Override
public void visit(BLangCombinedWorkerReceive combinedWorkerReceive) {
result = combinedWorkerReceive;
}

@Override
public void visit(BLangWorkerReceive workerReceiveNode) {
result = workerReceiveNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckPanickedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCollectContextInvocation;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCombinedWorkerReceive;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCommitExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstRef;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstant;
Expand Down Expand Up @@ -615,6 +616,11 @@ public void visit(BLangWorkerAsyncSendExpr asyncSendExpr) {
result = asyncSendExpr;
}

@Override
public void visit(BLangCombinedWorkerReceive combinedWorkerReceive) {
result = combinedWorkerReceive;
}

@Override
public void visit(BLangWorkerReceive workerReceiveNode) {
result = workerReceiveNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import org.wso2.ballerinalang.compiler.tree.expressions.BLangBinaryExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckPanickedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCombinedWorkerReceive;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCommitExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstRef;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstant;
Expand Down Expand Up @@ -8142,6 +8143,11 @@ public void visit(BLangWorkerSyncSendExpr syncSendExpr) {
result = syncSendExpr;
}

@Override
public void visit(BLangCombinedWorkerReceive combinedWorkerReceive) {
result = combinedWorkerReceive;
}

@Override
public void visit(BLangWorkerReceive workerReceiveNode) {
result = workerReceiveNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckPanickedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCollectContextInvocation;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCombinedWorkerReceive;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCommitExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstRef;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangElvisExpr;
Expand Down Expand Up @@ -2715,6 +2716,14 @@ public void visit(BLangWorkerAsyncSendExpr asyncSendExpr) {
this.acceptNode(asyncSendExpr.expr);
}

@Override
public void visit(BLangCombinedWorkerReceive combinedWorkerReceive) {
for (BLangWorkerReceive bLangWorkerReceive : combinedWorkerReceive.getWorkerReceives()) {
acceptNode(bLangWorkerReceive);
}
result = combinedWorkerReceive;
}

@Override
public void visit(BLangWorkerReceive workerReceiveNode) {
workerReceiveNode.sendExpression = rewrite(workerReceiveNode.sendExpression);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.wso2.ballerinalang.compiler.parser;

import io.ballerina.compiler.syntax.tree.AlternateReceiveWorkerNode;
import io.ballerina.compiler.syntax.tree.AnnotAccessExpressionNode;
import io.ballerina.compiler.syntax.tree.AnnotationAttachPointNode;
import io.ballerina.compiler.syntax.tree.AnnotationDeclarationNode;
Expand Down Expand Up @@ -347,6 +348,7 @@
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckPanickedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCollectContextInvocation;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCombinedWorkerReceive;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCommitExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstant;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangElvisExpr;
Expand Down Expand Up @@ -2508,20 +2510,44 @@ public BLangNode transform(TrapExpressionNode trapExpressionNode) {

@Override
public BLangNode transform(ReceiveActionNode receiveActionNode) {
BLangWorkerReceive workerReceiveExpr = (BLangWorkerReceive) TreeBuilder.createWorkerReceiveNode();
Location receiveActionPos = getPosition(receiveActionNode);
Node receiveWorkers = receiveActionNode.receiveWorkers();
Token workerName;

if (receiveWorkers.kind() == SyntaxKind.SIMPLE_NAME_REFERENCE) {
workerName = ((SimpleNameReferenceNode) receiveWorkers).name();
} else {
// TODO: implement multiple-receive-action support
Location receiveFieldsPos = getPosition(receiveWorkers);
dlog.error(receiveFieldsPos, DiagnosticErrorCode.MULTIPLE_RECEIVE_ACTION_NOT_YET_SUPPORTED);
workerName = NodeFactory.createMissingToken(SyntaxKind.IDENTIFIER_TOKEN,
NodeFactory.createEmptyMinutiaeList(), NodeFactory.createEmptyMinutiaeList());
BLangWorkerReceive singleWorkerRecv = createSimpleWorkerReceive((SimpleNameReferenceNode) receiveWorkers);
singleWorkerRecv.pos = receiveActionPos;
return singleWorkerRecv;
}
workerReceiveExpr.setWorkerName(createIdentifier(workerName));
workerReceiveExpr.pos = getPosition(receiveActionNode);

if (receiveWorkers.kind() == SyntaxKind.ALTERNATE_RECEIVE_WORKER) {
SeparatedNodeList<SimpleNameReferenceNode> alternateWorkers =
((AlternateReceiveWorkerNode) receiveWorkers).workers();
List<BLangWorkerReceive> workerReceives = new ArrayList<>(alternateWorkers.size());
for (SimpleNameReferenceNode w : alternateWorkers) {
workerReceives.add(createSimpleWorkerReceive(w));
}

BLangCombinedWorkerReceive alternateWorkerRecv = TreeBuilder.createAlternateWorkerReceiveNode();
alternateWorkerRecv.setWorkerReceives(workerReceives);
alternateWorkerRecv.pos = receiveActionPos;
return alternateWorkerRecv;
}


// TODO: implement multiple-receive-action support
dlog.error(getPosition(receiveWorkers), DiagnosticErrorCode.MULTIPLE_RECEIVE_ACTION_NOT_YET_SUPPORTED);
Token missingIdentifier = NodeFactory.createMissingToken(SyntaxKind.IDENTIFIER_TOKEN,
NodeFactory.createEmptyMinutiaeList(), NodeFactory.createEmptyMinutiaeList());
SimpleNameReferenceNode simpleNameRef = NodeFactory.createSimpleNameReferenceNode(missingIdentifier);
BLangWorkerReceive singleWorkerRecv = createSimpleWorkerReceive(simpleNameRef);
singleWorkerRecv.pos = receiveActionPos;
return singleWorkerRecv;
}

private BLangWorkerReceive createSimpleWorkerReceive(SimpleNameReferenceNode simpleNameRef) {
BLangWorkerReceive workerReceiveExpr = (BLangWorkerReceive) TreeBuilder.createWorkerReceiveNode();
workerReceiveExpr.setWorkerName(createIdentifier(simpleNameRef.name()));
workerReceiveExpr.pos = getPosition(simpleNameRef);
return workerReceiveExpr;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckPanickedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCheckedExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCollectContextInvocation;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCombinedWorkerReceive;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangCommitExpr;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstRef;
import org.wso2.ballerinalang.compiler.tree.expressions.BLangConstant;
Expand Down Expand Up @@ -1028,6 +1029,19 @@ public void visit(BLangWorkerAsyncSendExpr asyncSendExpr) {
clone.workerIdentifier = asyncSendExpr.workerIdentifier;
}

@Override
public void visit(BLangCombinedWorkerReceive source) {
BLangCombinedWorkerReceive clone = new BLangCombinedWorkerReceive(source.getKind());
source.cloneRef = clone;

List<BLangWorkerReceive> workerReceives = new ArrayList<>(source.getWorkerReceives().size());
for (BLangWorkerReceive workerReceive : source.getWorkerReceives()) {
workerReceives.add(clone(workerReceive));
}

clone.setWorkerReceives(workerReceives);
}

@Override
public void visit(BLangWorkerReceive source) {

Expand Down
Loading

0 comments on commit 71e46db

Please sign in to comment.