Skip to content

Commit

Permalink
fix: panic index out of range for invalid series keys (#24565)
Browse files Browse the repository at this point in the history
* chore: add scaffolding for naive solution

* feat: test case scaffolding

* fix: implement check for series key before proceeding

* fix: add validation for ReadSeriesKeyMeasurement usage

* refactor: explicit use of series key len

* feat: add remaining check to index

* feat: add check to remaining files

As the Len function is used as part of the parseSeriesKey, this also needs to be accounted for on the nil return from this function as it is used in different contexts

* feat: expand test cases

* chore: go fmt

* chore: update test failure message

* chore: impl feedback on unnecessary sz checks

* feat: expand test cases

* fix: nil series key check

In both sections for index.go there is a pre-existing length check against the series key which should catch invalid values, perhaps this explains why it hasn't cropped up in the reported panics. For even more safety, we can also skip a nil key because we know that subsequent calls will cause a panic where this key is attempted to be used

* fix: remove nil tags check

A key with no tags is valid, so we should not check for BOTH nil key and tags as a key could be nil, which is invalid, yet still have tags and therefore cause the check to pass which we do not want

* feat: extend test cases from feedback

* fix: extend checks for CompareSeriesKeys

* feat: add nilKeyHandler for shared key checking logic

* fix: logical error in nilKeyHandler

Prior to this, the else was always defaulted to at the end of the conditional branch, which causes unexpected behaviour and a failure of a bunch of tests.

* fix: return tags keep nil data

In a recent change to this, we agreed on a simple name == nil check for the actual data. As a follow on to this, I just realised that we don't actually want to nil back the tags, even if they're not checked, because having no tags is a valid input so we can simply return whatever we were passed unchanged.

* fix: use len == 0 for extra safety

* feat: extra test for blank series key
  • Loading branch information
jdockerty committed Jan 23, 2024
1 parent c7e3dc1 commit 2337bfe
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 8 deletions.
6 changes: 6 additions & 0 deletions cmd/influx_inspect/dumptsi/dumptsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ func (cmd *Command) printSeries(sfile *tsdb.SeriesFile) error {
break
}
name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID))
if len(name) == 0 {
continue
}

if !cmd.matchSeries(name, tags) {
continue
Expand Down Expand Up @@ -368,6 +371,9 @@ func (cmd *Command) printTagValueSeries(sfile *tsdb.SeriesFile, fs *tsi1.FileSet
}

name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID))
if len(name) == 0 {
continue
}

if !cmd.matchSeries(name, tags) {
continue
Expand Down
23 changes: 23 additions & 0 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) {
}

name, tags := ParseSeriesKey(key)
if len(name) == 0 {
continue
}
deleted := itr.sfile.IsDeleted(elem.SeriesID)
return &seriesElemAdapter{
name: name,
Expand Down Expand Up @@ -381,6 +384,9 @@ func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) {

// Convert to a key.
name, tags := ParseSeriesKey(seriesKey)
if len(name) == 0 {
continue
}
key := string(models.MakeKey(name, tags))

// Write auxiliary fields.
Expand Down Expand Up @@ -867,6 +873,9 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
}

name, tags := ParseSeriesKey(itr.keys[0])
if len(name) == 0 {
continue
}
itr.keys = itr.keys[1:]

// TODO(edd): It seems to me like this authorisation check should be
Expand Down Expand Up @@ -2340,6 +2349,9 @@ func (itr *measurementSeriesKeyByExprIterator) Next() ([]byte, error) {
}

name, tags := ParseSeriesKey(seriesKey)
if len(name) == 0 {
continue
}

// Check leftover filters. All fields that might be filtered default to zero values
if e.Expr != nil {
Expand Down Expand Up @@ -2442,6 +2454,11 @@ func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr)
}

name, tags := ParseSeriesKey(seriesKey)
// An invalid series key of 0 length should have been caught by the
// above check, but for extra safety we can pass over it here too.
if len(name) == 0 {
continue
}
keys = append(keys, models.MakeKey(name, tags))
}

Expand Down Expand Up @@ -2918,12 +2935,18 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte,

if auth != nil {
name, tags := ParseSeriesKey(buf)
if len(name) == 0 {
continue
}
if !auth.AuthorizeSeriesRead(database, name, tags) {
continue
}
}

_, buf = ReadSeriesKeyLen(buf)
if len(buf) == 0 {
continue
}
_, buf = ReadSeriesKeyMeasurement(buf)
tagN, buf := ReadSeriesKeyTagN(buf)
for i := 0; i < tagN; i++ {
Expand Down
3 changes: 3 additions & 0 deletions tsdb/index/tsi1/log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,9 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) {

// Read key size.
_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)
if len(remainder) == 0 {
return
}

