Skip to content

Commit

Permalink
fix prefetch of page index
Browse files Browse the repository at this point in the history
  • Loading branch information
adriangb committed Jan 20, 2025
1 parent 764b34a commit 2648dd1
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl ParquetMetaDataReader {
mut fetch: F,
file_size: usize,
) -> Result<()> {
let (metadata, remainder) =
let (metadata, fetched) =
Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;

self.metadata = Some(metadata);
Expand All @@ -382,7 +382,7 @@ impl ParquetMetaDataReader {
return Ok(());
}

self.load_page_index_with_remainder(fetch, remainder).await
self.load_page_index_with_remainder(fetch, fetched).await
}

/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
Expand All @@ -396,7 +396,7 @@ impl ParquetMetaDataReader {
async fn load_page_index_with_remainder<F: MetadataFetch>(
&mut self,
mut fetch: F,
remainder: Option<(usize, Bytes)>,
fetched: Option<(usize, Bytes)>,
) -> Result<()> {
if self.metadata.is_none() {
return Err(general_err!("Footer metadata is not present"));
Expand All @@ -409,10 +409,14 @@ impl ParquetMetaDataReader {
None => return Ok(()),
};

let bytes = match &remainder {
Some((remainder_start, remainder)) if *remainder_start <= range.start => {
let offset = range.start - *remainder_start;
remainder.slice(offset..range.end - *remainder_start + offset)
let bytes = match &fetched {
Some((fetched_start, fetched)) if *fetched_start <= range.start && (range.end <= fetched_start + fetched.len()) => {
// `fetched`` is an amount of data spanning from fetched_start to the end of the file
// We want to slice out the range we need from that data, but need to adjust the
// range we are looking for to be relative to fetched_start.
let fetched_start = *fetched_start;
let range = range.start - fetched_start..range.end - fetched_start;
fetched.slice(range)
}
// Note: this will potentially fetch data already in remainder, this keeps things simple
_ => fetch.fetch(range.start..range.end).await?,
Expand Down Expand Up @@ -582,7 +586,7 @@ impl ParquetMetaDataReader {
let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
Ok((
Self::decode_metadata(slice)?,
Some((footer_start, suffix.slice(..metadata_start))),
Some((footer_start, suffix)),
))
}
}
Expand Down Expand Up @@ -1052,5 +1056,17 @@ mod async_tests {
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(len)) // prefetch entire file
.load_and_finish(f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
}
}

0 comments on commit 2648dd1

Please sign in to comment.