Skip to content

Commit

Permalink
Fixed issue #29 with subscriptions leaking across routes on auto-unsu…
Browse files Browse the repository at this point in the history
…bscribe
  • Loading branch information
Derek Collison committed Apr 30, 2014
1 parent cd56514 commit e172f4b
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 13 deletions.
10 changes: 9 additions & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,14 +538,22 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
sub.nm++
// Check if we should auto-unsubscribe.
if sub.max > 0 {
// For routing..
shouldForward := client.typ != ROUTER && client.srv != nil
// If we are at the exact number, unsubscribe but
// still process the message in hand, otherwise
// unsubscribe and drop message on the floor.
if sub.nm == sub.max {
defer client.unsubscribe(sub)
if shouldForward {
defer client.srv.broadcastUnSubscribe(sub)
}
} else if sub.nm > sub.max {
client.mu.Unlock()
client.unsubscribe(sub)
if shouldForward {
client.srv.broadcastUnSubscribe(sub)
}
return
}
}
Expand Down Expand Up @@ -873,7 +881,7 @@ func (c *client) closeConnection() {
// we are already connected to the other end.
if c.isSolicitedRoute() {
rid := c.route.remoteID
if rid != "" && c.srv.remotes[rid] != nil {
if rid != "" && srv.remotes[rid] != nil {
Debug("Not attempting reconnect for solicited route, already connected.", rid)
return
} else {
Expand Down
2 changes: 1 addition & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

const (
// VERSION is the current version for the server.
VERSION = "0.5.1"
VERSION = "0.5.2"

// DEFAULT_PORT is the deault port for client connections.
DEFAULT_PORT = 4222
Expand Down
7 changes: 4 additions & 3 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,11 @@ func (s *Server) broadcastSubscribe(sub *subscription) {
func (s *Server) broadcastUnSubscribe(sub *subscription) {
rsid := routeSid(sub)
maxStr := _EMPTY_
if sub.max > 0 {
maxStr = fmt.Sprintf("%d ", sub.max)
// Set max if we have it set and have not tripped auto-unsubscribe
if sub.max > 0 && sub.nm < sub.max {
maxStr = fmt.Sprintf(" %d", sub.max)
}
proto := fmt.Sprintf(unsubProto, maxStr, rsid)
proto := fmt.Sprintf(unsubProto, rsid, maxStr)
s.broadcastToRoutes(proto)
}

Expand Down
33 changes: 33 additions & 0 deletions test/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,36 @@ func TestClusterDropsRemoteSids(t *testing.T) {
t.Fatalf("Expected no subscriptions for srvB, got %d\n", sc)
}
}

// This will test that we drop remote sids correctly.
func TestAutoUnsubscribePropogation(t *testing.T) {
srvA, srvB, optsA, _ := runServers(t)
defer srvA.Shutdown()
defer srvB.Shutdown()

clientA := createClientConn(t, optsA.Host, optsA.Port)
defer clientA.Close()

sendA, expectA := setupConn(t, clientA)
expectMsgs := expectMsgsCommand(t, expectA)

// We will create subscriptions that will auto-unsubscribe and make sure
// we are not accumulating orphan subscriptions on the other side.
for i := 1; i <= 100; i++ {
sub := fmt.Sprintf("SUB foo %d\r\n", i)
auto := fmt.Sprintf("UNSUB %d 1\r\n", i)
sendA(sub)
sendA(auto)
// This will trip the auto-unsubscribe
sendA("PUB foo 2\r\nok\r\n")
expectMsgs(1)
}

sendA("PING\r\n")
expectA(pongRe)

// Make sure number of subscriptions on B is correct
if subs := srvB.NumSubscriptions(); subs != 0 {
t.Fatalf("Expected no subscriptions on remote server, got %d\n", subs)
}
}
35 changes: 35 additions & 0 deletions test/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,38 @@ func TestRouteResendsLocalSubsOnReconnect(t *testing.T) {
routeSend(infoJson)
routeExpect(subRe)
}

func TestAutoUnsubPropogation(t *testing.T) {
s, opts := runRouteServer(t)
defer s.Shutdown()

client := createClientConn(t, opts.Host, opts.Port)
clientSend, clientExpect := setupConn(t, client)

route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
expectAuthRequired(t, route)
_, routeExpect := setupRoute(t, route, opts)

// Setup a local subscription
clientSend("SUB foo 2\r\n")
clientSend("PING\r\n")
clientExpect(pongRe)

routeExpect(subRe)

clientSend("UNSUB 2 1\r\n")
clientSend("PING\r\n")
clientExpect(pongRe)

routeExpect(unsubmaxRe)

clientSend("PUB foo 2\r\nok\r\n")
clientSend("PING\r\n")
clientExpect(pongRe)

clientSend("UNSUB 2\r\n")
clientSend("PING\r\n")
clientExpect(pongRe)

routeExpect(unsubnomaxRe)
}
19 changes: 11 additions & 8 deletions test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,14 +255,17 @@ func sendProto(t tLogger, c net.Conn, op string) {
}

var (
infoRe = regexp.MustCompile(`INFO\s+([^\r\n]+)\r\n`)
pingRe = regexp.MustCompile(`PING\r\n`)
pongRe = regexp.MustCompile(`PONG\r\n`)
msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`)
okRe = regexp.MustCompile(`\A\+OK\r\n`)
errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`)
subRe = regexp.MustCompile(`SUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`)
unsubRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))?\r\n`)
infoRe = regexp.MustCompile(`INFO\s+([^\r\n]+)\r\n`)
pingRe = regexp.MustCompile(`PING\r\n`)
pongRe = regexp.MustCompile(`PONG\r\n`)
msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`)
okRe = regexp.MustCompile(`\A\+OK\r\n`)
errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`)
subRe = regexp.MustCompile(`SUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`)
unsubRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))?\r\n`)
unsubmaxRe = regexp.MustCompile(`UNSUB\s+([^\s]+)(\s+(\d+))\r\n`)
unsubnomaxRe = regexp.MustCompile(`UNSUB\s+([^\s]+)\r\n`)

connectRe = regexp.MustCompile(`CONNECT\s+([^\r\n]+)\r\n`)
)

Expand Down

0 comments on commit e172f4b

Please sign in to comment.