Skip to content

Commit

Permalink
fix unpartitioned insert
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Jan 9, 2025
1 parent d17b4e0 commit f35ee19
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 19 deletions.
95 changes: 76 additions & 19 deletions datafusion_iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,16 +696,10 @@ mod tests {
.build()
.unwrap();

let partition_spec = PartitionSpec::builder()
.with_partition_field(PartitionField::new(4, 1000, "date_day", Transform::Day))
.build()
.expect("Failed to create partition spec");

let table = Table::builder()
.with_name("orders")
.with_location("/test/orders")
.with_schema(schema)
.with_partition_spec(partition_spec)
.build(&["test".to_owned()], catalog)
.await
.expect("Failed to create table");
Expand All @@ -714,7 +708,7 @@ mod tests {

let ctx = SessionContext::new();

ctx.register_table("orders", table).unwrap();
ctx.register_table("orders", table.clone()).unwrap();

ctx.sql(
"INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
Expand All @@ -732,7 +726,7 @@ mod tests {
.expect("Failed to insert values into table");

let batches = ctx
.sql("select product_id, sum(amount) from orders group by product_id;")
.sql("select product_id, sum(amount) from orders where customer_id = 1 group by product_id;")
.await
.expect("Failed to create plan for select")
.collect()
Expand All @@ -755,11 +749,11 @@ mod tests {
);
for (product_id, amount) in product_ids.iter().zip(amounts) {
if product_id.unwrap() == 1 {
assert_eq!(amount.unwrap(), 7)
assert_eq!(amount.unwrap(), 3)
} else if product_id.unwrap() == 2 {
assert_eq!(amount.unwrap(), 1)
} else if product_id.unwrap() == 3 {
assert_eq!(amount.unwrap(), 3)
assert_eq!(amount.unwrap(), 0)
} else {
panic!("Unexpected order id")
}
Expand All @@ -779,8 +773,70 @@ mod tests {
.await
.expect("Failed to insert values into table");

ctx.sql(
"INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
(10, 1, 2, '2020-01-04', 3),
(11, 3, 1, '2020-01-04', 2),
(12, 2, 3, '2020-01-04', 1);",
)
.await
.expect("Failed to create query plan for insert")
.collect()
.await
.expect("Failed to insert values into table");

ctx.sql(
"INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
(13, 1, 1, '2020-01-05', 4),
(14, 3, 2, '2020-01-05', 2),
(15, 2, 3, '2020-01-05', 3);",
)
.await
.expect("Failed to create query plan for insert")
.collect()
.await
.expect("Failed to insert values into table");

ctx.sql(
"INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
(16, 2, 3, '2020-01-05', 3),
(17, 1, 3, '2020-01-06', 1),
(18, 2, 1, '2020-01-06', 2);",
)
.await
.expect("Failed to create query plan for insert")
.collect()
.await
.expect("Failed to insert values into table");

ctx.sql(
"INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
(19, 2, 2, '2020-01-06', 1),
(20, 1, 2, '2020-01-07', 3),
(21, 3, 1, '2020-01-07', 2);",
)
.await
.expect("Failed to create query plan for insert")
.collect()
.await
.expect("Failed to insert values into table");

ctx.sql(
"INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
(21, 3, 1, '2020-01-07', 2),
(22, 2, 3, '2020-01-07', 1),
(23, 1, 1, '2020-01-08', 4),
(24, 3, 2, '2020-01-08', 2),
(25, 2, 3, '2020-01-08', 3);",
)
.await
.expect("Failed to create query plan for insert")
.collect()
.await
.expect("Failed to insert values into table");

let batches = ctx
.sql("select product_id, sum(amount) from orders group by product_id;")
.sql("select product_id, sum(amount) from orders where customer_id = 1 group by product_id;")
.await
.expect("Failed to create plan for select")
.collect()
Expand All @@ -802,18 +858,19 @@ mod tests {
.unwrap(),
);
for (product_id, amount) in product_ids.iter().zip(amounts) {
if product_id.unwrap() == 1 {
assert_eq!(amount.unwrap(), 9)
} else if product_id.unwrap() == 2 {
assert_eq!(amount.unwrap(), 2)
} else if product_id.unwrap() == 3 {
assert_eq!(amount.unwrap(), 4)
} else {
panic!("Unexpected order id")
match product_id.unwrap() {
1 => assert_eq!(amount.unwrap(), 11),
2 => assert_eq!(amount.unwrap(), 7),
3 => assert_eq!(amount.unwrap(), 2),
_ => panic!("Unexpected order id"),
}
}
}
}

if let Tabular::Table(table) = table.tabular.read().await.deref() {
assert_eq!(table.manifests(None, None).await.unwrap().len(), 2);
};
}

#[tokio::test]
Expand Down
11 changes: 11 additions & 0 deletions iceberg-rust/src/table/transaction/append.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::cmp::Ordering;

use iceberg_rust_spec::{manifest::ManifestEntry, manifest_list::ManifestListEntry};
use smallvec::SmallVec;

use crate::{
error::Error,
Expand All @@ -15,6 +16,16 @@ fn split_datafiles_once(
rect: Rectangle,
names: &[&str],
) -> Result<[(Vec<ManifestEntry>, Rectangle); 2], Error> {
if rect.min.is_empty() {
let mut smaller = files.collect::<Result<Vec<_>, Error>>()?;
let larger = smaller.split_off(smaller.len() / 2);

return Ok([
(smaller, Rectangle::new(SmallVec::new(), SmallVec::new())),
(larger, Rectangle::new(SmallVec::new(), SmallVec::new())),
]);
}

let mut smaller = Vec::new();
let mut larger = Vec::new();
let mut smaller_rect = None;
Expand Down

0 comments on commit f35ee19

Please sign in to comment.