diff --git a/cmd/runner/main.go b/cmd/runner/main.go index d932890..932bf36 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -12,6 +12,7 @@ import ( "time" "github.com/alitto/pond" + "github.com/pborman/uuid" "github.com/uber-go/tally/v4/prometheus" sdktally "go.temporal.io/sdk/contrib/tally" "go.uber.org/automaxprocs/maxprocs" @@ -21,6 +22,7 @@ import ( var nWorfklows = flag.Int("c", 10, "concurrent workflows") var sWorkflow = flag.String("t", "", "workflow type") +var sSignalType = flag.String("s", "", "signal type") var bWait = flag.Bool("w", true, "wait for workflows to complete") var sNamespace = flag.String("n", "default", "namespace") var sTaskQueue = flag.String("tq", "benchmark", "task queue") @@ -107,17 +109,41 @@ func main() { pool := pond.New(*nWorfklows, 0) + var starter func() (client.WorkflowRun, error) + + if *sSignalType != "" { + starter = func() (client.WorkflowRun, error) { + wID := uuid.New() + return c.SignalWithStartWorkflow( + context.Background(), + wID, + *sSignalType, + nil, + client.StartWorkflowOptions{ + ID: wID, + TaskQueue: *sTaskQueue, + }, + *sWorkflow, + input..., + ) + } + } else { + starter = func() (client.WorkflowRun, error) { + return c.ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: *sTaskQueue, + }, + *sWorkflow, + input..., + ) + } + } + go (func() { for { pool.Submit(func() { - wf, err := c.ExecuteWorkflow( - context.Background(), - client.StartWorkflowOptions{ - TaskQueue: *sTaskQueue, - }, - *sWorkflow, - input..., - ) + wf, err := starter() if err != nil { log.Println("Unable to start workflow", err) return diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 78815c6..ee9df88 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -105,6 +105,7 @@ func main() { w := worker.New(c, taskQueue, workerOptions) w.RegisterWorkflowWithOptions(workflows.ExecuteActivityWorkflow, workflow.RegisterOptions{Name: "ExecuteActivity"}) + w.RegisterWorkflowWithOptions(workflows.ReceiveSignalWorkflow, workflow.RegisterOptions{Name: "ReceiveSignal"}) w.RegisterActivityWithOptions(activities.SleepActivity, activity.RegisterOptions{Name: "Sleep"}) w.RegisterActivityWithOptions(activities.EchoActivity, activity.RegisterOptions{Name: "Echo"}) diff --git a/deployment.yaml b/deployment.yaml index 20d59d6..94d83fd 100644 --- a/deployment.yaml +++ b/deployment.yaml @@ -18,7 +18,7 @@ spec: component: workers spec: containers: - - image: ghcr.io/temporalio/benchmark-workers:main + - image: ghcr.io/temporalio/benchmark-workers:latest imagePullPolicy: Always name: benchmark-workers env: @@ -54,7 +54,7 @@ spec: component: soak-test spec: containers: - - image: ghcr.io/temporalio/benchmark-workers:main + - image: ghcr.io/temporalio/benchmark-workers:latest imagePullPolicy: Always name: benchmark-soak-test env: diff --git a/workflows/workflow.go b/workflows/workflow.go index 02820bf..8a624c0 100644 --- a/workflows/workflow.go +++ b/workflows/workflow.go @@ -12,6 +12,11 @@ type ExecuteActivityWorkflowInput struct { Input interface{} } +type ReceiveSignalWorkflowInput struct { + Count int + Name string +} + func ExecuteActivityWorkflow(ctx workflow.Context, input ExecuteActivityWorkflowInput) error { ao := workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Minute, @@ -27,3 +32,15 @@ func ExecuteActivityWorkflow(ctx workflow.Context, input ExecuteActivityWorkflow return nil } + +func ReceiveSignalWorkflow(ctx workflow.Context, input ReceiveSignalWorkflowInput) error { + ch := workflow.GetSignalChannel(ctx, input.Name) + + for i := 0; i < input.Count; i++ { + var data interface{} + + ch.Receive(ctx, &data) + } + + return nil +}