[KafkaIO] Share size and offset estimators among ReadFromKafkaDoFn instances per PTransform #32928

Open · wants to merge 8 commits into base: master
First changed file (a test's MockConsumer setup; file path not preserved in this extract):
@@ -138,10 +138,6 @@ public synchronized void assign(final Collection<TopicPartition> assigned) {
             .collect(Collectors.toList());
     super.assign(realPartitions);
     assignedPartitions.set(ImmutableList.copyOf(realPartitions));
-    for (TopicPartition tp : realPartitions) {
-      updateBeginningOffsets(ImmutableMap.of(tp, 0L));
-      updateEndOffsets(ImmutableMap.of(tp, (long) kafkaRecords.get(tp).size()));
-    }
   }
   // Override offsetsForTimes() in order to look up the offsets by timestamp.
   @Override
@@ -163,8 +159,12 @@ public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
         }
       };

-    for (String topic : getTopics()) {
-      consumer.updatePartitions(topic, partitionInfoMap.get(topic));
+    for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry :
+        kafkaRecords.entrySet()) {
+      consumer.updatePartitions(
+          entry.getKey().topic(), partitionInfoMap.get(entry.getKey().topic()));
+      consumer.updateBeginningOffsets(ImmutableMap.of(entry.getKey(), 0L));
+      consumer.updateEndOffsets(ImmutableMap.of(entry.getKey(), (long) entry.getValue().size()));
     }

     Runnable recordEnqueueTask =
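Editorial context for the test change above, not part of the diff: Kafka's MockConsumer answers beginningOffsets()/endOffsets() from maps the test registers up front, so the offset range can be pinned once per partition at setup time instead of inside an overridden assign(). A minimal sketch of that pattern; mockConsumerFor and recordsByPartition are illustrative names standing in for the test's helper and its kafkaRecords map:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

static MockConsumer<byte[], byte[]> mockConsumerFor(
    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition) {
  MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
  for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry :
      recordsByPartition.entrySet()) {
    TopicPartition tp = entry.getKey();
    // Pin each partition's readable range once, up front; the mock serves
    // beginningOffsets()/endOffsets() (and seekToBeginning/seekToEnd) from these maps.
    consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
    consumer.updateEndOffsets(Collections.singletonMap(tp, (long) entry.getValue().size()));
  }
  return consumer;
}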
Second changed file (KafkaUnboundedReader; file path not preserved in this extract):
@@ -23,7 +23,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -228,10 +227,6 @@ public boolean advance() throws IOException {
              METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + pState.topicPartition.toString());
      rawSizes.update(recordSize);

-      for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
-        backlogBytesOfSplit.set(backlogSplit.getValue());
-      }
-
      // Pass metrics to container.
      kafkaResults.updateKafkaMetrics();
      return true;
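A note on why the removed loop was dead weight: a Beam Gauge is last-value-wins, so writing every entry of perPartitionBacklogMetrics into the single backlogBytesOfSplit gauge on each advance() left only the final entry visible anyway. A tiny sketch of that semantics (the namespace and name strings are illustrative only):

import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;

// A Gauge reports only its most recent value: after this loop just 30 is
// visible, so fanning a per-partition map into one split-level gauge adds
// no information over a single set() call.
Gauge backlog = Metrics.gauge("IllustrativeNamespace", "backlogBytesOfSplit");
for (long bytes : new long[] {10L, 20L, 30L}) {
  backlog.set(bytes);
}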
@@ -350,7 +345,6 @@ public long getSplitBacklogBytes() {
   private final Counter bytesReadBySplit;
   private final Gauge backlogBytesOfSplit;
   private final Gauge backlogElementsOfSplit;
-  private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();;
   private final Counter checkpointMarkCommitsEnqueued =
       Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC);
   // Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed).
@@ -507,10 +501,6 @@ Instant updateAndGetWatermark() {
      lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
      return lastWatermark;
    }
-
-    String name() {
-      return this.topicPartition.toString();
-    }
  }

  KafkaUnboundedReader(
@@ -555,16 +545,14 @@ String name() {
       prevWatermark = Optional.of(new Instant(ckptMark.getWatermarkMillis()));
     }

-    PartitionState<K, V> state =
-        new PartitionState<K, V>(
+    states.add(
+        new PartitionState<>(
             tp,
             nextOffset,
             source
                 .getSpec()
                 .getTimestampPolicyFactory()
-                .createTimestampPolicy(tp, prevWatermark));
-    states.add(state);
-    perPartitionBacklogMetrics.put(state.name(), 0L);
+                .createTimestampPolicy(tp, prevWatermark)));
   }

   partitionStates = ImmutableList.copyOf(states);
@@ -681,6 +669,8 @@ private void nextBatch() throws IOException {

     partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator());

+    reportBacklog();
+
     // cycle through the partitions in order to interleave records from each.
     curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
   }
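The reportBacklog() call added here is an existing helper in this class; it replaces the per-record gauge writes removed from advance() above with one update per fetched batch. Based on the gauges and getters visible in this diff, its shape is plausibly along these lines (a sketch, not the verbatim method):

// Sketch of a reportBacklog()-style helper, assuming the backlogBytesOfSplit /
// backlogElementsOfSplit gauges and the two split-level getters shown in this diff.
private void reportBacklog() {
  long splitBacklogBytes = getSplitBacklogBytes();
  if (splitBacklogBytes < 0) {
    splitBacklogBytes = UnboundedReader.BACKLOG_UNKNOWN;
  }
  backlogBytesOfSplit.set(splitBacklogBytes);

  long splitBacklogMessages = getSplitBacklogMessageCount();
  if (splitBacklogMessages < 0) {
    splitBacklogMessages = UnboundedReader.BACKLOG_UNKNOWN;
  }
  backlogElementsOfSplit.set(splitBacklogMessages);
}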
@@ -754,7 +744,6 @@ private long getSplitBacklogMessageCount() {
       if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
         return UnboundedReader.BACKLOG_UNKNOWN;
       }
-      perPartitionBacklogMetrics.put(p.name(), pBacklog);
       backlogCount += pBacklog;
}

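With the map write gone, getSplitBacklogMessageCount() reduces to a plain sum that short-circuits as soon as any partition's backlog is unknown. The resulting shape, sketched (the per-partition accessor name is assumed for illustration):

private long getSplitBacklogMessageCount() {
  long backlogCount = 0;
  for (PartitionState<K, V> p : partitionStates) {
    long pBacklog = p.backlogMessageCount(); // accessor name assumed in this sketch
    if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
      // One unknown partition makes the whole split's backlog unknown.
      return UnboundedReader.BACKLOG_UNKNOWN;
    }
    backlogCount += pBacklog;
  }
  return backlogCount;
}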