Capture RDKafka Consumer Statistics as an internal built-in log (MaterializeInc#6131)

Kafka sources accept a statistics_interval_ms option that controls how
often librdkafka / rdkafka calls back into Materialize code with Kafka
client statistics. Previously, we simply printed these metrics to the
log file. This commit stashes a selected subset of them in a system
table so that users can query the metrics directly.

Fixes #5666
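
The statistics librdkafka hands back are a JSON document (see librdkafka's STATISTICS.md, also linked from the new doc comments below). As a rough, self-contained sketch of the per-partition fields this commit stores (not the code in this commit), they could be pulled out of that JSON with serde_json along these lines:

```rust
// Hypothetical sketch, not Materialize's implementation: extract the per-partition
// fields recorded by this commit from the JSON statistics blob that librdkafka
// emits every `statistics.interval.ms` (field names per librdkafka's STATISTICS.md).
use serde_json::Value;

#[derive(Debug)]
struct PartitionStats {
    partition_id: String,
    rxmsgs: i64,
    rxbytes: i64,
    txmsgs: i64,
    txbytes: i64,
    lo_offset: i64,
    hi_offset: i64,
    ls_offset: i64,
    app_offset: i64,
    consumer_lag: i64,
}

fn parse_partition_stats(json: &str) -> Option<Vec<PartitionStats>> {
    let stats: Value = serde_json::from_str(json).ok()?;
    let mut out = Vec::new();
    // Per-partition objects live under `topics.<topic>.partitions.<pid>`.
    for (_name, topic) in stats.get("topics")?.as_object()? {
        for (pid, p) in topic.get("partitions")?.as_object()? {
            let get = |key: &str| p.get(key).and_then(Value::as_i64).unwrap_or(0);
            out.push(PartitionStats {
                partition_id: pid.clone(),
                rxmsgs: get("rxmsgs"),
                rxbytes: get("rxbytes"),
                txmsgs: get("txmsgs"),
                txbytes: get("txbytes"),
                lo_offset: get("lo_offset"),
                hi_offset: get("hi_offset"),
                ls_offset: get("ls_offset"),
                app_offset: get("app_offset"),
                consumer_lag: get("consumer_lag"),
            });
        }
    }
    Some(out)
}
```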
cirego authored and philip-stoev committed Apr 6, 2021
1 parent 836348d commit 4f956bb
Showing 11 changed files with 379 additions and 33 deletions.
9 changes: 9 additions & 0 deletions src/coord/src/catalog/builtin.rs
@@ -554,6 +554,14 @@ pub const MZ_MESSAGE_COUNTS: BuiltinLog = BuiltinLog {
index_id: GlobalId::System(3029),
};

pub const MZ_KAFKA_CONSUMER_STATISTICS: BuiltinLog = BuiltinLog {
name: "mz_kafka_consumer_statistics",
schema: MZ_CATALOG_SCHEMA,
variant: LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo),
id: GlobalId::System(3030),
index_id: GlobalId::System(3031),
};

lazy_static! {
pub static ref MZ_VIEW_KEYS: BuiltinTable = BuiltinTable {
name: "mz_view_keys",
@@ -1271,6 +1279,7 @@ lazy_static! {
Builtin::Log(&MZ_PEEK_DURATIONS),
Builtin::Log(&MZ_SOURCE_INFO),
Builtin::Log(&MZ_MESSAGE_COUNTS),
Builtin::Log(&MZ_KAFKA_CONSUMER_STATISTICS),
Builtin::Table(&MZ_VIEW_KEYS),
Builtin::Table(&MZ_VIEW_FOREIGN_KEYS),
Builtin::Table(&MZ_KAFKA_SINKS),
21 changes: 21 additions & 0 deletions src/dataflow-types/src/logging.rs
@@ -52,6 +52,7 @@ pub enum MaterializedLog {
DataflowCurrent,
DataflowDependency,
FrontierCurrent,
KafkaConsumerInfo,
PeekCurrent,
PeekDuration,
SourceInfo,
@@ -162,6 +163,22 @@ impl LogVariant {
.with_column("worker", ScalarType::Int64.nullable(false))
.with_column("time", ScalarType::Int64.nullable(false)),

LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => RelationDesc::empty()
.with_column("consumer_name", ScalarType::String.nullable(false))
.with_column("source_id", ScalarType::String.nullable(false))
.with_column("dataflow_id", ScalarType::Int64.nullable(false))
.with_column("partition_id", ScalarType::String.nullable(false))
.with_column("rx_msgs", ScalarType::Int64.nullable(false))
.with_column("rx_bytes", ScalarType::Int64.nullable(false))
.with_column("tx_msgs", ScalarType::Int64.nullable(false))
.with_column("tx_bytes", ScalarType::Int64.nullable(false))
.with_column("lo_offset", ScalarType::Int64.nullable(false))
.with_column("hi_offset", ScalarType::Int64.nullable(false))
.with_column("ls_offset", ScalarType::Int64.nullable(false))
.with_column("app_offset", ScalarType::Int64.nullable(false))
.with_column("consumer_lag", ScalarType::Int64.nullable(false))
.with_key(vec![0, 1, 2]),

LogVariant::Materialized(MaterializedLog::PeekCurrent) => RelationDesc::empty()
.with_column("uuid", ScalarType::String.nullable(false))
.with_column("worker", ScalarType::Int64.nullable(false))
@@ -219,6 +236,10 @@ impl LogVariant {
LogVariant::Materialized(MaterializedLog::DataflowCurrent) => vec![],
LogVariant::Materialized(MaterializedLog::DataflowDependency) => vec![],
LogVariant::Materialized(MaterializedLog::FrontierCurrent) => vec![],
LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => vec![(
LogVariant::Materialized(MaterializedLog::SourceInfo),
vec![(1, 1), (2, 2), (3, 3)],
)],
LogVariant::Materialized(MaterializedLog::PeekCurrent) => vec![],
LogVariant::Materialized(MaterializedLog::SourceInfo) => vec![],
LogVariant::Materialized(MaterializedLog::PeekDuration) => vec![],
123 changes: 105 additions & 18 deletions src/dataflow/src/logging/materialized.rs
@@ -11,7 +11,8 @@

use std::time::Duration;

use differential_dataflow::{difference::DiffPair, operators::count::CountTotal};
use differential_dataflow::difference::{DiffPair, DiffVector};
use differential_dataflow::operators::count::CountTotal;
use log::error;
use timely::communication::Allocate;
use timely::dataflow::operators::capture::EventLink;
@@ -38,6 +39,34 @@ pub enum MaterializedEvent {
/// Globally unique identifier for the source on which the dataflow depends.
source: GlobalId,
},
/// Tracks statistics for a particular Kafka consumer / partition pair
/// Reference: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
KafkaConsumerInfo {
/// Kafka name for the consumer
consumer_name: String,
/// Materialize source identifier
source_id: SourceInstanceId,
/// The Kafka partition ID for these metrics (may be multiple per consumer)
partition_id: String,
/// Number of message sets received from Brokers
rxmsgs: i64,
/// Number of bytes received from Brokers
rxbytes: i64,
/// Number of message sets sent to Brokers
txmsgs: i64,
/// Number of bytes transmitted to Brokers
txbytes: i64,
/// Partition's low watermark offset on the broker
lo_offset: i64,
/// Partition's high watermark offset on the broker
hi_offset: i64,
/// Last stable offset on the broker
ls_offset: i64,
/// How far into the topic our consumer has read
app_offset: i64,
/// How many messages remain until our consumer reaches the (hi|lo) watermark
consumer_lag: i64,
},
/// Peek command, true for install and false for retire.
Peek(Peek, bool),
/// Tracks the source name, id, partition id, and received/ingested offsets
@@ -103,9 +132,10 @@ pub fn construct<A: Allocate>(
let mut input = demux.new_input(&logs, Pipeline);
let (mut dataflow_out, dataflow) = demux.new_output();
let (mut dependency_out, dependency) = demux.new_output();
let (mut frontier_out, frontier) = demux.new_output();
let (mut kafka_consumer_info_out, kafka_consumer_info) = demux.new_output();
let (mut peek_out, peek) = demux.new_output();
let (mut source_info_out, source_info) = demux.new_output();
let (mut frontier_out, frontier) = demux.new_output();

let mut demux_buffer = Vec::new();
demux.build(move |_capability| {
@@ -114,18 +144,20 @@
move |_frontiers| {
let mut dataflow = dataflow_out.activate();
let mut dependency = dependency_out.activate();
let mut frontier = frontier_out.activate();
let mut kafka_consumer_info = kafka_consumer_info_out.activate();
let mut peek = peek_out.activate();
let mut source_info = source_info_out.activate();
let mut frontier = frontier_out.activate();

input.for_each(|time, data| {
data.swap(&mut demux_buffer);

let mut dataflow_session = dataflow.session(&time);
let mut dependency_session = dependency.session(&time);
let mut frontier_session = frontier.session(&time);
let mut kafka_consumer_info_session = kafka_consumer_info.session(&time);
let mut peek_session = peek.session(&time);
let mut source_info_session = source_info.session(&time);
let mut frontier_session = frontier.session(&time);

for (time, worker, datum) in demux_buffer.drain(..) {
let time_ns = time.as_nanos() as Timestamp;
@@ -176,6 +208,47 @@
),
}
}
MaterializedEvent::Frontier(name, logical, delta) => {
frontier_session.give((
row_packer.pack(&[
Datum::String(&name.to_string()),
Datum::Int64(worker as i64),
Datum::Int64(logical as i64),
]),
time_ms,
delta as isize,
));
}
MaterializedEvent::KafkaConsumerInfo {
consumer_name,
source_id,
partition_id,
rxmsgs,
rxbytes,
txmsgs,
txbytes,
lo_offset,
hi_offset,
ls_offset,
app_offset,
consumer_lag,
} => {
kafka_consumer_info_session.give((
(consumer_name, source_id, partition_id),
time_ms,
DiffVector::new(vec![
rxmsgs,
rxbytes,
txmsgs,
txbytes,
lo_offset,
hi_offset,
ls_offset,
app_offset,
consumer_lag,
]),
));
}
MaterializedEvent::Peek(peek, is_install) => {
peek_session.give((peek, worker, is_install, time_ns))
}
@@ -192,17 +265,6 @@ pub fn construct<A: Allocate>(
DiffPair::new(offset, timestamp),
));
}
MaterializedEvent::Frontier(name, logical, delta) => {
frontier_session.give((
row_packer.pack(&[
Datum::String(&name.to_string()),
Datum::Int64(worker as i64),
Datum::Int64(logical as i64),
]),
time_ms,
delta as isize,
));
}
}
}
});
@@ -245,6 +307,30 @@ pub fn construct<A: Allocate>(
}
});

let frontier_current = frontier.as_collection();

use differential_dataflow::operators::Count;
let kafka_consumer_info_current = kafka_consumer_info.as_collection().count().map({
let mut row_packer = repr::RowPacker::new();
move |((consumer_name, source_id, partition_id), diff_vector)| {
row_packer.pack(&[
Datum::String(&consumer_name),
Datum::String(&source_id.source_id.to_string()),
Datum::Int64(source_id.dataflow_id as i64),
Datum::String(&partition_id),
Datum::Int64(diff_vector[0]),
Datum::Int64(diff_vector[1]),
Datum::Int64(diff_vector[2]),
Datum::Int64(diff_vector[3]),
Datum::Int64(diff_vector[4]),
Datum::Int64(diff_vector[5]),
Datum::Int64(diff_vector[6]),
Datum::Int64(diff_vector[7]),
Datum::Int64(diff_vector[8]),
])
}
});

let peek_current = peek
.map(move |(name, worker, is_install, time_ns)| {
let time_ms = (time_ns / 1_000_000) as Timestamp;
@@ -265,7 +351,6 @@
}
});

use differential_dataflow::operators::Count;
let source_info_current = source_info.as_collection().count().map({
let mut row_packer = repr::RowPacker::new();
move |((name, id, pid), pair)| {
@@ -282,8 +367,6 @@
}
});

let frontier_current = frontier.as_collection();

// Duration statistics derive from the non-rounded event times.
let peek_duration = peek
.unary(
@@ -361,6 +444,10 @@ pub fn construct<A: Allocate>(
LogVariant::Materialized(MaterializedLog::FrontierCurrent),
frontier_current,
),
(
LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo),
kafka_consumer_info_current,
),
(
LogVariant::Materialized(MaterializedLog::PeekCurrent),
peek_current,
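
A note on the accumulation above: `count()` sums the DiffVector entries per (consumer_name, source_id, partition_id) key, so the values surfaced in mz_kafka_consumer_statistics equal the most recent statistics only if the producing side logs the change since its previous callback rather than absolute values each time. The Kafka reader hunks are not shown in this view, so the following is only a hypothetical, std-only sketch of that delta pattern for two of the nine fields:

```rust
// Hypothetical sketch of the delta pattern the accumulation relies on; the actual
// Kafka reader code is not part of the hunks shown here. Only two of the nine
// fields are shown to keep the example short.
#[derive(Default)]
struct PreviousStats {
    rxmsgs: i64,
    rxbytes: i64,
}

/// Returns the increments to log since the previous statistics callback and
/// remembers the current absolute values for next time.
fn deltas_to_log(prev: &mut PreviousStats, rxmsgs: i64, rxbytes: i64) -> (i64, i64) {
    let delta = (rxmsgs - prev.rxmsgs, rxbytes - prev.rxbytes);
    prev.rxmsgs = rxmsgs;
    prev.rxbytes = rxbytes;
    delta
}

fn main() {
    let mut prev = PreviousStats::default();
    // First callback reports absolute totals of 10 messages / 1000 bytes...
    assert_eq!(deltas_to_log(&mut prev, 10, 1_000), (10, 1_000));
    // ...the next one reports 15 / 1800, so only the increase is logged, and the
    // per-key sums in the log therefore stay equal to the latest absolute values.
    assert_eq!(deltas_to_log(&mut prev, 15, 1_800), (5, 800));
}
```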
3 changes: 3 additions & 0 deletions src/dataflow/src/source/file.rs
@@ -26,6 +26,7 @@ use expr::{PartitionId, SourceInstanceId};
use mz_avro::types::Value;
use mz_avro::{AvroRead, Schema, Skip};

use crate::logging::materialized::Logger;
use crate::source::{NextMessage, SourceMessage, SourceReader};

/// Contains all information necessary to ingest data from file sources (either
@@ -63,6 +64,7 @@ impl SourceReader<Value> for FileSourceReader<Value> {
consumer_activator: SyncActivator,
connector: ExternalSourceConnector,
encoding: DataEncoding,
_: Option<Logger>,
) -> Result<(FileSourceReader<Value>, Option<PartitionId>), anyhow::Error> {
let receiver = match connector {
ExternalSourceConnector::AvroOcf(oc) => {
@@ -139,6 +141,7 @@ impl SourceReader<Vec<u8>> for FileSourceReader<Vec<u8>> {
consumer_activator: SyncActivator,
connector: ExternalSourceConnector,
_: DataEncoding,
_: Option<Logger>,
) -> Result<(FileSourceReader<Vec<u8>>, Option<PartitionId>), anyhow::Error> {
let receiver = match connector {
ExternalSourceConnector::File(fc) => {
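
The file-based readers accept the new `Option<Logger>` argument only to ignore it; the Kafka reader (its hunk is among the files not shown above) is presumably the implementation that actually uses it. Purely as a hypothetical sketch of what such a call site could look like, with placeholder values throughout:

```rust
// Hypothetical sketch only: the Kafka reader changes are not visible in this commit
// view, so the real call site may differ. All field values here are placeholders.
use expr::SourceInstanceId;

use crate::logging::materialized::{Logger, MaterializedEvent};

fn report_partition_stats(logger: &Option<Logger>, source_id: SourceInstanceId) {
    if let Some(logger) = logger {
        logger.log(MaterializedEvent::KafkaConsumerInfo {
            consumer_name: "rdkafka#consumer-1".to_string(),
            source_id,
            partition_id: "0".to_string(),
            rxmsgs: 0,
            rxbytes: 0,
            txmsgs: 0,
            txbytes: 0,
            lo_offset: 0,
            hi_offset: 0,
            ls_offset: 0,
            app_offset: 0,
            consumer_lag: 0,
        });
    }
}
```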