[Bug] Reader.hasMessageAvailable return wrong value after seeking by timestamp with startMessageIdInclusive #23501

Closed
3 tasks done
summeriiii opened this issue Oct 22, 2024 · 0 comments · Fixed by #23502
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@summeriiii
Contributor

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

master

Minimal reproduce step

  1. Create a new reader with startMessageIdInclusive().
  2. Seek to the current time: reader.seek(System.currentTimeMillis());
  3. Call reader.hasMessageAvailable().
    @Test(dataProvider = "initializeLastMessageIdInBroker")
    public void testHasMessageAvailableAfterSeekTimestampWithMessageIdInclusive(boolean initializeLastMessageIdInBroker)
            throws Exception {
        final String topic = "persistent://my-property/my-ns/" +
                "testHasMessageAvailableAfterSeekTimestampWithMessageInclusive";

        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
        final long timestampBeforeSend = System.currentTimeMillis();
        final MessageId sentMsgId = producer.send("msg");

        final List<MessageId> messageIds = new ArrayList<>();
        messageIds.add(MessageId.earliest);
        messageIds.add(sentMsgId);
        messageIds.add(MessageId.latest);

        for (MessageId messageId : messageIds) {
            @Cleanup
            Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
                    .startMessageIdInclusive()
                    .startMessageId(messageId).create();
            if (initializeLastMessageIdInBroker) {
                assertTrue(reader.hasMessageAvailable());
            }
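            // Seeking to "now" moves past the only message, so nothing should be readable.
            reader.seek(System.currentTimeMillis());
            // This is the assertion that fails: hasMessageAvailable() returns true.
            assertFalse(reader.hasMessageAvailable());
            Message<String> message = reader.readNext(10, TimeUnit.SECONDS);
            assertNull(message);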
        }

        for (MessageId messageId : messageIds) {
            @Cleanup
            Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
                    .startMessageIdInclusive()
                    .startMessageId(messageId).create();
            if (initializeLastMessageIdInBroker) {
                assertTrue(reader.hasMessageAvailable());
            }
            // Seeking to a time before the send should make the message readable.
            reader.seek(timestampBeforeSend);
            assertTrue(reader.hasMessageAvailable());
        }
    }

What did you expect to see?

In step 3, reader.hasMessageAvailable() should return false because there is no message to read.
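
To make the expectation concrete, here is a minimal in-memory sketch of the semantics I would expect. It is not Pulsar code: SeekModel, Topic and ModelReader are hypothetical names made up for illustration, and it assumes that a timestamp seek positions the reader at the first message published at or after the given time. Under that assumption, the inclusive flag should only matter for the initial start position, not after a seek by timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model; none of these types are Pulsar classes.
final class SeekModel {

    /** Toy topic: index = position, value = publish time in milliseconds. */
    static final class Topic {
        private final List<Long> publishTimes = new ArrayList<>();

        /** Appends a message and returns its position. */
        long publish(long publishTimeMillis) {
            publishTimes.add(publishTimeMillis);
            return publishTimes.size() - 1;
        }

        /** Position of the first message published at or after the timestamp, or end of topic. */
        long firstPositionAtOrAfter(long timestampMillis) {
            for (int i = 0; i < publishTimes.size(); i++) {
                if (publishTimes.get(i) >= timestampMillis) {
                    return i;
                }
            }
            return publishTimes.size();
        }

        long size() {
            return publishTimes.size();
        }
    }

    /** Toy reader: the inclusive flag affects only the start position (assumption). */
    static final class ModelReader {
        private final Topic topic;
        private long nextPosition;

        ModelReader(Topic topic, long startPosition, boolean startInclusive) {
            this.topic = topic;
            this.nextPosition = startInclusive ? startPosition : startPosition + 1;
        }

        /** Seek by publish time; the start-inclusive flag is deliberately ignored here. */
        void seek(long timestampMillis) {
            nextPosition = topic.firstPositionAtOrAfter(timestampMillis);
        }

        boolean hasMessageAvailable() {
            return nextPosition < topic.size();
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        long sent = topic.publish(1_000L);            // one message published at t=1000

        ModelReader reader = new ModelReader(topic, sent, true);
        System.out.println(reader.hasMessageAvailable()); // true: inclusive start on the sent message

        reader.seek(2_000L);                          // timestamp after the only message
        System.out.println(reader.hasMessageAvailable()); // false: nothing published at or after t=2000

        reader.seek(500L);                            // timestamp before the message
        System.out.println(reader.hasMessageAvailable()); // true: the message is readable
    }
}
```

The second case in main corresponds to step 3 above, where the real reader currently reports true.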

What did you see instead?

reader.hasMessageAvailable() returns true.

Expected :false
Actual   :true

	at org.testng.Assert.fail(Assert.java:110)
	at org.testng.Assert.failNotEquals(Assert.java:1577)
	at org.testng.Assert.assertFalse(Assert.java:78)
	at org.testng.Assert.assertFalse(Assert.java:88)
	at org.apache.pulsar.client.impl.ReaderTest.testHasMessageAvailableAfterSeekTimestampWithMessageIdInclusive(ReaderTest.java:934)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.testng.TestRunner.privateRun(TestRunner.java:829)
	at org.testng.TestRunner.run(TestRunner.java:602)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
	at org.testng.SuiteRunner.run(SuiteRunner.java:330)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
	at org.testng.TestNG.runSuites(TestNG.java:1099)
	at org.testng.TestNG.run(TestNG.java:1067)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!