Skip to content

Commit

Permalink
implement the new trigger, node, edge structure
Browse files Browse the repository at this point in the history
  • Loading branch information
v9n committed Nov 14, 2024
1 parent 7ea1dee commit 460df3c
Show file tree
Hide file tree
Showing 26 changed files with 4,771 additions and 2,523 deletions.
4 changes: 1 addition & 3 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ func NewAggregator(c *config.Config) (*Aggregator, error) {
// Open and setup our database
func (agg *Aggregator) initDB(ctx context.Context) error {
var err error
agg.db, err = storage.New(&storage.Config{
Path: agg.config.DbPath,
})
agg.db, err = storage.NewWithPath(agg.config.DbPath)

if err != nil {
panic(err)
Expand Down
6 changes: 1 addition & 5 deletions aggregator/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package aggregator
import (
"context"
"fmt"
"math/big"
"strings"
"time"

"github.com/AvaProtocol/ap-avs/core/auth"
"github.com/AvaProtocol/ap-avs/core/chainio/aa"
"github.com/AvaProtocol/ap-avs/model"
avsproto "github.com/AvaProtocol/ap-avs/protobuf"
"github.com/ethereum/go-ethereum/accounts"
Expand Down Expand Up @@ -131,11 +129,9 @@ func (r *RpcServer) verifyAuth(ctx context.Context) (*model.User, error) {
Address: common.HexToAddress(claims["sub"].(string)),
}

smartAccountAddress, err := aa.GetSenderAddress(r.ethrpc, user.Address, big.NewInt(0))
if err != nil {
if err := user.LoadDefaultSmartWallet(r.smartWalletRpc); err != nil {
return nil, fmt.Errorf("Rpc error")
}
user.SmartAccountAddress = smartAccountAddress

return &user, nil
}
Expand Down
10 changes: 10 additions & 0 deletions aggregator/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@ func (agg *Aggregator) stopRepl() {
}

}

// Repl allow an operator to look into node storage directly with a REPL interface.
// It doesn't listen via TCP socket but directly unix socket on file system.
func (agg *Aggregator) startRepl() {
var err error

if _, err := os.Stat(agg.config.SocketPath); err == nil {
// File exists, most likely result of a previous crash without cleaning, attempt to delete
os.Remove(agg.config.SocketPath)
}
repListener, err = net.Listen("unix", agg.config.SocketPath)

if err != nil {
return
}
Expand All @@ -48,6 +57,7 @@ func handleConnection(agg *Aggregator, conn net.Conn) {

reader := bufio.NewReader(conn)
fmt.Fprintln(conn, "AP CLI REPL")
fmt.Fprintln(conn, "Use `list <prefix>*` to list key, `get <key>` to inspect content ")
fmt.Fprintln(conn, "-------------------------")

for {
Expand Down
2 changes: 1 addition & 1 deletion aggregator/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (r *RpcServer) GetTask(ctx context.Context, taskID *avsproto.UUID) (*avspro
"taskID", string(taskID.Bytes),
)

task, err := r.engine.GetTaskByUser(user, string(taskID.Bytes))
task, err := r.engine.GetTask(user, string(taskID.Bytes))
if err != nil {
return nil, err
}
Expand Down
73 changes: 37 additions & 36 deletions core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -220,6 +221,7 @@ func (n *Engine) CreateSmartWallet(user *model.User, payload *avsproto.CreateWal
func (n *Engine) CreateTask(user *model.User, taskPayload *avsproto.CreateTaskReq) (*model.Task, error) {
var err error

fmt.Println("user", user)
if taskPayload.SmartWalletAddress != "" {
if !ValidWalletAddress(taskPayload.SmartWalletAddress) {
return nil, status.Errorf(codes.InvalidArgument, InvalidSmartAccountAddressError)
Expand All @@ -232,7 +234,7 @@ func (n *Engine) CreateTask(user *model.User, taskPayload *avsproto.CreateTaskRe
task, err := model.NewTaskFromProtobuf(user, taskPayload)

if err != nil {
return nil, err
return nil, status.Errorf(codes.Code(avsproto.Error_TaskDataMissingError), err.Error())
}

updates := map[string][]byte{}
Expand Down Expand Up @@ -349,19 +351,21 @@ func (n *Engine) AggregateChecksResult(address string, ids []string) error {
func (n *Engine) ListTasksByUser(user *model.User, payload *avsproto.ListTasksReq) ([]*avsproto.Task, error) {
// by default show the task from the default smart wallet, if proving we look into that wallet specifically
owner := user.SmartAccountAddress
if payload.SmartWalletAddress != "" {
if !ValidWalletAddress(payload.SmartWalletAddress) {
return nil, status.Errorf(codes.InvalidArgument, InvalidSmartAccountAddressError)
}
if payload.SmartWalletAddress == "" {
return nil, status.Errorf(codes.InvalidArgument, MissingSmartWalletAddressError)
}

if valid, _ := ValidWalletOwner(n.db, user, common.HexToAddress(payload.SmartWalletAddress)); !valid {
return nil, status.Errorf(codes.InvalidArgument, InvalidSmartAccountAddressError)
}
if !ValidWalletAddress(payload.SmartWalletAddress) {
return nil, status.Errorf(codes.InvalidArgument, InvalidSmartAccountAddressError)
}

smartWallet := common.HexToAddress(payload.SmartWalletAddress)
owner = &smartWallet
if valid, _ := ValidWalletOwner(n.db, user, common.HexToAddress(payload.SmartWalletAddress)); !valid {
return nil, status.Errorf(codes.InvalidArgument, InvalidSmartAccountAddressError)
}

smartWallet := common.HexToAddress(payload.SmartWalletAddress)
owner = &smartWallet

taskIDs, err := n.db.GetByPrefix(SmartWalletTaskStoragePrefix(user.Address, *owner))

if err != nil {
Expand All @@ -377,53 +381,50 @@ func (n *Engine) ListTasksByUser(user *model.User, payload *avsproto.ListTasksRe
continue
}

task := &model.Task{
ID: taskID,
Owner: user.Address.Hex(),
}
task := model.NewTask()
if err := task.FromStorageData(taskRawByte); err != nil {
continue
}
task.ID = taskID

tasks[i], _ = task.ToProtoBuf()
}

return tasks, nil
}

func (n *Engine) GetTaskByUser(user *model.User, taskID string) (*model.Task, error) {
task := &model.Task{
ID: taskID,
Owner: user.Address.Hex(),
}

// Get Task Status
rawStatus, err := n.db.GetKey([]byte(TaskUserKey(task)))
if err != nil {
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}
status, _ := strconv.Atoi(string(rawStatus))
func (n *Engine) GetTaskByID(taskID string) (*model.Task, error) {
for status, _ := range avsproto.TaskStatus_name {
if rawTaskData, err := n.db.GetKey(TaskStorageKey(taskID, avsproto.TaskStatus(status))); err == nil {
task := model.NewTask()
err = task.FromStorageData(rawTaskData)

taskRawByte, err := n.db.GetKey(TaskStorageKey(taskID, avsproto.TaskStatus(status)))
if err == nil {
return task, nil
}

if err != nil {
taskRawByte, err = n.db.GetKey([]byte(
TaskStorageKey(taskID, avsproto.TaskStatus_Executing),
))
if err != nil {
return nil, grpcstatus.Errorf(codes.Code(avsproto.Error_TaskDataCorrupted), TaskStorageCorruptedError)
}
}

err = task.FromStorageData(taskRawByte)
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

func (n *Engine) GetTask(user *model.User, taskID string) (*model.Task, error) {
task, err := n.GetTaskByID(taskID)
if err != nil {
return nil, grpcstatus.Errorf(codes.Code(avsproto.Error_TaskDataCorrupted), TaskStorageCorruptedError)
return nil, err
}

if strings.ToLower(task.Owner) != strings.ToLower(user.Address.Hex()) {
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

return task, nil
}

func (n *Engine) DeleteTaskByUser(user *model.User, taskID string) (bool, error) {
task, err := n.GetTaskByUser(user, taskID)
task, err := n.GetTask(user, taskID)

if err != nil {
return false, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
Expand All @@ -440,7 +441,7 @@ func (n *Engine) DeleteTaskByUser(user *model.User, taskID string) (bool, error)
}

func (n *Engine) CancelTaskByUser(user *model.User, taskID string) (bool, error) {
task, err := n.GetTaskByUser(user, taskID)
task, err := n.GetTask(user, taskID)

if err != nil {
return false, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
Expand Down
2 changes: 2 additions & 0 deletions core/taskengine/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const (
SmartAccountCreationError = "cannot determine smart wallet address"
NonceFetchingError = "cannot determine nonce for smart wallet"

MissingSmartWalletAddressError = "Missing smart_wallet_address"

StorageUnavailableError = "storage is not ready"
StorageWriteError = "cannot write to storage"

Expand Down
7 changes: 4 additions & 3 deletions core/taskengine/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,18 @@ func (c *ContractProcessor) Perform(job *apqueue.Job) error {
// Process entrypoint node, then from the next pointer, and flow of the node, we will follow the chain of execution
action := task.Nodes[0]

if action.ContractExecution == nil {
// TODO: move to vm.go
if action.GetContractWrite() == nil {
err := fmt.Errorf("invalid task action")
task.AppendExecution(currentTime.Unix(), "", err)
task.SetFailed()
return err
}

userOpCalldata, e := aa.PackExecute(
common.HexToAddress(action.ContractExecution.ContractAddress),
common.HexToAddress(action.GetContractWrite().ContractAddress),
big.NewInt(0),
common.FromHex(action.ContractExecution.CallData),
common.FromHex(action.GetContractWrite().CallData),
)
//calldata := common.FromHex("b61d27f600000000000000000000000069256ca54e6296e460dec7b29b7dcd97b81a3d55000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000044a9059cbb000000000000000000000000e0f7d11fd714674722d325cd86062a5f1882e13a0000000000000000000000000000000000000000000000001bc16d674ec8000000000000000000000000000000000000000000000000000000000000")

Expand Down
6 changes: 6 additions & 0 deletions core/taskengine/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func TaskUserKey(t *model.Task) []byte {
))
}

// Convert task status gRPC enum into the storage prefix
// c: completed. task is completed and no longer being check for trigger anymore
// f: failed. task is failed to executed, and no longer being check for trigger anymore
// x: executing. task is being execured currently.
// l: cancelled. task is cancelled by user, no longer being check for trigger
// a: actived. task is actived, and will be checked for triggering. task may had executed zero or more time depend on repeatable or not
func TaskStatusToStorageKey(v avsproto.TaskStatus) string {
switch v {
case 1:
Expand Down
20 changes: 20 additions & 0 deletions core/taskengine/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package taskengine

import (
"os"

"github.com/AvaProtocol/ap-avs/storage"
)

// Shortcut to initialize a storage at the given path, panic if we cannot create db
func TestMustDB() storage.Storage {
dir, err := os.MkdirTemp("", "aptest")
if err != nil {
panic(err)
}
db, err := storage.NewWithPath(dir)
if err != nil {
panic(err)
}
return db
}
2 changes: 1 addition & 1 deletion core/taskengine/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func ValidWalletAddress(address string) bool {

func ValidWalletOwner(db storage.Storage, u *model.User, smartWalletAddress common.Address) (bool, error) {
// the smart wallet adress is the default one
if u.Address.Hex() == smartWalletAddress.Hex() {
if u.SmartAccountAddress.Hex() == smartWalletAddress.Hex() {
return true, nil
}

Expand Down
50 changes: 50 additions & 0 deletions core/taskengine/validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package taskengine

import (
"testing"

"github.com/AvaProtocol/ap-avs/model"
"github.com/AvaProtocol/ap-avs/storage"
"github.com/ethereum/go-ethereum/common"
)

func TestWalletOwnerReturnTrueForDefaultAddress(t *testing.T) {
smartAddress := common.HexToAddress("0x5Df343de7d99fd64b2479189692C1dAb8f46184a")

result, err := ValidWalletOwner(nil, &model.User{
Address: common.HexToAddress("0xe272b72E51a5bF8cB720fc6D6DF164a4D5E321C5"),
SmartAccountAddress: &smartAddress,
}, common.HexToAddress("0x5Df343de7d99fd64b2479189692C1dAb8f46184a"))

if !result || err != nil {
t.Errorf("expect true, got false")
}
}

func TestWalletOwnerReturnTrueForNonDefaultAddress(t *testing.T) {
db := TestMustDB()
defer storage.Destroy(db.(*storage.BadgerStorage))

eoa := common.HexToAddress("0xe272b72E51a5bF8cB720fc6D6DF164a4D5E321C5")
defaultSmartWallet := common.HexToAddress("0x5Df343de7d99fd64b2479189692C1dAb8f46184a")
customSmartWallet := common.HexToAddress("0xdD85693fd14b522a819CC669D6bA388B4FCd158d")

result, err := ValidWalletOwner(db, &model.User{
Address: eoa,
SmartAccountAddress: &defaultSmartWallet,
}, customSmartWallet)
if result == true {
t.Errorf("expect 0xdD85693fd14b522a819CC669D6bA388B4FCd158d not owned by 0xe272b72E51a5bF8cB720fc6D6DF164a4D5E321C5, got true")
}

// setup wallet binding
db.Set([]byte(WalletStorageKey(eoa, customSmartWallet.Hex())), []byte("1"))

result, err = ValidWalletOwner(db, &model.User{
Address: eoa,
SmartAccountAddress: &defaultSmartWallet,
}, customSmartWallet)
if !result || err != nil {
t.Errorf("expect 0xdD85693fd14b522a819CC669D6bA388B4FCd158d owned by 0xe272b72E51a5bF8cB720fc6D6DF164a4D5E321C5, got false")
}
}
5 changes: 5 additions & 0 deletions core/taskengine/vm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package taskengine

// The VM is the core component that load the node information and execute them, yield finaly result
type VM struct {
}
39 changes: 36 additions & 3 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,37 @@
# Ava Protocol Example
# Ava Protocol Examples

Example code on how to interact with Ava Protocol RPC server to create and
manage task.
Example codes on how to interact with Ava Protocol RPC server to create and
manage tasks.

Examples weren't written to be parameterized or extensible. Its only purpose
is to show how to run a specific example, and allow the audience to see
how the code will look like.

Therefore, the script is harded coded, there is no parameter to provide or anything.

If you need to change a parameter for a test, edit the code and re-run it.

# Available example

## Prepare depedencies

```
npm ci
```

Then run:

```
node example.js
```

it will list all available action to run.

## Setting env

```
export env=<development|staging|production>
export PRIVATE_KEY=<any-wallet-private-key>
```

The test example using a dummy token which anyone can mint https://sepolia.etherscan.io/address/0x2e8bdb63d09ef989a0018eeb1c47ef84e3e61f7b#writeProxyContract
Loading

0 comments on commit 460df3c

Please sign in to comment.