diff --git a/exporter/exporter_test.go b/exporter/exporter_test.go index e34452e2..f511b2bd 100644 --- a/exporter/exporter_test.go +++ b/exporter/exporter_test.go @@ -40,11 +40,6 @@ var ( altDBNumStr = "12" invalidDBNumStr = "16" dbNumStrFull = fmt.Sprintf("db%s", dbNumStr) - - TestStreamTimestamps = []string{ - "1638006862416-0", - "1638006862417-2", - } ) const ( diff --git a/exporter/keys_test.go b/exporter/keys_test.go index c3e3c511..3ed8d35d 100644 --- a/exporter/keys_test.go +++ b/exporter/keys_test.go @@ -283,7 +283,7 @@ func createKeyFixtures(t *testing.T, c redis.Conn, fixtures []keyFixture) { for _, f := range fixtures { args := append([]interface{}{f.key}, f.args...) if _, err := c.Do(f.command, args...); err != nil { - t.Errorf("Error creating fixture: %#v, %#v", f, err) + t.Fatalf("Error creating fixture: %#v, %#v", f, err) } } } diff --git a/exporter/streams_test.go b/exporter/streams_test.go index e533f497..28333b8c 100644 --- a/exporter/streams_test.go +++ b/exporter/streams_test.go @@ -18,6 +18,13 @@ type scanStreamFixture struct { consumers []streamGroupConsumersInfo } +var ( + TestStreamTimestamps = []string{ + "1638006862416-0", + "1638006862417-2", + } +) + func isNotTestTimestamp(returned string) bool { for _, expected := range TestStreamTimestamps { if parseStreamItemId(expected) == parseStreamItemId(returned) { @@ -27,7 +34,7 @@ func isNotTestTimestamp(returned string) bool { return true } -func TestGetStreamInfo(t *testing.T) { +func TestStreamsGetStreamInfo(t *testing.T) { if os.Getenv("TEST_REDIS_URI") == "" { t.Skipf("TEST_REDIS_URI not set - skipping") } @@ -67,34 +74,34 @@ func TestGetStreamInfo(t *testing.T) { } if info.Length != tst.streamInfo.Length { - t.Errorf("Stream length mismatch.\nActual: %#v;\nExpected: %#v\n", info.Length, tst.streamInfo.Length) + t.Errorf("Stream length mismatch.\nActual: %#v;\nExpected: %#v", info.Length, tst.streamInfo.Length) } if info.RadixTreeKeys != tst.streamInfo.RadixTreeKeys { - t.Errorf("Stream RadixTreeKeys mismatch.\nActual: %#v;\nExpected: %#v\n", info.RadixTreeKeys, tst.streamInfo.RadixTreeKeys) + t.Errorf("Stream RadixTreeKeys mismatch.\nActual: %#v;\nExpected: %#v", info.RadixTreeKeys, tst.streamInfo.RadixTreeKeys) } if info.RadixTreeNodes != tst.streamInfo.RadixTreeNodes { - t.Errorf("Stream RadixTreeNodes mismatch.\nActual: %#v;\nExpected: %#v\n", info.RadixTreeNodes, tst.streamInfo.RadixTreeNodes) + t.Errorf("Stream RadixTreeNodes mismatch.\nActual: %#v;\nExpected: %#v", info.RadixTreeNodes, tst.streamInfo.RadixTreeNodes) } if info.Groups != tst.streamInfo.Groups { - t.Errorf("Stream Groups mismatch.\nActual: %#v;\nExpected: %#v\n", info.Groups, tst.streamInfo.Groups) + t.Errorf("Stream Groups mismatch.\nActual: %#v;\nExpected: %#v", info.Groups, tst.streamInfo.Groups) } if isNotTestTimestamp(info.LastGeneratedId) { - t.Errorf("Stream LastGeneratedId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.LastGeneratedId, TestStreamTimestamps) + t.Errorf("Stream LastGeneratedId mismatch.\nActual: %#v;\nExpected any of: %#v", info.LastGeneratedId, TestStreamTimestamps) } if info.FirstEntryId != "" { - t.Errorf("Stream FirstEntryId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.FirstEntryId, "") + t.Errorf("Stream FirstEntryId mismatch.\nActual: %#v; - Expected empty", info.FirstEntryId) } if info.LastEntryId != "" { - t.Errorf("Stream LastEntryId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.LastEntryId, "") + t.Errorf("Stream LastEntryId mismatch.\nActual: %#v;\nExpected empty", info.LastEntryId) } if info.MaxDeletedEntryId != "" { - t.Errorf("Stream MaxDeletedEntryId mismatch.\nActual: %#v;\nExpected: %#v\n", info.MaxDeletedEntryId, "") + t.Errorf("Stream MaxDeletedEntryId mismatch.\nActual: %#v;\nExpected: %#v", info.MaxDeletedEntryId, "0-0") } }) } } -func TestGetStreamInfoUsingRedis7(t *testing.T) { +func TestStreamsGetStreamInfoUsingRedis7(t *testing.T) { if os.Getenv("TEST_REDIS7_URI") == "" { t.Skipf("TEST_REDIS7_URI not set - skipping") } @@ -134,47 +141,46 @@ func TestGetStreamInfoUsingRedis7(t *testing.T) { } if info.Length != tst.streamInfo.Length { - t.Errorf("Stream length mismatch.\nActual: %#v;\nExpected: %#v\n", info.Length, tst.streamInfo.Length) + t.Errorf("Stream length mismatch.\nActual: %#v;\nExpected: %#v", info.Length, tst.streamInfo.Length) } if info.RadixTreeKeys != tst.streamInfo.RadixTreeKeys { - t.Errorf("Stream RadixTreeKeys mismatch.\nActual: %#v;\nExpected: %#v\n", info.RadixTreeKeys, tst.streamInfo.RadixTreeKeys) + t.Errorf("Stream RadixTreeKeys mismatch.\nActual: %#v;\nExpected: %#v", info.RadixTreeKeys, tst.streamInfo.RadixTreeKeys) } if info.RadixTreeNodes != tst.streamInfo.RadixTreeNodes { - t.Errorf("Stream RadixTreeNodes mismatch.\nActual: %#v;\nExpected: %#v\n", info.RadixTreeNodes, tst.streamInfo.RadixTreeNodes) + t.Errorf("Stream RadixTreeNodes mismatch.\nActual: %#v;\nExpected: %#v", info.RadixTreeNodes, tst.streamInfo.RadixTreeNodes) } if info.Groups != tst.streamInfo.Groups { - t.Errorf("Stream Groups mismatch.\nActual: %#v;\nExpected: %#v\n", info.Groups, tst.streamInfo.Groups) + t.Errorf("Stream Groups mismatch.\nActual: %#v;\nExpected: %#v", info.Groups, tst.streamInfo.Groups) } if isNotTestTimestamp(info.LastGeneratedId) { - t.Errorf("Stream LastGeneratedId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.LastGeneratedId, TestStreamTimestamps) + t.Errorf("Stream LastGeneratedId mismatch.\nActual: %#v;\nExpected any of: %#v", info.LastGeneratedId, TestStreamTimestamps) } if info.FirstEntryId != TestStreamTimestamps[0] { - t.Errorf("Stream FirstEntryId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.FirstEntryId, TestStreamTimestamps) + t.Errorf("Stream FirstEntryId mismatch.\nActual: %#v;\nExpected any of: %#v", info.FirstEntryId, TestStreamTimestamps) } if info.LastEntryId != TestStreamTimestamps[len(TestStreamTimestamps)-1] { - t.Errorf("Stream LastEntryId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.LastEntryId, TestStreamTimestamps) + t.Errorf("Stream LastEntryId mismatch.\nActual: %#v;\nExpected any of: %#v", info.LastEntryId, TestStreamTimestamps) } if info.MaxDeletedEntryId != "0-0" { - t.Errorf("Stream MaxDeletedEntryId mismatch.\nActual: %#v;\nExpected: %#v\n", info.MaxDeletedEntryId, "0-0") + t.Errorf("Stream MaxDeletedEntryId mismatch.\nActual: %#v;\nExpected: %#v", info.MaxDeletedEntryId, "0-0") } }) } } -func TestScanStreamGroups(t *testing.T) { +func TestStreamsScanStreamGroups123(t *testing.T) { if os.Getenv("TEST_REDIS_URI") == "" { t.Skipf("TEST_REDIS_URI not set - skipping") } addr := os.Getenv("TEST_REDIS_URI") - db := dbNumStr c, err := redis.DialURL(addr) if err != nil { t.Fatalf("Couldn't connect to %#v: %#v", addr, err) } - if _, err = c.Do("SELECT", db); err != nil { - t.Errorf("Couldn't select database %#v", db) + if _, err = c.Do("SELECT", dbNumStr); err != nil { + t.Errorf("Couldn't select database %#v", dbNumStr) } fixtures := []keyFixture{ @@ -185,12 +191,14 @@ func TestScanStreamGroups(t *testing.T) { c.Do("XGROUP", "CREATE", "test_stream_1", "test_group_1", "$", "MKSTREAM") c.Do("XGROUP", "CREATE", "test_stream_2", "test_group_1", "$", "MKSTREAM") c.Do("XGROUP", "CREATE", "test_stream_2", "test_group_2", "$") + // Add simple values - createKeyFixtures(t, c, fixtures) defer func() { deleteKeyFixtures(t, c, fixtures) c.Close() }() + createKeyFixtures(t, c, fixtures) + // Process messages to assign Consumers to their groups c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "test_stream_1", ">") c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "test_stream_2", ">") @@ -205,7 +213,7 @@ func TestScanStreamGroups(t *testing.T) { Name: "test_group_1", Consumers: 1, Pending: 1, - EntriesRead: 0, + EntriesRead: 1, Lag: 0, LastDeliveredId: "1638006862521-0", StreamGroupConsumersInfo: []streamGroupConsumersInfo{ @@ -225,14 +233,14 @@ func TestScanStreamGroups(t *testing.T) { Consumers: 2, Pending: 1, Lag: 0, - EntriesRead: 0, + EntriesRead: 1, LastDeliveredId: "1638006862522-0", }, { Name: "test_group_2", Consumers: 0, Pending: 0, - Lag: 0, + Lag: 1, }, }}, } @@ -240,38 +248,39 @@ func TestScanStreamGroups(t *testing.T) { t.Run(tst.name, func(t *testing.T) { scannedGroup, _ := scanStreamGroups(c, tst.stream) if err != nil { - t.Errorf("Err: %s", err) + t.Fatalf("Err: %s", err) + return } if len(scannedGroup) == len(tst.groups) { for i := range scannedGroup { if scannedGroup[i].Name != tst.groups[i].Name { - t.Errorf("Group name mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Name, scannedGroup[i].Name) + t.Errorf("%d) Group name mismatch.\nExpected: %#v;\nActual: %#v", i, tst.groups[i].Name, scannedGroup[i].Name) } if scannedGroup[i].Consumers != tst.groups[i].Consumers { - t.Errorf("Consumers count mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Consumers, scannedGroup[i].Consumers) + t.Errorf("%d) Consumers count mismatch.\nExpected: %#v;\nActual: %#v", i, tst.groups[i].Consumers, scannedGroup[i].Consumers) } if scannedGroup[i].Pending != tst.groups[i].Pending { - t.Errorf("Pending items mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Pending, scannedGroup[i].Pending) + t.Errorf("%d) Pending items mismatch.\nExpected: %#v;\nActual: %#v", i, tst.groups[i].Pending, scannedGroup[i].Pending) } if parseStreamItemId(scannedGroup[i].LastDeliveredId) != parseStreamItemId(tst.groups[i].LastDeliveredId) { - t.Errorf("LastDeliveredId items mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].LastDeliveredId, scannedGroup[i].LastDeliveredId) + t.Errorf("%d) LastDeliveredId items mismatch.\nExpected: %#v;\nActual: %#v", i, tst.groups[i].LastDeliveredId, scannedGroup[i].LastDeliveredId) } if scannedGroup[i].Lag != tst.groups[i].Lag { - t.Errorf("Lag mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Lag, scannedGroup[i].Lag) + t.Errorf("%d) Lag mismatch.\nExpected: %#v;\nActual: %#v", i, tst.groups[i].Lag, scannedGroup[i].Lag) } if scannedGroup[i].EntriesRead != tst.groups[i].EntriesRead { - t.Errorf("EntriesRead mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].EntriesRead, scannedGroup[i].EntriesRead) + t.Errorf("%d) EntriesRead mismatch.\nExpected: %#v;\nActual: %#v", i, tst.groups[i].EntriesRead, scannedGroup[i].EntriesRead) } } } else { - t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d\n", len(tst.consumers), len(scannedGroup)) + t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d", len(tst.consumers), len(scannedGroup)) } }) } } -func TestScanStreamGroupsUsingRedis7(t *testing.T) { +func TestStreamsScanStreamGroupsUsingRedis7(t *testing.T) { if os.Getenv("TEST_REDIS7_URI") == "" { t.Skipf("TEST_REDIS7_URI not set - skipping") } @@ -291,16 +300,19 @@ func TestScanStreamGroupsUsingRedis7(t *testing.T) { {"XADD", "test_stream_1", []interface{}{"1638006862521-0", "field_1", "str_1"}}, {"XADD", "test_stream_2", []interface{}{"1638006862522-0", "field_pattern_1", "str_pattern_1"}}, } + // Create test streams c.Do("XGROUP", "CREATE", "test_stream_1", "test_group_1", "$", "MKSTREAM") c.Do("XGROUP", "CREATE", "test_stream_2", "test_group_1", "$", "MKSTREAM") c.Do("XGROUP", "CREATE", "test_stream_2", "test_group_2", "$") + // Add simple values - createKeyFixtures(t, c, fixtures) defer func() { deleteKeyFixtures(t, c, fixtures) c.Close() }() + createKeyFixtures(t, c, fixtures) + // Process messages to assign Consumers to their groups c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "test_stream_1", ">") c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "test_stream_2", ">") @@ -356,32 +368,32 @@ func TestScanStreamGroupsUsingRedis7(t *testing.T) { if len(scannedGroup) == len(tst.groups) { for i := range scannedGroup { if scannedGroup[i].Name != tst.groups[i].Name { - t.Errorf("Group name mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Name, scannedGroup[i].Name) + t.Errorf("Group name mismatch.\nExpected: %#v;\nActual: %#v", tst.groups[i].Name, scannedGroup[i].Name) } if scannedGroup[i].Consumers != tst.groups[i].Consumers { - t.Errorf("Consumers count mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Consumers, scannedGroup[i].Consumers) + t.Errorf("Consumers count mismatch.\nExpected: %#v;\nActual: %#v", tst.groups[i].Consumers, scannedGroup[i].Consumers) } if scannedGroup[i].Pending != tst.groups[i].Pending { - t.Errorf("Pending items mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Pending, scannedGroup[i].Pending) + t.Errorf("Pending items mismatch.\nExpected: %#v;\nActual: %#v", tst.groups[i].Pending, scannedGroup[i].Pending) } if parseStreamItemId(scannedGroup[i].LastDeliveredId) != parseStreamItemId(tst.groups[i].LastDeliveredId) { - t.Errorf("LastDeliveredId items mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].LastDeliveredId, scannedGroup[i].LastDeliveredId) + t.Errorf("LastDeliveredId items mismatch.\nExpected: %#v;\nActual: %#v", tst.groups[i].LastDeliveredId, scannedGroup[i].LastDeliveredId) } if scannedGroup[i].Lag != tst.groups[i].Lag { - t.Errorf("Lag mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Lag, scannedGroup[i].Lag) + t.Errorf("Lag mismatch.\nExpected: %#v;\nActual: %#v", tst.groups[i].Lag, scannedGroup[i].Lag) } if scannedGroup[i].EntriesRead != tst.groups[i].EntriesRead { - t.Errorf("EntriesRead mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].EntriesRead, scannedGroup[i].EntriesRead) + t.Errorf("EntriesRead mismatch.\nExpected: %#v;\nActual: %#v", tst.groups[i].EntriesRead, scannedGroup[i].EntriesRead) } } } else { - t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d\n", len(tst.consumers), len(scannedGroup)) + t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d", len(tst.consumers), len(scannedGroup)) } }) } } -func TestScanStreamGroupsConsumers(t *testing.T) { +func TestStreamsScanStreamGroupsConsumers(t *testing.T) { if os.Getenv("TEST_REDIS_URI") == "" { t.Skipf("TEST_REDIS_URI not set - skipping") } @@ -401,15 +413,18 @@ func TestScanStreamGroupsConsumers(t *testing.T) { {"XADD", "single_consumer_stream", []interface{}{"*", "field_1", "str_1"}}, {"XADD", "multiple_consumer_stream", []interface{}{"*", "field_pattern_1", "str_pattern_1"}}, } + // Create test streams c.Do("XGROUP", "CREATE", "single_consumer_stream", "test_group_1", "$", "MKSTREAM") c.Do("XGROUP", "CREATE", "multiple_consumer_stream", "test_group_1", "$", "MKSTREAM") + // Add simple test items to streams - createKeyFixtures(t, c, fixtures) defer func() { deleteKeyFixtures(t, c, fixtures) c.Close() }() + createKeyFixtures(t, c, fixtures) + // Process messages to assign Consumers to their groups c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "single_consumer_stream", ">") c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "multiple_consumer_stream", ">") @@ -456,15 +471,15 @@ func TestScanStreamGroupsConsumers(t *testing.T) { if len(g.StreamGroupConsumersInfo) == len(tst.consumers) { for i := range g.StreamGroupConsumersInfo { if g.StreamGroupConsumersInfo[i].Name != tst.consumers[i].Name { - t.Errorf("Consumer name mismatch.\nExpected: %#v;\nActual: %#v\n", tst.consumers[i].Name, g.StreamGroupConsumersInfo[i].Name) + t.Errorf("Consumer name mismatch.\nExpected: %#v;\nActual: %#v", tst.consumers[i].Name, g.StreamGroupConsumersInfo[i].Name) } if g.StreamGroupConsumersInfo[i].Pending != tst.consumers[i].Pending { - t.Errorf("Pending items mismatch for %s.\nExpected: %#v;\nActual: %#v\n", g.StreamGroupConsumersInfo[i].Name, tst.consumers[i].Pending, g.StreamGroupConsumersInfo[i].Pending) + t.Errorf("Pending items mismatch for %s.\nExpected: %#v;\nActual: %#v", g.StreamGroupConsumersInfo[i].Name, tst.consumers[i].Pending, g.StreamGroupConsumersInfo[i].Pending) } } } else { - t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d\n", len(tst.consumers), len(g.StreamGroupConsumersInfo)) + t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d", len(tst.consumers), len(g.StreamGroupConsumersInfo)) } } @@ -472,7 +487,7 @@ func TestScanStreamGroupsConsumers(t *testing.T) { } } -func TestExtractStreamMetrics(t *testing.T) { +func TestStreamsExtractStreamMetrics(t *testing.T) { if os.Getenv("TEST_REDIS_URI") == "" { t.Skipf("TEST_REDIS_URI not set - skipping") }