-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathqueue.go
69 lines (60 loc) · 2.37 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// Copyright (c) 2022, R.I. Pienaar and the Project contributors
//
// SPDX-License-Identifier: Apache-2.0
package asyncjobs
import (
"context"
"sync"
"time"
"github.com/nats-io/jsm.go/api"
)
// Queue represents a work queue.
//
// The exported fields describe the queue's configuration. The unexported
// fields hold runtime state and are not part of the JSON representation
// (encoding/json skips unexported fields).
type Queue struct {
// Name is a unique name for the work queue, should be in the character range a-zA-Z0-9.
Name string `json:"name"`
// MaxAge is the absolute longest time an entry can stay in the queue. When not set, items will not expire.
MaxAge time.Duration `json:"max_age"`
// MaxEntries is the maximum number of entries that can be in the queue. When the queue is full, new entries
// are rejected. When unset, no limit is applied.
MaxEntries int `json:"max_entries"`
// DiscardOld indicates that when MaxEntries is reached, old entries are discarded rather than new ones rejected.
DiscardOld bool `json:"discard_old"`
// MaxTries is the maximum number of times an entry can be tried. Entries are retried every MaxRunTime with
// some jitter applied. Defaults to DefaultMaxTries.
MaxTries int `json:"max_tries"`
// MaxRunTime is the maximum time a task can be processed. Defaults to DefaultJobRunTime.
MaxRunTime time.Duration `json:"max_runtime"`
// MaxConcurrent is the total number of in-flight tasks across all active task handlers combined.
// Defaults to DefaultQueueMaxConcurrent.
MaxConcurrent int `json:"max_concurrent"`
// NoCreate will not try to create a queue; it will bind to an existing one or fail.
// It carries no json tag, so encoding/json uses the key "NoCreate" for it.
NoCreate bool
// mu is a mutex for the queue. NOTE(review): what it guards is not visible in this file —
// presumably the queue's mutable runtime state; confirm against its users.
mu sync.Mutex
// storage is the backend that enqueueTask and retryTaskByID delegate to.
storage Storage
}
// QueueInfo holds information about the state of a queue.
type QueueInfo struct {
// Name is the name of the queue.
Name string `json:"name"`
// Time is when the information was gathered.
Time time.Time `json:"time"`
// Stream is the active JetStream Stream Information.
Stream *api.StreamInfo `json:"stream_info"`
// Consumer is the JetStream Consumer Information for the queue's workers.
Consumer *api.ConsumerInfo `json:"consumer_info"`
}
// retryTaskByID asks the queue's storage backend to retry the task identified by id.
//
// The call is forwarded to q.storage.RetryTaskByID together with this queue,
// and whatever error that call produces is returned unchanged.
func (q *Queue) retryTaskByID(ctx context.Context, id string) error {
	store := q.storage

	return store.RetryTaskByID(ctx, q, id)
}
// enqueueTask places task on this queue.
//
// The task's Queue field is overwritten with this queue's Name before the
// task is handed to q.storage.EnqueueTask; the error from that call is
// returned unchanged.
func (q *Queue) enqueueTask(ctx context.Context, task *Task) error {
	// Stamp the task with its owning queue before handing it to storage.
	task.Queue = q.Name

	store := q.storage

	return store.EnqueueTask(ctx, q, task)
}
// newDefaultQueue returns a new Queue named "DEFAULT".
//
// It sets MaxRunTime to one minute, MaxTries to 100 and MaxConcurrent to
// DefaultQueueMaxConcurrent. Every other field keeps its zero value: MaxAge
// is 0, DiscardOld is false, the mutex is unlocked and no storage is attached.
//
// NOTE(review): MaxTries and MaxRunTime are literals here, not the
// DefaultMaxTries / DefaultJobRunTime constants named in the Queue field
// documentation — confirm the difference is intended.
func newDefaultQueue() *Queue {
	q := new(Queue)

	q.Name = "DEFAULT"
	q.MaxRunTime = time.Minute
	q.MaxTries = 100
	q.MaxConcurrent = DefaultQueueMaxConcurrent

	return q
}