Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow the specification of a write window for retention policies #25517

Merged
merged 12 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/influx_tools/importer/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type Command struct {
replication int
duration time.Duration
shardDuration time.Duration
futureLimit time.Duration
pastLimit time.Duration
buildTSI bool
replace bool
}
Expand Down Expand Up @@ -75,6 +77,13 @@ func (cmd *Command) Run(args []string) (err error) {
if cmd.replication > 0 {
rp.ReplicaN = &cmd.replication
}
if cmd.futureLimit > 0 {
rp.FutureWriteLimit = &cmd.futureLimit
}
if cmd.pastLimit > 0 {
rp.PastWriteLimit = &cmd.pastLimit
}

err = i.CreateDatabase(rp)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion cmd/influx_tools/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func (i *importer) CreateDatabase(rp *meta.RetentionPolicySpec) error {

nonmatchingRp := (rpi != nil) && ((rp.Duration != nil && rpi.Duration != *rp.Duration) ||
(rp.ReplicaN != nil && rpi.ReplicaN != *rp.ReplicaN) ||
(rpi.ShardGroupDuration != rp.ShardGroupDuration))
(rpi.ShardGroupDuration != rp.ShardGroupDuration) ||
(rp.PastWriteLimit != nil && rpi.PastWriteLimit != *rp.PastWriteLimit) ||
(rp.FutureWriteLimit != nil && rpi.FutureWriteLimit != *rp.FutureWriteLimit))
if nonmatchingRp {
return fmt.Errorf("retention policy %v already exists with different parameters", rp.Name)
} else {
Expand Down
35 changes: 27 additions & 8 deletions coordinator/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) {
s.Shards[shardInfo.ID] = shardInfo
}

func withinWriteWindow(rp *meta.RetentionPolicyInfo, p models.Point) bool {
if (rp != nil) &&
(((rp.FutureWriteLimit > 0) && p.Time().After(time.Now().Add(rp.FutureWriteLimit))) ||
((rp.PastWriteLimit > 0) && p.Time().Before(time.Now().Add(-rp.PastWriteLimit)))) {
Comment on lines +126 to +127
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion for future optimization: Create a writeWindow object that caches the value of time.Now().Add(rp.FutureWriteLimit) and time.Now().Add(-rp.PastWriteLimit) to avoid calculating them twice for every point in request.

return false
}
return true
}

// Open opens the communication channel with the point writer.
func (w *PointsWriter) Open() error {
w.closing = make(chan struct{})
Expand Down Expand Up @@ -189,9 +198,10 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
}

for _, p := range wp.Points {
// Either the point is outside the scope of the RP, or we already have
// a suitable shard group for the point.
if p.Time().Before(min) || list.Covers(p.Time()) {
// Either the point is outside the scope of the RP, we already have
// a suitable shard group for the point, or it is outside the write window
// for the RP, and we don't want to unnecessarily create a shard for it
if p.Time().Before(min) || list.Covers(p.Time()) || !withinWriteWindow(rp, p) {
continue
}

Expand All @@ -211,9 +221,9 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
mapping := NewShardMapping(len(wp.Points))
for _, p := range wp.Points {
sg := list.ShardGroupAt(p.Time())
if sg == nil {
if sg == nil || !withinWriteWindow(rp, p) {
// We didn't create a shard group because the point was outside the
// scope of the RP.
// scope of the RP, or the point is outside the write window for the RP.
mapping.Dropped = append(mapping.Dropped, p)
atomic.AddInt64(&w.stats.WriteDropped, 1)
continue
Expand Down Expand Up @@ -354,13 +364,18 @@ func (w *PointsWriter) WritePointsPrivileged(writeCtx tsdb.WriteContext, databas
return err
}

// Write each shard in it's own goroutine and return as soon as one fails.
// Write each shard in its own goroutine and return as soon as one fails.
ch := make(chan error, len(shardMappings.Points))
for shardID, points := range shardMappings.Points {
go func(writeCtx tsdb.WriteContext, shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
err := w.writeToShard(writeCtx, shard, database, retentionPolicy, points)
if err == tsdb.ErrShardDeletion {
err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)}

err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID),
Dropped: len(points),
Database: database,
RetentionPolicy: retentionPolicy,
}
}
ch <- err
}(writeCtx, shardMappings.Shards[shardID], database, retentionPolicy, points)
Expand All @@ -375,7 +390,11 @@ func (w *PointsWriter) WritePointsPrivileged(writeCtx tsdb.WriteContext, databas
atomic.AddInt64(&w.stats.SubWriteOK, 1)

if err == nil && len(shardMappings.Dropped) > 0 {
err = tsdb.PartialWriteError{Reason: "points beyond retention policy", Dropped: len(shardMappings.Dropped)}
err = tsdb.PartialWriteError{Reason: "points beyond retention policy or outside permissible write window",
devanbenz marked this conversation as resolved.
Show resolved Hide resolved
Dropped: len(shardMappings.Dropped),
Database: database,
RetentionPolicy: retentionPolicy,
}
}

for range shardMappings.Points {
Expand Down
144 changes: 116 additions & 28 deletions coordinator/points_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/stretchr/testify/require"
)

// TODO(benbjohnson): Rewrite tests to use cluster_test.MetaClient.

// Ensures the points writer maps a single point to a single shard.
func TestPointsWriter_MapShards_One(t *testing.T) {
ms := PointsWriterMetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
rp := NewRetentionPolicy("myp", time.Now(), time.Hour, 3, 0, 0)

ms.NodeIDFn = func() uint64 { return 1 }
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
Expand Down Expand Up @@ -51,11 +52,103 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
}
}

// TestPointsWriter_MapShards_WriteLimits verifies that points outside a
// retention policy's past/future write window are dropped by MapShards,
// and that clearing the limits allows all points through again.
func TestPointsWriter_MapShards_WriteLimits(t *testing.T) {
	ms := PointsWriterMetaClient{}
	c := coordinator.NewPointsWriter()

	// Parse a duration literal, failing the test on malformed input.
	parse := func(s string) time.Duration {
		d, err := time.ParseDuration(s)
		require.NoError(t, err, "failed to parse duration: %q", s)
		return d
	}

	pastWriteLimit := parse("10m")
	futureWriteLimit := parse("15m")
	rp := NewRetentionPolicy("myp", time.Now().Add(-45*time.Minute), 3*time.Hour, 3, futureWriteLimit, pastWriteLimit)

	ms.NodeIDFn = func() uint64 { return 1 }
	ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
		return rp, nil
	}

	ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
		return &rp.ShardGroups[0], nil
	}

	c.MetaClient = ms

	pr := &coordinator.WritePointsRequest{
		Database:        "mydb",
		RetentionPolicy: "myrp",
	}

	// Two points inside the window, one too far in the future (+30m > 15m
	// limit), one inside the past window, and one too far in the past
	// (-20m > 10m limit).
	pr.AddPoint("cpu", 0.0, time.Now(), nil)
	pr.AddPoint("cpu", 1.0, time.Now().Add(time.Second), nil)
	pr.AddPoint("cpu", 2.0, time.Now().Add(30*time.Minute), nil)
	pr.AddPoint("cpu", -1.0, time.Now().Add(-5*time.Minute), nil)
	pr.AddPoint("cpu", -2.0, time.Now().Add(-20*time.Minute), nil)

	MapPoints(t, c, pr, []float64{0.0, 1.0, -1.0}, []float64{2.0, -2.0})

	// Clear the write limits by setting them to zero; afterwards no
	// points should be dropped.
	zeroDuration := time.Duration(0)
	rpu := &meta.RetentionPolicyUpdate{
		FutureWriteLimit: &zeroDuration,
		PastWriteLimit:   &zeroDuration,
	}
	require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed")

	MapPoints(t, c, pr, []float64{0.0, 1.0, 2.0, -1.0, -2.0}, []float64{})
}

// MapPoints runs pr through c.MapShards and asserts that exactly one shard
// receives points whose "value" fields match values (in order), while the
// dropped points' "value" fields match dropped (in order).
func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, dropped []float64) {
	shardMappings, err := c.MapShards(pr)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if exp := 1; len(shardMappings.Points) != exp {
		t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings.Points), exp)
	}

	// Pull out the point slice for the (single) mapped shard.
	var mapped []models.Point
	for _, pts := range shardMappings.Points {
		mapped = pts
		break
	}

	// verify checks that each point's "value" field equals the expected
	// float at the same index.
	verify := func(pts []models.Point, want []float64) {
		require.Equal(t, len(want), len(pts), "unexpected number of points")
		for i, expV := range want {
			f, err := pts[i].Fields()
			require.NoError(t, err, "error retrieving fields")
			v, ok := f["value"]
			require.True(t, ok, "\"value\" field not found")
			require.Equal(t, expV, v, "unexpected value")
		}
	}

	verify(mapped, values)
	verify(shardMappings.Dropped, dropped)
}

