Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Aug 20, 2024
1 parent 39c3411 commit a2e6b0d
Show file tree
Hide file tree
Showing 15 changed files with 398 additions and 18 deletions.
3 changes: 3 additions & 0 deletions cmd/notification-service/di/inject_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ var commandsSet = wire.NewSet(

app.NewProcessSavedEventHandler,
wire.Bind(new(firestorepubsub.ProcessSavedEventHandler), new(*app.ProcessSavedEventHandler)),

app.NewProcessFollowChangeHandler,
wire.Bind(new(memorypubsub.ProcessFollowChangeHandler), new(*app.ProcessFollowChangeHandler)),
)

var queriesSet = wire.NewSet(
Expand Down
16 changes: 16 additions & 0 deletions cmd/notification-service/di/inject_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ var pubsubSet = wire.NewSet(
pubsub.NewReceivedEventPubSub,
wire.Bind(new(app.ReceivedEventPublisher), new(*pubsub.ReceivedEventPubSub)),
wire.Bind(new(app.ReceivedEventSubscriber), new(*pubsub.ReceivedEventPubSub)),

pubsub.NewFollowChangePublisher,
wire.Bind(new(app.FollowChangePublisher), new(*pubsub.FollowChangePublisher)),
)

var googlePubsubSet = wire.NewSet(
newExternalEventPublisher,
newExternalFollowChangeSubscriber,
)

func newExternalEventPublisher(config config.Config, logger watermill.LoggerAdapter) (app.ExternalEventPublisher, error) {
Expand All @@ -31,3 +35,15 @@ func newExternalEventPublisher(config config.Config, logger watermill.LoggerAdap
return gcp.NewNoopPublisher(), nil
}
}

func newExternalFollowChangeSubscriber(config config.Config, logger watermill.LoggerAdapter) (app.ExternalFollowChangeSubscriber, error) {
if config.GooglePubSubEnabled() {
subscriber, err := gcp.NewWatermillSubscriber(config, logger)
if err != nil {
return nil, errors.Wrap(err, "error creating a watermil subscriber")
}
return gcp.NewFollowChangeSubscriber(subscriber, logger), nil
} else {
return gcp.NewNoopSubscriber(), nil
}
}
38 changes: 24 additions & 14 deletions cmd/notification-service/di/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,38 @@ import (
)

type Service struct {
app app.Application
server http.Server
metricsServer http.MetricsServer
downloader *app.Downloader
receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber
eventSavedSubscriber *firestorepubsub.EventSavedSubscriber
eventWasAlreadySavedCache *adapters.MemoryEventWasAlreadySavedCache
app app.Application
server http.Server
metricsServer http.MetricsServer
downloader *app.Downloader
followChangePuller *app.FollowChangePuller
receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber
externalFollowChangeSubscriber app.ExternalFollowChangeSubscriber
eventSavedSubscriber *firestorepubsub.EventSavedSubscriber
eventWasAlreadySavedCache *adapters.MemoryEventWasAlreadySavedCache
}

func NewService(
app app.Application,
server http.Server,
metricsServer http.MetricsServer,
downloader *app.Downloader,
followChangePuller *app.FollowChangePuller,
receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber,
externalFollowChangeSubscriber app.ExternalFollowChangeSubscriber,
eventSavedSubscriber *firestorepubsub.EventSavedSubscriber,
eventWasAlreadySavedCache *adapters.MemoryEventWasAlreadySavedCache,
) Service {
return Service{
app: app,
server: server,
metricsServer: metricsServer,
downloader: downloader,
receivedEventSubscriber: receivedEventSubscriber,
eventSavedSubscriber: eventSavedSubscriber,
eventWasAlreadySavedCache: eventWasAlreadySavedCache,
app: app,
server: server,
metricsServer: metricsServer,
downloader: downloader,
followChangePuller: followChangePuller,
receivedEventSubscriber: receivedEventSubscriber,
externalFollowChangeSubscriber: externalFollowChangeSubscriber,
eventSavedSubscriber: eventSavedSubscriber,
eventWasAlreadySavedCache: eventWasAlreadySavedCache,
}
}

Expand Down Expand Up @@ -73,6 +79,10 @@ func (s Service) Run(ctx context.Context) error {
errCh <- errors.Wrap(s.receivedEventSubscriber.Run(ctx), "received event subscriber error")
}()

go func() {
errCh <- errors.Wrap(s.followChangePuller.Run(ctx), "follow change subscriber error")
}()

runners++
go func() {
errCh <- errors.Wrap(s.eventSavedSubscriber.Run(ctx), "event saved subscriber error")
Expand Down
8 changes: 8 additions & 0 deletions cmd/notification-service/di/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func BuildService(context.Context, config.Config) (Service, func(), error) {
googlePubsubSet,
loggingSet,
adaptersSet,
followChangePullerSet,
)
return Service{}, nil, nil
}
Expand All @@ -49,13 +50,16 @@ func BuildIntegrationService(context.Context, config.Config) (IntegrationService
applicationSet,
firestoreAdaptersSet,
downloaderSet,
followChangePullerSet,
generatorSet,
pubsubSet,
loggingSet,
integrationAdaptersSet,

mocks.NewMockExternalEventPublisher,
mocks.NewMockExternalFollowChangeSubscriber,
wire.Bind(new(app.ExternalEventPublisher), new(*mocks.MockExternalEventPublisher)),
wire.Bind(new(app.ExternalFollowChangeSubscriber), new(*mocks.MockExternalFollowChangeSubscriber)),
)
return IntegrationService{}, nil, nil
}
Expand All @@ -79,6 +83,10 @@ var downloaderSet = wire.NewSet(
app.NewDownloader,
)

