KAFKA-13722: remove usage of old ProcessorContext
We want to deprecate and remove the old ProcessorContext. Thus, we need
to refactor the Kafka Streams runtime code so that it no longer calls into
the old ProcessorContext but only into the new code path.
mjsax committed Jan 23, 2025
1 parent 40890fa commit 07915d7
Showing 33 changed files with 113 additions and 95 deletions.
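The change is mechanical throughout the diff: every read of record metadata (timestamp, headers, offset, partition, topic) moves off the old ProcessorContext accessors and onto the ProcessorRecordContext returned by recordContext(). A minimal sketch of the call-site migration, shown as a hypothetical helper that is not part of the commit:

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;

// Hypothetical helper, for illustration only: reads record metadata the way
// the runtime does after this commit.
final class RecordMetadataAccess {

    static long timestampOf(final InternalProcessorContext<?, ?> context) {
        // before this commit: context.timestamp()
        return context.recordContext().timestamp();
    }

    static Headers headersOf(final InternalProcessorContext<?, ?> context) {
        // before this commit: context.headers()
        return context.recordContext().headers();
    }
}

Note that the record context is only defined while a record is being processed, which is why one hunk below guards on recordContext != null before rebuilding it.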
@@ -213,7 +213,7 @@ public void close() {
     private ValueAndTimestamp<KeyValue<K1, V1>> mapValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) {
         return ValueAndTimestamp.make(
             mapper.apply(key, getValueOrNull(valueAndTimestamp)),
-            valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp()
+            valueAndTimestamp == null ? context.recordContext().timestamp() : valueAndTimestamp.timestamp()
         );
     }
 }
@@ -84,7 +84,7 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {

     @Override
     public <KIn, VIn> void forward(final KIn key, final VIn value) {
-        forward(new Record<>(key, value, timestamp(), headers()));
+        forward(new Record<>(key, value, recordContext().timestamp(), headers()));
     }

     /**
@@ -119,8 +119,8 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {
         final Record<Object, Object> toProcess = new Record<>(
             deserialized.key(),
             deserialized.value(),
-            processorContext.timestamp(),
-            processorContext.headers()
+            processorContext.recordContext().timestamp(),
+            processorContext.recordContext().headers()
         );
         ((SourceNode<Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
     }
@@ -190,7 +190,7 @@ public <K, V> void forward(final K key,
         final Record<K, V> toForward = new Record<>(
             key,
             value,
-            timestamp(),
+            recordContext.timestamp(),
             headers()
         );
         forward(toForward);
@@ -204,7 +204,7 @@ public <K, V> void forward(final K key,
         final Record<K, V> toForward = new Record<>(
             key,
             value,
-            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+            toInternal.hasTimestamp() ? toInternal.timestamp() : recordContext.timestamp(),
             headers()
         );
         forward(toForward, toInternal.child());
@@ -250,11 +250,11 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
         // old API processors wouldn't see the timestamps or headers of upstream
         // new API processors. But then again, from the perspective of those old-API
         // processors, even consulting the timestamp or headers when the record context
-        // is undefined is itself not well defined. Plus, I don't think we need to worry
+        // is undefined is itself not well-defined. Plus, I don't think we need to worry
         // too much about heterogeneous applications, in which the upstream processor is
         // implementing the new API and the downstream one is implementing the old API.
         // So, this seems like a fine compromise for now.
-        if (recordContext != null && (record.timestamp() != timestamp() || record.headers() != headers())) {
+        if (recordContext != null && (record.timestamp() != recordContext.timestamp() || record.headers() != recordContext.headers())) {
             recordContext = new ProcessorRecordContext(
                 record.timestamp(),
                 recordContext.offset(),
@@ -34,13 +34,6 @@ public final class ProcessorContextUtils {

     private ProcessorContextUtils() {}

-    /**
-     * Should be removed as part of KAFKA-10217
-     */
-    public static StreamsMetricsImpl metricsImpl(final ProcessorContext context) {
-        return (StreamsMetricsImpl) context.metrics();
-    }
-
     /**
      * Should be removed as part of KAFKA-10217
      */
@@ -71,9 +64,10 @@ public static String topicNamePrefix(final Map<String, Object> configs, final St
         }
     }

-    public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
+    @SuppressWarnings("unchecked")
+    public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(final ProcessorContext context) {
         if (context instanceof InternalProcessorContext) {
-            return (InternalProcessorContext) context;
+            return (InternalProcessorContext<K, V>) context;
         } else {
             throw new IllegalArgumentException(
                 "This component requires internal features of Kafka Streams and must be disabled for unit tests."
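With the added type parameters, asInternalProcessorContext can hand back a typed InternalProcessorContext, so call sites no longer need raw types; the unchecked cast stays confined to the helper under its @SuppressWarnings annotation. A hypothetical call site (names assumed, not taken from the commit):

import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;

final class Example {

    // K and V are inferred from the assignment target.
    static long currentRecordTimestamp(final ProcessorContext context) {
        final InternalProcessorContext<Object, Object> internal =
            ProcessorContextUtils.asInternalProcessorContext(context);
        return internal.recordContext().timestamp();
    }
}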
@@ -209,13 +209,14 @@ public void process(final Record<KIn, VIn> record) {
             // (instead of `RuntimeException`) to work well with those languages
             final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
                 null, // only required to pass for DeserializationExceptionHandler
-                internalProcessorContext.topic(),
-                internalProcessorContext.partition(),
-                internalProcessorContext.offset(),
-                internalProcessorContext.headers(),
+                internalProcessorContext.recordContext().topic(),
+                internalProcessorContext.recordContext().partition(),
+                internalProcessorContext.recordContext().offset(),
+                internalProcessorContext.recordContext().headers(),
                 internalProcessorContext.currentNode().name(),
                 internalProcessorContext.taskId(),
-                internalProcessorContext.timestamp());
+                internalProcessorContext.recordContext().timestamp()
+            );

             final ProcessingExceptionHandler.ProcessingHandlerResponse response;
             try {
@@ -85,9 +85,9 @@ public void process(final Record<KIn, VIn> record) {
         final ProcessorRecordContext contextForExtraction =
             new ProcessorRecordContext(
                 timestamp,
-                context.offset(),
-                context.partition(),
-                context.topic(),
+                context.recordContext().offset(),
+                context.recordContext().partition(),
+                context.recordContext().topic(),
                 record.headers()
             );

@@ -861,8 +861,8 @@ private void doProcess(final long wallClockTime) {
         final Record<Object, Object> toProcess = new Record<>(
             record.key(),
             record.value(),
-            processorContext.timestamp(),
-            processorContext.headers()
+            processorContext.recordContext().timestamp(),
+            processorContext.recordContext().headers()
         );
         maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);

@@ -55,7 +55,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
     protected final Optional<KeySchema> indexKeySchema;
     private final long retentionPeriod;

-    protected InternalProcessorContext internalProcessorContext;
+    protected InternalProcessorContext<?, ?> internalProcessorContext;
     private Sensor expiredRecordSensor;
     protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
     protected boolean consistencyEnabled = false;
@@ -272,12 +272,14 @@ private void putInternal(final Bytes key,
             key,
             new LRUCacheEntry(
                 value,
-                internalContext.headers(),
+                internalContext.recordContext().headers(),
                 true,
-                internalContext.offset(),
-                internalContext.timestamp(),
-                internalContext.partition(),
-                internalContext.topic()));
+                internalContext.recordContext().offset(),
+                internalContext.recordContext().timestamp(),
+                internalContext.recordContext().partition(),
+                internalContext.recordContext().topic()
+            )
+        );

         StoreQueryUtils.updatePosition(position, internalContext);
     }
@@ -135,12 +135,13 @@ public void put(final Windowed<Bytes> key, final byte[] value) {
         final LRUCacheEntry entry =
             new LRUCacheEntry(
                 value,
-                internalContext.headers(),
+                internalContext.recordContext().headers(),
                 true,
-                internalContext.offset(),
-                internalContext.timestamp(),
-                internalContext.partition(),
-                internalContext.topic());
+                internalContext.recordContext().offset(),
+                internalContext.recordContext().timestamp(),
+                internalContext.recordContext().partition(),
+                internalContext.recordContext().topic()
+            );
         internalContext.cache().put(cacheName, cacheFunction.cacheKey(binaryKey), entry);

         maxObservedTimestamp = Math.max(keySchema.segmentTimestamp(binaryKey), maxObservedTimestamp);
@@ -153,12 +153,13 @@ public synchronized void put(final Bytes key,
         final LRUCacheEntry entry =
             new LRUCacheEntry(
                 value,
-                internalContext.headers(),
+                internalContext.recordContext().headers(),
                 true,
-                internalContext.offset(),
-                internalContext.timestamp(),
-                internalContext.partition(),
-                internalContext.topic());
+                internalContext.recordContext().offset(),
+                internalContext.recordContext().timestamp(),
+                internalContext.recordContext().partition(),
+                internalContext.recordContext().topic()
+            );
         internalContext.cache().put(cacheName, cacheFunction.cacheKey(keyBytes), entry);

         maxObservedTimestamp.set(Math.max(keySchema.segmentTimestamp(keyBytes), maxObservedTimestamp.get()));
@@ -52,7 +52,7 @@ private void maybeSetEvictionListener() {
         if (wrapped() instanceof MemoryLRUCache) {
             ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
                 // pass null to indicate removal
-                log(key, null, internalContext.timestamp());
+                log(key, null, internalContext.recordContext().timestamp());
             });
         }
     }
@@ -66,7 +66,7 @@ public long approximateNumEntries() {
     public void put(final Bytes key,
                     final byte[] value) {
         wrapped().put(key, value);
-        log(key, value, internalContext.timestamp());
+        log(key, value, internalContext.recordContext().timestamp());
     }

     @Override
@@ -75,7 +75,7 @@ public byte[] putIfAbsent(final Bytes key,
         final byte[] previous = wrapped().putIfAbsent(key, value);
         if (previous == null) {
             // then it was absent
-            log(key, value, internalContext.timestamp());
+            log(key, value, internalContext.recordContext().timestamp());
         }
         return previous;
     }
@@ -84,7 +84,7 @@ public byte[] putIfAbsent(final Bytes key,
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         wrapped().putAll(entries);
         for (final KeyValue<Bytes, byte[]> entry : entries) {
-            log(entry.key, entry.value, internalContext.timestamp());
+            log(entry.key, entry.value, internalContext.recordContext().timestamp());
         }
     }

@@ -97,7 +97,7 @@ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(
     @Override
     public byte[] delete(final Bytes key) {
         final byte[] oldValue = wrapped().delete(key);
-        log(key, null, internalContext.timestamp());
+        log(key, null, internalContext.recordContext().timestamp());
         return oldValue;
     }

@@ -32,9 +32,9 @@ public void put(final Bytes key, final byte[] value) {
         // we need to log the full new list and thus call get() on the inner store below
         // if the value is a tombstone, we delete the whole list and thus can save the get call
         if (value == null) {
-            log(key, null, internalContext.timestamp());
+            log(key, null, internalContext.recordContext().timestamp());
         } else {
-            log(key, wrapped().get(key), internalContext.timestamp());
+            log(key, wrapped().get(key), internalContext.recordContext().timestamp());
         }
     }

@@ -73,13 +73,13 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Byte
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
         wrapped().remove(sessionKey);
-        internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, internalContext.timestamp(), wrapped().getPosition());
+        internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, internalContext.recordContext().timestamp(), wrapped().getPosition());
     }

     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
         wrapped().put(sessionKey, aggregate);
-        internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.timestamp(), wrapped().getPosition());
+        internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.recordContext().timestamp(), wrapped().getPosition());
     }

     @Override
@@ -35,7 +35,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey
     public void put(final Bytes key,
                     final byte[] valueAndTimestamp) {
         wrapped().put(key, valueAndTimestamp);
-        log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp));
+        log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
     }

     @Override
@@ -44,7 +44,7 @@ public byte[] putIfAbsent(final Bytes key,
         final byte[] previous = wrapped().putIfAbsent(key, valueAndTimestamp);
         if (previous == null) {
             // then it was absent
-            log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp));
+            log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
         }
         return previous;
     }
@@ -54,7 +54,7 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         wrapped().putAll(entries);
         for (final KeyValue<Bytes, byte[]> entry : entries) {
             final byte[] valueAndTimestamp = entry.value;
-            log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp));
+            log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
         }
     }
 }
@@ -36,7 +36,7 @@ void log(final Bytes key,
         name(),
         key,
         rawValue(valueAndTimestamp),
-        valueAndTimestamp != null ? timestamp(valueAndTimestamp) : internalContext.timestamp(),
+        valueAndTimestamp != null ? timestamp(valueAndTimestamp) : internalContext.recordContext().timestamp(),
         wrapped().getPosition()
     );
 }
@@ -129,7 +129,7 @@ public void put(final Bytes key,
     }

     void log(final Bytes key, final byte[] value) {
-        internalContext.logChange(name(), key, value, internalContext.timestamp(), wrapped().getPosition());
+        internalContext.logChange(name(), key, value, internalContext.recordContext().timestamp(), wrapped().getPosition());
     }

     private int maybeUpdateSeqnumForDups() {
@@ -430,7 +430,7 @@ protected void maybeRecordE2ELatency() {
         // In that case, we _can't_ get the current timestamp, so we don't record anything.
         if (e2eLatencySensor.shouldRecord() && internalContext != null) {
             final long currentTime = time.milliseconds();
-            final long e2eLatency = currentTime - internalContext.timestamp();
+            final long e2eLatency = currentTime - internalContext.recordContext().timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }
@@ -497,7 +497,7 @@ private void maybeRecordE2ELatency() {
         // In that case, we _can't_ get the current timestamp, so we don't record anything.
         if (e2eLatencySensor.shouldRecord() && internalContext != null) {
             final long currentTime = time.milliseconds();
-            final long e2eLatency = currentTime - internalContext.timestamp();
+            final long e2eLatency = currentTime - internalContext.recordContext().timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }
@@ -513,7 +513,7 @@ private void maybeRecordE2ELatency() {
         // In that case, we _can't_ get the current timestamp, so we don't record anything.
         if (e2eLatencySensor.shouldRecord() && internalContext != null) {
             final long currentTime = time.milliseconds();
-            final long e2eLatency = currentTime - internalContext.timestamp();
+            final long e2eLatency = currentTime - internalContext.recordContext().timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }
@@ -97,12 +97,13 @@ private void enforceWrappedStore(final WindowStore<Bytes, byte[]> underlying) {
         hasIndex = timeOrderedWindowStore.hasIndex();
     }

+    @SuppressWarnings("unchecked")
     private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore wrapped) {
         if (wrapped instanceof RocksDBTimeOrderedWindowStore) {
             return (RocksDBTimeOrderedWindowStore) wrapped;
         }
         if (wrapped instanceof WrappedStateStore) {
-            return getWrappedStore(((WrappedStateStore<?, ?, ?>) wrapped).wrapped());
+            return getWrappedStore(((WrappedStateStore<?, Bytes, byte[]>) wrapped).wrapped());
         }
         return null;
     }
@@ -255,12 +256,13 @@ public synchronized void put(final Bytes key,
         final LRUCacheEntry entry =
             new LRUCacheEntry(
                 value,
-                internalContext.headers(),
+                internalContext.recordContext().headers(),
                 true,
-                internalContext.offset(),
-                internalContext.timestamp(),
-                internalContext.partition(),
-                internalContext.topic());
+                internalContext.recordContext().offset(),
+                internalContext.recordContext().timestamp(),
+                internalContext.recordContext().partition(),
+                internalContext.recordContext().topic()
+            );

         // Put to index first so that base can be evicted later
         if (hasIndex) {
@@ -274,10 +276,11 @@ public synchronized void put(final Bytes key,
                 new byte[0],
                 new RecordHeaders(),
                 true,
-                internalContext.offset(),
-                internalContext.timestamp(),
-                internalContext.partition(),
-                "");
+                internalContext.recordContext().offset(),
+                internalContext.recordContext().timestamp(),
+                internalContext.recordContext().partition(),
+                ""
+            );
             final Bytes indexKey = KeyFirstWindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0);
             internalContext.cache().put(cacheName, indexKeyCacheFunction.cacheKey(indexKey), emptyEntry);
         } else {