// Ensures the points writer maps to a new shard group when the shard duration
// is changed.
func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) {
ms := PointsWriterMetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
rp := NewRetentionPolicy("myp", time.Now(), time.Hour, 3, 0, 0)

ms.NodeIDFn = func() uint64 { return 1 }
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
Expand Down Expand Up @@ -133,7 +226,7 @@ func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) {
// Ensures the points writer maps a multiple points across shard group boundaries.
func TestPointsWriter_MapShards_Multiple(t *testing.T) {
ms := PointsWriterMetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
rp := NewRetentionPolicy("myp", time.Now(), time.Hour, 3, 0, 0)
rp.ShardGroupDuration = time.Hour
AttachShardGroupInfo(rp, []meta.ShardOwner{
{NodeID: 1},
Expand Down Expand Up @@ -206,7 +299,7 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
// Ensures the points writer does not map points beyond the retention policy.
func TestPointsWriter_MapShards_Invalid(t *testing.T) {
ms := PointsWriterMetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
rp := NewRetentionPolicy("myp", time.Now(), time.Hour, 3, 0, 0)

ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
return rp, nil
Expand Down Expand Up @@ -292,27 +385,21 @@ func TestPointsWriter_WritePoints(t *testing.T) {
// copy to prevent data race
theTest := test
sm := coordinator.NewShardMapping(16)
sm.MapPoint(
&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
}},
pr.Points[0])
sm.MapPoint(
&meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
}},
pr.Points[1])
sm.MapPoint(
&meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
}},
pr.Points[2])
sm.MapPoint(&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
}}, pr.Points[0])
sm.MapPoint(&meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
}}, pr.Points[1])
sm.MapPoint(&meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
}}, pr.Points[2])

