Skip to content

Commit

Permalink
Internalize logic to stage FinalizeBlock events
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyding committed Sep 27, 2024
1 parent 7193a10 commit 2e7d9c9
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 94 deletions.
1 change: 1 addition & 0 deletions proto/dydxprotocol/clob/streaming.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ message StagedFinalizeBlockEvent {
oneof event {
StreamOrderbookFill order_fill = 1;
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2;
StreamOrderbookUpdate orderbook_update = 3;
}
}
6 changes: 3 additions & 3 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

96 changes: 58 additions & 38 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,38 +391,21 @@ func getStagedEventsCount(store storetypes.KVStore) uint32 {
}

// Stage a subaccount update event in transient store, during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate(
func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
) {
lib.AssertDeliverTxMode(ctx)
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
// If not `DeliverTx`, return since we don't stream optimistic subaccount updates.
if !lib.IsDeliverTxMode(ctx) {
return
}
sm.stageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
}

// Stage a fill event in transient store, during `FinalizeBlock`.
// Since `FinalizeBlock` code block can be called more than once with optimistic
// execution (once optimistically and optionally once on the canonical block),
// we need to stage the events in transient store and later emit them
// during `Precommit`.
func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
) {
lib.AssertDeliverTxMode(ctx)
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &fill,
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
}

sm.stageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
Expand Down Expand Up @@ -545,17 +528,38 @@ func getStreamUpdatesFromOffchainUpdates(
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)

streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, execMode)
// If not `DeliverTx`, then updates are optimistic. Stream them directly.
if !lib.IsDeliverTxMode(ctx) {
streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, ctx.ExecMode())
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
return
}

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block.
v1updates, err := streaming_util.GetOffchainUpdatesV1(offchainUpdates)
if err != nil {
panic(err)
}
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: false,
},
},
}
sm.stageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
}

func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills(
Expand Down Expand Up @@ -595,28 +599,44 @@ func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills(
return streamUpdates, clobPairIds
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// SendOrderbookFillUpdate groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
orderbookFill clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
// TODO
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)

streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
orderbookFills,
blockHeight,
execMode,
perpetualIdToClobPairId,
)
// If not `DeliverTx`, then updates are optimistic. Stream them directly.
if !lib.IsDeliverTxMode(ctx) {
streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
[]clobtypes.StreamOrderbookFill{orderbookFill},
blockHeight,
ctx.ExecMode(),
perpetualIdToClobPairId,
)
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
return
}

// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &orderbookFill,
},
}

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
sm.stageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
}

// SendTakerOrderStatus sends out a taker order and its status to the full node streaming service.
Expand Down
16 changes: 5 additions & 11 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ func (sm *NoopGrpcStreamingManager) Subscribe(
func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
) {
}

func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate(
orderbookFill clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
}
Expand Down Expand Up @@ -79,19 +79,13 @@ func (sm *NoopGrpcStreamingManager) InitializeNewStreams(
func (sm *NoopGrpcStreamingManager) Stop() {
}

func (sm *NoopGrpcStreamingManager) StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
) {
}

func (sm *NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent {
return nil
}

func (sm *NoopGrpcStreamingManager) StageFinalizeBlockSubaccountUpdate(
func (sm *NoopGrpcStreamingManager) SendSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
) {
Expand Down
14 changes: 5 additions & 9 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ type FullNodeStreamingManager interface {
SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
)
SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
SendOrderbookFillUpdate(
orderbookFill clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
)
SendTakerOrderStatus(
Expand All @@ -50,11 +50,7 @@ type FullNodeStreamingManager interface {
blockHeight uint32,
execMode sdk.ExecMode,
)
StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
)
StageFinalizeBlockSubaccountUpdate(
SendSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
)
Expand Down
4 changes: 2 additions & 2 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,9 @@ func (f *FakeMemClobKeeper) SendOrderbookUpdates(
) {
}

func (f *FakeMemClobKeeper) SendOrderbookFillUpdates(
func (f *FakeMemClobKeeper) SendOrderbookFillUpdate(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
orderbookFill types.StreamOrderbookFill,
) {
}

Expand Down
17 changes: 7 additions & 10 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,22 +310,19 @@ func (k Keeper) SendOrderbookUpdates(
k.GetFullNodeStreamingManager().SendOrderbookUpdates(
offchainUpdates,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
ctx,
)
}

// SendOrderbookFillUpdates sends the orderbook fills to the Full Node streaming manager.
func (k Keeper) SendOrderbookFillUpdates(
// SendOrderbookFillUpdate sends the orderbook fills to the Full Node streaming manager.
func (k Keeper) SendOrderbookFillUpdate(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
orderbookFill types.StreamOrderbookFill,
) {
if len(orderbookFills) == 0 {
return
}
k.GetFullNodeStreamingManager().SendOrderbookFillUpdates(
orderbookFills,
k.GetFullNodeStreamingManager().SendOrderbookFillUpdate(
orderbookFill,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
ctx,
k.PerpetualIdToClobPairId,
)
}
Expand Down
18 changes: 10 additions & 8 deletions protocol/x/clob/keeper/process_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,9 +560,11 @@ func (k Keeper) PersistMatchOrdersToState(
makerOrders,
)

k.GetFullNodeStreamingManager().StageFinalizeBlockFill(
ctx,
k.GetFullNodeStreamingManager().SendOrderbookFillUpdate(
streamOrderbookFill,
uint32(ctx.BlockHeight()),
ctx,
k.PerpetualIdToClobPairId,
)
}

Expand Down Expand Up @@ -670,9 +672,11 @@ func (k Keeper) PersistMatchLiquidationToState(
takerOrder,
makerOrders,
)
k.GetFullNodeStreamingManager().StageFinalizeBlockFill(
ctx,
k.GetFullNodeStreamingManager().SendOrderbookFillUpdate(
streamOrderbookFill,
uint32(ctx.BlockHeight()),
ctx,
k.PerpetualIdToClobPairId,
)
}
return nil
Expand Down Expand Up @@ -843,11 +847,9 @@ func (k Keeper) PersistMatchDeleveragingToState(
},
},
}
k.SendOrderbookFillUpdates(
k.SendOrderbookFillUpdate(
ctx,
[]types.StreamOrderbookFill{
streamOrderbookFill,
},
streamOrderbookFill,
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/x/clob/memclob/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (m *MemClobPriceTimePriority) mustUpdateMemclobStateWithMatches(
)
clobMatch := internalOperation.GetMatch()
orderbookMatchFill := m.GenerateStreamOrderbookFill(ctx, *clobMatch, takerOrder, makerOrders)
m.clobKeeper.SendOrderbookFillUpdates(ctx, []types.StreamOrderbookFill{orderbookMatchFill})
m.clobKeeper.SendOrderbookFillUpdate(ctx, orderbookMatchFill)
}

// Build a slice of all subaccounts which had matches this matching loop, and sort them for determinism.
Expand Down
4 changes: 2 additions & 2 deletions protocol/x/clob/types/mem_clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ type MemClobKeeper interface {
ctx sdk.Context,
offchainUpdates *OffchainUpdates,
)
SendOrderbookFillUpdates(
SendOrderbookFillUpdate(
ctx sdk.Context,
orderbookFills []StreamOrderbookFill,
orderbookFill StreamOrderbookFill,
)
SendTakerOrderStatus(
ctx sdk.Context,
Expand Down
Loading

0 comments on commit 2e7d9c9

Please sign in to comment.