Skip to content

Commit

Permalink
[r] Replace SOMAArray read and write calls with ManagedQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
nguyenv committed Feb 5, 2025
1 parent 4013fa2 commit 193112d
Show file tree
Hide file tree
Showing 19 changed files with 150 additions and 152 deletions.
5 changes: 1 addition & 4 deletions apis/python/src/tiledbsoma/managed_query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ void load_managed_query(py::module& m) {
py::init([](SOMAArray array,
std::shared_ptr<SOMAContext> ctx,
std::string_view name) {
return ManagedQuery(
std::make_unique<SOMAArray>(array),
ctx->tiledb_ctx(),
name);
return ManagedQuery(array, ctx->tiledb_ctx(), name);
}),
py::arg("array"),
py::arg("ctx"),
Expand Down
4 changes: 2 additions & 2 deletions apis/r/R/BlockwiseIter.R
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ BlockwiseReadIterBase <- R6::R6Class(
if (is.null(private$soma_reader_pointer)) {
return(NULL)
}
sr_reset(private$soma_reader_pointer)
mq_reset(private$soma_reader_pointer)
return(invisible(NULL))
},
# @description Re-index an Arrow table
Expand Down Expand Up @@ -237,7 +237,7 @@ BlockwiseReadIterBase <- R6::R6Class(
if (is.null(private$soma_reader_pointer)) {
return(NULL)
}
sr_set_dim_points(private$soma_reader_pointer, dimname, points)
mq_set_dim_points(private$soma_reader_pointer, dimname, points)
return(invisible(NULL))
}
)
Expand Down
42 changes: 21 additions & 21 deletions apis/r/R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,14 @@ c_update_dataframe_schema <- function(uri, ctxxp, column_names_to_drop, add_cols
invisible(.Call(`_tiledbsoma_c_update_dataframe_schema`, uri, ctxxp, column_names_to_drop, add_cols_types, add_cols_enum_value_types, add_cols_enum_ordered))
}

