Skip to content

Commit

Permalink
KAFKA-18015: Add support for duration based offset reset strategy to …
Browse files Browse the repository at this point in the history
…Kafka Streams (#17973)

Part of KIP-1106.

Adds the public APIs to Kafka Streams, to support the the newly added "by_duration" reset policy,
plus adds the missing "none" reset policy. Deprecates the enum `Topology.AutoOffsetReset` and
all related methods, and replaced them with new overload using the new `AutoOffsetReset` class.

Co-authored-by: Matthias J. Sax <[email protected]>

Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Manikumar Reddy <[email protected]>
  • Loading branch information
KApolinario1120 authored Dec 11, 2024
1 parent 156d551 commit d83f09d
Show file tree
Hide file tree
Showing 13 changed files with 615 additions and 39 deletions.
103 changes: 103 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;

import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy.StrategyType;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import java.time.Duration;
import java.util.Optional;

/**
* Sets the {@code auto.offset.reset} configuration when
* {@link Topology#addSource(AutoOffsetReset, String, String...) adding a source processor}
* or when creating {@link KStream} or {@link KTable} via {@link StreamsBuilder}.
*/
public class AutoOffsetReset {
protected final StrategyType offsetResetStrategy;
protected final Optional<Duration> duration;

private AutoOffsetReset(final StrategyType offsetResetStrategy, final Optional<Duration> duration) {
this.offsetResetStrategy = offsetResetStrategy;
this.duration = duration;
}

protected AutoOffsetReset(final AutoOffsetReset autoOffsetReset) {
this(autoOffsetReset.offsetResetStrategy, autoOffsetReset.duration);
}

/**
* Creates an {@code AutoOffsetReset} instance representing "none".
*
* @return An {@link AutoOffsetReset} instance for no reset.
*/
public static AutoOffsetReset none() {
return new AutoOffsetReset(StrategyType.NONE, Optional.empty());
}

/**
* Creates an {@code AutoOffsetReset} instance representing "earliest".
*
* @return An {@link AutoOffsetReset} instance for the "earliest" offset.
*/
public static AutoOffsetReset earliest() {
return new AutoOffsetReset(StrategyType.EARLIEST, Optional.empty());
}

/**
* Creates an {@code AutoOffsetReset} instance representing "latest".
*
* @return An {@code AutoOffsetReset} instance for the "latest" offset.
*/
public static AutoOffsetReset latest() {
return new AutoOffsetReset(StrategyType.LATEST, Optional.empty());
}

/**
* Creates an {@code AutoOffsetReset} instance for the specified reset duration.
*
* @param duration The duration to use for the offset reset; must be non-negative.
* @return An {@code AutoOffsetReset} instance with the specified duration.
* @throws IllegalArgumentException If the duration is negative.
*/
public static AutoOffsetReset byDuration(final Duration duration) {
if (duration.isNegative()) {
throw new IllegalArgumentException("Duration cannot be negative");
}
return new AutoOffsetReset(StrategyType.BY_DURATION, Optional.of(duration));
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final AutoOffsetReset that = (AutoOffsetReset) o;
return offsetResetStrategy == that.offsetResetStrategy && duration.equals(that.duration);
}

@Override
public int hashCode() {
int result = offsetResetStrategy.hashCode();
result = 31 * result + duration.hashCode();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public synchronized <K, V> KStream<K, V> stream(final String topic,
* @return a {@link KStream} for the specified topics
*/
public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics) {
return stream(topics, Consumed.with(null, null, null, null));
return stream(topics, Consumed.with(null, null));
}

/**
Expand Down
220 changes: 217 additions & 3 deletions streams/src/main/java/org/apache/kafka/streams/Topology.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;

import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy.StrategyType;
import org.apache.kafka.streams.AutoOffsetReset;

import java.time.Duration;
import java.util.Optional;

public class AutoOffsetResetInternal extends AutoOffsetReset {

public AutoOffsetResetInternal(final AutoOffsetReset autoOffsetReset) {
super(autoOffsetReset);
}

public StrategyType offsetResetStrategy() {
return offsetResetStrategy;
}
public Optional<Duration> duration() {
return duration;
}
}
127 changes: 109 additions & 18 deletions streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.TimestampExtractor;
Expand Down Expand Up @@ -55,30 +56,52 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
protected Serde<K> keySerde;
protected Serde<V> valueSerde;
protected TimestampExtractor timestampExtractor;
protected Topology.AutoOffsetReset resetPolicy;
@Deprecated
protected Topology.AutoOffsetReset legacyResetPolicy;
protected AutoOffsetReset resetPolicy;
protected String processorName;

@SuppressWarnings("deprecation")
private Consumed(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset resetPolicy,
final Topology.AutoOffsetReset legacyResetPolicy,
final AutoOffsetReset resetPolicy,
final String processorName) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.timestampExtractor = timestampExtractor;
this.legacyResetPolicy = legacyResetPolicy;
this.resetPolicy = resetPolicy;
this.processorName = processorName;
}

/**
* Create an instance of {@link Consumed} from an existing instance.
* @param consumed the instance of {@link Consumed} to copy
*/
protected Consumed(final Consumed<K, V> consumed) {
this(consumed.keySerde,
consumed.valueSerde,
consumed.timestampExtractor,
consumed.resetPolicy,
consumed.processorName
this(
consumed.keySerde,
consumed.valueSerde,
consumed.timestampExtractor,
consumed.legacyResetPolicy,
consumed.resetPolicy,
consumed.processorName
);
}

@Deprecated
private static AutoOffsetReset convertOldToNew(final Topology.AutoOffsetReset resetPolicy) {
if (resetPolicy == null) {
return null;
}

return resetPolicy == org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST
? AutoOffsetReset.earliest()
: AutoOffsetReset.latest();
}

/**
* Create an instance of {@link Consumed} with the supplied arguments. {@code null} values are acceptable.
*
Expand All @@ -95,12 +118,39 @@ protected Consumed(final Consumed<K, V> consumed) {
* @param <V> value type
*
* @return a new instance of {@link Consumed}
*
* @deprecated Since 4.0. Use {@link #with(Serde, Serde, TimestampExtractor, AutoOffsetReset)} instead.
*/
@Deprecated
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, null);
return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, convertOldToNew(resetPolicy), null);
}

/**
* Create an instance of {@link Consumed} with the supplied arguments. {@code null} values are acceptable.
*
* @param keySerde
* the key serde. If {@code null} the default key serde from config will be used
* @param valueSerde
* the value serde. If {@code null} the default value serde from config will be used
* @param timestampExtractor
* the timestamp extractor to used. If {@code null} the default timestamp extractor from config will be used
* @param resetPolicy
* the offset reset policy to be used. If {@code null} the default reset policy from config will be used
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final AutoOffsetReset resetPolicy) {
return new Consumed<>(keySerde, valueSerde, timestampExtractor, null, resetPolicy, null);
}

/**
Expand All @@ -118,7 +168,7 @@ public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
*/
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new Consumed<>(keySerde, valueSerde, null, null, null);
return new Consumed<>(keySerde, valueSerde, null, null, null, null);
}

