Skip to content

Commit

Permalink
fix(rust, python): asof join — ensure the join column is not dropped when the 'by' argument is given
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 22, 2022
1 parent 7929246 commit d3e4075
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 32 deletions.
5 changes: 4 additions & 1 deletion polars/polars-core/src/frame/asof_join/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ impl DataFrame {
let left_asof = self.column(left_on)?;
let right_asof = other.column(right_on)?;
let right_asof_name = right_asof.name();
let left_asof_name = left_asof.name();

check_asof_columns(left_asof, right_asof)?;

Expand Down Expand Up @@ -657,7 +658,9 @@ impl DataFrame {
};

let mut drop_these = right_by.get_column_names();
drop_these.push(right_asof_name);
if left_asof_name == right_asof_name {
drop_these.push(right_asof_name);
}

let cols = other
.get_columns()
Expand Down
27 changes: 9 additions & 18 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,14 +616,11 @@ impl DataFrame {
}

/// The number of chunks per column
pub fn n_chunks(&self) -> PolarsResult<usize> {
Ok(self
.columns
.get(0)
.ok_or_else(|| {
PolarsError::NoData("Can not determine number of chunks if there is no data".into())
})?
.n_chunks())
pub fn n_chunks(&self) -> usize {
    // An empty frame has no columns and therefore no chunks;
    // otherwise report the chunk count of the first column.
    self.columns.first().map_or(0, |s| s.n_chunks())
}

/// Get a reference to the schema fields of the `DataFrame`.
Expand Down Expand Up @@ -1592,10 +1589,7 @@ impl DataFrame {
return self.take_unchecked_vectical(&idx_ca.into_inner());
}

let n_chunks = match self.n_chunks() {
Err(_) => return self.clone(),
Ok(n) => n,
};
let n_chunks = self.n_chunks();
let has_utf8 = self
.columns
.iter()
Expand Down Expand Up @@ -1640,10 +1634,7 @@ impl DataFrame {
return self.take_unchecked_vectical(&idx_ca);
}

let n_chunks = match self.n_chunks() {
Err(_) => return self.clone(),
Ok(n) => n,
};
let n_chunks = self.n_chunks();

let has_utf8 = self
.columns
Expand Down Expand Up @@ -2374,7 +2365,7 @@ impl DataFrame {
RecordBatchIter {
columns: &self.columns,
idx: 0,
n_chunks: self.n_chunks().unwrap_or(0),
n_chunks: self.n_chunks(),
}
}

Expand Down Expand Up @@ -3529,7 +3520,7 @@ mod test {
.unwrap();

df.vstack_mut(&df.slice(0, 3)).unwrap();
assert_eq!(df.n_chunks().unwrap(), 2)
assert_eq!(df.n_chunks(), 2)
}

#[test]
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub fn split_df_as_ref(df: &DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>>
let total_len = df.height();
let chunk_size = total_len / n;

if df.n_chunks()? == n
if df.n_chunks() == n
&& df.get_columns()[0]
.chunk_lengths()
.all(|len| len.abs_diff(chunk_size) < 100)
Expand All @@ -183,7 +183,7 @@ pub fn split_df_as_ref(df: &DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>>
chunk_size
};
let df = df.slice((i * chunk_size) as i64, len);
if df.n_chunks()? > 1 {
if df.n_chunks() > 1 {
// we add every chunk as separate dataframe. This make sure that every partition
// deals with it.
out.extend(flatten_df(&df))
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/src/csv/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ where

// Important that this rechunk is never done in parallel.
// As that leads to great memory overhead.
if rechunk && df.n_chunks()? > 1 {
if rechunk && df.n_chunks() > 1 {
if low_memory {
df.as_single_chunk();
} else {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/src/ndjson_core/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ where
)?;

let mut df: DataFrame = json_reader.as_df()?;
if rechunk && df.n_chunks()? > 1 {
if rechunk && df.n_chunks() > 1 {
df.as_single_chunk_par();
}
Ok(df)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,8 @@ impl Sink for GenericBuild {
.into_iter()
.map(|chunk| chunk.data),
);
if let Ok(n_chunks) = left_df.n_chunks() {
if left_df.height() > 0 {
assert_eq!(n_chunks, chunks_len);
}
if left_df.height() > 0 {
assert_eq!(left_df.n_chunks(), chunks_len);
}
let materialized_join_cols =
Arc::new(std::mem::take(&mut self.materialized_join_cols));
Expand Down
4 changes: 2 additions & 2 deletions polars/tests/it/core/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ fn test_chunked_left_join() -> PolarsResult<()> {

let band_instruments = accumulate_dataframes_vertical(split_df(&mut band_instruments, 2)?)?;
let band_members = accumulate_dataframes_vertical(split_df(&mut band_members, 2)?)?;
assert_eq!(band_instruments.n_chunks()?, 2);
assert_eq!(band_members.n_chunks()?, 2);
assert_eq!(band_instruments.n_chunks(), 2);
assert_eq!(band_members.n_chunks(), 2);

let out = band_instruments.join(&band_members, ["name"], ["name"], JoinType::Left, None)?;
let expected = df![
Expand Down
5 changes: 2 additions & 3 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,9 +868,8 @@ impl PyDataFrame {
PyList::new(py, iter).to_object(py)
}

pub fn n_chunks(&self) -> PyResult<usize> {
let n = self.df.n_chunks().map_err(PyPolarsErr::from)?;
Ok(n)
/// Number of chunks per column; delegates to the wrapped Rust `DataFrame`.
pub fn n_chunks(&self) -> usize {
    self.df.n_chunks()
}

pub fn shape(&self) -> (usize, usize) {
Expand Down
10 changes: 10 additions & 0 deletions py-polars/tests/unit/test_joins.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,13 +677,15 @@ def test_join_asof_projection() -> None:
{
"df1_date": [20221011, 20221012, 20221013, 20221014, 20221016],
"df1_col1": ["foo", "bar", "foo", "bar", "foo"],
"key": ["a", "b", "b", "a", "b"],
}
)

df2 = pl.DataFrame(
{
"df2_date": [20221012, 20221015, 20221018],
"df2_col1": ["1", "2", "3"],
"key": ["a", "b", "b"],
}
)

Expand All @@ -695,3 +697,11 @@ def test_join_asof_projection() -> None:
"df2_date": [None, 20221012, 20221012, 20221012, 20221015],
"df1_date": [20221011, 20221012, 20221013, 20221014, 20221016],
}
assert (
df1.lazy().join_asof(
df2.lazy(), by="key", left_on="df1_date", right_on="df2_date"
)
).select(["df2_date", "df1_date"]).collect().to_dict(False) == {
"df2_date": [None, None, None, 20221012, 20221015],
"df1_date": [20221011, 20221012, 20221013, 20221014, 20221016],
}

0 comments on commit d3e4075

Please sign in to comment.