diff --git a/nats.go b/nats.go index 83638a565..3f12d61e2 100644 --- a/nats.go +++ b/nats.go @@ -86,6 +86,9 @@ const ( // MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit MAX_CONNECTIONS_ERR = "maximum connections exceeded" + + // MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limit + MAX_SUBSCRIPTIONS_ERR = "maximum subscriptions exceeded" ) // Errors @@ -141,6 +144,7 @@ var ( ErrNoResponders = errors.New("nats: no responders available for request") ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded") ErrConnectionNotTLS = errors.New("nats: connection is not tls") + ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded") ) // GetDefaultOptions returns default configuration options for the client. @@ -2973,11 +2977,11 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) { // processOpErr handles errors from reading or parsing the protocol. // The lock should not be held entering this function. -func (nc *Conn) processOpErr(err error) { +func (nc *Conn) processOpErr(err error) bool { nc.mu.Lock() + defer nc.mu.Unlock() if nc.isConnecting() || nc.isClosed() || nc.isReconnecting() { - nc.mu.Unlock() - return + return false } if nc.Opts.AllowReconnect && nc.status == CONNECTED { @@ -2997,14 +3001,12 @@ func (nc *Conn) processOpErr(err error) { nc.clearPendingFlushCalls() go nc.doReconnect(err, false) - nc.mu.Unlock() - return + return false } nc.changeConnStatus(DISCONNECTED) nc.err = err - nc.mu.Unlock() - nc.close(CLOSED, true, nil) + return true } // dispatch is responsible for calling any async callbacks @@ -3101,7 +3103,9 @@ func (nc *Conn) readLoop() { err = nc.parse(buf) } if err != nil { - nc.processOpErr(err) + if shouldClose := nc.processOpErr(err); shouldClose { + nc.close(CLOSED, true, nil) + } break } } @@ -3431,15 +3435,17 @@ slowConsumer: } } -// processPermissionsViolation is called when the server signals a subject -// permissions violation on either publish or subscribe. -func (nc *Conn) processPermissionsViolation(err string) { +// processTransientError is called when the server signals a non terminal error +// which does not close the connection or trigger a reconnect. +// This will trigger the async error callback if set. +// These errors include the following: +// - permissions violation on publish or subscribe +// - maximum subscriptions exceeded +func (nc *Conn) processTransientError(err error) { nc.mu.Lock() - // create error here so we can pass it as a closure to the async cb dispatcher. - e := errors.New("nats: " + err) - nc.err = e + nc.err = err if nc.Opts.AsyncErrorCB != nil { - nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, e) }) + nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) }) } nc.mu.Unlock() } @@ -3671,15 +3677,17 @@ func (nc *Conn) processErr(ie string) { // convert to lower case. e := strings.ToLower(ne) - close := false + var close bool // FIXME(dlc) - process Slow Consumer signals special. if e == STALE_CONNECTION { - nc.processOpErr(ErrStaleConnection) + close = nc.processOpErr(ErrStaleConnection) } else if e == MAX_CONNECTIONS_ERR { - nc.processOpErr(ErrMaxConnectionsExceeded) + close = nc.processOpErr(ErrMaxConnectionsExceeded) } else if strings.HasPrefix(e, PERMISSIONS_ERR) { - nc.processPermissionsViolation(ne) + nc.processTransientError(fmt.Errorf("nats: %s", ne)) + } else if strings.HasPrefix(e, MAX_SUBSCRIPTIONS_ERR) { + nc.processTransientError(ErrMaxSubscriptionsExceeded) } else if authErr := checkAuthError(e); authErr != nil { nc.mu.Lock() close = nc.processAuthError(authErr) @@ -5128,7 +5136,9 @@ func (nc *Conn) processPingTimer() { nc.pout++ if nc.pout > nc.Opts.MaxPingsOut { nc.mu.Unlock() - nc.processOpErr(ErrStaleConnection) + if shouldClose := nc.processOpErr(ErrStaleConnection); shouldClose { + nc.close(CLOSED, true, nil) + } return } diff --git a/test/sub_test.go b/test/sub_test.go index a27e14256..559efc50c 100644 --- a/test/sub_test.go +++ b/test/sub_test.go @@ -16,6 +16,7 @@ package test import ( "errors" "fmt" + "os" "sync" "sync/atomic" "testing" @@ -1737,3 +1738,35 @@ func TestSubscriptionEvents(t *testing.T) { close(blockChan) }) } + +func TestMaxSubscriptionsExceeded(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + max_subscriptions: 5 + `)) + defer os.Remove(conf) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + ch := make(chan error) + nc, err := nats.Connect(s.ClientURL(), nats.ErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { + ch <- err + })) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + for i := 0; i < 6; i++ { + s, err := nc.Subscribe("foo", func(_ *nats.Msg) {}) + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + defer s.Unsubscribe() + } + + WaitOnChannel(t, ch, nats.ErrMaxSubscriptionsExceeded) + + // wait for the server to process the SUBs + time.Sleep(100 * time.Millisecond) +}