Skip to content

Commit

Permalink
Merge pull request #58 from planetary-social/fix-follow-changes-loop
Browse files Browse the repository at this point in the history
Fix follow changes loop, better log formatting
  • Loading branch information
dcadenas authored Aug 21, 2024
2 parents 2a524d7 + c45679d commit 208b1b2
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 9 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ flowchart TB

3. **Tag the Image:**
Tag the image with the `stable` tag as described in the [GitHub documentation](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry).
```
docker tag ghcr.io/planetary-social/nos-notification-service-go:latest ghcr.io/planetary-social/nos-notification-service-go:stable && docker push ghcr.io/planetary-social/nos-notification-service-go:stable
```

4. **Trigger the Image Update Process:**
The image update process checks for new tags every 3 minutes. Therefore, you should see the new image deployed in approximately 5 minutes.
10 changes: 4 additions & 6 deletions service/adapters/gcp/gcp_follow_change_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,15 @@ func (p *GCPFollowChangeSubscriber) Subscribe(ctx context.Context) (<-chan *doma

ch := make(chan *domain.FollowChange)

defer func() {
close(ch)
p.subscriber.Close()
}()

go func() {
var payload domain.FollowChange
defer close(ch)
defer p.subscriber.Close()

for message := range subChan {
// We never retry messages so we can ACK immediately.
message.Ack()

var payload domain.FollowChange
if err := json.Unmarshal(message.Payload, &payload); err != nil {
p.logger.Error("error unmarshaling follow change payload", err, watermill.LogFields{"payload": string(message.Payload)})
continue
Expand Down
3 changes: 2 additions & 1 deletion service/app/follow_change_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func (f *FollowChangePuller) Run(ctx context.Context) error {
}

for followChange := range ch {
f.logger.Debug().WithField("followChange", followChange).Message("received follow change")
f.logger.Debug().Message(followChange.String())

f.counter += 1
}

Expand Down
52 changes: 50 additions & 2 deletions service/domain/follow_change.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package domain

import (
"encoding/json"
"errors"
"fmt"
"time"
)

type FollowChange struct {
ChangeType string `json:"changeType"`
At uint `json:"at"`
At time.Time `json:"at"`
Follower PublicKey `json:"follower"`
Followee PublicKey `json:"followee"`
FriendlyFollowee string `json:"friendlyFollowee"`
FriendlyFollower string `json:"friendlyFollower"`
}

func NewFollowChange(changeType string, follower PublicKey, friendlyFollower string, followee PublicKey, friendlyFollowee string, at uint) FollowChange {
func NewFollowChange(changeType string, follower PublicKey, friendlyFollower string, followee PublicKey, friendlyFollowee string, at time.Time) FollowChange {
return FollowChange{
ChangeType: changeType,
Follower: follower,
Expand All @@ -19,3 +26,44 @@ func NewFollowChange(changeType string, follower PublicKey, friendlyFollower str
At: at,
}
}

func (f *FollowChange) UnmarshalJSON(data []byte) error {
var temp struct {
ChangeType string `json:"changeType"`
At int64 `json:"at"`
Follower string `json:"follower"`
Followee string `json:"followee"`
FriendlyFollowee string `json:"friendlyFollowee"`
FriendlyFollower string `json:"friendlyFollower"`
}

if err := json.Unmarshal(data, &temp); err != nil {
return err
}

f.ChangeType = temp.ChangeType
f.At = time.Unix(temp.At, 0)
f.FriendlyFollowee = temp.FriendlyFollowee
f.FriendlyFollower = temp.FriendlyFollower

var err error
f.Follower, err = NewPublicKeyFromHex(temp.Follower)
if err != nil {
return errors.New("invalid hex for follower: " + err.Error())
}

f.Followee, err = NewPublicKeyFromHex(temp.Followee)
if err != nil {
return errors.New("invalid hex for followee: " + err.Error())
}

return nil
}

func (f FollowChange) String() string {
if f.ChangeType == "unfollowed" {
return fmt.Sprintf("New unfollow: %s(%s) --x--> %s(%s)", f.FriendlyFollower, f.Follower.s, f.FriendlyFollowee, f.Followee.s)
}

return fmt.Sprintf("New follow: %s(%s) -----> %s(%s)", f.FriendlyFollower, f.Follower.s, f.FriendlyFollowee, f.Followee.s)
}

0 comments on commit 208b1b2

Please sign in to comment.