Skip to content

Commit

Permalink
Consolidate and document reader/writer classes; also fix ARROW-6449
Browse files Browse the repository at this point in the history
  • Loading branch information
nealrichardson committed Sep 10, 2019
1 parent 495abf6 commit e6b75f4
Show file tree
Hide file tree
Showing 30 changed files with 289 additions and 533 deletions.
12 changes: 12 additions & 0 deletions r/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ S3method(write_arrow,character)
S3method(write_arrow,raw)
export(Array)
export(Buffer)
export(BufferOutputStream)
export(BufferReader)
export(ChunkedArray)
export(CompressedInputStream)
export(CompressedOutputStream)
Expand All @@ -48,10 +50,20 @@ export(FeatherTableReader)
export(FeatherTableWriter)
export(Field)
export(FileMode)
export(FileOutputStream)
export(FixedSizeBufferWriter)
export(MemoryMappedFile)
export(MessageReader)
export(MessageType)
export(MockOutputStream)
export(ParquetFileReader)
export(ParquetReaderProperties)
export(RandomAccessFile)
export(ReadableFile)
export(RecordBatchFileReader)
export(RecordBatchFileWriter)
export(RecordBatchStreamReader)
export(RecordBatchStreamWriter)
export(Schema)
export(StatusCode)
export(Table)
Expand Down
4 changes: 2 additions & 2 deletions r/R/compression.R
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ compression_codec <- function(type = "GZIP") {
#' @docType class
#' @usage NULL
#' @format NULL
#' @description `CompressedInputStream` and `CompressedOutputStream`
#' @description `CompressedInputStream` and `CompressedOutputStream`
#' allow you to apply a [compression_codec()] to an
#' input or output stream.
#'
Expand All @@ -47,7 +47,7 @@ compression_codec <- function(type = "GZIP") {
#' The `CompressedInputStream$create()` and `CompressedOutputStream$create()`
#' factory methods instantiate the object and take the following arguments:
#'
#' - `stream` An `InputStream` or `OutputStream`, respectively
#' - `stream` An [InputStream] or [OutputStream], respectively
#' - `codec` A `Codec`
#'
#' @section Methods:
Expand Down
188 changes: 70 additions & 118 deletions r/R/io.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,37 @@ Writable <- R6Class("Writable", inherit = Object,
)
)

#' @title OutputStream
#'
#' @title OutputStream classes
#' @description `FileOutputStream` is for writing to a file;
#' `BufferOutputStream` and `FixedSizeBufferWriter` write to buffers;
#' `MockOutputStream` just reports back how many bytes it received, for testing
#' purposes. You can create one and pass it to any of the table writers, for
#' example.
#' @usage NULL
#' @format NULL
#' @docType class
#' @section Factory:
#'
#' The `$create()` factory methods instantiate the `OutputStream` object and
#' take the following arguments, depending on the subclass:
#'
#' - `path` For `FileOutputStream`, a character file name
#' - `initial_capacity` For `BufferOutputStream`, the size in bytes of the
#' buffer.
#' - `x` For `FixedSizeBufferWriter`, a [Buffer] or an object that can be
#' made into a buffer via `buffer()`.
#'
#' `MockOutputStream$create()` does not take any arguments.
#'
#' @section Methods:
#'
#' - Buffer `Read`(`int` nbytes): Read `nbytes` bytes
#' - `void` `close`(): close the stream
#' - `$tell()`: return the position in the stream
#' - `$close()`: close the stream
#' - `$write(x)`: send `x` to the stream
#' - `$capacity()`: for `BufferOutputStream`, return the buffer's capacity
#' - `$getvalue()`: for `BufferOutputStream`, finish the stream and return its
#' contents as a [Buffer]
#' - `$GetExtentBytesWritten()`: for `MockOutputStream`, report how many bytes
#' were sent.
#'
#' @rdname OutputStream
#' @name OutputStream
Expand All @@ -47,87 +68,50 @@ OutputStream <- R6Class("OutputStream", inherit = Writable,
)
)

#' @title class arrow::io::FileOutputStream
#'
#' @usage NULL
#' @format NULL
#' @docType class
#'
#' @section Methods:
#'
#' TODO
#'
#' @rdname FileOutputStream
#' @name FileOutputStream
#' @rdname OutputStream
#' @export
FileOutputStream <- R6Class("FileOutputStream", inherit = OutputStream)

# Factory for `FileOutputStream`.
#
# `path` is normalized with `mustWork = FALSE`, so normalizePath() neither
# warns nor errors when the file does not exist. The normalized path is
# handed to the io___FileOutputStream__Open binding and the result is passed
# through shared_ptr() together with the class generator.
FileOutputStream$create <- function(path) {
  resolved_path <- normalizePath(path, mustWork = FALSE)
  shared_ptr(
    FileOutputStream,
    io___FileOutputStream__Open(resolved_path)
  )
}

