Skip to content

Commit

Permalink
Add Subscribe2 method (#288)
Browse files Browse the repository at this point in the history
* Add Subscribe2 method

* Fix failing tests

* Check err

* Lint

* Update logger

* Add test for dispatcher

* Refactor update method

* Tidy up locking

* Only log on non-EOF errors

* Clean up logging

* Continue on nil request

* Register all topics and let the handler dedupe

* Remove useless code
  • Loading branch information
neekolas authored Aug 24, 2023
1 parent a3731b0 commit b2e6089
Show file tree
Hide file tree
Showing 13 changed files with 249 additions and 5 deletions.
2 changes: 1 addition & 1 deletion dev/docker/build-local
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT_COMMIT="$(git rev-parse HEAD)"
GO_VERSION="$(go list -f "{{.GoVersion}}" -m)"

docker build \
--tag "${DOCKER_IMAGE_NAME}:latest" \
--tag "${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG}" \
--build-arg="GO_VERSION=${GO_VERSION}" \
--build-arg="GIT_COMMIT=${GIT_COMMIT}" \
-f dev/docker/Dockerfile \
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/uptrace/bun/dialect/pgdialect v1.1.3
github.com/uptrace/bun/driver/pgdriver v1.1.3
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3
github.com/xmtp/proto/v3 v3.24.1
github.com/xmtp/proto/v3 v3.27.0
github.com/yoheimuta/protolint v0.39.0
go.opencensus.io v0.23.0
go.uber.org/zap v1.21.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1947,8 +1947,8 @@ github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3 h1:wzUffJGCTBGXIDy
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3/go.mod h1:bJREWk+NDnZYjgLQdAi8SUWuq/5pkMme4GqiffEhUF4=
github.com/xmtp/go-waku v0.0.0-20220810150132-1237437a419a h1:hIC6tybvfLRH/kuM7mDez1oPxecQDEVE3JWMhNjc9kQ=
github.com/xmtp/go-waku v0.0.0-20220810150132-1237437a419a/go.mod h1:z498Zyc/u6sNiFQFuRLtYVQMUQurezgH36xBpE8ei6Q=
github.com/xmtp/proto/v3 v3.24.1 h1:4or3fZOZM8zeauyZoLAGVPPDl0/whvRp9V92/8rRz54=
github.com/xmtp/proto/v3 v3.24.1/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY=
github.com/xmtp/proto/v3 v3.27.0 h1:G70006UEffkCmWvp9G/7Dywosj1sLm9StR5HWEb891U=
github.com/xmtp/proto/v3 v3.27.0/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yoheimuta/go-protoparser/v4 v4.6.0 h1:uvz1e9/5Ihsm4Ku8AJeDImTpirKmIxubZdSn0QJNdnw=
github.com/yoheimuta/go-protoparser/v4 v4.6.0/go.mod h1:AHNNnSWnb0UoL4QgHPiOAg2BniQceFscPI5X/BZNHl8=
Expand Down
20 changes: 20 additions & 0 deletions pkg/api/message/v1/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,26 @@ func (c *grpcClient) Subscribe(ctx context.Context, r *messagev1.SubscribeReques
}, nil
}

func (c *grpcClient) Subscribe2(ctx context.Context, r *messagev1.SubscribeRequest) (Subscribe2Stream, error) {
ctx, cancel := context.WithCancel(ctx)
stream, err := c.grpc.Subscribe2(ctx)
if err != nil {
cancel()
return nil, err
}
if err = stream.Send(r); err != nil {
cancel()
return nil, err
}
return &grpcBidiStream{
subscribe2Client: stream,
grpcStream: grpcStream{
cancel: cancel,
stream: stream,
},
}, nil
}

