From e172f4bdf9cf2633229ca8d2e18c9c677ef69128 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 30 Apr 2014 14:18:42 -0700 Subject: [PATCH] Fixed issue #29 with subscriptions leaking across routes on auto-unsubscribe --- server/client.go | 10 +++++++++- server/const.go | 2 +- server/route.go | 7 ++++--- test/cluster_test.go | 33 +++++++++++++++++++++++++++++++++ test/routes_test.go | 35 +++++++++++++++++++++++++++++++++++ test/test.go | 19 +++++++++++-------- 6 files changed, 93 insertions(+), 13 deletions(-) diff --git a/server/client.go b/server/client.go index f7540eabef1..6f4b8901996 100644 --- a/server/client.go +++ b/server/client.go @@ -538,14 +538,22 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { sub.nm++ // Check if we should auto-unsubscribe. if sub.max > 0 { + // For routing.. + shouldForward := client.typ != ROUTER && client.srv != nil // If we are at the exact number, unsubscribe but // still process the message in hand, otherwise // unsubscribe and drop message on the floor. if sub.nm == sub.max { defer client.unsubscribe(sub) + if shouldForward { + defer client.srv.broadcastUnSubscribe(sub) + } } else if sub.nm > sub.max { client.mu.Unlock() client.unsubscribe(sub) + if shouldForward { + client.srv.broadcastUnSubscribe(sub) + } return } } @@ -873,7 +881,7 @@ func (c *client) closeConnection() { // we are already connected to the other end. if c.isSolicitedRoute() { rid := c.route.remoteID - if rid != "" && c.srv.remotes[rid] != nil { + if rid != "" && srv.remotes[rid] != nil { Debug("Not attempting reconnect for solicited route, already connected.", rid) return } else { diff --git a/server/const.go b/server/const.go index 113b049d83a..270ed1a08cd 100644 --- a/server/const.go +++ b/server/const.go @@ -8,7 +8,7 @@ import ( const ( // VERSION is the current version for the server. - VERSION = "0.5.1" + VERSION = "0.5.2" // DEFAULT_PORT is the deault port for client connections. DEFAULT_PORT = 4222 diff --git a/server/route.go b/server/route.go index 8cdd79903f4..8a043d825f8 100644 --- a/server/route.go +++ b/server/route.go @@ -208,10 +208,11 @@ func (s *Server) broadcastSubscribe(sub *subscription) { func (s *Server) broadcastUnSubscribe(sub *subscription) { rsid := routeSid(sub) maxStr := _EMPTY_ - if sub.max > 0 { - maxStr = fmt.Sprintf("%d ", sub.max) + // Set max if we have it set and have not tripped auto-unsubscribe + if sub.max > 0 && sub.nm < sub.max { + maxStr = fmt.Sprintf(" %d", sub.max) } - proto := fmt.Sprintf(unsubProto, maxStr, rsid) + proto := fmt.Sprintf(unsubProto, rsid, maxStr) s.broadcastToRoutes(proto) } diff --git a/test/cluster_test.go b/test/cluster_test.go index 3c531bb0501..d5e39ffd528 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -321,3 +321,36 @@ func TestClusterDropsRemoteSids(t *testing.T) { t.Fatalf("Expected no subscriptions for srvB, got %d\n", sc) } } + +// This will test that we drop remote sids correctly. +func TestAutoUnsubscribePropogation(t *testing.T) { + srvA, srvB, optsA, _ := runServers(t) + defer srvA.Shutdown() + defer srvB.Shutdown() + + clientA := createClientConn(t, optsA.Host, optsA.Port) + defer clientA.Close() + + sendA, expectA := setupConn(t, clientA) + expectMsgs := expectMsgsCommand(t, expectA) + + // We will create subscriptions that will auto-unsubscribe and make sure + // we are not accumulating orphan subscriptions on the other side. + for i := 1; i <= 100; i++ { + sub := fmt.Sprintf("SUB foo %d\r\n", i) + auto := fmt.Sprintf("UNSUB %d 1\r\n", i) + sendA(sub) + sendA(auto) + // This will trip the auto-unsubscribe + sendA("PUB foo 2\r\nok\r\n") + expectMsgs(1) + } + + sendA("PING\r\n") + expectA(pongRe) + + // Make sure number of subscriptions on B is correct + if subs := srvB.NumSubscriptions(); subs != 0 { + t.Fatalf("Expected no subscriptions on remote server, got %d\n", subs) + } +} diff --git a/test/routes_test.go b/test/routes_test.go index c369a3ec485..7af43440720 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -446,3 +446,38 @@ func TestRouteResendsLocalSubsOnReconnect(t *testing.T) { routeSend(infoJson) routeExpect(subRe) } + +func TestAutoUnsubPropogation(t *testing.T) { + s, opts := runRouteServer(t) + defer s.Shutdown() + + client := createClientConn(t, opts.Host, opts.Port) + clientSend, clientExpect := setupConn(t, client) + + route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + expectAuthRequired(t, route) + _, routeExpect := setupRoute(t, route, opts) + + // Setup a local subscription + clientSend("SUB foo 2\r\n") + clientSend("PING\r\n") + clientExpect(pongRe) + + routeExpect(subRe) + + clientSend("UNSUB 2 1\r\n") + clientSend("PING\r\n") + clientExpect(pongRe) + + routeExpect(unsubmaxRe) + + clientSend("PUB foo 2\r\nok\r\n") + clientSend("PING\r\n") + clientExpect(pongRe) + + clientSend("UNSUB 2\r\n") + clientSend("PING\r\n") + clientExpect(pongRe) + + routeExpect(unsubnomaxRe) +} diff --git a/test/test.go b/test/test.go index 19cacacd1ae..eccb5d8d00e 100644 --- a/test/test.go +++ b/test/test.go @@ -255,14 +255,17 @@ func sendProto(t tLogger, c net.Conn, op string) { } var ( - infoRe = regexp.MustCompile(`INFO\s+([^\r\n]+)\r\n`) - pingRe = regexp.MustCompile(`PING\r\n`) - pongRe = regexp.MustCompile(`PONG\r\n`) - msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`) - okRe = regexp.MustCompile(`\A\+OK\r\n`) - errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`) - subRe = regexp.MustCompile(`SUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`) - unsubRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))?\r\n`) + infoRe = regexp.MustCompile(`INFO\s+([^\r\n]+)\r\n`) + pingRe = regexp.MustCompile(`PING\r\n`) + pongRe = regexp.MustCompile(`PONG\r\n`) + msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`) + okRe = regexp.MustCompile(`\A\+OK\r\n`) + errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`) + subRe = regexp.MustCompile(`SUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`) + unsubRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))?\r\n`) + unsubmaxRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))\r\n`) + unsubnomaxRe = regexp.MustCompile(`UNSUB\s+([^\s]+)\r\n`) + connectRe = regexp.MustCompile(`CONNECT\s+([^\r\n]+)\r\n`) )