Skip to content

Commit

Permalink
we need to include uuids to discern which stream (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Moody authored Jan 25, 2023
1 parent 24ce61f commit 96a3a37
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 169 deletions.
17 changes: 8 additions & 9 deletions core.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,12 @@ func (b *BTrDB) SnoopEpErr(ep *Endpoint, err chan error) chan error {
type Subscriptions struct {
err chan error
id []uuid.UUID
c chan []SubRecord
c chan SubRecord
}

type SubRecord struct {
ID uuid.UUID
Val RawPoint
Val []RawPoint
}

type EPGroup struct {
Expand Down Expand Up @@ -465,7 +465,7 @@ func (b *BTrDB) Subscribe(ctx context.Context, id ...uuid.UUID) (*Subscriptions,
subs := &Subscriptions{
id: id,
err: make(chan error),
c: make(chan []SubRecord),
c: make(chan SubRecord),
}

eps, err := b.EndpointsSplit(ctx, id...)
Expand All @@ -480,13 +480,12 @@ func (b *BTrDB) Subscribe(ctx context.Context, id ...uuid.UUID) (*Subscriptions,

//Next gives either the most recent data for the set of subscriptions
//or an error regarding the connection state.
func (subs *Subscriptions) Next(ctx context.Context) ([]SubRecord, error) {
func (subs *Subscriptions) Next(ctx context.Context) (sr SubRecord, err error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-subs.err:
return nil, err
case sr := <-subs.c:
return sr, nil
err = ctx.Err()
case err = <-subs.err:
case sr = <-subs.c:
}
return
}
22 changes: 15 additions & 7 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (b *Endpoint) InsertGeneric(ctx context.Context, uu uuid.UUID, values []*pb
if p != nil {
if p.RoundBits != nil {
rounding = &pb.RoundSpec{
Spec: &pb.RoundSpec_Bits{int32(*p.RoundBits)},
Spec: &pb.RoundSpec_Bits{Bits: int32(*p.RoundBits)},
}
}
switch p.MergePolicy {
Expand Down Expand Up @@ -1023,33 +1023,41 @@ func (b *Endpoint) Changes(ctx context.Context, uu uuid.UUID, fromVersion uint64
return rvc, rvv, rve
}

func (b *Endpoint) SubscribeTo(ctx context.Context, uuid []uuid.UUID, c chan []SubRecord, errc chan error) {
func (b *Endpoint) SubscribeTo(ctx context.Context, uuid []uuid.UUID, c chan SubRecord, errc chan error) {
by := make([][]byte, len(uuid))
for i := range uuid {
by[i] = []byte(uuid[i])
}
stream, err := b.g.Subscribe(ctx, &pb.SubscriptionParams{
Uuid: by,
})

if err != nil {
errc <- err
close(c)
return
}

go func() {
for {
rp, err := stream.Recv()
if err != nil || rp.Stat != nil {
if err != nil {
errc <- err
close(c)
return
}
arr := make([]SubRecord, len(rp.Values))
if rp.Stat != nil {
errc <- errors.New(rp.Stat.Msg)
close(c)
return
}
sr := SubRecord{
ID: rp.Uuid,
Val: make([]RawPoint, len(rp.Values)),
}
for i := range rp.Values {
arr[i] = SubRecord{nil, RawPoint{rp.Values[i].Time, rp.Values[i].Value}}
sr.Val[i] = RawPoint{rp.Values[i].Time, rp.Values[i].Value}
}
c <- arr
c <- sr
}
}()
}
Loading

0 comments on commit 96a3a37

Please sign in to comment.