Skip to content

Commit

Permalink
Add LazyFrame version of to_ipc/3 (elixir-explorer#499)
Browse files Browse the repository at this point in the history
  • Loading branch information
treebee committed Apr 23, 2023
1 parent bcf1505 commit 73711f9
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 1 deletion.
9 changes: 8 additions & 1 deletion lib/explorer/polars_backend/lazy_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ defmodule Explorer.PolarsBackend.LazyFrame do
end
end

# Writes the lazy frame to `filename` in the IPC format by delegating to the
# `Native.lf_to_ipc/3` NIF.
#
# The third argument is a `{compression, level}` tuple; the level is not used
# by this function. Returns `:ok` on success, or the `{:error, _}` tuple
# produced by the NIF unchanged.
@impl true
def to_ipc(%DF{} = df, filename, {compression, _level}) do
  # `Atom.to_string(nil)` yields the string "nil", which would reach the NIF as
  # a bogus codec name. Pass `nil` through untouched so "no compression" is
  # explicit (NOTE(review): assumes callers use `nil` for "no compression" —
  # confirm against the public `to_ipc` option handling).
  codec =
    case compression do
      nil -> nil
      algorithm when is_atom(algorithm) -> Atom.to_string(algorithm)
    end

  case Native.lf_to_ipc(df.data, filename, codec) do
    {:ok, _} -> :ok
    {:error, _} = err -> err
  end
end

@impl true
def filter_with(
%DF{},
Expand Down Expand Up @@ -376,7 +384,6 @@ defmodule Explorer.PolarsBackend.LazyFrame do
sample: 5,
slice: 2,
to_csv: 4,
to_ipc: 3,
to_ipc_stream: 3,
to_ndjson: 2,
to_rows: 2,
Expand Down
1 change: 1 addition & 0 deletions lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ defmodule Explorer.PolarsBackend.Native do
# LazyFrame NIF placeholders. Each stub only calls `err()`; presumably the
# native library replaces these bodies once it is loaded and `err()` reports
# that it is not — TODO confirm against the definition of `err/0`.
def lf_concat_rows(_dfs), do: err()
def lf_concat_columns(_df, _others), do: err()
def lf_to_parquet(_df, _filename, _compression), do: err()
# Counterpart of the Rust `lf_to_ipc` NIF; `_compression` is the codec name
# (the Rust side recognizes "lz4" and "zstd") or nil.
def lf_to_ipc(_df, _filename, _compression), do: err()

# Series
def s_as_str(_s), do: err()
Expand Down
26 changes: 26 additions & 0 deletions native/explorer/src/lazyframe/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,32 @@ pub fn lf_from_ipc(filename: &str) -> Result<ExLazyFrame, ExplorerError> {
Ok(ExLazyFrame::new(lf))
}

/// Streams the lazy frame into an IPC file at `filename`.
///
/// `compression` names the codec to use: `"lz4"` or `"zstd"`. Any other
/// value, as well as `None`, results in an uncompressed file. Row order is
/// not maintained by the sink (`maintain_order: false`).
///
/// Returns `Ok(())` once the sink has completed; errors raised by the sink
/// are propagated through `?`.
#[rustler::nif(schedule = "DirtyIo")]
pub fn lf_to_ipc(
    data: ExLazyFrame,
    filename: &str,
    compression: Option<&str>,
) -> Result<(), ExplorerError> {
    // Map the codec name onto the writer's enum; unknown names fall back to
    // no compression.
    let codec = compression.and_then(|name| match name {
        "lz4" => Some(IpcCompression::LZ4),
        "zstd" => Some(IpcCompression::ZSTD),
        _ => None,
    });

    // Run the plan with streaming enabled and common-subplan elimination
    // disabled, writing the result straight to the sink.
    data.clone_inner()
        .with_streaming(true)
        .with_common_subplan_elimination(false)
        .sink_ipc(
            filename.into(),
            IpcWriterOptions {
                compression: codec,
                maintain_order: false,
            },
        )?;

    Ok(())
}

#[rustler::nif]
#[allow(clippy::too_many_arguments)]
pub fn lf_from_csv(
Expand Down
1 change: 1 addition & 0 deletions native/explorer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ rustler::init!(
lf_concat_rows,
lf_concat_columns,
lf_to_parquet,
lf_to_ipc,
// series
s_as_str,
s_add,
Expand Down
13 changes: 13 additions & 0 deletions test/explorer/data_frame/lazy_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,19 @@ defmodule Explorer.DataFrame.LazyTest do
end
end

@tag :tmp_dir
test "to_ipc/2 - with defaults", %{ldf: ldf, tmp_dir: tmp_dir} do
  # Round-trip: write the first 15 rows of the lazy frame to an IPC file,
  # then read the file back and compare against the collected lazy frame.
  ipc_path = Path.join(tmp_dir, "fossil_fuels.ipc")
  lazy_head = DF.head(ldf, 15)

  DF.to_ipc!(lazy_head, ipc_path)

  # Rows are sorted on both sides so the assertion does not depend on the
  # order in which they were written.
  expected_rows = lazy_head |> DF.collect() |> DF.to_rows() |> Enum.sort()
  written_rows = ipc_path |> DF.from_ipc!() |> DF.to_rows() |> Enum.sort()

  assert written_rows == expected_rows
end

@tag :tmp_dir
test "to_parquet/2 - with defaults", %{ldf: ldf, tmp_dir: tmp_dir} do
path = Path.join([tmp_dir, "fossil_fuels.parquet"])
Expand Down

0 comments on commit 73711f9

Please sign in to comment.