func (c *grpcClient) SubscribeAll(ctx context.Context) (Stream, error) {
ctx, cancel := context.WithCancel(ctx)
stream, err := c.grpc.SubscribeAll(ctx, &messagev1.SubscribeAllRequest{})
Expand Down
9 changes: 9 additions & 0 deletions pkg/api/message/v1/client/grpc_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,12 @@ func (s *grpcStream) Close() error {
s.cancel()
return nil
}

type grpcBidiStream struct {
subscribe2Client messagev1.MessageApi_Subscribe2Client
grpcStream
}

func (s *grpcBidiStream) Send(req *messagev1.SubscribeRequest) error {
return s.subscribe2Client.Send(req)
}
5 changes: 5 additions & 0 deletions pkg/api/message/v1/client/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -69,6 +70,10 @@ func (c *httpClient) Subscribe(ctx context.Context, req *messagev1.SubscribeRequ
return stream, nil
}

func (c *httpClient) Subscribe2(ctx context.Context, req *messagev1.SubscribeRequest) (Subscribe2Stream, error) {
return nil, errors.New("not implemented")
}

func (c *httpClient) SubscribeAll(ctx context.Context) (Stream, error) {
stream, err := newHTTPStream(c.log, func() (*http.Response, error) {
return c.post(ctx, "/message/v1/subscribe-all", &messagev1.SubscribeAllRequest{})
Expand Down
6 changes: 6 additions & 0 deletions pkg/api/message/v1/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
type Client interface {
Publish(context.Context, *messagev1.PublishRequest) (*messagev1.PublishResponse, error)
Subscribe(context.Context, *messagev1.SubscribeRequest) (Stream, error)
Subscribe2(context.Context, *messagev1.SubscribeRequest) (Subscribe2Stream, error)
SubscribeAll(context.Context) (Stream, error)
Query(context.Context, *messagev1.QueryRequest) (*messagev1.QueryResponse, error)
BatchQuery(ctx context.Context, req *messagev1.BatchQueryRequest) (*messagev1.BatchQueryResponse, error)
Expand All @@ -22,3 +23,8 @@ type Stream interface {
// Closing the stream terminates the subscription.
Close() error
}

type Subscribe2Stream interface {
Send(req *messagev1.SubscribeRequest) error
Stream
}
32 changes: 32 additions & 0 deletions pkg/api/message/v1/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,38 @@ func (d *dispatcher) Unregister(ch chan interface{}, topics ...string) {
d.unregister(ch, topics...)
}

// Update takes in an array of topics and unregisters any topics that are not in the array and registers any topics that are not already registered.
func (d *dispatcher) Update(ch chan interface{}, topics ...string) {
if ch == nil {
return
}

// Create a map of the new topics so we can check existing topics against it to see what needs to be added/removed
newTopicMap := make(map[string]bool)
for _, topic := range topics {
newTopicMap[topic] = true
}

// Lock the map so we can check if any existing subscriptions need to be removed
d.l.RLock()
topicsBySub := d.topicsBySub[ch]
toUnregister := make([]string, 0)
for topic := range topicsBySub {
if !newTopicMap[topic] {
toUnregister = append(toUnregister, topic)
}
}
d.l.RUnlock()

if len(toUnregister) > 0 {
d.Unregister(ch, toUnregister...)
}

// Lock again
d.Register(ch, topics...)

}

func (d *dispatcher) unregister(ch chan interface{}, topics ...string) {
subTopics := d.topicsBySub[ch]
if len(subTopics) == 0 {
Expand Down
14 changes: 14 additions & 0 deletions pkg/api/message/v1/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ func Test_DispatcherUpdates(t *testing.T) {
require.Equal(t, 0, len(d.subsByTopic))
}

func Test_DispatcherUpdateMethod(t *testing.T) {
d := newDispatcher()
ch := make(chan interface{})

d.Update(ch, "a", "b", "c")
require.Equal(t, d.topicsBySub[ch], map[string]bool{"a": true, "b": true, "c": true})
d.Update(ch, "c", "d")
require.Equal(t, d.topicsBySub[ch], map[string]bool{"c": true, "d": true})
require.Equal(t, 2, len(d.subsByTopic))
d.Update(ch, "e")
require.Equal(t, 1, len(d.topicsBySub[ch]))
require.Equal(t, 1, len(d.bcsByTopic))
}

func Test_DispatcherClose(t *testing.T) {
d := newDispatcher()
d.Register(nil, "a", "b", "c")
Expand Down
56 changes: 56 additions & 0 deletions pkg/api/message/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
"io"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -167,6 +168,61 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi
}
}

func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
log := s.log.Named("subscribe2")
log.Debug("started")
defer log.Debug("stopped")
// Send a header (any header) to fix an issue with Tonic based GRPC clients.
// See: https://github.com/xmtp/libxmtp/pull/58
_ = stream.SendHeader(metadata.Pairs("subscribed", "true"))

ch := make(chan interface{})
defer s.dispatcher.Unregister(ch)

requestChannel := make(chan *proto.SubscribeRequest)
go func() {
for {
req, err := stream.Recv()
if err != nil {
if err != io.EOF && err != context.Canceled {
log.Error("reading subscription", zap.Error(err))
}
close(requestChannel)
return
}
requestChannel <- req
}
}()

for {
select {
case <-stream.Context().Done():
log.Debug("stream closed")
return nil
case <-s.ctx.Done():
log.Info("service closed")
return nil
case req := <-requestChannel:
if req == nil {
continue
}
log.Info("updating subscription", zap.Int("num_content_topics", len(req.ContentTopics)))
s.dispatcher.Update(ch, req.ContentTopics...)
case obj := <-ch:
env, ok := obj.(*proto.Envelope)
if !ok {
log.Warn("non-envelope received on subscription channel", zap.Any("object", obj))
continue
}

err := stream.Send(env)
if err != nil {
log.Error("sending envelope to subscriber", zap.Error(err))
}
}
}
}

func (s *Service) SubscribeAll(req *proto.SubscribeAllRequest, stream proto.MessageApi_SubscribeAllServer) error {
log := s.log.Named("subscribeAll")
log.Debug("started")
Expand Down
84 changes: 84 additions & 0 deletions pkg/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
Expand Down Expand Up @@ -340,6 +341,89 @@ func Test_SubscribeClientClose(t *testing.T) {
})
}

func Test_Subscribe2ClientClose(t *testing.T) {
ctx := withAuth(t, context.Background())
testGRPC(t, ctx, func(t *testing.T, client messageclient.Client, _ *Server) {
// start subscribe stream
stream, err := client.Subscribe2(ctx, &messageV1.SubscribeRequest{
ContentTopics: []string{"topic"},
})
require.NoError(t, err)
defer stream.Close()
time.Sleep(50 * time.Millisecond)

// publish 5 messages
envs := makeEnvelopes(10)
publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[:5]})
require.NoError(t, err)
require.NotNil(t, publishRes)

// receive 5 and close the stream
subscribeExpect(t, stream, envs[:5])
err = stream.Close()
require.NoError(t, err)

// publish another 5
publishRes, err = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[5:]})
require.NoError(t, err)
require.NotNil(t, publishRes)
time.Sleep(50 * time.Millisecond)

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
_, err = stream.Next(ctx)
require.Equal(t, io.EOF, err)
})
}

