Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver006 committed Aug 26, 2023
1 parent 3176357 commit 19c406c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 55 deletions.
5 changes: 0 additions & 5 deletions exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ var (
altDBNumStr = "12"
invalidDBNumStr = "16"
dbNumStrFull = fmt.Sprintf("db%s", dbNumStr)

TestStreamTimestamps = []string{
"1638006862416-0",
"1638006862417-2",
}
)

const (
Expand Down
2 changes: 1 addition & 1 deletion exporter/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func createKeyFixtures(t *testing.T, c redis.Conn, fixtures []keyFixture) {
for _, f := range fixtures {
args := append([]interface{}{f.key}, f.args...)
if _, err := c.Do(f.command, args...); err != nil {
t.Errorf("Error creating fixture: %#v, %#v", f, err)
t.Fatalf("Error creating fixture: %#v, %#v", f, err)
}
}
}
Expand Down
113 changes: 64 additions & 49 deletions exporter/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ type scanStreamFixture struct {
consumers []streamGroupConsumersInfo
}

var (
TestStreamTimestamps = []string{
"1638006862416-0",
"1638006862417-2",
}
)

func isNotTestTimestamp(returned string) bool {
for _, expected := range TestStreamTimestamps {
if parseStreamItemId(expected) == parseStreamItemId(returned) {
Expand All @@ -27,7 +34,7 @@ func isNotTestTimestamp(returned string) bool {
return true
}

func TestGetStreamInfo(t *testing.T) {
func TestStreamsGetStreamInfo(t *testing.T) {
if os.Getenv("TEST_REDIS_URI") == "" {
t.Skipf("TEST_REDIS_URI not set - skipping")
}
Expand Down Expand Up @@ -67,34 +74,34 @@ func TestGetStreamInfo(t *testing.T) {
}

if info.Length != tst.streamInfo.Length {
t.Errorf("Stream length mismatch.\nActual: %#v;\nExpected: %#v\n", info.Length, tst.streamInfo.Length)
t.Errorf("Stream length mismatch.\nActual: %#v;\nExpected: %#v", info.Length, tst.streamInfo.Length)
}
if info.RadixTreeKeys != tst.streamInfo.RadixTreeKeys {
t.Errorf("Stream RadixTreeKeys mismatch.\nActual: %#v;\nExpected: %#v\n", info.RadixTreeKeys, tst.streamInfo.RadixTreeKeys)
t.Errorf("Stream RadixTreeKeys mismatch.\nActual: %#v;\nExpected: %#v", info.RadixTreeKeys, tst.streamInfo.RadixTreeKeys)
}
if info.RadixTreeNodes != tst.streamInfo.RadixTreeNodes {
t.Errorf("Stream RadixTreeNodes mismatch.\nActual: %#v;\nExpected: %#v\n", info.RadixTreeNodes, tst.streamInfo.RadixTreeNodes)
t.Errorf("Stream RadixTreeNodes mismatch.\nActual: %#v;\nExpected: %#v", info.RadixTreeNodes, tst.streamInfo.RadixTreeNodes)
}
if info.Groups != tst.streamInfo.Groups {
t.Errorf("Stream Groups mismatch.\nActual: %#v;\nExpected: %#v\n", info.Groups, tst.streamInfo.Groups)
t.Errorf("Stream Groups mismatch.\nActual: %#v;\nExpected: %#v", info.Groups, tst.streamInfo.Groups)
}
if isNotTestTimestamp(info.LastGeneratedId) {
t.Errorf("Stream LastGeneratedId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.LastGeneratedId, TestStreamTimestamps)
t.Errorf("Stream LastGeneratedId mismatch.\nActual: %#v;\nExpected any of: %#v", info.LastGeneratedId, TestStreamTimestamps)
}
if info.FirstEntryId != "" {
t.Errorf("Stream FirstEntryId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.FirstEntryId, "")
t.Errorf("Stream FirstEntryId mismatch.\nActual: %#v; - Expected empty", info.FirstEntryId)
}
if info.LastEntryId != "" {
t.Errorf("Stream LastEntryId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.LastEntryId, "")
t.Errorf("Stream LastEntryId mismatch.\nActual: %#v;\nExpected empty", info.LastEntryId)
}
if info.MaxDeletedEntryId != "" {
t.Errorf("Stream MaxDeletedEntryId mismatch.\nActual: %#v;\nExpected: %#v\n", info.MaxDeletedEntryId, "")
t.Errorf("Stream MaxDeletedEntryId mismatch.\nActual: %#v;\nExpected: %#v", info.MaxDeletedEntryId, "0-0")
}
})
}
}

func TestGetStreamInfoUsingRedis7(t *testing.T) {
func TestStreamsGetStreamInfoUsingRedis7(t *testing.T) {
if os.Getenv("TEST_REDIS7_URI") == "" {
t.Skipf("TEST_REDIS7_URI not set - skipping")
}
Expand Down Expand Up @@ -134,47 +141,46 @@ func TestGetStreamInfoUsingRedis7(t *testing.T) {
}

if info.Length != tst.streamInfo.Length {
t.Errorf("Stream length mismatch.\nActual: %#v;\nExpected: %#v\n", info.Length, tst.streamInfo.Length)
t.Errorf("Stream length mismatch.\nActual: %#v;\nExpected: %#v", info.Length, tst.streamInfo.Length)
}
if info.RadixTreeKeys != tst.streamInfo.RadixTreeKeys {
t.Errorf("Stream RadixTreeKeys mismatch.\nActual: %#v;\nExpected: %#v\n", info.RadixTreeKeys, tst.streamInfo.RadixTreeKeys)
t.Errorf("Stream RadixTreeKeys mismatch.\nActual: %#v;\nExpected: %#v", info.RadixTreeKeys, tst.streamInfo.RadixTreeKeys)
}
if info.RadixTreeNodes != tst.streamInfo.RadixTreeNodes {
t.Errorf("Stream RadixTreeNodes mismatch.\nActual: %#v;\nExpected: %#v\n", info.RadixTreeNodes, tst.streamInfo.RadixTreeNodes)
t.Errorf("Stream RadixTreeNodes mismatch.\nActual: %#v;\nExpected: %#v", info.RadixTreeNodes, tst.streamInfo.RadixTreeNodes)
}
if info.Groups != tst.streamInfo.Groups {
t.Errorf("Stream Groups mismatch.\nActual: %#v;\nExpected: %#v\n", info.Groups, tst.streamInfo.Groups)
t.Errorf("Stream Groups mismatch.\nActual: %#v;\nExpected: %#v", info.Groups, tst.streamInfo.Groups)
}
if isNotTestTimestamp(info.LastGeneratedId) {
t.Errorf("Stream LastGeneratedId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.LastGeneratedId, TestStreamTimestamps)
t.Errorf("Stream LastGeneratedId mismatch.\nActual: %#v;\nExpected any of: %#v", info.LastGeneratedId, TestStreamTimestamps)
}
if info.FirstEntryId != TestStreamTimestamps[0] {
t.Errorf("Stream FirstEntryId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.FirstEntryId, TestStreamTimestamps)
t.Errorf("Stream FirstEntryId mismatch.\nActual: %#v;\nExpected any of: %#v", info.FirstEntryId, TestStreamTimestamps)
}
if info.LastEntryId != TestStreamTimestamps[len(TestStreamTimestamps)-1] {
t.Errorf("Stream LastEntryId mismatch.\nActual: %#v;\nExpected any of: %#v\n", info.LastEntryId, TestStreamTimestamps)
t.Errorf("Stream LastEntryId mismatch.\nActual: %#v;\nExpected any of: %#v", info.LastEntryId, TestStreamTimestamps)
}
if info.MaxDeletedEntryId != "0-0" {
t.Errorf("Stream MaxDeletedEntryId mismatch.\nActual: %#v;\nExpected: %#v\n", info.MaxDeletedEntryId, "0-0")
t.Errorf("Stream MaxDeletedEntryId mismatch.\nActual: %#v;\nExpected: %#v", info.MaxDeletedEntryId, "0-0")
}
})
}
}

func TestScanStreamGroups(t *testing.T) {
func TestStreamsScanStreamGroups123(t *testing.T) {
if os.Getenv("TEST_REDIS_URI") == "" {
t.Skipf("TEST_REDIS_URI not set - skipping")
}
addr := os.Getenv("TEST_REDIS_URI")
db := dbNumStr

c, err := redis.DialURL(addr)
if err != nil {
t.Fatalf("Couldn't connect to %#v: %#v", addr, err)
}

if _, err = c.Do("SELECT", db); err != nil {
t.Errorf("Couldn't select database %#v", db)
if _, err = c.Do("SELECT", dbNumStr); err != nil {
t.Errorf("Couldn't select database %#v", dbNumStr)
}

fixtures := []keyFixture{
Expand All @@ -185,12 +191,14 @@ func TestScanStreamGroups(t *testing.T) {
c.Do("XGROUP", "CREATE", "test_stream_1", "test_group_1", "$", "MKSTREAM")
c.Do("XGROUP", "CREATE", "test_stream_2", "test_group_1", "$", "MKSTREAM")
c.Do("XGROUP", "CREATE", "test_stream_2", "test_group_2", "$")

// Add simple values
createKeyFixtures(t, c, fixtures)
defer func() {
deleteKeyFixtures(t, c, fixtures)
c.Close()
}()
createKeyFixtures(t, c, fixtures)

// Process messages to assign Consumers to their groups
c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "test_stream_1", ">")
c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "test_stream_2", ">")
Expand All @@ -205,7 +213,7 @@ func TestScanStreamGroups(t *testing.T) {
Name: "test_group_1",
Consumers: 1,
Pending: 1,
EntriesRead: 0,
EntriesRead: 1,
Lag: 0,
LastDeliveredId: "1638006862521-0",
StreamGroupConsumersInfo: []streamGroupConsumersInfo{
Expand All @@ -225,53 +233,54 @@ func TestScanStreamGroups(t *testing.T) {
Consumers: 2,
Pending: 1,
Lag: 0,
EntriesRead: 0,
EntriesRead: 1,
LastDeliveredId: "1638006862522-0",
},
{
Name: "test_group_2",
Consumers: 0,
Pending: 0,
Lag: 0,
Lag: 1,
},
}},
}
for _, tst := range tsts {
t.Run(tst.name, func(t *testing.T) {
scannedGroup, _ := scanStreamGroups(c, tst.stream)
if err != nil {
t.Errorf("Err: %s", err)
t.Fatalf("Err: %s", err)
return
}

if len(scannedGroup) == len(tst.groups) {
for i := range scannedGroup {
if scannedGroup[i].Name != tst.groups[i].Name {
t.Errorf("Group name mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Name, scannedGroup[i].Name)
t.Errorf("%d) Group name mismatch.\nExpected: %#v;\nActual: %#v", i, tst.groups[i].Name, scannedGroup[i].Name)
}
if scannedGroup[i].Consumers != tst.groups[i].Consumers {
t.Errorf("Consumers count mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Consumers, scannedGroup[i].Consumers)
t.Errorf("%d) Consumers count mismatch.\nExpected: %#v;\nActual: %#v", i, tst.groups[i].Consumers, scannedGroup[i].Consumers)
}
if scannedGroup[i].Pending != tst.groups[i].Pending {
t.Errorf("Pending items mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Pending, scannedGroup[i].Pending)
t.Errorf("%d) Pending items mismatch.\nExpected: %#v;\nActual: %#v", i, tst.groups[i].Pending, scannedGroup[i].Pending)
}
if parseStreamItemId(scannedGroup[i].LastDeliveredId) != parseStreamItemId(tst.groups[i].LastDeliveredId) {
t.Errorf("LastDeliveredId items mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].LastDeliveredId, scannedGroup[i].LastDeliveredId)
t.Errorf("%d) LastDeliveredId items mismatch.\nExpected: %#v;\nActual: %#v", i, tst.groups[i].LastDeliveredId, scannedGroup[i].LastDeliveredId)
}
if scannedGroup[i].Lag != tst.groups[i].Lag {
t.Errorf("Lag mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Lag, scannedGroup[i].Lag)
t.Errorf("%d) Lag mismatch.\nExpected: %#v;\nActual: %#v", i, tst.groups[i].Lag, scannedGroup[i].Lag)
}
if scannedGroup[i].EntriesRead != tst.groups[i].EntriesRead {
t.Errorf("EntriesRead mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].EntriesRead, scannedGroup[i].EntriesRead)
t.Errorf("%d) EntriesRead mismatch.\nExpected: %#v;\nActual: %#v", i, tst.groups[i].EntriesRead, scannedGroup[i].EntriesRead)
}
}
} else {
t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d\n", len(tst.consumers), len(scannedGroup))
t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d", len(tst.consumers), len(scannedGroup))
}
})
}
}

func TestScanStreamGroupsUsingRedis7(t *testing.T) {
func TestStreamsScanStreamGroupsUsingRedis7(t *testing.T) {
if os.Getenv("TEST_REDIS7_URI") == "" {
t.Skipf("TEST_REDIS7_URI not set - skipping")
}
Expand All @@ -291,16 +300,19 @@ func TestScanStreamGroupsUsingRedis7(t *testing.T) {
{"XADD", "test_stream_1", []interface{}{"1638006862521-0", "field_1", "str_1"}},
{"XADD", "test_stream_2", []interface{}{"1638006862522-0", "field_pattern_1", "str_pattern_1"}},
}

// Create test streams
c.Do("XGROUP", "CREATE", "test_stream_1", "test_group_1", "$", "MKSTREAM")
c.Do("XGROUP", "CREATE", "test_stream_2", "test_group_1", "$", "MKSTREAM")
c.Do("XGROUP", "CREATE", "test_stream_2", "test_group_2", "$")

// Add simple values
createKeyFixtures(t, c, fixtures)
defer func() {
deleteKeyFixtures(t, c, fixtures)
c.Close()
}()
createKeyFixtures(t, c, fixtures)

// Process messages to assign Consumers to their groups
c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "test_stream_1", ">")
c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "test_stream_2", ">")
Expand Down Expand Up @@ -356,32 +368,32 @@ func TestScanStreamGroupsUsingRedis7(t *testing.T) {
if len(scannedGroup) == len(tst.groups) {
for i := range scannedGroup {
if scannedGroup[i].Name != tst.groups[i].Name {
t.Errorf("Group name mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Name, scannedGroup[i].Name)
t.Errorf("Group name mismatch.\nExpected: %#v;\nActual: %#v", tst.groups[i].Name, scannedGroup[i].Name)
}
if scannedGroup[i].Consumers != tst.groups[i].Consumers {
t.Errorf("Consumers count mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Consumers, scannedGroup[i].Consumers)
t.Errorf("Consumers count mismatch.\nExpected: %#v;\nActual: %#v", tst.groups[i].Consumers, scannedGroup[i].Consumers)
}
if scannedGroup[i].Pending != tst.groups[i].Pending {
t.Errorf("Pending items mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Pending, scannedGroup[i].Pending)
t.Errorf("Pending items mismatch.\nExpected: %#v;\nActual: %#v", tst.groups[i].Pending, scannedGroup[i].Pending)
}
if parseStreamItemId(scannedGroup[i].LastDeliveredId) != parseStreamItemId(tst.groups[i].LastDeliveredId) {
t.Errorf("LastDeliveredId items mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].LastDeliveredId, scannedGroup[i].LastDeliveredId)
t.Errorf("LastDeliveredId items mismatch.\nExpected: %#v;\nActual: %#v", tst.groups[i].LastDeliveredId, scannedGroup[i].LastDeliveredId)
}
if scannedGroup[i].Lag != tst.groups[i].Lag {
t.Errorf("Lag mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].Lag, scannedGroup[i].Lag)
t.Errorf("Lag mismatch.\nExpected: %#v;\nActual: %#v", tst.groups[i].Lag, scannedGroup[i].Lag)
}
if scannedGroup[i].EntriesRead != tst.groups[i].EntriesRead {
t.Errorf("EntriesRead mismatch.\nExpected: %#v;\nActual: %#v\n", tst.groups[i].EntriesRead, scannedGroup[i].EntriesRead)
t.Errorf("EntriesRead mismatch.\nExpected: %#v;\nActual: %#v", tst.groups[i].EntriesRead, scannedGroup[i].EntriesRead)
}
}
} else {
t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d\n", len(tst.consumers), len(scannedGroup))
t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d", len(tst.consumers), len(scannedGroup))
}
})
}
}

func TestScanStreamGroupsConsumers(t *testing.T) {
func TestStreamsScanStreamGroupsConsumers(t *testing.T) {
if os.Getenv("TEST_REDIS_URI") == "" {
t.Skipf("TEST_REDIS_URI not set - skipping")
}
Expand All @@ -401,15 +413,18 @@ func TestScanStreamGroupsConsumers(t *testing.T) {
{"XADD", "single_consumer_stream", []interface{}{"*", "field_1", "str_1"}},
{"XADD", "multiple_consumer_stream", []interface{}{"*", "field_pattern_1", "str_pattern_1"}},
}

// Create test streams
c.Do("XGROUP", "CREATE", "single_consumer_stream", "test_group_1", "$", "MKSTREAM")
c.Do("XGROUP", "CREATE", "multiple_consumer_stream", "test_group_1", "$", "MKSTREAM")

// Add simple test items to streams
createKeyFixtures(t, c, fixtures)
defer func() {
deleteKeyFixtures(t, c, fixtures)
c.Close()
}()
createKeyFixtures(t, c, fixtures)

// Process messages to assign Consumers to their groups
c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "single_consumer_stream", ">")
c.Do("XREADGROUP", "GROUP", "test_group_1", "test_consumer_1", "COUNT", "1", "STREAMS", "multiple_consumer_stream", ">")
Expand Down Expand Up @@ -456,23 +471,23 @@ func TestScanStreamGroupsConsumers(t *testing.T) {
if len(g.StreamGroupConsumersInfo) == len(tst.consumers) {
for i := range g.StreamGroupConsumersInfo {
if g.StreamGroupConsumersInfo[i].Name != tst.consumers[i].Name {
t.Errorf("Consumer name mismatch.\nExpected: %#v;\nActual: %#v\n", tst.consumers[i].Name, g.StreamGroupConsumersInfo[i].Name)
t.Errorf("Consumer name mismatch.\nExpected: %#v;\nActual: %#v", tst.consumers[i].Name, g.StreamGroupConsumersInfo[i].Name)
}
if g.StreamGroupConsumersInfo[i].Pending != tst.consumers[i].Pending {
t.Errorf("Pending items mismatch for %s.\nExpected: %#v;\nActual: %#v\n", g.StreamGroupConsumersInfo[i].Name, tst.consumers[i].Pending, g.StreamGroupConsumersInfo[i].Pending)
t.Errorf("Pending items mismatch for %s.\nExpected: %#v;\nActual: %#v", g.StreamGroupConsumersInfo[i].Name, tst.consumers[i].Pending, g.StreamGroupConsumersInfo[i].Pending)
}

}
} else {
t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d\n", len(tst.consumers), len(g.StreamGroupConsumersInfo))
t.Errorf("Consumers entries mismatch.\nExpected: %d;\nActual: %d", len(tst.consumers), len(g.StreamGroupConsumersInfo))
}
}

})
}
}

func TestExtractStreamMetrics(t *testing.T) {
func TestStreamsExtractStreamMetrics(t *testing.T) {
if os.Getenv("TEST_REDIS_URI") == "" {
t.Skipf("TEST_REDIS_URI not set - skipping")
}
Expand Down

0 comments on commit 19c406c

Please sign in to comment.