EnableZeroQueueConsumer Setting Not Functioning Correctly #1276

Open
gkhnoztrk opened this issue Aug 29, 2024 · 4 comments · May be fixed by #1278

@gkhnoztrk

Description
I've encountered an issue with the Pulsar client where setting EnableZeroQueueConsumer: true does not produce the expected behavior. The client still prefetches additional messages into its internal buffer, which defeats the purpose of this setting.
Steps to Reproduce

Configure the Pulsar consumer with EnableZeroQueueConsumer: true
Consume messages and observe that additional messages are still buffered client-side

Expected Behavior
When EnableZeroQueueConsumer is set to true, the client should not perform extra reads into the buffer. It should only fetch messages as they are consumed.
Actual Behavior
The client continues to read additional messages into the buffer, despite the EnableZeroQueueConsumer setting being enabled.
Additional Information
I've reviewed the code and I'm confident there's an issue in the implementation of this feature. The extra buffering occurs consistently, regardless of the EnableZeroQueueConsumer setting.
Environment

Pulsar Client Version: v0.13.1
Server: apachepulsar/pulsar:3.3.1 (Docker)

Any assistance in investigating and resolving this issue would be greatly appreciated. Thank you for your time and attention to this matter.
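For reference, a minimal sketch of the kind of consumer this refers to (placeholder names; broker assumed at pulsar://localhost:6650). With the Go client's default ReceiverQueueSize of 1000 the client prefetches into a local queue; with EnableZeroQueueConsumer enabled the expectation is one message per Receive and no local prefetch:

// Minimal sketch (placeholder names) of a zero-queue consumer: the
// expectation is that each Receive pulls exactly one message from the
// broker, with no client-side prefetch buffer.
package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                   "my-topic",
		SubscriptionName:        "my-sub",
		Type:                    pulsar.Shared,
		EnableZeroQueueConsumer: true, // should disable client-side prefetch
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Printf("receive error: %v", err)
			continue
		}
		// One message per Receive is the expected behavior here; instead the
		// client keeps buffering additional messages internally.
		consumer.Ack(msg)
	}
}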


@crossoverJie (Member) commented Aug 30, 2024

> When EnableZeroQueueConsumer is set to true, the client should not perform extra reads into the buffer. It should only fetch messages as they are consumed.

Could you please provide a more detailed explanation of the meaning of this sentence?

case messageCh <- nextMessage:

In the current implementation, every incoming message is first dispatched into messageCh, and Receive then reads from that channel.
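Roughly, the flow looks like this (a simplified model for discussion, not the actual client source):

// Simplified model of the current flow: a permit is returned to the broker
// as soon as a message is handed to the buffered messageCh, so the broker
// keeps pushing messages that pile up in the buffer before the application
// ever calls Receive.
func dispatchModel(incoming <-chan string, messageCh chan<- string, grantPermit func()) {
	for msg := range incoming {
		messageCh <- msg // buffered send succeeds even if nobody is receiving yet
		grantPermit()    // broker is immediately asked for another message
	}
	// Zero-queue semantics would instead grant the permit only when the
	// application actually takes a message, i.e. at Receive time.
}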

@gkhnoztrk (Author)

The expected behavior with EnableZeroQueueConsumer set to true should be as follows:

50 tasks were placed in the queue.
Each consumer was limited to processing 5 tasks concurrently.
4 consumers were opened sequentially.
The distribution should have been:
Consumer 1: Tasks 1-5
Consumer 2: Tasks 6-10
Consumer 3: Tasks 11-15
Consumer 4: Tasks 16-20
However, the actual behavior observed was:

Consumer 1: Processed tasks 1-5, but incorrectly buffered tasks 6-20
Consumer 2: Processed tasks 21-25, but incorrectly buffered tasks 26-40
(Similar pattern for other consumers)
When Consumer 1 was closed, it released all its tasks, including those in the buffer, which Consumer 4 then took over.

This behavior contradicts the purpose of EnableZeroQueueConsumer. With this setting enabled, consumers should only fetch tasks as they are ready to process them, without pre-fetching or buffering additional tasks. The current implementation appears to ignore the setting, leading to unnecessary buffering and uneven task distribution across consumers.
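For completeness, a producer along these lines can publish the 50 numbered tasks (a sketch; it assumes the same broker, topic, schema, and common helpers as the test code below):

// Sketch of a producer for the 50 numbered test tasks; assumes the same
// imports and common package as the consumer test code below.
func produceTasks() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:  "test1",
		Schema: pulsar.NewAvroSchema(common.AvroSchema, nil),
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	for i := 1; i <= 50; i++ {
		if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
			Value: &common.Account{ID: i},
		}); err != nil {
			log.Printf("send %d failed: %v", i, err)
		}
	}
}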

Test code:

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"

	"github.com/apache/pulsar-client-go/pulsar"
	plog "github.com/apache/pulsar-client-go/pulsar/log"
	"github.com/sirupsen/logrus"
	// plus the local "common" helper package shown below; its import path
	// depends on the module layout, so it is not spelled out here.
)

// consumer3 subscribes with EnableZeroQueueConsumer enabled and hands each
// received message to an AccountListener that caps concurrent work at 5 tasks.
func consumer3() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:    "pulsar://localhost:6650",
		Logger: plog.NewLoggerWithLogrus(&logrus.Logger{}),
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	schema := pulsar.NewAvroSchema(common.AvroSchema, nil)

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                       "test1",
		SubscriptionName:            "my-shared-subscription",
		Type:                        pulsar.Shared,
		Schema:                      schema,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		EnableZeroQueueConsumer:     true,
		MaxPendingChunkedMessage:    1,
	})

	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	accountWorker := common.NewAccountListener(5)

	fmt.Println("waiting..")
	for accountWorker.WaitReady() {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Printf("Error receiving message: %v", err)
			continue
		}
		account := &common.Account{}
		err = msg.GetSchemaValue(account)
		if err != nil {
			log.Printf("Error decoding message: %v", err)
			consumer.Nack(msg)
			continue
		}
		fmt.Println("Consumed : " + strconv.Itoa(account.ID))
		accountWorker.AddAccount(consumer, msg, account)
	}
}

The common helper package referenced above:

package common

import (
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

const AvroSchema = `
{
  "type": "record",
  "name": "Account",
  "fields": [
    {"name": "ID", "type": "int"}
  ]
}`

type AccountWrapper struct {
	Account  *Account
	Consumer pulsar.Consumer
	Message  pulsar.Message
}

type Account struct {
	ID int `avro:"ID"`
}

type AccountListener struct {
	Limit    int
	Accounts map[int]*AccountWrapper
	lock     *sync.Mutex
}

func NewAccountListener(limit int) *AccountListener {
	return &AccountListener{
		Limit:    limit,
		Accounts: make(map[int]*AccountWrapper),
		lock:     &sync.Mutex{},
	}
}

// WaitReady blocks until the listener has a free processing slot.
func (l *AccountListener) WaitReady() bool {
	for !l.checkLimit() {
		time.Sleep(time.Millisecond * 100)
	}
	return true
}

func (l *AccountListener) IsReady() bool {
	l.lock.Lock()
	defer l.lock.Unlock()
	return len(l.Accounts) < l.Limit
}

func (l *AccountListener) checkLimit() bool {
	l.lock.Lock()
	defer l.lock.Unlock()
	return len(l.Accounts) < l.Limit
}

func (l *AccountListener) AddAccount(consumer pulsar.Consumer, message pulsar.Message, account *Account) {
	l.lock.Lock()
	defer l.lock.Unlock()
	l.Accounts[account.ID] = &AccountWrapper{
		Account:  account,
		Message:  message,
		Consumer: consumer,
	}
	go l.Process(l.Accounts[account.ID])
}

func (l *AccountListener) RemoveAccount(account *Account) {
	l.lock.Lock()
	defer l.lock.Unlock()
	delete(l.Accounts, account.ID)
}

func (l *AccountListener) Process(wrapper *AccountWrapper) {
	// Simulate a long-running task.
	time.Sleep(time.Minute * 5)
	// Done: ack so the broker removes the message from the backlog.
	fmt.Println("< : " + strconv.Itoa(wrapper.Account.ID))
	wrapper.Consumer.Ack(wrapper.Message)
	l.RemoveAccount(wrapper.Account)
}

@gkhnoztrk (Author)

I've found a temporary solution to my issue by modifying the following line:

pc.availablePermits.inc()

The modified code now looks like this:

if pc.options.receiverQueueSize > 0 {
    pc.availablePermits.inc()
}

This change appears to address the problem I was experiencing. However, I want to emphasize that this is a temporary fix, and I'm not certain about its broader implications or whether it's the most appropriate long-term solution.
I would greatly appreciate it if the maintainers or someone with deeper knowledge of the codebase could review this modification and provide feedback on:

Whether this approach aligns with the intended behavior of EnableZeroQueueConsumer.
Potential side effects or issues this change might introduce in other scenarios.
If there's a more robust or recommended way to achieve the desired behavior.
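As a rough way to check the effect of such a change (a sketch, not taken from the report above; it assumes the topic has been pre-loaded with sequentially numbered Account messages and reuses the common helpers from the test code above): open a second consumer on the same shared subscription while the first holds five unacked messages. With true zero-queue behavior, the second consumer should be handed message 6 rather than a message from a prefetched batch.

// Rough verification sketch (assumes the topic is pre-loaded with Accounts 1..N).
// Consumer A holds 5 unacked messages; with no prefetch, consumer B should
// then be handed message 6, not a message from a later prefetched batch.
func verifyNoPrefetch(client pulsar.Client, schema pulsar.Schema) {
	opts := pulsar.ConsumerOptions{
		Topic:                       "test1",
		SubscriptionName:            "my-shared-subscription",
		Type:                        pulsar.Shared,
		Schema:                      schema,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		EnableZeroQueueConsumer:     true,
	}

	a, err := client.Subscribe(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer a.Close()

	// Receive 5 messages on A without acking them.
	for i := 0; i < 5; i++ {
		if _, err := a.Receive(context.Background()); err != nil {
			log.Fatal(err)
		}
	}

	b, err := client.Subscribe(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer b.Close()

	msg, err := b.Receive(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	account := &common.Account{}
	if err := msg.GetSchemaValue(account); err != nil {
		log.Fatal(err)
	}
	// Expect ID 6 here if consumer A did not buffer any extra messages.
	fmt.Println("first message seen by consumer B:", account.ID)
}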

crossoverJie added a commit to crossoverJie/pulsar-client-go that referenced this issue Aug 31, 2024: fix
@crossoverJie (Member)

@gkhnoztrk

Thank you for the clarification; this was a case that hadn't been considered before.

This modification is fine; receiverQueueSize will only be 0 when using zeroQueueConsumer.
