From 14e2247881d4f6a6dc1f897bbefcf8f8fe7a2e95 Mon Sep 17 00:00:00 2001 From: Gerard Klijs Date: Tue, 10 Nov 2020 19:28:59 +0100 Subject: [PATCH] fix https://github.com/gklijs/schema_registry_converter/issues/39 and prepare for 2.0.1 --- Cargo.toml | 44 ++++++++++++++---------------- README.md | 11 ++++++-- RELEASE_NOTES.md | 7 ++++- src/async_impl/avro.rs | 40 ++++++---------------------- src/avro_common.rs | 41 ++++++---------------------- src/blocking/avro.rs | 47 +++++++-------------------------- src/lib.rs | 2 +- tests/blocking/avro_consumer.rs | 2 +- tests/blocking/avro_tests.rs | 2 +- 9 files changed, 64 insertions(+), 132 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 029d80d..fcae054 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "schema_registry_converter" -version = "2.0.0" +version = "2.0.1" authors = ["Gerard Klijs "] include = ["src/**/*", "Cargo.toml"] description = "Encode/decode data from/to kafka using the Confluent Schema Registry" @@ -21,26 +21,37 @@ proto_raw = ["integer-encoding", "logos"] kafka_test = [] default = ["futures"] -[dependencies.avro-rs] +[dependencies.byteorder] +version = "^1.3" + +[dependencies.failure] +version = "^0.1" + +[dependencies.reqwest] version = "^0.10" +features = ["json"] + +[dependencies.serde] +version = "^1.0" +features = ["derive"] + +[dependencies.serde_json] +version = "^1.0" + +[dependencies.avro-rs] +version = "^0.11" optional = true [dependencies.bytes] version = "^0.5" optional = true -[dependencies.byteorder] -version = "^1.3" - -[dependencies.failure] -version = "^0.1" - [dependencies.futures] version = "^0.3" optional = true [dependencies.integer-encoding] -version = "^1.1" +version = "^2.1" optional = true [dependencies.logos] @@ -51,21 +62,6 @@ optional = true version = "^0.2" optional = true -[dependencies.reqwest] -version = "^0.10" -features = ["json"] - -[dependencies.serde] -version = "^1.0" -features = ["derive"] - -[dependencies.serde_json] -version = "^1.0" - -[dependencies.tokio] -version = "0.2.22" -optional = true - [dependencies.url] version = "^2" optional = true diff --git a/README.md b/README.md index 69f2f23..02e1346 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ To use it to convert using avro async use: ```toml [dependencies] -schema_registry_converter = { version = "2.0.0", features = ["avro"] } +schema_registry_converter = { version = "2.0.1", features = ["avro"] } ``` ...and see the [docs](https://docs.rs/schema_registry_converter) for how to use it. @@ -44,7 +44,14 @@ All the converters also have a blocking (non async) version, in that case use so ```toml [dependencies] -schema_registry_converter = { version = "2.0.0", default-features = false, features = ["avro", "blocking"]} +schema_registry_converter = { version = "2.0.1", default-features = false, features = ["avro", "blocking"]} +``` + +If you need to use both in a project you can use something like, but have to be weary you import the correct paths depending on your use. + +```toml +[dependencies] +schema_registry_converter = { version = "2.0.1", features = ["avro", "blocking"]} ``` # Example with consumer and producer using Avro diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 254800a..a6e0cce 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,9 @@ ## Release notes +### 2.0.1 + +Maintenance release with mainly updated dependencies, making the blocking sr settings cloneable and no longer needs `kafka_test` feature to use both blocking and async in the same project. + ### 2.0.0 This release has a breaking change in the SubjectNameStrategy where the supplied schema now is in a Box, to keep the size of the Enum smaller. @@ -10,7 +14,7 @@ Another major change is by default support for async. To use the new version of the library, and continue to use it in a blocking way like it was before, you need to use the library like: ```toml -schema_registry_converter = { version = "2.0.0", default-features = false, features = ["avro", "blocking"]} +schema_registry_converter = { version = "2.0.1", default-features = false, features = ["avro", "blocking"]} ``` Also the Converters are moved to the blocking module, and to create the converters you need a SrSettings object, which can be created with just the schema registry url. @@ -57,5 +61,6 @@ instead of the `encode` function on the encoder. #### Contributors +- [@cbzehner](https://github.com/cbzehner) - [@kitsuneninetails](https://github.com/kitsuneninetails) - [@j-halbert](https://github.com/j-halbert) \ No newline at end of file diff --git a/src/async_impl/avro.rs b/src/async_impl/avro.rs index 7cd5af7..341bb2e 100644 --- a/src/async_impl/avro.rs +++ b/src/async_impl/avro.rs @@ -710,16 +710,9 @@ mod tests { let sr_settings = SrSettings::new(format!("http://{}", server_address())); let mut decoder = AvroDecoder::new(sr_settings); - let heartbeat = decoder.decode(Some(&[0, 0, 0, 0, 1])).await.unwrap_err(); + let error = decoder.decode(Some(&[0, 0, 0, 0, 1])).await.unwrap_err(); - assert_eq!( - heartbeat, - SRCError::new( - "Could not transform bytes using schema", - Some(String::from("failed to fill whole buffer")), - false, - ) - ) + assert_eq!(error.error, "Could not transform bytes using schema") } #[tokio::test] @@ -732,16 +725,9 @@ mod tests { let sr_settings = SrSettings::new(format!("http://{}", server_address())); let mut decoder = AvroDecoder::new(sr_settings); - let heartbeat = decoder.decode(Some(&[0, 0, 0, 0, 1])).await.unwrap_err(); + let error = decoder.decode(Some(&[0, 0, 0, 0, 1])).await.unwrap_err(); - assert_eq!( - heartbeat, - SRCError::new( - "Could not transform bytes using schema", - Some(String::from("failed to fill whole buffer")), - false, - ) - ) + assert_eq!(error.error, "Could not transform bytes using schema") } #[tokio::test] @@ -836,14 +822,8 @@ mod tests { let err = decoder.decode(Some(&[0, 0, 0, 0, 1, 6])).await.unwrap_err(); assert_eq!( - err, - SRCError::new( - "Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Heartbeat\\\",\\\"namespace\\\":\\\"nl.openweb.data\\\"}\" cant be turned into a Schema", - Some(String::from( - "Failed to parse schema: No `fields` in record" - )), - false, - ) + err.error, + "Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Heartbeat\\\",\\\"namespace\\\":\\\"nl.openweb.data\\\"}\" cant be turned into a Schema", ) } @@ -1308,12 +1288,8 @@ mod tests { .await .unwrap_err(); assert_eq!( - result, - SRCError::new( - "Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Name\\\"}\" cant be turned into a Schema", - Some(String::from("Failed to parse schema: No `fields` in record")), - false, - ) + result.error, + "Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Name\\\"}\" cant be turned into a Schema" ) } diff --git a/src/avro_common.rs b/src/avro_common.rs index 10f7c8e..7bdc8cb 100644 --- a/src/avro_common.rs +++ b/src/avro_common.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use avro_rs::schema::{Name, Schema}; -use avro_rs::types::{Record, ToAvro, Value}; +use avro_rs::types::{Record, Value}; use avro_rs::{to_avro_datum, to_value}; use serde::ser::Serialize; use serde_json::{value, Map}; @@ -95,7 +95,7 @@ pub(crate) fn replace_reference(parent: value::Value, child: value::Value) -> va } } -fn to_bytes(avro_schema: &AvroSchema, record: T) -> Result, SRCError> { +fn to_bytes(avro_schema: &AvroSchema, record: Value) -> Result, SRCError> { match to_avro_datum(&avro_schema.parsed, record) { Ok(v) => Ok(get_payload(avro_schema.id, v)), Err(e) => Err(SRCError::non_retryable_with_cause( @@ -124,7 +124,7 @@ pub(crate) fn values_to_bytes( for value in values { record.put(value.0, value.1) } - to_bytes(avro_schema, record) + to_bytes(avro_schema, Value::from(record)) } /// Using the schema with an item implementing serialize the item will be correctly deserialized @@ -200,17 +200,8 @@ mod tests { raw: String::from(r#"{"type":"record","name":"Name","namespace":"nl.openweb.data","fields":[{"name":"name","type":"string","avro.java.string":"String"}]}"#), parsed: Schema::parse_str(r#"{"type":"record","name":"Name","namespace":"nl.openweb.data","fields":[{"name":"name","type":"string","avro.java.string":"String"}]}"#).unwrap(), }; - let result = values_to_bytes(&schema, vec![("beat", Value::Long(3))]); - assert_eq!( - result, - Err(SRCError::new( - "Could not get Avro bytes", - Some(String::from( - "Validation error: value does not match schema" - )), - false, - )) - ) + let err = values_to_bytes(&schema, vec![("beat", Value::Long(3))]).unwrap_err(); + assert_eq!(err.error, "Could not get Avro bytes") } #[test] fn item_to_bytes_no_tranfer_wrong() { @@ -224,16 +215,7 @@ mod tests { ).unwrap(), }; let err = crate::avro_common::item_to_bytes(&schema, Heartbeat { beat: 3 }).unwrap_err(); - assert_eq!( - err, - SRCError::new( - "Failed to resolve", - Some(String::from( - "Schema resoulution error: missing field name in record" - )), - false, - ) - ) + assert_eq!(err.error, "Failed to resolve") } #[test] @@ -253,14 +235,7 @@ mod tests { ], a_type: Atype::Manual, }; - let result = crate::avro_common::item_to_bytes(&schema, item).unwrap_err(); - assert_eq!( - result, - SRCError::new( - "Failed to resolve", - Some(String::from("Schema resoulution error: String expected, got Array([Int(204), Int(240), Int(237), Int(74), Int(227), Int(188), Int(75), Int(46), Int(183), Int(163), Int(122), Int(214), Int(178), Int(72), Int(118), Int(162)])")), - false, - ) - ) + let err = crate::avro_common::item_to_bytes(&schema, item).unwrap_err(); + assert_eq!(err.error, "Failed to resolve") } } diff --git a/src/blocking/avro.rs b/src/blocking/avro.rs index 636af84..2984e08 100644 --- a/src/blocking/avro.rs +++ b/src/blocking/avro.rs @@ -666,16 +666,9 @@ mod tests { let sr_settings = SrSettings::new(format!("http://{}", server_address())); let mut decoder = AvroDecoder::new(sr_settings); - let heartbeat = decoder.decode(Some(&[0, 0, 0, 0, 1])); + let err = decoder.decode(Some(&[0, 0, 0, 0, 1])).unwrap_err(); - assert_eq!( - heartbeat, - Err(SRCError::new( - "Could not transform bytes using schema", - Some(String::from("failed to fill whole buffer")), - false, - )) - ) + assert_eq!(err.error, "Could not transform bytes using schema") } #[test] @@ -688,16 +681,9 @@ mod tests { let sr_settings = SrSettings::new(format!("http://{}", server_address())); let mut decoder = AvroDecoder::new(sr_settings); - let heartbeat = decoder.decode(Some(&[0, 0, 0, 0, 1])); + let err = decoder.decode(Some(&[0, 0, 0, 0, 1])).unwrap_err(); - assert_eq!( - heartbeat, - Err(SRCError::new( - "Could not transform bytes using schema", - Some(String::from("failed to fill whole buffer")), - false, - )) - ) + assert_eq!(err.error, "Could not transform bytes using schema") } #[test] @@ -790,17 +776,11 @@ mod tests { let sr_settings = SrSettings::new(format!("http://{}", server_address())); let mut decoder = AvroDecoder::new(sr_settings); - let heartbeat = decoder.decode(Some(&[0, 0, 0, 0, 1, 6])); + let err = decoder.decode(Some(&[0, 0, 0, 0, 1, 6])).unwrap_err(); assert_eq!( - heartbeat, - Err(SRCError::new( - "Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Heartbeat\\\",\\\"namespace\\\":\\\"nl.openweb.data\\\"}\" cant be turned into a Schema", - Some(String::from( - "Failed to parse schema: No `fields` in record" - )), - false, - )) + err.error, + "Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Heartbeat\\\",\\\"namespace\\\":\\\"nl.openweb.data\\\"}\" cant be turned into a Schema", ) } @@ -1241,17 +1221,10 @@ mod tests { references: vec![], }; let sr_settings = SrSettings::new(format!("http://{}", server_address())); - let result = match to_avro_schema(&sr_settings, registered_schema) { - Err(e) => e, - _ => panic!(), - }; + let err = to_avro_schema(&sr_settings, registered_schema).unwrap_err(); assert_eq!( - result, - SRCError::new( - "Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Name\\\"}\" cant be turned into a Schema", - Some(String::from("Failed to parse schema: No `fields` in record")), - false, - ) + err.error, + "Supplied raw value \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Name\\\"}\" cant be turned into a Schema" ) } diff --git a/src/lib.rs b/src/lib.rs index 3cb422f..0e2aa68 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,7 @@ //! automatically does retries. //! //! [avro-rs]: https://crates.io/crates/avro-rs -#[cfg(any(not(feature = "blocking"), feature = "kafka_test"))] +#[cfg(feature = "futures")] pub mod async_impl; #[cfg(feature = "avro")] pub mod avro_common; diff --git a/tests/blocking/avro_consumer.rs b/tests/blocking/avro_consumer.rs index f3a7dc3..ce3ff00 100644 --- a/tests/blocking/avro_consumer.rs +++ b/tests/blocking/avro_consumer.rs @@ -20,7 +20,7 @@ pub fn consume_avro( registry: String, topics: &[&str], auto_commit: bool, - test: Box ()>, + test: Box, ) { let sr_settings = SrSettings::new(registry); let mut decoder = AvroDecoder::new(sr_settings); diff --git a/tests/blocking/avro_tests.rs b/tests/blocking/avro_tests.rs index 7fdd9c5..29a0d3c 100644 --- a/tests/blocking/avro_tests.rs +++ b/tests/blocking/avro_tests.rs @@ -27,7 +27,7 @@ fn get_heartbeat_schema() -> Box { }) } -fn test_beat_value(key_value: i64, value_value: i64) -> Box ()> { +fn test_beat_value(key_value: i64, value_value: i64) -> Box { Box::new(move |rec: DeserializedAvroRecord| { println!("testing record {:#?}", rec); let key_values = match rec.key {