feat: extend stream metrics for redis 7 (#817)
* extended stream metrics
donovanvanheerden authored Jul 19, 2023
1 parent ba1843f commit 1b22581
Showing 3 changed files with 245 additions and 7 deletions.
5 changes: 5 additions & 0 deletions exporter/exporter.go
@@ -392,11 +392,16 @@ func NewRedisExporter(redisURI string, opts Options) (*Exporter, error) {
"stream_group_consumer_idle_seconds": {txt: `Consumer idle time in seconds`, lbls: []string{"db", "stream", "group", "consumer"}},
"stream_group_consumer_messages_pending": {txt: `Pending number of messages for this specific consumer`, lbls: []string{"db", "stream", "group", "consumer"}},
"stream_group_consumers": {txt: `Consumers count of stream group`, lbls: []string{"db", "stream", "group"}},
"stream_group_entries_read": {txt: `Total number of entries read from the stream group`, lbls: []string{"db", "stream", "group"}},
"stream_group_lag": {txt: `The number of messages waiting to be delivered to the stream group's consumers`, lbls: []string{"db", "stream", "group"}},
"stream_group_last_delivered_id": {txt: `The epoch timestamp (ms) of the last delivered message`, lbls: []string{"db", "stream", "group"}},
"stream_group_messages_pending": {txt: `Pending number of messages in that stream group`, lbls: []string{"db", "stream", "group"}},
"stream_groups": {txt: `Groups count of stream`, lbls: []string{"db", "stream"}},
"stream_last_generated_id": {txt: `The epoch timestamp (ms) of the latest message on the stream`, lbls: []string{"db", "stream"}},
"stream_length": {txt: `The number of elements of the stream`, lbls: []string{"db", "stream"}},
"stream_max_deleted_entry_id": {txt: `The epoch timestamp (ms) of last message was deleted from the stream`, lbls: []string{"db", "stream"}},
"stream_first_entry_id": {txt: `The epoch timestamp (ms) of the first message in the stream`, lbls: []string{"db", "stream"}},
"stream_last_entry_id": {txt: `The epoch timestamp (ms) of the last message in the stream`, lbls: []string{"db", "stream"}},
"stream_radix_tree_keys": {txt: `Radix tree keys count"`, lbls: []string{"db", "stream"}},
"stream_radix_tree_nodes": {txt: `Radix tree nodes count`, lbls: []string{"db", "stream"}},
"up": {txt: "Information about the Redis instance"},
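Each entry in this map describes a metric that the exporter exposes. As a rough, self-contained sketch of how such an entry maps onto a Prometheus descriptor (illustrative only — the exporter's own helper and its "redis" metric namespace are assumptions, not taken from this commit):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// The lbls slice above becomes the descriptor's variable labels, so each
	// stream group gets its own db/stream/group series.
	desc := prometheus.NewDesc(
		prometheus.BuildFQName("redis", "", "stream_group_lag"),
		"The number of messages waiting to be delivered to the stream group's consumers",
		[]string{"db", "stream", "group"},
		nil,
	)
	fmt.Println(desc)
}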
41 changes: 35 additions & 6 deletions exporter/streams.go
@@ -12,19 +12,24 @@ import (
// All fields of the streamInfo struct must be exported
// because of redis.ScanStruct (reflect) limitations
type streamInfo struct {
Length int64 `redis:"length"`
RadixTreeKeys int64 `redis:"radix-tree-keys"`
RadixTreeNodes int64 `redis:"radix-tree-nodes"`
LastGeneratedId string `redis:"last-generated-id"`
Groups int64 `redis:"groups"`
MaxDeletedEntryId string `redis:"max-deleted-entry-id"`
FirstEntryId string
LastEntryId string
StreamGroupsInfo []streamGroupsInfo
}

type streamGroupsInfo struct {
Name string `redis:"name"`
Consumers int64 `redis:"consumers"`
Pending int64 `redis:"pending"`
LastDeliveredId string `redis:"last-delivered-id"`
EntriesRead int64 `redis:"entries-read"`
Lag int64 `redis:"lag"`
StreamGroupConsumersInfo []streamGroupConsumersInfo
}
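The redis struct tags above (including the new entries-read and lag fields reported by Redis 7) are what redigo's ScanStruct matches against the flat name/value pairs of an XINFO GROUPS reply. A minimal, self-contained sketch with made-up values; fields missing from the reply (e.g. on Redis 6) simply keep their zero values:

package main

import (
	"fmt"

	"github.com/gomodule/redigo/redis"
)

// groupInfo mirrors the streamGroupsInfo fields for this illustration.
type groupInfo struct {
	Name            string `redis:"name"`
	Consumers       int64  `redis:"consumers"`
	Pending         int64  `redis:"pending"`
	LastDeliveredId string `redis:"last-delivered-id"`
	EntriesRead     int64  `redis:"entries-read"`
	Lag             int64  `redis:"lag"`
}

func main() {
	// One group entry of an XINFO GROUPS reply, flattened to name/value pairs.
	reply := []interface{}{
		[]byte("name"), []byte("test_group_1"),
		[]byte("consumers"), int64(1),
		[]byte("pending"), int64(1),
		[]byte("last-delivered-id"), []byte("1638006862521-0"),
		[]byte("entries-read"), int64(1),
		[]byte("lag"), int64(0),
	}

	var g groupInfo
	if err := redis.ScanStruct(reply, &g); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", g)
}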

@@ -39,12 +44,17 @@ func getStreamInfo(c redis.Conn, key string) (*streamInfo, error) {
if err != nil {
return nil, err
}

// Scan slice to struct
var stream streamInfo
if err := redis.ScanStruct(v, &stream); err != nil {
return nil, err
}

// Extract the first and last entry IDs from their fixed positions in the XINFO STREAM reply (empty on servers that do not report them)
stream.FirstEntryId = getEntryId(v, 17)
stream.LastEntryId = getEntryId(v, 19)

stream.StreamGroupsInfo, err = scanStreamGroups(c, key)
if err != nil {
return nil, err
@@ -54,6 +64,20 @@ func getStreamInfo(c redis.Conn, key string) (*streamInfo, error) {
return &stream, nil
}

func getEntryId(redisValue []interface{}, index int) string {
if len(redisValue) <= index {
// field not present, e.g. replies from Redis versions before 7 are shorter
return ""
}

entry, ok := redisValue[index].([]interface{})
if !ok || len(entry) < 2 {
return ""
}

entryId, ok := entry[0].([]byte)
if !ok {
return ""
}
return string(entryId)
}
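The indices 17 and 19 passed to getEntryId assume the field order of a Redis 7 XINFO STREAM summary reply — length, radix-tree-keys, radix-tree-nodes, last-generated-id, max-deleted-entry-id, entries-added, recorded-first-entry-id, groups, first-entry, last-entry — where each *-entry value is itself [id, [field, value, ...]]. A small sketch of that shape with made-up values; on older servers the reply is shorter, so getEntryId returns an empty string:

package main

import "fmt"

func main() {
	firstEntry := []interface{}{[]byte("1638006862521-0"), []interface{}{[]byte("field_1"), []byte("str_1")}}
	lastEntry := []interface{}{[]byte("1638006862522-0"), []interface{}{[]byte("field_2"), []byte("str_2")}}

	// Eight name/value pairs occupy indices 0-15; "first-entry" and "last-entry" follow.
	reply := make([]interface{}, 20)
	reply[16], reply[17] = []byte("first-entry"), firstEntry
	reply[18], reply[19] = []byte("last-entry"), lastEntry

	fmt.Println(string(reply[17].([]interface{})[0].([]byte))) // 1638006862521-0
	fmt.Println(string(reply[19].([]interface{})[0].([]byte))) // 1638006862522-0
}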

func scanStreamGroups(c redis.Conn, stream string) ([]streamGroupsInfo, error) {
groups, err := redis.Values(doRedisCmd(c, "XINFO", "GROUPS", stream))
if err != nil {
@@ -163,11 +187,16 @@ func (e *Exporter) extractStreamMetrics(ch chan<- prometheus.Metric, c redis.Con
e.registerConstMetricGauge(ch, "stream_radix_tree_nodes", float64(info.RadixTreeNodes), dbLabel, k.key)
e.registerConstMetricGauge(ch, "stream_last_generated_id", parseStreamItemId(info.LastGeneratedId), dbLabel, k.key)
e.registerConstMetricGauge(ch, "stream_groups", float64(info.Groups), dbLabel, k.key)
e.registerConstMetricGauge(ch, "stream_max_deleted_entry_id", parseStreamItemId(info.MaxDeletedEntryId), dbLabel, k.key)
e.registerConstMetricGauge(ch, "stream_first_entry_id", parseStreamItemId(info.FirstEntryId), dbLabel, k.key)
e.registerConstMetricGauge(ch, "stream_last_entry_id", parseStreamItemId(info.LastEntryId), dbLabel, k.key)

for _, g := range info.StreamGroupsInfo {
e.registerConstMetricGauge(ch, "stream_group_consumers", float64(g.Consumers), dbLabel, k.key, g.Name)
e.registerConstMetricGauge(ch, "stream_group_messages_pending", float64(g.Pending), dbLabel, k.key, g.Name)
e.registerConstMetricGauge(ch, "stream_group_last_delivered_id", parseStreamItemId(g.LastDeliveredId), dbLabel, k.key, g.Name)
e.registerConstMetricGauge(ch, "stream_group_entries_read", float64(g.EntriesRead), dbLabel, k.key, g.Name)
e.registerConstMetricGauge(ch, "stream_group_lag", float64(g.Lag), dbLabel, k.key, g.Name)
for _, c := range g.StreamGroupConsumersInfo {
e.registerConstMetricGauge(ch, "stream_group_consumer_messages_pending", float64(c.Pending), dbLabel, k.key, g.Name, c.Name)
e.registerConstMetricGauge(ch, "stream_group_consumer_idle_seconds", float64(c.Idle)/1e3, dbLabel, k.key, g.Name, c.Name)
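The new gauges reuse the existing parseStreamItemId helper to turn an ID such as "1638006862521-0" into a number. A hedged, self-contained sketch of the expected behaviour — assuming it keeps the millisecond part before the dash and falls back to 0 for empty or unparseable IDs; the repository's actual implementation may differ:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseStreamItemIdSketch is a hypothetical stand-in for parseStreamItemId.
func parseStreamItemIdSketch(id string) float64 {
	ms, err := strconv.ParseFloat(strings.Split(id, "-")[0], 64)
	if err != nil {
		return 0 // e.g. "" when max-deleted-entry-id is not reported
	}
	return ms
}

func main() {
	fmt.Println(parseStreamItemIdSketch("1638006862521-0")) // 1.638006862521e+12
	fmt.Println(parseStreamItemIdSketch(""))                // 0
}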
206 changes: 205 additions & 1 deletion exporter/streams_test.go
@@ -31,6 +31,7 @@ func TestGetStreamInfo(t *testing.T) {
if os.Getenv("TEST_REDIS_URI") == "" {
t.Skipf("TEST_REDIS_URI not set - skipping")
}

addr := os.Getenv("TEST_REDIS_URI")
c, err := redis.DialURL(addr)
if err != nil {
@@ -80,6 +81,82 @@ func TestGetStreamInfo(t *testing.T) {
if isNotTestTimestamp(info.LastGeneratedId) {
t.Errorf("Stream LastGeneratedId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.LastGeneratedId, TestStreamTimestamps)
}
if info.FirstEntryId != "" {
t.Errorf("Stream FirstEntryId mismatch.\nActual: %#v;\nExpected: %#v\n", info.FirstEntryId, "")
}
if info.LastEntryId != "" {
t.Errorf("Stream LastEntryId mismatch.\nActual: %#v;\nExpected: %#v\n", info.LastEntryId, "")
}
if info.MaxDeletedEntryId != "" {
t.Errorf("Stream MaxDeletedEntryId mismatch.\nActual: %#v;\nExpected: %#v\n", info.MaxDeletedEntryId, "")
}
})
}
}

func TestGetStreamInfoUsingRedis7(t *testing.T) {
if os.Getenv("TEST_REDIS7_URI") == "" {
t.Skipf("TEST_REDIS7_URI not set - skipping")
}

addr := os.Getenv("TEST_REDIS7_URI")
c, err := redis.DialURL(addr)
if err != nil {
t.Fatalf("Couldn't connect to %#v: %#v", addr, err)
}
defer c.Close()

setupDBKeys(t, addr)
defer deleteKeysFromDB(t, addr)

if _, err = c.Do("SELECT", dbNumStr); err != nil {
t.Errorf("Couldn't select database %#v", dbNumStr)
}

tsts := []scanStreamFixture{
{
name: "Stream test",
stream: TestStreamName,
streamInfo: streamInfo{
Length: 2,
RadixTreeKeys: 1,
RadixTreeNodes: 2,
Groups: 2,
},
},
}

for _, tst := range tsts {
t.Run(tst.name, func(t *testing.T) {
info, err := getStreamInfo(c, tst.stream)
if err != nil {
t.Fatalf("Error getting stream info for %#v: %s", tst.stream, err)
}

if info.Length != tst.streamInfo.Length {
t.Errorf("Stream length mismatch.\nActual: %#v;\nExpected: %#v\n", info.Length, tst.streamInfo.Length)
}
if info.RadixTreeKeys != tst.streamInfo.RadixTreeKeys {
t.Errorf("Stream RadixTreeKeys mismatch.\nActual: %#v;\nExpected: %#v\n", info.RadixTreeKeys, tst.streamInfo.RadixTreeKeys)
}
if info.RadixTreeNodes != tst.streamInfo.RadixTreeNodes {
t.Errorf("Stream RadixTreeNodes mismatch.\nActual: %#v;\nExpected: %#v\n", info.RadixTreeNodes, tst.streamInfo.RadixTreeNodes)
}
if info.Groups != tst.streamInfo.Groups {
t.Errorf("Stream Groups mismatch.\nActual: %#v;\nExpected: %#v\n", info.Groups, tst.streamInfo.Groups)
}
if isNotTestTimestamp(info.LastGeneratedId) {
t.Errorf("Stream LastGeneratedId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.LastGeneratedId, TestStreamTimestamps)
}
if info.FirstEntryId != TestStreamTimestamps[0] {
t.Errorf("Stream FirstEntryId mismatch.\nActual: %#v;\nExpected: %#v\n", info.FirstEntryId, TestStreamTimestamps[0])
}
if info.LastEntryId != TestStreamTimestamps[len(TestStreamTimestamps)-1] {
t.Errorf("Stream LastEntryId mismatch.\nActual: %#v;\nExpected: %#v\n", info.LastEntryId, TestStreamTimestamps[len(TestStreamTimestamps)-1])
}
if info.MaxDeletedEntryId != "0-0" {
t.Errorf("Stream MaxDeletedEntryId mismatch.\nActual: %#v;\nExpected: %#v\n", info.MaxDeletedEntryId, "0-0")
}
})
}
}
@@ -128,6 +205,118 @@ func TestScanStreamGroups(t *testing.T) {
Name: "test_group_1",
Consumers: 1,
Pending: 1,
EntriesRead: 0,
Lag: 0,
LastDeliveredId: "1638006862521-0",
StreamGroupConsumersInfo: []streamGroupConsumersInfo{
{
Name: "test_consumer_1",
Pending: 1,
},
},
},
}},
{
name: "Multiple groups test",
stream: "test_stream_2",
groups: []streamGroupsInfo{
{
Name: "test_group_1",
Consumers: 2,
Pending: 1,
Lag: 0,
EntriesRead: 0,
LastDeliveredId: "1638006862522-0",
},
{
Name: "test_group_2",
Consumers: 0,
Pending: 0,
Lag: 0,
},
}},
}
for _, tst := range tsts {
t.Run(tst.name, func(t *testing.T) {
scannedGroup, err := scanStreamGroups(c, tst.stream)
if err != nil {
t.Errorf("Err: %s", err)
}

if len(scannedGroup) == len(tst.groups) {
for i := range scannedGroup {
if scannedGroup[i].Name != tst.groups[i].Name {
t.Errorf("Group name mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Name, scannedGroup[i].Name)
}
if scannedGroup[i].Consumers != tst.groups[i].Consumers {
t.Errorf("Consumers count mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Consumers, scannedGroup[i].Consumers)
}
if scannedGroup[i].Pending != tst.groups[i].Pending {
t.Errorf("Pending items mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Pending, scannedGroup[i].Pending)
}
if parseStreamItemId(scannedGroup[i].LastDeliveredId) != parseStreamItemId(tst.groups[i].LastDeliveredId) {
t.Errorf("LastDeliveredId items mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].LastDeliveredId, scannedGroup[i].LastDeliveredId)
}
if scannedGroup[i].Lag != tst.groups[i].Lag {
t.Errorf("Lag mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Lag, scannedGroup[i].Lag)
}
if scannedGroup[i].EntriesRead != tst.groups[i].EntriesRead {
t.Errorf("EntriesRead mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].EntriesRead, scannedGroup[i].EntriesRead)
}
}
} else {
t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d\n", len(tst.consumers), len(scannedGroup))
}
})
}
}

func TestScanStreamGroupsUsingRedis7(t *testing.T) {
if os.Getenv("TEST_REDIS7_URI") == "" {
t.Skipf("TEST_REDIS7_URI not set - skipping")
}
addr := os.Getenv("TEST_REDIS7_URI")
db := dbNumStr

c, err := redis.DialURL(addr)
if err != nil {
t.Fatalf("Couldn't connect to %#v: %#v", addr, err)
}

if _, err = c.Do("SELECT", db); err != nil {
t.Errorf("Couldn't select database %#v", db)
}

fixtures := []keyFixture{
{"XADD", "test_stream_1", []interface{}{"1638006862521-0", "field_1", "str_1"}},
{"XADD", "test_stream_2", []interface{}{"1638006862522-0", "field_pattern_1", "str_pattern_1"}},
}
// Create the test streams and their consumer groups
c.Do("XGROUP", "CREATE", "test_stream_1", "test_group_1", "$", "MKSTREAM")
c.Do("XGROUP", "CREATE", "test_stream_2", "test_group_1", "$", "MKSTREAM")
c.Do("XGROUP", "CREATE", "test_stream_2", "test_group_2", "$")
// Add entries to the streams
createKeyFixtures(t, c, fixtures)
defer func() {
deleteKeyFixtures(t, c, fixtures)
c.Close()
}()
// Process messages to assign Consumers to their groups
c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "test_stream_1", ">")
c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "test_stream_2", ">")
c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_2", "COUNT", "1", "STREAMS", "test_stream_2", "0")

tsts := []scanStreamFixture{
{
name: "Single group test",
stream: "test_stream_1",
groups: []streamGroupsInfo{
{
Name: "test_group_1",
Consumers: 1,
Pending: 1,
EntriesRead: 1,
Lag: 0,
LastDeliveredId: "1638006862521-0",
StreamGroupConsumersInfo: []streamGroupConsumersInfo{
{
@@ -145,12 +334,15 @@ func TestScanStreamGroups(t *testing.T) {
Name: "test_group_1",
Consumers: 2,
Pending: 1,
Lag: 0,
EntriesRead: 1,
LastDeliveredId: "1638006862522-0",
},
{
Name: "test_group_2",
Consumers: 0,
Pending: 0,
Lag: 1,
},
}},
}
Expand All @@ -175,6 +367,12 @@ func TestScanStreamGroups(t *testing.T) {
if parseStreamItemId(scannedGroup[i].LastDeliveredId) != parseStreamItemId(tst.groups[i].LastDeliveredId) {
t.Errorf("LastDeliveredId items mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].LastDeliveredId, scannedGroup[i].LastDeliveredId)
}
if scannedGroup[i].Lag != tst.groups[i].Lag {
t.Errorf("Lag mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Lag, scannedGroup[i].Lag)
}
if scannedGroup[i].EntriesRead != tst.groups[i].EntriesRead {
t.Errorf("EntriesRead mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].EntriesRead, scannedGroup[i].EntriesRead)
}
}
} else {
t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d\n", len(tst.consumers), len(scannedGroup))
@@ -187,7 +385,7 @@ func TestScanStreamGroupsConsumers(t *testing.T) {
if os.Getenv("TEST_REDIS_URI") == "" {
t.Skipf("TEST_REDIS_URI not set - skipping")
}
addr := os.Getenv("TEST_REDIS_URI")
addr := os.Getenv("TEST_REDIS7_URI")
db := dbNumStr

c, err := redis.DialURL(addr)
@@ -302,9 +500,14 @@ func TestExtractStreamMetrics(t *testing.T) {
"stream_radix_tree_nodes": false,
"stream_last_generated_id": false,
"stream_groups": false,
"stream_max_deleted_entry_id": false,
"stream_first_entry_id": false,
"stream_last_entry_id": false,
"stream_group_consumers": false,
"stream_group_messages_pending": false,
"stream_group_last_delivered_id": false,
"stream_group_entries_read": false,
"stream_group_lag": false,
"stream_group_consumer_messages_pending": false,
"stream_group_consumer_idle_seconds": false,
}
@@ -313,6 +516,7 @@ func TestExtractStreamMetrics(t *testing.T) {
for k := range want {
log.Debugf("metric: %s", m.Desc().String())
log.Debugf("want: %s", k)

if strings.Contains(m.Desc().String(), k) {
want[k] = true
}
