Skip to content

Commit

Permalink
[improve][pip] PIP-386: Add resetIncludeHead in CommandSubscribe for …
Browse files Browse the repository at this point in the history
…startMessageIdInclusive implementation
  • Loading branch information
summeriiii committed Oct 9, 2024
1 parent 676fdb1 commit 9fac413
Showing 1 changed file with 118 additions and 0 deletions.
118 changes: 118 additions & 0 deletions pip/pip-386.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# PIP-386: Add resetIncludeHead in CommandSubscribe for startMessageIdInclusive implementation

# Motivation

This pip is intended to fix issue https://github.com/apache/pulsar/issues/23239.

In the previous implementation of the method startMessageIdInclusive (https://github.com/apache/pulsar/pull/4331),
we added startMessageIdInclusive() to support include current position of reset on ReaderBuilder.

However, the condition `if (((BatchMessageIdImpl) msgId).getBatchIndex() >= 0)` in PersistentTopic#getNonDurableSubscription was directly removed.
When we use the NonDurableSubscription, this caused the entryId to decrease by 1 for non-batch messages,
resulting in wrong msgBackLog after topic unload for non-durable subscription.

# Goals

Add resetIncludeHead in CommandSubscribe to implement startMessageIdInclusive, and fix the NonDurable Subscription msgBackLog incorrect after topic unload

# High Level Design

# Detailed Design

## Design & Implementation Details

- CommandSubscribe add the field **resetIncludeHead**, when use the ConsumerBuilder#startMessageIdInclusive or ReaderBuilder#startMessageIdInclusive this param is true, otherwise it is false.
- PersistTopic#getNonDurableSubscription add the judge condition `(msgId.getBatchIndex() >= 0 || resetIncludeHead)`, entryId - 1 will execute **when msg is batch or the resetIncludeHead is true.**


```java
if (ledgerId >= 0 && entryId >= 0
&& msgId instanceof BatchMessageIdImpl
&& (msgId.getBatchIndex() >= 0 || resetIncludeHead)) {
// When the start message is relative to a batch, we need to take one step back on the previous
// message,
// because the "batch" might not have been consumed in its entirety.
// The client will then be able to discard the first messages if needed.
entryId = msgId.getEntryId() - 1;
}
```


### Binary protocol

Add `reset_include_head` field to the `CommandSubscribe`.

```protobuf
PulsarApi.proto
message CommandSubscribe {
enum SubType {
Exclusive = 0;
Shared = 1;
Failover = 2;
Key_Shared = 3;
}
required string topic = 1;
required string subscription = 2;
required SubType subType = 3;
required uint64 consumer_id = 4;
required uint64 request_id = 5;
optional string consumer_name = 6;
optional int32 priority_level = 7;
// Signal wether the subscription should be backed by a
// durable cursor or not
optional bool durable = 8 [default = true];
// If specified, the subscription will position the cursor
// markd-delete position on the particular message id and
// will send messages from that point
optional MessageIdData start_message_id = 9;
/// Add optional metadata key=value to this consumer
repeated KeyValue metadata = 10;
optional bool read_compacted = 11;
optional Schema schema = 12;
enum InitialPosition {
Latest = 0;
Earliest = 1;
}
// Signal whether the subscription will initialize on latest
// or not -- earliest
optional InitialPosition initialPosition = 13 [default = Latest];
// Mark the subscription as "replicated". Pulsar will make sure
// to periodically sync the state of replicated subscriptions
// across different clusters (when using geo-replication).
optional bool replicate_subscription_state = 14;
// If true, the subscribe operation will cause a topic to be
// created if it does not exist already (and if topic auto-creation
// is allowed by broker.
// If false, the subscribe operation will fail if the topic
// does not exist.
optional bool force_topic_creation = 15 [default = true];
// If specified, the subscription will reset cursor's position back
// to specified seconds and will send messages from that point
optional uint64 start_message_rollback_duration_sec = 16 [default = 0];
optional KeySharedMeta keySharedMeta = 17;
repeated KeyValue subscription_properties = 18;
// The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
optional uint64 consumer_epoch = 19;
optional bool reset_include_head = 20 [default = false];
}
```


# Links

* Mailing List discussion thread:
* Mailing List voting thread:

0 comments on commit 9fac413

Please sign in to comment.