Skip to content

Commit

Permalink
Merge pull request #4831 from TheThingsNetwork/feature/2231-event-filter
Browse files Browse the repository at this point in the history
  • Loading branch information
htdvisser authored Nov 11, 2021
2 parents 4769237 + d7e1025 commit 82713dc
Show file tree
Hide file tree
Showing 34 changed files with 476 additions and 204 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ For details about compatibility between different releases, see the **Commitment
- `gs.txack.receive`, `gs.txack.drop` and `gs.txack.forward` events, which track the transmission acknowledgements from gateways.
- `ttn-lw-stack as-db migrate` command to migrate the Application Server database. This command records the schema version and only performs migrations if on a newer version.
- Use the `--force` flag to force perform migrations.
- Server-side event filtering with the `names` field.

### Changed

- Gateway Server default UDP worker count has been increased to 1024, from 16.
- Application Server webhooks and application packages default worker count has been increased to 1024, from 16.
- Application Server no longer sets the end device's `session.started_at` and `pending_session.started_at`. The session start time should be retrieved from the Network Server, per API specification.
- This requires an Application Server database migration (`ttn-lw-stack as-db migrate`) to clear the `started_at` field in existing (pending) sessions.
- Console changing to server-side event filtering (used to be client-side).

### Deprecated

Expand Down
1 change: 1 addition & 0 deletions api/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3687,6 +3687,7 @@ The messages (for translation) are stored as "error:<namespace>:<name>".
| `identifiers` | [`EntityIdentifiers`](#ttn.lorawan.v3.EntityIdentifiers) | repeated | |
| `tail` | [`uint32`](#uint32) | | If greater than zero, this will return historical events, up to this maximum when the stream starts. If used in combination with "after", the limit that is reached first, is used. The availability of historical events depends on server support and retention policy. |
| `after` | [`google.protobuf.Timestamp`](#google.protobuf.Timestamp) | | If not empty, this will return historical events after the given time when the stream starts. If used in combination with "tail", the limit that is reached first, is used. The availability of historical events depends on server support and retention policy. |
| `names` | [`string`](#string) | repeated | If provided, this will filter events, so that only events with the given names are returned. Names can be provided as either exact event names (e.g. 'gs.up.receive'), or as regular expressions (e.g. '/^gs\..+/'). |

### <a name="ttn.lorawan.v3.Events">Service `Events`</a>

Expand Down
7 changes: 7 additions & 0 deletions api/api.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -17440,6 +17440,13 @@
"type": "string",
"format": "date-time",
"description": "If not empty, this will return historical events after the given time when the stream starts.\nIf used in combination with \"tail\", the limit that is reached first, is used.\nThe availability of historical events depends on server support and retention policy."
},
"names": {
"type": "array",
"items": {
"type": "string"
},
"description": "If provided, this will filter events, so that only events with the given names are returned.\nNames can be provided as either exact event names (e.g. 'gs.up.receive'),\nor as regular expressions (e.g. '/^gs\\..+/')."
}
}
},
Expand Down
4 changes: 4 additions & 0 deletions api/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ message StreamEventsRequest {
// If used in combination with "tail", the limit that is reached first, is used.
// The availability of historical events depends on server support and retention policy.
google.protobuf.Timestamp after = 3 [(gogoproto.stdtime) = true];
// If provided, this will filter events, so that only events with the given names are returned.
// Names can be provided as either exact event names (e.g. 'gs.up.receive'),
// or as regular expressions (e.g. '/^gs\..+/').
repeated string names = 4;
}

message FindRelatedEventsRequest {
Expand Down
5 changes: 4 additions & 1 deletion cmd/ttn-lw-cli/commands/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ var eventsCommand = &cobra.Command{
return errNoIDs.New()
}
tail, _ := cmd.Flags().GetUint32("tail")
names, _ := cmd.Flags().GetStringSlice("names")
req := &ttnpb.StreamEventsRequest{
Identifiers: ids,
Tail: tail,
Names: names,
}

g, gCtx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -104,7 +106,7 @@ var eventsCommand = &cobra.Command{
io.Write(os.Stdout, config.OutputFormat, evt)
}

return ctx.Err()
return g.Wait()
},
}

