Add LazyFrame version of to_parquet/3 (#496)
* Refactor the way we pass the parquet compression config

This makes the representation of parquet compression much simpler,
with an enum. We validate it on the Elixir side.

* Add LazyFrame version of `to_parquet/3`

This saves a lazy frame to a parquet file using a stream, which should
keep memory usage low for bigger files.
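
For illustration, a minimal sketch of the new API from the Elixir side
(file names are hypothetical; the calls mirror the test added below):

    alias Explorer.DataFrame, as: DF

    df = DF.from_parquet!("fossil_fuels.parquet")
    ldf = DF.lazy(df)

    # Streams the lazy frame to disk instead of collecting it into memory first.
    ldf
    |> DF.head(15)
    |> DF.to_parquet!("subset.parquet")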
Philip Sampaio authored Feb 6, 2023
1 parent c57bffd commit 0d48126
Showing 10 changed files with 133 additions and 48 deletions.
12 changes: 10 additions & 2 deletions lib/explorer/polars_backend/data_frame.ex
@@ -185,17 +185,25 @@ defmodule Explorer.PolarsBackend.DataFrame do

  @impl true
  def to_parquet(%DataFrame{data: df}, filename, {compression, compression_level}) do
-    case Native.df_to_parquet(df, filename, Atom.to_string(compression), compression_level) do
+    case Native.df_to_parquet(df, filename, parquet_compression(compression, compression_level)) do
      {:ok, _} -> :ok
      {:error, error} -> {:error, error}
    end
  end

  @impl true
  def dump_parquet(%DataFrame{data: df}, {compression, compression_level}) do
-    Native.df_dump_parquet(df, Atom.to_string(compression), compression_level)
+    Native.df_dump_parquet(df, parquet_compression(compression, compression_level))
  end

+  defp parquet_compression(nil, _), do: :uncompressed
+
+  defp parquet_compression(algorithm, level) when algorithm in ~w(gzip brotli zstd)a do
+    {algorithm, level}
+  end
+
+  defp parquet_compression(algorithm, _) when algorithm in ~w(snappy lz4raw)a, do: algorithm
+
  @impl true
  def load_parquet(contents) when is_binary(contents) do
    case Native.df_load_parquet(contents) do
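A sketch of the mapping the new helper produces, read off the clauses above:

    parquet_compression(nil, nil)     #=> :uncompressed
    parquet_compression(:gzip, 9)     #=> {:gzip, 9}
    parquet_compression(:zstd, nil)   #=> {:zstd, nil}
    parquet_compression(:snappy, nil) #=> :snappy
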
13 changes: 13 additions & 0 deletions lib/explorer/polars_backend/lazy_frame.ex
@@ -207,6 +207,19 @@ defmodule Explorer.PolarsBackend.LazyFrame do
    end
  end

+  @impl true
+  def to_parquet(%DF{groups: []} = df, filename, {compression, level}) do
+    case Native.lf_to_parquet(df.data, filename, Shared.parquet_compression(compression, level)) do
+      {:ok, _} -> :ok
+      {:error, _} = err -> err
+    end
+  end
+
+  @impl true
+  def to_parquet(_df, _filename, _compression) do
+    raise "to_parquet/3 with groups is not supported yet for lazy frames"
+  end
+
  @impl true
  def filter_with(
        %DF{},
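A hedged illustration of the guard above: only ungrouped lazy frames can be
sunk to disk for now (the column name and path are hypothetical):

    ldf
    |> DF.group_by("year")
    |> DF.to_parquet!("grouped.parquet")
    #=> ** (RuntimeError) to_parquet/3 with groups is not supported yet for lazy frames
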
5 changes: 3 additions & 2 deletions lib/explorer/polars_backend/native.ex
@@ -39,7 +39,7 @@ defmodule Explorer.PolarsBackend.Native do
  def df_dtypes(_df), do: err()
  def df_dump_csv(_df, _has_headers, _delimiter), do: err()
  def df_dump_ndjson(_df), do: err()
-  def df_dump_parquet(_df, _compression, _compression_level), do: err()
+  def df_dump_parquet(_df, _compression), do: err()
  def df_dump_ipc(_df, _compression), do: err()
  def df_dump_ipc_stream(_df, _compression), do: err()
  def df_filter_with(_df, _operation, _groups), do: err()
@@ -118,7 +118,7 @@ defmodule Explorer.PolarsBackend.Native do
  def df_to_ipc_stream(_df, _filename, _compression), do: err()
  def df_to_lazy(_df), do: err()
  def df_to_ndjson(_df, _filename), do: err()
-  def df_to_parquet(_df, _filename, _compression, _compression_level), do: err()
+  def df_to_parquet(_df, _filename, _compression), do: err()
  def df_width(_df), do: err()
  def df_describe(_df, _percentiles), do: err()
@@ -183,6 +183,7 @@ defmodule Explorer.PolarsBackend.Native do
  def lf_join(_df, _other, _left_on, _right_on, _how, _suffix), do: err()
  def lf_concat_rows(_dfs), do: err()
  def lf_concat_columns(_df, _others), do: err()
+  def lf_to_parquet(_df, _filename, _compression), do: err()

  # Series
  def s_add(_s, _other), do: err()
7 changes: 7 additions & 0 deletions lib/explorer/polars_backend/shared.ex
@@ -151,4 +151,11 @@ defmodule Explorer.PolarsBackend.Shared do

  defp error_message({_err_type, error}) when is_binary(error), do: error
  defp error_message(error), do: inspect(error)
+
+  def parquet_compression(nil, _), do: :uncompressed
+
+  def parquet_compression(algorithm, level) when algorithm in ~w(gzip brotli zstd)a,
+    do: {algorithm, level}
+
+  def parquet_compression(algorithm, _) when algorithm in ~w(snappy lz4raw)a, do: algorithm
end
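The helper is public here so the lazy frame backend can reuse the same
normalization (see the `Shared.parquet_compression/2` call in
`lazy_frame.ex` above).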
1 change: 1 addition & 0 deletions native/explorer/Cargo.toml
@@ -55,6 +55,7 @@ features = [
"temporal",
"to_dummies",
"is_in",
"streaming",
"strings",
"round_series"
]
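The new `streaming` feature flag enables polars' streaming engine, which
the `sink_parquet` call added below relies on.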
52 changes: 9 additions & 43 deletions native/explorer/src/dataframe/io.rs
@@ -17,6 +17,7 @@ use std::io::{BufReader, BufWriter, Cursor};
use std::result::Result;

use crate::dataframe::normalize_numeric_dtypes;
+use crate::datatypes::ExParquetCompression;
use crate::{ExDataFrame, ExplorerError};

fn finish_reader<R>(reader: impl SerReader<R>) -> Result<ExDataFrame, ExplorerError>
@@ -200,63 +201,28 @@ pub fn df_from_parquet(filename: &str) -> Result<ExDataFrame, ExplorerError> {
pub fn df_to_parquet(
    data: ExDataFrame,
    filename: &str,
-    compression: Option<&str>,
-    compression_level: Option<i32>,
+    ex_compression: ExParquetCompression,
) -> Result<(), ExplorerError> {
    let file = File::create(filename)?;
    let mut buf_writer = BufWriter::new(file);
-    let compression = parquet_compression(compression, compression_level)?;

+    let compression = ParquetCompression::try_from(ex_compression)?;
+
    ParquetWriter::new(&mut buf_writer)
        .with_compression(compression)
        .finish(&mut data.clone_inner())?;
    Ok(())
}

-fn parquet_compression(
-    compression: Option<&str>,
-    compression_level: Option<i32>,
-) -> Result<ParquetCompression, ExplorerError> {
-    let compression_type = match (compression, compression_level) {
-        (Some("snappy"), _) => ParquetCompression::Snappy,
-        (Some("gzip"), level) => {
-            let level = match level {
-                Some(level) => Some(GzipLevel::try_new(u8::try_from(level)?)?),
-                None => None,
-            };
-            ParquetCompression::Gzip(level)
-        }
-        (Some("brotli"), level) => {
-            let level = match level {
-                Some(level) => Some(BrotliLevel::try_new(u32::try_from(level)?)?),
-                None => None,
-            };
-            ParquetCompression::Brotli(level)
-        }
-        (Some("zstd"), level) => {
-            let level = match level {
-                Some(level) => Some(ZstdLevel::try_new(level)?),
-                None => None,
-            };
-            ParquetCompression::Zstd(level)
-        }
-        (Some("lz4raw"), _) => ParquetCompression::Lz4Raw,
-        _ => ParquetCompression::Uncompressed,
-    };
-
-    Ok(compression_type)
-}
-
#[rustler::nif(schedule = "DirtyCpu")]
-pub fn df_dump_parquet<'a>(
-    env: Env<'a>,
+pub fn df_dump_parquet(
+    env: Env,
    data: ExDataFrame,
-    compression: Option<&str>,
-    compression_level: Option<i32>,
-) -> Result<Binary<'a>, ExplorerError> {
+    ex_compression: ExParquetCompression,
+) -> Result<Binary, ExplorerError> {
    let mut buf = vec![];

-    let compression = parquet_compression(compression, compression_level)?;
+    let compression = ParquetCompression::try_from(ex_compression)?;

    ParquetWriter::new(&mut buf)
        .with_compression(compression)
52 changes: 51 additions & 1 deletion native/explorer/src/datatypes.rs
@@ -1,7 +1,8 @@
use crate::atoms;
use crate::ExplorerError;
use chrono::prelude::*;
use polars::prelude::*;
-use rustler::{Atom, NifStruct, ResourceArc};
+use rustler::{Atom, NifStruct, NifTaggedEnum, ResourceArc};
use std::convert::TryInto;

pub struct ExDataFrameRef(pub DataFrame);
@@ -307,3 +308,52 @@ impl From<NaiveTime> for ExTime {
        }
    }
}

+// In Elixir this would be represented like this:
+// * `:uncompressed` for `ExParquetCompression::Uncompressed`
+// * `{:brotli, 7}` for `ExParquetCompression::Brotli(Some(7))`
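+// * `{:gzip, nil}` for `ExParquetCompression::Gzip(None)` (no level given)
+// * `:snappy` for `ExParquetCompression::Snappy`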
+#[derive(NifTaggedEnum)]
+pub enum ExParquetCompression {
+    Brotli(Option<u32>),
+    Gzip(Option<u8>),
+    Lz4raw,
+    Snappy,
+    Uncompressed,
+    Zstd(Option<i32>),
+}
+
+impl TryFrom<ExParquetCompression> for ParquetCompression {
+    type Error = ExplorerError;
+
+    fn try_from(value: ExParquetCompression) -> Result<Self, Self::Error> {
+        let compression = match value {
+            ExParquetCompression::Brotli(level) => {
+                let brotli_level = match level.map(BrotliLevel::try_new) {
+                    // Can't use `map` for the whole conversion because of the `?`.
+                    Some(result) => Some(result?),
+                    None => None,
+                };
+                ParquetCompression::Brotli(brotli_level)
+            }
+            ExParquetCompression::Gzip(level) => {
+                let gzip_level = match level.map(GzipLevel::try_new) {
+                    Some(result) => Some(result?),
+                    None => None,
+                };
+                ParquetCompression::Gzip(gzip_level)
+            }
+            ExParquetCompression::Lz4raw => ParquetCompression::Lz4Raw,
+            ExParquetCompression::Snappy => ParquetCompression::Snappy,
+            ExParquetCompression::Uncompressed => ParquetCompression::Uncompressed,
+            ExParquetCompression::Zstd(level) => {
+                let zstd_level = match level.map(ZstdLevel::try_new) {
+                    Some(result) => Some(result?),
+                    None => None,
+                };
+                ParquetCompression::Zstd(zstd_level)
+            }
+        };
+
+        Ok(compression)
+    }
+}
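
A hedged sketch of how a compression choice travels from Elixir down to
this enum (with `DF` aliasing `Explorer.DataFrame`, and assuming the
`compression:` option accepts `{algorithm, level}` tuples as validated on
the Elixir side; the path is illustrative):

    DF.to_parquet!(df, "out.parquet", compression: {:zstd, 3})
    # crosses the NIF boundary as {:zstd, 3}, which NifTaggedEnum decodes
    # to ExParquetCompression::Zstd(Some(3))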
25 changes: 25 additions & 0 deletions native/explorer/src/lazyframe/io.rs
@@ -2,6 +2,7 @@ use polars::prelude::*;
use std::result::Result;

use crate::dataframe::io::schema_from_dtypes_pairs;
+use crate::datatypes::ExParquetCompression;
use crate::{ExLazyFrame, ExplorerError};

#[rustler::nif]
@@ -10,6 +11,30 @@ pub fn lf_from_parquet(filename: &str) -> Result<ExLazyFrame, ExplorerError> {
    Ok(ExLazyFrame::new(lf))
}

+#[rustler::nif(schedule = "DirtyIo")]
+pub fn lf_to_parquet(
+    data: ExLazyFrame,
+    filename: &str,
+    ex_compression: ExParquetCompression,
+) -> Result<(), ExplorerError> {
+    let compression = ParquetCompression::try_from(ex_compression)?;
+    let options = ParquetWriteOptions {
+        compression,
+        statistics: false,
+        row_group_size: None,
+        data_pagesize_limit: None,
+        maintain_order: false,
+    };
+    let lf = data
+        .clone_inner()
+        .with_streaming(true)
+        .with_common_subplan_elimination(false);
+
+    lf.sink_parquet(filename.into(), options)?;
+
+    Ok(())
+}

#[rustler::nif]
pub fn lf_from_ipc(filename: &str) -> Result<ExLazyFrame, ExplorerError> {
    let lf = LazyFrame::scan_ipc(filename, Default::default())?;
1 change: 1 addition & 0 deletions native/explorer/src/lib.rs
@@ -236,6 +236,7 @@ rustler::init!(
        lf_join,
        lf_concat_rows,
        lf_concat_columns,
+        lf_to_parquet,
        // series
        s_add,
        s_and,
13 changes: 13 additions & 0 deletions test/explorer/data_frame/lazy_test.exs
@@ -163,6 +163,19 @@ defmodule Explorer.DataFrame.LazyTest do
    assert DF.to_columns(df1) == DF.to_columns(df)
  end

+  @tag :tmp_dir
+  test "to_parquet/2 - with defaults", %{ldf: ldf, tmp_dir: tmp_dir} do
+    path = Path.join([tmp_dir, "fossil_fuels.parquet"])
+
+    ldf = DF.head(ldf, 15)
+    DF.to_parquet!(ldf, path)
+
+    df = DF.collect(ldf)
+    df1 = DF.from_parquet!(path)
+
+    assert DF.to_columns(df1) == DF.to_columns(df)
+  end
+
  @tag :tmp_dir
  test "from_ndjson/2 - with defaults", %{df: df, tmp_dir: tmp_dir} do
    path = Path.join([tmp_dir, "fossil_fuels.ndjson"])
