-
Notifications
You must be signed in to change notification settings - Fork 28
/
multiple-active-streams.ts
148 lines (131 loc) · 4.52 KB
/
multiple-active-streams.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import { DFUSE_API_KEY, runMain, DFUSE_API_NETWORK } from "../../config"
import {
createDfuseClient,
waitFor,
Stream,
dynamicMessageDispatcher,
ActionTraceInboundMessage,
OnStreamMessage,
} from "@dfuse/client"
/**
 * Shape of the action data read by `onBuyRamAction`, i.e. the `data` of an
 * action trace matched by the `eosio` / `buyrambytes` stream filter.
 *
 * - `bytes`: logged as the number of bytes bought (presumably the amount of
 *   RAM requested; confirm against the contract ABI)
 * - `receiver`: account logged as the one the bytes go to
 * - `payer`: account logged as the one paying
 */
type BuyRamBytesData = {
bytes: number
receiver: string
payer: string
}
/**
 * Shape of the action data read by `onTransferToEosioRamAction`, i.e. the
 * `data` of an action trace matched by the `eosio.token` / `transfer` stream
 * filter.
 *
 * Only `from` and `quantity` are read by the handlers in this file; `to` and
 * `memo` are declared but not used here.
 * NOTE(review): `quantity` is presumably an asset string (amount + symbol);
 * confirm against the token contract ABI.
 */
type TransferData = {
from: string
to: string
quantity: string
memo: string
}
/**
 * Showcases how to have multiple active streams at the same time, then the
 * single merged stream alternative.
 *
 * Phase 1 opens two independent streams: one listening for
 * `eosio::buyrambytes` actions and one listening for `eosio.token::transfer`
 * notifications performed on receiver `eosio.ram`. Multiple active streams
 * are independent from each other, so the ordering of messages across
 * streams is not guaranteed.
 *
 * Phase 2 works around this, in some circumstances, with a single merged
 * stream whose filters cover both actions; `onMergedAction` then filters the
 * required messages from the pool of possibilities. Having a single stream
 * always guarantees the ordering of messages.
 *
 * Every stream that was successfully opened is closed and the client is
 * released even when a later step rejects, so a failure midway does not
 * leave a stream open or the client unreleased.
 */
async function main(): Promise<void> {
  // How long each phase listens before closing its stream(s).
  // Presumably milliseconds (60 seconds) — confirm against `waitFor`.
  const listenWindow = 60000

  const client = createDfuseClient({
    apiKey: DFUSE_API_KEY,
    network: DFUSE_API_NETWORK,
  })

  try {
    // Phase 1: two independent streams.
    const buyRamData = { accounts: "eosio", action_names: "buyrambytes" }
    const buyRamStream: Stream = await client.streamActionTraces(
      buyRamData,
      dynamicMessageDispatcher({
        listening: onListeningFactory("buy_ram"),
        action_trace: onBuyRamAction,
      })
    )

    // Declared outside the `try` so the `finally` can tell whether it was ever opened.
    let ramStream: Stream | undefined
    try {
      const ramData = { accounts: "eosio.token", action_names: "transfer", receivers: "eosio.ram" }
      ramStream = await client.streamActionTraces(
        ramData,
        dynamicMessageDispatcher({
          listening: onListeningFactory("ram_transfer"),
          action_trace: onTransferToEosioRamAction,
        })
      )

      console.log(
        "Notice how `Buy RAM` and `RAM cost` happens in random order, due to using 2 independent streams."
      )
      await waitFor(listenWindow)
    } finally {
      // Close in opening order; the nested `finally` makes sure the second
      // stream is still closed if closing the first one rejects.
      try {
        await buyRamStream.close()
      } finally {
        if (ramStream !== undefined) {
          await ramStream.close()
        }
      }
    }

    console.log("")

    // Phase 2: one merged stream covering both actions.
    const mergedData = {
      accounts: "eosio|eosio.token",
      action_names: "buyrambytes|transfer",
      receivers: "eosio|eosio.token|eosio.ram",
    }
    const mergedStream: Stream = await client.streamActionTraces(
      mergedData,
      dynamicMessageDispatcher({
        listening: onListeningFactory("merged"),
        action_trace: onMergedAction,
      })
    )

    try {
      console.log(
        "Notice how `Buy RAM` is always before `RAM cost` thanks to the strict ordering of a single stream."
      )
      await waitFor(listenWindow)
    } finally {
      await mergedStream.close()
    }
  } finally {
    // Always release the client, whether the phases above succeeded or not.
    client.release()
  }
}
/**
 * Builds a stream message handler that announces, on the console, that the
 * stream identified by `tag` is listening.
 *
 * @param tag Label printed between brackets in the log line.
 * @returns A handler that ignores its arguments and logs the announcement
 *          each time it is invoked.
 */