Expand Down Expand Up @@ -167,6 +169,7 @@ var eventsFindRelatedCommand = &cobra.Command{
func init() {
eventsCommand.Flags().AddFlagSet(entityIdentifiersSliceFlags())
eventsCommand.Flags().Uint32("tail", 0, "")
eventsCommand.Flags().StringSlice("names", nil, "")
Root.AddCommand(eventsCommand)
eventsFindRelatedCommand.Flags().String("correlation-id", "", "")
eventsCommand.AddCommand(eventsFindRelatedCommand)
Expand Down
27 changes: 27 additions & 0 deletions config/messages.json
Original file line number Diff line number Diff line change
Expand Up @@ -3896,6 +3896,15 @@
"file": "conversion.go"
}
},
"error:pkg/events/grpc:invalid_regexp": {
"translations": {
"en": "invalid regexp"
},
"description": {
"package": "pkg/events/grpc",
"file": "grpc.go"
}
},
"error:pkg/events/grpc:no_identifiers": {
"translations": {
"en": "no identifiers"
Expand All @@ -3905,6 +3914,15 @@
"file": "grpc.go"
}
},
"error:pkg/events/grpc:no_matching_events": {
"translations": {
"en": "no matching events for regexp `{regexp}`"
},
"description": {
"package": "pkg/events/grpc",
"file": "grpc.go"
}
},
"error:pkg/events/grpc:storage_disabled": {
"translations": {
"en": "events storage is not not enabled"
Expand All @@ -3914,6 +3932,15 @@
"file": "grpc.go"
}
},
"error:pkg/events/grpc:unknown_event_name": {
"translations": {
"en": "unknown event `{name}`"
},
"description": {
"package": "pkg/events/grpc",
"file": "grpc.go"
}
},
"error:pkg/events/redis:channel_closed": {
"translations": {
"en": "channel closed"
Expand Down
104 changes: 95 additions & 9 deletions pkg/events/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ package grpc
import (
"context"
"os"
"regexp"
"sort"
"strings"
"time"

pbtypes "github.com/gogo/protobuf/types"
grpc_runtime "github.com/grpc-ecosystem/grpc-gateway/runtime"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights/rightsutil"
"go.thethings.network/lorawan-stack/v3/pkg/errors"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/gogoproto"
"go.thethings.network/lorawan-stack/v3/pkg/log"
"go.thethings.network/lorawan-stack/v3/pkg/rpcmiddleware/warning"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
Expand All @@ -41,16 +46,74 @@ func NewEventsServer(ctx context.Context, pubsub events.PubSub) *EventsServer {
if _, ok := pubsub.(events.Store); ok {
log.FromContext(ctx).Infof("Events PubSub: %T is also a Store!", pubsub)
}
definedNames := make(map[string]struct{})
for _, def := range events.All().Definitions() {
definedNames[def.Name()] = struct{}{}
}
return &EventsServer{
ctx: ctx,
pubsub: pubsub,
ctx: ctx,
pubsub: pubsub,
definedNames: definedNames,
}
}

// EventsServer streams events from a PubSub over gRPC.
type EventsServer struct {
ctx context.Context
pubsub events.PubSub
ctx context.Context
pubsub events.PubSub
definedNames map[string]struct{}
}

var (
errInvalidRegexp = errors.DefineInvalidArgument("invalid_regexp", "invalid regexp")
errNoMatchingEvents = errors.DefineInvalidArgument("no_matching_events", "no matching events for regexp `{regexp}`")
errUnknownEventName = errors.DefineInvalidArgument("unknown_event_name", "unknown event `{name}`")
)

func (srv *EventsServer) processNames(names ...string) ([]string, error) {
if len(names) == 0 {
return nil, nil
}
nameMap := make(map[string]struct{})
for _, name := range names {
if strings.HasPrefix(name, "/") && strings.HasSuffix(name, "/") {
re, err := regexp.Compile(strings.Trim(name, "/"))
if err != nil {
return nil, errInvalidRegexp.WithCause(err)
}
var found bool
for defined := range srv.definedNames {
if re.MatchString(defined) {
nameMap[defined] = struct{}{}
found = true
}
}
if !found {
return nil, errNoMatchingEvents.WithAttributes("regexp", re.String())
}
} else {
var found bool
for defined := range srv.definedNames {
if name == defined {
nameMap[name] = struct{}{}
found = true
break
}
}
if !found {
return nil, errUnknownEventName.WithAttributes("name", name)
}
}
}
if len(nameMap) == 0 {
return nil, nil
}
out := make([]string, 0, len(nameMap))
for name := range nameMap {
out = append(out, name)
}
sort.Strings(out)
return out, nil
}

var errNoIdentifiers = errors.DefineInvalidArgument("no_identifiers", "no identifiers")
Expand All @@ -60,6 +123,12 @@ func (srv *EventsServer) Stream(req *ttnpb.StreamEventsRequest, stream ttnpb.Eve
if len(req.Identifiers) == 0 {
return errNoIdentifiers.New()
}

names, err := srv.processNames(req.Names...)
if err != nil {
return err
}

ctx := stream.Context()

if err := rights.RequireAny(ctx, req.Identifiers...); err != nil {
Expand All @@ -71,16 +140,20 @@ func (srv *EventsServer) Stream(req *ttnpb.StreamEventsRequest, stream ttnpb.Eve

store, hasStore := srv.pubsub.(events.Store)
var group *errgroup.Group
if hasStore && (req.Tail > 0 || req.After != nil) {
if hasStore {
if req.After == nil {
now := time.Now()
req.After = &now
}
group, ctx = errgroup.WithContext(ctx)
group.Go(func() error {
return store.SubscribeWithHistory(ctx, nil, req.Identifiers, req.After, int(req.Tail), handler)
return store.SubscribeWithHistory(ctx, names, req.Identifiers, req.After, int(req.Tail), handler)
})
} else {
if req.Tail > 0 || req.After != nil {
warning.Add(ctx, "Events storage is not enabled")
}
if err := srv.pubsub.Subscribe(ctx, nil, req.Identifiers, handler); err != nil {
if err := srv.pubsub.Subscribe(ctx, names, req.Identifiers, handler); err != nil {
return err
}
}
Expand All @@ -94,14 +167,27 @@ func (srv *EventsServer) Stream(req *ttnpb.StreamEventsRequest, stream ttnpb.Eve
return err
}
now := time.Now().UTC()
if err := stream.Send(&ttnpb.Event{
startEvent := &ttnpb.Event{
UniqueId: events.NewCorrelationID(),
Name: "events.stream.start",
Time: &now,
Identifiers: req.Identifiers,
Origin: hostname,
CorrelationIds: events.CorrelationIDsFromContext(ctx),
}); err != nil {
}

if len(names) > 0 {
value, err := gogoproto.Value(names)
if err != nil {
return err
}
startEvent.Data, err = pbtypes.MarshalAny(value)
if err != nil {
return err
}
}

if err := stream.Send(startEvent); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 82713dc

Please sign in to comment.