Skip to content

Commit

Permalink
[CHORE] add IOStats to all micropartition ops (#1584)
Browse files Browse the repository at this point in the history
* Makes IOStats required for micropartition ops that may be lazy and call
`tables_or_read` / `concat_or_get`.
* Add annotations for all ops that call the methods above
* update iostats to use `Cow<'static, str>` since many of the strings we
use are static.
  • Loading branch information
samster25 authored Nov 13, 2023
1 parent fbbeff3 commit a316bde
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 41 deletions.
15 changes: 9 additions & 6 deletions src/daft-io/src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::sync::{
atomic::{self},
Arc,
use std::{
borrow::Cow,
sync::{
atomic::{self},
Arc,
},
};

pub type IOStatsRef = Arc<IOStatsContext>;

#[derive(Default, Debug)]
pub struct IOStatsContext {
name: String,
name: Cow<'static, str>,
num_get_requests: atomic::AtomicUsize,
num_head_requests: atomic::AtomicUsize,
num_list_requests: atomic::AtomicUsize,
Expand Down Expand Up @@ -38,9 +41,9 @@ pub(crate) struct IOStatsByteStreamContextHandle {
}

impl IOStatsContext {
pub fn new(name: String) -> IOStatsRef {
pub fn new<S: Into<Cow<'static, str>>>(name: S) -> IOStatsRef {
Arc::new(IOStatsContext {
name,
name: name.into(),
num_get_requests: atomic::AtomicUsize::new(0),
num_head_requests: atomic::AtomicUsize::new(0),
num_list_requests: atomic::AtomicUsize::new(0),
Expand Down
25 changes: 11 additions & 14 deletions src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,7 @@ impl MicroPartition {
}
}

pub fn from_scan_task(
scan_task: Arc<ScanTask>,
io_stats: Option<IOStatsRef>,
) -> crate::Result<Self> {
pub fn from_scan_task(scan_task: Arc<ScanTask>, io_stats: IOStatsRef) -> crate::Result<Self> {
let schema = scan_task.schema.clone();
match (
&scan_task.metadata,
Expand Down Expand Up @@ -355,7 +352,7 @@ impl MicroPartition {
.clone()
.map(|c| Arc::new(c.clone()))
.unwrap_or_default(),
io_stats,
Some(io_stats),
if scan_task.sources.len() == 1 { 1 } else { 128 }, // Hardcoded for to 128 bulk reads
cfg.multithreaded_io,
&ParquetSchemaInferenceOptions {
Expand All @@ -369,7 +366,7 @@ impl MicroPartition {
// Perform an eager **data** read
_ => {
let statistics = scan_task.statistics.clone();
let (tables, schema) = materialize_scan_task(scan_task, None, io_stats)?;
let (tables, schema) = materialize_scan_task(scan_task, None, Some(io_stats))?;
Ok(Self::new_loaded(schema, Arc::new(tables), statistics))
}
}
Expand Down Expand Up @@ -411,15 +408,15 @@ impl MicroPartition {
Ok(size_bytes)
}

pub(crate) fn tables_or_read(
&self,
io_stats: Option<IOStatsRef>,
) -> crate::Result<Arc<Vec<Table>>> {
pub(crate) fn tables_or_read(&self, io_stats: IOStatsRef) -> crate::Result<Arc<Vec<Table>>> {
let mut guard = self.state.lock().unwrap();
match guard.deref() {
TableState::Unloaded(scan_task) => {
let (tables, _) =
materialize_scan_task(scan_task.clone(), Some(self.schema.clone()), io_stats)?;
let (tables, _) = materialize_scan_task(
scan_task.clone(),
Some(self.schema.clone()),
Some(io_stats),
)?;
let table_values = Arc::new(tables);

// Cache future accesses by setting the state to TableState::Loaded
Expand All @@ -431,8 +428,8 @@ impl MicroPartition {
}
}

pub(crate) fn concat_or_get(&self) -> crate::Result<Arc<Vec<Table>>> {
let tables = self.tables_or_read(None)?;
pub(crate) fn concat_or_get(&self, io_stats: IOStatsRef) -> crate::Result<Arc<Vec<Table>>> {
let tables = self.tables_or_read(io_stats)?;
if tables.len() <= 1 {
return Ok(tables);
}
Expand Down
5 changes: 4 additions & 1 deletion src/daft-micropartition/src/ops/agg.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use common_error::DaftResult;
use daft_dsl::Expr;
use daft_io::IOStatsContext;
use daft_table::Table;

use crate::micropartition::MicroPartition;

impl MicroPartition {
pub fn agg(&self, to_agg: &[Expr], group_by: &[Expr]) -> DaftResult<Self> {
let tables = self.concat_or_get()?;
let io_stats = IOStatsContext::new("MicroPartition::agg");

let tables = self.concat_or_get(io_stats)?;

match tables.as_slice() {
[] => {
Expand Down
5 changes: 4 additions & 1 deletion src/daft-micropartition/src/ops/concat.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Mutex;

use common_error::{DaftError, DaftResult};
use daft_io::IOStatsContext;

use crate::micropartition::{MicroPartition, TableState};

Expand All @@ -26,10 +27,12 @@ impl MicroPartition {
}
}

let io_stats = IOStatsContext::new("MicroPartition::concat");

let mut all_tables = vec![];

for m in mps.iter() {
let tables = m.tables_or_read(None)?;
let tables = m.tables_or_read(io_stats.clone())?;
all_tables.extend_from_slice(tables.as_slice());
}
let mut all_stats = None;
Expand Down
9 changes: 7 additions & 2 deletions src/daft-micropartition/src/ops/eval_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{collections::HashSet, sync::Arc};
use common_error::{DaftError, DaftResult};
use daft_core::schema::Schema;
use daft_dsl::Expr;
use daft_io::IOStatsContext;
use snafu::ResultExt;

use crate::{micropartition::MicroPartition, DaftCoreComputeSnafu};
Expand Down Expand Up @@ -30,8 +31,10 @@ fn infer_schema(exprs: &[Expr], schema: &Schema) -> DaftResult<Schema> {

impl MicroPartition {
pub fn eval_expression_list(&self, exprs: &[Expr]) -> DaftResult<Self> {
let io_stats = IOStatsContext::new("MicroPartition::eval_expression_list");

let expected_schema = infer_schema(exprs, &self.schema)?;
let tables = self.tables_or_read(None)?;
let tables = self.tables_or_read(io_stats)?;
let evaluated_tables = tables
.iter()
.map(|t| t.eval_expression_list(exprs))
Expand All @@ -51,7 +54,9 @@ impl MicroPartition {
}

pub fn explode(&self, exprs: &[Expr]) -> DaftResult<Self> {
let tables = self.tables_or_read(None)?;
let io_stats = IOStatsContext::new("MicroPartition::explode");

let tables = self.tables_or_read(io_stats)?;
let evaluated_tables = tables
.iter()
.map(|t| t.explode(exprs))
Expand Down
4 changes: 3 additions & 1 deletion src/daft-micropartition/src/ops/filter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use common_error::DaftResult;
use daft_dsl::Expr;
use daft_io::IOStatsContext;
use snafu::ResultExt;

use crate::{micropartition::MicroPartition, DaftCoreComputeSnafu};
Expand All @@ -8,6 +9,7 @@ use daft_stats::TruthValue;

impl MicroPartition {
pub fn filter(&self, predicate: &[Expr]) -> DaftResult<Self> {
let io_stats = IOStatsContext::new("MicroPartition::filter");
if predicate.is_empty() {
return Ok(Self::empty(Some(self.schema.clone())));
}
Expand All @@ -26,7 +28,7 @@ impl MicroPartition {
}
// TODO figure out defered IOStats
let tables = self
.tables_or_read(None)?
.tables_or_read(io_stats)?
.iter()
.map(|t| t.filter(predicate))
.collect::<DaftResult<Vec<_>>>()
Expand Down
6 changes: 4 additions & 2 deletions src/daft-micropartition/src/ops/join.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use common_error::DaftResult;
use daft_core::array::ops::DaftCompare;
use daft_dsl::Expr;
use daft_io::IOStatsContext;
use daft_table::infer_join_schema;

use crate::micropartition::MicroPartition;
Expand Down Expand Up @@ -33,9 +34,10 @@ impl MicroPartition {
if let TruthValue::False = tv {
return Ok(Self::empty(Some(join_schema.into())));
}
let io_stats = IOStatsContext::new("MicroPartition::join");

let lt = self.concat_or_get()?;
let rt = right.concat_or_get()?;
let lt = self.concat_or_get(io_stats.clone())?;
let rt = right.concat_or_get(io_stats)?;

match (lt.as_slice(), rt.as_slice()) {
([], _) | (_, []) => Ok(Self::empty(Some(join_schema.into()))),
Expand Down
13 changes: 10 additions & 3 deletions src/daft-micropartition/src/ops/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use common_error::DaftResult;
use daft_dsl::Expr;
use daft_io::IOStatsContext;
use daft_table::Table;

use crate::micropartition::MicroPartition;
Expand Down Expand Up @@ -45,7 +46,9 @@ impl MicroPartition {
exprs: &[Expr],
num_partitions: usize,
) -> DaftResult<Vec<Self>> {
let tables = self.tables_or_read(None)?;
let io_stats = IOStatsContext::new("MicroPartition::partition_by_hash");

let tables = self.tables_or_read(io_stats)?;

if tables.is_empty() {
return Ok(
Expand All @@ -63,7 +66,9 @@ impl MicroPartition {
}

pub fn partition_by_random(&self, num_partitions: usize, seed: u64) -> DaftResult<Vec<Self>> {
let tables = self.tables_or_read(None)?;
let io_stats = IOStatsContext::new("MicroPartition::partition_by_random");

let tables = self.tables_or_read(io_stats)?;

if tables.is_empty() {
return Ok(
Expand All @@ -87,7 +92,9 @@ impl MicroPartition {
boundaries: &Table,
descending: &[bool],
) -> DaftResult<Vec<Self>> {
let tables = self.tables_or_read(None)?;
let io_stats = IOStatsContext::new("MicroPartition::partition_by_range");

let tables = self.tables_or_read(io_stats)?;

if tables.is_empty() {
let num_partitions = boundaries.len() + 1;
Expand Down
5 changes: 4 additions & 1 deletion src/daft-micropartition/src/ops/slice.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use common_error::DaftResult;
use daft_io::IOStatsContext;

use crate::micropartition::MicroPartition;

impl MicroPartition {
pub fn slice(&self, start: usize, end: usize) -> DaftResult<Self> {
let tables = self.tables_or_read(None)?;
let io_stats = IOStatsContext::new(format!("MicroPartition::slice {start}-{end}"));

let tables = self.tables_or_read(io_stats)?;
let mut slices_tables = vec![];
let mut rows_needed = (end - start).max(0);
let mut offset_so_far = start;
Expand Down
9 changes: 7 additions & 2 deletions src/daft-micropartition/src/ops/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ use std::sync::Arc;
use common_error::DaftResult;
use daft_core::Series;
use daft_dsl::Expr;
use daft_io::IOStatsContext;
use daft_table::Table;

use crate::micropartition::MicroPartition;

impl MicroPartition {
pub fn sort(&self, sort_keys: &[Expr], descending: &[bool]) -> DaftResult<Self> {
let tables = self.concat_or_get()?;
let io_stats = IOStatsContext::new("MicroPartition::sort");

let tables = self.concat_or_get(io_stats)?;
match tables.as_slice() {
[] => Ok(Self::empty(Some(self.schema.clone()))),
[single] => {
Expand All @@ -25,7 +28,9 @@ impl MicroPartition {
}

pub fn argsort(&self, sort_keys: &[Expr], descending: &[bool]) -> DaftResult<Series> {
let tables = self.concat_or_get()?;
let io_stats = IOStatsContext::new("MicroPartition::argsort");

let tables = self.concat_or_get(io_stats)?;
match tables.as_slice() {
[] => {
let empty_table = Table::empty(Some(self.schema.clone()))?;
Expand Down
13 changes: 10 additions & 3 deletions src/daft-micropartition/src/ops/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ use std::sync::Arc;

use common_error::DaftResult;
use daft_core::Series;
use daft_io::IOStatsContext;
use daft_table::Table;

use crate::micropartition::MicroPartition;

impl MicroPartition {
pub fn take(&self, idx: &Series) -> DaftResult<Self> {
let tables = self.concat_or_get()?;
let io_stats = IOStatsContext::new("MicroPartition::take");

let tables = self.concat_or_get(io_stats)?;
match tables.as_slice() {
// Fallback onto `[empty_table]` behavior
[] => {
Expand All @@ -33,7 +36,9 @@ impl MicroPartition {
}

pub fn sample(&self, num: usize) -> DaftResult<Self> {
let tables = self.concat_or_get()?;
let io_stats = IOStatsContext::new(format!("MicroPartition::sample({num})"));

let tables = self.concat_or_get(io_stats)?;

match tables.as_slice() {
[] => Ok(Self::empty(Some(self.schema.clone()))),
Expand All @@ -50,7 +55,9 @@ impl MicroPartition {
}

pub fn quantiles(&self, num: usize) -> DaftResult<Self> {
let tables = self.concat_or_get()?;
let io_stats = IOStatsContext::new(format!("MicroPartition::quantiles({num})"));

let tables = self.concat_or_get(io_stats)?;
match tables.as_slice() {
[] => Ok(Self::empty(Some(self.schema.clone()))),
[single] => {
Expand Down
12 changes: 9 additions & 3 deletions src/daft-micropartition/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ impl PyMicroPartition {
}

pub fn get_column(&self, name: &str, py: Python) -> PyResult<PySeries> {
let tables = py.allow_threads(|| self.inner.concat_or_get())?;
let tables = py.allow_threads(|| {
let io_stats = IOStatsContext::new(format!("PyMicroPartition::get_column: {name}"));
self.inner.concat_or_get(io_stats)
})?;
let columns = tables
.iter()
.map(|t| t.get_column(name))
Expand Down Expand Up @@ -77,7 +80,7 @@ impl PyMicroPartition {
"MicroPartition::from_scan_task for {:?}",
scan_task.0.sources
));
MicroPartition::from_scan_task(scan_task.into(), Some(io_stats))
MicroPartition::from_scan_task(scan_task.into(), io_stats)
})?
.into())
}
Expand Down Expand Up @@ -125,7 +128,10 @@ impl PyMicroPartition {

// Export Methods
pub fn to_table(&self, py: Python) -> PyResult<PyTable> {
let concatted = py.allow_threads(|| self.inner.concat_or_get())?;
let concatted = py.allow_threads(|| {
let io_stats = IOStatsContext::new("PyMicroPartition::to_table");
self.inner.concat_or_get(io_stats)
})?;
match &concatted.as_ref()[..] {
[] => PyTable::empty(Some(self.schema()?)),
[table] => Ok(PyTable {
Expand Down
4 changes: 2 additions & 2 deletions src/daft-parquet/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub mod pylib {
coerce_int96_timestamp_unit: Option<PyTimeUnit>,
) -> PyResult<Vec<PyTable>> {
py.allow_threads(|| {
let io_stats = IOStatsContext::new("read_parquet_bulk".to_string());
let io_stats = IOStatsContext::new("read_parquet_bulk");

let io_client = get_io_client(
multithreaded_io.unwrap_or(true),
Expand Down Expand Up @@ -243,7 +243,7 @@ pub mod pylib {
multithreaded_io: Option<bool>,
) -> PyResult<PyTable> {
py.allow_threads(|| {
let io_stats = IOStatsContext::new("read_parquet_statistics".to_string());
let io_stats = IOStatsContext::new("read_parquet_statistics");

let io_client = get_io_client(
multithreaded_io.unwrap_or(true),
Expand Down

0 comments on commit a316bde

Please sign in to comment.