// Local coordinator.Node ShardWriter
// lock on the write increment since these functions get called in parallel
Expand Down Expand Up @@ -563,7 +650,7 @@ func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64

func NewPointsWriterMetaClient() *PointsWriterMetaClient {
ms := &PointsWriterMetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
rp := NewRetentionPolicy("myp", time.Now(), time.Hour, 3, 0, 0)
AttachShardGroupInfo(rp, []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
Expand Down Expand Up @@ -624,7 +711,7 @@ func (s Subscriber) Send(wr *coordinator.WritePointsRequest) {
s.SendFn(wr)
}

func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *meta.RetentionPolicyInfo {
func NewRetentionPolicy(name string, start time.Time, duration time.Duration, nodeCount int, futureWriteLimit time.Duration, pastWriteLimit time.Duration) *meta.RetentionPolicyInfo {
shards := []meta.ShardInfo{}
owners := []meta.ShardOwner{}
for i := 1; i <= nodeCount; i++ {
Expand All @@ -637,7 +724,6 @@ func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *met
Owners: owners,
})

start := time.Now()
rp := &meta.RetentionPolicyInfo{
Name: "myrp",
ReplicaN: nodeCount,
Expand All @@ -651,6 +737,8 @@ func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *met
Shards: shards,
},
},
FutureWriteLimit: futureWriteLimit,
PastWriteLimit: pastWriteLimit,
}
return rp
}
Expand Down
18 changes: 16 additions & 2 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.
Duration: stmt.Duration,
ReplicaN: stmt.Replication,
ShardGroupDuration: stmt.ShardGroupDuration,
FutureWriteLimit: stmt.FutureWriteLimit,
PastWriteLimit: stmt.PastWriteLimit,
}

