
I often get the error nats: no heartbeat received #1622

Open
VuongUranus opened this issue Apr 22, 2024 · 25 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@VuongUranus

Observed behavior

I often get the error no heartbeat received. Why do I get this error, and where have I misconfigured?
When I get this error, my pull consumer seems to have been deleted. I don't know the reason, but it appears to be because the time configured in the consumer config's InactiveThreshold attribute was exceeded.

func ContinuousPolling(cons jetstream.Consumer, f func([]byte)) (jetstream.MessagesContext, error) {
	iter, err := cons.Messages()
	if err != nil {
		log.Error().Msgf("Error getting messages from NATS: %v", err)
		return nil, err
	}
	go func() {
		for {
			msg, err := iter.Next()
			if err != nil {
				if errors.Is(err, jetstream.ErrMsgIteratorClosed) || errors.Is(err, jetstream.ErrConsumerDeleted) {
					return
				}

				log.Error().Msgf("Error consuming message. Reason: %v", err)
				continue
			}
			go f(msg.Data())
			msg.Ack()
		}
	}()

	return iter, nil
}
func setDefaultConsumerConfig(subjects []string, consumerName ...string) jetstream.ConsumerConfig {
	name := ""
	if len(consumerName) > 0 {
		name = consumerName[0]
	}

	return jetstream.ConsumerConfig{
		Name:              name,
		DeliverPolicy:     jetstream.DeliverNewPolicy,
		FilterSubjects:    subjects,
		AckPolicy:         jetstream.AckAllPolicy,
		Replicas:          1,
		InactiveThreshold: 1 * time.Minute,
	}
}

Expected behavior

Please explain why this happens and how I can avoid it.

Server and client version

nats-server version: 2.10.12

Host environment

No response

Steps to reproduce

No response

@VuongUranus VuongUranus added the defect Suspected defect such as a bug or regression label Apr 22, 2024
@Jarema
Member

Jarema commented Apr 22, 2024

Hey!

What we don't see here is the context in which both provided functions are called. Could you please share it?

InactiveThreshold kicks in when no one is listening for messages from a consumer (so, no Fetch or Messages active).

This means it can happen, for example, if there is a long pause between creating a consumer and consuming its messages, or if the consuming app is down for longer than a minute.

Moving the issue to the nats.go repo, as it's probably a client-side discussion unless we find an actual issue.
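[Editor's note: as an illustration of the above, here is a minimal nats.go sketch that creates a consumer with an InactiveThreshold and starts consuming immediately, so the inactivity timer never fires while the iterator is active. Stream, subject, and consumer names are placeholders, and a running nats-server with JetStream enabled is assumed.]

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Drain()

	js, err := jetstream.New(nc)
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// "EVENTS", "events.>", and "worker" are placeholder names.
	cons, err := js.CreateOrUpdateConsumer(ctx, "EVENTS", jetstream.ConsumerConfig{
		Name:              "worker",
		FilterSubject:     "events.>",
		AckPolicy:         jetstream.AckExplicitPolicy,
		InactiveThreshold: time.Minute,
	})
	if err != nil {
		panic(err)
	}

	// Start consuming immediately: an active Messages() (or Consume/Fetch)
	// counts as activity, so the inactivity timer does not fire.
	iter, err := cons.Messages()
	if err != nil {
		panic(err)
	}
	defer iter.Stop()

	for {
		msg, err := iter.Next()
		if err != nil {
			fmt.Println("iterator done:", err)
			return
		}
		msg.Ack()
	}
}
```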

@Jarema Jarema transferred this issue from nats-io/nats-server Apr 22, 2024
@withinboredom

I believe this happens if you spend too long processing a message in the message loop. You need to take the message off the "queue" and return processing to the library asap (at least that is what I got when trying to debug this issue in my code).

@Jarema
Member

Jarema commented May 13, 2024

That should not be the case, especially with such a long inactive threshold.
We will, however, try to replicate it by running a slow workload.

@withinboredom

AFAICT, the issue appears to be that heartbeats are only processed when calling iter.Next(), so you need to call it again ASAP.

@piotrpio
Collaborator

This is not correct. The heartbeats are indeed processed in Next(), but the timer which triggers the error only runs in the context of each Next() execution. So each time you call iter.Next() the timer is reset, and it is stopped when the method returns.

@withinboredom

Indeed! Looking at the code, this error might be coming from networking issues, as the heartbeat timer isn't paused/reset during reconnection:

nats.go/jetstream/pull.go

Lines 608 to 630 in 8894a27

err = retryWithBackoff(func(attempt int) (bool, error) {
	isClosed := atomic.LoadUint32(&s.closed) == 1
	if isClosed {
		return false, nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_, err := s.consumer.Info(ctx)
	if err != nil {
		if errors.Is(err, ErrConsumerNotFound) {
			return false, err
		}
		if attempt == backoffOpts.attempts-1 {
			return true, fmt.Errorf("could not get consumer info after server reconnect: %w", err)
		}
		return true, err
	}
	return false, nil
}, backoffOpts)
if err != nil {
	s.Stop()
	return nil, err
}

@withinboredom

I can confirm that the above PR appears to fix the issue -- at least, I haven't seen this message in a while now.

@withinboredom

withinboredom commented May 19, 2024

@VuongUranus try adding this to your go.mod file and see if you also stop seeing the issue:

replace github.com/nats-io/nats.go => github.com/withinboredom/nats.go patch-1

@VuongUranus
Author

@withinboredom I can't use replace github.com/nats-io/nats.go => github.com/withinboredom/nats.go patch-1
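[Editor's note: this may fail because a go.mod replace directive whose target is a module path must carry an explicit version; a bare branch name like patch-1 is not accepted by the go command. A sketch of the shape the directive needs, where the pseudo-version is a made-up placeholder standing in for the fork's actual patch-1 commit:]

```text
replace github.com/nats-io/nats.go => github.com/withinboredom/nats.go v1.0.0-00010101000000-000000000000
```

[A common workflow, stated here from memory and not verified against this fork, is to run `go mod edit -replace github.com/nats-io/nats.go=github.com/withinboredom/nats.go@patch-1` followed by `go mod tidy`, which resolves the branch to a real pseudo-version.]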

@piotrpio
Collaborator

@VuongUranus The issue mentioned by @withinboredom is fixed here: #1643, we'll be merging it soon.

However, I am still not certain that this is indeed your problem. It would be great if you could verify, and if you still encounter the issue it would be very helpful if you could answer the questions from this comment: #1622 (comment)

@VuongUranus
Author

My message-handling function runs in a goroutine, so the problem is probably not caused by processing messages for too long.

@VuongUranus
Author

@piotrpio
I updated to version 1.36.0 but still get the error nats: no heartbeat received

@svirmi

svirmi commented Jun 23, 2024

I also get "no heartbeat received" with version 1.36.0.
Code snippet:

for {
	select {
	case <-nats.Ctx.Done():
		return

	default:
		msg, err := consumer.Next()
		if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
			logger.Error("consumer.Next()", slog.String("nats", err.Error()))
			continue
		} else if errors.Is(err, jetstream.ErrNoHeartbeat) {
			logger.Error("consumer.Next() :: jetstream.ErrNoHeartbeat", slog.String("symbol", symbol))
			continue
		} else if err != nil {
			logger.Error("consumer.Next() :: failed to fetch message", slog.String("nats", err.Error()))
			continue
		}

		// msg.Ack()

		err = json.Unmarshal(msg.Data(), &kline)
		if err != nil {
			logger.Error("error in json.Unmarshal(msg.Data(), &kline)", slog.String("nats", err.Error()))
			return
		}

		nats.KLine <- kline
	}
}

@Jarema
Member

Jarema commented Jun 24, 2024

After you get that error, do you get it once and then resume normal operation, or are there many consecutive heartbeat errors?
After the error, can you check if the consumer is still there (with a consumer info call in the client, or the CLI)?
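[Editor's note: one way to do this check, assuming the nats CLI is installed and using placeholder stream/consumer names:]

```shell
# Check whether the consumer still exists (names are placeholders).
nats consumer info EVENTS worker

# Or list all consumers on the stream.
nats consumer ls EVENTS
```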

@VuongUranus
Author

This error occurs repeatedly on a consumer. When it occurs, I tried searching for the consumer using the CLI but could not find it. I also found that when I use NoAck: true in the stream config and AckNonePolicy in the consumer config, the error is reduced.

@Jarema
Member

Jarema commented Jun 24, 2024

This error means that there is some issue with the consumer, or JetStream.
Sometimes the client can recover, sometimes it can't (for example, if the consumer was deleted).

Acks should have nothing to do with it. The most probable reason is having a consumer with an inactivity threshold set, which will go away after the given duration of client inactivity.

@VuongUranus
Author

I use the Consume function to pull messages, so why is the consumer deleted by the inactivity threshold?

@VuongUranus
Author

I increased the value of InactiveThreshold but the error still occurs as before. However, using NoAck: true and AckPolicy: jetstream.AckNonePolicy does reduce the errors a lot.

@kaverhovsky

@VuongUranus I have encountered this error. It was a consecutive series of nats: no heartbeat received right after nats-server had been shut down. I couldn't grasp why my client wasn't able to reconnect.

The thing was that I had missed the ReconnectWait option in the client configuration (not JetStream, but the core NATS client). Because of that, all reconnection attempts were exhausted before nats-server came back up, which led to the client connection being closed.

Maybe in your case there was some server failure and the reconnection attempts were exhausted.
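[Editor's note: the fix described above can be sketched with core NATS connect options. Values are illustrative; by default nats.go waits 2s between attempts and gives up after 60, while MaxReconnects(-1) retries indefinitely. The URL is a placeholder.]

```go
// Connect options that keep the client retrying through longer
// server outages (values are illustrative).
nc, err := nats.Connect(
	"nats://localhost:4222",
	nats.ReconnectWait(2*time.Second), // wait between reconnect attempts
	nats.MaxReconnects(-1),            // never give up reconnecting
	nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
		log.Printf("disconnected: %v", err)
	}),
	nats.ReconnectHandler(func(nc *nats.Conn) {
		log.Printf("reconnected to %s", nc.ConnectedUrl())
	}),
)
```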

@yimliuwork

yimliuwork commented Oct 4, 2024

Facing the same issue here. I'm using consumer.Messages() with version 1.37.0 to pull messages and often get the nats: no heartbeat received error. What's weird is that if I drain the subscription and then resubscribe to the same consumer right away, it'll work. Then if I just change the consumer name and start a new subscription, it fails again... It almost feels like it needs to wait for the consumer to be "ready".

Another odd thing is that this is only observed on the server but not on local developing environment. So there is a chance this might be a network issue? Hope to get some insights from you guys.

here is the config for the consumer:

consConfig := jetstream.ConsumerConfig{
	Name:              queueGroup,
	Durable:           queueGroup,
	Description:       fmt.Sprintf("consumer for queue group: %v", queueGroup),
	AckPolicy:         jetstream.AckExplicitPolicy,
	AckWait:           30 * time.Second,
	MaxDeliver:        10,
	FilterSubject:     fmt.Sprintf("%v.gamespace.%v", n.mpEnv, gamespace),
	ReplayPolicy:      jetstream.ReplayInstantPolicy,
	MaxAckPending:     5120,
	HeadersOnly:       false,
	InactiveThreshold: 60 * time.Second,
}

@Jarema
Member

Jarema commented Oct 4, 2024

@yimliuwork can you share a snippet of code where you create and then use the consumer?

@yimliuwork

Hi @Jarema, Thanks for replying!

The consumer creation, MessagesContext creation, and message iteration are in different functions; I'll put everything together here (without logging code):

func (n *NatsClient) GenerateConsumer(ctx context.Context, queueGroup, gamespace string, startSeq uint64) (jetstream.Consumer, error) {
	consConfig := jetstream.ConsumerConfig{
		Name:              queueGroup,
		Durable:           queueGroup,
		Description:       fmt.Sprintf("consumer for queue group: %v", queueGroup),
		AckPolicy:         jetstream.AckExplicitPolicy,
		AckWait:           30 * time.Second, // reduce redelivery as much as possible
		MaxDeliver:        10,               // may need redelivery when one connection is cut off and messages in its buffer are not delivered
		FilterSubject:     fmt.Sprintf("%v.gamespace.%v", n.mpEnv, gamespace),
		ReplayPolicy:      jetstream.ReplayInstantPolicy,
		MaxAckPending:     5120, // put a high number to ensure good throughput
		HeadersOnly:       false,
		InactiveThreshold: 60 * time.Second,
	}
	if startSeq != 0 {
		consConfig.DeliverPolicy = jetstream.DeliverByStartSequencePolicy
		consConfig.OptStartSeq = startSeq
	} else {
		consConfig.DeliverPolicy = jetstream.DeliverNewPolicy
	}
	// If consumer already exists and the provided configuration differs from its configuration, ErrConsumerExists
	// is returned. If the provided configuration is the same as the existing consumer, the existing consumer
	// is returned.
	return n.js.CreateConsumer(ctx, fmt.Sprintf("GAMESPACE-%v", n.mpEnv), consConfig)
}

func (s *Server) initSubscription(ctx context.Context, queueGroup, gamespace string, startSeq uint64) (jetstream.MessagesContext, error) {
	cons, err := s.nc.GenerateConsumer(ctx, queueGroup, gamespace, startSeq)
	if err != nil {
		return nil, err
	}
	log.Info(ctx, fmt.Sprintf("consumer %v created or found", queueGroup))

	subscription, err := cons.Messages(jetstream.PullMaxMessages(200))
	if err != nil {
		return nil, err
	}

	return subscription, nil
}

func (sw *streamWorker) startPulling(ctx context.Context) {
	defer log.Debug(ctx, "stop pulling...")
	defer close(sw.toSend)
	for {
		msg, err := sw.subscription.Next()
		if err != nil {
			// error handling/logging omitted
			return
		}
		sw.toSend <- msg
	}
}

Judging from the logs, .Next() was called right after the consumer was created; however, it took 1 min for it to return the "no heartbeat" error. Also, during this time, on the Synadia JetStream management page I could see the consumer had been created, but there were no waiting requests. I guess that's why the server considered the consumer idle and deleted it after the threshold.

@Jarema
Member

Jarema commented Oct 16, 2024

This is pretty weird. Could you show us how those functions are used?
Maybe it will give us a hint.

@yimliuwork

That's pretty much all the code, really. Basically, I'm building a gRPC server-stream API that distributes messages from a message queue (a consumer) to different clients. The API request payload includes the consumer's name and start sequence. With each request, we create such a consumer, or find it if it already exists. Then we create a subscription to that consumer using cons.Messages() and start pulling messages using subscription.Next().

Another issue I just found from load testing is that this 'no heartbeat' error can happen halfway through a stream. For example, I have 100 subscriptions to one consumer, and they are pulling messages fine; then at some point all 100 subscriptions receive a 'no heartbeat' error.

@brettinternet

brettinternet commented Oct 16, 2024

@Jarema I have a possible reproduction here alongside another problem from this issue #1703 (comment)

8 participants