From 2648dd1a20116cb2307edd4c79cf42c3f0530318 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 20 Jan 2025 13:33:53 +0000 Subject: [PATCH] fix prefetch of page index --- parquet/src/file/metadata/reader.rs | 32 +++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index c6715a33b5ae..2445445e68ef 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -372,7 +372,7 @@ impl ParquetMetaDataReader { mut fetch: F, file_size: usize, ) -> Result<()> { - let (metadata, remainder) = + let (metadata, fetched) = Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?; self.metadata = Some(metadata); @@ -382,7 +382,7 @@ impl ParquetMetaDataReader { return Ok(()); } - self.load_page_index_with_remainder(fetch, remainder).await + self.load_page_index_with_remainder(fetch, fetched).await } /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already @@ -396,7 +396,7 @@ impl ParquetMetaDataReader { async fn load_page_index_with_remainder( &mut self, mut fetch: F, - remainder: Option<(usize, Bytes)>, + fetched: Option<(usize, Bytes)>, ) -> Result<()> { if self.metadata.is_none() { return Err(general_err!("Footer metadata is not present")); @@ -409,10 +409,14 @@ impl ParquetMetaDataReader { None => return Ok(()), }; - let bytes = match &remainder { - Some((remainder_start, remainder)) if *remainder_start <= range.start => { - let offset = range.start - *remainder_start; - remainder.slice(offset..range.end - *remainder_start + offset) + let bytes = match &fetched { + Some((fetched_start, fetched)) if *fetched_start <= range.start && (range.end <= fetched_start + fetched.len()) => { + // `fetched` is an amount of data spanning from fetched_start to the end of the file + // We want to slice out the range we need from that data, 
but need to adjust the + // range we are looking for to be relative to fetched_start. + let fetched_start = *fetched_start; + let range = range.start - fetched_start..range.end - fetched_start; + fetched.slice(range) } // Note: this will potentially fetch data already in remainder, this keeps things simple _ => fetch.fetch(range.start..range.end).await?, @@ -582,7 +586,7 @@ impl ParquetMetaDataReader { let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE]; Ok(( Self::decode_metadata(slice)?, - Some((footer_start, suffix.slice(..metadata_start))), + Some((footer_start, suffix)), )) } } @@ -1052,5 +1056,17 @@ mod async_tests { .unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); + + // Prefetch more than enough + fetch_count.store(0, Ordering::SeqCst); + let f = MetadataFetchFn(&mut fetch); + let metadata = ParquetMetaDataReader::new() + .with_page_indexes(true) + .with_prefetch_hint(Some(len)) // prefetch entire file + .load_and_finish(f, len) + .await + .unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 1); + assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); } }