Skip to content

Commit

Permalink
increase debugging in subscribe and barrier methods
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Sep 24, 2021
1 parent a6f6442 commit cb50a15
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ func (s *DefaultService) Subscribe(ctx context.Context, topic string) (*subscrip
s.subsMu.Lock()
defer s.subsMu.Unlock()

defer log.Debugw("subscribed to topic")
log.Debugw("creating topic if new")
ps := s.createSubIfNew(topic)
return ps.subscribe(ctx), nil

log.Debugw("creating subscription")
sub := ps.subscribe(ctx)
return sub, nil
}

func (s *DefaultService) createSubIfNew(topic string) *pubsub {
Expand All @@ -88,16 +91,29 @@ func (s *DefaultService) createSubIfNew(topic string) *pubsub {
}

func (s *DefaultService) Barrier(ctx context.Context, state string, target int) error {
log := log.With("state", state, "target", target)
log.Debugw("subscribing to topic")

if target == 0 {
log.Warnw("requested a barrier with target zero; satisfying immediately", "state", state)
log.Warnw("requested a barrier with target zero; satisfying immediately")
return nil
}

s.barriersMu.Lock()
log.Debugw("creating state if new")
barrier := s.createBarrierIfNew(state)
s.barriersMu.Unlock()

return barrier.wait(ctx, target)
log.Debugw("waiting for barrier")
err := barrier.wait(ctx, target)

if err != nil {
log.Debugw("barrier errored", "err", err)
} else {
log.Debugw("barrier satisfied")
}

return err
}

func (s *DefaultService) SignalEntry(ctx context.Context, state string) (int, error) {
Expand Down

0 comments on commit cb50a15

Please sign in to comment.