Skip to content

Commit

Permalink
fix: prevent retention service creating orphaned shard files (influxd…
Browse files Browse the repository at this point in the history
…ata#24530)

* fix: prevent retention service creating orphaned shard files

Under certain circumstances, the retention service can fail to delete shards from
the store in a timely manner. When the shard groups are pruned based on age, this
leaves orphaned shard files on the disk. The retention service will then not attempt
to remove the obsolete shard files because the meta store does not know about them.
This can cause excessive disk space usage for some users.

This corrects that by requiring shard files to be deleted before their shards can be
removed from the meta store.

fixes: influxdata#24529
  • Loading branch information
gwossum authored and chengshiwen committed Aug 27, 2024
1 parent ce3209d commit bd0ea74
Show file tree
Hide file tree
Showing 9 changed files with 554 additions and 152 deletions.
1 change: 1 addition & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func (s *Server) appendRetentionPolicyService(c retention.Config) {
srv := retention.NewService(c)
srv.MetaClient = s.MetaClient
srv.TSDBStore = s.TSDBStore
srv.DropShardMetaRef = retention.OSSDropShardMetaRef(s.MetaClient)
s.Services = append(s.Services, srv)
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.23.0
golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611
golang.org/x/sync v0.5.0
golang.org/x/sys v0.23.0
golang.org/x/text v0.15.0
Expand Down Expand Up @@ -98,7 +99,6 @@ require (
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
Expand Down
15 changes: 1 addition & 14 deletions services/meta/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,24 +678,11 @@ func (c *Client) TruncateShardGroups(t time.Time) error {

// PruneShardGroups remove deleted shard groups from the data store.
func (c *Client) PruneShardGroups() error {
var changed bool
expiration := time.Now().Add(ShardGroupDeletedExpiration)
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
for i, d := range data.Databases {
for j, rp := range d.RetentionPolicies {
var remainingShardGroups []ShardGroupInfo
for _, sgi := range rp.ShardGroups {
if sgi.DeletedAt.IsZero() || !expiration.After(sgi.DeletedAt) {
remainingShardGroups = append(remainingShardGroups, sgi)
continue
}
changed = true
}
data.Databases[i].RetentionPolicies[j].ShardGroups = remainingShardGroups
}
}
changed := data.PruneShardGroups(expiration)
if changed {
return c.commit(data)
}
Expand Down
2 changes: 2 additions & 0 deletions services/meta/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,9 @@ func TestMetaClient_PruneShardGroups(t *testing.T) {

data := c.Data()
data.Databases[1].RetentionPolicies[0].ShardGroups[0].DeletedAt = expiration
data.Databases[1].RetentionPolicies[0].ShardGroups[0].Shards = nil
data.Databases[1].RetentionPolicies[0].ShardGroups[1].DeletedAt = expiration
data.Databases[1].RetentionPolicies[0].ShardGroups[1].Shards = nil

if err := c.SetData(&data); err != nil {
t.Fatal(err)
Expand Down
27 changes: 25 additions & 2 deletions services/meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,11 @@ func (data *Data) DropShard(id uint64) {
data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Shards = append(shards[:found], shards[found+1:]...)

if len(shards) == 1 {
// We just deleted the last shard in the shard group.
data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now()
// We just deleted the last shard in the shard group, but make sure we don't overwrite the timestamp if it
// was already deleted.
if !data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Deleted() {
data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now()
}
}
return
}
Expand Down Expand Up @@ -447,6 +450,26 @@ func (data *Data) DeleteShardGroup(database, policy string, id uint64) error {
return ErrShardGroupNotFound
}

// PruneShardGroups drops every shard group that meets all of these conditions:
// its DeletedAt is set (non-zero), its DeletedAt is strictly before expiration,
// and it no longer lists any shards. A shard group that still lists shards is
// always kept, however long ago it was marked deleted.
//
// For each retention policy the surviving shard groups are gathered into a new
// slice that replaces the policy's ShardGroups (nil when nothing survives).
//
// It reports whether at least one shard group was dropped.
func (data *Data) PruneShardGroups(expiration time.Time) bool {
	pruned := false
	for dbIdx := range data.Databases {
		for rpIdx := range data.Databases[dbIdx].RetentionPolicies {
			var kept []ShardGroupInfo
			for _, group := range data.Databases[dbIdx].RetentionPolicies[rpIdx].ShardGroups {
				// A group is only eligible once it has been marked deleted
				// and that mark is older than the expiration cutoff.
				expired := !group.DeletedAt.IsZero() && expiration.After(group.DeletedAt)
				if expired && len(group.Shards) == 0 {
					pruned = true
					continue
				}
				kept = append(kept, group)
			}
			data.Databases[dbIdx].RetentionPolicies[rpIdx].ShardGroups = kept
		}
	}
	return pruned
}

// CreateContinuousQuery adds a named continuous query to a database.
func (data *Data) CreateContinuousQuery(database, name, query string) error {
di := data.Database(database)
Expand Down
55 changes: 55 additions & 0 deletions services/retention/helpers/test_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package helpers

import (
"fmt"
"time"

"golang.org/x/exp/slices"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/services/meta"
)

// DataDeleteShardGroup marks the shard group identified by database, policy, and id as
// deleted inside targetData by stamping its DeletedAt field with the caller-supplied now.
// Taking the timestamp as a parameter keeps the result deterministic for tests, which is
// why this helper exists instead of reusing the deletion logic on meta.Data.
//
// It returns the error from the retention policy lookup if there is one,
// influxdb.ErrRetentionPolicyNotFound when the lookup yields no policy, and
// meta.ErrShardGroupNotFound when no shard group in the policy has the given id.
func DataDeleteShardGroup(targetData *meta.Data, now time.Time, database, policy string, id uint64) error {
	rpi, err := targetData.RetentionPolicy(database, policy)
	if err != nil {
		return err
	}
	if rpi == nil {
		return influxdb.ErrRetentionPolicyNotFound(policy)
	}

	// Stamp the first shard group whose ID matches and stop.
	for i := range rpi.ShardGroups {
		sg := &rpi.ShardGroups[i]
		if sg.ID != id {
			continue
		}
		sg.DeletedAt = now
		return nil
	}

	return meta.ErrShardGroupNotFound
}

// DataNukeShardGroup removes the shard group identified by targetDB, targetRP, and targetID
// from targetData outright, without going through the mark-deleted-then-prune path. Tests use
// it to build expected results without relying on the pruning functionality under test.
//
// It returns the error from the retention policy lookup if there is one, and a formatted
// error when the lookup yields no policy or when no shard group in the policy has targetID.
func DataNukeShardGroup(targetData *meta.Data, targetDB, targetRP string, targetID uint64) error {
	rpi, err := targetData.RetentionPolicy(targetDB, targetRP)
	if err != nil {
		return err
	}
	if rpi == nil {
		return fmt.Errorf("no retention policy found for %q, %q, %d", targetDB, targetRP, targetID)
	}

	matchesTarget := func(sgi meta.ShardGroupInfo) bool {
		return sgi.ID == targetID
	}
	if slices.ContainsFunc(rpi.ShardGroups, matchesTarget) {
		// Remove every shard group carrying the target ID.
		rpi.ShardGroups = slices.DeleteFunc(rpi.ShardGroups, matchesTarget)
		return nil
	}
	return fmt.Errorf("shard not found for %q, %q, %d", targetDB, targetRP, targetID)
}
Loading

0 comments on commit bd0ea74

Please sign in to comment.