Skip to content

Commit

Permalink
reusing subscription ids
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Oct 18, 2024
1 parent 9bd4b19 commit 803eeac
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type FullNodeStreamingManagerImpl struct {

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
orderbookSubscriptions map[uint32]*OrderbookSubscription
nextSubscriptionId uint32
activeSubscriptionIds map[uint32]bool

// stream will batch and flush out messages every 10 ms.
ticker *time.Ticker
Expand Down Expand Up @@ -106,7 +106,7 @@ func NewFullNodeStreamingManager(
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
logger: logger,
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,
activeSubscriptionIds: make(map[uint32]bool),

ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
Expand Down Expand Up @@ -170,6 +170,16 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
}
}

// getNextAvailableSubscriptionId returns next available subscription id. Assumes the
// lock has been acquired.
func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32 {
id := uint32(0)
for _, inUse := sm.activeSubscriptionIds[id]; inUse; _, inUse = sm.activeSubscriptionIds[id] {
id = id + uint32(1)
}
return id
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *FullNodeStreamingManagerImpl) Subscribe(
clobPairIds []uint32,
Expand All @@ -188,8 +198,11 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
for i, subaccountId := range subaccountIds {
sIds[i] = *subaccountId
}

subscriptionId := sm.getNextAvailableSubscriptionId()

subscription := &OrderbookSubscription{
subscriptionId: sm.nextSubscriptionId,
subscriptionId: subscriptionId,
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: sIds,
Expand All @@ -204,7 +217,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append(
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
sm.nextSubscriptionId,
subscription.subscriptionId,
)
}
for _, subaccountId := range sIds {
Expand All @@ -215,7 +228,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
sm.subaccountIdToSubscriptionIdMapping[subaccountId] = append(
sm.subaccountIdToSubscriptionIdMapping[subaccountId],
sm.nextSubscriptionId,
subscription.subscriptionId,
)
}

Expand All @@ -228,7 +241,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
),
)
sm.orderbookSubscriptions[subscription.subscriptionId] = subscription
sm.nextSubscriptionId++
sm.activeSubscriptionIds[subscription.subscriptionId] = true
sm.EmitMetrics()
sm.Unlock()

Expand Down Expand Up @@ -280,6 +293,7 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription(
}
close(subscription.updatesChannel)
delete(sm.orderbookSubscriptions, subscriptionIdToRemove)
delete(sm.activeSubscriptionIds, subscriptionIdToRemove)

// Iterate over the clobPairIdToSubscriptionIdMapping to remove the subscriptionIdToRemove
for pairId, subscriptionIds := range sm.clobPairIdToSubscriptionIdMapping {
Expand Down

0 comments on commit 803eeac

Please sign in to comment.