2 changes: 2 additions & 0 deletions r/NAMESPACE
@@ -61,6 +61,7 @@ S3method(as_data_type,pyarrow.lib.DataType)
S3method(as_data_type,pyarrow.lib.Field)
S3method(as_record_batch,RecordBatch)
S3method(as_record_batch,Table)
S3method(as_record_batch,arrow_dplyr_query)
S3method(as_record_batch,data.frame)
S3method(as_record_batch,pyarrow.lib.RecordBatch)
S3method(as_record_batch,pyarrow.lib.Table)
@@ -227,6 +228,7 @@ export(ReadableFile)
export(RecordBatch)
export(RecordBatchFileReader)
export(RecordBatchFileWriter)
export(RecordBatchReader)
export(RecordBatchStreamReader)
export(RecordBatchStreamWriter)
export(RoundMode)
52 changes: 30 additions & 22 deletions r/R/dataset-scan.R
@@ -153,7 +153,7 @@ head.Scanner <- function(x, n = 6L, ...) {

#' @export
tail.Scanner <- function(x, n = 6L, ...) {
tail_from_batches(dataset___Scanner__ScanBatches(x), n)
tail_from_batches(dataset___Scanner__ScanBatches(x), n)$read_table()
}

tail_from_batches <- function(batches, n) {
@@ -169,43 +169,57 @@ tail_from_batches <- function(batches, n) {
if (n <= 0) break
}
# rev() the result to put the batches back in the right order
Table$create(!!!rev(result))
RecordBatchReader$create(batches = rev(result))
}

#' Apply a function to a stream of RecordBatches
#'
#' As an alternative to calling `collect()` on a `Dataset` query, you can
#' use this function to access the stream of `RecordBatch`es in the `Dataset`.
#' This lets you aggregate on each chunk and pull the intermediate results into
#' a `data.frame` for further aggregation, even if you couldn't fit the whole
#' `Dataset` result in memory.
#' This lets you do more complex operations in R that operate on chunks of data
#' without having to hold the entire Dataset in memory at once. You can include
#' `map_batches()` in a dplyr pipeline and do additional dplyr methods on the
#' stream of data in Arrow after it.
#'
#' This is experimental and not recommended for production use.
#' Note that, unlike the core dplyr methods that are implemented in the Arrow
#' query engine, `map_batches()` is not lazy: it starts evaluating on the data
#' when you call it, even if you send its result to another pipeline function.
#'
#' This is experimental and not recommended for production use. It is also
#' single-threaded and runs in R, not C++, so it won't be as fast as the core
#' Arrow methods.
#'
#' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the
#' `dplyr` methods on `Dataset`.
#' @param FUN A function or `purrr`-style lambda expression to apply to each
#' batch
#' batch. It must return a RecordBatch or something coercible to one via
#' `as_record_batch()`.
#' @param ... Additional arguments passed to `FUN`
#' @param .data.frame logical: collect the resulting chunks into a single
#' `data.frame`? Default `TRUE`
#' @param .data.frame Deprecated argument, ignored
#' @return An `arrow_dplyr_query`.
#' @export
map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
# TODO: ARROW-15271 possibly refactor do_exec_plan to return a RecordBatchReader
map_batches <- function(X, FUN, ..., .data.frame = NULL) {
if (!is.null(.data.frame)) {
warning(
"The .data.frame argument is deprecated. ",
"Call collect() on the result to get a data.frame.",
call. = FALSE
)
}
plan <- ExecPlan$create()
final_node <- plan$Build(as_adq(X))
reader <- plan$Run(final_node)
FUN <- as_mapper(FUN)

# TODO: wrap batch in arrow_dplyr_query with X$selected_columns,
# X$temp_columns, and X$group_by_vars
# if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
# TODO: for future consideration
# * Move eval to C++ and make it a generator so it can stream, not block
[Review comment from Member]: +1 that would be nice.
# * Accept an output schema argument: with that, we could make this lazy (via collapse)
batch <- reader$read_next_batch()
res <- vector("list", 1024)
i <- 0L
while (!is.null(batch)) {
i <- i + 1L
res[[i]] <- FUN(batch, ...)
res[[i]] <- as_record_batch(FUN(batch, ...))
batch <- reader$read_next_batch()
}

@@ -214,13 +228,7 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
res <- res[seq_len(i)]
}

if (.data.frame & inherits(res[[1]], "arrow_dplyr_query")) {
res <- dplyr::bind_rows(map(res, dplyr::collect))
} else if (.data.frame) {
res <- dplyr::bind_rows(map(res, as.data.frame))
}

res
RecordBatchReader$create(batches = res)
}
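
A minimal usage sketch of map_batches() after this change (illustration only, not part of the diff; the dataset path and column names are hypothetical):

library(arrow)
library(dplyr)
ds <- open_dataset("path/to/dataset")  # hypothetical dataset
ds %>%
  # return a one-row RecordBatch summarizing each incoming batch
  map_batches(~ record_batch(batch_rows = .$num_rows)) %>%
  # the stream stays in Arrow, so further dplyr verbs can follow before collect()
  summarize(total_rows = sum(batch_rows)) %>%
  collect()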

#' @usage NULL
17 changes: 16 additions & 1 deletion r/R/dplyr-collect.R
@@ -27,15 +27,30 @@ collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) {
}

# See query-engine.R for ExecPlan/Nodes
plan <- ExecPlan$create()
final_node <- plan$Build(x)
tryCatch(
tab <- do_exec_plan(x),
tab <- plan$Run(final_node)$read_table(),
# n = 4 because we want the error to show up as being from collect()
# and not handle_csv_read_error()
error = function(e, call = caller_env(n = 4)) {
handle_csv_read_error(e, x$.data$schema, call)
}
)

# TODO(ARROW-16607): move key-value metadata (KVM) handling into ExecPlan
if (ncol(tab)) {
# Apply any column metadata from the original schema, where appropriate
new_r_metadata <- get_r_metadata_from_old_schema(
tab$schema,
source_data(x)$schema,
drop_attributes = has_aggregation(x)
)
if (!is.null(new_r_metadata)) {
tab$r_metadata <- new_r_metadata
}
}

if (as_data_frame) {
df <- as.data.frame(tab)
restore_dplyr_features(df, x)
14 changes: 3 additions & 11 deletions r/R/duckdb.R
@@ -123,11 +123,7 @@ duckdb_disconnector <- function(con, tbl_name) {
#' other processes (like DuckDB).
#'
#' @param .data the object to be converted
#' @param as_arrow_query should the returned object be wrapped as an
#' `arrow_dplyr_query`? (logical, default: `TRUE`)
#'
#' @return a `RecordBatchReader` object, wrapped as an arrow dplyr query which
#' can be used in dplyr pipelines.
#' @return A `RecordBatchReader`.
#' @export
#'
#' @examplesIf getFromNamespace("run_duckdb_examples", "arrow")()
@@ -142,7 +138,7 @@ duckdb_disconnector <- function(con, tbl_name) {
#' summarize(mean_mpg = mean(mpg, na.rm = TRUE)) %>%
#' to_arrow() %>%
#' collect()
to_arrow <- function(.data, as_arrow_query = TRUE) {
to_arrow <- function(.data) {
# If this is an Arrow object already, return quickly since we're already Arrow
if (inherits(.data, c("arrow_dplyr_query", "ArrowObject"))) {
return(.data)
@@ -161,9 +157,5 @@ to_arrow <- function(.data, as_arrow_query = TRUE) {
# Run the query
res <- DBI::dbSendQuery(dbplyr::remote_con(.data), dbplyr::remote_query(.data), arrow = TRUE)

if (as_arrow_query) {
arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res))
} else {
duckdb::duckdb_fetch_record_batch(res)
}
duckdb::duckdb_fetch_record_batch(res)
[Review comment from Member Author]: @jonkeane FYI. See also #11730 (comment) for historical context
[Review comment from Member]: I'm glad we can get this cleaned up 🎉
}
44 changes: 12 additions & 32 deletions r/R/query-engine.R
@@ -15,36 +15,6 @@
# specific language governing permissions and limitations
# under the License.

do_exec_plan <- function(.data) {
plan <- ExecPlan$create()
final_node <- plan$Build(.data)
tab <- plan$Run(final_node)
# TODO (ARROW-14289): make the head/tail methods return RBR not Table
if (inherits(tab, "RecordBatchReader")) {
tab <- tab$read_table()
}

# If arrange() created $temp_columns, make sure to omit them from the result
# We can't currently handle this in the ExecPlan itself because sorting
# happens in the end (SinkNode) so nothing comes after it.
if (length(final_node$sort$temp_columns) > 0) {
tab <- tab[, setdiff(names(tab), final_node$sort$temp_columns), drop = FALSE]
}

if (ncol(tab)) {
# Apply any column metadata from the original schema, where appropriate
new_r_metadata <- get_r_metadata_from_old_schema(
tab$schema,
source_data(.data)$schema,
drop_attributes = has_aggregation(.data)
)
if (!is.null(new_r_metadata)) {
tab$r_metadata <- new_r_metadata
}
}
tab
}

ExecPlan <- R6Class("ExecPlan",
inherit = ArrowObject,
public = list(
@@ -220,17 +190,27 @@ ExecPlan <- R6Class("ExecPlan",
# just use it to take the random slice
slice_size <- node$head %||% node$tail
if (!is.null(slice_size)) {
# TODO (ARROW-14289): make the head methods return RBR not Table
out <- head(out, slice_size)
}
# Can we now tell `self$Stop()` to StopProducing? We already have
# everything we need for the head (but it seems to segfault: ARROW-14329)
} else if (!is.null(node$tail)) {
# Reverse the row order to get back what we expect
# TODO: don't return Table, return RecordBatchReader
out <- out$read_table()
out <- out[rev(seq_len(nrow(out))), , drop = FALSE]
# Put back into RBR
out <- as_record_batch_reader(out)
}

# If arrange() created $temp_columns, make sure to omit them from the result
# We can't currently handle this in ExecPlan_run itself because sorting
# happens in the end (SinkNode) so nothing comes after it.
if (length(node$sort$temp_columns) > 0) {
tab <- out$read_table()
tab <- tab[, setdiff(names(tab), node$sort$temp_columns), drop = FALSE]
out <- as_record_batch_reader(tab)
}

out
},
Write = function(node, ...) {
19 changes: 15 additions & 4 deletions r/R/record-batch-reader.R
@@ -56,6 +56,7 @@
#'
#' @rdname RecordBatchReader
#' @name RecordBatchReader
#' @export
#' @include arrow-object.R
#' @examples
#' tf <- tempfile()
@@ -104,6 +105,16 @@ RecordBatchReader <- R6Class("RecordBatchReader",
schema = function() RecordBatchReader__schema(self)
)
)
RecordBatchReader$create <- function(..., batches = list(...), schema = NULL) {
are_batches <- map_lgl(batches, ~ inherits(., "RecordBatch"))
if (!all(are_batches)) {
stop(
"All inputs to RecordBatchReader$create must be RecordBatches",
call. = FALSE
)
}
RecordBatchReader__from_batches(batches, schema)
}
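
A quick sketch of the new constructor (illustration only, not part of the diff; the example batch is hypothetical):

batch <- record_batch(x = 1:5, y = letters[1:5])
reader <- RecordBatchReader$create(batch, batch)
reader$read_table()  # consumes the stream into a single 10-row Table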

#' @export
names.RecordBatchReader <- function(x) names(x$schema)
@@ -208,13 +219,13 @@ as_record_batch_reader.Table <- function(x, ...) {
#' @rdname as_record_batch_reader
#' @export
as_record_batch_reader.RecordBatch <- function(x, ...) {
RecordBatchReader__from_batches(list(x), NULL)
RecordBatchReader$create(x, schema = x$schema)
}

#' @rdname as_record_batch_reader
#' @export
as_record_batch_reader.data.frame <- function(x, ...) {
as_record_batch_reader(as_record_batch(x))
RecordBatchReader$create(as_record_batch(x))
}

#' @rdname as_record_batch_reader
Expand All @@ -226,8 +237,8 @@ as_record_batch_reader.Dataset <- function(x, ...) {
#' @rdname as_record_batch_reader
#' @export
as_record_batch_reader.arrow_dplyr_query <- function(x, ...) {
# TODO(ARROW-15271): make ExecPlan return RBR
as_record_batch_reader(collect.arrow_dplyr_query(x, as_data_frame = FALSE))
# TODO(ARROW-16607): use ExecPlan directly when it handles metadata
as_record_batch_reader(compute.arrow_dplyr_query(x))
}

#' @rdname as_record_batch_reader
6 changes: 6 additions & 0 deletions r/R/record-batch.R
@@ -297,6 +297,12 @@ as_record_batch.Table <- function(x, ..., schema = NULL) {
out
}

#' @rdname as_record_batch
#' @export
as_record_batch.arrow_dplyr_query <- function(x, ...) {
as_record_batch(compute.arrow_dplyr_query(x), ...)
}
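
For example (illustration only, not part of the diff; assumes an in-memory Table built from mtcars):

library(dplyr)
query <- Table$create(mtcars) %>%
  filter(cyl == 6) %>%
  select(mpg, cyl)
as_record_batch(query)  # computes the query, then coerces the resulting Table to one RecordBatch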

#' @rdname as_record_batch
#' @export
as_record_batch.data.frame <- function(x, ..., schema = NULL) {
7 changes: 7 additions & 0 deletions r/R/table.R
@@ -139,6 +139,13 @@ Table$create <- function(..., schema = NULL) {
if (all_record_batches(dots)) {
return(Table__from_record_batches(dots, schema))
}
if (length(dots) == 1 && inherits(dots[[1]], c("RecordBatchReader", "RecordBatchFileReader"))) {
tab <- dots[[1]]$read_table()
if (!is.null(schema)) {
tab <- tab$cast(schema)
}
return(tab)
}

# If any arrays are length 1, recycle them
dots <- recycle_scalars(dots)
3 changes: 3 additions & 0 deletions r/man/as_record_batch.Rd

26 changes: 18 additions & 8 deletions r/man/map_batches.Rd