-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgriddap_split.R
448 lines (428 loc) · 22.1 KB
/
griddap_split.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
#' Get ERDDAP gridded data with the request split into chunks
#'
#' \code{griddap_split} uses the R program 'rerddap::griddap()' to extract
#' environmental data from an 'ERDDAP' server in an (time, z, y, x) bounding
#' box, splitting the request along each dimension into smaller chunks and
#' aggregating the results. Arguments are the same as in 'rerddap::griddap()'
#' except for the added 'request_split' parameter. 'read' and 'store' options
#' are ignored.
#' @param datasetx Anything coercable to an object of class info. So the output of a
#' call to \code{\link{info}}, or a datasetid, which will internally be passed
#' through \code{\link{info}}
#' @param ... Dimension arguments. See examples. Can be any 1 or more of the
#' dimensions for the particular dataset - and the dimensions vary by dataset.
#' For each dimension, pass in a vector of length two, with min and max value
#' desired. At least 1 required.
#' @param fields (character) Fields to return, in a character vector.
#' @param stride (integer) How many values to get. 1 = get every value, 2 = get every other value, etc.
#' Default: 1 (i.e., get every value)
#' @param request_split A numeric vector indicating the number of splits for each dimension, used
#' to segment the request into manageable chunks. This is particularly useful for large datasets.
#' @param fmt (character) One of:
#' - nc: save output to a netcdf file
#' - memory: save output in a dataframe in memory
#' - duckdb: save data in a duckdb database
#' @param url A URL for an ERDDAP server. Default:
#' https://upwell.pfeg.noaa.gov/erddap/ - See [rerddap::eurl()] for
#' more information
#' @param store ignored
#' @param read ignored
#' @param callopts Curl options passed on to \code{\link[crul]{verb-GET}}
#' @param aggregate_file A string specifying:
#' - if is.null(aggregate_file) then a temporary file is created that does not
#' overwrite user space
#' - if "fmt = nc" the path to the NetCDF file to store the results
#' - if "fmt = duckdb" the path to the duckdb file to store the results
#' - if "fmt = memory" the value is ignored
#' @return Varies by format requested:
#' - if "fmt = nc" the path to the NetCDF file where the results are stored
#' - if "fmt = duckdb" the path to the duckdb file where the results are stored
#' - if "fmt = memory" the usual rerddap::griddap dataframe
#' @export
#'
#' @examples
#' out <- rerddap::info('erdQMekm14day')
#' request_split <- list(time = 3, altitude = 1, latitude = 1, longitude = 1)
#' res <- griddap_split(out,
#'          time = c('2015-12-28','2016-01-01'),
#'          latitude = c(20, 40),
#'          longitude = c(220, 240),
#'          fields = 'mod_current',
#'          request_split = request_split
#' )
griddap_split <- function(datasetx, ..., fields = 'all', stride = 1, request_split = NULL, fmt = "nc",
                          url = rerddap::eurl(), store = rerddap::disk(),
                          read = TRUE, callopts = list(), aggregate_file = NULL) {
  x <- datasetx
  # 'request_split' is required: one entry per dataset dimension giving the
  # number of chunks that dimension should be cut into.
  if (is.null(request_split)) {
    stop("'request_split' must be given; it must be a vector the same length ",
         "as the number of dimensions, where each element is the number of ",
         "splits in that dimension", call. = FALSE)
  }
  # Refuse to clobber an existing output file for the file-backed formats.
  if (fmt %in% c('nc', 'duckdb')) {
    if (!is.null(aggregate_file) && file.exists(aggregate_file)) {
      stop("aggregate file already exists; either rename the existing file ",
           "or change 'aggregate_file'", call. = FALSE)
    }
  }
  dimargs <- list(...)
  if (length(dimargs) == 0) stop("no dimension arguments passed, see ?griddap")
  if (inherits(x, "info")) {
    url <- x$base_url
    message("info() output passed to x; setting base url to: ", url)
  } else {
    x <- rerddap::as.info(x, url)
  }
  if (attr(x, "type") != "griddap")
    stop("datasetid '", attr(x, "datasetid"), "' not of type griddap")
  # Validate the requested dimensions (and the time range, if one is given)
  # against the dataset metadata before issuing any requests.
  check_dims(dimargs, x)
  if (!is.null(dimargs$time)) {
    check_time_range(dimargs, x)
  }
  # Build the pieces of the base rerddap call: resolved dimension values,
  # base URL, field list and a scratch netCDF file name.
  call_list <- extract_rerddap_call(x, dimargs, stride, fields, url)
  dimargs <- call_list$dimargs
  dim_args <- call_list$dim_args
  dimVals <- call_list$dimVals
  url_base <- call_list$url_base
  fields <- call_list$fields
  nc_file <- call_list$nc_file
  # Cut each dimension's coordinate values into the requested number of
  # chunks, then run the chunked requests and aggregate the results.
  split_dim <- define_split(dimVals, request_split)
  extract <- split_griddap_request(x, url, stride,
                                   request_split, split_dim, fields, fmt,
                                   callopts, nc_file, aggregate_file)
  return(extract)
}
# Split and Process 'rerddap::griddap()' Requests
#
# Drives the chunked extraction: prepares the output target for the requested
# format (a DuckDB database, a NetCDF file, or nothing for in-memory), then
# delegates to recursive_extract() to walk every combination of dimension
# chunks, and finally returns the aggregated result.
#
# @param info Dataset information, typically from `rerddap::info()`.
# @param url The base URL of the ERDDAP server from which to fetch the data.
# @param stride An integer stride (step size) for the query.
# @param request_split A numeric vector with the number of splits per dimension.
# @param split_dim A named list, one element per dimension, holding that
#   dimension's coordinate values already cut into chunks (see define_split()).
# @param fields Character vector of fields (variables) to retrieve; 'all'
#   retrieves every available field.
# @param fmt Output format: 'memory', 'nc' (NetCDF) or 'duckdb'.
# @param callopts Curl options passed through to the HTTP request.
# @param nc_file Scratch NetCDF file used as a template when fmt = 'nc'.
# @param aggregate_file Path for the aggregated output; when NULL a tempfile
#   is created so user space is not overwritten.
#
# @return For fmt = 'nc' or 'duckdb', the path to the aggregate file; for
#   fmt = 'memory', the aggregated rerddap::griddap result.
#
split_griddap_request <- function(info, url, stride,
                                  request_split, split_dim, fields, fmt,
                                  callopts, nc_file, aggregate_file) {
  con_db <- NULL
  drv <- NULL
  # For duckdb output, open (and if needed create) the database up front so
  # each chunk can be appended as it arrives.
  if (fmt == 'duckdb') {
    if (is.null(aggregate_file)) {
      aggregate_file <- tempfile("extract", tmpdir = tempdir(), fileext = '.duckdb')
    }
    drv <- duckdb::duckdb()
    con_db <- duckdb::dbConnect(drv, aggregate_file)
  }
  # For netcdf output, create the aggregate file and copy over the dataset
  # attributes before any data is written.
  if (fmt == 'nc') {
    if (is.null(aggregate_file)) {
      aggregate_file <- tempfile("extract", tmpdir = tempdir(), fileext = '.nc')
    }
    create_nc_file(info, fields, nc_file, aggregate_file)
    copy_attributes(info, fields, nc_file, aggregate_file)
  }
  # Walk all combinations of dimension chunks; griddapOpts starts with just
  # the dataset info and is extended with one constraint per dimension.
  griddapOpts <- list(info)
  final_result <- recursive_extract(1, request_split, split_dim, griddapOpts,
                                    stride, callopts, NULL,
                                    info, fields, fmt, con_db, aggregate_file)
  if (fmt == 'duckdb') {
    duckdb::dbDisconnect(con_db, shutdown = TRUE)
    duckdb::duckdb_shutdown(drv)
    final_result <- aggregate_file
  } else if (fmt == 'nc') {
    final_result <- aggregate_file
  }
  return(final_result)
}
# Recursively Split and Extract Data Requests
#
# Walks the dimensions one recursion level at a time. At each level the
# current dimension's chunks are iterated; each chunk's (min, max) constraint
# is written into griddapOpts and the function recurses into the next
# dimension. When every dimension is constrained (level > number of
# dimensions), one chunked request is performed and its result folded into
# the accumulator.
#
# @param level The current recursion level, i.e. the dimension being processed.
# @param request_split A vector with the number of splits for each dimension;
#   its length must match the number of dimensions in the dataset.
# @param split_dim A named list, one element per dimension, holding that
#   dimension's coordinate values already cut into chunks.
# @param griddapOpts A list of options for the `rerddap::griddap()` call,
#   updated in place at each level with the current chunk's constraint.
# @param stride The stride (step size) for the data retrieval.
# @param callopts A list of additional options for the HTTP request.
# @param final_result Accumulator for the results from each recursive call.
# @param info Dataset information, typically from `rerddap::info()`.
# @param fields Fields (variables) to retrieve; 'all' for every field.
# @param fmt Output format ('memory', 'nc' or 'duckdb'), controlling how
#   each chunk's result is aggregated.
# @param con_db DuckDB connection used when fmt = 'duckdb'.
# @param aggregate_file Path of the aggregate output file, when applicable.
#
# @return The accumulated result of the extraction; its form depends on `fmt`.
#
recursive_extract <- function(level, request_split, split_dim, griddapOpts,
                              stride, callopts, final_result,
                              info, fields, fmt, con_db, aggregate_file) {
  split_names <- names(split_dim)
  # Base case: every dimension is constrained; run one chunked extraction.
  if (level > length(request_split)) {
    final_result <- partial_extract(fields, fmt, griddapOpts, stride, callopts,
                                    final_result, con_db, aggregate_file)
    return(final_result)
  }
  # Recursive step: iterate over the current dimension's chunks.
  # seq_len() (not seq(1, n)) so a zero-split entry iterates zero times.
  temp_name <- split_names[level]
  for (i in seq_len(request_split[[level]])) {
    temp_value <- get_dim_constraint(request_split, split_dim, level, i)
    griddapOpts[[temp_name]] <- temp_value
    # Recurse to the next level with the updated constraint set.
    final_result <- recursive_extract(level + 1, request_split, split_dim, griddapOpts,
                                      stride, callopts, final_result,
                                      info, fields, fmt, con_db, aggregate_file)
  }
  return(final_result)
}
# Perform a Partial Data Extraction and Aggregation
#
# Executes one chunk of the overall request via `rerddap::griddap()` and
# folds the result into the running aggregate for the chosen output format:
# an in-memory data frame ('memory'), a DuckDB table ('duckdb') or a NetCDF
# file ('nc').
#
# @param fields The fields (variables) to be extracted from the dataset.
# @param fmt Aggregation format: 'memory', 'duckdb' or 'nc' (NetCDF).
# @param griddapOpts Options for the `rerddap::griddap()` call: the dataset
#   (first, unnamed) plus one (min, max) constraint per dimension.
# @param stride Integer stride (step size) for the data retrieval.
# @param callopts Additional options for the HTTP request made by griddap().
# @param final_result The running aggregate; only meaningful for 'memory'.
# @param con_db DuckDB connection object, used when `fmt` is 'duckdb'.
# @param aggregate_file Path of the NetCDF aggregate, used when `fmt` is 'nc'.
#
# @return The updated aggregate for the chosen format; an unrecognised `fmt`
#   leaves `final_result` unchanged.
#
partial_extract <- function(fields, fmt, griddapOpts, stride, callopts,
                            final_result, con_db, aggregate_file) {
  # The first element of griddapOpts carries the dataset; name it so the
  # do.call() below maps it onto griddap()'s first argument.
  names(griddapOpts)[1] <- 'datasetx'
  griddapOpts$fields <- fields
  griddapOpts$stride <- stride
  griddapOpts$callopts <- callopts
  chunk <- suppressMessages(do.call(rerddap::griddap, griddapOpts))
  # Dispatch on the output format; the default branch keeps the accumulator
  # untouched when fmt is not one of the known values.
  final_result <- switch(fmt,
    memory = aggregate_memory(chunk, final_result),
    duckdb = aggregate_duckdb(chunk, con_db),
    nc     = aggregate_netcdf(chunk, aggregate_file),
    final_result
  )
  return(final_result)
}
# Calculate Dimension Constraints for Data Retrieval Requests
#
# Returns the (lower, upper) bound pair for one segment of one dimension,
# used to build the constraint of a chunked ERDDAP request. When the
# dimension is not split (`request_split[var_index] == 1`) the full range of
# the dimension is used.
#
# @param request_split A numeric vector with the number of splits for each
#   dimension; its length must match the number of dimensions in the dataset.
# @param split_dim A named list, one element per dimension, holding either
#   the dimension's full coordinate vector (unsplit) or a list of coordinate
#   chunks (split) — the structure produced by define_split().
# @param var_index Integer index of the dimension within `split_dim`.
# @param loop_index Integer index of the segment within the split dimension.
#
# @return A numeric vector of length two: the lower and upper bound of the
#   constraint for the requested dimension segment.
#
get_dim_constraint <- function(request_split, split_dim, var_index, loop_index){
  if (request_split[var_index] == 1) {
    # Dimension not split: use the full coordinate range.
    temp_value <- unlist(split_dim[[var_index]])
  } else {
    # Dimension split: pick the loop_index-th chunk of the coordinates.
    temp_value <- unlist(split_dim[[var_index]][loop_index], use.names = FALSE)
  }
  # Collapse the segment to its (first, last) pair.
  c(temp_value[1], temp_value[length(temp_value)])
}
# Define Splits for Dimension Values Based on Requested Segments
#
# Cuts each dimension's coordinate values into the requested number of
# contiguous segments, producing the structure consumed by
# get_dim_constraint() and recursive_extract().
#
# @param dimVals A named list, one element per dimension, holding the full
#   vector of coordinate values for that dimension.
# @param request_split A numeric vector (or list) giving the number of
#   segments for each dimension; same length and order as `dimVals`.
#
# @return A named list parallel to `dimVals`: dimensions with a split count
#   of 1 keep their original coordinate vector; others become a list of
#   roughly equal contiguous coordinate chunks.
#
define_split <- function(dimVals, request_split) {
  dim_splits <- list()
  for (i in seq_along(dimVals)) {
    temp_name <- names(dimVals)[i]
    temp_dim <- dimVals[[i]]
    no_cut <- request_split[[i]]
    if (no_cut == 1) {
      # No split requested: keep the full coordinate vector as-is.
      dim_splits[[temp_name]] <- temp_dim
    } else {
      # Cut the index sequence into no_cut equal-width bins and split the
      # coordinates by bin membership.
      dim_splits[[temp_name]] <- split(temp_dim, cut(seq_along(temp_dim), no_cut, labels = FALSE))
    }
  }
  return(dim_splits)
}
# Aggregate Extracted Data in Memory
#
# Folds a newly extracted chunk into the in-memory aggregate. The first
# chunk becomes the aggregate itself; subsequent chunks have their $data
# rows appended to the aggregate's $data.
#
# @param extract The newly extracted chunk; a list whose $data element is a
#   data frame with the same columns as the aggregate's $data.
# @param final_result The running aggregate, or NULL before the first chunk.
#
# @return The updated aggregate: `extract` itself when `final_result` was
#   NULL, otherwise `final_result` with `extract$data` rows appended.
aggregate_memory <- function(extract, final_result){
  # First chunk: it becomes the aggregate as-is.
  if (is.null(final_result)) {
    return(extract)
  }
  # Later chunks: append their rows to the accumulated data frame.
  final_result$data <- rbind(final_result$data, extract$data)
  final_result
}
# Aggregate Extracted Data into a DuckDB Database
#
# Folds a newly extracted chunk into the 'extract' table of a DuckDB
# database: the table is created from the first chunk and subsequent chunks
# are appended to it.
#
# @param extract The newly extracted chunk; a list whose $data element is a
#   data frame compatible with the 'extract' table's columns.
# @param con_db A DuckDB connection object created via `DBI::dbConnect()`.
#
# @return Invisibly NULL; called for its side effect of writing to the
#   database.
#
aggregate_duckdb <- function(extract, con_db){
  if (DBI::dbExistsTable(con_db, "extract")) {
    # Table already present: append this chunk's rows directly.
    DBI::dbAppendTable(con_db, "extract", extract$data)
  } else {
    # First chunk: create the table from the chunk's data frame.
    DBI::dbWriteTable(con_db, "extract", extract$data)
  }
  invisible(NULL)
}
# Aggregate Extracted Data into a NetCDF File
#
# Writes a newly extracted chunk into the correct hyperslab of an existing
# aggregate NetCDF file. The chunk's coordinate values are matched against
# the aggregate file's coordinate variables to locate the start index and
# count along each dimension, then every field is copied across.
#
# @param extract The newly extracted chunk. Its $summary element holds the
#   chunk's NetCDF filename, dimensions (`dim`) and variables (`var`); its
#   $data element holds the coordinate values used for alignment.
# @param aggregate_file Path to the existing aggregate NetCDF file, already
#   structured (by create_nc_file()) to accommodate the full extract.
#
# @return Invisibly NULL; called for its side effect of modifying the
#   aggregate NetCDF file.
#
aggregate_netcdf <- function(extract, aggregate_file){
  extract_file_name <- extract$summary$filename
  extract_file_root <- ncdf4::nc_open(extract_file_name)
  # Guarantee both files are closed even if a read/write below fails.
  on.exit(ncdf4::nc_close(extract_file_root), add = TRUE)
  root <- ncdf4::nc_open(aggregate_file, write = TRUE)
  on.exit(ncdf4::nc_close(root), add = TRUE)
  dim_names <- names(extract$summary$dim)
  field_names <- names(extract$summary$var)
  start <- list()
  count <- list()
  for (dim_name in dim_names) {
    extract_dim_values <- extract$data[[dim_name]]
    if (dim_name == 'time') {
      # Time in extract$data arrives as timestamps; the aggregate file's
      # time coordinate is numeric, so convert before matching.
      extract_dim_values <- lubridate::as_datetime(extract_dim_values)
      extract_dim_values <- as.numeric(extract_dim_values)
    }
    file_dim_values <- ncdf4::ncvar_get(root, dim_name)
    # Indices of the aggregate file's coordinates covered by this chunk.
    dim_indices <- which(file_dim_values >= min(extract_dim_values) &
                           file_dim_values <= max(extract_dim_values))
    start[[dim_name]] <- dim_indices[1]
    count[[dim_name]] <- length(dim_indices)
  }
  # NOTE(review): start/count are reversed before ncvar_put(), implying
  # extract$summary$dim lists dimensions in the opposite order to the
  # variables in the aggregate file — confirm against create_nc_file().
  start <- rev(unlist(start))
  count <- rev(unlist(count))
  for (field in field_names) {
    vals <- ncdf4::ncvar_get(extract_file_root, field)
    ncdf4::ncvar_put(root, field, vals, start = start, count = count)
  }
  invisible(NULL)
}