#' Iterator-Style Access to SOMA Array via SOMAArray
#' Iterator-Style Access to SOMA Array via ManagedQuery
#'
#' The `sr_*` functions provide low-level access to an instance of the SOMAArray
#' The `mq_*` functions provide low-level access to an instance of a ManagedQuery
#' class so that iterative access over parts of a (large) array is possible.
#' \describe{
#' \item{\code{sr_setup}}{instantiates and by default also submits a query}
#' \item{\code{sr_complete}}{checks if more data is available}
#' \item{\code{sr_next}}{returns the next chunk}
#' \item{\code{mq_setup}}{instantiates and by default also submits a query}
#' \item{\code{mq_complete}}{checks if more data is available}
#' \item{\code{mq_next}}{returns the next chunk}
#' }
#'
#' @param uri Character value with URI path to a SOMA data set
Expand All @@ -289,20 +289,20 @@ c_update_dataframe_schema <- function(uri, ctxxp, column_names_to_drop, add_cols
#' new logging level.
#' @param timestamprange Optional POSIXct (i.e. Datetime) vector with start
#' and end of the interval for which data is considered.
#' @param sr An external pointer to a TileDB SOMAArray object.
#' @param mq An external pointer to a ManagedQuery object.
#'
#' @return \code{sr_setup} returns an external pointer to a SOMAArray.
#' \code{sr_complete} ' returns a boolean, and \code{sr_next} returns an Arrow
#' @return \code{mq_setup} returns an external pointer to a ManagedQuery object.
#' \code{mq_complete} returns a boolean, and \code{mq_next} returns an Arrow
#' array helper object.
#'
#' @examples
#' \dontrun{
#' uri <- extract_dataset("soma-dataframe-pbmc3k-processed-obs")
#' ctxxp <- soma_context()
#' sr <- sr_setup(uri, ctxxp)
#' mq <- mq_setup(uri, ctxxp)
#' rl <- data.frame()
#' while (!sr_complete(sr)) {
#' dat <- sr_next(sr)
#' while (!mq_complete(mq)) {
#' dat <- mq_next(mq)
#' rb <- arrow::RecordBatch$import_from_c(dat$array_data, dat$schema)
#' rl <- rbind(rl, as.data.frame(rb))
#' }
Expand All @@ -311,12 +311,12 @@ c_update_dataframe_schema <- function(uri, ctxxp, column_names_to_drop, add_cols
#' @noRd
NULL

sr_setup <- function(uri, ctxxp, colnames = NULL, qc = NULL, dim_points = NULL, dim_ranges = NULL, batch_size = "auto", result_order = "auto", timestamprange = NULL, loglevel = "auto") {
.Call(`_tiledbsoma_sr_setup`, uri, ctxxp, colnames, qc, dim_points, dim_ranges, batch_size, result_order, timestamprange, loglevel)
mq_setup <- function(uri, ctxxp, colnames = NULL, qc = NULL, dim_points = NULL, dim_ranges = NULL, batch_size = "auto", result_order = "auto", timestamprange = NULL, loglevel = "auto") {
.Call(`_tiledbsoma_mq_setup`, uri, ctxxp, colnames, qc, dim_points, dim_ranges, batch_size, result_order, timestamprange, loglevel)
}

sr_complete <- function(sr) {
.Call(`_tiledbsoma_sr_complete`, sr)
mq_complete <- function(mq) {
.Call(`_tiledbsoma_mq_complete`, mq)
}

#' @noRd
Expand All @@ -325,16 +325,16 @@ create_empty_arrow_table <- function() {
.Call(`_tiledbsoma_create_empty_arrow_table`)
}

sr_next <- function(sr) {
.Call(`_tiledbsoma_sr_next`, sr)
mq_next <- function(mq) {
.Call(`_tiledbsoma_mq_next`, mq)
}

sr_reset <- function(sr) {
invisible(.Call(`_tiledbsoma_sr_reset`, sr))
mq_reset <- function(mq) {
invisible(.Call(`_tiledbsoma_mq_reset`, mq))
}

sr_set_dim_points <- function(sr, dim, points) {
invisible(.Call(`_tiledbsoma_sr_set_dim_points`, sr, dim, points))
mq_set_dim_points <- function(mq, dim, points) {
invisible(.Call(`_tiledbsoma_mq_set_dim_points`, mq, dim, points))
}

#' TileDB SOMA statistics
Expand Down
4 changes: 2 additions & 2 deletions apis/r/R/ReadIter.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ReadIter <- R6::R6Class(
if (is.null(private$soma_reader_pointer)) {
TRUE
} else {
sr_complete(private$soma_reader_pointer)
mq_complete(private$soma_reader_pointer)
}
},

Expand Down Expand Up @@ -57,7 +57,7 @@ ReadIter <- R6::R6Class(
if (is.null(private$soma_reader_pointer)) {
return(NULL)
}
rl <- sr_next(private$soma_reader_pointer)
rl <- mq_next(private$soma_reader_pointer)
return(private$soma_reader_transform(rl))
},

Expand Down
4 changes: 2 additions & 2 deletions apis/r/R/SOMADataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,10 @@ SOMADataFrame <- R6::R6Class(
value_filter <- parsed@ptr
}
spdl::debug(
"[SOMADataFrame$read] calling sr_setup for {} at ({},{})", self$uri,
"[SOMADataFrame$read] calling mq_setup for {} at ({},{})", self$uri,
private$tiledb_timestamp[1], private$tiledb_timestamp[2]
)
sr <- sr_setup(
sr <- mq_setup(
uri = self$uri,
private$.soma_context,
colnames = column_names,
Expand Down
2 changes: 1 addition & 1 deletion apis/r/R/SOMANDArrayBase.R
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ SOMANDArrayBase <- R6::R6Class(
},

# @description Converts a list of vectors corresponding to coords to a
# format acceptable for sr_setup and soma_array_reader
# format acceptable for mq_setup and soma_array_reader
.convert_coords = function(coords) {
# Ensure coords is a named list, use to select dim points
stopifnot(
Expand Down
2 changes: 1 addition & 1 deletion apis/r/R/SOMASparseNDArray.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ SOMASparseNDArray <- R6::R6Class(
coords <- private$.convert_coords(coords)
}

sr <- sr_setup(
sr <- mq_setup(
uri = self$uri,
private$.soma_context,
dim_points = coords,
Expand Down
2 changes: 1 addition & 1 deletion apis/r/R/utils-readerTransformers.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#' Transformer function: SOMAArray to Arrow table
#'
#' @description Converts the results of a \link{soma_array_reader} or
#' \link{sr_next} to an arrow::\link[arrow]{Table}
#' \link{mq_next} to an arrow::\link[arrow]{Table}
#' @param x A nanoarrow_array object which is itself a wrapper around the external pointer
#' to the Arrow array data; the schema external pointer is added to it as well
#' @return arrow::\link[arrow]{Table}
Expand Down
2 changes: 1 addition & 1 deletion apis/r/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ uns_hint <- function(type = c("1d", "2d")) {
}
x$reopen("READ", tiledb_timestamp = x$tiledb_timestamp)
dimname <- x$dimnames()[axis + 1L]
sr <- sr_setup(
sr <- mq_setup(
uri = x$uri,
soma_context(),
colnames = dimname,
Expand Down
4 changes: 2 additions & 2 deletions apis/r/src/Makevars.in
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CXX_STD = CXX20
#CXX_STD = CXX20

## We need the TileDB headers; on macOS (aka Darwin) we also need to set the minimum version to 13.3
PKG_CPPFLAGS = -I. -I../inst/include/ @tiledb_include@ @cxx20_macos@ -DSPDLOG_USE_STD_FORMAT
PKG_CPPFLAGS = -std=c++20 -I. -I../inst/include/ @tiledb_include@ @cxx20_macos@ -DSPDLOG_USE_STD_FORMAT

## We also need the TileDB library
PKG_LIBS = @cxx20_macos@ @tiledb_libs@ @tiledb_rpath@
Expand Down
58 changes: 29 additions & 29 deletions apis/r/src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,9 +627,9 @@ BEGIN_RCPP
return R_NilValue;
END_RCPP
}
// sr_setup
Rcpp::XPtr<tdbs::SOMAArray> sr_setup(const std::string& uri, Rcpp::XPtr<somactx_wrap_t> ctxxp, Rcpp::Nullable<Rcpp::CharacterVector> colnames, Rcpp::Nullable<Rcpp::XPtr<tiledb::QueryCondition>> qc, Rcpp::Nullable<Rcpp::List> dim_points, Rcpp::Nullable<Rcpp::List> dim_ranges, std::string batch_size, std::string result_order, Rcpp::Nullable<Rcpp::DatetimeVector> timestamprange, const std::string& loglevel);
RcppExport SEXP _tiledbsoma_sr_setup(SEXP uriSEXP, SEXP ctxxpSEXP, SEXP colnamesSEXP, SEXP qcSEXP, SEXP dim_pointsSEXP, SEXP dim_rangesSEXP, SEXP batch_sizeSEXP, SEXP result_orderSEXP, SEXP timestamprangeSEXP, SEXP loglevelSEXP) {
// mq_setup
Rcpp::XPtr<tdbs::ManagedQuery> mq_setup(const std::string& uri, Rcpp::XPtr<somactx_wrap_t> ctxxp, Rcpp::Nullable<Rcpp::CharacterVector> colnames, Rcpp::Nullable<Rcpp::XPtr<tiledb::QueryCondition>> qc, Rcpp::Nullable<Rcpp::List> dim_points, Rcpp::Nullable<Rcpp::List> dim_ranges, std::string batch_size, std::string result_order, Rcpp::Nullable<Rcpp::DatetimeVector> timestamprange, const std::string& loglevel);
RcppExport SEXP _tiledbsoma_mq_setup(SEXP uriSEXP, SEXP ctxxpSEXP, SEXP colnamesSEXP, SEXP qcSEXP, SEXP dim_pointsSEXP, SEXP dim_rangesSEXP, SEXP batch_sizeSEXP, SEXP result_orderSEXP, SEXP timestamprangeSEXP, SEXP loglevelSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Expand All @@ -643,18 +643,18 @@ BEGIN_RCPP
Rcpp::traits::input_parameter< std::string >::type result_order(result_orderSEXP);
Rcpp::traits::input_parameter< Rcpp::Nullable<Rcpp::DatetimeVector> >::type timestamprange(timestamprangeSEXP);
Rcpp::traits::input_parameter< const std::string& >::type loglevel(loglevelSEXP);
rcpp_result_gen = Rcpp::wrap(sr_setup(uri, ctxxp, colnames, qc, dim_points, dim_ranges, batch_size, result_order, timestamprange, loglevel));
rcpp_result_gen = Rcpp::wrap(mq_setup(uri, ctxxp, colnames, qc, dim_points, dim_ranges, batch_size, result_order, timestamprange, loglevel));
return rcpp_result_gen;
END_RCPP
}
// sr_complete
bool sr_complete(Rcpp::XPtr<tdbs::SOMAArray> sr);
RcppExport SEXP _tiledbsoma_sr_complete(SEXP srSEXP) {
// mq_complete
bool mq_complete(Rcpp::XPtr<tdbs::ManagedQuery> mq);
RcppExport SEXP _tiledbsoma_mq_complete(SEXP mqSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::SOMAArray> >::type sr(srSEXP);
rcpp_result_gen = Rcpp::wrap(sr_complete(sr));
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::ManagedQuery> >::type mq(mqSEXP);
rcpp_result_gen = Rcpp::wrap(mq_complete(mq));
return rcpp_result_gen;
END_RCPP
}
Expand All @@ -668,36 +668,36 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
// sr_next
SEXP sr_next(Rcpp::XPtr<tdbs::SOMAArray> sr);
RcppExport SEXP _tiledbsoma_sr_next(SEXP srSEXP) {
// mq_next
SEXP mq_next(Rcpp::XPtr<tdbs::ManagedQuery> mq);
RcppExport SEXP _tiledbsoma_mq_next(SEXP mqSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::SOMAArray> >::type sr(srSEXP);
rcpp_result_gen = Rcpp::wrap(sr_next(sr));
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::ManagedQuery> >::type mq(mqSEXP);
rcpp_result_gen = Rcpp::wrap(mq_next(mq));
return rcpp_result_gen;
END_RCPP
}
// sr_reset
void sr_reset(Rcpp::XPtr<tdbs::SOMAArray> sr);
RcppExport SEXP _tiledbsoma_sr_reset(SEXP srSEXP) {
// mq_reset
void mq_reset(Rcpp::XPtr<tdbs::ManagedQuery> mq);
RcppExport SEXP _tiledbsoma_mq_reset(SEXP mqSEXP) {
BEGIN_RCPP
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::SOMAArray> >::type sr(srSEXP);
sr_reset(sr);
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::ManagedQuery> >::type mq(mqSEXP);
mq_reset(mq);
return R_NilValue;
END_RCPP
}
// sr_set_dim_points
void sr_set_dim_points(Rcpp::XPtr<tdbs::SOMAArray> sr, std::string dim, Rcpp::NumericVector points);
RcppExport SEXP _tiledbsoma_sr_set_dim_points(SEXP srSEXP, SEXP dimSEXP, SEXP pointsSEXP) {
// mq_set_dim_points
void mq_set_dim_points(Rcpp::XPtr<tdbs::ManagedQuery> mq, std::string dim, Rcpp::NumericVector points);
RcppExport SEXP _tiledbsoma_mq_set_dim_points(SEXP mqSEXP, SEXP dimSEXP, SEXP pointsSEXP) {
BEGIN_RCPP
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::SOMAArray> >::type sr(srSEXP);
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::ManagedQuery> >::type mq(mqSEXP);
Rcpp::traits::input_parameter< std::string >::type dim(dimSEXP);
Rcpp::traits::input_parameter< Rcpp::NumericVector >::type points(pointsSEXP);
sr_set_dim_points(sr, dim, points);
mq_set_dim_points(mq, dim, points);
return R_NilValue;
END_RCPP
}
Expand Down Expand Up @@ -845,12 +845,12 @@ static const R_CallMethodDef CallEntries[] = {
{"_tiledbsoma_tiledbsoma_upgrade_shape", (DL_FUNC) &_tiledbsoma_tiledbsoma_upgrade_shape, 5},
{"_tiledbsoma_upgrade_or_change_domain", (DL_FUNC) &_tiledbsoma_upgrade_or_change_domain, 7},
{"_tiledbsoma_c_update_dataframe_schema", (DL_FUNC) &_tiledbsoma_c_update_dataframe_schema, 6},
{"_tiledbsoma_sr_setup", (DL_FUNC) &_tiledbsoma_sr_setup, 10},
{"_tiledbsoma_sr_complete", (DL_FUNC) &_tiledbsoma_sr_complete, 1},
{"_tiledbsoma_mq_setup", (DL_FUNC) &_tiledbsoma_mq_setup, 10},
{"_tiledbsoma_mq_complete", (DL_FUNC) &_tiledbsoma_mq_complete, 1},
{"_tiledbsoma_create_empty_arrow_table", (DL_FUNC) &_tiledbsoma_create_empty_arrow_table, 0},
{"_tiledbsoma_sr_next", (DL_FUNC) &_tiledbsoma_sr_next, 1},
{"_tiledbsoma_sr_reset", (DL_FUNC) &_tiledbsoma_sr_reset, 1},
{"_tiledbsoma_sr_set_dim_points", (DL_FUNC) &_tiledbsoma_sr_set_dim_points, 3},
{"_tiledbsoma_mq_next", (DL_FUNC) &_tiledbsoma_mq_next, 1},
{"_tiledbsoma_mq_reset", (DL_FUNC) &_tiledbsoma_mq_reset, 1},
{"_tiledbsoma_mq_set_dim_points", (DL_FUNC) &_tiledbsoma_mq_set_dim_points, 3},
{"_tiledbsoma_tiledbsoma_stats_enable", (DL_FUNC) &_tiledbsoma_tiledbsoma_stats_enable, 0},
{"_tiledbsoma_tiledbsoma_stats_disable", (DL_FUNC) &_tiledbsoma_tiledbsoma_stats_disable, 0},
{"_tiledbsoma_tiledbsoma_stats_reset", (DL_FUNC) &_tiledbsoma_tiledbsoma_stats_reset, 0},
Expand Down
9 changes: 5 additions & 4 deletions apis/r/src/arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ void writeArrayFromArrow(
// optional timestamp range
std::optional<tdbs::TimestampRange> tsrng = makeTimestampRange(tsvec);

std::shared_ptr<tdbs::SOMAArray> arrup;
std::unique_ptr<tdbs::SOMAArray> arrup;
if (arraytype == "SOMADataFrame") {
arrup = tdbs::SOMADataFrame::open(
OpenMode::write,
Expand Down Expand Up @@ -257,7 +257,8 @@ void writeArrayFromArrow(
Rcpp::stop(tfm::format("Unexpected array type '%s'", arraytype));
}

arrup.get()->set_array_data(std::move(schema), std::move(array));
arrup.get()->write();
arrup.get()->close();
auto mq = tdbs::ManagedQuery(*arrup, somactx->tiledb_ctx());
mq.set_array_data(std::move(schema), std::move(array));
mq.submit_write();
mq.close();
}
15 changes: 9 additions & 6 deletions apis/r/src/rinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ SEXP soma_array_reader(
tdb_result_order,
tsrng);

auto mq = tdbs::ManagedQuery(*sr, somactx->tiledb_ctx());

std::unordered_map<std::string, std::shared_ptr<tiledb::Dimension>>
name2dim;
std::shared_ptr<tiledb::ArraySchema> schema = sr->tiledb_schema();
Expand All @@ -120,7 +122,7 @@ SEXP soma_array_reader(
if (!qc.isNull()) {
spdl::info("[soma_array_reader_impl] Applying query condition");
Rcpp::XPtr<tiledb::QueryCondition> qcxp(qc);
sr->set_condition(*qcxp);
mq.set_condition(*qcxp);
}

// If we have dimension points, apply them
Expand All @@ -129,18 +131,18 @@ SEXP soma_array_reader(
// point is applied to the named dimension
if (!dim_points.isNull()) {
Rcpp::List lst(dim_points);
apply_dim_points(sr.get(), name2dim, lst);
apply_dim_points(&mq, name2dim, lst);
}

// If we have dimension ranges, apply them
if (!dim_ranges.isNull()) {
Rcpp::List lst(dim_ranges);
apply_dim_ranges(sr.get(), name2dim, lst);
apply_dim_ranges(&mq, name2dim, lst);
}

// Getting next batch: std::optional<std::shared_ptr<ArrayBuffers>>
auto sr_data = sr->read_next();
if (!sr->results_complete()) {
auto sr_data = mq.read_next();
if (!mq.results_complete()) {
Rcpp::stop(
"Read of '%s' is incomplete.\nConsider increasing the memory "
"allocation via the configuration\noption "
Expand Down Expand Up @@ -226,7 +228,8 @@ void set_log_level(const std::string& level) {
Rcpp::CharacterVector get_column_types(
const std::string& uri, const std::vector<std::string>& colnames) {
auto sr = tdbs::SOMAArray::open(OpenMode::read, uri);
auto sr_data = sr->read_next();
auto mq = tdbs::ManagedQuery(*sr, sr->ctx()->tiledb_ctx());
auto sr_data = mq.read_next();
size_t n = colnames.size();
Rcpp::CharacterVector vs(n);
for (size_t i = 0; i < n; i++) {
Expand Down
Loading

0 comments on commit 193112d

Please sign in to comment.