Commit

Merge pull request #102 from JanKaul/refactor-fork-node
Refactor fork node
JanKaul authored Jan 24, 2025
2 parents 2e88e56 + e930691 commit 22cc467
Showing 4 changed files with 37 additions and 42 deletions.
44 changes: 18 additions & 26 deletions datafusion_iceberg/src/materialized_view/delta_queries/fork_node.rs
@@ -33,19 +33,12 @@ use pin_project_lite::pin_project;

 pub fn fork_node(plan: Arc<LogicalPlan>) -> (ForkNode, ForkNode) {
     let parallelism = std::thread::available_parallelism().unwrap().get();
-    let (left_sender, (left_receiver, (right_sender, right_receiver))): (
-        Vec<_>,
-        (Vec<_>, (Vec<_>, Vec<_>)),
-    ) = iter::repeat_n((), parallelism)
+    let (sender, receiver): (Vec<_>, Vec<_>) = iter::repeat_n((), parallelism)
         .map(|_| {
-            let (left_sender, left_receiver) = channel(8);
-            let (right_sender, right_receiver) = channel(8);
+            let (sender, receiver) = channel(8);
             (
-                left_sender,
-                (
-                    Arc::new(Mutex::new(Some(left_receiver))),
-                    (right_sender, Arc::new(Mutex::new(Some(right_receiver)))),
-                ),
+                Arc::new(Mutex::new(Some(sender))),
+                Arc::new(Mutex::new(Some(receiver))),
             )
         })
         .unzip();
@@ -55,16 +48,14 @@ pub fn fork_node(plan: Arc<LogicalPlan>) -> (ForkNode, ForkNode) {
     (
         ForkNode {
             input: plan.clone(),
-            sender: right_sender.clone(),
-            receiver: left_receiver,
-            own_sender: left_sender.clone(),
+            sender: sender.clone(),
+            receiver: receiver.clone(),
             executed: executed.clone(),
         },
         ForkNode {
             input: plan,
-            sender: left_sender,
-            receiver: right_receiver,
-            own_sender: right_sender,
+            sender,
+            receiver,
             executed,
         },
     )
@@ -73,9 +64,8 @@ pub fn fork_node(plan: Arc<LogicalPlan>) -> (ForkNode, ForkNode) {
 #[allow(clippy::type_complexity)]
 pub struct ForkNode {
     pub(crate) input: Arc<LogicalPlan>,
-    sender: Vec<Sender<Result<RecordBatch, DataFusionError>>>,
+    sender: Vec<Arc<Mutex<Option<Sender<Result<RecordBatch, DataFusionError>>>>>>,
     receiver: Vec<Arc<Mutex<Option<Receiver<Result<RecordBatch, DataFusionError>>>>>>,
-    own_sender: Vec<Sender<Result<RecordBatch, DataFusionError>>>,
     executed: Vec<Arc<AtomicBool>>,
 }

@@ -142,7 +132,6 @@ impl UserDefinedLogicalNodeCore for ForkNode {
             input: Arc::new(inputs.pop().unwrap()),
             sender: self.sender.clone(),
             receiver: self.receiver.clone(),
-            own_sender: self.own_sender.clone(),
             executed: self.executed.clone(),
         })
     }
@@ -160,9 +149,8 @@ impl From<ForkNode> for LogicalPlan {
 pub(crate) struct PhysicalForkNode {
     input: Arc<dyn ExecutionPlan>,
     properties: PlanProperties,
-    sender: Vec<Sender<Result<RecordBatch, DataFusionError>>>,
+    sender: Vec<Arc<Mutex<Option<Sender<Result<RecordBatch, DataFusionError>>>>>>,
     receiver: Vec<Arc<Mutex<Option<Receiver<Result<RecordBatch, DataFusionError>>>>>>,
-    own_sender: Vec<Sender<Result<RecordBatch, DataFusionError>>>,
     executed: Vec<Arc<AtomicBool>>,
 }

@@ -214,7 +202,6 @@ impl ExecutionPlan for PhysicalForkNode {
             properties,
             sender: self.sender.clone(),
             receiver: self.receiver.clone(),
-            own_sender: self.own_sender.clone(),
             executed: self.executed.clone(),
         }))
     }
@@ -247,10 +234,16 @@ impl ExecutionPlan for PhysicalForkNode {
             )));
         }

-        self.own_sender[partition].clone().close_channel();
+        let sender = {
+            let mut lock = self.sender[partition].lock().unwrap();
+            lock.take()
+        }
+        .ok_or(DataFusionError::Internal(
+            "Fork node can only be executed once.".to_string(),
+        ))
+        .unwrap();

         let schema = self.schema().clone();
-        let sender = self.sender[partition].clone();

         if partition >= input_partitions {
             return Ok(Box::pin(RecordBatchStreamSender::new(
@@ -327,7 +320,6 @@ impl ExtensionPlanner for ForkNodePlanner {
                 properties,
                 sender: fork_node.sender.clone(),
                 receiver: fork_node.receiver.clone(),
-                own_sender: fork_node.own_sender.clone(),
                 executed: fork_node.executed.clone(),
             })))
         } else {
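
The core of the refactor above is that both fork halves now share a single channel per partition, with each endpoint wrapped in Arc<Mutex<Option<..>>> and taken exactly once at execution time. Below is a minimal, self-contained sketch of that take-once pattern, not the crate's actual API: Shared, shared_channel and take_sender are illustrative names, and the error string mirrors the one used in the diff.

    use std::sync::{Arc, Mutex};
    use futures::channel::mpsc::{channel, Receiver, Sender};

    // A slot that several clones of a plan node can hold, but only one can consume.
    type Shared<T> = Arc<Mutex<Option<T>>>;

    fn shared_channel(buffer: usize) -> (Shared<Sender<i32>>, Shared<Receiver<i32>>) {
        let (tx, rx) = channel(buffer);
        (Arc::new(Mutex::new(Some(tx))), Arc::new(Mutex::new(Some(rx))))
    }

    fn take_sender(slot: &Shared<Sender<i32>>) -> Result<Sender<i32>, String> {
        // Taking the sender out of the Option enforces single execution.
        slot.lock()
            .unwrap()
            .take()
            .ok_or_else(|| "Fork node can only be executed once.".to_string())
    }

    fn main() {
        let (tx_slot, _rx_slot) = shared_channel(8);
        let other_handle = tx_slot.clone(); // e.g. what a cloned plan node would hold
        assert!(take_sender(&tx_slot).is_ok()); // first execution obtains the sender
        assert!(take_sender(&other_handle).is_err()); // a second execution fails cleanly
    }
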
4 changes: 2 additions & 2 deletions datafusion_iceberg/src/materialized_view/delta_queries/mod.rs
@@ -298,7 +298,7 @@ WHERE L_SHIPDATE >= '1996-01-01';
         .await
         .expect("Failed to execute select query");

-    sleep(Duration::from_millis(1_000)).await;
+    sleep(Duration::from_millis(10_000)).await;

     let batches = ctx
         .sql("select sum(L_QUANTITY), O_ORDERSTATUS from warehouse.tpch.lineitem_orders group by O_ORDERSTATUS;")
@@ -425,7 +425,7 @@ WHERE L_SHIPDATE >= '1996-01-01';
         .await
         .expect("Failed to execute select query");

-    sleep(Duration::from_millis(1_000)).await;
+    sleep(Duration::from_millis(10_000)).await;

     let batches = ctx
         .sql("select sum(L_QUANTITY), O_ORDERSTATUS from warehouse.tpch.lineitem_orders group by O_ORDERSTATUS;")
28 changes: 14 additions & 14 deletions iceberg-rust/src/arrow/write.rs
@@ -26,6 +26,7 @@ use parquet::{
     arrow::AsyncArrowWriter,
     basic::{Compression, ZstdLevel},
     file::properties::WriterProperties,
+    format::FileMetaData,
 };
 use uuid::Uuid;

@@ -161,9 +162,8 @@ pub async fn store_parquet_partitioned(
         .await)
 }

-type SendableAsyncArrowWriter = AsyncArrowWriter<BufWriter>;
-type ArrowSender = Sender<(String, SendableAsyncArrowWriter)>;
-type ArrowReciever = Receiver<(String, SendableAsyncArrowWriter)>;
+type ArrowSender = Sender<(String, FileMetaData)>;
+type ArrowReciever = Receiver<(String, FileMetaData)>;

 /// Write arrow record batches to parquet files. Does not perform any operation on an iceberg table.
 #[allow(clippy::too_many_arguments)]
@@ -221,11 +221,17 @@ async fn write_parquet_files(
                         .await
                         .unwrap(),
                 );
+                let file = finished_writer.1.close().await?;
                 writer_sender
-                    .try_send(finished_writer)
+                    .try_send((finished_writer.0, file))
                     .map_err(|err| ArrowError::ComputeError(err.to_string()))?;
             }
             current_writer.1.write(&batch).await?;
+            if let Err(size) = current {
+                if size % 64_000_000 >= 32_000_000 {
+                    current_writer.1.flush().await?;
+                }
+            }
             let batch_size = record_batch_size(&batch);
             num_bytes.fetch_add(batch_size, Ordering::AcqRel);
             Ok(())
@@ -235,19 +241,13 @@

     let last = Arc::try_unwrap(current_writer).unwrap().into_inner();

-    writer_sender.try_send(last).unwrap();
+    let file = last.1.close().await?;
+
+    writer_sender.try_send((last.0, file)).unwrap();

     writer_sender.close_channel();

     if num_bytes.load(Ordering::Acquire) == 0 {
-        writer_reciever
-            .into_future()
-            .await
-            .0
-            .unwrap()
-            .1
-            .close()
-            .await?;
         return Ok(Vec::new());
     }

@@ -256,7 +256,7 @@
             let object_store = object_store.clone();
             let bucket = bucket.to_string();
             async move {
-                let metadata = writer.1.close().await?;
+                let metadata = writer.1;
                 let size = object_store
                     .head(&writer.0.as_str().into())
                     .await
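
The write.rs change closes each AsyncArrowWriter at the point where it is rotated and sends only the file path plus its FileMetaData through the channel, instead of shipping the still-open writer and closing it on the consumer side; it also flushes the current writer periodically based on the running byte count. A rough sketch of that hand-off follows; MockWriter and Meta are stand-ins for parquet's AsyncArrowWriter and format::FileMetaData, and the file path is illustrative.

    use futures::{channel::mpsc::channel, StreamExt};

    #[derive(Debug)]
    struct Meta {
        num_rows: i64,
    }

    struct MockWriter {
        rows: i64,
    }

    impl MockWriter {
        async fn close(self) -> Result<Meta, String> {
            // Closing finalizes the (pretend) file and yields its footer metadata.
            Ok(Meta { num_rows: self.rows })
        }
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        let (mut tx, mut rx) = channel::<(String, Meta)>(8);

        // Producer side: close the rotated writer, then send only path + metadata.
        let writer = MockWriter { rows: 42 };
        let meta = writer.close().await?;
        tx.try_send(("data/file-0.parquet".to_string(), meta))
            .map_err(|e| e.to_string())?;
        tx.close_channel();

        // Consumer side: turn the metadata into manifest entries, sizes, etc.
        while let Some((path, meta)) = rx.next().await {
            println!("{path}: {} rows", meta.num_rows);
        }
        Ok(())
    }
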
3 changes: 3 additions & 0 deletions iceberg-rust/src/error.rs
@@ -65,6 +65,9 @@ pub enum Error {
     /// Channel error
     #[error(transparent)]
     FuturesChannel(#[from] futures::channel::mpsc::SendError),
+    /// Tokio error
+    #[error(transparent)]
+    TokioJoinError(#[from] tokio::task::JoinError),
     /// Objectstore error
     #[error(transparent)]
     ObjectStore(#[from] object_store::Error),
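
The new variant lets a tokio::task::JoinError propagate into the crate's error type with the ? operator. A small sketch assuming the thiserror and tokio crates, with Error trimmed down to the relevant variant rather than the full iceberg-rust enum:

    use thiserror::Error;

    #[derive(Debug, Error)]
    enum Error {
        /// Tokio error
        #[error(transparent)]
        TokioJoinError(#[from] tokio::task::JoinError),
    }

    async fn join_worker() -> Result<u64, Error> {
        let handle = tokio::task::spawn(async { 21u64 * 2 });
        // JoinError converts into Error::TokioJoinError implicitly through `?`.
        Ok(handle.await?)
    }

    #[tokio::main]
    async fn main() -> Result<(), Error> {
        println!("{}", join_worker().await?);
        Ok(())
    }
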
