Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: as_nanoarrow_array_stream(<RPolarsSeries>) #1076

Merged
merged 2 commits into from
May 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
- New function `pl$deserialize_lf()` to deserialize a LazyFrame from a character
vector of JSON representation (#1073).
- New methods `$str$head()` and `$str$tail()` (#1074).
- New S3 methods `nanoarrow::as_nanoarrow_array_stream()` and `nanoarrow::infer_nanoarrow_schema()`
for `RPolarsSeries` (#1076).

## Polars R Package 0.16.3

Expand Down
2 changes: 2 additions & 0 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,8 @@ RPolarsSeries$set_sorted_mut <- function(descending) invisible(.Call(wrap__RPola

RPolarsSeries$struct_fields <- function() .Call(wrap__RPolarsSeries__struct_fields, self)

RPolarsSeries$export_stream <- function(stream_ptr, pl_flavor) invisible(.Call(wrap__RPolarsSeries__export_stream, self, stream_ptr, pl_flavor))

RPolarsSeries$from_arrow_array_stream_str <- function(name, robj_str) .Call(wrap__RPolarsSeries__from_arrow_array_stream_str, name, robj_str)

RPolarsSeries$from_arrow_array_robj <- function(name, array) .Call(wrap__RPolarsSeries__from_arrow_array_robj, name, array)
Expand Down
52 changes: 42 additions & 10 deletions R/pkg-nanoarrow.R
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
#' Create a nanoarrow_array_stream from a Polars object
#'
#' @inheritParams as_arrow_table.RPolarsDataFrame
#' @inheritParams DataFrame_write_ipc
#' @param x A polars object
#' @param schema must stay at default value NULL
#' @rdname S3_as_nanoarrow_array_stream
#' @examples
#' @examplesIf requireNamespace("nanoarrow", quietly = TRUE)
#' library(nanoarrow)
#' pl_df = as_polars_df(mtcars)
#'
#' nanoarrow_array_stream = as_nanoarrow_array_stream(pl_df)
#' as.data.frame(nanoarrow_array_stream)
#' pl_df = as_polars_df(mtcars)$head(5)
#' pl_s = as_polars_series(letters[1:5])
#'
#' as.data.frame(as_nanoarrow_array_stream(pl_df))
#' as.vector(as_nanoarrow_array_stream(pl_s))
# exported in zzz.R
as_nanoarrow_array_stream.RPolarsDataFrame = function(x, ..., schema = NULL, future = FALSE) {
uw = \(res) unwrap("in as_nanoarrow_array_stream(<RPolarsDataFrame>):")
uw = \(res) unwrap("in as_nanoarrow_array_stream():")

# Don't support the schema argument yet
if (!is.null(schema)) {
Expand All @@ -31,18 +33,48 @@ as_nanoarrow_array_stream.RPolarsDataFrame = function(x, ..., schema = NULL, fut
}


# TODO: export the `$export_stream` method and combine the two functions
#' @rdname S3_as_nanoarrow_array_stream
# exported in zzz.R
as_nanoarrow_array_stream.RPolarsSeries = function(x, ..., schema = NULL, future = FALSE) {
  # `res` must be forwarded to `unwrap()` explicitly: the result goes first and
  # the context string second. Without it the context string itself would be
  # handed to `unwrap()` as if it were the result, hiding the real message.
  uw = \(res) unwrap(res, "in as_nanoarrow_array_stream():")

  # Don't support the schema argument yet
  if (!is.null(schema)) {
    Err_plain("The `schema` argument is not supported yet") |>
      uw()
  }

  # `future` is passed straight to the Rust side, so it has to be a single
  # TRUE/FALSE value.
  if (!is_scalar_bool(future)) {
    Err_plain("`future` argument must be `TRUE` or `FALSE`") |>
      uw()
  }

  # Allocate an empty stream on the nanoarrow side, then pass its address
  # (as a character string) to the Series exporter, which fills it in.
  stream = nanoarrow::nanoarrow_allocate_array_stream()
  .pr$Series$export_stream(x, nanoarrow::nanoarrow_pointer_addr_chr(stream), future)
  stream
}


#' Infer nanoarrow schema from a Polars object
#'
#' @inheritParams as_nanoarrow_array_stream.RPolarsDataFrame
#' @rdname S3_infer_nanoarrow_schema
#' @examples
#' @examplesIf requireNamespace("nanoarrow", quietly = TRUE)
#' library(nanoarrow)
#' pl_df = as_polars_df(mtcars)
#'
#' pl_df = as_polars_df(mtcars)$select("mpg", "cyl")
#' pl_s = as_polars_series(letters)
#'
#' infer_nanoarrow_schema(pl_df)
#' infer_nanoarrow_schema(pl_s)
# exported in zzz.R
infer_nanoarrow_schema.RPolarsDataFrame = function(x, ..., future = FALSE) {
as_nanoarrow_array_stream.RPolarsDataFrame(x, future = future)$get_schema() |>
nanoarrow::as_nanoarrow_array_stream(x, future = future)$get_schema() |>
result() |>
unwrap("in infer_nanoarrow_schema(<RPolarsDataFrame>):")
unwrap("in infer_nanoarrow_schema():")
}


#' @rdname S3_infer_nanoarrow_schema
# exported in zzz.R
# The Series method is the very same function object as the DataFrame method.
infer_nanoarrow_schema.RPolarsSeries = infer_nanoarrow_schema.RPolarsDataFrame
2 changes: 2 additions & 0 deletions R/zzz.R
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ replace_private_with_pub_methods(pl, "^pl_")

# register S3 methods for packages in Suggests
s3_register("nanoarrow::as_nanoarrow_array_stream", "RPolarsDataFrame")
s3_register("nanoarrow::as_nanoarrow_array_stream", "RPolarsSeries")
s3_register("nanoarrow::infer_nanoarrow_schema", "RPolarsDataFrame")
s3_register("nanoarrow::infer_nanoarrow_schema", "RPolarsSeries")
s3_register("arrow::as_record_batch_reader", "RPolarsDataFrame")
s3_register("arrow::as_arrow_table", "RPolarsDataFrame")
s3_register("knitr::knit_print", "RPolarsDataFrame")
Expand Down
15 changes: 11 additions & 4 deletions man/S3_as_nanoarrow_array_stream.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions man/S3_infer_nanoarrow_schema.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions src/rust/src/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,42 @@ pub const R_INT_NA_ENC: i32 = -2147483648;
use crate::rpolarserr::polars_to_rpolars_err;
use std::result::Result;

use polars_core::error::PolarsError;
use polars_core::utils::arrow;

/// Iterator state used to hand out a Series one chunk at a time as Arrow arrays.
pub struct OwnedSeriesIterator {
// Series whose chunks are converted by `next()`.
series: pl::Series,
// Index of the next chunk to convert.
idx: usize,
// Upper bound for `idx`; iteration stops once `idx` reaches it.
n_chunks: usize,
// Forwarded unchanged to `Series::to_arrow` for every chunk.
pl_flavor: bool,
}

impl OwnedSeriesIterator {
    /// Creates an iterator over the chunks of `s`.
    ///
    /// The iterator stores a full-length slice of `s` and starts at chunk 0.
    /// `pl_flavor` is kept and passed to `to_arrow` for each chunk.
    pub fn new(s: pl::Series, pl_flavor: bool) -> Self {
        let series = s.slice(0, s.len());
        // Take the chunk count from the Series that is actually stored and
        // indexed in `next()`. The sliced copy is not guaranteed to keep the
        // same chunk layout as `s`, and a larger count would make
        // `to_arrow(idx, ..)` index past the last chunk.
        let n_chunks = series.n_chunks();

        Self {
            series,
            idx: 0,
            n_chunks,
            pl_flavor,
        }
    }
}

impl Iterator for OwnedSeriesIterator {
    type Item = Result<Box<dyn arrow::array::Array>, PolarsError>;

    /// Yields the Arrow array for the current chunk and advances to the next
    /// one; returns `None` once every chunk has been handed out.
    fn next(&mut self) -> Option<Self::Item> {
        // Guard clause: nothing left to export.
        if self.idx >= self.n_chunks {
            return None;
        }

        let array = self.series.to_arrow(self.idx, self.pl_flavor);
        self.idx += 1;

        // Every yielded item is an `Ok`; no error is produced here.
        Some(std::result::Result::Ok(array))
    }
}

#[derive(Debug, Clone)]
pub struct RPolarsSeries(pub pl::Series);

Expand Down Expand Up @@ -522,6 +558,23 @@ impl RPolarsSeries {
Ok(ca.fields().iter().map(|s| s.name()).collect())
}

/// Exports this Series as an Arrow array stream into caller-provided memory.
///
/// `stream_ptr` is the address of an `ArrowArrayStream`, written as a string
/// that parses to `usize`; the call panics if it does not parse.
/// `pl_flavor` is forwarded to both the dtype conversion and the chunk
/// iterator.
pub fn export_stream(&self, stream_ptr: &str, pl_flavor: bool) {
    // Unnamed field describing the stream's single column.
    let field = pl::ArrowField::new("", self.0.dtype().to_arrow(pl_flavor), false);

    let chunks = Box::new(OwnedSeriesIterator::new(self.0.clone(), pl_flavor));
    let mut exported = arrow::ffi::export_iterator(chunks, field);

    let target_addr: usize = stream_ptr.parse().unwrap();
    let target = target_addr as *mut arrow::ffi::ArrowArrayStream;
    unsafe {
        // Swap the freshly exported stream into the caller's slot; whatever
        // was in that slot ends up in `exported` and is dropped on return.
        std::ptr::swap_nonoverlapping(
            target,
            &mut exported as *mut arrow::ffi::ArrowArrayStream,
            1,
        );
    }
}

pub fn from_arrow_array_stream_str(name: Robj, robj_str: Robj) -> RResult<Robj> {
let name = robj_to!(str, name)?;
let s = crate::arrow_interop::to_rust::arrow_stream_to_series_internal(robj_str)?
Expand Down
37 changes: 19 additions & 18 deletions tests/testthat/_snaps/after-wrappers.md
Original file line number Diff line number Diff line change
Expand Up @@ -711,24 +711,25 @@
[9] "clear" "clone"
[11] "compare" "div"
[13] "dtype" "equals"
[15] "fast_explode_flag" "from_arrow_array_robj"
[17] "from_arrow_array_stream_str" "get_fmt"
[19] "is_sorted" "is_sorted_flag"
[21] "is_sorted_reverse_flag" "len"
[23] "map_elements" "max"
[25] "mean" "median"
[27] "min" "mul"
[29] "n_chunks" "n_unique"
[31] "name" "new"
[33] "panic" "print"
[35] "rem" "rename_mut"
[37] "rep" "set_sorted_mut"
[39] "shape" "sleep"
[41] "sort" "std"
[43] "struct_fields" "sub"
[45] "sum" "to_fmt_char"
[47] "to_frame" "to_r"
[49] "value_counts" "var"
[15] "export_stream" "fast_explode_flag"
[17] "from_arrow_array_robj" "from_arrow_array_stream_str"
[19] "get_fmt" "is_sorted"
[21] "is_sorted_flag" "is_sorted_reverse_flag"
[23] "len" "map_elements"
[25] "max" "mean"
[27] "median" "min"
[29] "mul" "n_chunks"
[31] "n_unique" "name"
[33] "new" "panic"
[35] "print" "rem"
[37] "rename_mut" "rep"
[39] "set_sorted_mut" "shape"
[41] "sleep" "sort"
[43] "std" "struct_fields"
[45] "sub" "sum"
[47] "to_fmt_char" "to_frame"
[49] "to_r" "value_counts"
[51] "var"

# public and private methods of each class RThreadHandle

Expand Down
32 changes: 32 additions & 0 deletions tests/testthat/test-pkg-nanoarrow.R
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,28 @@ test_that("as_nanoarrow_array_stream() works for DataFrame", {
)
})


test_that("as_nanoarrow_array_stream() works for Series", {
  skip_if_not_installed("nanoarrow")

  pl_series = as_polars_series(letters[1:3])

  # Default export: the stream has the nanoarrow class and round-trips
  # back to the original character vector.
  out_stream = nanoarrow::as_nanoarrow_array_stream(pl_series)
  expect_s3_class(out_stream, "nanoarrow_array_stream")
  expect_identical(as.vector(out_stream), letters[1:3])

  # nanoarrow does not support the string view type yet
  # https://github.com/apache/arrow-nanoarrow/pull/367
  expect_grepl_error(
    as.vector(nanoarrow::as_nanoarrow_array_stream(pl_series, future = TRUE)),
    "Unknown format: 'vu'"
  )
})


test_that("infer_nanoarrow_schema() works for DataFrame", {
skip_if_not_installed("nanoarrow")

Expand All @@ -26,3 +48,13 @@ test_that("infer_nanoarrow_schema() works for DataFrame", {
inferred_schema = nanoarrow::infer_nanoarrow_schema(df)
expect_identical(format(stream_schema), format(inferred_schema))
})


test_that("infer_nanoarrow_schema() works for Series", {
  skip_if_not_installed("nanoarrow")

  pl_series = as_polars_series(letters[1:3])

  # The schema read from an exported stream and the inferred schema must
  # format to the same text.
  from_stream = format(nanoarrow::as_nanoarrow_array_stream(pl_series)$get_schema())
  from_infer = format(nanoarrow::infer_nanoarrow_schema(pl_series))
  expect_identical(from_stream, from_infer)
})
Loading