Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: panic index out of range for invalid series keys [Port to 1.11] #24595

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/influx_inspect/dumptsi/dumptsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ func (cmd *Command) printSeries(sfile *tsdb.SeriesFile) error {
break
}
name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID))
if len(name) == 0 {
continue
}

if !cmd.matchSeries(name, tags) {
continue
Expand Down Expand Up @@ -368,6 +371,9 @@ func (cmd *Command) printTagValueSeries(sfile *tsdb.SeriesFile, fs *tsi1.FileSet
}

name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID))
if len(name) == 0 {
continue
}

if !cmd.matchSeries(name, tags) {
continue
Expand Down
23 changes: 23 additions & 0 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) {
}

name, tags := ParseSeriesKey(key)
if len(name) == 0 {
continue
}
deleted := itr.sfile.IsDeleted(elem.SeriesID)
return &seriesElemAdapter{
name: name,
Expand Down Expand Up @@ -381,6 +384,9 @@ func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) {

// Convert to a key.
name, tags := ParseSeriesKey(seriesKey)
if len(name) == 0 {
continue
}
key := string(models.MakeKey(name, tags))

// Write auxiliary fields.
Expand Down Expand Up @@ -867,6 +873,9 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
}

name, tags := ParseSeriesKey(itr.keys[0])
if len(name) == 0 {
continue
}
itr.keys = itr.keys[1:]

// TODO(edd): It seems to me like this authorisation check should be
Expand Down Expand Up @@ -2340,6 +2349,9 @@ func (itr *measurementSeriesKeyByExprIterator) Next() ([]byte, error) {
}

name, tags := ParseSeriesKey(seriesKey)
if len(name) == 0 {
continue
}

// Check leftover filters. All fields that might be filtered default to zero values
if e.Expr != nil {
Expand Down Expand Up @@ -2442,6 +2454,11 @@ func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr)
}

name, tags := ParseSeriesKey(seriesKey)
// An invalid series key of 0 length should have been caught by the
// above check, but for extra safety we can pass over it here too.
if len(name) == 0 {
continue
}
keys = append(keys, models.MakeKey(name, tags))
}

Expand Down Expand Up @@ -2918,12 +2935,18 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte,

if auth != nil {
name, tags := ParseSeriesKey(buf)
if len(name) == 0 {
continue
}
if !auth.AuthorizeSeriesRead(database, name, tags) {
continue
}
}

_, buf = ReadSeriesKeyLen(buf)
if len(buf) == 0 {
continue
}
_, buf = ReadSeriesKeyMeasurement(buf)
tagN, buf := ReadSeriesKeyTagN(buf)
for i := 0; i < tagN; i++ {
Expand Down
3 changes: 3 additions & 0 deletions tsdb/index/tsi1/log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,9 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) {

// Read key size.
_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)
if len(remainder) == 0 {
return
}

// Read measurement name.
name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
Expand Down
4 changes: 4 additions & 0 deletions tsdb/series_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsdb
import (
"bytes"
"errors"
"fmt"
"sort"
"sync"

Expand Down Expand Up @@ -112,6 +113,9 @@ func (cur *seriesCursor) Next() (*SeriesCursorRow, error) {
}

cur.row.Name, cur.row.Tags = ParseSeriesKey(cur.keys[cur.ofs])
if len(cur.row.Name) == 0 {
return nil, fmt.Errorf("series key was not valid: %+v", cur.keys[cur.ofs])
}
cur.ofs++

//if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.indexSet.Database(), name, tags) {
Expand Down
32 changes: 24 additions & 8 deletions tsdb/series_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags)
func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
var name []byte
_, data = ReadSeriesKeyLen(data)
if len(data) == 0 {
return nil, dst
}
name, data = ReadSeriesKeyMeasurement(data)
tagN, data := ReadSeriesKeyTagN(data)

Expand All @@ -436,17 +439,30 @@ func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {

func CompareSeriesKeys(a, b []byte) int {
// Handle 'nil' keys.
if len(a) == 0 && len(b) == 0 {
return 0
} else if len(a) == 0 {
return -1
} else if len(b) == 0 {
return 1
// Values below and equal to 1 are considered invalid for key comparisons.
nilKeyHandler := func(a, b int) int {
if a == 0 && b == 0 {
return 0
} else if a == 0 {
return -1
} else if b == 0 {
return 1
}
return a + b
}

keyValidity := nilKeyHandler(len(a), len(b))
if keyValidity <= 1 {
return keyValidity
}

// Read total size.
_, a = ReadSeriesKeyLen(a)
_, b = ReadSeriesKeyLen(b)
szA, a := ReadSeriesKeyLen(a)
szB, b := ReadSeriesKeyLen(b)
keySizeValidity := nilKeyHandler(szA, szB)
if keySizeValidity <= 1 {
return keySizeValidity
}

// Read names.
name0, a := ReadSeriesKeyMeasurement(a)
Expand Down
51 changes: 51 additions & 0 deletions tsdb/series_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,57 @@ func TestParseSeriesKeyInto(t *testing.T) {
}
}

func TestParseSeriesKey(t *testing.T) {
tests := []struct {
name string
seriesKey []byte
expectedKey []byte
}{
{
name: "invalid zero length series key",
seriesKey: tsdb.AppendSeriesKey(nil, []byte{}, nil),
expectedKey: nil,
},
{
name: "invalid blank series key",
seriesKey: tsdb.AppendSeriesKey(nil, []byte(""), nil),
expectedKey: nil,
},
{
name: "invalid series key with tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte{}, models.NewTags(map[string]string{"tag1": "foo"})),
expectedKey: nil,
},
{
name: "valid series key with tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(map[string]string{"tag1": "foo"})),
expectedKey: []byte("foo"),
},
{
name: "valid series key with empty tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(map[string]string{})),
expectedKey: []byte("foo"),
},
{
name: "valid series key with nil tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(nil)),
expectedKey: []byte("foo"),
},
{
name: "valid series key with no tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), nil),
expectedKey: []byte("foo"),
},
}

for _, tt := range tests {
keyName, _ := tsdb.ParseSeriesKey(tt.seriesKey)
if res := bytes.Compare(keyName, tt.expectedKey); res != 0 {
t.Fatalf("invalid series key parsed for an %s: got %q, expected %q", tt.name, keyName, tt.expectedKey)
}
}
}

// Ensure that broken series files are closed
func TestSeriesFile_Open_WhenFileCorrupt_ShouldReturnErr(t *testing.T) {
f := NewBrokenSeriesFile([]byte{0, 0, 0, 0, 0})
Expand Down
6 changes: 6 additions & 0 deletions tsdb/series_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,13 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio
newKeyRanges = append(newKeyRanges, keyRange{id, offset})
if tracker.AddedMeasurementSeries != nil {
_, remainder := ReadSeriesKeyLen(key)
if len(remainder) == 0 {
continue
}
measurement, _ := ReadSeriesKeyMeasurement(remainder)
if len(measurement) == 0 {
continue
}
tracker.AddedMeasurementSeries(measurement, 1)
}
}
Expand Down