var followChangePullerSet = wire.NewSet(
app.NewFollowChangePuller,
)

var generatorSet = wire.NewSet(
notifications.NewGenerator,
)
19 changes: 15 additions & 4 deletions cmd/notification-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 74 additions & 0 deletions service/adapters/gcp/gcp_follow_change_subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package gcp

import (
"context"
"encoding/json"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
"github.com/pkg/errors"
"github.com/planetary-social/go-notification-service/service/config"
"github.com/planetary-social/go-notification-service/service/domain"
"google.golang.org/api/option"
)

const googlePubSubFollowChangeTopic = "follow-changes"

func NewWatermillSubscriber(config config.Config, logger watermill.LoggerAdapter) (*googlecloud.Subscriber, error) {
var options []option.ClientOption

if j := config.GooglePubSubCredentialsJSON(); len(j) > 0 {
options = append(options, option.WithCredentialsJSON(config.GooglePubSubCredentialsJSON()))
}

publisherConfig := googlecloud.SubscriberConfig{
ProjectID: config.GooglePubSubProjectID(),
DoNotCreateTopicIfMissing: true,
ClientOptions: options,
}

return googlecloud.NewSubscriber(publisherConfig, logger)
}

type GCPFollowChangeSubscriber struct {
subscriber *googlecloud.Subscriber
logger watermill.LoggerAdapter
}

func NewFollowChangeSubscriber(subscriber *googlecloud.Subscriber, logger watermill.LoggerAdapter) *GCPFollowChangeSubscriber {
return &GCPFollowChangeSubscriber{subscriber: subscriber, logger: logger}
}

func (p *GCPFollowChangeSubscriber) Subscribe(ctx context.Context) (<-chan *domain.FollowChange, error) {
subChan, err := p.subscriber.Subscribe(ctx, googlePubSubFollowChangeTopic)
if err != nil {
return nil, errors.Wrap(err, "error subscribing")
}

ch := make(chan *domain.FollowChange)

defer func() {
close(ch)
p.subscriber.Close()
}()

go func() {
var payload domain.FollowChange
for message := range subChan {
if err := json.Unmarshal(message.Payload, &payload); err != nil {
p.logger.Error("error unmarshaling follow change payload", err, watermill.LogFields{"payload": string(message.Payload)})
return
}

message.Ack()

select {
case ch <- &payload:
case <-ctx.Done():
return
}
}
}()

return ch, nil
}
18 changes: 18 additions & 0 deletions service/adapters/gcp/noop_subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package gcp