// Read measurement name.
name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
Expand Down
4 changes: 4 additions & 0 deletions tsdb/series_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsdb
import (
"bytes"
"errors"
"fmt"
"sort"
"sync"

Expand Down Expand Up @@ -112,6 +113,9 @@ func (cur *seriesCursor) Next() (*SeriesCursorRow, error) {
}

cur.row.Name, cur.row.Tags = ParseSeriesKey(cur.keys[cur.ofs])
if len(cur.row.Name) == 0 {
return nil, fmt.Errorf("series key was not valid: %+v", cur.keys[cur.ofs])
}
cur.ofs++

//if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.indexSet.Database(), name, tags) {
Expand Down
32 changes: 24 additions & 8 deletions tsdb/series_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags)
func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
var name []byte
_, data = ReadSeriesKeyLen(data)
if len(data) == 0 {
return nil, dst
}
name, data = ReadSeriesKeyMeasurement(data)
tagN, data := ReadSeriesKeyTagN(data)

Expand All @@ -436,17 +439,30 @@ func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {

func CompareSeriesKeys(a, b []byte) int {
// Handle 'nil' keys.
if len(a) == 0 && len(b) == 0 {
return 0
} else if len(a) == 0 {
return -1
} else if len(b) == 0 {
return 1
// Values below and equal to 1 are considered invalid for key comparisons.
nilKeyHandler := func(a, b int) int {
if a == 0 && b == 0 {
return 0
} else if a == 0 {
return -1
} else if b == 0 {
return 1
}
return a + b
}

keyValidity := nilKeyHandler(len(a), len(b))
if keyValidity <= 1 {
return keyValidity
}

// Read total size.
_, a = ReadSeriesKeyLen(a)
_, b = ReadSeriesKeyLen(b)
szA, a := ReadSeriesKeyLen(a)
szB, b := ReadSeriesKeyLen(b)
keySizeValidity := nilKeyHandler(szA, szB)
if keySizeValidity <= 1 {
return keySizeValidity
}

// Read names.
name0, a := ReadSeriesKeyMeasurement(a)
Expand Down
51 changes: 51 additions & 0 deletions tsdb/series_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,57 @@ func TestParseSeriesKeyInto(t *testing.T) {
}
}

func TestParseSeriesKey(t *testing.T) {
tests := []struct {
name string
seriesKey []byte
expectedKey []byte
}{
{
name: "invalid zero length series key",
seriesKey: tsdb.AppendSeriesKey(nil, []byte{}, nil),
expectedKey: nil,
},
{
name: "invalid blank series key",
seriesKey: tsdb.AppendSeriesKey(nil, []byte(""), nil),
expectedKey: nil,
},
{
name: "invalid series key with tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte{}, models.NewTags(map[string]string{"tag1": "foo"})),
expectedKey: nil,
},
{
name: "valid series key with tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(map[string]string{"tag1": "foo"})),
expectedKey: []byte("foo"),
},
{
name: "valid series key with empty tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(map[string]string{})),
expectedKey: []byte("foo"),
},
{
name: "valid series key with nil tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(nil)),
expectedKey: []byte("foo"),
},
{
name: "valid series key with no tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), nil),
expectedKey: []byte("foo"),
},
}

for _, tt := range tests {
keyName, _ := tsdb.ParseSeriesKey(tt.seriesKey)
if res := bytes.Compare(keyName, tt.expectedKey); res != 0 {
t.Fatalf("invalid series key parsed for an %s: got %q, expected %q", tt.name, keyName, tt.expectedKey)
}
}
}

// Ensure that broken series files are closed
func TestSeriesFile_Open_WhenFileCorrupt_ShouldReturnErr(t *testing.T) {
f := NewBrokenSeriesFile([]byte{0, 0, 0, 0, 0})
Expand Down
6 changes: 6 additions & 0 deletions tsdb/series_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,13 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio
newKeyRanges = append(newKeyRanges, keyRange{id, offset})
if tracker.AddedMeasurementSeries != nil {
_, remainder := ReadSeriesKeyLen(key)
if len(remainder) == 0 {
continue
}
measurement, _ := ReadSeriesKeyMeasurement(remainder)
if len(measurement) == 0 {
continue
}
tracker.AddedMeasurementSeries(measurement, 1)
}
}
Expand Down

0 comments on commit 2337bfe

Please sign in to comment.