Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event filtering using the backend #4831

Merged
merged 13 commits into from
Nov 11, 2021
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ For details about compatibility between different releases, see the **Commitment
- `gs.txack.receive`, `gs.txack.drop` and `gs.txack.forward` events, which track the transmission acknowledgements from gateways.
- `ttn-lw-stack as-db migrate` command to migrate the Application Server database. This command records the schema version and only performs migrations if on a newer version.
- Use the `--force` flag to force perform migrations.
- Server-side event filtering with the `names` field.

### Changed

- Gateway Server default UDP worker count has been increased to 1024, from 16.
- Application Server webhooks and application packages default worker count has been increased to 1024, from 16.
- Application Server no longer sets the end device's `session.started_at` and `pending_session.started_at`. The session start time should be retrieved from the Network Server, per API specification.
- This requires an Application Server database migration (`ttn-lw-stack as-db migrate`) to clear the `started_at` field in existing (pending) sessions.
- Console changing to server-side event filtering (used to be client-side).

### Deprecated

Expand Down
1 change: 1 addition & 0 deletions api/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3687,6 +3687,7 @@ The messages (for translation) are stored as "error:<namespace>:<name>".
| `identifiers` | [`EntityIdentifiers`](#ttn.lorawan.v3.EntityIdentifiers) | repeated | |
| `tail` | [`uint32`](#uint32) | | If greater than zero, this will return historical events, up to this maximum when the stream starts. If used in combination with "after", the limit that is reached first, is used. The availability of historical events depends on server support and retention policy. |
| `after` | [`google.protobuf.Timestamp`](#google.protobuf.Timestamp) | | If not empty, this will return historical events after the given time when the stream starts. If used in combination with "tail", the limit that is reached first, is used. The availability of historical events depends on server support and retention policy. |
| `names` | [`string`](#string) | repeated | If provided, this will filter events, so that only events with the given names are returned. Names can be provided as either exact event names (e.g. 'gs.up.receive'), or as regular expressions (e.g. '/^gs\..+/'). |

### <a name="ttn.lorawan.v3.Events">Service `Events`</a>

Expand Down
7 changes: 7 additions & 0 deletions api/api.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -17440,6 +17440,13 @@
"type": "string",
"format": "date-time",
"description": "If not empty, this will return historical events after the given time when the stream starts.\nIf used in combination with \"tail\", the limit that is reached first, is used.\nThe availability of historical events depends on server support and retention policy."
},
"names": {
"type": "array",
"items": {
"type": "string"
},
"description": "If provided, this will filter events, so that only events with the given names are returned.\nNames can be provided as either exact event names (e.g. 'gs.up.receive'),\nor as regular expressions (e.g. '/^gs\\..+/')."
}
}
},
Expand Down
4 changes: 4 additions & 0 deletions api/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ message StreamEventsRequest {
// If used in combination with "tail", the limit that is reached first, is used.
// The availability of historical events depends on server support and retention policy.
google.protobuf.Timestamp after = 3 [(gogoproto.stdtime) = true];
// If provided, this will filter events, so that only events with the given names are returned.
// Names can be provided as either exact event names (e.g. 'gs.up.receive'),
// or as regular expressions (e.g. '/^gs\..+/').
repeated string names = 4;
}

message FindRelatedEventsRequest {
Expand Down
5 changes: 4 additions & 1 deletion cmd/ttn-lw-cli/commands/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ var eventsCommand = &cobra.Command{
return errNoIDs.New()
}
tail, _ := cmd.Flags().GetUint32("tail")
names, _ := cmd.Flags().GetStringSlice("names")
req := &ttnpb.StreamEventsRequest{
Identifiers: ids,
Tail: tail,
Names: names,
}

g, gCtx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -104,7 +106,7 @@ var eventsCommand = &cobra.Command{
io.Write(os.Stdout, config.OutputFormat, evt)
}

return ctx.Err()
return g.Wait()
},
}

