-
Notifications
You must be signed in to change notification settings - Fork 2
/
cloudtasks.go
138 lines (120 loc) · 3.49 KB
/
cloudtasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package appwrap
import (
"context"
"fmt"
"net/http"
"net/url"
"runtime"
"sync"
"cloud.google.com/go/cloudtasks/apiv2"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
)
// cloudTaskqueue is the Taskqueue implementation returned by
// newCloudTaskqueue and newDevTaskqueue. Its methods create tasks through
// client in the queue identified by project, location and a queue name.
type cloudTaskqueue struct {
// ctx is the context that was passed to newCloudTaskqueue.
ctx context.Context
// client issues the CreateTask calls; it is left unset by newDevTaskqueue.
client cloudTasksClient
// location is the Cloud Tasks location segment of the queue path.
location CloudTasksLocation
// project is the project segment of the queue path, taken from
// NativeProjectID() of the App Engine info in newCloudTaskqueue.
project string
}
// cloudTasksClient is the single Cloud Tasks client operation this file uses.
// The client returned by cloudtasks.NewClient is assigned to it in
// newCloudTaskqueue.
type cloudTasksClient interface {
// CreateTask submits req and returns the resulting task or an error.
CreateTask(ctx context.Context, req *taskspb.CreateTaskRequest, opts ...gax.CallOption) (*taskspb.Task, error)
}
// Package-wide Cloud Tasks client state. tqClient is nil until
// newCloudTaskqueue creates the client, and tqClientMtx is locked while that
// creation happens.
var (
// NOTE(review): holding the client behind a pointer does not by itself make
// unsynchronized reads/writes safe under the Go memory model; accesses
// should be made with tqClientMtx held.
tqClient *cloudTasksClient
tqClientMtx sync.Mutex
)
const (
// concurrentReq is multiplied by runtime.GOMAXPROCS(0) in newCloudTaskqueue
// to size the gRPC connection pool of the Cloud Tasks client.
concurrentReq = 12
// queuePathFmt is the queue path format: project, location, queue name.
queuePathFmt = "projects/%s/locations/%s/queues/%s"
// taskNameFmt is the task path format: project, location, queue name, task
// name. It is not referenced in this part of the file.
taskNameFmt = "projects/%s/locations/%s/queues/%s/tasks/%s"
)
// newCloudTaskqueue returns a Taskqueue that creates tasks in the given
// Cloud Tasks location. When c is nil it returns the taskqueue from
// newDevTaskqueue instead.
//
// The underlying Cloud Tasks client is shared by every taskqueue in the
// process and is created on first use. The project is taken from
// NativeProjectID() of the App Engine info derived from c.
//
// It panics if the Cloud Tasks client cannot be created.
func newCloudTaskqueue(c context.Context, loc CloudTasksLocation) Taskqueue {
	if c == nil {
		return newDevTaskqueue()
	}

	client := sharedCloudTasksClient(c)
	aeInfo := NewAppengineInfoFromContext(c)
	return cloudTaskqueue{
		client:   client,
		ctx:      c,
		project:  aeInfo.NativeProjectID(),
		location: loc,
	}
}

