fix: write a new row group if column's array exceeds 2GB arrow array limit
aykut-bozkurt committed Oct 2, 2024
1 parent 031c608 commit 978ce88
Showing 3 changed files with 131 additions and 24 deletions.
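
The 2GB figure in the title comes from Arrow's 32-bit-offset arrays (Utf8, Binary, and friends): a single column chunk whose values total more than i32::MAX bytes cannot be represented. The commit keeps a running per-column byte count alongside the buffered tuples and flushes them into a new Parquet row group before any column crosses that line. Below is a minimal, self-contained sketch of that decision, assuming those semantics; the function name is hypothetical, not the crate's API.

fn must_start_new_row_group(accumulated: &[i64], incoming: &[i32]) -> bool {
    accumulated
        .iter()
        .zip(incoming)
        .any(|(acc, add)| acc + *add as i64 > i32::MAX as i64)
}

fn main() {
    // The regression test added in lib.rs inserts 42 rows of repeat('a', 52_000_000):
    // 42 * 52_000_000 = 2_184_000_000 bytes > i32::MAX = 2_147_483_647,
    // so the last rows must land in a second row group.
    let accumulated = vec![41_i64 * 52_000_000];
    let incoming = vec![52_000_000_i32];
    assert!(must_start_new_row_group(&accumulated, &incoming));
}
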
35 changes: 27 additions & 8 deletions src/lib.rs
@@ -213,14 +213,6 @@ mod tests {
}
}

impl<T: IntoDatum + FromDatum> Drop for TestTable<T> {
fn drop(&mut self) {
if !self.uri.contains("://") && std::path::Path::new(&self.uri).exists() {
std::fs::remove_file(&self.uri).unwrap();
}
}
}

fn timetz_to_utc_time(timetz: TimeWithTimeZone) -> Option<Time> {
Some(timetz.to_utc())
}
@@ -2272,6 +2264,33 @@ mod tests {
let copy_to_table = "copy test from '/tmp/test.parquet';";
Spi::run(copy_to_table).unwrap();
}

#[pg_test]
fn test_large_arrow_array_limit() {
let test_table = TestTable::<String>::new("text".into());
test_table.insert(
"INSERT INTO test_expected select repeat('a', 52000000) from generate_series(1,42) i;",
);
test_helper(test_table);

let parquet_file_metadata_command =
"select * from parquet.file_metadata('/tmp/test.parquet');";
let result_metadata = Spi::connect(|client| {
let mut results = Vec::new();
let tup_table = client
.select(parquet_file_metadata_command, None, None)
.unwrap();

for row in tup_table {
let num_row_groups = row["num_row_groups"].value::<i64>().unwrap().unwrap();
results.push(num_row_groups);
}

results
});

assert_eq!(result_metadata, vec![2]);
}
}

/// This module is required by `cargo pgrx test` invocations.
66 changes: 53 additions & 13 deletions src/parquet_copy_hook/copy_to_dest_receiver.rs
@@ -9,6 +9,7 @@ use crate::{
codec::ParquetCodecOption, parquet_writer::ParquetWriterContext,
schema_visitor::parquet_schema_string_from_tupledesc, uri_utils::parse_uri,
},
parquet_copy_hook::copy_utils::tuple_column_sizes,
pgrx_utils::collect_valid_attributes,
};

@@ -17,26 +18,54 @@ struct CopyToParquetDestReceiver {
dest: DestReceiver,
uri: *mut i8,
tupledesc: TupleDesc,
natts: usize,
tuple_count: i64,
tuples: *mut List,
row_group_size: i64,
natts: usize,
column_sizes: *mut i64,
codec: ParquetCodecOption,
row_group_size: i64,
per_copy_context: MemoryContext,
}

impl CopyToParquetDestReceiver {
fn collect_tuple(&mut self, tuple: PgHeapTuple<AllocatedByRust>) {
fn collect_tuple(&mut self, tuple: PgHeapTuple<AllocatedByRust>, tuple_column_sizes: Vec<i32>) {
let mut tuples = unsafe { PgList::from_pg(self.tuples) };
tuples.push(tuple.into_pg());

self.tuples = tuples.into_pg();

let column_sizes = unsafe { std::slice::from_raw_parts_mut(self.column_sizes, self.natts) };
column_sizes
.iter_mut()
.zip(tuple_column_sizes.iter())
.for_each(|(a, b)| *a += *b as i64);

self.tuple_count += 1;
}

fn reset_tuples(&mut self) {
unsafe { pg_sys::MemoryContextReset(self.per_copy_context) };

self.tuple_count = 0;
self.tuples = PgList::<HeapTupleData>::new().into_pg();
self.column_sizes = unsafe {
pg_sys::MemoryContextAllocZero(
self.per_copy_context,
std::mem::size_of::<i64>() * self.natts,
) as *mut i64
};
}

fn exceeds_row_group_size(&self) -> bool {
self.tuple_count >= self.row_group_size
}

fn exceeds_max_col_size(&self, tuple_column_sizes: &[i32]) -> bool {
let column_sizes = unsafe { std::slice::from_raw_parts(self.column_sizes, self.natts) };
column_sizes
.iter()
.zip(tuple_column_sizes)
.map(|(a, b)| *a + *b as i64)
.any(|size| size > i32::MAX as i64)
}

fn write_tuples_to_parquet(&mut self) {
@@ -67,8 +96,6 @@ impl CopyToParquetDestReceiver {
current_parquet_writer_context.write_new_row_group(tuples);

self.reset_tuples();

unsafe { pg_sys::MemoryContextReset(self.per_copy_context) };
}

fn cleanup(&mut self) {
@@ -126,6 +153,12 @@ extern "C" fn copy_startup(dest: *mut DestReceiver, _operation: i32, tupledesc:
// update the parquet dest receiver's missing fields
parquet_dest.tupledesc = tupledesc.as_ptr();
parquet_dest.tuples = PgList::<HeapTupleData>::new().into_pg();
parquet_dest.column_sizes = unsafe {
pg_sys::MemoryContextAllocZero(
parquet_dest.per_copy_context,
std::mem::size_of::<i64>() * attributes.len(),
) as *mut i64
};
parquet_dest.natts = attributes.len();

// set up the parquet writer context
@@ -162,27 +195,33 @@ extern "C" fn copy_receive(slot: *mut TupleTableSlot, dest: *mut DestReceiver) -
let natts = parquet_dest.natts;

let datums = slot.tts_values;
let datums: Vec<Datum> = std::slice::from_raw_parts(datums, natts).to_vec();
let datums = std::slice::from_raw_parts(datums, natts);

let nulls = slot.tts_isnull;
let nulls: Vec<bool> = std::slice::from_raw_parts(nulls, natts).to_vec();
let nulls = std::slice::from_raw_parts(nulls, natts);

let datums: Vec<Option<Datum>> = datums
.into_iter()
.iter()
.zip(nulls)
.map(|(datum, is_null)| if is_null { None } else { Some(datum) })
.map(|(datum, is_null)| if *is_null { None } else { Some(*datum) })
.collect();

let tupledesc = PgTupleDesc::from_pg(parquet_dest.tupledesc);

let column_sizes = tuple_column_sizes(&datums, &tupledesc);

if parquet_dest.exceeds_max_col_size(&column_sizes) {
parquet_dest.write_tuples_to_parquet();
}

let heap_tuple = PgHeapTuple::from_datums(tupledesc, datums)
.unwrap_or_else(|e| panic!("failed to create heap tuple from datums: {}", e));

parquet_dest.collect_tuple(heap_tuple);
parquet_dest.collect_tuple(heap_tuple, column_sizes);

if parquet_dest.tuple_count == parquet_dest.row_group_size {
if parquet_dest.exceeds_row_group_size() {
parquet_dest.write_tuples_to_parquet();
};
}
});
};

@@ -243,6 +282,7 @@ pub extern "C" fn create_copy_to_parquet_dest_receiver(
parquet_dest.natts = 0;
parquet_dest.tuple_count = 0;
parquet_dest.tuples = std::ptr::null_mut();
parquet_dest.column_sizes = std::ptr::null_mut();
parquet_dest.row_group_size = row_group_size;
parquet_dest.codec = codec;
parquet_dest.per_copy_context = per_copy_context;
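
Taken together, the receive path now has two flush triggers with deliberate ordering: the overflow check runs before the incoming tuple is buffered, the ordinary row-group-size check after. A simplified model of that flow, assuming the semantics shown in the diff; `Receiver` and its fields are illustrative stand-ins, not the extension's real types.

struct Receiver {
    buffered: usize,       // stands in for the buffered tuple list
    acc: Vec<i64>,         // accumulated per-column byte sizes
    row_group_size: usize,
}

impl Receiver {
    fn flush(&mut self) {
        // real code: write_tuples_to_parquet() followed by reset_tuples()
        self.buffered = 0;
        self.acc.iter_mut().for_each(|s| *s = 0);
    }

    fn receive(&mut self, column_sizes: &[i32]) {
        // 1. overflow check runs *before* buffering, so no single Arrow
        //    array is ever asked to hold more than i32::MAX bytes
        if self
            .acc
            .iter()
            .zip(column_sizes)
            .any(|(a, b)| a + *b as i64 > i32::MAX as i64)
        {
            self.flush();
        }
        // 2. buffer the tuple and update the running per-column sizes
        for (a, b) in self.acc.iter_mut().zip(column_sizes) {
            *a += *b as i64;
        }
        self.buffered += 1;
        // 3. row-group-size check runs *after* buffering
        if self.buffered >= self.row_group_size {
            self.flush();
        }
    }
}

fn main() {
    let mut r = Receiver { buffered: 0, acc: vec![0], row_group_size: 100 };
    r.receive(&[1_000_000]); // well under both limits: just buffered
    assert_eq!(r.buffered, 1);
}
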
54 changes: 51 additions & 3 deletions src/parquet_copy_hook/copy_utils.rs
@@ -3,10 +3,10 @@ use std::str::FromStr;
use pgrx::{
is_a,
pg_sys::{
self, defGetInt64, defGetString, AccessShareLock, CopyStmt, DefElem, NodeTag::T_CopyStmt,
Oid, RowExclusiveLock,
self, defGetInt64, defGetString, get_typlenbyval, AccessShareLock, CopyStmt, DefElem,
NodeTag::T_CopyStmt, Oid, RowExclusiveLock,
},
PgBox, PgList, PgRelation,
PgBox, PgList, PgRelation, PgTupleDesc,
};
use url::Url;

@@ -16,6 +16,11 @@ use crate::arrow_parquet::{
uri_utils::parse_uri,
};

// PG function not yet exposed by the pgrx bindings
extern "C" {
fn toast_raw_datum_size(datum: pg_sys::Datum) -> usize;
}

pub(crate) fn is_parquet_format(copy_stmt: &PgBox<CopyStmt>) -> bool {
let copy_options = unsafe { PgList::<DefElem>::from_pg(copy_stmt.options) };
for option in copy_options.iter_ptr() {
@@ -341,3 +346,46 @@ pub(crate) fn copy_relation_oid(pstmt: &PgBox<pg_sys::PlannedStmt>) -> Oid {
PgRelation::open_with_name_and_share_lock(relname).unwrap_or_else(|e| panic!("{}", e));
relation.rd_id
}

pub(crate) fn tuple_column_sizes(
tuple_datums: &[Option<pg_sys::Datum>],
tupledesc: &PgTupleDesc,
) -> Vec<i32> {
let mut column_sizes = vec![];

for (idx, column_datum) in tuple_datums.iter().enumerate() {
let att = tupledesc.get(idx).expect("cannot get attribute");
let typoid = att.type_oid();

let mut typlen = -1_i16;
let mut typbyval = false;
unsafe { get_typlenbyval(typoid.value(), &mut typlen, &mut typbyval) };

let column_size = if let Some(column_datum) = column_datum {
if typlen == -1 {
// varlena type
(unsafe { toast_raw_datum_size(*column_datum) }) as i32
} else if typlen == -2 {
// cstring
let cstring = unsafe { pg_sys::DatumGetCString(*column_datum) };

let cstring = unsafe {
std::ffi::CStr::from_ptr(cstring)
.to_str()
.expect("cstring is not a valid CString")
};

cstring.len() as i32 + 1
} else {
// fixed size type
typlen as i32
}
} else {
0
};

column_sizes.push(column_size);
}

column_sizes
}
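
For reference, the dispatch above follows PostgreSQL's typlen convention from pg_type (-1 varlena, -2 cstring, positive values fixed-size). A standalone sketch of the same classification, with a hypothetical enum standing in for raw Datums:

enum ColumnValue<'a> {
    Varlena { raw_size: usize }, // typlen == -1: toast_raw_datum_size
    CString(&'a std::ffi::CStr), // typlen == -2: bytes plus NUL terminator
    Fixed(i16),                  // typlen > 0: the typlen itself
}

fn column_size(value: Option<ColumnValue>) -> i32 {
    match value {
        None => 0, // NULLs add nothing to the column chunk
        Some(ColumnValue::Varlena { raw_size }) => raw_size as i32,
        Some(ColumnValue::CString(s)) => s.to_bytes().len() as i32 + 1,
        Some(ColumnValue::Fixed(typlen)) => typlen as i32,
    }
}

fn main() {
    let s = std::ffi::CStr::from_bytes_with_nul(b"abc\0").unwrap();
    assert_eq!(column_size(Some(ColumnValue::CString(s))), 4); // 3 bytes + NUL
    assert_eq!(column_size(Some(ColumnValue::Fixed(8))), 8);   // e.g. int8
    assert_eq!(column_size(None), 0);
}
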
