Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Closing connection on max subscriptions exceeded #1709

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ const (

// MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit
MAX_CONNECTIONS_ERR = "maximum connections exceeded"

// MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limit
MAX_SUBSCRIPTIONS_ERR = "maximum subscriptions exceeded"
)

// Errors
Expand Down Expand Up @@ -141,6 +144,7 @@ var (
ErrNoResponders = errors.New("nats: no responders available for request")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrConnectionNotTLS = errors.New("nats: connection is not tls")
ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded")
)

// GetDefaultOptions returns default configuration options for the client.
Expand Down Expand Up @@ -2973,11 +2977,11 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {

// processOpErr handles errors from reading or parsing the protocol.
// The lock should not be held entering this function.
func (nc *Conn) processOpErr(err error) {
func (nc *Conn) processOpErr(err error) bool {
nc.mu.Lock()
defer nc.mu.Unlock()
if nc.isConnecting() || nc.isClosed() || nc.isReconnecting() {
nc.mu.Unlock()
return
return false
}

if nc.Opts.AllowReconnect && nc.status == CONNECTED {
Expand All @@ -2997,14 +3001,12 @@ func (nc *Conn) processOpErr(err error) {
nc.clearPendingFlushCalls()

go nc.doReconnect(err, false)
nc.mu.Unlock()
return
return false
}

nc.changeConnStatus(DISCONNECTED)
nc.err = err
nc.mu.Unlock()
nc.close(CLOSED, true, nil)
return true
}

// dispatch is responsible for calling any async callbacks
Expand Down Expand Up @@ -3101,7 +3103,9 @@ func (nc *Conn) readLoop() {
err = nc.parse(buf)
}
if err != nil {
nc.processOpErr(err)
if shouldClose := nc.processOpErr(err); shouldClose {
nc.close(CLOSED, true, nil)
}
break
}
}
Expand Down Expand Up @@ -3431,15 +3435,17 @@ slowConsumer:
}
}

// processPermissionsViolation is called when the server signals a subject
// permissions violation on either publish or subscribe.
func (nc *Conn) processPermissionsViolation(err string) {
// processTransientError is called when the server signals a non terminal error
// which does not close the connection or trigger a reconnect.
// This will trigger the async error callback if set.
// These errors include the following:
// - permissions violation on publish or subscribe
// - maximum subscriptions exceeded
func (nc *Conn) processTransientError(err error) {
nc.mu.Lock()
// create error here so we can pass it as a closure to the async cb dispatcher.
e := errors.New("nats: " + err)
nc.err = e
nc.err = err
if nc.Opts.AsyncErrorCB != nil {
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, e) })
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
}
nc.mu.Unlock()
}
Expand Down Expand Up @@ -3671,15 +3677,17 @@ func (nc *Conn) processErr(ie string) {
// convert to lower case.
e := strings.ToLower(ne)

close := false
var close bool

// FIXME(dlc) - process Slow Consumer signals special.
if e == STALE_CONNECTION {
nc.processOpErr(ErrStaleConnection)
close = nc.processOpErr(ErrStaleConnection)
} else if e == MAX_CONNECTIONS_ERR {
nc.processOpErr(ErrMaxConnectionsExceeded)
close = nc.processOpErr(ErrMaxConnectionsExceeded)
} else if strings.HasPrefix(e, PERMISSIONS_ERR) {
nc.processPermissionsViolation(ne)
nc.processTransientError(fmt.Errorf("nats: %s", ne))
} else if strings.HasPrefix(e, MAX_SUBSCRIPTIONS_ERR) {
nc.processTransientError(ErrMaxSubscriptionsExceeded)
} else if authErr := checkAuthError(e); authErr != nil {
nc.mu.Lock()
close = nc.processAuthError(authErr)
Expand Down Expand Up @@ -5128,7 +5136,9 @@ func (nc *Conn) processPingTimer() {
nc.pout++
if nc.pout > nc.Opts.MaxPingsOut {
nc.mu.Unlock()
nc.processOpErr(ErrStaleConnection)
if shouldClose := nc.processOpErr(ErrStaleConnection); shouldClose {
nc.close(CLOSED, true, nil)
}
return
}

Expand Down
33 changes: 33 additions & 0 deletions test/sub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package test
import (
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1737,3 +1738,35 @@ func TestSubscriptionEvents(t *testing.T) {
close(blockChan)
})
}

func TestMaxSubscriptionsExceeded(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
max_subscriptions: 5
`))
defer os.Remove(conf)
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

ch := make(chan error)
nc, err := nats.Connect(s.ClientURL(), nats.ErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) {
ch <- err
}))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

for i := 0; i < 6; i++ {
s, err := nc.Subscribe("foo", func(_ *nats.Msg) {})
if err != nil {
t.Fatalf("Error subscribing: %v", err)
}
defer s.Unsubscribe()
}

WaitOnChannel(t, ch, nats.ErrMaxSubscriptionsExceeded)

// wait for the server to process the SUBs
time.Sleep(100 * time.Millisecond)
}