Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple message store from waku #299

Merged
merged 3 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ LICENSE*
tests
examples
*.db
bin
2 changes: 1 addition & 1 deletion dev/docker/up
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ set -e
. dev/docker/env

docker_compose build
docker_compose up -d
docker_compose up -d --remove-orphans
16 changes: 4 additions & 12 deletions dev/e2e/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@ services:
- 18008:8008
restart: always
command:
- --store
- --store.enable
- --metrics
- --metrics-address=0.0.0.0
- --ws
- --ws-port=6001
- --port=6000
- --lightpush
- --filter
- --api.authn.enable
- --api.authn.allowlists
- --log-encoding=json
Expand All @@ -58,14 +56,12 @@ services:
- 25555:5555
restart: always
command:
- --store
- --store.enable
- --metrics
- --metrics-address=0.0.0.0
- --ws
- --ws-port=6001
- --port=6000
- --lightpush
- --filter
- --api.authn.enable
- --api.authn.allowlists
- --log-encoding=json
Expand All @@ -90,14 +86,12 @@ services:
- 35555:5555
restart: always
command:
- --store
- --store.enable
- --metrics
- --metrics-address=0.0.0.0
- --ws
- --ws-port=6001
- --port=6000
- --lightpush
- --filter
- --api.authn.enable
- --api.authn.allowlists
- --log-encoding=json
Expand All @@ -122,14 +116,12 @@ services:
- 45555:5555
restart: always
command:
- --store
- --store.enable
- --metrics
- --metrics-address=0.0.0.0
- --ws
- --ws-port=6001
- --port=6000
- --lightpush
- --filter
- --api.authn.enable
- --api.authn.allowlists
- --log-encoding=json
Expand Down
2 changes: 1 addition & 1 deletion dev/e2e/docker/up
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ set -e
export GO_VERSION="$(go list -f "{{.GoVersion}}" -m)"

docker_compose build
docker_compose up -d
docker_compose up -d --remove-orphans
8 changes: 5 additions & 3 deletions dev/run
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ AUTHZ_DB_DSN="postgres://postgres:xmtp@localhost:15432/postgres?sslmode=disable"
NODE_KEY="8a30dcb604b0b53627a5adc054dbf434b446628d4bd1eccc681d223f0550ce67"

go run cmd/xmtpd/main.go \
--message-db-connection-string "${MESSAGE_DB_DSN}" \
--message-db-reader-connection-string "${MESSAGE_DB_DSN}" \
--authz-db-connection-string "${AUTHZ_DB_DSN}" \
--nodekey "${NODE_KEY}" \
--metrics \
--metrics-period 5s \
--store.enable \
snormore marked this conversation as resolved.
Show resolved Hide resolved
--store.db-connection-string "${MESSAGE_DB_DSN}" \
--store.reader-db-connection-string "${MESSAGE_DB_DSN}" \
--store.metrics-period 5s \
--authz-db-connection-string "${AUTHZ_DB_DSN}" \
--go-profiling \
"$@"
3 changes: 0 additions & 3 deletions dev/start
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ set -e
PORT="${PORT:-9002}"

