From 4046c732dec0c9311294a2589590b4d017c5a02a Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sun, 24 Dec 2023 12:55:21 +0100 Subject: [PATCH] fix: sink_csv deadlock (#13239) --- crates/polars-io/src/csv/write.rs | 17 ++++- crates/polars-io/src/csv/write_impl.rs | 16 +++-- .../src/executors/sinks/file_sink.rs | 66 ++++++++----------- 3 files changed, 56 insertions(+), 43 deletions(-) diff --git a/crates/polars-io/src/csv/write.rs b/crates/polars-io/src/csv/write.rs index b095eee9840f..6d5877d86869 100644 --- a/crates/polars-io/src/csv/write.rs +++ b/crates/polars-io/src/csv/write.rs @@ -1,3 +1,4 @@ +use polars_core::POOL; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; @@ -30,6 +31,7 @@ pub struct CsvWriter { header: bool, bom: bool, batch_size: usize, + n_threads: usize, } impl SerWriter for CsvWriter @@ -49,6 +51,7 @@ where header: true, bom: false, batch_size: 1024, + n_threads: POOL.current_num_threads(), } } @@ -60,7 +63,13 @@ where if self.header { write_impl::write_header(&mut self.buffer, &names, &self.options)?; } - write_impl::write(&mut self.buffer, df, self.batch_size, &self.options) + write_impl::write( + &mut self.buffer, + df, + self.batch_size, + &self.options, + self.n_threads, + ) } } @@ -149,6 +158,11 @@ where self } + pub fn n_threads(mut self, n_threads: usize) -> Self { + self.n_threads = n_threads; + self + } + pub fn batched(self, _schema: &Schema) -> PolarsResult> { let expects_bom = self.bom; let expects_header = self.header; @@ -188,6 +202,7 @@ impl BatchedWriter { df, self.writer.batch_size, &self.writer.options, + self.writer.n_threads, )?; Ok(()) } diff --git a/crates/polars-io/src/csv/write_impl.rs b/crates/polars-io/src/csv/write_impl.rs index 35dba2b85686..4c1a884a541b 100644 --- a/crates/polars-io/src/csv/write_impl.rs +++ b/crates/polars-io/src/csv/write_impl.rs @@ -281,6 +281,7 @@ pub(crate) fn write( df: &DataFrame, chunk_size: usize, options: &SerializeOptions, + n_threads: usize, ) -> PolarsResult<()> { for s in df.get_columns() { let nested = match s.dtype() { @@ -379,7 +380,6 @@ pub(crate) fn write( let time_zones = time_zones.into_iter().collect::>(); let len = df.height(); - let n_threads = POOL.current_num_threads(); let total_rows_per_pool_iter = n_threads * chunk_size; let any_value_iter_pool = LowContentionPool::>::new(n_threads); let write_buffer_pool = LowContentionPool::>::new(n_threads); @@ -388,8 +388,9 @@ pub(crate) fn write( // holds the buffers that will be written let mut result_buf: Vec>> = Vec::with_capacity(n_threads); + while n_rows_finished < len { - let par_iter = (0..n_threads).into_par_iter().map(|thread_no| { + let buf_writer = |thread_no| { let thread_offset = thread_no * chunk_size; let total_offset = n_rows_finished + thread_offset; let mut df = df.slice(total_offset as i64, chunk_size); @@ -453,10 +454,15 @@ pub(crate) fn write( any_value_iter_pool.set(col_iters); Ok(write_buffer) - }); + }; - // rayon will ensure the right order - POOL.install(|| result_buf.par_extend(par_iter)); + if n_threads > 1 { + let par_iter = (0..n_threads).into_par_iter().map(buf_writer); + // rayon will ensure the right order + POOL.install(|| result_buf.par_extend(par_iter)); + } else { + result_buf.push(buf_writer(0)); + } for buf in result_buf.drain(..) { let mut buf = buf?; diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs index e06cbec8bd69..661dea48bc20 100644 --- a/crates/polars-pipe/src/executors/sinks/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs @@ -1,6 +1,5 @@ use std::any::Any; use std::path::Path; -use std::sync::Mutex; use std::thread::JoinHandle; use crossbeam_channel::{bounded, Receiver, Sender}; @@ -55,6 +54,17 @@ impl SinkWriter for polars_io::ipc::BatchedWriter { } } +#[cfg(feature = "csv")] +impl SinkWriter for polars_io::csv::BatchedWriter { + fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { + self.write_batch(df) + } + + fn _finish(&mut self) -> PolarsResult<()> { + Ok(()) + } +} + #[cfg(feature = "json")] impl SinkWriter for BatchedWriter { fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { @@ -219,14 +229,11 @@ impl IpcCloudSink { } #[cfg(feature = "csv")] -#[derive(Clone)] -pub struct CsvSink { - writer: Arc>>, -} +pub struct CsvSink {} #[cfg(feature = "csv")] impl CsvSink { #[allow(clippy::new_ret_no_self)] - pub fn new(path: &Path, options: CsvWriterOptions, schema: &Schema) -> PolarsResult { + pub fn new(path: &Path, options: CsvWriterOptions, schema: &Schema) -> PolarsResult { let file = std::fs::File::create(path)?; let writer = CsvWriter::new(file) .include_bom(options.include_bom) @@ -241,41 +248,26 @@ impl CsvSink { .with_float_precision(options.serialize_options.float_precision) .with_null_value(options.serialize_options.null) .with_quote_style(options.serialize_options.quote_style) + .n_threads(1) .batched(schema)?; - Ok(Self { - writer: Arc::new(Mutex::new(writer)), - }) - } -} - -// Csv has a sync implementation because it writes in parallel. The file sink would deadlock. -#[cfg(feature = "csv")] -impl Sink for CsvSink { - fn sink(&mut self, _: &PExecutionContext, chunk: DataChunk) -> PolarsResult { - let mut writer = self.writer.lock().unwrap(); - writer.write_batch(&chunk.data)?; - Ok(SinkResult::CanHaveMoreInput) - } - - fn combine(&mut self, _other: &mut dyn Sink) { - // already synchronized - } - - fn split(&self, _thread_no: usize) -> Box { - Box::new(self.clone()) - } + let writer = Box::new(writer) as Box; - fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult { - Ok(FinalizedSink::Finished(Default::default())) - } + let morsels_per_sink = morsels_per_sink(); + let backpressure = morsels_per_sink * 2; + let (sender, receiver) = bounded(backpressure); - fn as_any(&mut self) -> &mut dyn Any { - self - } + let io_thread_handle = Arc::new(Some(init_writer_thread( + receiver, + writer, + options.maintain_order, + morsels_per_sink, + ))); - fn fmt(&self) -> &str { - "csv_sink" + Ok(FilesSink { + sender, + io_thread_handle, + }) } } @@ -417,6 +409,6 @@ impl Sink for FilesSink { self } fn fmt(&self) -> &str { - "file_sink" + "parquet_sink" } }