/**
Expand All @@ -133,7 +183,7 @@ public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtractor) {
return new Consumed<>(null, null, timestampExtractor, null, null);
return new Consumed<>(null, null, timestampExtractor, null, null, null);
}

/**
Expand All @@ -146,9 +196,27 @@ public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtra
* @param <V> value type
*
* @return a new instance of {@link Consumed}
*
* @deprecated Since 4.0. Use {@link #with(AutoOffsetReset)} instead.
*/
@Deprecated
public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<>(null, null, null, resetPolicy, null);
return new Consumed<>(null, null, null, resetPolicy, convertOldToNew(resetPolicy), null);
}

/**
* Create an instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
*
* @param resetPolicy
* the offset reset policy to be used. If {@code null} the default reset policy from config will be used
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final AutoOffsetReset resetPolicy) {
return new Consumed<>(null, null, null, null, resetPolicy, null);
}

/**
Expand All @@ -163,7 +231,7 @@ public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPol
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> as(final String processorName) {
return new Consumed<>(null, null, null, null, processorName);
return new Consumed<>(null, null, null, null, null, processorName);
}

/**
Expand All @@ -175,7 +243,7 @@ public static <K, V> Consumed<K, V> as(final String processorName) {
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withKeySerde(final Serde<K> keySerde) {
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
return new Consumed<>(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy, processorName);
}

/**
Expand All @@ -187,7 +255,7 @@ public Consumed<K, V> withKeySerde(final Serde<K> keySerde) {
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withValueSerde(final Serde<V> valueSerde) {
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
return new Consumed<>(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy, processorName);
}

/**
Expand All @@ -199,7 +267,7 @@ public Consumed<K, V> withValueSerde(final Serde<V> valueSerde) {
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withTimestampExtractor(final TimestampExtractor timestampExtractor) {
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
return new Consumed<>(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy, processorName);
}

/**
Expand All @@ -209,9 +277,31 @@ public Consumed<K, V> withTimestampExtractor(final TimestampExtractor timestampE
* the offset reset policy to be used. If {@code null} the default reset policy from config will be used
*
* @return a new instance of {@link Consumed}
*
* @deprecated Since 4.0. Use {@link #withOffsetResetPolicy(AutoOffsetReset)} instead.
*/
@Deprecated
public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
return new Consumed<>(
keySerde,
valueSerde,
timestampExtractor,
resetPolicy,
convertOldToNew(resetPolicy),
processorName
);
}

/**
* Configure the instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
*
* @param resetPolicy
* the offset reset policy to be used. If {@code null} the default reset policy from config will be used
*
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withOffsetResetPolicy(final AutoOffsetReset resetPolicy) {
return new Consumed<>(keySerde, valueSerde, timestampExtractor, null, resetPolicy, processorName);
}

/**
Expand All @@ -224,7 +314,7 @@ public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset reset
*/
@Override
public Consumed<K, V> withName(final String processorName) {
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
return new Consumed<>(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy, processorName);
}

@Override
Expand All @@ -239,11 +329,12 @@ public boolean equals(final Object o) {
return Objects.equals(keySerde, consumed.keySerde) &&
Objects.equals(valueSerde, consumed.valueSerde) &&
Objects.equals(timestampExtractor, consumed.timestampExtractor) &&
legacyResetPolicy == consumed.legacyResetPolicy &&
resetPolicy == consumed.resetPolicy;
}

@Override
public int hashCode() {
return Objects.hash(keySerde, valueSerde, timestampExtractor, resetPolicy);
return Objects.hash(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy);
}
}
Loading

0 comments on commit d83f09d

Please sign in to comment.