diff --git a/cmd/influx_inspect/dumptsi/dumptsi.go b/cmd/influx_inspect/dumptsi/dumptsi.go index a27be245846..484d0d1d372 100644 --- a/cmd/influx_inspect/dumptsi/dumptsi.go +++ b/cmd/influx_inspect/dumptsi/dumptsi.go @@ -247,6 +247,9 @@ func (cmd *Command) printSeries(sfile *tsdb.SeriesFile) error { break } name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID)) + if len(name) == 0 { + continue + } if !cmd.matchSeries(name, tags) { continue @@ -371,6 +374,9 @@ func (cmd *Command) printTagValueSeries(sfile *tsdb.SeriesFile, fs *tsi1.FileSet } name, tags := tsdb.ParseSeriesKey(sfile.SeriesKey(e.SeriesID)) + if len(name) == 0 { + continue + } if !cmd.matchSeries(name, tags) { continue diff --git a/tsdb/index.go b/tsdb/index.go index ef225311fab..68be2c31122 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -137,6 +137,9 @@ func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) { } name, tags := ParseSeriesKey(key) + if len(name) == 0 { + continue + } deleted := itr.sfile.IsDeleted(elem.SeriesID) return &seriesElemAdapter{ name: name, @@ -375,6 +378,9 @@ func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) { // Convert to a key. name, tags := ParseSeriesKey(seriesKey) + if len(name) == 0 { + continue + } key := string(models.MakeKey(name, tags)) // Write auxiliary fields. @@ -861,6 +867,9 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) { } name, tags := ParseSeriesKey(itr.keys[0]) + if len(name) == 0 { + continue + } itr.keys = itr.keys[1:] // TODO(edd): It seems to me like this authorisation check should be @@ -2348,6 +2357,11 @@ func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) } name, tags := ParseSeriesKey(seriesKey) + // An invalid series key of 0 length should have been caught by the + // above check, but for extra safety we can pass over it here too. + if len(name) == 0 { + continue + } keys = append(keys, models.MakeKey(name, tags)) } @@ -2815,12 +2829,18 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte, if auth != nil { name, tags := ParseSeriesKey(buf) + if len(name) == 0 { + continue + } if !auth.AuthorizeSeriesRead(database, name, tags) { continue } } _, buf = ReadSeriesKeyLen(buf) + if len(buf) == 0 { + continue + } _, buf = ReadSeriesKeyMeasurement(buf) tagN, buf := ReadSeriesKeyTagN(buf) for i := 0; i < tagN; i++ { diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index bc94ab7a9d4..8c7b2406b83 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -704,6 +704,9 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) { // Read key size. _, remainder := tsdb.ReadSeriesKeyLen(seriesKey) + if len(remainder) == 0 { + return + } // Read measurement name. name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder) diff --git a/tsdb/series_cursor.go b/tsdb/series_cursor.go index e0d42132869..8352141985f 100644 --- a/tsdb/series_cursor.go +++ b/tsdb/series_cursor.go @@ -3,6 +3,7 @@ package tsdb import ( "bytes" "errors" + "fmt" "sort" "sync" @@ -112,6 +113,9 @@ func (cur *seriesCursor) Next() (*SeriesCursorRow, error) { } cur.row.Name, cur.row.Tags = ParseSeriesKey(cur.keys[cur.ofs]) + if len(cur.row.Name) == 0 { + return nil, fmt.Errorf("series key was not valid: %+v", cur.keys[cur.ofs]) + } cur.ofs++ //if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.indexSet.Database(), name, tags) { diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 19f51b50be4..ab32be37426 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -414,6 +414,9 @@ func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags) func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) { var name []byte _, data = ReadSeriesKeyLen(data) + if len(data) == 0 { + return nil, dst + } name, data = ReadSeriesKeyMeasurement(data) tagN, data := ReadSeriesKeyTagN(data) @@ -436,17 +439,30 @@ func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) { func CompareSeriesKeys(a, b []byte) int { // Handle 'nil' keys. - if len(a) == 0 && len(b) == 0 { - return 0 - } else if len(a) == 0 { - return -1 - } else if len(b) == 0 { - return 1 + // Values below and equal to 1 are considered invalid for key comparisons. + nilKeyHandler := func(a, b int) int { + if a == 0 && b == 0 { + return 0 + } else if a == 0 { + return -1 + } else if b == 0 { + return 1 + } + return a + b + } + + keyValidity := nilKeyHandler(len(a), len(b)) + if keyValidity <= 1 { + return keyValidity } // Read total size. - _, a = ReadSeriesKeyLen(a) - _, b = ReadSeriesKeyLen(b) + szA, a := ReadSeriesKeyLen(a) + szB, b := ReadSeriesKeyLen(b) + keySizeValidity := nilKeyHandler(szA, szB) + if keySizeValidity <= 1 { + return keySizeValidity + } // Read names. name0, a := ReadSeriesKeyMeasurement(a) diff --git a/tsdb/series_file_test.go b/tsdb/series_file_test.go index 2954b431f64..820a73e64a1 100644 --- a/tsdb/series_file_test.go +++ b/tsdb/series_file_test.go @@ -50,6 +50,57 @@ func TestParseSeriesKeyInto(t *testing.T) { } } +func TestParseSeriesKey(t *testing.T) { + tests := []struct { + name string + seriesKey []byte + expectedKey []byte + }{ + { + name: "invalid zero length series key", + seriesKey: tsdb.AppendSeriesKey(nil, []byte{}, nil), + expectedKey: nil, + }, + { + name: "invalid blank series key", + seriesKey: tsdb.AppendSeriesKey(nil, []byte(""), nil), + expectedKey: nil, + }, + { + name: "invalid series key with tags", + seriesKey: tsdb.AppendSeriesKey(nil, []byte{}, models.NewTags(map[string]string{"tag1": "foo"})), + expectedKey: nil, + }, + { + name: "valid series key with tags", + seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(map[string]string{"tag1": "foo"})), + expectedKey: []byte("foo"), + }, + { + name: "valid series key with empty tags", + seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(map[string]string{})), + expectedKey: []byte("foo"), + }, + { + name: "valid series key with nil tags", + seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(nil)), + expectedKey: []byte("foo"), + }, + { + name: "valid series key with no tags", + seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), nil), + expectedKey: []byte("foo"), + }, + } + + for _, tt := range tests { + keyName, _ := tsdb.ParseSeriesKey(tt.seriesKey) + if res := bytes.Compare(keyName, tt.expectedKey); res != 0 { + t.Fatalf("invalid series key parsed for an %s: got %q, expected %q", tt.name, keyName, tt.expectedKey) + } + } +} + // Ensure that broken series files are closed func TestSeriesFile_Open_WhenFileCorrupt_ShouldReturnErr(t *testing.T) { f := NewBrokenSeriesFile([]byte{0, 0, 0, 0, 0})