From 1fc74750c6a13d9eb22bf8b3d2b4e1dea3fe4662 Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Tue, 24 Sep 2024 21:40:12 +0800 Subject: [PATCH 1/6] improve pickAvailableStream to continue looking for other available streams if any stream fails --- p2p/stream/common/requestmanager/requestmanager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/stream/common/requestmanager/requestmanager.go b/p2p/stream/common/requestmanager/requestmanager.go index 923668a152..66f6af9960 100644 --- a/p2p/stream/common/requestmanager/requestmanager.go +++ b/p2p/stream/common/requestmanager/requestmanager.go @@ -79,7 +79,7 @@ func (rm *requestManager) Start() { } func (rm *requestManager) Close() { - rm.stopC <- struct{}{} + close(rm.stopC) } // DoRequest do the given request with a stream picked randomly. Return the response, stream id that From aa67dc86dd6e91300ee51af83e970e2069dd56c6 Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Thu, 26 Sep 2024 20:15:26 +0800 Subject: [PATCH 2/6] add thread safe map to request manager in p2p stream layer --- p2p/stream/common/requestmanager/requestmanager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/stream/common/requestmanager/requestmanager.go b/p2p/stream/common/requestmanager/requestmanager.go index 66f6af9960..923668a152 100644 --- a/p2p/stream/common/requestmanager/requestmanager.go +++ b/p2p/stream/common/requestmanager/requestmanager.go @@ -79,7 +79,7 @@ func (rm *requestManager) Start() { } func (rm *requestManager) Close() { - close(rm.stopC) + rm.stopC <- struct{}{} } // DoRequest do the given request with a stream picked randomly. Return the response, stream id that From 652ac13f965e69a14d70cf1edbbbab090234f75b Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Thu, 24 Oct 2024 08:33:59 +0800 Subject: [PATCH 3/6] add sort and clone methods to safe map --- p2p/stream/types/safe_map.go | 59 ++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/p2p/stream/types/safe_map.go b/p2p/stream/types/safe_map.go index e4a5e559c5..aeb2077894 100644 --- a/p2p/stream/types/safe_map.go +++ b/p2p/stream/types/safe_map.go @@ -1,6 +1,7 @@ package sttypes import ( + "sort" "sync" ) @@ -32,6 +33,32 @@ func NewSafeMapWithInitialValues[K comparable, V any](initialValues map[K]V) *Sa return m } +// Clone creates and returns a deep copy of the SafeMap +func (m *SafeMap[K, V]) Clone() *SafeMap[K, V] { + m.mu.RLock() + defer m.mu.RUnlock() + + clone := NewSafeMap[K, V]() + for k, v := range m.data { + clone.data[k] = v + } + + return clone +} + +// Snapshot returns a copy of the underlying map +func (m *SafeMap[K, V]) Snapshot() map[K]V { + m.mu.RLock() + defer m.mu.RUnlock() + + // Create a copy of the internal map for safe iteration + snapshot := make(map[K]V, len(m.data)) + for k, v := range m.data { + snapshot[k] = v + } + return snapshot +} + // Set inserts or updates a key-value pair in the map. func (m *SafeMap[K, V]) Set(key K, value V) { m.mu.Lock() @@ -94,3 +121,35 @@ func (m *SafeMap[K, V]) Clear() { defer m.mu.Unlock() m.data = make(map[K]V) // Reinitialize the map } + +// SortKeys returns a sorted clone of SafeMap. +func (m *SafeMap[K, V]) Sort(less func(i, j K) bool) *SafeMap[K, V] { + keys := m.Keys() + sort.Slice(keys, func(i, j int) bool { + return less(keys[i], keys[j]) + }) + // Create a sorted copy of the map + m.mu.RLock() + defer m.mu.RUnlock() + sorted := NewSafeMap[K, V]() + for _, k := range keys { + sorted.data[k] = m.data[k] + } + return sorted +} + +// SortedSnapshot returns a sorted copy of the underlying map +func (m *SafeMap[K, V]) SortedSnapshot(less func(i, j K) bool) map[K]V { + keys := m.Keys() + sort.Slice(keys, func(i, j int) bool { + return less(keys[i], keys[j]) + }) + // Create a sorted copy of the map + m.mu.RLock() + defer m.mu.RUnlock() + sorted := make(map[K]V, len(m.data)) + for _, k := range keys { + sorted[k] = m.data[k] + } + return sorted +} From 7164dd74c460761eb27f582d96ea18de0afc13c3 Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Thu, 24 Oct 2024 08:35:51 +0800 Subject: [PATCH 4/6] add ID, setID to sync protocol --- p2p/stream/protocols/sync/client.go | 40 ++++++++++++++--------------- p2p/stream/types/interface.go | 4 +-- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/p2p/stream/protocols/sync/client.go b/p2p/stream/protocols/sync/client.go index 9ae9d5c8f8..6b980c9f8b 100644 --- a/p2p/stream/protocols/sync/client.go +++ b/p2p/stream/protocols/sync/client.go @@ -309,11 +309,11 @@ func newGetBlocksByNumberRequest(bns []uint64) *getBlocksByNumberRequest { } } -func (req *getBlocksByNumberRequest) ReqID() uint64 { +func (req *getBlocksByNumberRequest) ID() uint64 { return req.pbReq.GetReqId() } -func (req *getBlocksByNumberRequest) SetReqID(val uint64) { +func (req *getBlocksByNumberRequest) SetID(val uint64) { req.pbReq.ReqId = val } @@ -384,11 +384,11 @@ func newGetBlockNumberRequest() *getBlockNumberRequest { } } -func (req *getBlockNumberRequest) ReqID() uint64 { +func (req *getBlockNumberRequest) ID() uint64 { return req.pbReq.GetReqId() } -func (req *getBlockNumberRequest) SetReqID(val uint64) { +func (req *getBlockNumberRequest) SetID(val uint64) { req.pbReq.ReqId = val } @@ -433,11 +433,11 @@ func newGetBlockHashesRequest(bns []uint64) *getBlockHashesRequest { } } -func (req *getBlockHashesRequest) ReqID() uint64 { +func (req *getBlockHashesRequest) ID() uint64 { return req.pbReq.ReqId } -func (req *getBlockHashesRequest) SetReqID(val uint64) { +func (req *getBlockHashesRequest) SetID(val uint64) { req.pbReq.ReqId = val } @@ -488,11 +488,11 @@ func newGetBlocksByHashesRequest(hashes []common.Hash) *getBlocksByHashesRequest } } -func (req *getBlocksByHashesRequest) ReqID() uint64 { +func (req *getBlocksByHashesRequest) ID() uint64 { return req.pbReq.GetReqId() } -func (req *getBlocksByHashesRequest) SetReqID(val uint64) { +func (req *getBlocksByHashesRequest) SetID(val uint64) { req.pbReq.ReqId = val } @@ -562,11 +562,11 @@ func newGetNodeDataRequest(hashes []common.Hash) *getNodeDataRequest { } } -func (req *getNodeDataRequest) ReqID() uint64 { +func (req *getNodeDataRequest) ID() uint64 { return req.pbReq.GetReqId() } -func (req *getNodeDataRequest) SetReqID(val uint64) { +func (req *getNodeDataRequest) SetID(val uint64) { req.pbReq.ReqId = val } @@ -626,11 +626,11 @@ func newGetReceiptsRequest(hashes []common.Hash) *getReceiptsRequest { } } -func (req *getReceiptsRequest) ReqID() uint64 { +func (req *getReceiptsRequest) ID() uint64 { return req.pbReq.GetReqId() } -func (req *getReceiptsRequest) SetReqID(val uint64) { +func (req *getReceiptsRequest) SetID(val uint64) { req.pbReq.ReqId = val } @@ -706,11 +706,11 @@ func newGetAccountRangeRequest(root common.Hash, origin common.Hash, limit commo } } -func (req *getAccountRangeRequest) ReqID() uint64 { +func (req *getAccountRangeRequest) ID() uint64 { return req.pbReq.GetReqId() } -func (req *getAccountRangeRequest) SetReqID(val uint64) { +func (req *getAccountRangeRequest) SetID(val uint64) { req.pbReq.ReqId = val } @@ -783,11 +783,11 @@ func newGetStorageRangesRequest(root common.Hash, accounts []common.Hash, origin } } -func (req *getStorageRangesRequest) ReqID() uint64 { +func (req *getStorageRangesRequest) ID() uint64 { return req.pbReq.GetReqId() } -func (req *getStorageRangesRequest) SetReqID(val uint64) { +func (req *getStorageRangesRequest) SetID(val uint64) { req.pbReq.ReqId = val } @@ -858,11 +858,11 @@ func newGetByteCodesRequest(hashes []common.Hash, bytes uint64) *getByteCodesReq } } -func (req *getByteCodesRequest) ReqID() uint64 { +func (req *getByteCodesRequest) ID() uint64 { return req.pbReq.GetReqId() } -func (req *getByteCodesRequest) SetReqID(val uint64) { +func (req *getByteCodesRequest) SetID(val uint64) { req.pbReq.ReqId = val } @@ -931,11 +931,11 @@ func newGetTrieNodesRequest(root common.Hash, paths []*message.TrieNodePathSet, } } -func (req *getTrieNodesRequest) ReqID() uint64 { +func (req *getTrieNodesRequest) ID() uint64 { return req.pbReq.GetReqId() } -func (req *getTrieNodesRequest) SetReqID(val uint64) { +func (req *getTrieNodesRequest) SetID(val uint64) { req.pbReq.ReqId = val } diff --git a/p2p/stream/types/interface.go b/p2p/stream/types/interface.go index fce52c0650..cae0400ec5 100644 --- a/p2p/stream/types/interface.go +++ b/p2p/stream/types/interface.go @@ -22,8 +22,8 @@ type Protocol interface { // Request is the interface of a stream request used for common stream utils. type Request interface { - ReqID() uint64 - SetReqID(rid uint64) + ID() uint64 + SetID(rid uint64) String() string IsSupportedByProto(ProtoSpec) bool Encode() ([]byte, error) From ab1f686211646f70c81f7c1dec76b4d03513df1b Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Thu, 24 Oct 2024 08:37:20 +0800 Subject: [PATCH 5/6] each stream has capacity, add new getters to stream request manager --- p2p/stream/common/requestmanager/interface.go | 7 + .../common/requestmanager/interface_test.go | 25 +-- p2p/stream/common/requestmanager/options.go | 8 +- .../common/requestmanager/requestmanager.go | 152 ++++++++++-------- .../requestmanager/requestmanager_test.go | 14 +- p2p/stream/common/requestmanager/types.go | 118 ++++++++++---- .../common/requestmanager/types_test.go | 6 +- p2p/stream/common/streammanager/interface.go | 1 + .../common/streammanager/streammanager.go | 11 +- p2p/stream/protocols/sync/client_test.go | 8 + p2p/stream/protocols/sync/protocol.go | 12 +- 11 files changed, 238 insertions(+), 124 deletions(-) diff --git a/p2p/stream/common/requestmanager/interface.go b/p2p/stream/common/requestmanager/interface.go index e4ff8d0d51..65ba26ec1f 100644 --- a/p2p/stream/common/requestmanager/interface.go +++ b/p2p/stream/common/requestmanager/interface.go @@ -17,9 +17,16 @@ type Deliverer interface { DeliverResponse(stID sttypes.StreamID, resp sttypes.Response) } +type Streams interface { + Streams() []sttypes.Stream + NumStreams() int + AvailableCapacity() int +} + // RequestManager manages over the requests type RequestManager interface { p2ptypes.LifeCycle Requester Deliverer + Streams } diff --git a/p2p/stream/common/requestmanager/interface_test.go b/p2p/stream/common/requestmanager/interface_test.go index 6fc087103e..7cb4555554 100644 --- a/p2p/stream/common/requestmanager/interface_test.go +++ b/p2p/stream/common/requestmanager/interface_test.go @@ -74,6 +74,13 @@ func (sm *testStreamManager) GetStreamByID(id sttypes.StreamID) (sttypes.Stream, return st, exist } +func (sm *testStreamManager) NumStreams() int { + sm.lock.Lock() + defer sm.lock.Unlock() + + return len(sm.streams) +} + type testStream struct { id sttypes.StreamID rm *requestManager @@ -138,14 +145,14 @@ func makeDummyTestStreams(indexes []int) []sttypes.Stream { return sts } -func makeDummyStreamSets(indexes []int) *sttypes.SafeMap[sttypes.StreamID, *stream] { - m := sttypes.NewSafeMap[sttypes.StreamID, *stream]() +func makeDummyStreamSets(indexes []int) *sttypes.SafeMap[sttypes.StreamID, *WorkerStream] { + m := sttypes.NewSafeMap[sttypes.StreamID, *WorkerStream]() for _, index := range indexes { st := &testStream{ id: makeStreamID(index), } - m.Set(st.ID(), &stream{Stream: st}) + m.Set(st.ID(), &WorkerStream{Stream: st}) } return m } @@ -166,11 +173,11 @@ func makeTestRequest(index uint64) *testRequest { } } -func (req *testRequest) ReqID() uint64 { +func (req *testRequest) ID() uint64 { return req.reqID } -func (req *testRequest) SetReqID(rid uint64) { +func (req *testRequest) SetID(rid uint64) { req.reqID = rid } @@ -180,10 +187,10 @@ func (req *testRequest) String() string { func (req *testRequest) Encode() ([]byte, error) { return rlp.EncodeToBytes(struct { - ReqID uint64 + ID uint64 Index uint64 }{ - ReqID: req.reqID, + ID: req.reqID, Index: req.index, }) } @@ -204,7 +211,7 @@ func (req *testRequest) checkResponse(rawResp sttypes.Response) error { func decodeTestRequest(b []byte) (*testRequest, error) { type SerRequest struct { - ReqID uint64 + ID uint64 Index uint64 } var sr SerRequest @@ -212,7 +219,7 @@ func decodeTestRequest(b []byte) (*testRequest, error) { return nil, err } return &testRequest{ - reqID: sr.ReqID, + reqID: sr.ID, index: sr.Index, }, nil } diff --git a/p2p/stream/common/requestmanager/options.go b/p2p/stream/common/requestmanager/options.go index f98bd098a4..6a5e4fce2e 100644 --- a/p2p/stream/common/requestmanager/options.go +++ b/p2p/stream/common/requestmanager/options.go @@ -7,12 +7,12 @@ import sttypes "github.com/harmony-one/harmony/p2p/stream/types" // 1. WithHighPriority // 2. WithBlacklist // 3. WithWhitelist -type RequestOption func(*request) +type RequestOption func(*WorkerRequest) // WithHighPriority is the request option to do request with higher priority. // High priority requests are done first. func WithHighPriority() RequestOption { - return func(req *request) { + return func(req *WorkerRequest) { req.priority = reqPriorityHigh } } @@ -20,7 +20,7 @@ func WithHighPriority() RequestOption { // WithBlacklist is the request option not to assign the request to the blacklisted // stream ID. func WithBlacklist(blacklist []sttypes.StreamID) RequestOption { - return func(req *request) { + return func(req *WorkerRequest) { for _, stid := range blacklist { req.addBlacklistedStream(stid) } @@ -31,7 +31,7 @@ func WithBlacklist(blacklist []sttypes.StreamID) RequestOption { // given stream IDs. // If a request is not with this option, all streams will be allowed. func WithWhitelist(whitelist []sttypes.StreamID) RequestOption { - return func(req *request) { + return func(req *WorkerRequest) { for _, stid := range whitelist { req.addWhiteListStream(stid) } diff --git a/p2p/stream/common/requestmanager/requestmanager.go b/p2p/stream/common/requestmanager/requestmanager.go index 923668a152..dc80af5c3c 100644 --- a/p2p/stream/common/requestmanager/requestmanager.go +++ b/p2p/stream/common/requestmanager/requestmanager.go @@ -20,19 +20,20 @@ import ( // TODO: each peer is able to have a queue of requests instead of one request at a time. // TODO: add QoS evaluation for each stream type requestManager struct { - streams *sttypes.SafeMap[sttypes.StreamID, *stream] // All streams - available *sttypes.SafeMap[sttypes.StreamID, struct{}] // Streams that are available for request - pendings *sttypes.SafeMap[uint64, *request] // requests that are sent but not received response - waitings requestQueues // double linked list of requests that are on the waiting list + //streams *sttypes.SafeMap[sttypes.StreamID, *WorkerStream] // All streams + streams *sttypes.SafeMap[sttypes.StreamID, *WorkerStream] // Streams that are available for request + pendings *sttypes.SafeMap[uint64, *WorkerRequest] // requests that are sent but not received response + waitings requestQueues // double linked list of requests that are on the waiting list // Stream events sm streammanager.Reader newStreamC <-chan streammanager.EvtStreamAdded rmStreamC <-chan streammanager.EvtStreamRemoved + // Request events cancelReqC chan cancelReqData // request being canceled deliveryC chan responseData - newRequestC chan *request + newRequestC chan *WorkerRequest subs []event.Subscription logger zerolog.Logger @@ -56,17 +57,17 @@ func newRequestManager(sm streammanager.ReaderSubscriber) *requestManager { logger := utils.Logger().With().Str("module", "request manager").Logger() return &requestManager{ - streams: sttypes.NewSafeMap[sttypes.StreamID, *stream](), - available: sttypes.NewSafeMap[sttypes.StreamID, struct{}](), - pendings: sttypes.NewSafeMap[uint64, *request](), - waitings: newRequestQueues(), - - sm: sm, - newStreamC: newStreamC, - rmStreamC: rmStreamC, + //streams: sttypes.NewSafeMap[sttypes.StreamID, *WorkerStream](), + streams: sttypes.NewSafeMap[sttypes.StreamID, *WorkerStream](), + pendings: sttypes.NewSafeMap[uint64, *WorkerRequest](), + waitings: newRequestQueues(), + + sm: sm, + // newStreamC: newStreamC, + // rmStreamC: rmStreamC, cancelReqC: make(chan cancelReqData, 16), deliveryC: make(chan responseData, 128), - newRequestC: make(chan *request, 128), + newRequestC: make(chan *WorkerRequest, 128), subs: []event.Subscription{sub1, sub2}, logger: logger, @@ -90,7 +91,7 @@ func (rm *requestManager) DoRequest(ctx context.Context, raw sttypes.Request, op } func (rm *requestManager) doRequestAsync(ctx context.Context, raw sttypes.Request, options ...RequestOption) <-chan responseData { - req := &request{ + req := &WorkerRequest{ Request: raw, respC: make(chan responseData, 1), doneC: make(chan struct{}), @@ -201,7 +202,7 @@ func (rm *requestManager) loop() { } } -func (rm *requestManager) handleNewRequest(req *request) bool { +func (rm *requestManager) handleNewRequest(req *WorkerRequest) bool { rm.logger.Debug().Str("request", req.String()). Msg("add new outgoing request to waiting queue") err := rm.addRequestToWaitings(req, reqPriorityLow) @@ -232,18 +233,18 @@ func (rm *requestManager) validateDelivery(data responseData) error { if data.err != nil { return data.err } - st, _ := rm.streams.Get(data.stID) - if st == nil { + st, ok := rm.streams.Get(data.stID) + if !ok { return fmt.Errorf("data delivered from dead stream: %v", data.stID) } req, _ := rm.pendings.Get(data.resp.ReqID()) if req == nil { return fmt.Errorf("stale p2p response delivery") } - if req.owner == nil || req.owner.ID() != data.stID { + if req.OwnerID() != data.stID { return fmt.Errorf("unexpected delivery stream") } - if st.req == nil || st.req.ReqID() != data.resp.ReqID() { + if req, _ := st.GetRequest(req.ID()); req == nil || req.ID() != data.resp.ReqID() { // Possible when request is canceled return fmt.Errorf("unexpected deliver request") } @@ -257,10 +258,7 @@ func (rm *requestManager) handleCancelRequest(data cancelReqData) { ) rm.waitings.Remove(req) rm.removePendingRequest(req) - var stid sttypes.StreamID - if req.owner != nil { - stid = req.owner.ID() - } + stid := req.OwnerID() req.doneWithResponse(responseData{ resp: nil, stID: stid, @@ -268,8 +266,8 @@ func (rm *requestManager) handleCancelRequest(data cancelReqData) { }) } -func (rm *requestManager) getNextRequest() (*request, *stream) { - var req *request +func (rm *requestManager) getNextRequest() (*WorkerRequest, *WorkerStream) { + var req *WorkerRequest for { req = rm.popRequestFromWaitings() if req == nil { @@ -298,40 +296,50 @@ func (rm *requestManager) genReqID() uint64 { } } -func (rm *requestManager) addPendingRequest(req *request, st *stream) { +func (rm *requestManager) addPendingRequest(req *WorkerRequest, st *WorkerStream) error { reqID := rm.genReqID() - req.SetReqID(reqID) + req.SetID(reqID) - req.owner = st - st.req = req + req.SetOwnerID(st.ID()) + if err := st.AssignRequest(req); err != nil { + return err + } - rm.available.Delete(st.ID()) - rm.pendings.Set(req.ReqID(), req) + //rm.available.Delete(st.ID()) + rm.pendings.Set(req.ID(), req) + return nil } -func (rm *requestManager) removePendingRequest(req *request) { - if _, ok := rm.pendings.Get(req.ReqID()); !ok { +func (rm *requestManager) removePendingRequest(req *WorkerRequest) { + if _, ok := rm.pendings.Get(req.ID()); !ok { return } - rm.pendings.Delete(req.ReqID()) + rm.pendings.Delete(req.ID()) - if st := req.owner; st != nil { + if st, _ := rm.streams.Get(req.OwnerID()); st != nil { st.clearPendingRequest() - rm.available.Set(st.ID(), struct{}{}) + //rm.available.Set(st.ID(), struct{}{}) } } -func (rm *requestManager) pickAvailableStream(req *request) (*stream, error) { - availableStreamIDs := rm.available.Keys() - for _, id := range availableStreamIDs { - if !req.isStreamAllowed(id) { +func (rm *requestManager) pickAvailableStream(req *WorkerRequest) (*WorkerStream, error) { + // sort streams by capacity + streams := rm.streams.SortedSnapshot(func(i sttypes.StreamID, j sttypes.StreamID) bool { + st1, _ := rm.streams.Get(i) + st2, _ := rm.streams.Get(i) + return st1.AvailableCapacity() < st2.AvailableCapacity() + }) + + //find the first available stream with highest free capacity + for id, st := range streams { + if st.AvailableCapacity() <= 0 { continue } - st, ok := rm.streams.Get(id) - if !ok { + if !req.isStreamAllowed(id) { continue } - if st.req != nil { + _, ok := rm.sm.GetStreamByID(id) + if !ok { continue } spec, _ := st.ProtoSpec() @@ -342,20 +350,36 @@ func (rm *requestManager) pickAvailableStream(req *request) (*stream, error) { return nil, errors.New("no more available streams") } +func (rm *requestManager) Streams() []sttypes.Stream { + return rm.sm.GetStreams() +} + +func (rm *requestManager) NumStreams() int { + return rm.sm.NumStreams() +} + +func (rm *requestManager) AvailableCapacity() int { + cap := 0 + rm.streams.Iterate(func(id sttypes.StreamID, ws *WorkerStream) { + cap += ws.AvailableCapacity() + }) + return cap +} + func (rm *requestManager) refreshStreams() { added, removed := checkStreamUpdates(rm.streams, rm.sm.GetStreams()) for _, st := range added { - rm.logger.Info().Str("streamID", string(st.ID())).Msg("adding new stream") + rm.logger.Info().Str("streamID", string(st.ID())).Msg("adding new stream to request manager") rm.addNewStream(st) } for _, st := range removed { - rm.logger.Info().Str("streamID", string(st.ID())).Msg("removing stream") + rm.logger.Info().Str("streamID", string(st.ID())).Msg("removing stream from request manager") rm.removeStream(st) } } -func checkStreamUpdates(exists *sttypes.SafeMap[sttypes.StreamID, *stream], targets []sttypes.Stream) (added []sttypes.Stream, removed []*stream) { +func checkStreamUpdates(exists *sttypes.SafeMap[sttypes.StreamID, *WorkerStream], targets []sttypes.Stream) (added []sttypes.Stream, removed []*WorkerStream) { targetM := make(map[sttypes.StreamID]sttypes.Stream) for _, target := range targets { @@ -365,7 +389,7 @@ func checkStreamUpdates(exists *sttypes.SafeMap[sttypes.StreamID, *stream], targ added = append(added, target) } } - exists.Iterate(func(id sttypes.StreamID, st *stream) { + exists.Iterate(func(id sttypes.StreamID, st *WorkerStream) { if _, ok := targetM[id]; !ok { removed = append(removed, st) } @@ -374,36 +398,36 @@ func checkStreamUpdates(exists *sttypes.SafeMap[sttypes.StreamID, *stream], targ } func (rm *requestManager) addNewStream(st sttypes.Stream) { - rm.streams.Set(st.ID(), &stream{Stream: st}) - rm.available.Set(st.ID(), struct{}{}) + rm.streams.Set(st.ID(), NewWorkerStream(st)) + //rm.available.Set(st.ID(), struct{}{}) } // removeStream remove the stream from request manager, clear the pending request // of the stream. -func (rm *requestManager) removeStream(st *stream) { - id := st.ID() - rm.available.Delete(id) - rm.streams.Delete(id) +func (rm *requestManager) removeStream(st *WorkerStream) { + stid := st.ID() + //rm.available.Delete(id) + rm.streams.Delete(stid) cleared := st.clearPendingRequest() - if cleared != nil { - cleared.doneWithResponse(responseData{ - stID: id, + cleared.Iterate(func(id uint64, wr *WorkerRequest) { + wr.doneWithResponse(responseData{ + stID: stid, err: errors.New("stream removed when doing request"), }) - } + }) } func (rm *requestManager) close() { for _, sub := range rm.subs { sub.Unsubscribe() } - rm.pendings.Iterate(func(key uint64, req *request) { + rm.pendings.Iterate(func(key uint64, req *WorkerRequest) { req.doneWithResponse(responseData{err: ErrClosed}) }) - rm.streams = sttypes.NewSafeMap[sttypes.StreamID, *stream]() - rm.available = sttypes.NewSafeMap[sttypes.StreamID, struct{}]() - rm.pendings = sttypes.NewSafeMap[uint64, *request]() + rm.streams = sttypes.NewSafeMap[sttypes.StreamID, *WorkerStream]() + //rm.available = sttypes.NewSafeMap[sttypes.StreamID, struct{}]() + rm.pendings = sttypes.NewSafeMap[uint64, *WorkerRequest]() rm.waitings = newRequestQueues() close(rm.stopC) } @@ -415,10 +439,10 @@ const ( reqPriorityHigh ) -func (rm *requestManager) addRequestToWaitings(req *request, priority reqPriority) error { +func (rm *requestManager) addRequestToWaitings(req *WorkerRequest, priority reqPriority) error { return rm.waitings.Push(req, priority) } -func (rm *requestManager) popRequestFromWaitings() *request { +func (rm *requestManager) popRequestFromWaitings() *WorkerRequest { return rm.waitings.Pop() } diff --git a/p2p/stream/common/requestmanager/requestmanager_test.go b/p2p/stream/common/requestmanager/requestmanager_test.go index 64ad4458d7..185fb242e0 100644 --- a/p2p/stream/common/requestmanager/requestmanager_test.go +++ b/p2p/stream/common/requestmanager/requestmanager_test.go @@ -77,7 +77,7 @@ func TestRequestManager_NewStream(t *testing.T) { time.Sleep(defTestSleep) - if ts.rm.streams.Length() != 4 || ts.rm.available.Length() != 4 { + if ts.rm.streams.Length() != 4 { t.Errorf("unexpected stream size") } } @@ -106,7 +106,7 @@ func TestRequestManager_RemoveStream(t *testing.T) { t.Errorf("unexpected error: %v", errors.New("stream removed when doing request")) } - if ts.rm.streams.Length() != 2 || ts.rm.available.Length() != 2 { + if ts.rm.streams.Length() != 2 { t.Errorf("unexpected stream size") } } @@ -368,7 +368,7 @@ func TestRequestManager_Concurrency(t *testing.T) { func TestGenReqID(t *testing.T) { retry := 100000 rm := &requestManager{ - pendings: sttypes.NewSafeMap[uint64, *request](), + pendings: sttypes.NewSafeMap[uint64, *WorkerRequest](), } for i := 0; i != retry; i++ { @@ -382,7 +382,7 @@ func TestGenReqID(t *testing.T) { func TestCheckStreamUpdates(t *testing.T) { tests := []struct { - exists *sttypes.SafeMap[sttypes.StreamID, *stream] + exists *sttypes.SafeMap[sttypes.StreamID, *WorkerStream] targets []sttypes.Stream expAddedIndexes []int expRemovedIndexes []int @@ -453,7 +453,7 @@ func checkStreamIDsEqual(sts []sttypes.Stream, expIndexes []int) error { return nil } -func checkStreamIDsEqual2(sts []*stream, expIndexes []int) error { +func checkStreamIDsEqual2(sts []*WorkerStream, expIndexes []int) error { if len(sts) != len(expIndexes) { return fmt.Errorf("size not equal") } @@ -518,9 +518,7 @@ func (ts *testSuite) pickOneOccupiedStream() sttypes.StreamID { IDs := ts.rm.pendings.Keys() for _, id := range IDs { req, _ := ts.rm.pendings.Get(id) - if req.owner != nil { - return req.owner.ID() - } + return req.OwnerID() } return "" } diff --git a/p2p/stream/common/requestmanager/types.go b/p2p/stream/common/requestmanager/types.go index c73488e1db..94c2508ff1 100644 --- a/p2p/stream/common/requestmanager/types.go +++ b/p2p/stream/common/requestmanager/types.go @@ -15,17 +15,57 @@ var ( // ErrClosed is request error that the module is closed during request ErrClosed = errors.New("request manager module closed") + + // ErrNotAvailableCapacity is the error that stream has fully occupied with requests + ErrNotAvailableCapacity = errors.New("not available capacity") +) + +var ( + MaxWorkerStreamCapacity = 10 ) -// stream is the wrapped version of sttypes.Stream. -// TODO: enable stream handle multiple pending requests at the same time -type stream struct { +// WorkerStream is the wrapped version of sttypes.Stream. +type WorkerStream struct { sttypes.Stream - req *request // currently one stream is dealing with one request + //req *WorkerRequest // currently one stream is dealing with one request + reqs *sttypes.SafeMap[uint64, *WorkerRequest] +} + +func NewWorkerStream(st sttypes.Stream) *WorkerStream { + return &WorkerStream{ + Stream: st, + reqs: sttypes.NewSafeMap[uint64, *WorkerRequest](), + } +} + +func (ws *WorkerStream) GetRequest(ID uint64) (*WorkerRequest, bool) { + return ws.reqs.Get(ID) +} + +func (ws *WorkerStream) AvailableCapacity() int { + cap := MaxWorkerStreamCapacity - ws.reqs.Length() + if cap < 0 { + cap = 0 + } + return cap +} + +func (ws *WorkerStream) AssignRequest(req *WorkerRequest) error { + cap := ws.AvailableCapacity() + if cap <= 0 { + return ErrNotAvailableCapacity + } + ws.reqs.Set(req.ID(), req) + return nil +} + +func (ws *WorkerStream) RemoveRequest(req *WorkerRequest) error { + ws.reqs.Delete(req.ID()) + return nil } // request is the wrapped request within module -type request struct { +type WorkerRequest struct { sttypes.Request // underlying request // result field respC chan responseData // channel to receive response from delivered message @@ -33,7 +73,7 @@ type request struct { atmDone uint32 doneC chan struct{} // stream info - owner *stream // Current owner + ownerID sttypes.StreamID // Current owner // utils lock sync.RWMutex raw *interface{} @@ -43,21 +83,35 @@ type request struct { whitelist map[sttypes.StreamID]struct{} // allowed streams } -func (req *request) ReqID() uint64 { +func (req *WorkerRequest) ID() uint64 { req.lock.RLock() defer req.lock.RUnlock() - return req.Request.ReqID() + return req.Request.ID() } -func (req *request) SetReqID(val uint64) { +func (req *WorkerRequest) SetID(val uint64) { req.lock.Lock() defer req.lock.Unlock() - req.Request.SetReqID(val) + req.Request.SetID(val) } -func (req *request) doneWithResponse(resp responseData) { +func (req *WorkerRequest) OwnerID() sttypes.StreamID { + req.lock.RLock() + defer req.lock.RUnlock() + + return req.ownerID +} + +func (req *WorkerRequest) SetOwnerID(id sttypes.StreamID) { + req.lock.Lock() + defer req.lock.Unlock() + + req.ownerID = id +} + +func (req *WorkerRequest) doneWithResponse(resp responseData) { notDone := atomic.CompareAndSwapUint32(&req.atmDone, 0, 1) if notDone { req.respC <- resp @@ -66,22 +120,22 @@ func (req *request) doneWithResponse(resp responseData) { } } -func (req *request) isDone() bool { +func (req *WorkerRequest) isDone() bool { return atomic.LoadUint32(&req.atmDone) == 1 } -func (req *request) isStreamAllowed(stid sttypes.StreamID) bool { +func (req *WorkerRequest) isStreamAllowed(stid sttypes.StreamID) bool { return req.isStreamWhitelisted(stid) && !req.isStreamBlacklisted(stid) } -func (req *request) addBlacklistedStream(stid sttypes.StreamID) { +func (req *WorkerRequest) addBlacklistedStream(stid sttypes.StreamID) { if req.blacklist == nil { req.blacklist = make(map[sttypes.StreamID]struct{}) } req.blacklist[stid] = struct{}{} } -func (req *request) isStreamBlacklisted(stid sttypes.StreamID) bool { +func (req *WorkerRequest) isStreamBlacklisted(stid sttypes.StreamID) bool { if req.blacklist == nil { return false } @@ -89,14 +143,14 @@ func (req *request) isStreamBlacklisted(stid sttypes.StreamID) bool { return ok } -func (req *request) addWhiteListStream(stid sttypes.StreamID) { +func (req *WorkerRequest) addWhiteListStream(stid sttypes.StreamID) { if req.whitelist == nil { req.whitelist = make(map[sttypes.StreamID]struct{}) } req.whitelist[stid] = struct{}{} } -func (req *request) isStreamWhitelisted(stid sttypes.StreamID) bool { +func (req *WorkerRequest) isStreamWhitelisted(stid sttypes.StreamID) bool { if req.whitelist == nil { return true } @@ -104,17 +158,17 @@ func (req *request) isStreamWhitelisted(stid sttypes.StreamID) bool { return ok } -func (st *stream) clearPendingRequest() *request { - req := st.req - if req == nil { +func (st *WorkerStream) clearPendingRequest() *sttypes.SafeMap[uint64, *WorkerRequest] { + reqs := st.reqs + if reqs == nil { return nil } - st.req = nil - return req + st.reqs = sttypes.NewSafeMap[uint64, *WorkerRequest]() + return reqs } type cancelReqData struct { - req *request + req *WorkerRequest err error } @@ -139,7 +193,7 @@ func newRequestQueues() requestQueues { } // Push add a new request to requestQueues. -func (q *requestQueues) Push(req *request, priority reqPriority) error { +func (q *requestQueues) Push(req *WorkerRequest, priority reqPriority) error { if priority == reqPriorityHigh || req.priority == reqPriorityHigh { return q.reqsPHigh.push(req) } @@ -147,14 +201,14 @@ func (q *requestQueues) Push(req *request, priority reqPriority) error { } // Pop will first pop the request from high priority, and then pop from low priority -func (q *requestQueues) Pop() *request { +func (q *requestQueues) Pop() *WorkerRequest { if req := q.reqsPHigh.pop(); req != nil { return req } return q.reqsPLow.pop() } -func (q *requestQueues) Remove(req *request) { +func (q *requestQueues) Remove(req *WorkerRequest) { q.reqsPHigh.remove(req) q.reqsPLow.remove(req) } @@ -162,18 +216,18 @@ func (q *requestQueues) Remove(req *request) { // requestQueue is a thread safe request double linked list type requestQueue struct { l *list.List - elemM map[*request]*list.Element // Yes, pointer as map key + elemM map[*WorkerRequest]*list.Element // Yes, pointer as map key lock sync.Mutex } func newRequestQueue() *requestQueue { return &requestQueue{ l: list.New(), - elemM: make(map[*request]*list.Element), + elemM: make(map[*WorkerRequest]*list.Element), } } -func (rl *requestQueue) push(req *request) error { +func (rl *requestQueue) push(req *WorkerRequest) error { rl.lock.Lock() defer rl.lock.Unlock() @@ -185,7 +239,7 @@ func (rl *requestQueue) push(req *request) error { return nil } -func (rl *requestQueue) pop() *request { +func (rl *requestQueue) pop() *WorkerRequest { rl.lock.Lock() defer rl.lock.Unlock() @@ -195,12 +249,12 @@ func (rl *requestQueue) pop() *request { } rl.l.Remove(elem) - req := elem.Value.(*request) + req := elem.Value.(*WorkerRequest) delete(rl.elemM, req) return req } -func (rl *requestQueue) remove(req *request) { +func (rl *requestQueue) remove(req *WorkerRequest) { rl.lock.Lock() defer rl.lock.Unlock() diff --git a/p2p/stream/common/requestmanager/types_test.go b/p2p/stream/common/requestmanager/types_test.go index f19c5f0ca1..3f48b16570 100644 --- a/p2p/stream/common/requestmanager/types_test.go +++ b/p2p/stream/common/requestmanager/types_test.go @@ -120,14 +120,14 @@ func makeTestRequestQueue(sizes []int) requestQueues { return q } -func wrapRequestFromRaw(raw *testRequest) *request { - return &request{ +func wrapRequestFromRaw(raw *testRequest) *WorkerRequest { + return &WorkerRequest{ Request: raw, } } func getTestRequestFromElem(elem *list.Element) (*testRequest, error) { - req, ok := elem.Value.(*request) + req, ok := elem.Value.(*WorkerRequest) if !ok { return nil, errors.New("unexpected type") } diff --git a/p2p/stream/common/streammanager/interface.go b/p2p/stream/common/streammanager/interface.go index ab0a916f6e..7b713f8f0d 100644 --- a/p2p/stream/common/streammanager/interface.go +++ b/p2p/stream/common/streammanager/interface.go @@ -41,6 +41,7 @@ type Subscriber interface { type Reader interface { GetStreams() []sttypes.Stream GetStreamByID(id sttypes.StreamID) (sttypes.Stream, bool) + NumStreams() int } // host is the adapter interface of the libp2p host implementation. diff --git a/p2p/stream/common/streammanager/streammanager.go b/p2p/stream/common/streammanager/streammanager.go index 0bfc9bbd5b..f5eed1ce23 100644 --- a/p2p/stream/common/streammanager/streammanager.go +++ b/p2p/stream/common/streammanager/streammanager.go @@ -212,6 +212,11 @@ func (sm *streamManager) GetStreamByID(id sttypes.StreamID) (sttypes.Stream, boo return sm.streams.get(id) } +// NumStreams return the number of connected streams +func (sm *streamManager) NumStreams() int { + return sm.streams.size() +} + type ( addStreamTask struct { st sttypes.Stream @@ -254,12 +259,12 @@ func (sm *streamManager) sanityCheckStream(st sttypes.Stream) error { func (sm *streamManager) handleAddStream(st sttypes.Stream) error { id := st.ID() - if sm.streams.size() >= sm.config.HiCap { - return ErrTooManyStreams - } if _, ok := sm.streams.get(id); ok { return ErrStreamAlreadyExist } + if sm.streams.size() >= sm.config.HiCap { + return ErrTooManyStreams + } sm.streams.addStream(st) diff --git a/p2p/stream/protocols/sync/client_test.go b/p2p/stream/protocols/sync/client_test.go index 611afd7610..c964bd8b48 100644 --- a/p2p/stream/protocols/sync/client_test.go +++ b/p2p/stream/protocols/sync/client_test.go @@ -788,6 +788,10 @@ func (rm *testHostRequestManager) DoRequest(ctx context.Context, request sttypes return resp, stid, nil } +func (rm *testHostRequestManager) AvailableCapacity() int { return 0 } +func (rm *testHostRequestManager) Streams() []sttypes.Stream { return []sttypes.Stream{} } +func (rm *testHostRequestManager) NumStreams() int { return 0 } + func makeTestStreamID(index int) sttypes.StreamID { id := fmt.Sprintf("[test stream %v]", index) return sttypes.StreamID(id) @@ -844,6 +848,10 @@ func (sm *testStreamManager) GetStreamByID(id sttypes.StreamID) (sttypes.Stream, return nil, false } +func (sm *testStreamManager) NumStreams() int { + return len(sm.streamIDs) +} + func assertError(got, expect error) error { if (got == nil) != (expect == nil) { return fmt.Errorf("unexpected error: %v / %v", got, expect) diff --git a/p2p/stream/protocols/sync/protocol.go b/p2p/stream/protocols/sync/protocol.go index 2dda471bdb..62c9a0b868 100644 --- a/p2p/stream/protocols/sync/protocol.go +++ b/p2p/stream/protocols/sync/protocol.go @@ -187,8 +187,8 @@ func (p *Protocol) HandleStream(raw libp2p_network.Stream) { p.logger.Info().Str("stream", raw.ID()).Msg("handle new sync stream") st := p.wrapStream(raw) if err := p.sm.NewStream(st); err != nil { - // Possibly we have reach the hard limit of the stream if !errors.Is(err, streammanager.ErrStreamAlreadyExist) { + // Possibly we have reach the hard limit of the stream p.logger.Warn().Err(err).Str("stream ID", string(st.ID())). Msg("failed to add new stream") } @@ -309,6 +309,11 @@ func (p *Protocol) StreamFailed(stID sttypes.StreamID, reason string) { } } +// Streams returns the connected streams +func (p *Protocol) Streams() []sttypes.Stream { + return p.sm.GetStreams() +} + // NumStreams return the streams with minimum version. // Note: nodes with sync version smaller than minVersion is not counted. func (p *Protocol) NumStreams() int { @@ -324,6 +329,11 @@ func (p *Protocol) NumStreams() int { return res } +// AvailableCapacity returns number of free streams to handle new requests +func (p *Protocol) AvailableCapacity() int { + return p.rm.AvailableCapacity() +} + // GetStreamManager get the underlying stream manager for upper level stream operations func (p *Protocol) GetStreamManager() streammanager.StreamManager { return p.sm From a316f6743d9339b25c26ad24ae610ec1c55fb7f2 Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Thu, 24 Oct 2024 08:38:46 +0800 Subject: [PATCH 6/6] support capacities in stream sync --- api/service/stagedstreamsync/adapter.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/service/stagedstreamsync/adapter.go b/api/service/stagedstreamsync/adapter.go index 56c42b661c..41ab8f9569 100644 --- a/api/service/stagedstreamsync/adapter.go +++ b/api/service/stagedstreamsync/adapter.go @@ -29,9 +29,10 @@ type syncProtocol interface { RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream StreamFailed(stID sttypes.StreamID, reason string) SubscribeAddStreamEvent(ch chan<- streammanager.EvtStreamAdded) event.Subscription + Streams() []sttypes.Stream NumStreams() int + AvailableCapacity() int } - type blockChain interface { engine.ChainReader Engine() engine.Engine