forked from onflow/flow-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathengine.go
199 lines (173 loc) · 6.21 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// (c) 2019 Dapper Labs - ALL RIGHTS RESERVED
package ingestion
import (
"context"
"fmt"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/common/fifoqueue"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/channels"
)
// defaultGuaranteeQueueCapacity maximum capacity of pending events queue, everything above will be dropped
const defaultGuaranteeQueueCapacity = 1000
// defaultIngestionEngineWorkers number of goroutines engine will use for processing events
const defaultIngestionEngineWorkers = 3
// Engine represents the ingestion engine, used to funnel collections from a
// cluster of collection nodes to the set of consensus nodes. It represents the
// link between collection nodes and consensus nodes and has a counterpart with
// the same engine ID in the collection node.
type Engine struct {
component.Component
log zerolog.Logger // used to log relevant actions with context
me module.Local // used to access local node information
con network.Conduit // conduit to receive/send guarantees
core *Core // core logic of processing guarantees
pendingGuarantees engine.MessageStore // message store of pending events
messageHandler *engine.MessageHandler // message handler for incoming events
}
// New creates a new collection propagation engine.
func New(
log zerolog.Logger,
engineMetrics module.EngineMetrics,
net network.Network,
me module.Local,
core *Core,
) (*Engine, error) {
logger := log.With().Str("ingestion", "engine").Logger()
guaranteesQueue, err := fifoqueue.NewFifoQueue(
defaultGuaranteeQueueCapacity,
fifoqueue.WithLengthObserver(func(len int) { core.mempool.MempoolEntries(metrics.ResourceCollectionGuaranteesQueue, uint(len)) }),
)
if err != nil {
return nil, fmt.Errorf("could not create guarantees queue: %w", err)
}
pendingGuarantees := &engine.FifoMessageStore{
FifoQueue: guaranteesQueue,
}
handler := engine.NewMessageHandler(
logger,
engine.NewNotifier(),
engine.Pattern{
Match: func(msg *engine.Message) bool {
_, ok := msg.Payload.(*flow.CollectionGuarantee)
if ok {
engineMetrics.MessageReceived(metrics.EngineConsensusIngestion, metrics.MessageCollectionGuarantee)
}
return ok
},
Store: pendingGuarantees,
},
)
// initialize the propagation engine with its dependencies
e := &Engine{
log: logger,
me: me,
core: core,
pendingGuarantees: pendingGuarantees,
messageHandler: handler,
}
componentManagerBuilder := component.NewComponentManagerBuilder()
for i := 0; i < defaultIngestionEngineWorkers; i++ {
componentManagerBuilder.AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()
err := e.loop(ctx)
if err != nil {
ctx.Throw(err)
}
})
}
e.Component = componentManagerBuilder.Build()
// register the engine with the network layer and store the conduit
con, err := net.Register(channels.ReceiveGuarantees, e)
if err != nil {
return nil, fmt.Errorf("could not register engine: %w", err)
}
e.con = con
return e, nil
}
// SubmitLocal submits an event originating on the local node.
func (e *Engine) SubmitLocal(event interface{}) {
err := e.ProcessLocal(event)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event")
}
}
// Submit submits the given event from the node with the given origin ID
// for processing in a non-blocking manner. It returns instantly and logs
// a potential processing error internally when done.
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
err := e.Process(channel, originID, event)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event")
}
}
// ProcessLocal processes an event originating on the local node.
func (e *Engine) ProcessLocal(event interface{}) error {
return e.messageHandler.Process(e.me.NodeID(), event)
}
// Process processes the given event from the node with the given origin ID in
// a blocking manner. It returns error only in unexpected scenario.
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error {
err := e.messageHandler.Process(originID, event)
if err != nil {
if engine.IsIncompatibleInputTypeError(err) {
e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel)
return nil
}
return fmt.Errorf("unexpected error while processing engine message: %w", err)
}
return nil
}
// processAvailableMessages processes the given ingestion engine event.
func (e *Engine) processAvailableMessages(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
default: // fall through to business logic
}
msg, ok := e.pendingGuarantees.Get()
if ok {
originID := msg.OriginID
err := e.core.OnGuarantee(originID, msg.Payload.(*flow.CollectionGuarantee))
if err != nil {
if engine.IsInvalidInputError(err) {
e.log.Error().Str("origin", originID.String()).Err(err).Msg("received invalid collection guarantee")
return nil
}
if engine.IsOutdatedInputError(err) {
e.log.Warn().Str("origin", originID.String()).Err(err).Msg("received outdated collection guarantee")
return nil
}
if engine.IsUnverifiableInputError(err) {
e.log.Warn().Str("origin", originID.String()).Err(err).Msg("received unverifiable collection guarantee")
return nil
}
return fmt.Errorf("processing collection guarantee unexpected err: %w", err)
}
continue
}
// when there is no more messages in the queue, back to the loop to wait
// for the next incoming message to arrive.
return nil
}
}
func (e *Engine) loop(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case <-e.messageHandler.GetNotifier():
err := e.processAvailableMessages(ctx)
if err != nil {
return fmt.Errorf("internal error processing queued message: %w", err)
}
}
}
}