Skip to content

Commit

Permalink
+ read_record_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
romainfrancois committed Sep 24, 2018
1 parent f27eeba commit fa4ee22
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 11 deletions.
1 change: 1 addition & 0 deletions r/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export(int64)
export(int8)
export(list_of)
export(null)
export(read_record_batch)
export(record_batch)
export(schema)
export(struct)
Expand Down
4 changes: 4 additions & 0 deletions r/R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ RecordBatch_to_file <- function(batch, path) {
.Call(`_arrow_RecordBatch_to_file`, batch, path)
}

# Thin R-level wrapper: hands `path` unchanged to the compiled routine
# `_arrow_read_record_batch_` through `.Call()` and returns its result.
# NOTE(review): RcppExports.R is presumably generated by
# Rcpp::compileAttributes(); hand edits here may be overwritten -- confirm.
read_record_batch_ <- function(path) {
.Call(`_arrow_read_record_batch_`, path)
}

# Thin R-level wrapper: forwards `name`, `type` and `nullable` (default TRUE)
# to the compiled routine `_arrow_Field_initialize` through `.Call()` and
# returns its result.
# NOTE(review): RcppExports.R is presumably generated by
# Rcpp::compileAttributes(); hand edits here may be overwritten -- confirm.
Field_initialize <- function(name, type, nullable = TRUE) {
.Call(`_arrow_Field_initialize`, name, type, nullable)
}
Expand Down
21 changes: 10 additions & 11 deletions r/R/array.R
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,10 @@ array <- function(...){

`arrow::RecordBatch` <- R6Class("arrow::RecordBatch", inherit = `arrow::Object`,
public = list(
initialize = function(.data){
self$set_pointer(dataframe_to_RecordBatch(.data))
},
num_columns = function() RecordBatch_num_columns(self),
num_rows = function() RecordBatch_num_rows(self),
schema = function() schema(.xp = RecordBatch_schema(self)),
to_file = function(path) RecordBatch_to_file(self, fs::path_abs(path))
schema = function() `arrow::Schema`$new(RecordBatch_schema(self)),
to_file = function(path) invisible(RecordBatch_to_file(self, fs::path_abs(path)))
)
)

Expand All @@ -79,17 +76,19 @@ array <- function(...){
#'
#' @export
record_batch <- function(.data){
`arrow::RecordBatch`$new(.data)
`arrow::RecordBatch`$new(dataframe_to_RecordBatch(.data))
}

#' Read a record batch from a file
#'
#' @param path Path of the file to read. It is made absolute with
#'   `fs::path_abs()` before being handed to the internal reader
#'   `read_record_batch_()`.
#'
#' @return An `arrow::RecordBatch` R6 object built from the value returned by
#'   `read_record_batch_()`.
#'
#' @export
read_record_batch <- function(path){
  absolute_path <- fs::path_abs(path)
  raw_batch <- read_record_batch_(absolute_path)
  `arrow::RecordBatch`$new(raw_batch)
}

`arrow::Table` <- R6Class("arrow::Table", inherit = `arrow::Object`,
public = list(
initialize = function(.data){
self$set_pointer(dataframe_to_Table(.data))
},
num_columns = function() Table_num_columns(self),
num_rows = function() Table_num_rows(self),
schema = function() schema(.xp = Table_schema(self))
schema = function() `arrow::Schema`$new(Table_schema(self))
)
)

Expand All @@ -99,5 +98,5 @@ record_batch <- function(.data){
#'
#' @export
table <- function(.data){
`arrow::Table`$new(.data)
`arrow::Table`$new(dataframe_to_Table(.data))
}
12 changes: 12 additions & 0 deletions r/src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,17 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
// read_record_batch_
// Forward declaration of the implementation that the wrapper below calls.
std::shared_ptr<arrow::RecordBatch> read_record_batch_(std::string path);
// C entry point for `.Call("_arrow_read_record_batch_", path)`: converts the
// incoming SEXP to a std::string, calls read_record_batch_() and wraps the
// returned shared_ptr back into an R object with Rcpp::wrap().
// NOTE(review): this file is presumably generated by Rcpp::compileAttributes();
// hand edits here may be overwritten -- confirm.
RcppExport SEXP _arrow_read_record_batch_(SEXP pathSEXP) {
// BEGIN_RCPP / END_RCPP presumably bracket the body so that C++ exceptions are
// turned into R errors -- see the Rcpp macro definitions to confirm.
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
// Convert the R argument into the C++ parameter type.
Rcpp::traits::input_parameter< std::string >::type path(pathSEXP);
rcpp_result_gen = Rcpp::wrap(read_record_batch_(path));
return rcpp_result_gen;
END_RCPP
}
// Field_initialize
std::shared_ptr<arrow::Field> Field_initialize(const std::string& name, const std::shared_ptr<arrow::DataType>& type, bool nullable);
RcppExport SEXP _arrow_Field_initialize(SEXP nameSEXP, SEXP typeSEXP, SEXP nullableSEXP) {
Expand Down Expand Up @@ -860,6 +871,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_Table_num_rows", (DL_FUNC) &_arrow_Table_num_rows, 1},
{"_arrow_Table_schema", (DL_FUNC) &_arrow_Table_schema, 1},
{"_arrow_RecordBatch_to_file", (DL_FUNC) &_arrow_RecordBatch_to_file, 2},
{"_arrow_read_record_batch_", (DL_FUNC) &_arrow_read_record_batch_, 1},
{"_arrow_Field_initialize", (DL_FUNC) &_arrow_Field_initialize, 3},
{"_arrow_Field_ToString", (DL_FUNC) &_arrow_Field_ToString, 1},
{"_arrow_Field_name", (DL_FUNC) &_arrow_Field_name, 1},
Expand Down
15 changes: 15 additions & 0 deletions r/src/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow_types.h"
#include <arrow/io/file.h>
#include <arrow/ipc/writer.h>
#include <arrow/ipc/reader.h>

using namespace Rcpp;
using namespace arrow;
Expand Down Expand Up @@ -154,3 +155,17 @@ int RecordBatch_to_file(const std::shared_ptr<arrow::RecordBatch>& batch, std::s
return offset;
}

// [[Rcpp::export]]
std::shared_ptr<arrow::RecordBatch> read_record_batch_(std::string path) {
std::shared_ptr<arrow::io::ReadableFile> stream;
std::shared_ptr<arrow::ipc::RecordBatchFileReader> rbf_reader;

R_ERROR_NOT_OK(arrow::io::ReadableFile::Open(path, &stream));
R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileReader::Open(stream, &rbf_reader));

std::shared_ptr<arrow::RecordBatch> batch;
R_ERROR_NOT_OK(rbf_reader->ReadRecordBatch(0, &batch));

R_ERROR_NOT_OK(stream->Close());
return batch;
}

0 comments on commit fa4ee22

Please sign in to comment.