Skip to content

Commit

Permalink
collect delete files
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Jan 5, 2025
1 parent 97de3f2 commit b1b98b6
Showing 1 changed file with 40 additions and 11 deletions.
51 changes: 40 additions & 11 deletions datafusion_iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use async_trait::async_trait;
use chrono::DateTime;
use core::panic;
use datafusion_expr::{dml::InsertOp, utils::conjunction};
use futures::TryStreamExt;
use object_store::ObjectMeta;
Expand Down Expand Up @@ -48,7 +49,7 @@ use crate::{
};

use iceberg_rust::spec::{
manifest::{ManifestEntry, Status},
manifest::{Content, ManifestEntry, Status},
util,
};
use iceberg_rust::spec::{
Expand Down Expand Up @@ -283,7 +284,9 @@ async fn table_scan(

// All files have to be grouped according to their partition values. This is done by using a HashMap with the partition values as the key.
// This way data files with the same partition value are mapped to the same vector.
let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
let mut data_file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
let mut equality_delete_file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> =
HashMap::new();

let partition_fields = &snapshot_range
.1
Expand Down Expand Up @@ -396,10 +399,23 @@ async fn table_scan(
statistics: Some(manifest_statistics),
extensions: None,
};
file_groups
.entry(file.partition_values.clone())
.or_default()
.push(file);
match manifest.data_file().content() {
Content::Data => {
data_file_groups
.entry(file.partition_values.clone())
.or_default()
.push(file);
}
Content::EqualityDeletes => {
equality_delete_file_groups
.entry(file.partition_values.clone())
.or_default()
.push(file);
}
Content::PositionDeletes => {
panic!("Position deletes not supported.")
}
}
};
});
} else {
Expand Down Expand Up @@ -445,10 +461,23 @@ async fn table_scan(
statistics: Some(manifest_statistics),
extensions: None,
};
file_groups
.entry(file.partition_values.clone())
.or_default()
.push(file);
match manifest.data_file().content() {
Content::Data => {
data_file_groups
.entry(file.partition_values.clone())
.or_default()
.push(file);
}
Content::EqualityDeletes => {
equality_delete_file_groups
.entry(file.partition_values.clone())
.or_default()
.push(file);
}
Content::PositionDeletes => {
panic!("Position deletes not supported.")
}
}
}
});
};
Expand Down Expand Up @@ -505,7 +534,7 @@ async fn table_scan(
let file_scan_config = FileScanConfig {
object_store_url,
file_schema,
file_groups: file_groups.into_values().collect(),
file_groups: data_file_groups.into_values().collect(),
statistics,
projection: projection.cloned(),
limit,
Expand Down

0 comments on commit b1b98b6

Please sign in to comment.