// Update the retention policy.
Expand Down Expand Up @@ -286,6 +288,8 @@ func (e *StatementExecutor) executeCreateDatabaseStatement(stmt *influxql.Create
Duration: stmt.RetentionPolicyDuration,
ReplicaN: stmt.RetentionPolicyReplication,
ShardGroupDuration: stmt.RetentionPolicyShardGroupDuration,
FutureWriteLimit: stmt.FutureWriteLimit,
PastWriteLimit: stmt.PastWriteLimit,
}
_, err := e.MetaClient.CreateDatabaseWithRetentionPolicy(stmt.Name, &spec)
return err
Expand All @@ -303,6 +307,8 @@ func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql
Duration: &stmt.Duration,
ReplicaN: &stmt.Replication,
ShardGroupDuration: stmt.ShardGroupDuration,
FutureWriteLimit: &stmt.FutureWriteLimit,
PastWriteLimit: &stmt.PastWriteLimit,
}

// Create new retention policy.
Expand Down Expand Up @@ -853,9 +859,17 @@ func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.Sh
return nil, influxdb.ErrDatabaseNotFound(q.Database)
}

row := &models.Row{Columns: []string{"name", "duration", "shardGroupDuration", "replicaN", "default"}}
row := &models.Row{Columns: []string{"name", "duration", "shardGroupDuration", "replicaN", "futureWriteLimit", "pastWriteLimit", "default"}}
for _, rpi := range di.RetentionPolicies {
row.Values = append(row.Values, []interface{}{rpi.Name, rpi.Duration.String(), rpi.ShardGroupDuration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name})
row.Values = append(row.Values, []interface{}{
rpi.Name,
rpi.Duration.String(),
rpi.ShardGroupDuration.String(),
rpi.ReplicaN,
rpi.FutureWriteLimit.String(),
rpi.PastWriteLimit.String(),
di.DefaultRetentionPolicy == rpi.Name,
})
}
return []*models.Row{row}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/google/go-cmp v0.5.9
github.com/influxdata/flux v0.194.5
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
github.com/influxdata/influxql v1.3.0
github.com/influxdata/influxql v1.4.0
github.com/influxdata/pkg-config v0.2.11
github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,8 @@ github.com/influxdata/influxdb-iox-client-go v1.0.0-beta.1 h1:zDmAiE2o3Y/YZinI6C
github.com/influxdata/influxdb-iox-client-go v1.0.0-beta.1/go.mod h1:Chl4pz0SRqoPmEavex4vZaQlunqXqrtEPWAN54THFfo=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/influxql v1.1.0/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=
github.com/influxdata/influxql v1.3.0 h1:z2VZEK1LGItlUW8LLiKzX8duC5t6/SDM7Doj2rKs8Kw=
github.com/influxdata/influxql v1.3.0/go.mod h1:VqxAKyQz5p8GzgGsxWalCWYGxEqw6kvJo2IickMQiQk=
github.com/influxdata/influxql v1.4.0 h1:Lf62rbAF8KWQf+4Djqf4hVXgmQuGozUoSD6kNWjye44=
github.com/influxdata/influxql v1.4.0/go.mod h1:VqxAKyQz5p8GzgGsxWalCWYGxEqw6kvJo2IickMQiQk=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
Expand Down
Loading