-
Notifications
You must be signed in to change notification settings - Fork 225
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add API of PullConsumer #342
base: master
Are you sure you want to change the base?
Conversation
Codecov Report
@@ Coverage Diff @@
## master #342 +/- ##
============================================
- Coverage 34.36% 34.08% -0.28%
+ Complexity 660 656 -4
============================================
Files 220 220
Lines 11450 11450
Branches 277 277
============================================
- Hits 3935 3903 -32
- Misses 7261 7294 +33
+ Partials 254 253 -1
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
ece52b0
to
20f53e1
Compare
java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageQueue.java
Show resolved
Hide resolved
20f53e1
to
c6db59e
Compare
This PR is stale because it has been open for 30 days with no activity. It will be closed in 3 days if no further activity occurs. If you wish not to mark it as stale, please leave a comment in this PR. |
java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java
Show resolved
Hide resolved
In the scenario of stream processing, there are several methods to obtain offset information that are often used, and it is recommended to add them: |
Make sense. At the same time, would it be more appropriate to place these methods in the Ops related interface rather than the pull consumer? |
@socutes I noticed that we already have some methods support seek to begin or end of a specifc message queue. These methods provide |
/** | ||
* Commit offset manually. | ||
*/ | ||
void commit() throws ClientException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The semantics of commit are unclear, for example, for a single queue client, the pull progress is 200, and the poll function returns 100 at this time, we don't know the consumption progress. If we commit 200 will lose the message, commit 100 is not correct either. For users, it is not the case that the previous batch of messages needs to be consumed before taking the next batch. So the meaning here is auto commit. For stream frameworks, it is usually expected that commit and ckpt are atomic, and I think commit(mq, offset) should also be added as a manual interface.
* @param timeout the maximum time to block. | ||
* @return list of fetched messages. | ||
*/ | ||
List<MessageView> poll(Duration timeout); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also provide an asynchronous interface for PullConsumer#poll
? If we only offer a synchronous interface, it means that each call will occupy one user thread. Typically, the poll interface is called for multiple queues, so occupying one thread may not have a significant impact. However, an asynchronous interface can provide greater flexibility, such as forming a future chain with the user's own methods, reducing unnecessary threads when multiple clients exist in one process, and potentially offering other benefits.
We welcome everyone to join the discussion.
Similar to the
LitePullConsumer
provided in RocketMQ 4.0, RocketMQ 5.0 will also provide a brand-new API that can achieve the same functionality, providing higher controllability and flexibility to meet more diverse requirements for message processing.Fixes #341