Skip to content

Commit

Permalink
[admin-tool][server] Add two admin tool commands for dumping consumer ingestion states and heartbeat states; Add logging for stale heartbeat replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
sixpluszero committed Oct 25, 2024
1 parent a698f3e commit 2d22694
Show file tree
Hide file tree
Showing 26 changed files with 971 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ public abstract long getLatestOffsetBasedOnMetrics(
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition);

public abstract Map<PubSubTopicPartition, TopicPartitionIngestionInfo> getIngestionInfoFromConsumer(
public abstract Map<String, TopicPartitionIngestionInfo> getIngestionInfoFromConsumer(
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition);

public abstract Map<String, ConsumerServiceIngestionInfo> getIngestionInfoFromConsumerServices();

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public class AggKafkaConsumerService extends AbstractVeniceService {
new VeniceJsonSerializer<>(new TypeReference<Map<String, Map<String, TopicPartitionIngestionInfo>>>() {
});

private final VeniceJsonSerializer<Map<String, Map<String, ConsumerServiceIngestionInfo>>> consumerIngestionContextJsonSerializer =
new VeniceJsonSerializer<>(new TypeReference<Map<String, Map<String, ConsumerServiceIngestionInfo>>>() {
});

public AggKafkaConsumerService(
final PubSubConsumerAdapterFactory consumerFactory,
final PubSubPropertiesSupplier pubSubPropertiesSupplier,
Expand Down Expand Up @@ -475,16 +479,26 @@ byte[] getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSub
Map<String, Map<String, TopicPartitionIngestionInfo>> topicPartitionIngestionContext = new HashMap<>();
for (String kafkaUrl: kafkaServerToConsumerServiceMap.keySet()) {
AbstractKafkaConsumerService consumerService = getKafkaConsumerService(kafkaUrl);
Map<PubSubTopicPartition, TopicPartitionIngestionInfo> topicPartitionIngestionInfoMap =
Map<String, TopicPartitionIngestionInfo> topicPartitionIngestionInfoMap =
consumerService.getIngestionInfoFromConsumer(versionTopic, pubSubTopicPartition);
for (Map.Entry<PubSubTopicPartition, TopicPartitionIngestionInfo> entry: topicPartitionIngestionInfoMap
.entrySet()) {
PubSubTopicPartition topicPartition = entry.getKey();
for (Map.Entry<String, TopicPartitionIngestionInfo> entry: topicPartitionIngestionInfoMap.entrySet()) {
TopicPartitionIngestionInfo topicPartitionIngestionInfo = entry.getValue();
topicPartitionIngestionContext.computeIfAbsent(kafkaUrl, k -> new HashMap<>())
.put(topicPartition.toString(), topicPartitionIngestionInfo);
.put(entry.getKey(), topicPartitionIngestionInfo);
}
}
return topicPartitionIngestionContextJsonSerializer.serialize(topicPartitionIngestionContext, "");
}

byte[] getIngestionInfoForConsumerServices() throws IOException {
  // Aggregate every broker's consumer-service ingestion dump, keyed by Kafka URL,
  // then serialize the whole structure to JSON bytes for the admin endpoint.
  Map<String, Map<String, ConsumerServiceIngestionInfo>> ingestionInfoByKafkaUrl = new VeniceConcurrentHashMap<>();
  for (String url: kafkaServerToConsumerServiceMap.keySet()) {
    AbstractKafkaConsumerService serviceForUrl = getKafkaConsumerService(url);
    ingestionInfoByKafkaUrl.put(url, serviceForUrl.getIngestionInfoFromConsumerServices());
  }
  return consumerIngestionContextJsonSerializer.serialize(ingestionInfoByKafkaUrl, "");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.linkedin.davinci.kafka.consumer;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import java.util.Objects;


/**
 * Per-consumer view of ingestion progress: maps a topic-partition name to its
 * {@link TopicPartitionIngestionInfo}. Instances are created directly by
 * {@code KafkaConsumerService} and deserialized by Jackson via the {@code @JsonCreator}
 * constructor when the admin tool dumps consumer ingestion state.
 */
public class ConsumerIngestionInfo {
  // Identifier of the shared consumer (the consumption task id string).
  private String consumerName;
  // Topic-partition name -> ingestion info for each partition this consumer serves.
  private Map<String, TopicPartitionIngestionInfo> consumerIngestionInfo;

  @JsonCreator
  public ConsumerIngestionInfo(
      @JsonProperty("consumerName") String consumerName,
      @JsonProperty("consumerIngestionInfo") Map<String, TopicPartitionIngestionInfo> consumerIngestionInfo) {
    this.consumerIngestionInfo = consumerIngestionInfo;
    this.consumerName = consumerName;
  }

  public String getConsumerName() {
    return consumerName;
  }

  public void setConsumerName(String consumerName) {
    this.consumerName = consumerName;
  }

  public Map<String, TopicPartitionIngestionInfo> getConsumerIngestionInfo() {
    return consumerIngestionInfo;
  }

  public void setConsumerIngestionInfo(Map<String, TopicPartitionIngestionInfo> consumerIngestionInfo) {
    this.consumerIngestionInfo = consumerIngestionInfo;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    // Fix: the local used to be named 'consumerIngestionInfo', shadowing the map field of the
    // same name; Objects.equals also makes the comparison null-safe (the constructor does not
    // validate its arguments, so Jackson may hand us null fields).
    ConsumerIngestionInfo other = (ConsumerIngestionInfo) o;
    return Objects.equals(this.consumerName, other.consumerName)
        && Objects.equals(this.consumerIngestionInfo, other.consumerIngestionInfo);
  }

  @Override
  public int hashCode() {
    // Null-safe and consistent with equals(). Hash values are never persisted, so replacing the
    // hand-rolled 31-based formula is backward-compatible.
    return Objects.hash(consumerName, consumerIngestionInfo);
  }

  @Override
  public String toString() {
    // Quasi-JSON rendering of the map only (consumerName is carried as the key in the enclosing
    // structure). Guard against a null map instead of throwing NPE; an empty or null map renders
    // as "{}", matching the previous empty-map output.
    if (consumerIngestionInfo == null || consumerIngestionInfo.isEmpty()) {
      return "{}";
    }
    StringBuilder sb = new StringBuilder();
    sb.append("{");
    for (Map.Entry<String, TopicPartitionIngestionInfo> entry: consumerIngestionInfo.entrySet()) {
      sb.append("\"").append(entry.getKey()).append("\"").append("=").append(entry.getValue().toString()).append(", ");
    }
    // Remove the trailing comma and space left by the loop.
    sb.setLength(sb.length() - 2);
    sb.append("}");
    return sb.toString();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.linkedin.davinci.kafka.consumer;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import java.util.Objects;


/**
 * Per-consumer-service view of ingestion progress: maps a consumer name (task id string) to its
 * {@link ConsumerIngestionInfo}. Instances are created by {@code KafkaConsumerService} and
 * deserialized by Jackson via the {@code @JsonCreator} constructor when the admin tool dumps
 * consumer ingestion state.
 */
public class ConsumerServiceIngestionInfo {
  // Service identifier; built as "<kafkaUrl>-<poolType>" by KafkaConsumerService.
  private String consumerServiceName;
  // Consumer name -> that consumer's ingestion info.
  private Map<String, ConsumerIngestionInfo> consumerServiceIngestionInfo;

  @JsonCreator
  public ConsumerServiceIngestionInfo(
      @JsonProperty("consumerServiceName") String consumerServiceName,
      @JsonProperty("consumerServiceIngestionInfo") Map<String, ConsumerIngestionInfo> consumerServiceIngestionInfo) {
    this.consumerServiceIngestionInfo = consumerServiceIngestionInfo;
    this.consumerServiceName = consumerServiceName;
  }

  public void setConsumerServiceName(String consumerServiceName) {
    this.consumerServiceName = consumerServiceName;
  }

  public String getConsumerServiceName() {
    return consumerServiceName;
  }

  public void setConsumerServiceIngestionInfo(Map<String, ConsumerIngestionInfo> consumerServiceIngestionInfo) {
    this.consumerServiceIngestionInfo = consumerServiceIngestionInfo;
  }

  public Map<String, ConsumerIngestionInfo> getConsumerServiceIngestionInfo() {
    return consumerServiceIngestionInfo;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    // Fix: the local used to be named 'consumerServiceIngestionInfo', shadowing the map field of
    // the same name; Objects.equals also makes the comparison null-safe (the constructor does not
    // validate its arguments, so Jackson may hand us null fields).
    ConsumerServiceIngestionInfo other = (ConsumerServiceIngestionInfo) o;
    return Objects.equals(this.consumerServiceName, other.consumerServiceName)
        && Objects.equals(this.consumerServiceIngestionInfo, other.consumerServiceIngestionInfo);
  }

  @Override
  public int hashCode() {
    // Null-safe and consistent with equals(). Hash values are never persisted, so replacing the
    // hand-rolled 31-based formula is backward-compatible.
    return Objects.hash(consumerServiceName, consumerServiceIngestionInfo);
  }

  @Override
  public String toString() {
    // Quasi-JSON rendering of the map only (the service name is carried as the key in the
    // enclosing structure). Guard against a null map instead of throwing NPE; an empty or null
    // map renders as "{}", matching the previous empty-map output.
    if (consumerServiceIngestionInfo == null || consumerServiceIngestionInfo.isEmpty()) {
      return "{}";
    }
    StringBuilder sb = new StringBuilder();
    sb.append("{");
    for (Map.Entry<String, ConsumerIngestionInfo> entry: consumerServiceIngestionInfo.entrySet()) {
      sb.append("\"").append(entry.getKey()).append("\"").append("=").append(entry.getValue().toString()).append(", ");
    }
    // Remove the trailing comma and space left by the loop.
    sb.setLength(sb.length() - 2);
    sb.append("}");
    return sb.toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -494,37 +495,60 @@ private long getSomeOffsetFor(
}
}

public Map<PubSubTopicPartition, TopicPartitionIngestionInfo> getIngestionInfoFromConsumer(
public Map<String, TopicPartitionIngestionInfo> getIngestionInfoFromConsumer(
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition) {
SharedKafkaConsumer consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition);
Map<PubSubTopicPartition, TopicPartitionIngestionInfo> topicPartitionIngestionInfoMap = new HashMap<>();
if (consumer != null) {
ConsumptionTask consumptionTask = consumerToConsumptionTask.get(consumer);
String consumerIdStr = consumptionTask.getTaskIdStr();
for (PubSubTopicPartition topicPartition: consumer.getAssignment()) {
long offsetLag = consumer.getOffsetLag(topicPartition);
long latestOffset = consumer.getLatestOffset(topicPartition);
double msgRate = consumptionTask.getMessageRate(topicPartition);
double byteRate = consumptionTask.getByteRate(topicPartition);
long lastSuccessfulPollTimestamp = consumptionTask.getLastSuccessfulPollTimestamp(topicPartition);
long elapsedTimeSinceLastPollInMs = ConsumptionTask.DEFAULT_TOPIC_PARTITION_NO_POLL_TIMESTAMP;
if (lastSuccessfulPollTimestamp != ConsumptionTask.DEFAULT_TOPIC_PARTITION_NO_POLL_TIMESTAMP) {
elapsedTimeSinceLastPollInMs =
LatencyUtils.getElapsedTimeFromMsToMs(consumptionTask.getLastSuccessfulPollTimestamp());
}
PubSubTopic destinationVersionTopic = consumptionTask.getDestinationIdentifier(topicPartition);
String destinationVersionTopicName = destinationVersionTopic == null ? "" : destinationVersionTopic.getName();
TopicPartitionIngestionInfo topicPartitionIngestionInfo = new TopicPartitionIngestionInfo(
latestOffset,
offsetLag,
msgRate,
byteRate,
consumerIdStr,
elapsedTimeSinceLastPollInMs,
destinationVersionTopicName);
topicPartitionIngestionInfoMap.put(topicPartition, topicPartitionIngestionInfo);
return getIngestionInfoFromConsumer(consumer);
}

public Map<String, ConsumerServiceIngestionInfo> getIngestionInfoFromConsumerServices() {
  // Collect ingestion info for every consumer owned by this service, keyed by its task id.
  Map<String, ConsumerIngestionInfo> perConsumerInfo = new VeniceConcurrentHashMap<>();
  consumerToConsumptionTask.forEach((sharedConsumer, consumptionTask) -> {
    Map<String, TopicPartitionIngestionInfo> partitionInfo = getIngestionInfoFromConsumer(sharedConsumer);
    if (!partitionInfo.isEmpty()) {
      String taskId = consumptionTask.getTaskIdStr();
      perConsumerInfo.put(taskId, new ConsumerIngestionInfo(taskId, partitionInfo));
    }
  });
  // A service with no actively-assigned consumers contributes nothing to the aggregate dump.
  if (perConsumerInfo.isEmpty()) {
    return Collections.emptyMap();
  }
  // The service-level key combines broker URL and pool type.
  String serviceName = this.kafkaUrl + "-" + this.poolType;
  return Collections.singletonMap(serviceName, new ConsumerServiceIngestionInfo(serviceName, perConsumerInfo));
}

Map<String, TopicPartitionIngestionInfo> getIngestionInfoFromConsumer(SharedKafkaConsumer consumer) {
  // Snapshot ingestion state for every topic-partition currently assigned to this consumer,
  // keyed by the topic-partition's string form.
  if (consumer == null) {
    return Collections.emptyMap();
  }
  ConsumptionTask consumptionTask = consumerToConsumptionTask.get(consumer);
  if (consumptionTask == null) {
    // Defensive: the consumer is not (or is no longer) tracked by this service; previously this
    // would have thrown NPE on the getTaskIdStr() call below.
    return Collections.emptyMap();
  }
  Map<String, TopicPartitionIngestionInfo> topicPartitionIngestionInfoMap = new HashMap<>();
  String consumerIdStr = consumptionTask.getTaskIdStr();
  for (PubSubTopicPartition topicPartition: consumer.getAssignment()) {
    long offsetLag = consumer.getOffsetLag(topicPartition);
    long latestOffset = consumer.getLatestOffset(topicPartition);
    double msgRate = consumptionTask.getMessageRate(topicPartition);
    double byteRate = consumptionTask.getByteRate(topicPartition);
    long lastSuccessfulPollTimestamp = consumptionTask.getLastSuccessfulPollTimestamp(topicPartition);
    long elapsedTimeSinceLastPollInMs = ConsumptionTask.DEFAULT_TOPIC_PARTITION_NO_POLL_TIMESTAMP;
    if (lastSuccessfulPollTimestamp != ConsumptionTask.DEFAULT_TOPIC_PARTITION_NO_POLL_TIMESTAMP) {
      // Fix: derive elapsed time from the per-partition timestamp fetched above. The previous code
      // re-queried the no-arg getLastSuccessfulPollTimestamp(), mixing task-level and
      // partition-level granularity (the guard checks the partition-level value).
      elapsedTimeSinceLastPollInMs = LatencyUtils.getElapsedTimeFromMsToMs(lastSuccessfulPollTimestamp);
    }
    // getDestinationIdentifier can return null (see null check); report "" in that case.
    PubSubTopic destinationVersionTopic = consumptionTask.getDestinationIdentifier(topicPartition);
    String destinationVersionTopicName = destinationVersionTopic == null ? "" : destinationVersionTopic.getName();
    TopicPartitionIngestionInfo topicPartitionIngestionInfo = new TopicPartitionIngestionInfo(
        latestOffset,
        offsetLag,
        msgRate,
        byteRate,
        consumerIdStr,
        elapsedTimeSinceLastPollInMs,
        destinationVersionTopicName);
    topicPartitionIngestionInfoMap.put(topicPartition.toString(), topicPartitionIngestionInfo);
  }
  return topicPartitionIngestionInfoMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public long getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicP
}

@Override
public Map<PubSubTopicPartition, TopicPartitionIngestionInfo> getIngestionInfoFromConsumer(
public Map<String, TopicPartitionIngestionInfo> getIngestionInfoFromConsumer(
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition) {
KafkaConsumerService kafkaConsumerService = getKafkaConsumerService(versionTopic, pubSubTopicPartition);
Expand All @@ -235,6 +235,17 @@ public Map<PubSubTopicPartition, TopicPartitionIngestionInfo> getIngestionInfoFr
}
}

@Override
public Map<String, ConsumerServiceIngestionInfo> getIngestionInfoFromConsumerServices() {
  // Merge the per-service ingestion dumps of every delegated consumer service into one map
  // keyed by service name.
  Map<String, ConsumerServiceIngestionInfo> mergedResult = new VeniceConcurrentHashMap<>();
  consumerServices.forEach(service -> mergedResult.putAll(service.getIngestionInfoFromConsumerServices()));
  return mergedResult;
}

@Override
public boolean startInner() throws Exception {
consumerServices.forEach(KafkaConsumerService::start);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,6 @@ private Properties getKafkaConsumerProperties(VeniceStoreVersionConfig storeConf
return kafkaConsumerProperties;
}

@Override
public ByteBuffer getStoreVersionCompressionDictionary(String topicName) {
return storageMetadataService.getStoreVersionCompressionDictionary(topicName);
}
Expand All @@ -1157,7 +1156,6 @@ public StoreIngestionTask getStoreIngestionTask(String topicName) {
return topicNameToIngestionTaskMap.get(topicName);
}

@Override
public AdminResponse getConsumptionSnapshots(String topicName, ComplementSet<Integer> partitions) {
AdminResponse response = new AdminResponse();
StoreIngestionTask ingestionTask = getStoreIngestionTask(topicName);
Expand Down Expand Up @@ -1195,6 +1193,19 @@ public TopicPartitionIngestionContextResponse getTopicPartitionIngestionContext(
return topicPartitionIngestionContextResponse;
}

public TopicPartitionIngestionContextResponse getIngestionContext() {
  // Dump the ingestion context across all consumer services; failures are reported in-band on
  // the response (error flag + message) rather than propagated to the caller.
  TopicPartitionIngestionContextResponse ingestionContextResponse = new TopicPartitionIngestionContextResponse();
  try {
    byte[] serializedContext = aggKafkaConsumerService.getIngestionInfoForConsumerServices();
    ingestionContextResponse.setTopicPartitionIngestionContext(serializedContext);
  } catch (Exception e) {
    ingestionContextResponse.setError(true);
    ingestionContextResponse.setMessage(e.getMessage());
    LOGGER.error("Error on get topic partition ingestion context for all consumers", e);
  }
  return ingestionContextResponse;
}

/**
* This method updates all sub-partitions' latest offset records fetched from isolated ingestion process
* in main process, so main process's in-memory storage metadata service could be aware of the latest updates and will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.storage.IngestionMetadataRetriever;
import java.util.Set;
import java.util.concurrent.CompletableFuture;


/**
* An interface for Store Ingestion Service for Venice.
*/
public interface StoreIngestionService extends IngestionMetadataRetriever {
public interface StoreIngestionService {
/**
* Starts consuming messages from Kafka Partition corresponding to Venice Partition.
* @param veniceStore Venice Store for the partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,12 @@ public int hashCode() {
result = 31 * result + versionTopicName.hashCode();
return result;
}

@Override
public String toString() {
  // Quasi-JSON, single-line rendering of all ingestion metrics; output is byte-identical to the
  // previous concatenation-based implementation (consumerIdStr single-quoted, topic name
  // double-quoted, numeric fields unquoted).
  StringBuilder sb = new StringBuilder("{");
  sb.append("\"latestOffset\"=").append(latestOffset);
  sb.append(", \"offsetLag\"=").append(offsetLag);
  sb.append(", \"msgRate\"=").append(msgRate);
  sb.append(", \"byteRate\"=").append(byteRate);
  sb.append(", \"consumerIdStr\"='").append(consumerIdStr).append('\'');
  sb.append(", \"elapsedTimeSinceLastPollInMs\"=").append(elapsedTimeSinceLastPollInMs);
  sb.append(", \"versionTopicName\"=\"").append(versionTopicName).append("\"");
  return sb.append('}').toString();
}
}
Loading

0 comments on commit 2d22694

Please sign in to comment.