dev/run \
--store \
--filter \
--lightpush \
--ws \
--ws-port="${PORT}" \
--api.authn.enable \
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ require (
github.com/libp2p/go-libp2p v0.20.2
github.com/libp2p/go-libp2p-core v0.16.1
github.com/libp2p/go-libp2p-pubsub v0.6.1
github.com/libp2p/go-msgio v0.2.0
github.com/mattn/go-sqlite3 v1.14.13
github.com/multiformats/go-multiaddr v0.5.0
github.com/nats-io/nats-server/v2 v2.1.2
Expand Down Expand Up @@ -114,6 +113,7 @@ require (
github.com/libp2p/go-libp2p-peerstore v0.6.0 // indirect
github.com/libp2p/go-libp2p-resource-manager v0.3.0 // indirect
github.com/libp2p/go-mplex v0.7.0 // indirect
github.com/libp2p/go-msgio v0.2.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-netroute v0.2.0 // indirect
github.com/libp2p/go-openssl v0.0.7 // indirect
Expand Down
12 changes: 9 additions & 3 deletions pkg/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
wakunode "github.com/status-im/go-waku/waku/v2/node"
"github.com/xmtp/xmtp-node-go/pkg/authz"
"github.com/xmtp/xmtp-node-go/pkg/ratelimiter"
"github.com/xmtp/xmtp-node-go/pkg/store"
"go.uber.org/zap"
)

var (
ErrMissingLog = errors.New("missing log config")
ErrMissingWaku = errors.New("missing waku config")
ErrMissingLog = errors.New("missing log config")
ErrMissingWaku = errors.New("missing waku config")
ErrMissingStore = errors.New("missing store config")
)

type Options struct {
Expand All @@ -30,6 +32,7 @@ type Config struct {
AllowLister authz.WalletAllowLister
Waku *wakunode.WakuNode
Log *zap.Logger
Store *store.Store
}

// AuthnOptions bundle command line options associated with the authn package.
Expand Down Expand Up @@ -77,13 +80,16 @@ type AuthnConfig struct {
Log *zap.Logger
}

func (params *Config) check() error {
func (params *Config) validate() error {
if params.Log == nil {
return ErrMissingLog
}
if params.Waku == nil {
return ErrMissingWaku
}
if params.Store == nil {
return ErrMissingStore
}
if err := validateAddr(params.HTTPAddress, params.HTTPPort); err != nil {
return errors.Wrap(err, "Invalid HTTP Address")
}
Expand Down
124 changes: 14 additions & 110 deletions pkg/api/message/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ type Service struct {
proto.UnimplementedMessageApiServer

// Configured as constructor options.
log *zap.Logger
waku *wakunode.WakuNode
log *zap.Logger
waku *wakunode.WakuNode
store *store.Store

// Configured internally.
ctx context.Context
Expand All @@ -57,10 +58,11 @@ type Service struct {
nc *nats.Conn
}

func NewService(node *wakunode.WakuNode, logger *zap.Logger) (s *Service, err error) {
func NewService(node *wakunode.WakuNode, logger *zap.Logger, store *store.Store) (s *Service, err error) {
s = &Service{
waku: node,
log: logger.Named("message/v1"),
waku: node,
log: logger.Named("message/v1"),
store: store,
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())

Expand Down Expand Up @@ -143,29 +145,22 @@ func (s *Service) Publish(ctx context.Context, req *proto.PublishRequest) (*prot
return nil, status.Errorf(codes.InvalidArgument, "topic length too big")
}

wakuMsg := &wakupb.WakuMessage{
ContentTopic: env.ContentTopic,
Timestamp: toWakuTimestamp(env.TimestampNs),
Payload: env.Message,
}

if len(env.Message) > MaxMessageSize {
return nil, status.Errorf(codes.InvalidArgument, "message too big")
}

store, ok := s.waku.Store().(*store.XmtpStore)
if !ok {
return nil, status.Errorf(codes.Internal, "waku store not xmtp store")
}

if !topic.IsEphemeral(env.ContentTopic) {
_, err := store.InsertMessage(wakuMsg)
_, err := s.store.InsertMessage(env)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
}

_, err := s.waku.Relay().Publish(ctx, wakuMsg)
_, err := s.waku.Relay().Publish(ctx, &wakupb.WakuMessage{
ContentTopic: env.ContentTopic,
Timestamp: int64(env.TimestampNs),
Payload: env.Message,
})
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
Expand Down Expand Up @@ -368,31 +363,7 @@ func (s *Service) Query(ctx context.Context, req *proto.QueryRequest) (*proto.Qu
}
}

store, ok := s.waku.Store().(*store.XmtpStore)
if !ok {
return nil, status.Errorf(codes.Internal, "waku store not xmtp store")
}
start := time.Now()
res, err := store.FindMessages(buildWakuQuery(req))
duration := time.Since(start)
if err != nil {
metrics.EmitQuery(ctx, req, 0, err, duration)
return nil, status.Errorf(codes.Internal, err.Error())
}
metrics.EmitQuery(ctx, req, len(res.Messages), nil, duration)
if duration > 10*time.Millisecond {
log.With(zap.Duration("duration", duration), zap.Int("results", len(res.Messages))).Info("slow query")
}

envs := make([]*proto.Envelope, 0, len(res.Messages))
for _, msg := range res.Messages {
envs = append(envs, buildEnvelope(msg))
}

return &proto.QueryResponse{
Envelopes: envs,
PagingInfo: buildPagingInfo(res.PagingInfo),
}, nil
return s.store.Query(req)
}

func (s *Service) BatchQuery(ctx context.Context, req *proto.BatchQueryRequest) (*proto.BatchQueryResponse, error) {
Expand Down Expand Up @@ -430,69 +401,6 @@ func buildEnvelope(msg *wakupb.WakuMessage) *proto.Envelope {
}
}

func buildWakuQuery(req *proto.QueryRequest) *wakupb.HistoryQuery {
contentFilters := []*wakupb.ContentFilter{}
for _, contentTopic := range req.ContentTopics {
if contentTopic != "" {
contentFilters = append(contentFilters, &wakupb.ContentFilter{
ContentTopic: contentTopic,
})
}
}

return &wakupb.HistoryQuery{
ContentFilters: contentFilters,
StartTime: toWakuTimestamp(req.StartTimeNs),
EndTime: toWakuTimestamp(req.EndTimeNs),
PagingInfo: buildWakuPagingInfo(req.PagingInfo),
}
}

func buildPagingInfo(pi *wakupb.PagingInfo) *proto.PagingInfo {
if pi == nil {
return nil
}
var pagingInfo proto.PagingInfo
pagingInfo.Limit = uint32(pi.PageSize)
switch pi.Direction {
case wakupb.PagingInfo_BACKWARD:
pagingInfo.Direction = proto.SortDirection_SORT_DIRECTION_DESCENDING
case wakupb.PagingInfo_FORWARD:
pagingInfo.Direction = proto.SortDirection_SORT_DIRECTION_ASCENDING
}
if index := pi.Cursor; index != nil {
pagingInfo.Cursor = &proto.Cursor{
Cursor: &proto.Cursor_Index{
Index: &proto.IndexCursor{
Digest: index.Digest,
SenderTimeNs: uint64(index.SenderTime),
}}}
}
return &pagingInfo
}

func buildWakuPagingInfo(pi *proto.PagingInfo) *wakupb.PagingInfo {
if pi == nil {
return nil
}
pagingInfo := &wakupb.PagingInfo{
PageSize: uint64(pi.Limit),
}
switch pi.Direction {
case proto.SortDirection_SORT_DIRECTION_ASCENDING:
pagingInfo.Direction = wakupb.PagingInfo_FORWARD
case proto.SortDirection_SORT_DIRECTION_DESCENDING:
pagingInfo.Direction = wakupb.PagingInfo_BACKWARD
}
if ic := pi.Cursor.GetIndex(); ic != nil {
pagingInfo.Cursor = &wakupb.Index{
Digest: ic.Digest,
SenderTime: toWakuTimestamp(ic.SenderTimeNs),
}
}
return pagingInfo
}

func isValidSubscribeAllTopic(topic string) bool {
return strings.HasPrefix(topic, validXMTPTopicPrefix)
}
Expand All @@ -504,10 +412,6 @@ func fromWakuTimestamp(ts int64) uint64 {
return uint64(ts)
}

func toWakuTimestamp(ts uint64) int64 {
return int64(ts)
}

func buildNatsSubject(topic string) string {
hasher := fnv.New64a()
hasher.Write([]byte(topic))
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Server struct {
}

func New(config *Config) (*Server, error) {
if err := config.check(); err != nil {
if err := config.validate(); err != nil {
return nil, err
}

Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *Server) startGRPC() error {
healthcheck := health.NewServer()
healthgrpc.RegisterHealthServer(grpcServer, healthcheck)

s.messagev1, err = messagev1.NewService(s.Waku, s.Log)
s.messagev1, err = messagev1.NewService(s.Waku, s.Log, s.Store)
if err != nil {
return errors.Wrap(err, "creating message service")
}
Expand Down
Loading