Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Formatted console.log output of example.js #36

Closed
1 change: 1 addition & 0 deletions aggregator/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func (agg *Aggregator) stopRepl() {
func (agg *Aggregator) startRepl() {
var err error
repListener, err = net.Listen("unix", agg.config.SocketPath)

if err != nil {
return
}
Expand Down
19 changes: 10 additions & 9 deletions aggregator/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"

"github.com/AvaProtocol/ap-avs/core/auth"
"github.com/AvaProtocol/ap-avs/core/chainio/aa"
"github.com/AvaProtocol/ap-avs/core/config"
"github.com/AvaProtocol/ap-avs/core/taskengine"
Expand All @@ -42,7 +43,7 @@ type RpcServer struct {
func (r *RpcServer) CreateWallet(ctx context.Context, payload *avsproto.CreateWalletReq) (*avsproto.CreateWalletResp, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "invalid authentication key")
return nil, status.Errorf(codes.Unauthenticated, auth.InvalidAuthenticationKey)
}
return r.engine.CreateSmartWallet(user, payload)
}
Expand All @@ -53,7 +54,7 @@ func (r *RpcServer) GetNonce(ctx context.Context, payload *avsproto.NonceRequest

nonce, err := aa.GetNonce(r.smartWalletRpc, ownerAddress, big.NewInt(0))
if err != nil {
return nil, status.Errorf(codes.Code(avsproto.Error_SmartWalletRpcError), "cannot determine nonce for smart wallet")
return nil, status.Errorf(codes.Code(avsproto.Error_SmartWalletRpcError), taskengine.NonceFetchingError)
}

return &avsproto.NonceResp{
Expand All @@ -65,7 +66,7 @@ func (r *RpcServer) GetNonce(ctx context.Context, payload *avsproto.NonceRequest
func (r *RpcServer) GetSmartAccountAddress(ctx context.Context, payload *avsproto.AddressRequest) (*avsproto.AddressResp, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "invalid authentication key")
return nil, status.Errorf(codes.Unauthenticated, auth.InvalidAuthenticationKey)
}

wallets, err := r.engine.GetSmartWallets(user.Address)
Expand All @@ -78,7 +79,7 @@ func (r *RpcServer) GetSmartAccountAddress(ctx context.Context, payload *avsprot
func (r *RpcServer) CancelTask(ctx context.Context, taskID *avsproto.UUID) (*wrapperspb.BoolValue, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "invalid authentication key")
return nil, status.Errorf(codes.Unauthenticated, auth.InvalidAuthenticationKey)
}

r.config.Logger.Info("Process Cancel Task",
Expand All @@ -98,7 +99,7 @@ func (r *RpcServer) CancelTask(ctx context.Context, taskID *avsproto.UUID) (*wra
func (r *RpcServer) DeleteTask(ctx context.Context, taskID *avsproto.UUID) (*wrapperspb.BoolValue, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "invalid authentication key")
return nil, status.Errorf(codes.Unauthenticated, auth.InvalidAuthenticationKey)
}

r.config.Logger.Info("Process Delete Task",
Expand All @@ -118,7 +119,7 @@ func (r *RpcServer) DeleteTask(ctx context.Context, taskID *avsproto.UUID) (*wra
func (r *RpcServer) CreateTask(ctx context.Context, taskPayload *avsproto.CreateTaskReq) (*avsproto.CreateTaskResp, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "invalid authentication key")
return nil, status.Errorf(codes.Unauthenticated, auth.InvalidAuthenticationKey)
}

task, err := r.engine.CreateTask(user, taskPayload)
Expand All @@ -134,7 +135,7 @@ func (r *RpcServer) CreateTask(ctx context.Context, taskPayload *avsproto.Create
func (r *RpcServer) ListTasks(ctx context.Context, payload *avsproto.ListTasksReq) (*avsproto.ListTasksResp, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "invalid authentication key")
return nil, status.Errorf(codes.Unauthenticated, auth.InvalidAuthenticationKey)
}

r.config.Logger.Info("Process List Task",
Expand All @@ -155,15 +156,15 @@ func (r *RpcServer) ListTasks(ctx context.Context, payload *avsproto.ListTasksRe
func (r *RpcServer) GetTask(ctx context.Context, taskID *avsproto.UUID) (*avsproto.Task, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "invalid authentication key")
return nil, status.Errorf(codes.Unauthenticated, auth.InvalidAuthenticationKey)
}

r.config.Logger.Info("Process Get Task",
"user", user.Address.String(),
"taskID", string(taskID.Bytes),
)

task, err := r.engine.GetTaskByUser(user, string(taskID.Bytes))
task, err := r.engine.GetTask(user, string(taskID.Bytes))
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions core/auth/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package auth

const (
InvalidAuthenticationKey = "Invalid authentication key"
)
92 changes: 46 additions & 46 deletions core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -140,7 +141,7 @@ func (n *Engine) GetSmartWallets(owner common.Address) ([]*avsproto.SmartWallet,
salt := big.NewInt(0)
sender, err := aa.GetSenderAddress(rpcConn, owner, salt)
if err != nil {
return nil, status.Errorf(codes.Code(avsproto.Error_SmartWalletNotFoundError), "cannot determine smart wallet address")
return nil, status.Errorf(codes.Code(avsproto.Error_SmartWalletNotFoundError), SmartAccountCreationError)
}

// now load the customize wallet with different salt or factory that was initialed and store in our db
Expand All @@ -155,7 +156,7 @@ func (n *Engine) GetSmartWallets(owner common.Address) ([]*avsproto.SmartWallet,
items, err := n.db.GetByPrefix(WalletByOwnerPrefix(owner))

if err != nil {
return nil, status.Errorf(codes.Code(avsproto.Error_SmartWalletNotFoundError), "cannot determine smart wallet address")
return nil, status.Errorf(codes.Code(avsproto.Error_SmartWalletNotFoundError), SmartAccountCreationError)
}

for _, item := range items {
Expand All @@ -176,15 +177,15 @@ func (n *Engine) CreateSmartWallet(user *model.User, payload *avsproto.CreateWal
// Verify data
// when user passing a custom factory address, we want to validate it
if payload.FactoryAddress != "" && !common.IsHexAddress(payload.FactoryAddress) {
return nil, status.Errorf(codes.InvalidArgument, "invalid factory address")
return nil, status.Errorf(codes.InvalidArgument, InvalidFactoryAddressError)
}

salt := big.NewInt(0)
if payload.Salt != "" {
var ok bool
salt, ok = math.ParseBig256(payload.Salt)
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "invalid salt value")
return nil, status.Errorf(codes.InvalidArgument, InvalidSmartAccountSaltError)
}
}

Expand All @@ -208,7 +209,7 @@ func (n *Engine) CreateSmartWallet(user *model.User, payload *avsproto.CreateWal
updates[string(WalletStorageKey(user.Address, sender.Hex()))], err = wallet.ToJSON()

if err = n.db.BatchWrite(updates); err != nil {
return nil, status.Errorf(codes.Code(avsproto.Error_StorageWriteError), "cannot update key to storage")
return nil, status.Errorf(codes.Code(avsproto.Error_StorageWriteError), StorageWriteError)
}

return &avsproto.CreateWalletResp{
Expand All @@ -222,17 +223,17 @@ func (n *Engine) CreateTask(user *model.User, taskPayload *avsproto.CreateTaskRe

if taskPayload.SmartWalletAddress != "" {
if !ValidWalletAddress(taskPayload.SmartWalletAddress) {
return nil, status.Errorf(codes.InvalidArgument, "invalid smart account address")
return nil, status.Errorf(codes.InvalidArgument, InvalidSmartAccountAddressError)
}

if valid, _ := ValidWalletOwner(n.db, user, common.HexToAddress(taskPayload.SmartWalletAddress)); !valid {
return nil, status.Errorf(codes.InvalidArgument, "invalid smart account address")
return nil, status.Errorf(codes.InvalidArgument, InvalidSmartAccountAddressError)
}
}
task, err := model.NewTaskFromProtobuf(user, taskPayload)

if err != nil {
return nil, err
return nil, status.Errorf(codes.Code(avsproto.Error_TaskDataMissingError), err.Error())
}

updates := map[string][]byte{}
Expand Down Expand Up @@ -349,23 +350,25 @@ func (n *Engine) AggregateChecksResult(address string, ids []string) error {
func (n *Engine) ListTasksByUser(user *model.User, payload *avsproto.ListTasksReq) ([]*avsproto.Task, error) {
// by default show the task from the default smart wallet, if proving we look into that wallet specifically
owner := user.SmartAccountAddress
if payload.SmartWalletAddress != "" {
if !ValidWalletAddress(payload.SmartWalletAddress) {
return nil, status.Errorf(codes.InvalidArgument, "invalid smart account address")
}
if payload.SmartWalletAddress == "" {
return nil, status.Errorf(codes.InvalidArgument, MissingSmartWalletAddressError)
}

if valid, _ := ValidWalletOwner(n.db, user, common.HexToAddress(payload.SmartWalletAddress)); !valid {
return nil, status.Errorf(codes.InvalidArgument, "invalid smart account address")
}
if !ValidWalletAddress(payload.SmartWalletAddress) {
return nil, status.Errorf(codes.InvalidArgument, InvalidSmartAccountAddressError)
}

smartWallet := common.HexToAddress(payload.SmartWalletAddress)
owner = &smartWallet
if valid, _ := ValidWalletOwner(n.db, user, common.HexToAddress(payload.SmartWalletAddress)); !valid {
return nil, status.Errorf(codes.InvalidArgument, InvalidSmartAccountAddressError)
}

smartWallet := common.HexToAddress(payload.SmartWalletAddress)
owner = &smartWallet

taskIDs, err := n.db.GetByPrefix(SmartWalletTaskStoragePrefix(user.Address, *owner))

if err != nil {
return nil, grpcstatus.Errorf(codes.Code(avsproto.Error_StorageUnavailable), "storage is not ready")
return nil, grpcstatus.Errorf(codes.Code(avsproto.Error_StorageUnavailable), StorageUnavailableError)
}

tasks := make([]*avsproto.Task, len(taskIDs))
Expand All @@ -377,56 +380,53 @@ func (n *Engine) ListTasksByUser(user *model.User, payload *avsproto.ListTasksRe
continue
}

task := &model.Task{
ID: taskID,
Owner: user.Address.Hex(),
}
task := model.NewTask()
if err := task.FromStorageData(taskRawByte); err != nil {
continue
}
task.ID = taskID

tasks[i], _ = task.ToProtoBuf()
}

return tasks, nil
}

func (n *Engine) GetTaskByUser(user *model.User, taskID string) (*model.Task, error) {
task := &model.Task{
ID: taskID,
Owner: user.Address.Hex(),
}
func (n *Engine) GetTaskByID(taskID string) (*model.Task, error) {
for status, _ := range avsproto.TaskStatus_name {
if rawTaskData, err := n.db.GetKey(TaskStorageKey(taskID, avsproto.TaskStatus(status))); err == nil {
task := model.NewTask()
err = task.FromStorageData(rawTaskData)

// Get Task Status
rawStatus, err := n.db.GetKey([]byte(TaskUserKey(task)))
if err != nil {
return nil, grpcstatus.Errorf(codes.NotFound, "task not found")
if err == nil {
return task, nil
}

return nil, grpcstatus.Errorf(codes.Code(avsproto.Error_TaskDataCorrupted), TaskStorageCorruptedError)
}
}
status, _ := strconv.Atoi(string(rawStatus))

taskRawByte, err := n.db.GetKey(TaskStorageKey(taskID, avsproto.TaskStatus(status)))
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

func (n *Engine) GetTask(user *model.User, taskID string) (*model.Task, error) {
task, err := n.GetTaskByID(taskID)
if err != nil {
taskRawByte, err = n.db.GetKey([]byte(
TaskStorageKey(taskID, avsproto.TaskStatus_Executing),
))
if err != nil {
return nil, grpcstatus.Errorf(codes.Code(avsproto.Error_TaskDataCorrupted), "task data storage is corrupted")
}
return nil, err
}

err = task.FromStorageData(taskRawByte)
if err != nil {
return nil, grpcstatus.Errorf(codes.Code(avsproto.Error_TaskDataCorrupted), "task data storage is corrupted")
if strings.ToLower(task.Owner) != strings.ToLower(user.Address.Hex()) {
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

return task, nil
}

func (n *Engine) DeleteTaskByUser(user *model.User, taskID string) (bool, error) {
task, err := n.GetTaskByUser(user, taskID)
task, err := n.GetTask(user, taskID)

if err != nil {
return false, grpcstatus.Errorf(codes.NotFound, "task not found")
return false, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

if task.Status == avsproto.TaskStatus_Executing {
Expand All @@ -440,10 +440,10 @@ func (n *Engine) DeleteTaskByUser(user *model.User, taskID string) (bool, error)
}

func (n *Engine) CancelTaskByUser(user *model.User, taskID string) (bool, error) {
task, err := n.GetTaskByUser(user, taskID)
task, err := n.GetTask(user, taskID)

if err != nil {
return false, grpcstatus.Errorf(codes.NotFound, "task not found")
return false, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

if task.Status != avsproto.TaskStatus_Active {
Expand Down
18 changes: 18 additions & 0 deletions core/taskengine/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package taskengine

const (
TaskNotFoundError = "task not found"

InvalidSmartAccountAddressError = "invalid smart account address"
InvalidFactoryAddressError = "invalid factory address"
InvalidSmartAccountSaltError = "invalid salt value"
SmartAccountCreationError = "cannot determine smart wallet address"
NonceFetchingError = "cannot determine nonce for smart wallet"

MissingSmartWalletAddressError = "Missing smart_wallet_address"

StorageUnavailableError = "storage is not ready"
StorageWriteError = "cannot write to storage"

TaskStorageCorruptedError = "task data storage is corrupted"
)
7 changes: 4 additions & 3 deletions core/taskengine/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,18 @@ func (c *ContractProcessor) Perform(job *apqueue.Job) error {
// Process entrypoint node, then from the next pointer, and flow of the node, we will follow the chain of execution
action := task.Nodes[0]

if action.ContractExecution == nil {
// TODO: move to vm.go
if action.GetContractWrite() == nil {
err := fmt.Errorf("invalid task action")
task.AppendExecution(currentTime.Unix(), "", err)
task.SetFailed()
return err
}

userOpCalldata, e := aa.PackExecute(
common.HexToAddress(action.ContractExecution.ContractAddress),
common.HexToAddress(action.GetContractWrite().ContractAddress),
big.NewInt(0),
common.FromHex(action.ContractExecution.CallData),
common.FromHex(action.GetContractWrite().CallData),
)
//calldata := common.FromHex("b61d27f600000000000000000000000069256ca54e6296e460dec7b29b7dcd97b81a3d55000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000044a9059cbb000000000000000000000000e0f7d11fd714674722d325cd86062a5f1882e13a0000000000000000000000000000000000000000000000001bc16d674ec8000000000000000000000000000000000000000000000000000000000000")

Expand Down
6 changes: 6 additions & 0 deletions core/taskengine/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func TaskUserKey(t *model.Task) []byte {
))
}

// Convert task status gRPC enum into the storage prefix
// c: completed. task is completed and no longer being check for trigger anymore
// f: failed. task is failed to executed, and no longer being check for trigger anymore
// x: executing. task is being execured currently.
// l: cancelled. task is cancelled by user, no longer being check for trigger
// a: actived. task is actived, and will be checked for triggering. task may had executed zero or more time depend on repeatable or not
func TaskStatusToStorageKey(v avsproto.TaskStatus) string {
switch v {
case 1:
Expand Down
Loading
Loading