From 4664ac7d38e202beeed571604be2d05156fc3bab Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Mon, 24 Jul 2023 17:22:03 -0400 Subject: [PATCH] chore: remove unneeded blocks pubsub implementation --- go.mod | 3 +- go.sum | 1 - lib/pubsub.go | 146 -------------------------------------------------- 3 files changed, 1 insertion(+), 149 deletions(-) delete mode 100644 lib/pubsub.go diff --git a/go.mod b/go.mod index 20cae58..98cc9bc 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/ipfs/bifrost-gateway go 1.19 require ( - github.com/cskr/pubsub v1.0.2 github.com/filecoin-saturn/caboose v0.0.3 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/golang-lru/v2 v2.0.1 @@ -20,7 +19,6 @@ require ( github.com/libp2p/go-libp2p-routing-helpers v0.7.0 github.com/mitchellh/go-server-timing v1.0.1 github.com/multiformats/go-multicodec v0.9.0 - github.com/multiformats/go-multihash v0.2.3 github.com/prometheus/client_golang v1.15.1 github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 github.com/spf13/cobra v1.6.1 @@ -97,6 +95,7 @@ require ( github.com/multiformats/go-multiaddr v0.8.0 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect github.com/multiformats/go-multibase v0.2.0 // indirect + github.com/multiformats/go-multihash v0.2.3 // indirect github.com/multiformats/go-multistream v0.4.1 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/opencontainers/runtime-spec v1.0.3-0.20211123151946-c2389c3cb60a // indirect diff --git a/go.sum b/go.sum index 420df78..bd094fe 100644 --- a/go.sum +++ b/go.sum @@ -68,7 +68,6 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 h1:HVTnpeuvF6Owjd5mniCL8DEXo7uYXdQEmOP4FJbV5tg= github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3/go.mod h1:p1d6YEZWvFzEh4KLyvBcVSnrfNDDvK2zfK/4x2v/4pE= github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= -github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/lib/pubsub.go b/lib/pubsub.go deleted file mode 100644 index 0d1d553..0000000 --- a/lib/pubsub.go +++ /dev/null @@ -1,146 +0,0 @@ -package lib - -import ( - "context" - "sync" - - "github.com/multiformats/go-multihash" - - pubsub "github.com/cskr/pubsub" - blocks "github.com/ipfs/go-block-format" -) - -// Note: blantantly copied from boxo/bitswap internals https://github.com/ipfs/boxo/blob/664b3e5ee4a997c67909da9b017a28efa40cc8ae/bitswap/client/internal/notifications/notifications.go -// some minor changes were made - -const bufferSize = 16 - -// BlockPubSub is a simple interface for publishing blocks and being able to subscribe -// for multihashes. It's used internally by an exchange to decouple receiving blocks -// and actually providing them back to the GetBlocks caller. -// Note: because multihashes are being requested and blocks returned the codecs could be anything -type BlockPubSub interface { - Publish(blocks ...blocks.Block) - Subscribe(ctx context.Context, keys ...multihash.Multihash) <-chan blocks.Block - Shutdown() -} - -// NewBlockPubSub generates a new BlockPubSub interface. -func NewBlockPubSub() BlockPubSub { - return &impl{ - wrapped: *pubsub.New(bufferSize), - closed: make(chan struct{}), - } -} - -type impl struct { - lk sync.RWMutex - wrapped pubsub.PubSub - - closed chan struct{} -} - -func (ps *impl) Publish(blocks ...blocks.Block) { - ps.lk.RLock() - defer ps.lk.RUnlock() - select { - case <-ps.closed: - return - default: - } - - for _, block := range blocks { - ps.wrapped.Pub(block, string(block.Cid().Hash())) - } -} - -func (ps *impl) Shutdown() { - ps.lk.Lock() - defer ps.lk.Unlock() - select { - case <-ps.closed: - return - default: - } - close(ps.closed) - ps.wrapped.Shutdown() -} - -// Subscribe returns a channel of blocks for the given |keys|. |blockChannel| -// is closed if the |ctx| times out or is cancelled, or after receiving the blocks -// corresponding to |keys|. -func (ps *impl) Subscribe(ctx context.Context, keys ...multihash.Multihash) <-chan blocks.Block { - - blocksCh := make(chan blocks.Block, len(keys)) - valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking - if len(keys) == 0 { - close(blocksCh) - return blocksCh - } - - // prevent shutdown - ps.lk.RLock() - defer ps.lk.RUnlock() - - select { - case <-ps.closed: - close(blocksCh) - return blocksCh - default: - } - - // AddSubOnceEach listens for each key in the list, and closes the channel - // once all keys have been received - ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...) - go func() { - defer func() { - close(blocksCh) - - ps.lk.RLock() - defer ps.lk.RUnlock() - // Don't touch the pubsub instance if we're - // already closed. - select { - case <-ps.closed: - return - default: - } - - ps.wrapped.Unsub(valuesCh) - }() - - for { - select { - case <-ctx.Done(): - return - case <-ps.closed: - return - case val, ok := <-valuesCh: - if !ok { - return - } - block, ok := val.(blocks.Block) - if !ok { - return - } - select { - case <-ctx.Done(): - return - case blocksCh <- block: // continue - case <-ps.closed: - return - } - } - } - }() - - return blocksCh -} - -func toStrings(keys []multihash.Multihash) []string { - strs := make([]string, 0, len(keys)) - for _, key := range keys { - strs = append(strs, string(key)) - } - return strs -}