Skip to content

Commit

Permalink
chore: clean up UUIDs and Durations
Browse files Browse the repository at this point in the history
  • Loading branch information
svencowart committed Jan 14, 2025
1 parent 1bc9a51 commit b157584
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 17 deletions.
35 changes: 21 additions & 14 deletions arrow-avro/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ pub enum Codec {
Float64,
Binary,
Utf8,
Decimal(usize, Option<usize>, Option<usize>),
Uuid,
Date32,
TimeMillis,
TimeMicros,
Expand All @@ -165,11 +167,10 @@ pub enum Codec {
Fixed(i32),
List(Arc<AvroDataType>),
Struct(Arc<[AvroField]>),
Interval,
Duration,
/// In Arrow, use Dictionary(Int32, Utf8) for Enum.
Enum(Vec<String>),
Map(Arc<AvroDataType>),
Decimal(usize, Option<usize>, Option<usize>),
}

impl Codec {
Expand All @@ -184,6 +185,18 @@ impl Codec {
Self::Float64 => Float64,
Self::Binary => Binary,
Self::Utf8 => Utf8,
Self::Decimal(precision, scale, size) => match size {
Some(s) if *s > 16 => Decimal256(*precision as u8, scale.unwrap_or(0) as i8),
Some(s) => Decimal128(*precision as u8, scale.unwrap_or(0) as i8),
None if *precision <= DECIMAL128_MAX_PRECISION as usize
&& scale.unwrap_or(0) <= DECIMAL128_MAX_SCALE as usize =>
{
Decimal128(*precision as u8, scale.unwrap_or(0) as i8)
}
_ => Decimal256(*precision as u8, scale.unwrap_or(0) as i8),
},
// arrow-rs does not support the UUID Canonical Extension Type yet, so this is a temporary workaround.
Self::Uuid => FixedSizeBinary(16),
Self::Date32 => Date32,
Self::TimeMillis => Time32(TimeUnit::Millisecond),
Self::TimeMicros => Time64(TimeUnit::Microsecond),
Expand All @@ -193,7 +206,7 @@ impl Codec {
Self::TimestampMicros(is_utc) => {
Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
}
Self::Interval => Interval(IntervalUnit::MonthDayNano),
Self::Duration => Interval(IntervalUnit::MonthDayNano),
Self::Fixed(size) => FixedSizeBinary(*size),
Self::List(f) => List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME))),
Self::Struct(f) => Struct(f.iter().map(|x| x.field()).collect()),
Expand All @@ -212,16 +225,6 @@ impl Codec {
)),
false,
),
Self::Decimal(precision, scale, size) => match size {
Some(s) if *s > 16 => Decimal256(*precision as u8, scale.unwrap_or(0) as i8),
Some(s) => Decimal128(*precision as u8, scale.unwrap_or(0) as i8),
None if *precision <= DECIMAL128_MAX_PRECISION as usize
&& scale.unwrap_or(0) <= DECIMAL128_MAX_SCALE as usize =>
{
Decimal128(*precision as u8, scale.unwrap_or(0) as i8)
}
_ => Decimal256(*precision as u8, scale.unwrap_or(0) as i8),
},
}
}
}
Expand Down Expand Up @@ -450,6 +453,7 @@ fn make_data_type<'a>(
None,
);
}
(Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
(Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
(Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
(Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,
Expand All @@ -461,7 +465,7 @@ fn make_data_type<'a>(
(Some("local-timestamp-micros"), c @ Codec::Int64) => {
*c = Codec::TimestampMicros(false)
}
(Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval,
(Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Duration,
(Some(logical), _) => {
// Insert unrecognized logical type into metadata
field.metadata.insert("logicalType".into(), logical.into());
Expand Down Expand Up @@ -510,6 +514,9 @@ fn arrow_type_to_codec(dt: &DataType) -> Codec {
Float64 => Codec::Float64,
Utf8 => Codec::Utf8,
Binary | LargeBinary => Codec::Binary,
// arrow-rs does not support the UUID Canonical Extension Type yet, so this mapping is not possible.
// It is unsafe to assume all FixedSizeBinary(16) are UUIDs.
// Uuid => Codec::Uuid,
Date32 => Codec::Date32,
Time32(TimeUnit::Millisecond) => Codec::TimeMillis,
Time64(TimeUnit::Microsecond) => Codec::TimeMicros,
Expand Down
6 changes: 3 additions & 3 deletions arrow-avro/src/reader/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl Decoder {
Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
}
Codec::Fixed(n) => Self::Fixed(*n, Vec::with_capacity(DEFAULT_CAPACITY)),
Codec::Interval => Self::Interval(Vec::with_capacity(DEFAULT_CAPACITY)),
Codec::Duration => Self::Interval(Vec::with_capacity(DEFAULT_CAPACITY)),
Codec::List(item) => {
let item_decoder = Box::new(Self::try_new(item)?);
Self::List(
Expand Down Expand Up @@ -866,7 +866,7 @@ mod tests {
fn test_interval_decoding() {
// Avro interval => 12 bytes => [ months i32, days i32, ms i32 ]
// decode 2 rows => row1 => months=1, days=2, ms=100 => row2 => months=-1, days=10, ms=9999
let dt = AvroDataType::from_codec(Codec::Interval);
let dt = AvroDataType::from_codec(Codec::Duration);
let mut dec = Decoder::try_new(&dt).unwrap();
// row1 => months=1 => 01,00,00,00, days=2 => 02,00,00,00, ms=100 => 64,00,00,00
// row2 => months=-1 => 0xFF,0xFF,0xFF,0xFF, days=10 => 0x0A,0x00,0x00,0x00, ms=9999 => 0x0F,0x27,0x00,0x00
Expand Down Expand Up @@ -903,7 +903,7 @@ mod tests {
#[test]
fn test_interval_decoding_with_nulls() {
// Avro union => [ interval, null]
let dt = AvroDataType::from_codec(Codec::Interval);
let dt = AvroDataType::from_codec(Codec::Duration);
let child = Decoder::try_new(&dt).unwrap();
let mut dec = Decoder::Nullable(
Nullability::NullFirst,
Expand Down

0 comments on commit b157584

Please sign in to comment.