#' @title class arrow::io::MockOutputStream
#'
#' @usage NULL
#' @format NULL
#' @docType class
#'
#'
#' @section Methods:
#'
#' TODO
#'
#' @rdname MockOutputStream
#' @name MockOutputStream
#' @rdname OutputStream
#' @export
MockOutputStream <- R6Class("MockOutputStream", inherit = OutputStream,
  public = list(
    # Report how many bytes the mock stream has received; forwards to the
    # io___MockOutputStream__GetExtentBytesWritten binding.
    GetExtentBytesWritten = function() {
      io___MockOutputStream__GetExtentBytesWritten(self)
    }
  )
)

# Factory for `MockOutputStream`; takes no arguments. The result of
# io___MockOutputStream__initialize() is passed through shared_ptr() together
# with the class generator.
MockOutputStream$create <- function() {
  shared_ptr(
    MockOutputStream,
    io___MockOutputStream__initialize()
  )
}

#' @title class arrow::io::BufferOutputStream
#'
#' @usage NULL
#' @docType class
#' @section Methods:
#'
#' TODO
#'
#' @rdname BufferOutputStream
#' @name BufferOutputStream
#' @format NULL
#' @rdname OutputStream
#' @export
BufferOutputStream <- R6Class("BufferOutputStream", inherit = OutputStream,
public = list(
capacity = function() io___BufferOutputStream__capacity(self),
getvalue = function() shared_ptr(Buffer, io___BufferOutputStream__Finish(self)),

Write = function(bytes) io___BufferOutputStream__Write(self, bytes),
Tell = function() io___BufferOutputStream__Tell(self)
write = function(bytes) io___BufferOutputStream__Write(self, bytes),
tell = function() io___BufferOutputStream__Tell(self)
)
)

# Factory for `BufferOutputStream`.
#
# `initial_capacity` (default `0L`) is forwarded unchanged to the
# io___BufferOutputStream__Create binding; the result is passed through
# shared_ptr() together with the class generator.
BufferOutputStream$create <- function(initial_capacity = 0L) {
  shared_ptr(
    BufferOutputStream,
    io___BufferOutputStream__Create(initial_capacity)
  )
}

#' @title class arrow::io::FixedSizeBufferWriter
#'
#' @usage NULL
#' @format NULL
#' @docType class
#'
#'
#' @section Methods:
#'
#' TODO
#'
#' @rdname FixedSizeBufferWriter
#' @name FixedSizeBufferWriter
#' @rdname OutputStream
#' @export
FixedSizeBufferWriter <- R6Class("FixedSizeBufferWriter", inherit = OutputStream)

