diff --git a/cmd/notification-service/di/inject_application.go b/cmd/notification-service/di/inject_application.go index ad697a4..b4be8df 100644 --- a/cmd/notification-service/di/inject_application.go +++ b/cmd/notification-service/di/inject_application.go @@ -24,6 +24,9 @@ var commandsSet = wire.NewSet( app.NewProcessSavedEventHandler, wire.Bind(new(firestorepubsub.ProcessSavedEventHandler), new(*app.ProcessSavedEventHandler)), + + app.NewProcessFollowChangeHandler, + wire.Bind(new(memorypubsub.ProcessFollowChangeHandler), new(*app.ProcessFollowChangeHandler)), ) var queriesSet = wire.NewSet( diff --git a/cmd/notification-service/di/inject_pubsub.go b/cmd/notification-service/di/inject_pubsub.go index 8c25a8d..ef4b5aa 100644 --- a/cmd/notification-service/di/inject_pubsub.go +++ b/cmd/notification-service/di/inject_pubsub.go @@ -14,10 +14,14 @@ var pubsubSet = wire.NewSet( pubsub.NewReceivedEventPubSub, wire.Bind(new(app.ReceivedEventPublisher), new(*pubsub.ReceivedEventPubSub)), wire.Bind(new(app.ReceivedEventSubscriber), new(*pubsub.ReceivedEventPubSub)), + + pubsub.NewFollowChangePublisher, + wire.Bind(new(app.FollowChangePublisher), new(*pubsub.FollowChangePublisher)), ) var googlePubsubSet = wire.NewSet( newExternalEventPublisher, + newExternalFollowChangeSubscriber, ) func newExternalEventPublisher(config config.Config, logger watermill.LoggerAdapter) (app.ExternalEventPublisher, error) { @@ -31,3 +35,15 @@ func newExternalEventPublisher(config config.Config, logger watermill.LoggerAdap return gcp.NewNoopPublisher(), nil } } + +func newExternalFollowChangeSubscriber(config config.Config, logger watermill.LoggerAdapter) (app.ExternalFollowChangeSubscriber, error) { + if config.GooglePubSubEnabled() { + subscriber, err := gcp.NewWatermillSubscriber(config, logger) + if err != nil { + return nil, errors.Wrap(err, "error creating a watermil subscriber") + } + return gcp.NewFollowChangeSubscriber(subscriber, logger), nil + } else { + return gcp.NewNoopSubscriber(), nil + } +} diff --git a/cmd/notification-service/di/service.go b/cmd/notification-service/di/service.go index 67c59fa..7d4a1fd 100644 --- a/cmd/notification-service/di/service.go +++ b/cmd/notification-service/di/service.go @@ -13,13 +13,15 @@ import ( ) type Service struct { - app app.Application - server http.Server - metricsServer http.MetricsServer - downloader *app.Downloader - receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber - eventSavedSubscriber *firestorepubsub.EventSavedSubscriber - eventWasAlreadySavedCache *adapters.MemoryEventWasAlreadySavedCache + app app.Application + server http.Server + metricsServer http.MetricsServer + downloader *app.Downloader + followChangePuller *app.FollowChangePuller + receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber + externalFollowChangeSubscriber app.ExternalFollowChangeSubscriber + eventSavedSubscriber *firestorepubsub.EventSavedSubscriber + eventWasAlreadySavedCache *adapters.MemoryEventWasAlreadySavedCache } func NewService( @@ -27,18 +29,22 @@ func NewService( server http.Server, metricsServer http.MetricsServer, downloader *app.Downloader, + followChangePuller *app.FollowChangePuller, receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber, + externalFollowChangeSubscriber app.ExternalFollowChangeSubscriber, eventSavedSubscriber *firestorepubsub.EventSavedSubscriber, eventWasAlreadySavedCache *adapters.MemoryEventWasAlreadySavedCache, ) Service { return Service{ - app: app, - server: server, - metricsServer: metricsServer, - downloader: downloader, - receivedEventSubscriber: receivedEventSubscriber, - eventSavedSubscriber: eventSavedSubscriber, - eventWasAlreadySavedCache: eventWasAlreadySavedCache, + app: app, + server: server, + metricsServer: metricsServer, + downloader: downloader, + followChangePuller: followChangePuller, + receivedEventSubscriber: receivedEventSubscriber, + externalFollowChangeSubscriber: externalFollowChangeSubscriber, + eventSavedSubscriber: eventSavedSubscriber, + eventWasAlreadySavedCache: eventWasAlreadySavedCache, } } @@ -73,6 +79,10 @@ func (s Service) Run(ctx context.Context) error { errCh <- errors.Wrap(s.receivedEventSubscriber.Run(ctx), "received event subscriber error") }() + go func() { + errCh <- errors.Wrap(s.followChangePuller.Run(ctx), "follow change subscriber error") + }() + runners++ go func() { errCh <- errors.Wrap(s.eventSavedSubscriber.Run(ctx), "event saved subscriber error") diff --git a/cmd/notification-service/di/wire.go b/cmd/notification-service/di/wire.go index 10b583d..6326748 100644 --- a/cmd/notification-service/di/wire.go +++ b/cmd/notification-service/di/wire.go @@ -29,6 +29,7 @@ func BuildService(context.Context, config.Config) (Service, func(), error) { googlePubsubSet, loggingSet, adaptersSet, + followChangePullerSet, ) return Service{}, nil, nil } @@ -49,13 +50,16 @@ func BuildIntegrationService(context.Context, config.Config) (IntegrationService applicationSet, firestoreAdaptersSet, downloaderSet, + followChangePullerSet, generatorSet, pubsubSet, loggingSet, integrationAdaptersSet, mocks.NewMockExternalEventPublisher, + mocks.NewMockExternalFollowChangeSubscriber, wire.Bind(new(app.ExternalEventPublisher), new(*mocks.MockExternalEventPublisher)), + wire.Bind(new(app.ExternalFollowChangeSubscriber), new(*mocks.MockExternalFollowChangeSubscriber)), ) return IntegrationService{}, nil, nil } @@ -79,6 +83,10 @@ var downloaderSet = wire.NewSet( app.NewDownloader, ) +var followChangePullerSet = wire.NewSet( + app.NewFollowChangePuller, +) + var generatorSet = wire.NewSet( notifications.NewGenerator, ) diff --git a/cmd/notification-service/di/wire_gen.go b/cmd/notification-service/di/wire_gen.go index 6735ccf..7a79aec 100644 --- a/cmd/notification-service/di/wire_gen.go +++ b/cmd/notification-service/di/wire_gen.go @@ -7,9 +7,8 @@ package di import ( - "context" - firestore2 "cloud.google.com/go/firestore" + "context" "github.com/ThreeDotsLabs/watermill" "github.com/google/wire" "github.com/planetary-social/go-notification-service/internal/logging" @@ -76,6 +75,13 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S server := http.NewServer(configConfig, application, logger) metricsServer := http.NewMetricsServer(prometheusPrometheus, configConfig, logger) downloader := app.NewDownloader(memoryEventWasAlreadySavedCache, transactionProvider, receivedEventPubSub, logger, prometheusPrometheus) + followChangePublisher := pubsub.NewFollowChangePublisher() + externalFollowChangeSubscriber, err := newExternalFollowChangeSubscriber(configConfig, watermillAdapter) + if err != nil { + cleanup() + return Service{}, nil, err + } + followChangePuller := app.NewFollowChangePuller(memoryEventWasAlreadySavedCache, followChangePublisher, externalFollowChangeSubscriber, logger, prometheusPrometheus) receivedEventSubscriber := memorypubsub.NewReceivedEventSubscriber(receivedEventPubSub, saveReceivedEventHandler, logger) subscriber, err := firestore.NewWatermillSubscriber(client, watermillAdapter) if err != nil { @@ -95,7 +101,7 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S } processSavedEventHandler := app.NewProcessSavedEventHandler(transactionProvider, generator, apnsAPNS, logger, prometheusPrometheus, externalEventPublisher) eventSavedSubscriber := firestorepubsub.NewEventSavedSubscriber(subscriber, processSavedEventHandler, prometheusPrometheus, logger) - service := NewService(application, server, metricsServer, downloader, receivedEventSubscriber, eventSavedSubscriber, memoryEventWasAlreadySavedCache) + service := NewService(application, server, metricsServer, downloader, followChangePuller, receivedEventSubscriber, externalFollowChangeSubscriber, eventSavedSubscriber, memoryEventWasAlreadySavedCache) return service, func() { cleanup() }, nil @@ -148,6 +154,9 @@ func BuildIntegrationService(contextContext context.Context, configConfig config server := http.NewServer(configConfig, application, logger) metricsServer := http.NewMetricsServer(prometheusPrometheus, configConfig, logger) downloader := app.NewDownloader(memoryEventWasAlreadySavedCache, transactionProvider, receivedEventPubSub, logger, prometheusPrometheus) + followChangePublisher := pubsub.NewFollowChangePublisher() + mockExternalFollowChangeSubscriber := mocks.NewMockExternalFollowChangeSubscriber() + followChangePuller := app.NewFollowChangePuller(memoryEventWasAlreadySavedCache, followChangePublisher, mockExternalFollowChangeSubscriber, logger, prometheusPrometheus) receivedEventSubscriber := memorypubsub.NewReceivedEventSubscriber(receivedEventPubSub, saveReceivedEventHandler, logger) subscriber, err := firestore.NewWatermillSubscriber(client, watermillAdapter) if err != nil { @@ -163,7 +172,7 @@ func BuildIntegrationService(contextContext context.Context, configConfig config mockExternalEventPublisher := mocks.NewMockExternalEventPublisher() processSavedEventHandler := app.NewProcessSavedEventHandler(transactionProvider, generator, apnsMock, logger, prometheusPrometheus, mockExternalEventPublisher) eventSavedSubscriber := firestorepubsub.NewEventSavedSubscriber(subscriber, processSavedEventHandler, prometheusPrometheus, logger) - service := NewService(application, server, metricsServer, downloader, receivedEventSubscriber, eventSavedSubscriber, memoryEventWasAlreadySavedCache) + service := NewService(application, server, metricsServer, downloader, followChangePuller, receivedEventSubscriber, mockExternalFollowChangeSubscriber, eventSavedSubscriber, memoryEventWasAlreadySavedCache) integrationService := IntegrationService{ Service: service, MockAPNS: apnsMock, @@ -210,4 +219,6 @@ type buildTransactionFirestoreAdaptersDependencies struct { var downloaderSet = wire.NewSet(app.NewDownloader) +var followChangePullerSet = wire.NewSet(app.NewFollowChangePuller) + var generatorSet = wire.NewSet(notifications.NewGenerator) diff --git a/service/adapters/gcp/gcp_follow_change_subscriber.go b/service/adapters/gcp/gcp_follow_change_subscriber.go new file mode 100644 index 0000000..dcba554 --- /dev/null +++ b/service/adapters/gcp/gcp_follow_change_subscriber.go @@ -0,0 +1,74 @@ +package gcp + +import ( + "context" + "encoding/json" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud" + "github.com/pkg/errors" + "github.com/planetary-social/go-notification-service/service/config" + "github.com/planetary-social/go-notification-service/service/domain" + "google.golang.org/api/option" +) + +const googlePubSubFollowChangeTopic = "follow-changes" + +func NewWatermillSubscriber(config config.Config, logger watermill.LoggerAdapter) (*googlecloud.Subscriber, error) { + var options []option.ClientOption + + if j := config.GooglePubSubCredentialsJSON(); len(j) > 0 { + options = append(options, option.WithCredentialsJSON(config.GooglePubSubCredentialsJSON())) + } + + publisherConfig := googlecloud.SubscriberConfig{ + ProjectID: config.GooglePubSubProjectID(), + DoNotCreateTopicIfMissing: true, + ClientOptions: options, + } + + return googlecloud.NewSubscriber(publisherConfig, logger) +} + +type GCPFollowChangeSubscriber struct { + subscriber *googlecloud.Subscriber + logger watermill.LoggerAdapter +} + +func NewFollowChangeSubscriber(subscriber *googlecloud.Subscriber, logger watermill.LoggerAdapter) *GCPFollowChangeSubscriber { + return &GCPFollowChangeSubscriber{subscriber: subscriber, logger: logger} +} + +func (p *GCPFollowChangeSubscriber) Subscribe(ctx context.Context) (<-chan *domain.FollowChange, error) { + subChan, err := p.subscriber.Subscribe(ctx, googlePubSubFollowChangeTopic) + if err != nil { + return nil, errors.Wrap(err, "error subscribing") + } + + ch := make(chan *domain.FollowChange) + + defer func() { + close(ch) + p.subscriber.Close() + }() + + go func() { + var payload domain.FollowChange + for message := range subChan { + if err := json.Unmarshal(message.Payload, &payload); err != nil { + p.logger.Error("error unmarshaling follow change payload", err, watermill.LogFields{"payload": string(message.Payload)}) + return + } + + message.Ack() + + select { + case ch <- &payload: + case <-ctx.Done(): + return + } + } + }() + + return ch, nil +} diff --git a/service/adapters/gcp/noop_subscriber.go b/service/adapters/gcp/noop_subscriber.go new file mode 100644 index 0000000..5e09fa6 --- /dev/null +++ b/service/adapters/gcp/noop_subscriber.go @@ -0,0 +1,18 @@ +package gcp + +import ( + "context" + + "github.com/planetary-social/go-notification-service/service/domain" +) + +type NoopSubscriber struct { +} + +func NewNoopSubscriber() *NoopSubscriber { + return &NoopSubscriber{} +} +func (p *NoopSubscriber) Subscribe(ctx context.Context) (<-chan *domain.FollowChange, error) { + ch := make(chan *domain.FollowChange) + return ch, nil +} diff --git a/service/adapters/mocks/external_follow_change_subscriber.go b/service/adapters/mocks/external_follow_change_subscriber.go new file mode 100644 index 0000000..0816c78 --- /dev/null +++ b/service/adapters/mocks/external_follow_change_subscriber.go @@ -0,0 +1,19 @@ +package mocks + +import ( + "context" + + "github.com/planetary-social/go-notification-service/service/domain" +) + +type MockExternalFollowChangeSubscriber struct { +} + +func NewMockExternalFollowChangeSubscriber() *MockExternalFollowChangeSubscriber { + return &MockExternalFollowChangeSubscriber{} +} + +func (m MockExternalFollowChangeSubscriber) Subscribe(ctx context.Context) (<-chan *domain.FollowChange, error) { + ch := make(chan *domain.FollowChange) + return ch, nil +} diff --git a/service/adapters/prometheus/prometheus.go b/service/adapters/prometheus/prometheus.go index c3d0f28..f9bc903 100644 --- a/service/adapters/prometheus/prometheus.go +++ b/service/adapters/prometheus/prometheus.go @@ -15,6 +15,7 @@ import ( const ( labelHandlerName = "handlerName" labelRelayDownloaderState = "state" + labelFollowChanges = "follow changes" labelTopic = "topic" labelVcsRevision = "vcsRevision" labelVcsTime = "vcsTime" @@ -32,6 +33,7 @@ type Prometheus struct { applicationHandlerCallsCounter *prometheus.CounterVec applicationHandlerCallDurationHistogram *prometheus.HistogramVec relayDownloaderStateGauge *prometheus.GaugeVec + relayFollowChangeGauge prometheus.Counter subscriptionQueueLengthGauge *prometheus.GaugeVec apnsCallsCounter *prometheus.CounterVec @@ -62,6 +64,12 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { }, []string{labelRelayDownloaderState}, ) + relayFollowChangeGauge := prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "follow_change_count", + Help: "Number of follow changes.", + }, + ) subscriptionQueueLengthGauge := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "subscription_queue_length", @@ -89,6 +97,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { applicationHandlerCallsCounter, applicationHandlerCallDurationHistogram, relayDownloaderStateGauge, + relayFollowChangeGauge, subscriptionQueueLengthGauge, versionGague, apnsCallsCounter, @@ -118,6 +127,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { applicationHandlerCallsCounter: applicationHandlerCallsCounter, applicationHandlerCallDurationHistogram: applicationHandlerCallDurationHistogram, relayDownloaderStateGauge: relayDownloaderStateGauge, + relayFollowChangeGauge: relayFollowChangeGauge, subscriptionQueueLengthGauge: subscriptionQueueLengthGauge, apnsCallsCounter: apnsCallsCounter, @@ -135,6 +145,10 @@ func (p *Prometheus) MeasureRelayDownloadersState(n int, state app.RelayDownload p.relayDownloaderStateGauge.With(prometheus.Labels{labelRelayDownloaderState: state.String()}).Set(float64(n)) } +func (p *Prometheus) MeasureFollowChange(n int) { + p.relayFollowChangeGauge.Add(float64(n)) +} + func (p *Prometheus) ReportSubscriptionQueueLength(topic string, n int) { p.subscriptionQueueLengthGauge.With(prometheus.Labels{labelTopic: topic}).Set(float64(n)) } diff --git a/service/adapters/pubsub/follow_change_publisher.go b/service/adapters/pubsub/follow_change_publisher.go new file mode 100644 index 0000000..e517e85 --- /dev/null +++ b/service/adapters/pubsub/follow_change_publisher.go @@ -0,0 +1,19 @@ +package pubsub + +import ( + "github.com/planetary-social/go-notification-service/service/domain" +) + +type FollowChangePublisher struct { + pubsub *GoChannelPubSub[domain.FollowChange] +} + +func NewFollowChangePublisher() *FollowChangePublisher { + return &FollowChangePublisher{ + pubsub: NewGoChannelPubSub[domain.FollowChange](), + } +} + +func (m *FollowChangePublisher) Publish(followChange domain.FollowChange) { + m.pubsub.Publish(followChange) +} diff --git a/service/app/app.go b/service/app/app.go index 8bd2848..bdf54b3 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -56,6 +56,10 @@ type ExternalEventPublisher interface { PublishNewEventReceived(ctx context.Context, event domain.Event) error } +type ExternalFollowChangeSubscriber interface { + Subscribe(ctx context.Context) (<-chan *domain.FollowChange, error) +} + type Application struct { Commands Commands Queries Queries @@ -120,9 +124,13 @@ type ReceivedEventSubscriber interface { Subscribe(ctx context.Context) <-chan ReceivedEvent } +// type FollowChangeSubscriber interface { +// Subscribe(ctx context.Context) <-chan domain.FollowChange +// } type Metrics interface { StartApplicationCall(handlerName string) ApplicationCall MeasureRelayDownloadersState(n int, state RelayDownloaderState) + MeasureFollowChange(n int) } type ApplicationCall interface { diff --git a/service/app/follow_change_puller.go b/service/app/follow_change_puller.go new file mode 100644 index 0000000..5e3e918 --- /dev/null +++ b/service/app/follow_change_puller.go @@ -0,0 +1,71 @@ +package app + +import ( + "context" + "time" + + "github.com/planetary-social/go-notification-service/internal/logging" + "github.com/planetary-social/go-notification-service/service/domain" +) + +type FollowChangePublisher interface { + Publish(followChange domain.FollowChange) +} + +type FollowChangePuller struct { + eventWasAlreadySavedCache EventWasAlreadySavedCache + followChangePublisher FollowChangePublisher + externalFollowChangeSubscriber ExternalFollowChangeSubscriber + logger logging.Logger + metrics Metrics + counter int +} + +func NewFollowChangePuller( + eventWasAlreadySavedCache EventWasAlreadySavedCache, + followChangePublisher FollowChangePublisher, + externalFollowChangeSubscriber ExternalFollowChangeSubscriber, + logger logging.Logger, + metrics Metrics, +) *FollowChangePuller { + return &FollowChangePuller{ + eventWasAlreadySavedCache: eventWasAlreadySavedCache, + followChangePublisher: followChangePublisher, + logger: logger.New("followChangePuller"), + metrics: metrics, + counter: 0, + } +} + +func (f *FollowChangePuller) Run(ctx context.Context) error { + go f.storeMetricsLoop(ctx) + + ch, err := f.externalFollowChangeSubscriber.Subscribe(ctx) + if err != nil { + return err + } + + for followChange := range ch { + f.logger.Debug().WithField("followChange", followChange).Message("received follow change") + f.counter += 1 + } + + return nil +} + +func (f *FollowChangePuller) storeMetricsLoop(ctx context.Context) { + for { + f.storeMetrics() + + select { + case <-time.After(storeMetricsEvery): + case <-ctx.Done(): + return + } + } +} + +func (f *FollowChangePuller) storeMetrics() { + f.metrics.MeasureFollowChange(f.counter) + f.counter = 0 +} diff --git a/service/app/handler_process_follow_change.go b/service/app/handler_process_follow_change.go new file mode 100644 index 0000000..5be2cf7 --- /dev/null +++ b/service/app/handler_process_follow_change.go @@ -0,0 +1,45 @@ +package app + +import ( + "context" + + "github.com/planetary-social/go-notification-service/internal/logging" + "github.com/planetary-social/go-notification-service/service/domain" +) + +type ProcessFollowChange struct { + follow_change domain.FollowChange +} + +func NewProcessFollowChange(follow_change domain.FollowChange) ProcessFollowChange { + return ProcessFollowChange{follow_change: follow_change} +} + +type ProcessFollowChangeHandler struct { + eventWasAlreadySavedCache EventWasAlreadySavedCache + logger logging.Logger + metrics Metrics +} + +func NewProcessFollowChangeHandler( + eventWasAlreadySavedCache EventWasAlreadySavedCache, + logger logging.Logger, + metrics Metrics, +) *ProcessFollowChangeHandler { + return &ProcessFollowChangeHandler{ + eventWasAlreadySavedCache: eventWasAlreadySavedCache, + logger: logger.New("processFollowChangeHandler"), + metrics: metrics, + } +} + +func (h *ProcessFollowChangeHandler) Handle(ctx context.Context, cmd ProcessFollowChange) (err error) { + defer h.metrics.StartApplicationCall("processFollowChange").End(&err) + + h.logger.Debug(). + WithField("follower", cmd.follow_change.Follower). + WithField("followee", cmd.follow_change.Followee). + Message("processing follow change") + + return nil +} diff --git a/service/domain/follow_change.go b/service/domain/follow_change.go new file mode 100644 index 0000000..bc2ab22 --- /dev/null +++ b/service/domain/follow_change.go @@ -0,0 +1,21 @@ +package domain + +type FollowChange struct { + ChangeType string `json:"changeType"` + At uint `json:"at"` + Follower PublicKey `json:"follower"` + Followee PublicKey `json:"followee"` + FriendlyFollowee string `json:"friendlyFollowee"` + FriendlyFollower string `json:"friendlyFollower"` +} + +func NewFollowChange(changeType string, follower PublicKey, friendlyFollower string, followee PublicKey, friendlyFollowee string, at uint) FollowChange { + return FollowChange{ + ChangeType: changeType, + Follower: follower, + FriendlyFollower: friendlyFollower, + Followee: followee, + FriendlyFollowee: friendlyFollowee, + At: at, + } +} diff --git a/service/ports/memorypubsub/received_follow_change.go b/service/ports/memorypubsub/received_follow_change.go new file mode 100644 index 0000000..e9e8a82 --- /dev/null +++ b/service/ports/memorypubsub/received_follow_change.go @@ -0,0 +1,43 @@ +// Package pubsub receives internal events. +package memorypubsub + +import ( + "context" + + "github.com/planetary-social/go-notification-service/service/app" +) + +type ProcessFollowChangeHandler interface { + Handle(ctx context.Context, cmd app.ProcessFollowChange) error +} + +// type FollowChangeSubscriber struct { +// pubsub *pubsub.FollowChangeSubscriber +// handler ProcessFollowChangeHandler +// logger logging.Logger +// } + +// func NewFollowChangeSubscriber( +// pubsub *pubsub.FollowChangeSubscriber, +// handler ProcessFollowChangeHandler, +// logger logging.Logger, +// ) *FollowChangeSubscriber { +// return &FollowChangeSubscriber{ +// pubsub: pubsub, +// handler: handler, +// logger: logger.New("followChangeSubscriber"), +// } +// } + +// func (p *FollowChangeSubscriber) Run(ctx context.Context) error { +// for v := range p.pubsub.Subscribe(ctx) { +// cmd := app.NewProcessFollowChange(v) +// if err := p.handler.Handle(ctx, cmd); err != nil { +// p.logger.Error(). +// WithError(err). +// WithField("follow change", v). +// Message("error handling a received event") +// } +// } +// return nil +// }