Expand Down Expand Up @@ -167,6 +169,7 @@ var eventsFindRelatedCommand = &cobra.Command{
func init() {
eventsCommand.Flags().AddFlagSet(entityIdentifiersSliceFlags())
eventsCommand.Flags().Uint32("tail", 0, "")
eventsCommand.Flags().StringSlice("names", nil, "")
Root.AddCommand(eventsCommand)
eventsFindRelatedCommand.Flags().String("correlation-id", "", "")
eventsCommand.AddCommand(eventsFindRelatedCommand)
Expand Down
27 changes: 27 additions & 0 deletions config/messages.json
Original file line number Diff line number Diff line change
Expand Up @@ -3896,6 +3896,15 @@
"file": "conversion.go"
}
},
"error:pkg/events/grpc:invalid_regexp": {
"translations": {
"en": "invalid regexp"
},
"description": {
"package": "pkg/events/grpc",
"file": "grpc.go"
}
},
"error:pkg/events/grpc:no_identifiers": {
"translations": {
"en": "no identifiers"
Expand All @@ -3905,6 +3914,15 @@
"file": "grpc.go"
}
},
"error:pkg/events/grpc:no_matching_events": {
"translations": {
"en": "no matching events for regexp `{regexp}`"
},
"description": {
"package": "pkg/events/grpc",
"file": "grpc.go"
}
},
"error:pkg/events/grpc:storage_disabled": {
"translations": {
"en": "events storage is not not enabled"
Expand All @@ -3914,6 +3932,15 @@
"file": "grpc.go"
}
},
"error:pkg/events/grpc:unknown_event_name": {
"translations": {
"en": "unknown event `{name}`"
},
"description": {
"package": "pkg/events/grpc",
"file": "grpc.go"
}
},
"error:pkg/events/redis:channel_closed": {
"translations": {
"en": "channel closed"
Expand Down
104 changes: 95 additions & 9 deletions pkg/events/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ package grpc
import (
"context"
"os"
"regexp"
"sort"
"strings"
"time"

pbtypes "github.com/gogo/protobuf/types"
grpc_runtime "github.com/grpc-ecosystem/grpc-gateway/runtime"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights/rightsutil"
"go.thethings.network/lorawan-stack/v3/pkg/errors"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/gogoproto"
"go.thethings.network/lorawan-stack/v3/pkg/log"
"go.thethings.network/lorawan-stack/v3/pkg/rpcmiddleware/warning"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
Expand All @@ -41,16 +46,74 @@ func NewEventsServer(ctx context.Context, pubsub events.PubSub) *EventsServer {
if _, ok := pubsub.(events.Store); ok {
log.FromContext(ctx).Infof("Events PubSub: %T is also a Store!", pubsub)
}
definedNames := make(map[string]struct{})
for _, def := range events.All().Definitions() {
definedNames[def.Name()] = struct{}{}
}
return &EventsServer{
ctx: ctx,
pubsub: pubsub,
ctx: ctx,
pubsub: pubsub,
definedNames: definedNames,
}
}

// EventsServer streams events from a PubSub over gRPC.
type EventsServer struct {
ctx context.Context
pubsub events.PubSub
ctx context.Context
pubsub events.PubSub
definedNames map[string]struct{}
}

var (
errInvalidRegexp = errors.DefineInvalidArgument("invalid_regexp", "invalid regexp")
errNoMatchingEvents = errors.DefineInvalidArgument("no_matching_events", "no matching events for regexp `{regexp}`")
errUnknownEventName = errors.DefineInvalidArgument("unknown_event_name", "unknown event `{name}`")
)

func (srv *EventsServer) processNames(names ...string) ([]string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is clever.

if len(names) == 0 {
return nil, nil
}
nameMap := make(map[string]struct{})
for _, name := range names {
if strings.HasPrefix(name, "/") && strings.HasSuffix(name, "/") {
re, err := regexp.Compile(strings.Trim(name, "/"))
if err != nil {
return nil, errInvalidRegexp.WithCause(err)
}
var found bool
for defined := range srv.definedNames {
if re.MatchString(defined) {
nameMap[defined] = struct{}{}
found = true
}
}
if !found {
return nil, errNoMatchingEvents.WithAttributes("regexp", re.String())
}
} else {
var found bool
for defined := range srv.definedNames {
if name == defined {
nameMap[name] = struct{}{}
found = true
break
}
}
if !found {
return nil, errUnknownEventName.WithAttributes("name", name)
}
}
}
if len(nameMap) == 0 {
return nil, nil
}
out := make([]string, 0, len(nameMap))
for name := range nameMap {
out = append(out, name)
}
sort.Strings(out)
return out, nil
}

var errNoIdentifiers = errors.DefineInvalidArgument("no_identifiers", "no identifiers")
Expand All @@ -60,6 +123,12 @@ func (srv *EventsServer) Stream(req *ttnpb.StreamEventsRequest, stream ttnpb.Eve
if len(req.Identifiers) == 0 {
return errNoIdentifiers.New()
}

names, err := srv.processNames(req.Names...)
if err != nil {
return err
}

ctx := stream.Context()

if err := rights.RequireAny(ctx, req.Identifiers...); err != nil {
Expand All @@ -71,16 +140,20 @@ func (srv *EventsServer) Stream(req *ttnpb.StreamEventsRequest, stream ttnpb.Eve

store, hasStore := srv.pubsub.(events.Store)
var group *errgroup.Group
if hasStore && (req.Tail > 0 || req.After != nil) {
if hasStore {
if req.After == nil {
now := time.Now()
req.After = &now
}
group, ctx = errgroup.WithContext(ctx)
group.Go(func() error {
return store.SubscribeWithHistory(ctx, nil, req.Identifiers, req.After, int(req.Tail), handler)
return store.SubscribeWithHistory(ctx, names, req.Identifiers, req.After, int(req.Tail), handler)
})
} else {
if req.Tail > 0 || req.After != nil {
warning.Add(ctx, "Events storage is not enabled")
}
if err := srv.pubsub.Subscribe(ctx, nil, req.Identifiers, handler); err != nil {
if err := srv.pubsub.Subscribe(ctx, names, req.Identifiers, handler); err != nil {
return err
}
}
Expand All @@ -94,14 +167,27 @@ func (srv *EventsServer) Stream(req *ttnpb.StreamEventsRequest, stream ttnpb.Eve
return err
}
now := time.Now().UTC()
if err := stream.Send(&ttnpb.Event{
startEvent := &ttnpb.Event{
UniqueId: events.NewCorrelationID(),
Name: "events.stream.start",
Time: &now,
Identifiers: req.Identifiers,
Origin: hostname,
CorrelationIds: events.CorrelationIDsFromContext(ctx),
}); err != nil {
}

if len(names) > 0 {
value, err := gogoproto.Value(names)
if err != nil {
return err
}
startEvent.Data, err = pbtypes.MarshalAny(value)
if err != nil {
return err
}
}

if err := stream.Send(startEvent); err != nil {
return err
}

Expand Down
Loading