Skip to content

Commit

Permalink
implement basic tail call from Actor to Service (#313)
Browse files Browse the repository at this point in the history
Limitations:
  1. The service endpoint must be a POST
  2. The service endpoint must accept application/json
  • Loading branch information
dgrove-oss authored Apr 22, 2022
1 parent 8f1641f commit b44bf55
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 35 deletions.
63 changes: 45 additions & 18 deletions core/internal/runtime/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,16 +381,30 @@ func handlerActor(ctx context.Context, target rpc.Session, instance *rpc.Session
logger.Error("Asynchronous invoke of %s raised error %s\nStacktrace: %v", msg["path"], result.Message, result.Stack)
} else if result.TailCall {
cr := result.Value.(map[string]interface{})
nextActor := rpc.Session{Name: cr["actorType"].(string), ID: cr["actorId"].(string), Flow: target.Flow}
if nextActor.Name == target.Name && nextActor.ID == target.ID && cr["releaseLock"] != "true" {
nextActor.DeferredLockID = uuid.New().String()
if _, ok := cr["serviceName"]; ok {
nextService := rpc.Service{Name: cr["serviceName"].(string)}
dest = &rpc.Destination{Target: nextService, Method: serviceEndpoint}
} else if _, ok := cr["actorType"]; ok {
nextActor := rpc.Session{Name: cr["actorType"].(string), ID: cr["actorId"].(string), Flow: target.Flow}
if nextActor.Name == target.Name && nextActor.ID == target.ID && cr["releaseLock"] != "true" {
nextActor.DeferredLockID = uuid.New().String()
}
dest = &rpc.Destination{Target: nextActor, Method: actorEndpoint}
} else {
logger.Error("Asynchronous invoke of %s returned unsupported tail call result %v", msg["path"], cr)
err = fmt.Errorf("Asynchronous invoke of %s returned unsupported tail call result %v", msg["path"], cr)
}
if dest != nil {
msg := map[string]string{
"command": "tell",
"path": cr["path"].(string),
"payload": cr["payload"].(string)}
if cr["method"] != nil {
msg["method"] = cr["method"].(string)
msg["header"] = "{\"Content-Type\": [\"application/json\"]}"
}
reply, err = json.Marshal(msg)
}
dest = &rpc.Destination{Target: nextActor, Method: actorEndpoint}
msg := map[string]string{
"command": "tell",
"path": cr["path"].(string),
"payload": cr["payload"].(string)}
reply, err = json.Marshal(msg)
}
}
} else {
Expand All @@ -402,16 +416,29 @@ func handlerActor(ctx context.Context, target rpc.Session, instance *rpc.Session
var result actorCallResult
if err = json.Unmarshal([]byte(replyStruct.Payload), &result); err == nil && result.TailCall {
cr := result.Value.(map[string]interface{})
nextActor := rpc.Session{Name: cr["actorType"].(string), ID: cr["actorId"].(string), Flow: target.Flow}
if nextActor.Name == target.Name && nextActor.ID == target.ID && cr["releaseLock"] != "true" {
nextActor.DeferredLockID = uuid.New().String()
if _, ok := cr["serviceName"]; ok {
nextService := rpc.Service{Name: cr["serviceName"].(string)}
dest = &rpc.Destination{Target: nextService, Method: serviceEndpoint}
} else if _, ok := cr["actorType"]; ok {
nextActor := rpc.Session{Name: cr["actorType"].(string), ID: cr["actorId"].(string), Flow: target.Flow}
if nextActor.Name == target.Name && nextActor.ID == target.ID && cr["releaseLock"] != "true" {
nextActor.DeferredLockID = uuid.New().String()
}
dest = &rpc.Destination{Target: nextActor, Method: actorEndpoint}
} else {
err = fmt.Errorf("Invoke of %s returned unsupported tail call result %v", msg["path"], cr)
}
if dest != nil {
msg := map[string]string{
"command": "call",
"path": cr["path"].(string),
"payload": cr["payload"].(string)}
if cr["method"] != nil {
msg["method"] = cr["method"].(string)
msg["header"] = "{\"Content-Type\": [\"application/json\"]}"
}
reply, err = json.Marshal(msg)
}
dest = &rpc.Destination{Target: nextActor, Method: actorEndpoint}
msg := map[string]string{
"command": "call",
"path": cr["path"].(string),
"payload": cr["payload"].(string)}
reply, err = json.Marshal(msg)
}
}
if reply == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,15 +626,17 @@ private static JsonValue callProcessResponse(Response response)

/**
* An actor method may return a TailCall to indicate that the "result"
* of the method is to schedule a subsequent invocation (either to itself or
* to another actor instance).
* of the method is to schedule a subsequent invocation
* (to itself or to another actor instance or to a service).
*
* If the calling and callee Actors are the same, then by default the
* actor lock is retained between the two calls. This ensures that the Actor's
* state is not changed between the end of the calling method and the start of
* the callee method. This default can be overriden by setting `releaseLock` to
* true to support interleaving multiple flows of execution on an actor instance.
*/
public static final class TailCall {
public final String service;
public final ActorRef actor;
public final String path;
public final JsonValue[] args;
Expand All @@ -645,11 +647,20 @@ public TailCall(ActorRef actor, String path, JsonValue... args) {
}

public TailCall(ActorRef actor, String path, boolean releaseLock, JsonValue... args) {
this.service = null;
this.actor = actor;
this.path = path;
this.releaseLock = releaseLock;
this.args = args;
}

public TailCall(String service, String path, JsonValue body) {
this.service = service;
this.actor = null;
this.path = path;
this.releaseLock = true;
this.args = new JsonValue[] { body };
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,19 @@ public Response invokeActorMethod(@PathParam("type") String type, @PathParam("id
if (result instanceof TailCall) {
TailCall cr = (TailCall)result;
JsonObjectBuilder crb = factory.createObjectBuilder();
JsonArrayBuilder argb = factory.createArrayBuilder();
for (JsonValue arg: cr.args) {
argb.add(arg);
if (cr.service != null) {
crb.add("payload", cr.args[0].toString());
crb.add("serviceName", cr.service);
crb.add("method", "POST");
} else {
JsonArrayBuilder argb = factory.createArrayBuilder();
for (JsonValue arg: cr.args) {
argb.add(arg);
}
crb.add("payload", argb.build().toString());
crb.add("actorType", cr.actor.getType());
crb.add("actorId", cr.actor.getId());
}
crb.add("payload", argb.build().toString());
crb.add("actorType", cr.actor.getType());
crb.add("actorId", cr.actor.getId());
crb.add("path", "/"+cr.path);
if (cr.releaseLock) {
crb.add("releaseLock", "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,17 +478,30 @@ public static Uni<TailCall> tailCallReleasingLock(ActorRef actor, String path, J
return Uni.createFrom().item(new TailCall(actor, path, true, args));
}

/**
* Continue execution by doing a tail call to the specified service.
* @param service The name of the service to invoke.
* @param path The service endpoint to invoke.
* @param body The request body with which to invoke the service endpoint.
* @return a Uni that represents the desired continuation.
*/
public static Uni<TailCall> tailCall(String service, String path, JsonValue body) {
return Uni.createFrom().item(new TailCall(service, path, body));
}

/**
* An actor method may return a TailCall to indicate that the "result"
* of the method is to schedule a subsequent invocation (either to itself or
* to another actor instance).
* of the method is to schedule a subsequent invocation
* (to itself or to another actor instance or to a service).
*
* If the calling and callee Actors are the same, then by default the
* actor lock is retained between the two calls. This ensures that the Actor's
* state is not changed between the end of the calling method and the start of
* the callee method. This default can be overriden by setting `releaseLock` to
* true to support interleaving multiple flows of execution on an actor instance.
*/
public static final class TailCall {
public final String service;
public final ActorRef actor;
public final String path;
public final boolean releaseLock;
Expand All @@ -499,11 +512,20 @@ public TailCall(ActorRef actor, String path, JsonValue... args) {
}

public TailCall(ActorRef actor, String path, boolean releaseLock, JsonValue... args) {
this.service = null;
this.actor = actor;
this.path = path;
this.releaseLock = releaseLock;
this.args = args;
}

public TailCall(String service, String path, JsonValue body) {
this.service = service;
this.actor = null;
this.path = path;
this.releaseLock = true;
this.args = new JsonValue[] { body };
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,19 @@ private static JsonObject encodeInvocationResult(Object result) {
if (result instanceof TailCall) {
TailCall cr = (TailCall)result;
JsonObjectBuilder crb = factory.createObjectBuilder();
JsonArrayBuilder argb = factory.createArrayBuilder();
for (JsonValue arg: cr.args) {
argb.add(arg);
if (cr.service != null) {
crb.add("payload", cr.args[0].toString());
crb.add("serviceName", cr.service);
crb.add("method", "POST");
} else {
JsonArrayBuilder argb = factory.createArrayBuilder();
for (JsonValue arg: cr.args) {
argb.add(arg);
}
crb.add("payload", argb.build().toString());
crb.add("actorType", cr.actor.getType());
crb.add("actorId", cr.actor.getId());
}
crb.add("payload", argb.build().toString());
crb.add("actorType", cr.actor.getType());
crb.add("actorId", cr.actor.getId());
crb.add("path", "/"+cr.path);
if (cr.releaseLock) {
crb.add("releaseLock", "true");
Expand Down
10 changes: 9 additions & 1 deletion sdk-js/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ export function tell (service: string, path: string, body: any): Promise<any>;
*/
export function call (service: string, path: string, body: any): Promise<any>;

/**
* Construct a result object that encodes a tail call to a service.
* @param service The service to invoke.
* @param path The service endpoint to invoke.
* @param body The request body with which to invoke the service endpoint.
*/
export function tailCall (service: string, path: string, body: any): any;

/**
* Actor operations
*/
Expand Down Expand Up @@ -188,7 +196,7 @@ export namespace actor {
* @param path The actor method to invoke.
* @param args The arguments with which to invoke the actor method.
*/
export function tailCallReleasingLock(callee: Actor, path: string, ...args: any[]): any;
export function tailCallReleasingLock(callee: Actor, path: string, ...args: any[]): any;

namespace reminders {
/**
Expand Down
5 changes: 5 additions & 0 deletions sdk-js/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ const tell = (service, path, body) => post(`service/${service}/call/${path}`, bo

const call = (service, path, body) => post(`service/${service}/call/${path}`, body, { 'Content-Type': 'application/json' })

function encodeTailCall (service, path, payload) {
return { tailCall: true, value: { serviceName: service, path: '/' + path, payload, method: 'POST' } }
}

const resolver = request => () => fetch(url + 'await', { method: 'POST', body: request, headers: { 'Content-Type': 'text/plain' } }).then(parse)

const resolverActor = request => () => fetch(url + 'await', { method: 'POST', body: request, headers: { 'Content-Type': 'text/plain' } }).then(parseActor)
Expand Down Expand Up @@ -393,6 +397,7 @@ module.exports = {
tell,
call,
asyncCall,
tailCall: encodeTailCall,
actor: {
proxy: actorProxy,
tell: actorTell,
Expand Down

0 comments on commit b44bf55

Please sign in to comment.