import (
"context"

"github.com/planetary-social/go-notification-service/service/domain"
)

type NoopSubscriber struct {
}

func NewNoopSubscriber() *NoopSubscriber {
return &NoopSubscriber{}
}
func (p *NoopSubscriber) Subscribe(ctx context.Context) (<-chan *domain.FollowChange, error) {
ch := make(chan *domain.FollowChange)
return ch, nil
}
19 changes: 19 additions & 0 deletions service/adapters/mocks/external_follow_change_subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package mocks

import (
"context"

"github.com/planetary-social/go-notification-service/service/domain"
)

type MockExternalFollowChangeSubscriber struct {
}

func NewMockExternalFollowChangeSubscriber() *MockExternalFollowChangeSubscriber {
return &MockExternalFollowChangeSubscriber{}
}

func (m MockExternalFollowChangeSubscriber) Subscribe(ctx context.Context) (<-chan *domain.FollowChange, error) {
ch := make(chan *domain.FollowChange)
return ch, nil
}
14 changes: 14 additions & 0 deletions service/adapters/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
const (
labelHandlerName = "handlerName"
labelRelayDownloaderState = "state"
labelFollowChanges = "follow changes"
labelTopic = "topic"
labelVcsRevision = "vcsRevision"
labelVcsTime = "vcsTime"
Expand All @@ -32,6 +33,7 @@ type Prometheus struct {
applicationHandlerCallsCounter *prometheus.CounterVec
applicationHandlerCallDurationHistogram *prometheus.HistogramVec
relayDownloaderStateGauge *prometheus.GaugeVec
relayFollowChangeGauge prometheus.Counter
subscriptionQueueLengthGauge *prometheus.GaugeVec
apnsCallsCounter *prometheus.CounterVec

Expand Down Expand Up @@ -62,6 +64,12 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
},
[]string{labelRelayDownloaderState},
)
relayFollowChangeGauge := prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "follow_change_count",
Help: "Number of follow changes.",
},
)
subscriptionQueueLengthGauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "subscription_queue_length",
Expand Down Expand Up @@ -89,6 +97,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
applicationHandlerCallsCounter,
applicationHandlerCallDurationHistogram,
relayDownloaderStateGauge,
relayFollowChangeGauge,
subscriptionQueueLengthGauge,
versionGague,
apnsCallsCounter,
Expand Down Expand Up @@ -118,6 +127,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
applicationHandlerCallsCounter: applicationHandlerCallsCounter,
applicationHandlerCallDurationHistogram: applicationHandlerCallDurationHistogram,
relayDownloaderStateGauge: relayDownloaderStateGauge,
relayFollowChangeGauge: relayFollowChangeGauge,
subscriptionQueueLengthGauge: subscriptionQueueLengthGauge,
apnsCallsCounter: apnsCallsCounter,

Expand All @@ -135,6 +145,10 @@ func (p *Prometheus) MeasureRelayDownloadersState(n int, state app.RelayDownload
p.relayDownloaderStateGauge.With(prometheus.Labels{labelRelayDownloaderState: state.String()}).Set(float64(n))
}

func (p *Prometheus) MeasureFollowChange(n int) {
p.relayFollowChangeGauge.Add(float64(n))
}

func (p *Prometheus) ReportSubscriptionQueueLength(topic string, n int) {
p.subscriptionQueueLengthGauge.With(prometheus.Labels{labelTopic: topic}).Set(float64(n))
}
Expand Down
19 changes: 19 additions & 0 deletions service/adapters/pubsub/follow_change_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package pubsub

import (
"github.com/planetary-social/go-notification-service/service/domain"
)

type FollowChangePublisher struct {
pubsub *GoChannelPubSub[domain.FollowChange]
}

func NewFollowChangePublisher() *FollowChangePublisher {
return &FollowChangePublisher{
pubsub: NewGoChannelPubSub[domain.FollowChange](),
}
}

func (m *FollowChangePublisher) Publish(followChange domain.FollowChange) {
m.pubsub.Publish(followChange)
}
Loading

0 comments on commit a2e6b0d

Please sign in to comment.