From a6debf7cdb026f80f5a364f606845cdd86ff07ef Mon Sep 17 00:00:00 2001 From: ZENOTME <43447882+ZENOTME@users.noreply.github.com> Date: Tue, 7 Nov 2023 15:50:07 +0800 Subject: [PATCH] AVRO-3896: support to custom more logical type (#2569) * AVRO-3896 refactor verify logical type * AVRO-3896 rename internal function and add test case * AVRO-3896: [Rust] Improve error handling, comments and tests Signed-off-by: Martin Tzvetanov Grigorov --------- Signed-off-by: Martin Tzvetanov Grigorov Co-authored-by: ZENOTME Co-authored-by: Martin Tzvetanov Grigorov --- lang/rust/avro/src/error.rs | 4 +- lang/rust/avro/src/schema.rs | 319 ++++++++++++++++++++++------------- 2 files changed, 203 insertions(+), 120 deletions(-) diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs index 00a04c93130..8fa14602753 100644 --- a/lang/rust/avro/src/error.rs +++ b/lang/rust/avro/src/error.rs @@ -307,8 +307,8 @@ pub enum Error { #[error("No `type` field found for `logicalType`")] GetLogicalTypeField, - #[error("logicalType must be a string")] - GetLogicalTypeFieldType, + #[error("logicalType must be a string, but is {0:?}")] + GetLogicalTypeFieldType(serde_json::Value), #[error("Unknown complex type: {0}")] GetComplexType(serde_json::Value), diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index 47900e46be3..262aa2affc1 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -1272,186 +1272,155 @@ impl Parser { complex: &Map, enclosing_namespace: &Namespace, ) -> AvroResult { - fn logical_verify_type( + // Try to parse this as a native complex type. + fn parse_as_native_complex( complex: &Map, - kinds: &[SchemaKind], parser: &mut Parser, enclosing_namespace: &Namespace, ) -> AvroResult { match complex.get("type") { - Some(value) => { - let ty = match value { - Value::String(s) if s == "fixed" => { - parser.parse_fixed(complex, enclosing_namespace)? - } - _ => parser.parse(value, enclosing_namespace)?, - }; - - if kinds - .iter() - .any(|&kind| SchemaKind::from(ty.clone()) == kind) - { - Ok(ty) - } else { - match get_type_rec(value.clone()) { - Ok(v) => Err(Error::GetLogicalTypeVariant(v)), - Err(err) => Err(err), - } + Some(value) => match value { + Value::String(s) if s == "fixed" => { + parser.parse_fixed(complex, enclosing_namespace) } - } - None => Err(Error::GetLogicalTypeField), - } - } - - fn get_type_rec(json_value: Value) -> AvroResult { - match json_value { - typ @ Value::String(_) => Ok(typ), - Value::Object(ref complex) => match complex.get("type") { - Some(v) => get_type_rec(v.clone()), - None => Err(Error::GetComplexTypeField), + _ => parser.parse(value, enclosing_namespace), }, - _ => Err(Error::GetComplexTypeField), + None => Err(Error::GetLogicalTypeField), } } - // checks whether the logicalType is supported by the type - fn try_logical_type( + // This crate support some logical types natively, and this function tries to convert + // a native complex type with a logical type attribute to these logical types. + // This function: + // 1. Checks whether the native complex type is in the supported kinds. + // 2. If it is, using the convert function to convert the native complex type to + // a logical type. + fn try_convert_to_logical_type( logical_type: &str, - complex: &Map, - kinds: &[SchemaKind], - ok_schema: Schema, - parser: &mut Parser, - enclosing_namespace: &Namespace, - ) -> AvroResult { - match logical_verify_type(complex, kinds, parser, enclosing_namespace) { - // type and logicalType match! - Ok(_) => Ok(ok_schema), - // the logicalType is not expected for this type! - Err(Error::GetLogicalTypeVariant(json_value)) => match json_value { - Value::String(_) => match parser.parse(&json_value, enclosing_namespace) { - Ok(schema) => { - warn!( - "Ignoring invalid logical type '{}' for schema of type: {:?}!", - logical_type, schema - ); - Ok(schema) - } - Err(parse_err) => Err(parse_err), - }, - _ => Err(Error::GetLogicalTypeVariant(json_value)), - }, - err => err, + schema: Schema, + supported_schema_kinds: &[SchemaKind], + convert: F, + ) -> AvroResult + where + F: Fn(Schema) -> AvroResult, + { + let kind = SchemaKind::from(schema.clone()); + if supported_schema_kinds.contains(&kind) { + convert(schema) + } else { + warn!( + "Ignoring unknown logical type '{}' for schema of type: {:?}!", + logical_type, schema + ); + Ok(schema) } } match complex.get("logicalType") { Some(Value::String(t)) => match t.as_str() { "decimal" => { - let inner = Box::new(logical_verify_type( - complex, + return try_convert_to_logical_type( + "decimal", + parse_as_native_complex(complex, self, enclosing_namespace)?, &[SchemaKind::Fixed, SchemaKind::Bytes], - self, - enclosing_namespace, - )?); - - let (precision, scale) = Self::parse_precision_and_scale(complex)?; - - return Ok(Schema::Decimal(DecimalSchema { - precision, - scale, - inner, - })); + |inner| -> AvroResult { + let (precision, scale) = Self::parse_precision_and_scale(complex)?; + Ok(Schema::Decimal(DecimalSchema { + precision, + scale, + inner: Box::new(inner), + })) + }, + ); } "big-decimal" => { - logical_verify_type(complex, &[SchemaKind::Bytes], self, enclosing_namespace)?; - return Ok(Schema::BigDecimal); + return try_convert_to_logical_type( + "big-decimal", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Bytes], + |_| -> AvroResult { Ok(Schema::BigDecimal) }, + ); } "uuid" => { - logical_verify_type(complex, &[SchemaKind::String], self, enclosing_namespace)?; - return Ok(Schema::Uuid); + return try_convert_to_logical_type( + "uuid", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::String], + |_| -> AvroResult { Ok(Schema::Uuid) }, + ); } "date" => { - return try_logical_type( + return try_convert_to_logical_type( "date", - complex, + parse_as_native_complex(complex, self, enclosing_namespace)?, &[SchemaKind::Int], - Schema::Date, - self, - enclosing_namespace, + |_| -> AvroResult { Ok(Schema::Date) }, ); } "time-millis" => { - return try_logical_type( - "time-millis", - complex, + return try_convert_to_logical_type( + "date", + parse_as_native_complex(complex, self, enclosing_namespace)?, &[SchemaKind::Int], - Schema::TimeMillis, - self, - enclosing_namespace, + |_| -> AvroResult { Ok(Schema::TimeMillis) }, ); } "time-micros" => { - return try_logical_type( + return try_convert_to_logical_type( "time-micros", - complex, + parse_as_native_complex(complex, self, enclosing_namespace)?, &[SchemaKind::Long], - Schema::TimeMicros, - self, - enclosing_namespace, + |_| -> AvroResult { Ok(Schema::TimeMicros) }, ); } "timestamp-millis" => { - return try_logical_type( + return try_convert_to_logical_type( "timestamp-millis", - complex, + parse_as_native_complex(complex, self, enclosing_namespace)?, &[SchemaKind::Long], - Schema::TimestampMillis, - self, - enclosing_namespace, + |_| -> AvroResult { Ok(Schema::TimestampMillis) }, ); } "timestamp-micros" => { - return try_logical_type( + return try_convert_to_logical_type( "timestamp-micros", - complex, + parse_as_native_complex(complex, self, enclosing_namespace)?, &[SchemaKind::Long], - Schema::TimestampMicros, - self, - enclosing_namespace, + |_| -> AvroResult { Ok(Schema::TimestampMicros) }, ); } "local-timestamp-millis" => { - return try_logical_type( + return try_convert_to_logical_type( "local-timestamp-millis", - complex, + parse_as_native_complex(complex, self, enclosing_namespace)?, &[SchemaKind::Long], - Schema::LocalTimestampMillis, - self, - enclosing_namespace, + |_| -> AvroResult { Ok(Schema::LocalTimestampMillis) }, ); } "local-timestamp-micros" => { - return try_logical_type( + return try_convert_to_logical_type( "local-timestamp-micros", - complex, + parse_as_native_complex(complex, self, enclosing_namespace)?, &[SchemaKind::Long], - Schema::LocalTimestampMicros, - self, - enclosing_namespace, + |_| -> AvroResult { Ok(Schema::LocalTimestampMicros) }, ); } "duration" => { - logical_verify_type(complex, &[SchemaKind::Fixed], self, enclosing_namespace)?; - return Ok(Schema::Duration); + return try_convert_to_logical_type( + "duration", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Fixed], + |_| -> AvroResult { Ok(Schema::Duration) }, + ); } - // In this case, of an unknown logical type, we just pass through to the underlying + // In this case, of an unknown logical type, we just pass through the underlying // type. _ => {} }, - // The spec says to ignore invalid logical types and just continue through to the - // underlying type - It is unclear whether that applies to this case or not, where the + // The spec says to ignore invalid logical types and just pass through the + // underlying type. It is unclear whether that applies to this case or not, where the // `logicalType` is not a string. - Some(_) => return Err(Error::GetLogicalTypeFieldType), + Some(value) => return Err(Error::GetLogicalTypeFieldType(value.clone())), _ => {} } match complex.get("type") { @@ -6213,7 +6182,6 @@ mod tests { Error::InvalidSchemaName(full_name.to_string(), SCHEMA_NAME_R.as_str()).to_string(); let err = name.map_err(|e| e.to_string()).err().unwrap(); assert_eq!(expected, err); - Ok(()) } @@ -6225,6 +6193,121 @@ mod tests { let name = Name::new(funny_name); assert!(name.is_ok()); } + Ok(()) + } + + #[test] + fn test_avro_3896_decimal_schema() -> TestResult { + // bytes decimal, represented as native logical type. + let schema = json!( + { + "type": "bytes", + "name": "BytesDecimal", + "logicalType": "decimal", + "size": 38, + "precision": 9, + "scale": 2 + }); + let parse_result = Schema::parse(&schema)?; + assert!(matches!( + parse_result, + Schema::Decimal(DecimalSchema { + precision: 9, + scale: 2, + .. + }) + )); + + // long decimal, represents as native complex type. + let schema = json!( + { + "type": "long", + "name": "LongDecimal", + "logicalType": "decimal" + }); + let parse_result = Schema::parse(&schema)?; + // assert!(matches!(parse_result, Schema::Long)); + assert_eq!(parse_result, Schema::Long); + + Ok(()) + } + + #[test] + fn test_avro_3896_uuid_schema() -> TestResult { + // string uuid, represents as native logical type. + let schema = json!( + { + "type": "string", + "name": "StringUUID", + "logicalType": "uuid" + }); + let parse_result = Schema::parse(&schema)?; + assert_eq!(parse_result, Schema::Uuid); + + // uuid logical type is not supported for SchemaKind::Fixed, so it is parsed as Schema::Fixed + // and the `logicalType` is preserved as an attribute. + let schema = json!( + { + "type": "fixed", + "name": "FixedUUID", + "size": 16, + "logicalType": "uuid" + }); + let parse_result = Schema::parse(&schema)?; + assert_eq!( + parse_result, + Schema::Fixed(FixedSchema { + name: Name::new("FixedUUID")?, + doc: None, + aliases: None, + size: 16, + attributes: BTreeMap::from([( + "logicalType".to_string(), + Value::String(String::from("uuid")), + )]), + }) + ); + + Ok(()) + } + + #[test] + fn test_avro_3896_timestamp_millis_schema() -> TestResult { + // long timestamp-millis, represents as native logical type. + let schema = json!( + { + "type": "long", + "name": "LongTimestampMillis", + "logicalType": "timestamp-millis" + }); + let parse_result = Schema::parse(&schema)?; + assert_eq!(parse_result, Schema::TimestampMillis); + + // int timestamp-millis, represents as native complex type. + let schema = json!( + { + "type": "int", + "name": "IntTimestampMillis", + "logicalType": "timestamp-millis" + }); + let parse_result = Schema::parse(&schema)?; + assert_eq!(parse_result, Schema::Int); + + Ok(()) + } + + #[test] + fn test_avro_3896_custom_bytes_schema() -> TestResult { + // log type, represents as complex type. + let schema = json!( + { + "type": "bytes", + "name": "BytesLog", + "logicalType": "custom" + }); + let parse_result = Schema::parse(&schema)?; + assert_eq!(parse_result, Schema::Bytes); + assert_eq!(parse_result.custom_attributes(), None); Ok(()) }