diff --git a/core/internal/runtime/commands.go b/core/internal/runtime/commands.go index eefe50a5..92540e0c 100644 --- a/core/internal/runtime/commands.go +++ b/core/internal/runtime/commands.go @@ -381,16 +381,30 @@ func handlerActor(ctx context.Context, target rpc.Session, instance *rpc.Session logger.Error("Asynchronous invoke of %s raised error %s\nStacktrace: %v", msg["path"], result.Message, result.Stack) } else if result.TailCall { cr := result.Value.(map[string]interface{}) - nextActor := rpc.Session{Name: cr["actorType"].(string), ID: cr["actorId"].(string), Flow: target.Flow} - if nextActor.Name == target.Name && nextActor.ID == target.ID && cr["releaseLock"] != "true" { - nextActor.DeferredLockID = uuid.New().String() + if _, ok := cr["serviceName"]; ok { + nextService := rpc.Service{Name: cr["serviceName"].(string)} + dest = &rpc.Destination{Target: nextService, Method: serviceEndpoint} + } else if _, ok := cr["actorType"]; ok { + nextActor := rpc.Session{Name: cr["actorType"].(string), ID: cr["actorId"].(string), Flow: target.Flow} + if nextActor.Name == target.Name && nextActor.ID == target.ID && cr["releaseLock"] != "true" { + nextActor.DeferredLockID = uuid.New().String() + } + dest = &rpc.Destination{Target: nextActor, Method: actorEndpoint} + } else { + logger.Error("Asynchronous invoke of %s returned unsupported tail call result %v", msg["path"], cr) + err = fmt.Errorf("Asynchronous invoke of %s returned unsupported tail call result %v", msg["path"], cr) + } + if dest != nil { + msg := map[string]string{ + "command": "tell", + "path": cr["path"].(string), + "payload": cr["payload"].(string)} + if cr["method"] != nil { + msg["method"] = cr["method"].(string) + msg["header"] = "{\"Content-Type\": [\"application/json\"]}" + } + reply, err = json.Marshal(msg) } - dest = &rpc.Destination{Target: nextActor, Method: actorEndpoint} - msg := map[string]string{ - "command": "tell", - "path": cr["path"].(string), - "payload": cr["payload"].(string)} - reply, err = json.Marshal(msg) } } } else { @@ -402,16 +416,29 @@ func handlerActor(ctx context.Context, target rpc.Session, instance *rpc.Session var result actorCallResult if err = json.Unmarshal([]byte(replyStruct.Payload), &result); err == nil && result.TailCall { cr := result.Value.(map[string]interface{}) - nextActor := rpc.Session{Name: cr["actorType"].(string), ID: cr["actorId"].(string), Flow: target.Flow} - if nextActor.Name == target.Name && nextActor.ID == target.ID && cr["releaseLock"] != "true" { - nextActor.DeferredLockID = uuid.New().String() + if _, ok := cr["serviceName"]; ok { + nextService := rpc.Service{Name: cr["serviceName"].(string)} + dest = &rpc.Destination{Target: nextService, Method: serviceEndpoint} + } else if _, ok := cr["actorType"]; ok { + nextActor := rpc.Session{Name: cr["actorType"].(string), ID: cr["actorId"].(string), Flow: target.Flow} + if nextActor.Name == target.Name && nextActor.ID == target.ID && cr["releaseLock"] != "true" { + nextActor.DeferredLockID = uuid.New().String() + } + dest = &rpc.Destination{Target: nextActor, Method: actorEndpoint} + } else { + err = fmt.Errorf("Invoke of %s returned unsupported tail call result %v", msg["path"], cr) + } + if dest != nil { + msg := map[string]string{ + "command": "call", + "path": cr["path"].(string), + "payload": cr["payload"].(string)} + if cr["method"] != nil { + msg["method"] = cr["method"].(string) + msg["header"] = "{\"Content-Type\": [\"application/json\"]}" + } + reply, err = json.Marshal(msg) } - dest = &rpc.Destination{Target: nextActor, Method: actorEndpoint} - msg := map[string]string{ - "command": "call", - "path": cr["path"].(string), - "payload": cr["payload"].(string)} - reply, err = json.Marshal(msg) } } if reply == nil { diff --git a/sdk-java/kar-runtime-liberty/src/main/java/com/ibm/research/kar/Kar.java b/sdk-java/kar-runtime-liberty/src/main/java/com/ibm/research/kar/Kar.java index 26d21bf9..fd1848e1 100644 --- a/sdk-java/kar-runtime-liberty/src/main/java/com/ibm/research/kar/Kar.java +++ b/sdk-java/kar-runtime-liberty/src/main/java/com/ibm/research/kar/Kar.java @@ -626,8 +626,9 @@ private static JsonValue callProcessResponse(Response response) /** * An actor method may return a TailCall to indicate that the "result" - * of the method is to schedule a subsequent invocation (either to itself or - * to another actor instance). + * of the method is to schedule a subsequent invocation + * (to itself or to another actor instance or to a service). + * * If the calling and callee Actors are the same, then by default the * actor lock is retained between the two calls. This ensures that the Actor's * state is not changed between the end of the calling method and the start of @@ -635,6 +636,7 @@ private static JsonValue callProcessResponse(Response response) * true to support interleaving multiple flows of execution on an actor instance. */ public static final class TailCall { + public final String service; public final ActorRef actor; public final String path; public final JsonValue[] args; @@ -645,11 +647,20 @@ public TailCall(ActorRef actor, String path, JsonValue... args) { } public TailCall(ActorRef actor, String path, boolean releaseLock, JsonValue... args) { + this.service = null; this.actor = actor; this.path = path; this.releaseLock = releaseLock; this.args = args; } + + public TailCall(String service, String path, JsonValue body) { + this.service = service; + this.actor = null; + this.path = path; + this.releaseLock = true; + this.args = new JsonValue[] { body }; + } } /** diff --git a/sdk-java/kar-runtime-liberty/src/main/java/com/ibm/research/kar/liberty/ActorRuntimeResource.java b/sdk-java/kar-runtime-liberty/src/main/java/com/ibm/research/kar/liberty/ActorRuntimeResource.java index 4b54e92c..2015475a 100644 --- a/sdk-java/kar-runtime-liberty/src/main/java/com/ibm/research/kar/liberty/ActorRuntimeResource.java +++ b/sdk-java/kar-runtime-liberty/src/main/java/com/ibm/research/kar/liberty/ActorRuntimeResource.java @@ -180,13 +180,19 @@ public Response invokeActorMethod(@PathParam("type") String type, @PathParam("id if (result instanceof TailCall) { TailCall cr = (TailCall)result; JsonObjectBuilder crb = factory.createObjectBuilder(); - JsonArrayBuilder argb = factory.createArrayBuilder(); - for (JsonValue arg: cr.args) { - argb.add(arg); + if (cr.service != null) { + crb.add("payload", cr.args[0].toString()); + crb.add("serviceName", cr.service); + crb.add("method", "POST"); + } else { + JsonArrayBuilder argb = factory.createArrayBuilder(); + for (JsonValue arg: cr.args) { + argb.add(arg); + } + crb.add("payload", argb.build().toString()); + crb.add("actorType", cr.actor.getType()); + crb.add("actorId", cr.actor.getId()); } - crb.add("payload", argb.build().toString()); - crb.add("actorType", cr.actor.getType()); - crb.add("actorId", cr.actor.getId()); crb.add("path", "/"+cr.path); if (cr.releaseLock) { crb.add("releaseLock", "true"); diff --git a/sdk-java/kar-runtime-quarkus/src/main/java/com/ibm/research/kar/Kar.java b/sdk-java/kar-runtime-quarkus/src/main/java/com/ibm/research/kar/Kar.java index ba715f5e..abbd73f1 100644 --- a/sdk-java/kar-runtime-quarkus/src/main/java/com/ibm/research/kar/Kar.java +++ b/sdk-java/kar-runtime-quarkus/src/main/java/com/ibm/research/kar/Kar.java @@ -478,10 +478,22 @@ public static Uni tailCallReleasingLock(ActorRef actor, String path, J return Uni.createFrom().item(new TailCall(actor, path, true, args)); } + /** + * Continue execution by doing a tail call to the specified service. + * @param service The name of the service to invoke. + * @param path The service endpoint to invoke. + * @param body The request body with which to invoke the service endpoint. + * @return a Uni that represents the desired continuation. + */ + public static Uni tailCall(String service, String path, JsonValue body) { + return Uni.createFrom().item(new TailCall(service, path, body)); + } + /** * An actor method may return a TailCall to indicate that the "result" - * of the method is to schedule a subsequent invocation (either to itself or - * to another actor instance). + * of the method is to schedule a subsequent invocation + * (to itself or to another actor instance or to a service). + * * If the calling and callee Actors are the same, then by default the * actor lock is retained between the two calls. This ensures that the Actor's * state is not changed between the end of the calling method and the start of @@ -489,6 +501,7 @@ public static Uni tailCallReleasingLock(ActorRef actor, String path, J * true to support interleaving multiple flows of execution on an actor instance. */ public static final class TailCall { + public final String service; public final ActorRef actor; public final String path; public final boolean releaseLock; @@ -499,11 +512,20 @@ public TailCall(ActorRef actor, String path, JsonValue... args) { } public TailCall(ActorRef actor, String path, boolean releaseLock, JsonValue... args) { + this.service = null; this.actor = actor; this.path = path; this.releaseLock = releaseLock; this.args = args; } + + public TailCall(String service, String path, JsonValue body) { + this.service = service; + this.actor = null; + this.path = path; + this.releaseLock = true; + this.args = new JsonValue[] { body }; + } } /** diff --git a/sdk-java/kar-runtime-quarkus/src/main/java/com/ibm/research/kar/quarkus/ActorEndpoints.java b/sdk-java/kar-runtime-quarkus/src/main/java/com/ibm/research/kar/quarkus/ActorEndpoints.java index 359cfaa1..715a8dd4 100644 --- a/sdk-java/kar-runtime-quarkus/src/main/java/com/ibm/research/kar/quarkus/ActorEndpoints.java +++ b/sdk-java/kar-runtime-quarkus/src/main/java/com/ibm/research/kar/quarkus/ActorEndpoints.java @@ -231,13 +231,19 @@ private static JsonObject encodeInvocationResult(Object result) { if (result instanceof TailCall) { TailCall cr = (TailCall)result; JsonObjectBuilder crb = factory.createObjectBuilder(); - JsonArrayBuilder argb = factory.createArrayBuilder(); - for (JsonValue arg: cr.args) { - argb.add(arg); + if (cr.service != null) { + crb.add("payload", cr.args[0].toString()); + crb.add("serviceName", cr.service); + crb.add("method", "POST"); + } else { + JsonArrayBuilder argb = factory.createArrayBuilder(); + for (JsonValue arg: cr.args) { + argb.add(arg); + } + crb.add("payload", argb.build().toString()); + crb.add("actorType", cr.actor.getType()); + crb.add("actorId", cr.actor.getId()); } - crb.add("payload", argb.build().toString()); - crb.add("actorType", cr.actor.getType()); - crb.add("actorId", cr.actor.getId()); crb.add("path", "/"+cr.path); if (cr.releaseLock) { crb.add("releaseLock", "true"); diff --git a/sdk-js/index.d.ts b/sdk-js/index.d.ts index 89ce43e4..f2ebe9b4 100644 --- a/sdk-js/index.d.ts +++ b/sdk-js/index.d.ts @@ -122,6 +122,14 @@ export function tell (service: string, path: string, body: any): Promise; */ export function call (service: string, path: string, body: any): Promise; +/** + * Construct a result object that encodes a tail call to a service. + * @param service The service to invoke. + * @param path The service endpoint to invoke. + * @param body The request body with which to invoke the service endpoint. + */ + export function tailCall (service: string, path: string, body: any): any; + /** * Actor operations */ @@ -188,7 +196,7 @@ export namespace actor { * @param path The actor method to invoke. * @param args The arguments with which to invoke the actor method. */ - export function tailCallReleasingLock(callee: Actor, path: string, ...args: any[]): any; + export function tailCallReleasingLock(callee: Actor, path: string, ...args: any[]): any; namespace reminders { /** diff --git a/sdk-js/index.js b/sdk-js/index.js index d1fb63b5..75e06ba7 100644 --- a/sdk-js/index.js +++ b/sdk-js/index.js @@ -131,6 +131,10 @@ const tell = (service, path, body) => post(`service/${service}/call/${path}`, bo const call = (service, path, body) => post(`service/${service}/call/${path}`, body, { 'Content-Type': 'application/json' }) +function encodeTailCall (service, path, payload) { + return { tailCall: true, value: { serviceName: service, path: '/' + path, payload, method: 'POST' } } +} + const resolver = request => () => fetch(url + 'await', { method: 'POST', body: request, headers: { 'Content-Type': 'text/plain' } }).then(parse) const resolverActor = request => () => fetch(url + 'await', { method: 'POST', body: request, headers: { 'Content-Type': 'text/plain' } }).then(parseActor) @@ -393,6 +397,7 @@ module.exports = { tell, call, asyncCall, + tailCall: encodeTailCall, actor: { proxy: actorProxy, tell: actorTell,