function onListeningFactory(tag: string): OnStreamMessage {
  const announceListening = (): void => {
    console.log(`Stream [${tag}] is now listening.`)
  }

  return announceListening
}
/**
 * Logs a one-line summary of a buy ram action trace.
 *
 * @param message Inbound action trace message; only the `payer`, `bytes` and
 *                `receiver` fields of `message.data.trace.act.data` are read.
 */
function onBuyRamAction(message: ActionTraceInboundMessage<BuyRamBytesData>): void {
  const { payer, bytes, receiver } = message.data.trace.act.data
  const summary = `Buy RAM: ${payer} pays ${bytes} bytes to ${receiver}`

  console.log(summary)
}
/**
 * Logs a one-line summary of a transfer action trace, reported as the cost
 * paid for RAM.
 *
 * @param message Inbound action trace message; only the `from` and
 *                `quantity` fields of `message.data.trace.act.data` are read.
 */
function onTransferToEosioRamAction(message: ActionTraceInboundMessage<TransferData>): void {
  const { from, quantity } = message.data.trace.act.data
  const summary = `RAM cost: ${from} pays ${quantity} for the RAM`

  console.log(summary)
}
/**
 * Handler for the merged stream, which can deliver more than we want: by
 * default you receive any action matching one of the combinations formed
 * by the three filter parameters `account/action/receiver`.
 *
 * In most use cases only a subset of those combinations matters. Here we
 * care about exactly two, and route each to its single-purpose handler:
 * - `eosio::buyrambytes` goes to `onBuyRamAction`
 * - `eosio.token::transfer` received by `eosio.ram` goes to
 *   `onTransferToEosioRamAction`
 *
 * Every other message is silently discarded.
 *
 * A merged stream delivers actions in the strict order they appear on the
 * chain, so the relative order of the buy ram and `eosio.ram` transfer
 * messages is stable. That order reflects how the system contract is
 * currently implemented and might be different in the future or on a
 * side/sister chain.
 * NOTE(review): `main` logs that `Buy RAM` always comes before `RAM cost`;
 * confirm which of the two is actually observed first.
 */
function onMergedAction(message: ActionTraceInboundMessage): void {
  const { trace } = message.data
  const { account, name } = trace.act

  const isBuyRam = account === "eosio" && name === "buyrambytes"
  if (isBuyRam) {
    onBuyRamAction(message as ActionTraceInboundMessage<BuyRamBytesData>)
    return
  }

  // The receipt is only inspected once the action itself is known to be a token transfer.
  const isTokenTransfer = account === "eosio.token" && name === "transfer"
  if (isTokenTransfer && trace.receipt.receiver === "eosio.ram") {
    onTransferToEosioRamAction(message as ActionTraceInboundMessage<TransferData>)
  }

  // Any other combination is of no interest and falls through untouched.
}
// Script entry point: hands `main` to the `runMain` helper imported from
// "../../config" (presumably it invokes `main` and reports failures — confirm
// against that module).
runMain(main)