func Test_Subscribe2UpdateTopics(t *testing.T) {
ctx := withAuth(t, context.Background())
testGRPC(t, ctx, func(t *testing.T, client messageclient.Client, _ *Server) {
// start subscribe stream
stream, err := client.Subscribe2(ctx, &messageV1.SubscribeRequest{
ContentTopics: []string{"topic"},
})
require.NoError(t, err)
defer stream.Close()
time.Sleep(50 * time.Millisecond)

// publish 5 messages
envs := makeEnvelopes(10)
publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[:5]})
require.NoError(t, err)
require.NotNil(t, publishRes)
// receive 5 and close the stream
subscribeExpect(t, stream, envs[:5])

err = stream.Send(&messageV1.SubscribeRequest{
ContentTopics: []string{"topic2"},
})
require.NoError(t, err)

topic1Envs := makeEnvelopes(1)
_, err = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: topic1Envs})
require.NoError(t, err)

topic2Envs := []*messageV1.Envelope{{
ContentTopic: "topic2",
Message: []byte(fmt.Sprintf("msg %d", 2)),
TimestampNs: uint64(1000),
}}

_, err = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: topic2Envs})
require.NoError(t, err)
subscribeExpect(t, stream, topic2Envs)

err = stream.Close()
require.NoError(t, err)

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
_, err = stream.Next(ctx)
require.Equal(t, io.EOF, err)
})
}

func Test_SubscribeAllClientClose(t *testing.T) {
ctx := withAuth(t, context.Background())
testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, _ *Server) {
Expand Down
14 changes: 14 additions & 0 deletions pkg/api/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,20 @@ func testGRPCAndHTTP(t *testing.T, ctx context.Context, f func(*testing.T, messa
})
}

func testGRPC(t *testing.T, ctx context.Context, f func(*testing.T, messageclient.Client, *Server)) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
server, cleanup := newTestServer(t)
defer cleanup()

c, err := messageclient.NewGRPCClient(ctx, server.dialGRPC)
require.NoError(t, err)

f(t, c, server)
}

func withAuth(t *testing.T, ctx context.Context) context.Context {
ctx, _ = withAuthWithDetails(t, ctx, time.Now())
return ctx
Expand Down
6 changes: 5 additions & 1 deletion pkg/authn/authn.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b2e6089

Please sign in to comment.