# schema_registry_converter
This library provides a way of using the Confluent Schema Registry that is compatible with the typical JVM usage. The release notes can be found on GitHub.

Consuming/decoding and producing/encoding are supported. It's also possible to provide the schema to use when decoding; when no schema is provided, the latest schema with the same subject will be used. It's feature complete compared to the Confluent Java version, except that it does not yet support adding credentials to the requests to the schema registry, which the Java client supports as of the 5.3 release. Please create an issue if you would like to use this feature.
For consuming messages encoded with the schema registry, you need to fetch the correct schema from the schema registry to transform it into a record. For clarity, error handling is omitted from the diagram.
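The decoder finds that schema via the Confluent wire format. As a hedged illustration (the crate does this parsing internally; `schema_id` is just a hypothetical helper, not part of the API), the schema id can be read from an encoded message like this:

```rust
/// Illustrative only: extract the schema registry id from an encoded message.
/// The Confluent wire format is one 0 magic byte, the schema id as a
/// big-endian 4-byte integer, and then the Avro-encoded data.
fn schema_id(payload: &[u8]) -> Option<u32> {
    if payload.len() >= 5 && payload[0] == 0 {
        Some(u32::from_be_bytes([payload[1], payload[2], payload[3], payload[4]]))
    } else {
        None
    }
}
```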
For producing messages which can be properly consumed by other clients, the proper id needs to be encoded with the message. To get the correct id, it might be necessary to register a new schema. For clarity, error handling is omitted from the diagram.
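Under which subject the schema is registered or looked up depends on the subject name strategy. The examples below use the topic name strategy; a sketch of that naming convention (illustrative only, the crate derives this internally from `SubjectNameStrategy::TopicNameStrategy`):

```rust
/// Sketch of the topic name strategy convention: value schemas live under
/// "<topic>-value" and key schemas under "<topic>-key". The boolean mirrors
/// the is_key flag passed to TopicNameStrategy in the examples below.
fn subject_for_topic(topic: &str, is_key: bool) -> String {
    format!("{}-{}", topic, if is_key { "key" } else { "value" })
}
```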
## Getting started

schema_registry_converter is available on crates.io. It is recommended to look there for the newest and most elaborate documentation. To use it, add the following to your Cargo.toml:

```toml
[dependencies]
schema_registry_converter = "2.0.0"
```

...and see the docs for how to use it.
## Example with consumer and producer

Below are two examples covering both consuming/decoding and producing/encoding. To use structs, they must implement the serde::Deserialize trait (for decoding) or the serde::Serialize trait (for encoding).
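For example, a struct matching the Heartbeat schema used below (the schema itself is shown in the second example) could look like this:

```rust
use serde::{Deserialize, Serialize};

// Matches the Avro record nl.openweb.data.Heartbeat with a single long
// field; Deserialize is needed for decoding, Serialize for encoding.
#[derive(Debug, Deserialize, Serialize)]
struct Heartbeat {
    beat: i64,
}
```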
```rust
use rdkafka::message::{BorrowedMessage, Message};
use rdkafka::producer::FutureRecord;
use avro_rs::from_value;
use avro_rs::types::Value;
use schema_registry_converter::{Decoder, Encoder};
use schema_registry_converter::schema_registry::SubjectNameStrategy;

fn main() {
    // The decoder and encoder cache the schemas they fetch from the schema
    // registry, so create them once and reuse them.
    let mut decoder = Decoder::new("http://localhost:8081".into());
    let mut encoder = Encoder::new("http://localhost:8081".into());
}
fn get_value<'a>(
    msg: &'a BorrowedMessage,
    decoder: &'a mut Decoder,
) -> Value {
    // Decode to a generic avro_rs Value, using the writer schema that the id
    // in the message points to.
    match decoder.decode(msg.payload()) {
        Ok(v) => v,
        Err(e) => panic!("Error getting value: {}", e),
    }
}
fn get_heartbeat<'a>(
    msg: &'a BorrowedMessage,
    decoder: &'a mut Decoder,
) -> Heartbeat {
    // decode_with_name also returns the name from the schema, which can be
    // used to check that the message is of the expected type.
    match decoder.decode_with_name(msg.payload()) {
        Ok((name, value)) => match name.name.as_str() {
            "Heartbeat" => match name.namespace {
                Some(namespace) => match namespace.as_str() {
                    "nl.openweb.data" => from_value::<Heartbeat>(&value).unwrap(),
                    ns => panic!("Unexpected namespace {}", ns),
                },
                None => panic!("No namespace in schema, while one was expected"),
            },
            name => panic!("Unexpected name {}", name),
        },
        Err(e) => panic!("Error getting heartbeat: {}", e),
    }
}
// The record only borrows its payload, so the caller supplies the buffer the
// encoded bytes are written into; that way the payload outlives the record.
fn get_future_record<'a>(
    topic: &'a str,
    key: Option<&'a str>,
    values: Vec<(&'static str, Value)>,
    encoder: &mut Encoder,
    buffer: &'a mut Vec<u8>,
) -> FutureRecord<'a, str, Vec<u8>> {
    // With the topic name strategy and is_key = false, the schema for the
    // subject "<topic>-value" is used.
    let subject_name_strategy = SubjectNameStrategy::TopicNameStrategy(topic, false);
    *buffer = match encoder.encode(values, &subject_name_strategy) {
        Ok(v) => v,
        Err(e) => panic!("Error getting payload: {}", e),
    };
    FutureRecord {
        topic,
        partition: None,
        payload: Some(buffer),
        key,
        timestamp: None,
        headers: None,
    }
}
fn get_future_record_from_struct<'a>(
    topic: &'a str,
    key: Option<&'a str>,
    heartbeat: Heartbeat,
    encoder: &mut Encoder,
    buffer: &'a mut Vec<u8>,
) -> FutureRecord<'a, str, Vec<u8>> {
    let subject_name_strategy = SubjectNameStrategy::TopicNameStrategy(topic, false);
    // encode_struct works on anything implementing serde::Serialize.
    *buffer = match encoder.encode_struct(heartbeat, &subject_name_strategy) {
        Ok(v) => v,
        Err(e) => panic!("Error getting payload: {}", e),
    };
    FutureRecord {
        topic,
        partition: None,
        payload: Some(buffer),
        key,
        timestamp: None,
        headers: None,
    }
}
```
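A hedged usage sketch of the producer helper above; the topic name, key, and beat value are made up, and actually sending the record with an rdkafka producer is left out:

```rust
fn produce_heartbeat_example(encoder: &mut Encoder) {
    // The buffer owns the encoded payload for as long as the record borrows it.
    let mut buffer = Vec::new();
    let record = get_future_record_from_struct(
        "heartbeats",     // hypothetical topic
        Some("some-key"), // hypothetical key
        Heartbeat { beat: 3 },
        encoder,
        &mut buffer,
    );
    // record can now be handed to an rdkafka FutureProducer.
    assert_eq!(record.topic, "heartbeats");
}
```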
The second example registers a schema for the subject test-value directly with the schema registry, which can be useful to make sure the schema exists before producing:

```rust
use schema_registry_converter::schema_registry::{post_schema, SuppliedSchema};

fn main() {
    let heartbeat_schema = SuppliedSchema::new(
        r#"{"type":"record","name":"Heartbeat","namespace":"nl.openweb.data","fields":[{"name":"beat","type":"long"}]}"#.into(),
    );
    // Posts the schema and returns the result of the registration.
    let result = post_schema("http://localhost:8081/subjects/test-value/versions", heartbeat_schema);
}
```
The Avro part of the conversion is handled by avro-rs; as such, I don't include tests for every possible schema. While I used rdkafka in combination with this crate to successfully consume from and produce to Kafka, and it is used in the examples, this crate has no direct dependency on it. All this crate does is convert `[u8]` <-> `avro_rs::types::Value`.
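For instance, assuming the Heartbeat schema from the second example, a decoded message with beat set to 3 surfaces as the following `avro_rs::types::Value`:

```rust
use avro_rs::types::Value;

fn expected_heartbeat_value() -> Value {
    // A record Value with a single long field named "beat".
    Value::Record(vec![("beat".to_owned(), Value::Long(3))])
}
```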
## Tests

Because mockito, which is used to mock the schema registry responses, runs in a separate thread, tests have to be run with `--test-threads=1`, for example:

```sh
cargo +stable test --color=always -- --nocapture --test-threads=1
```
The integration tests require a Kafka cluster running on the default ports. They will create topics, register schemas, and produce and consume some messages. These tests are marked with `kafka_test`, so to include them run:

```sh
cargo +stable test --features kafka_test --color=always -- --nocapture --test-threads=1
```

The easiest way to get a cluster running is with the Confluent CLI. The `prepare_integration_test.sh` script can be used to create the three topics needed for the tests, but even without them the tests pass.
## License

This project is licensed under either of
- Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in Schema Registry Converter by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.