Skip to content

Commit

Permalink
MQTT leak failing test
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Jun 20, 2024
1 parent 3d9cbb9 commit c5a449b
Showing 1 changed file with 57 additions and 7 deletions.
64 changes: 57 additions & 7 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"net"
"os"
"reflect"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -3045,14 +3048,30 @@ func TestMQTTCluster(t *testing.T) {
}
}

func testMQTTConnectDisconnect(t *testing.T, o *Options, clientID string, clean bool, found bool) {
func testMQTTConnectSubDisconnect(t *testing.T, o *Options, clientID string, clean bool, found bool, sub bool, qos byte) {
t.Helper()
mc, r := testMQTTConnect(t, &mqttConnInfo{clientID: clientID, cleanSess: clean}, o.MQTT.Host, o.MQTT.Port)
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, found)
if sub {
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: qos}}, []byte{qos})
}
testMQTTDisconnectEx(t, mc, nil, false)
mc.Close()
}

func captureHeapProfile(filename string) {
f, _ := os.Create(filename)
defer f.Close()
runtime.GC() // Force garbage collection to get a clear picture
pprof.WriteHeapProfile(f)
}

func printHeapUsage(label string) runtime.MemStats {
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
fmt.Printf("%s Heap: allocs=%v, objects=%v\n", label, memStats.HeapAlloc, memStats.HeapObjects)
return memStats
}
func TestMQTTClusterConnectDisconnectClean(t *testing.T) {
nServers := 3
cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", nServers)
Expand All @@ -3064,7 +3083,38 @@ func TestMQTTClusterConnectDisconnectClean(t *testing.T) {
// specified.
N := 100
for n := 0; n < N; n++ {
testMQTTConnectDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false)
testMQTTConnectSubDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false, false, 0)
}
}

func TestMQTTClusterConnectSubDisconnectClean(t *testing.T) {
nServers := 3
cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", nServers)
defer cl.shutdown()

time.Sleep(1 * time.Second)

// initialize MQTT assets in the cluster
testMQTTConnectSubDisconnect(t, cl.opts[0], "init", true, false, false, 0)
runtime.GC() // Force garbage collection to get a clear picture

memStats := printHeapUsage("BEFORE")
baseHeapAlloc := memStats.HeapAlloc
baseHeapObjects := memStats.HeapObjects

for i := 0; i < 10; i++ {
clientID := nuid.Next()
testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, true, false, true, 2)
runtime.GC() // Force garbage collection to get a clear picture

memStats = printHeapUsage(fmt.Sprintf("AFTER %d", i))
if memStats.HeapAlloc > 2*baseHeapAlloc || memStats.HeapObjects > 2*baseHeapObjects {
captureHeapProfile("AFTERLEAK.pprof")
t.Fatalf("after %d iterations heap alloc has grown from %v to %v (%v%%), objects from %v to %v (%v%%)",
i,
baseHeapAlloc, memStats.HeapAlloc, memStats.HeapAlloc*100/baseHeapAlloc,
baseHeapObjects, memStats.HeapObjects, memStats.HeapObjects*100/baseHeapObjects)
}
}
}

Expand All @@ -3081,13 +3131,13 @@ func TestMQTTClusterConnectDisconnectPersist(t *testing.T) {
for n := 0; n < N; n++ {
// First clean sessions on all servers
for i := 0; i < nServers; i++ {
testMQTTConnectDisconnect(t, cl.opts[i], clientID, true, false)
testMQTTConnectSubDisconnect(t, cl.opts[i], clientID, true, false, false, 0)
}

testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, false)
testMQTTConnectDisconnect(t, cl.opts[1], clientID, false, true)
testMQTTConnectDisconnect(t, cl.opts[2], clientID, false, true)
testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, true)
testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, false, false, false, 0)
testMQTTConnectSubDisconnect(t, cl.opts[1], clientID, false, true, false, 0)
testMQTTConnectSubDisconnect(t, cl.opts[2], clientID, false, true, false, 0)
testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, false, true, false, 0)
}
}

Expand Down

0 comments on commit c5a449b

Please sign in to comment.