Skip to content

Commit

Permalink
Add SignalToStart support.
Browse files Browse the repository at this point in the history
  • Loading branch information
robholland committed Jan 3, 2024
1 parent 7820809 commit fe9be01
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 10 deletions.
42 changes: 34 additions & 8 deletions cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/alitto/pond"
"github.com/pborman/uuid"
"github.com/uber-go/tally/v4/prometheus"
sdktally "go.temporal.io/sdk/contrib/tally"
"go.uber.org/automaxprocs/maxprocs"
Expand All @@ -21,6 +22,7 @@ import (

var nWorfklows = flag.Int("c", 10, "concurrent workflows")
var sWorkflow = flag.String("t", "", "workflow type")
var sSignalType = flag.String("s", "", "signal type")
var bWait = flag.Bool("w", true, "wait for workflows to complete")
var sNamespace = flag.String("n", "default", "namespace")
var sTaskQueue = flag.String("tq", "benchmark", "task queue")
Expand Down Expand Up @@ -107,17 +109,41 @@ func main() {

pool := pond.New(*nWorfklows, 0)

var starter func() (client.WorkflowRun, error)

if *sSignalType != "" {
starter = func() (client.WorkflowRun, error) {
wID := uuid.New()
return c.SignalWithStartWorkflow(
context.Background(),
wID,
*sSignalType,
nil,
client.StartWorkflowOptions{
ID: wID,
TaskQueue: *sTaskQueue,
},
*sWorkflow,
input...,
)
}
} else {
starter = func() (client.WorkflowRun, error) {
return c.ExecuteWorkflow(
context.Background(),
client.StartWorkflowOptions{
TaskQueue: *sTaskQueue,
},
*sWorkflow,
input...,
)
}
}

go (func() {
for {
pool.Submit(func() {
wf, err := c.ExecuteWorkflow(
context.Background(),
client.StartWorkflowOptions{
TaskQueue: *sTaskQueue,
},
*sWorkflow,
input...,
)
wf, err := starter()
if err != nil {
log.Println("Unable to start workflow", err)
return
Expand Down
1 change: 1 addition & 0 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func main() {
w := worker.New(c, taskQueue, workerOptions)

w.RegisterWorkflowWithOptions(workflows.ExecuteActivityWorkflow, workflow.RegisterOptions{Name: "ExecuteActivity"})
w.RegisterWorkflowWithOptions(workflows.ReceiveSignalWorkflow, workflow.RegisterOptions{Name: "ReceiveSignal"})
w.RegisterActivityWithOptions(activities.SleepActivity, activity.RegisterOptions{Name: "Sleep"})
w.RegisterActivityWithOptions(activities.EchoActivity, activity.RegisterOptions{Name: "Echo"})

Expand Down
4 changes: 2 additions & 2 deletions deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spec:
component: workers
spec:
containers:
- image: ghcr.io/temporalio/benchmark-workers:main
- image: ghcr.io/temporalio/benchmark-workers:latest
imagePullPolicy: Always
name: benchmark-workers
env:
Expand Down Expand Up @@ -54,7 +54,7 @@ spec:
component: soak-test
spec:
containers:
- image: ghcr.io/temporalio/benchmark-workers:main
- image: ghcr.io/temporalio/benchmark-workers:latest
imagePullPolicy: Always
name: benchmark-soak-test
env:
Expand Down
17 changes: 17 additions & 0 deletions workflows/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ type ExecuteActivityWorkflowInput struct {
Input interface{}
}

type ReceiveSignalWorkflowInput struct {
Count int
Name string
}

func ExecuteActivityWorkflow(ctx workflow.Context, input ExecuteActivityWorkflowInput) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
Expand All @@ -27,3 +32,15 @@ func ExecuteActivityWorkflow(ctx workflow.Context, input ExecuteActivityWorkflow

return nil
}

func ReceiveSignalWorkflow(ctx workflow.Context, input ReceiveSignalWorkflowInput) error {
ch := workflow.GetSignalChannel(ctx, input.Name)

for i := 0; i < input.Count; i++ {
var data interface{}

ch.Receive(ctx, &data)
}

return nil
}

0 comments on commit fe9be01

Please sign in to comment.