Capture RDKafka Statistics as an internal built-in log #6131

Merged Mar 23, 2021 · 17 commits
Changes from 14 commits
9 changes: 9 additions & 0 deletions src/coord/src/catalog/builtin.rs
@@ -554,6 +554,14 @@ pub const MZ_MESSAGE_COUNTS: BuiltinLog = BuiltinLog {
index_id: GlobalId::System(3029),
};

pub const MZ_KAFKA_CONSUMER_STATISTICS: BuiltinLog = BuiltinLog {
name: "mz_kafka_consumer_statistics",
schema: MZ_CATALOG_SCHEMA,
variant: LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo),
id: GlobalId::System(3030),
index_id: GlobalId::System(3031),
};

lazy_static! {
pub static ref MZ_VIEW_KEYS: BuiltinTable = BuiltinTable {
name: "mz_view_keys",
@@ -1271,6 +1279,7 @@ lazy_static! {
Builtin::Log(&MZ_PEEK_DURATIONS),
Builtin::Log(&MZ_SOURCE_INFO),
Builtin::Log(&MZ_MESSAGE_COUNTS),
Builtin::Log(&MZ_KAFKA_CONSUMER_STATISTICS),
Builtin::Table(&MZ_VIEW_KEYS),
Builtin::Table(&MZ_VIEW_FOREIGN_KEYS),
Builtin::Table(&MZ_KAFKA_SINKS),
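Once this catalog entry lands, the new log should be queryable over pgwire like any other built-in log. The snippet below is only a sketch, not code from this PR: the connection parameters (local materialized on the default port 6875, user materialize) and the use of the postgres crate are assumptions, while the column names come from the RelationDesc added in src/dataflow-types/src/logging.rs below.

use postgres::{Client, NoTls};

fn main() -> Result<(), postgres::Error> {
    // Assumed connection parameters for a local materialized instance.
    let mut client = Client::connect("host=localhost port=6875 user=materialize", NoTls)?;

    // Column names follow the RelationDesc for MZ_KAFKA_CONSUMER_STATISTICS.
    for row in client.query(
        "SELECT consumer_name, source_id, partition_id, rx_msgs, rx_bytes, consumer_lag \
         FROM mz_catalog.mz_kafka_consumer_statistics",
        &[],
    )? {
        let consumer: String = row.get("consumer_name");
        let partition: String = row.get("partition_id");
        let lag: i64 = row.get("consumer_lag");
        println!("{} partition {}: lag {}", consumer, partition, lag);
    }
    Ok(())
}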
17 changes: 17 additions & 0 deletions src/dataflow-types/src/logging.rs
@@ -52,6 +52,7 @@ pub enum MaterializedLog {
DataflowCurrent,
DataflowDependency,
FrontierCurrent,
KafkaConsumerInfo,
PeekCurrent,
PeekDuration,
SourceInfo,
@@ -162,6 +163,18 @@ impl LogVariant {
.with_column("worker", ScalarType::Int64.nullable(false))
.with_column("time", ScalarType::Int64.nullable(false)),

LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => RelationDesc::empty()
.with_column("consumer_name", ScalarType::String.nullable(false))
.with_column("source_id", ScalarType::String.nullable(false))
.with_column("partition_id", ScalarType::String.nullable(false))
.with_column("rx_msgs", ScalarType::Int64.nullable(false))
.with_column("rx_bytes", ScalarType::Int64.nullable(false))
.with_column("tx_msgs", ScalarType::Int64.nullable(false))
.with_column("tx_bytes", ScalarType::Int64.nullable(false))
.with_column("app_offset", ScalarType::Int64.nullable(false))
.with_column("consumer_lag", ScalarType::Int64.nullable(false))
.with_key(vec![0, 1, 2]),

LogVariant::Materialized(MaterializedLog::PeekCurrent) => RelationDesc::empty()
.with_column("uuid", ScalarType::String.nullable(false))
.with_column("worker", ScalarType::Int64.nullable(false))
@@ -219,6 +232,10 @@ impl LogVariant {
LogVariant::Materialized(MaterializedLog::DataflowCurrent) => vec![],
LogVariant::Materialized(MaterializedLog::DataflowDependency) => vec![],
LogVariant::Materialized(MaterializedLog::FrontierCurrent) => vec![],
LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => vec![(
LogVariant::Materialized(MaterializedLog::SourceInfo),
vec![(1, 1), (2, 3)],
)],
LogVariant::Materialized(MaterializedLog::PeekCurrent) => vec![],
LogVariant::Materialized(MaterializedLog::SourceInfo) => vec![],
LogVariant::Materialized(MaterializedLog::PeekDuration) => vec![],
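The (local, other) index pairs declared above relate this log to SourceInfo: column 1 (source_id) and column 2 (partition_id) of mz_kafka_consumer_statistics line up with the corresponding columns of mz_source_info. The sketch below illustrates the resulting join; it is not the test case added in this PR, and the mz_source_info column names on the right-hand side are assumptions implied by that mapping.

use postgres::Client;

// Sketch only: join the new statistics log to mz_source_info on the columns
// declared in foreign_keys. Assumes mz_source_info exposes source_id and
// partition_id columns, as the (1, 1) and (2, 3) index pairs suggest.
fn consumer_lag_per_partition(client: &mut Client) -> Result<(), postgres::Error> {
    for row in client.query(
        "SELECT k.consumer_name, k.partition_id, k.rx_msgs, k.consumer_lag \
         FROM mz_catalog.mz_kafka_consumer_statistics AS k \
         JOIN mz_catalog.mz_source_info AS s \
           ON k.source_id = s.source_id AND k.partition_id = s.partition_id",
        &[],
    )? {
        let name: String = row.get("consumer_name");
        let pid: String = row.get("partition_id");
        let lag: i64 = row.get("consumer_lag");
        println!("{}/{}: {} messages behind", name, pid, lag);
    }
    Ok(())
}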
106 changes: 88 additions & 18 deletions src/dataflow/src/logging/materialized.rs
@@ -11,7 +11,8 @@

use std::time::Duration;

use differential_dataflow::{difference::DiffPair, operators::count::CountTotal};
use differential_dataflow::difference::DiffPair;
use differential_dataflow::operators::count::CountTotal;
use log::error;
use timely::communication::Allocate;
use timely::dataflow::operators::capture::EventLink;
@@ -38,6 +39,28 @@ pub enum MaterializedEvent {
/// Globally unique identifier for the source on which the dataflow depends.
source: GlobalId,
},
/// Tracks statistics for a particular Kafka consumer / partition pair
/// Reference: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
KafkaConsumerInfo {
/// Kafka name for the consumer
consumer_name: String,
/// Materialize source identifier
source_id: SourceInstanceId,
Comment on lines +47 to +48

Contributor: I think this will generate a string that has the global_id and instance_id inline, e.g. u0/1, where the global ID tracks the source and the instance ID tracks the per-worker/per-instantiation key. I think it'd be better to split these out into two columns, because then the source_id here would be directly comparable to the GlobalIds in other MaterializedEvent variants.

Contributor Author: I noticed that but didn't see any easy way to split this out. Let me look again!

Contributor: Both fields are pub; I think just renaming this to source: GlobalId, source_instance: u64 should do it.

Contributor Author (cirego, Mar 23, 2021): Done! I missed that SourceInfo does &id.source_id.to_string(). Updated kafka_consumer_info to do the same. I also added a test case to verify the join behavior.

/// The Kafka partition ID for these metrics (may be multiple per consumer)
partition_id: String,
/// Number of message sets received from Brokers
rxmsgs: i64,
/// Number of bytes received from Brokers
rxbytes: i64,
/// Number of message sets sent to Brokers
txmsgs: i64,
/// Number of bytes transmitted to Brokers
txbytes: i64,
/// How far into the topic our consumer has read
app_offset: i64,
/// How many messages remain until our consumer reaches the (hi|lo) watermark
consumer_lag: i64,
},
/// Peek command, true for install and false for retire.
Peek(Peek, bool),
/// Tracks the source name, id, partition id, and received/ingested offsets
@@ -103,9 +126,10 @@ pub fn construct<A: Allocate>(
let mut input = demux.new_input(&logs, Pipeline);
let (mut dataflow_out, dataflow) = demux.new_output();
let (mut dependency_out, dependency) = demux.new_output();
let (mut frontier_out, frontier) = demux.new_output();
let (mut kafka_consumer_info_out, kafka_consumer_info) = demux.new_output();
let (mut peek_out, peek) = demux.new_output();
let (mut source_info_out, source_info) = demux.new_output();
let (mut frontier_out, frontier) = demux.new_output();

let mut demux_buffer = Vec::new();
demux.build(move |_capability| {
@@ -114,18 +138,20 @@
move |_frontiers| {
let mut dataflow = dataflow_out.activate();
let mut dependency = dependency_out.activate();
let mut frontier = frontier_out.activate();
let mut kafka_consumer_info = kafka_consumer_info_out.activate();
let mut peek = peek_out.activate();
let mut source_info = source_info_out.activate();
let mut frontier = frontier_out.activate();

input.for_each(|time, data| {
data.swap(&mut demux_buffer);

let mut dataflow_session = dataflow.session(&time);
let mut dependency_session = dependency.session(&time);
let mut frontier_session = frontier.session(&time);
let mut kafka_consumer_info_session = kafka_consumer_info.session(&time);
let mut peek_session = peek.session(&time);
let mut source_info_session = source_info.session(&time);
let mut frontier_session = frontier.session(&time);

for (time, worker, datum) in demux_buffer.drain(..) {
let time_ns = time.as_nanos() as Timestamp;
@@ -176,6 +202,40 @@
),
}
}
MaterializedEvent::Frontier(name, logical, delta) => {
frontier_session.give((
row_packer.pack(&[
Datum::String(&name.to_string()),
Datum::Int64(worker as i64),
Datum::Int64(logical as i64),
]),
time_ms,
delta as isize,
));
}
MaterializedEvent::KafkaConsumerInfo {
consumer_name,
source_id,
partition_id,
rxmsgs,
rxbytes,
txmsgs,
txbytes,
app_offset,
consumer_lag,
} => {
kafka_consumer_info_session.give((
(consumer_name, source_id, partition_id),
time_ms,
DiffPair::new(
DiffPair::new(
DiffPair::new(rxmsgs, rxbytes),
DiffPair::new(txmsgs, txbytes),
),
DiffPair::new(app_offset, consumer_lag),
),
));
}
MaterializedEvent::Peek(peek, is_install) => {
peek_session.give((peek, worker, is_install, time_ns))
}
@@ -192,17 +252,6 @@
DiffPair::new(offset, timestamp),
));
}
MaterializedEvent::Frontier(name, logical, delta) => {
frontier_session.give((
row_packer.pack(&[
Datum::String(&name.to_string()),
Datum::Int64(worker as i64),
Datum::Int64(logical as i64),
]),
time_ms,
delta as isize,
));
}
}
}
});
@@ -245,6 +294,26 @@
}
});

let frontier_current = frontier.as_collection();

use differential_dataflow::operators::Count;
let kafka_consumer_info_current = kafka_consumer_info.as_collection().count().map({
let mut row_packer = repr::RowPacker::new();
move |((consumer_name, source_id, partition_id), pairs)| {
row_packer.pack(&[
Datum::String(&consumer_name),
Datum::String(&source_id.to_string()),
Datum::String(&partition_id),
Datum::Int64(pairs.element1.element1.element1),
Datum::Int64(pairs.element1.element1.element2),
Datum::Int64(pairs.element1.element2.element1),
Datum::Int64(pairs.element1.element2.element2),
Datum::Int64(pairs.element2.element1),
Datum::Int64(pairs.element2.element2),
])
}
});

let peek_current = peek
.map(move |(name, worker, is_install, time_ns)| {
let time_ms = (time_ns / 1_000_000) as Timestamp;
@@ -265,7 +334,6 @@
}
});

use differential_dataflow::operators::Count;
let source_info_current = source_info.as_collection().count().map({
let mut row_packer = repr::RowPacker::new();
move |((name, id, pid), pair)| {
@@ -282,8 +350,6 @@
}
});

let frontier_current = frontier.as_collection();

// Duration statistics derive from the non-rounded event times.
let peek_duration = peek
.unary(
@@ -361,6 +427,10 @@
LogVariant::Materialized(MaterializedLog::FrontierCurrent),
frontier_current,
),
(
LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo),
kafka_consumer_info_current,
),
(
LogVariant::Materialized(MaterializedLog::PeekCurrent),
peek_current,
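For readers tracing the nested DiffPair plumbing in materialized.rs: the demux operator packs the six statistics into a tree of DiffPairs, and kafka_consumer_info_current unpacks them positionally via element1/element2. The sketch below is not part of the PR; it just restates that correspondence using differential-dataflow's DiffPair type.

use differential_dataflow::difference::DiffPair;

// Same nesting as the KafkaConsumerInfo arm of the demux operator.
// DiffPair's element1/element2 fields are public, and addition is
// component-wise, so counting these accumulates each statistic independently.
type KafkaStats = DiffPair<DiffPair<DiffPair<i64, i64>, DiffPair<i64, i64>>, DiffPair<i64, i64>>;

fn pack_stats(
    rxmsgs: i64,
    rxbytes: i64,
    txmsgs: i64,
    txbytes: i64,
    app_offset: i64,
    consumer_lag: i64,
) -> KafkaStats {
    DiffPair::new(
        DiffPair::new(DiffPair::new(rxmsgs, rxbytes), DiffPair::new(txmsgs, txbytes)),
        DiffPair::new(app_offset, consumer_lag),
    )
}

// How the positional accesses in kafka_consumer_info_current line up:
//   pairs.element1.element1.element1 -> rx_msgs
//   pairs.element1.element1.element2 -> rx_bytes
//   pairs.element1.element2.element1 -> tx_msgs
//   pairs.element1.element2.element2 -> tx_bytes
//   pairs.element2.element1          -> app_offset
//   pairs.element2.element2          -> consumer_lag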
3 changes: 3 additions & 0 deletions src/dataflow/src/source/file.rs
@@ -26,6 +26,7 @@ use expr::{PartitionId, SourceInstanceId};
use mz_avro::types::Value;
use mz_avro::{AvroRead, Schema, Skip};

use crate::logging::materialized::Logger;
use crate::source::{NextMessage, SourceMessage, SourceReader};

/// Contains all information necessary to ingest data from file sources (either
@@ -63,6 +64,7 @@ impl SourceReader<Value> for FileSourceReader<Value> {
consumer_activator: SyncActivator,
connector: ExternalSourceConnector,
encoding: DataEncoding,
_: Option<Logger>,
) -> Result<(FileSourceReader<Value>, Option<PartitionId>), anyhow::Error> {
let receiver = match connector {
ExternalSourceConnector::AvroOcf(oc) => {
@@ -139,6 +141,7 @@ impl SourceReader<Vec<u8>> for FileSourceReader<Vec<u8>> {
consumer_activator: SyncActivator,
connector: ExternalSourceConnector,
_: DataEncoding,
_: Option<Logger>,
) -> Result<(FileSourceReader<Vec<u8>>, Option<PartitionId>), anyhow::Error> {
let receiver = match connector {
ExternalSourceConnector::File(fc) => {