diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs index a1281e53..c7de770e 100644 --- a/datafusion_iceberg/src/table.rs +++ b/datafusion_iceberg/src/table.rs @@ -696,16 +696,10 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder() - .with_partition_field(PartitionField::new(4, 1000, "date_day", Transform::Day)) - .build() - .expect("Failed to create partition spec"); - let table = Table::builder() .with_name("orders") .with_location("/test/orders") .with_schema(schema) - .with_partition_spec(partition_spec) .build(&["test".to_owned()], catalog) .await .expect("Failed to create table"); @@ -714,7 +708,7 @@ mod tests { let ctx = SessionContext::new(); - ctx.register_table("orders", table).unwrap(); + ctx.register_table("orders", table.clone()).unwrap(); ctx.sql( "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES @@ -732,7 +726,7 @@ mod tests { .expect("Failed to insert values into table"); let batches = ctx - .sql("select product_id, sum(amount) from orders group by product_id;") + .sql("select product_id, sum(amount) from orders where customer_id = 1 group by product_id;") .await .expect("Failed to create plan for select") .collect() @@ -755,11 +749,11 @@ mod tests { ); for (product_id, amount) in product_ids.iter().zip(amounts) { if product_id.unwrap() == 1 { - assert_eq!(amount.unwrap(), 7) + assert_eq!(amount.unwrap(), 3) } else if product_id.unwrap() == 2 { assert_eq!(amount.unwrap(), 1) } else if product_id.unwrap() == 3 { - assert_eq!(amount.unwrap(), 3) + assert_eq!(amount.unwrap(), 0) } else { panic!("Unexpected order id") } @@ -779,8 +773,70 @@ mod tests { .await .expect("Failed to insert values into table"); + ctx.sql( + "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES + (10, 1, 2, '2020-01-04', 3), + (11, 3, 1, '2020-01-04', 2), + (12, 2, 3, '2020-01-04', 1);", + ) + .await + .expect("Failed to create query plan for insert") + .collect() + .await + .expect("Failed to insert values into table"); + + ctx.sql( + "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES + (13, 1, 1, '2020-01-05', 4), + (14, 3, 2, '2020-01-05', 2), + (15, 2, 3, '2020-01-05', 3);", + ) + .await + .expect("Failed to create query plan for insert") + .collect() + .await + .expect("Failed to insert values into table"); + + ctx.sql( + "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES + (16, 2, 3, '2020-01-05', 3), + (17, 1, 3, '2020-01-06', 1), + (18, 2, 1, '2020-01-06', 2);", + ) + .await + .expect("Failed to create query plan for insert") + .collect() + .await + .expect("Failed to insert values into table"); + + ctx.sql( + "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES + (19, 2, 2, '2020-01-06', 1), + (20, 1, 2, '2020-01-07', 3), + (21, 3, 1, '2020-01-07', 2);", + ) + .await + .expect("Failed to create query plan for insert") + .collect() + .await + .expect("Failed to insert values into table"); + + ctx.sql( + "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES + (21, 3, 1, '2020-01-07', 2), + (22, 2, 3, '2020-01-07', 1), + (23, 1, 1, '2020-01-08', 4), + (24, 3, 2, '2020-01-08', 2), + (25, 2, 3, '2020-01-08', 3);", + ) + .await + .expect("Failed to create query plan for insert") + .collect() + .await + .expect("Failed to insert values into table"); + let batches = ctx - .sql("select product_id, sum(amount) from orders group by product_id;") + .sql("select product_id, sum(amount) from orders where customer_id = 1 group by product_id;") .await .expect("Failed to create plan for select") .collect() @@ -802,18 +858,19 @@ mod tests { .unwrap(), ); for (product_id, amount) in product_ids.iter().zip(amounts) { - if product_id.unwrap() == 1 { - assert_eq!(amount.unwrap(), 9) - } else if product_id.unwrap() == 2 { - assert_eq!(amount.unwrap(), 2) - } else if product_id.unwrap() == 3 { - assert_eq!(amount.unwrap(), 4) - } else { - panic!("Unexpected order id") + match product_id.unwrap() { + 1 => assert_eq!(amount.unwrap(), 11), + 2 => assert_eq!(amount.unwrap(), 7), + 3 => assert_eq!(amount.unwrap(), 2), + _ => panic!("Unexpected order id"), } } } } + + if let Tabular::Table(table) = table.tabular.read().await.deref() { + assert_eq!(table.manifests(None, None).await.unwrap().len(), 2); + }; } #[tokio::test] diff --git a/iceberg-rust/src/table/transaction/append.rs b/iceberg-rust/src/table/transaction/append.rs index b22f5891..a002a8f3 100644 --- a/iceberg-rust/src/table/transaction/append.rs +++ b/iceberg-rust/src/table/transaction/append.rs @@ -1,6 +1,7 @@ use std::cmp::Ordering; use iceberg_rust_spec::{manifest::ManifestEntry, manifest_list::ManifestListEntry}; +use smallvec::SmallVec; use crate::{ error::Error, @@ -15,6 +16,16 @@ fn split_datafiles_once( rect: Rectangle, names: &[&str], ) -> Result<[(Vec, Rectangle); 2], Error> { + if rect.min.is_empty() { + let mut smaller = files.collect::, Error>>()?; + let larger = smaller.split_off(smaller.len() / 2); + + return Ok([ + (smaller, Rectangle::new(SmallVec::new(), SmallVec::new())), + (larger, Rectangle::new(SmallVec::new(), SmallVec::new())), + ]); + } + let mut smaller = Vec::new(); let mut larger = Vec::new(); let mut smaller_rect = None;