FixedSizeBufferWriter$create <- function(x) {
x <- buffer(x)
assert_that(x$is_mutable)
Expand All @@ -136,35 +120,43 @@ FixedSizeBufferWriter$create <- function(x) {

# InputStream -------------------------------------------------------------

#' @title class arrow::io::Readable
#'
#' @usage NULL
#' @format NULL
#' @docType class
#'
#'
#' @section Methods:
#'
#' TODO
#'
#' @rdname Readable
#' @name Readable

Readable <- R6Class("Readable", inherit = Object,
  public = list(
    # Read `nbytes` from this object via the io___Readable__Read binding and
    # hand the result to shared_ptr() with the `Buffer` class generator.
    Read = function(nbytes) {
      shared_ptr(Buffer, io___Readable__Read(self, nbytes))
    }
  )
)

#' @title class arrow::io::InputStream
#'
#' @title InputStream classes
#' @description `RandomAccessFile` inherits from `InputStream` and is a base
#' class for: `ReadableFile` for reading from a file; `MemoryMappedFile` for
#' the same but with memory mapping; and `BufferReader` for reading from a
#' buffer. Use these with the various table readers.
#' @usage NULL
#' @format NULL
#' @docType class
#' @section Factory:
#'
#' The `$create()` factory methods instantiate the `InputStream` object and
#' take the following arguments, depending on the subclass:
#'
#' - `path` For `ReadableFile`, a character file name
#' - `x` For `BufferReader`, a [Buffer] or an object that can be
#' made into a buffer via `buffer()`.
#'
#' To instantiate a `MemoryMappedFile`, call [mmap_open()].
#'
#' @section Methods:
#'
#' TODO
#' - `$GetSize()`: return the size of the file or buffer, in bytes
#' - `$supports_zero_copy()`: Logical
#' - `$seek(position)`: go to that position in the stream
#' - `$tell()`: return the position in the stream
#' - `$close()`: close the stream
#' - `$Read(nbytes)`: read data from the stream, either a specified `nbytes` or
#' all, if `nbytes` is not provided
#' - `$ReadAt(position, nbytes)`: similar to `$seek(position)$Read(nbytes)`
#' - `$Resize(size)`: for a `MemoryMappedFile` that is writeable
#'
#' @rdname InputStream
#' @name InputStream
Expand All @@ -174,25 +166,16 @@ InputStream <- R6Class("InputStream", inherit = Readable,
)
)

#' @title class arrow::io::RandomAccessFile
#'
#' @usage NULL
#' @format NULL
#' @docType class
#'
#'
#' @section Methods:
#'
#' TODO
#'
#' @rdname RandomAccessFile
#' @name RandomAccessFile
#' @rdname InputStream
#' @export
RandomAccessFile <- R6Class("RandomAccessFile", inherit = InputStream,
public = list(
GetSize = function() io___RandomAccessFile__GetSize(self),
supports_zero_copy = function() io___RandomAccessFile__supports_zero_copy(self),
Seek = function(position) io___RandomAccessFile__Seek(self, position),
Tell = function() io___RandomAccessFile__Tell(self),
seek = function(position) io___RandomAccessFile__Seek(self, position),
tell = function() io___RandomAccessFile__Tell(self),

Read = function(nbytes = NULL) {
if (is.null(nbytes)) {
Expand All @@ -211,61 +194,30 @@ RandomAccessFile <- R6Class("RandomAccessFile", inherit = InputStream,
)
)

#' @title class arrow::io::MemoryMappedFile
#'
#' @usage NULL
#' @format NULL
#' @docType class
#'
#'
#' @section Methods:
#'
#' TODO
#'
#' @seealso [mmap_open()], [mmap_create()]
#'
#'
#' @rdname MemoryMappedFile
#' @name MemoryMappedFile
#' @rdname InputStream
#' @export
MemoryMappedFile <- R6Class("MemoryMappedFile", inherit = RandomAccessFile,
  public = list(
    # Resize the memory-mapped file to `size`; forwards to the
    # io___MemoryMappedFile__Resize binding.
    Resize = function(size) {
      io___MemoryMappedFile__Resize(self, size)
    }
  )
)

#' @title class arrow::io::ReadableFile
#'
#' @usage NULL
#' @format NULL
#' @docType class
#'
#'
#' @section Methods:
#'
#' TODO
#'
#' @rdname ReadableFile
#' @name ReadableFile
#' @rdname InputStream
#' @export
ReadableFile <- R6Class("ReadableFile", inherit = RandomAccessFile)

# Factory for `ReadableFile`.
#
# `path` is normalized with normalizePath()'s default `mustWork = NA`
# (spelled out here), which only warns when the path cannot be resolved. The
# normalized path is handed to the io___ReadableFile__Open binding and the
# result is passed through shared_ptr() together with the class generator.
ReadableFile$create <- function(path) {
  shared_ptr(
    ReadableFile,
    io___ReadableFile__Open(normalizePath(path, mustWork = NA))
  )
}

#' @title class arrow::io::BufferReader
#'
#' @usage NULL
#' @format NULL
#' @docType class
#'
#' @section Methods:
#'
#' TODO
#'
#' @rdname BufferReader
#' @name BufferReader
#' @rdname InputStream
#' @export
BufferReader <- R6Class("BufferReader", inherit = RandomAccessFile)

BufferReader$create <- function(x) {
x <- buffer(x)
shared_ptr(BufferReader, io___BufferReader__initialize(x))
Expand Down
14 changes: 7 additions & 7 deletions r/R/parquet.R
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ read_parquet <- function(file,
#' f <- system.file("v0.7.1.parquet", package="arrow")
#' pq <- ParquetFileReader$create(f)
#' pq$GetSchema()
#' tab <- pq$ReadTable()
#' tab <- pq$ReadTable(starts_with("c"))
#' tab$schema
#' }
#' @include arrow-package.R
Expand Down Expand Up @@ -130,7 +130,9 @@ ParquetFileReader$create <- function(file,
#'
#' @section Methods:
#'
#' TODO
#' - `$read_dictionary(column_index)`
#' - `$set_read_dictionary(column_index, read_dict)`
#' - `$use_threads(use_threads)`
#'
#' @export
ParquetReaderProperties <- R6Class("ParquetReaderProperties",
Expand Down Expand Up @@ -172,11 +174,9 @@ ParquetReaderProperties$create <- function(use_threads = option_use_threads()) {
#'
#' @examples
#' \donttest{
#' try({
#' tf <- tempfile(fileext = ".parquet")
#' on.exit(unlink(tf))
#' write_parquet(tibble::tibble(x = 1:5), tf)
#' })
#' tf <- tempfile(fileext = ".parquet")
#' on.exit(unlink(tf))
#' write_parquet(tibble::tibble(x = 1:5), tf)
#' }
#' @export
write_parquet <- function(table, file) {
Expand Down
Loading

0 comments on commit e6b75f4

Please sign in to comment.