Skip to content

Commit

Permalink
fix: reacher
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Jan 22, 2025
1 parent f6628dc commit c31379b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 28 deletions.
39 changes: 14 additions & 25 deletions pkg/p2p/libp2p/internal/reacher/reacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

const (
pingTimeout = time.Second * 15
workers = 8
workers = 32
retryAfterDuration = time.Minute * 5
)

Expand All @@ -32,8 +32,8 @@ type reacher struct {
mu sync.Mutex
peers map[string]*peer

work chan struct{}
quit chan struct{}
newPeer chan struct{}
quit chan struct{}

pinger p2p.Pinger
notifier p2p.ReachableNotifier
Expand All @@ -53,7 +53,7 @@ type Options struct {
func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options) *reacher {

r := &reacher{
work: make(chan struct{}, 1),
newPeer: make(chan struct{}, 1),
quit: make(chan struct{}),
pinger: streamer,
peers: make(map[string]*peer),
Expand Down Expand Up @@ -93,17 +93,15 @@ func (r *reacher) manage() {

for {

p, tryAfter := r.tryAcquirePeer()
p, tryAfter := r.nextPeer()

// if no peer is returned,
// wait until either more work or the closest retry-after time.

// wait for work and tryAfter
if tryAfter > 0 {
select {
case <-r.quit:
return
case <-r.work:
case <-r.newPeer:
continue
case <-time.After(tryAfter):
continue
Expand All @@ -115,12 +113,12 @@ func (r *reacher) manage() {
select {
case <-r.quit:
return
case <-r.work:
case <-r.newPeer:
continue
}
}

// send p to channel
// ping peer
select {
case <-r.quit:
return
Expand All @@ -135,10 +133,6 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) {

for p := range c {

r.mu.Lock()
overlay := p.overlay
r.mu.Unlock()

now := time.Now()

ctxt, cancel := context.WithTimeout(ctx, r.options.PingTimeout)
Expand All @@ -149,30 +143,25 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) {
if err == nil {
r.metrics.Pings.WithLabelValues("success").Inc()
r.metrics.PingTime.WithLabelValues("success").Observe(time.Since(now).Seconds())
r.notifier.Reachable(overlay, p2p.ReachabilityStatusPublic)
r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPublic)
} else {
r.metrics.Pings.WithLabelValues("failure").Inc()
r.metrics.PingTime.WithLabelValues("failure").Observe(time.Since(now).Seconds())
r.notifier.Reachable(overlay, p2p.ReachabilityStatusPrivate)
r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPrivate)
}

r.notifyManage()
}
}

func (r *reacher) tryAcquirePeer() (*peer, time.Duration) {
func (r *reacher) nextPeer() (*peer, time.Duration) {
r.mu.Lock()
defer r.mu.Unlock()

var (
now = time.Now()
nextClosest time.Time
)
var nextClosest time.Time

for _, p := range r.peers {

// retry after has expired, retry
if now.After(p.retryAfter) {
if time.Now().After(p.retryAfter) {
p.retryAfter = time.Now().Add(r.options.RetryAfterDuration)
return p, 0
}
Expand All @@ -193,7 +182,7 @@ func (r *reacher) tryAcquirePeer() (*peer, time.Duration) {

func (r *reacher) notifyManage() {
select {
case r.work <- struct{}{}:
case r.newPeer <- struct{}{}:
default:
}
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,8 @@ type kadOptions struct {
}

var (
oversaturationCounts = [swarm.MaxBins]int{64, 32, 16, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8}
suffixBits = [swarm.MaxBins]int{5, 4, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2}
// prefixBits = [swarm.MaxBins]int{6, 5, 4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3}
oversaturationCounts = [swarm.MaxBins]int{64, 32, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16}
suffixBits = [swarm.MaxBins]int{5, 4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3}
)

func newKadOptions(o Options) kadOptions {
Expand Down

0 comments on commit c31379b

Please sign in to comment.