// sharedCloudTasksClient returns the package-wide Cloud Tasks client,
// creating it with c on the first call. It panics if creation fails.
func sharedCloudTasksClient(c context.Context) cloudTasksClient {
	// Always take the lock: reading tqClient without synchronization is a
	// data race under the Go memory model, and an uncontended mutex is cheap.
	tqClientMtx.Lock()
	defer tqClientMtx.Unlock()

	if tqClient != nil {
		return *tqClient
	}

	totalConnPool := runtime.GOMAXPROCS(0) * concurrentReq
	o := []option.ClientOption{
		// Options borrowed from construction of the datastore client
		option.WithGRPCConnectionPool(totalConnPool),
	}
	rawClient, err := cloudtasks.NewClient(c, o...)
	if err != nil {
		panic(fmt.Sprintf("creating taskqueue client: %s", err))
	}

	// Store the concrete client behind the cloudTasksClient interface.
	var client cloudTasksClient = rawClient
	tqClient = &client
	return client
}
// newDevTaskqueue returns a zero-valued cloudTaskqueue: no client, context,
// project or location is set.
func newDevTaskqueue() Taskqueue {
	var dev cloudTaskqueue
	return dev
}
// getFullQueueName expands queueName into the full queue path by filling
// queuePathFmt with this taskqueue's project and location.
func (t cloudTaskqueue) getFullQueueName(queueName string) string {
	fullName := fmt.Sprintf(
		queuePathFmt,
		t.project,
		t.location,
		queueName,
	)
	return fullName
}
// Add creates a copy of task in the queue named queueName and returns the
// created task wrapped in the same concrete type as the input, together with
// any error from the create call.
//
// The context argument c is not used by this method; the create call is
// issued by createTask.
//
// Only App Engine and HTTP target tasks are supported. Any other task type
// panics, and the type is checked before the create call so that no request
// is sent for a task that cannot be handled.
func (t cloudTaskqueue) Add(c context.Context, task CloudTask, queueName string) (CloudTask, error) {
	switch task.(type) {
	case *cloudTaskAppEngineImpl, *cloudTaskHttpImpl:
		// Supported; continue below.
	default:
		panic("Only AppEngine and Http target tasks are supported")
	}

	newTask, err := t.createTask(task.Copy(), queueName)

	// The wrapper is returned alongside err even on failure, matching what
	// callers of this method have always received.
	if _, isAppEngine := task.(*cloudTaskAppEngineImpl); isAppEngine {
		return &cloudTaskAppEngineImpl{
			task: newTask,
		}, err
	}
	return &cloudTaskHttpImpl{
		task: newTask,
	}, err
}
// AddMulti adds every task to the queue named queueName by calling Add once
// per task, in order; a failure does not stop the remaining tasks from being
// attempted.
//
// The returned slice always has one entry per input task. If any Add call
// failed, the error is a MultiError of the same length whose i-th entry is
// the error for the i-th task (nil where that task succeeded); otherwise the
// error is nil.
func (t cloudTaskqueue) AddMulti(c context.Context, tasks []CloudTask, queueName string) ([]CloudTask, error) {
	count := len(tasks)
	results := make([]CloudTask, count)
	failures := make(MultiError, count)

	failed := false
	for idx := range tasks {
		added, err := t.Add(c, tasks[idx], queueName)
		results[idx] = added
		failures[idx] = err
		if err != nil {
			failed = true
		}
	}

	// Return an untyped nil on success so callers see err == nil.
	if !failed {
		return results, nil
	}
	return results, failures
}
// NewAppEngineCloudTask builds an App Engine task that POSTs params, encoded
// as a URL-encoded form body, to path. The task is only constructed here; it
// is not added to any queue.
func (t cloudTaskqueue) NewAppEngineCloudTask(path string, params url.Values) AppEngineTask {
	formHeader := http.Header{
		"Content-Type": []string{"application/x-www-form-urlencoded"},
	}
	body := []byte(params.Encode())

	task := NewAppEngineTask()
	task.SetMethod("POST")
	task.SetPayload(body)
	task.SetHeader(formHeader)
	task.SetPath(path)
	return task
}
// createTask submits a copy of task to the queue named queueName through the
// taskqueue's client and returns the task reported back by the create call.
//
// Only App Engine and HTTP target tasks carry a task message this method can
// send. For any other task type it returns an error without issuing a
// request, instead of sending a create request with no task in it.
//
// The request is made with context.Background().
func (t cloudTaskqueue) createTask(task CloudTask, queueName string) (*taskspb.Task, error) {
	var googleTask *taskspb.Task
	// Work on a copy so the caller's task is not handed to the client.
	switch taskCopy := task.Copy().(type) {
	case *cloudTaskAppEngineImpl:
		googleTask = taskCopy.task
	case *cloudTaskHttpImpl:
		googleTask = taskCopy.task
	default:
		return nil, fmt.Errorf("unsupported cloud task type %T: only AppEngine and Http target tasks are supported", task)
	}

	return t.client.CreateTask(context.Background(), &taskspb.CreateTaskRequest{
		Task:   googleTask,
		Parent: t.getFullQueueName(queueName),
	})
}