Skip to content

Commit dc39f83

Browse files
ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader
Ticket title is misleading: this PR actually removes do_exec_plan(). plan$Run() now always returns an RBR; the two cases where Tables are used to post-process ExecPlan results are encapsulated in Run() now. There is one catch that still needs addressing, but I'll make another JIRA for it: you can provide schema metadata to the WriteNode but not the other SinkNodes, so anything that preserves R metadata needs to handle that separately because Run() will drop it. ~~This seems to be a limitation of the C++ library.~~ *(edit: I see where I can inject this in `compute::MakeGeneratorReader` in the thing that consumes the sink node; I had made a note about that on the JIRA previously. I still will take this up in ARROW-16607.)* One other change here: map_batches() now returns an RBR and requires that the function it maps returns something that is coercible to a RecordBatch. Closes #13170 from nealrichardson/exec-to-rbr Authored-by: Neal Richardson <neal.p.richardson@gmail.com> Signed-off-by: Neal Richardson <neal.p.richardson@gmail.com>
1 parent 663dc32 commit dc39f83

17 files changed

Lines changed: 155 additions & 103 deletions

r/NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ S3method(as_data_type,pyarrow.lib.DataType)
6161
S3method(as_data_type,pyarrow.lib.Field)
6262
S3method(as_record_batch,RecordBatch)
6363
S3method(as_record_batch,Table)
64+
S3method(as_record_batch,arrow_dplyr_query)
6465
S3method(as_record_batch,data.frame)
6566
S3method(as_record_batch,pyarrow.lib.RecordBatch)
6667
S3method(as_record_batch,pyarrow.lib.Table)
@@ -227,6 +228,7 @@ export(ReadableFile)
227228
export(RecordBatch)
228229
export(RecordBatchFileReader)
229230
export(RecordBatchFileWriter)
231+
export(RecordBatchReader)
230232
export(RecordBatchStreamReader)
231233
export(RecordBatchStreamWriter)
232234
export(RoundMode)

r/R/dataset-scan.R

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ head.Scanner <- function(x, n = 6L, ...) {
153153

154154
#' @export
155155
tail.Scanner <- function(x, n = 6L, ...) {
156-
tail_from_batches(dataset___Scanner__ScanBatches(x), n)
156+
tail_from_batches(dataset___Scanner__ScanBatches(x), n)$read_table()
157157
}
158158

159159
tail_from_batches <- function(batches, n) {
@@ -169,43 +169,57 @@ tail_from_batches <- function(batches, n) {
169169
if (n <= 0) break
170170
}
171171
# rev() the result to put the batches back in the right order
172-
Table$create(!!!rev(result))
172+
RecordBatchReader$create(batches = rev(result))
173173
}
174174

175175
#' Apply a function to a stream of RecordBatches
176176
#'
177177
#' As an alternative to calling `collect()` on a `Dataset` query, you can
178178
#' use this function to access the stream of `RecordBatch`es in the `Dataset`.
179-
#' This lets you aggregate on each chunk and pull the intermediate results into
180-
#' a `data.frame` for further aggregation, even if you couldn't fit the whole
181-
#' `Dataset` result in memory.
179+
#' This lets you do more complex operations in R that operate on chunks of data
180+
#' without having to hold the entire Dataset in memory at once. You can include
181+
#' `map_batches()` in a dplyr pipeline and apply additional dplyr methods to the
182+
#' stream of data in Arrow after it.
182183
#'
183-
#' This is experimental and not recommended for production use.
184+
#' Note that, unlike the core dplyr methods that are implemented in the Arrow
185+
#' query engine, `map_batches()` is not lazy: it starts evaluating on the data
186+
#' when you call it, even if you send its result to another pipeline function.
187+
#'
188+
#' This is experimental and not recommended for production use. It is also
189+
#' single-threaded and runs in R, not C++, so it won't be as fast as core
190+
#' Arrow methods.
184191
#'
185192
#' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the
186193
#' `dplyr` methods on `Dataset`.
187194
#' @param FUN A function or `purrr`-style lambda expression to apply to each
188-
#' batch
195+
#' batch. It must return a RecordBatch or something coercible to one via
196+
#' `as_record_batch()`.
189197
#' @param ... Additional arguments passed to `FUN`
190-
#' @param .data.frame logical: collect the resulting chunks into a single
191-
#' `data.frame`? Default `TRUE`
198+
#' @param .data.frame Deprecated argument, ignored
199+
#' @return An `arrow_dplyr_query`.
192200
#' @export
193-
map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
194-
# TODO: ARROW-15271 possibly refactor do_exec_plan to return a RecordBatchReader
201+
map_batches <- function(X, FUN, ..., .data.frame = NULL) {
202+
if (!is.null(.data.frame)) {
203+
warning(
204+
"The .data.frame argument is deprecated. ",
205+
"Call collect() on the result to get a data.frame.",
206+
call. = FALSE
207+
)
208+
}
195209
plan <- ExecPlan$create()
196210
final_node <- plan$Build(as_adq(X))
197211
reader <- plan$Run(final_node)
198212
FUN <- as_mapper(FUN)
199213

200-
# TODO: wrap batch in arrow_dplyr_query with X$selected_columns,
201-
# X$temp_columns, and X$group_by_vars
202-
# if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
214+
# TODO: for future consideration
215+
# * Move eval to C++ and make it a generator so it can stream, not block
216+
# * Accept an output schema argument: with that, we could make this lazy (via collapse)
203217
batch <- reader$read_next_batch()
204218
res <- vector("list", 1024)
205219
i <- 0L
206220
while (!is.null(batch)) {
207221
i <- i + 1L
208-
res[[i]] <- FUN(batch, ...)
222+
res[[i]] <- as_record_batch(FUN(batch, ...))
209223
batch <- reader$read_next_batch()
210224
}
211225

@@ -214,13 +228,7 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
214228
res <- res[seq_len(i)]
215229
}
216230

217-
if (.data.frame & inherits(res[[1]], "arrow_dplyr_query")) {
218-
res <- dplyr::bind_rows(map(res, dplyr::collect))
219-
} else if (.data.frame) {
220-
res <- dplyr::bind_rows(map(res, as.data.frame))
221-
}
222-
223-
res
231+
RecordBatchReader$create(batches = res)
224232
}
225233

226234
#' @usage NULL

r/R/dplyr-collect.R

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,30 @@ collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) {
2727
}
2828

2929
# See query-engine.R for ExecPlan/Nodes
30+
plan <- ExecPlan$create()
31+
final_node <- plan$Build(x)
3032
tryCatch(
31-
tab <- do_exec_plan(x),
33+
tab <- plan$Run(final_node)$read_table(),
3234
# n = 4 because we want the error to show up as being from collect()
3335
# and not handle_csv_read_error()
3436
error = function(e, call = caller_env(n = 4)) {
3537
handle_csv_read_error(e, x$.data$schema, call)
3638
}
3739
)
3840

41+
# TODO(ARROW-16607): move KVM handling into ExecPlan
42+
if (ncol(tab)) {
43+
# Apply any column metadata from the original schema, where appropriate
44+
new_r_metadata <- get_r_metadata_from_old_schema(
45+
tab$schema,
46+
source_data(x)$schema,
47+
drop_attributes = has_aggregation(x)
48+
)
49+
if (!is.null(new_r_metadata)) {
50+
tab$r_metadata <- new_r_metadata
51+
}
52+
}
53+
3954
if (as_data_frame) {
4055
df <- as.data.frame(tab)
4156
restore_dplyr_features(df, x)

r/R/duckdb.R

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,7 @@ duckdb_disconnector <- function(con, tbl_name) {
123123
#' other processes (like DuckDB).
124124
#'
125125
#' @param .data the object to be converted
126-
#' @param as_arrow_query should the returned object be wrapped as an
127-
#' `arrow_dplyr_query`? (logical, default: `TRUE`)
128-
#'
129-
#' @return a `RecordBatchReader` object, wrapped as an arrow dplyr query which
130-
#' can be used in dplyr pipelines.
126+
#' @return A `RecordBatchReader`.
131127
#' @export
132128
#'
133129
#' @examplesIf getFromNamespace("run_duckdb_examples", "arrow")()
@@ -142,7 +138,7 @@ duckdb_disconnector <- function(con, tbl_name) {
142138
#' summarize(mean_mpg = mean(mpg, na.rm = TRUE)) %>%
143139
#' to_arrow() %>%
144140
#' collect()
145-
to_arrow <- function(.data, as_arrow_query = TRUE) {
141+
to_arrow <- function(.data) {
146142
# If this is an Arrow object already, return quickly since we're already Arrow
147143
if (inherits(.data, c("arrow_dplyr_query", "ArrowObject"))) {
148144
return(.data)
@@ -161,9 +157,5 @@ to_arrow <- function(.data, as_arrow_query = TRUE) {
161157
# Run the query
162158
res <- DBI::dbSendQuery(dbplyr::remote_con(.data), dbplyr::remote_query(.data), arrow = TRUE)
163159

164-
if (as_arrow_query) {
165-
arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res))
166-
} else {
167-
duckdb::duckdb_fetch_record_batch(res)
168-
}
160+
duckdb::duckdb_fetch_record_batch(res)
169161
}

r/R/query-engine.R

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,36 +15,6 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
do_exec_plan <- function(.data) {
19-
plan <- ExecPlan$create()
20-
final_node <- plan$Build(.data)
21-
tab <- plan$Run(final_node)
22-
# TODO (ARROW-14289): make the head/tail methods return RBR not Table
23-
if (inherits(tab, "RecordBatchReader")) {
24-
tab <- tab$read_table()
25-
}
26-
27-
# If arrange() created $temp_columns, make sure to omit them from the result
28-
# We can't currently handle this in the ExecPlan itself because sorting
29-
# happens in the end (SinkNode) so nothing comes after it.
30-
if (length(final_node$sort$temp_columns) > 0) {
31-
tab <- tab[, setdiff(names(tab), final_node$sort$temp_columns), drop = FALSE]
32-
}
33-
34-
if (ncol(tab)) {
35-
# Apply any column metadata from the original schema, where appropriate
36-
new_r_metadata <- get_r_metadata_from_old_schema(
37-
tab$schema,
38-
source_data(.data)$schema,
39-
drop_attributes = has_aggregation(.data)
40-
)
41-
if (!is.null(new_r_metadata)) {
42-
tab$r_metadata <- new_r_metadata
43-
}
44-
}
45-
tab
46-
}
47-
4818
ExecPlan <- R6Class("ExecPlan",
4919
inherit = ArrowObject,
5020
public = list(
@@ -220,17 +190,27 @@ ExecPlan <- R6Class("ExecPlan",
220190
# just use it to take the random slice
221191
slice_size <- node$head %||% node$tail
222192
if (!is.null(slice_size)) {
223-
# TODO (ARROW-14289): make the head methods return RBR not Table
224193
out <- head(out, slice_size)
225194
}
226195
# Can we now tell `self$Stop()` to StopProducing? We already have
227196
# everything we need for the head (but it seems to segfault: ARROW-14329)
228197
} else if (!is.null(node$tail)) {
229198
# Reverse the row order to get back what we expect
230-
# TODO: don't return Table, return RecordBatchReader
231199
out <- out$read_table()
232200
out <- out[rev(seq_len(nrow(out))), , drop = FALSE]
201+
# Put back into RBR
202+
out <- as_record_batch_reader(out)
233203
}
204+
205+
# If arrange() created $temp_columns, make sure to omit them from the result
206+
# We can't currently handle this in ExecPlan_run itself because sorting
207+
# happens in the end (SinkNode) so nothing comes after it.
208+
if (length(node$sort$temp_columns) > 0) {
209+
tab <- out$read_table()
210+
tab <- tab[, setdiff(names(tab), node$sort$temp_columns), drop = FALSE]
211+
out <- as_record_batch_reader(tab)
212+
}
213+
234214
out
235215
},
236216
Write = function(node, ...) {

r/R/record-batch-reader.R

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
#'
5757
#' @rdname RecordBatchReader
5858
#' @name RecordBatchReader
59+
#' @export
5960
#' @include arrow-object.R
6061
#' @examples
6162
#' tf <- tempfile()
@@ -104,6 +105,16 @@ RecordBatchReader <- R6Class("RecordBatchReader",
104105
schema = function() RecordBatchReader__schema(self)
105106
)
106107
)
108+
RecordBatchReader$create <- function(..., batches = list(...), schema = NULL) {
109+
are_batches <- map_lgl(batches, ~ inherits(., "RecordBatch"))
110+
if (!all(are_batches)) {
111+
stop(
112+
"All inputs to RecordBatchReader$create must be RecordBatches",
113+
call. = FALSE
114+
)
115+
}
116+
RecordBatchReader__from_batches(batches, schema)
117+
}
107118

108119
#' @export
109120
names.RecordBatchReader <- function(x) names(x$schema)
@@ -208,13 +219,13 @@ as_record_batch_reader.Table <- function(x, ...) {
208219
#' @rdname as_record_batch_reader
209220
#' @export
210221
as_record_batch_reader.RecordBatch <- function(x, ...) {
211-
RecordBatchReader__from_batches(list(x), NULL)
222+
RecordBatchReader$create(x, schema = x$schema)
212223
}
213224

214225
#' @rdname as_record_batch_reader
215226
#' @export
216227
as_record_batch_reader.data.frame <- function(x, ...) {
217-
as_record_batch_reader(as_record_batch(x))
228+
RecordBatchReader$create(as_record_batch(x))
218229
}
219230

220231
#' @rdname as_record_batch_reader
@@ -226,8 +237,8 @@ as_record_batch_reader.Dataset <- function(x, ...) {
226237
#' @rdname as_record_batch_reader
227238
#' @export
228239
as_record_batch_reader.arrow_dplyr_query <- function(x, ...) {
229-
# TODO(ARROW-15271): make ExecPlan return RBR
230-
as_record_batch_reader(collect.arrow_dplyr_query(x, as_data_frame = FALSE))
240+
# TODO(ARROW-16607): use ExecPlan directly when it handles metadata
241+
as_record_batch_reader(compute.arrow_dplyr_query(x))
231242
}
232243

233244
#' @rdname as_record_batch_reader

r/R/record-batch.R

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,12 @@ as_record_batch.Table <- function(x, ..., schema = NULL) {
297297
out
298298
}
299299

300+
#' @rdname as_record_batch
301+
#' @export
302+
as_record_batch.arrow_dplyr_query <- function(x, ...) {
303+
as_record_batch(compute.arrow_dplyr_query(x), ...)
304+
}
305+
300306
#' @rdname as_record_batch
301307
#' @export
302308
as_record_batch.data.frame <- function(x, ..., schema = NULL) {

r/R/table.R

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,13 @@ Table$create <- function(..., schema = NULL) {
139139
if (all_record_batches(dots)) {
140140
return(Table__from_record_batches(dots, schema))
141141
}
142+
if (length(dots) == 1 && inherits(dots[[1]], c("RecordBatchReader", "RecordBatchFileReader"))) {
143+
tab <- dots[[1]]$read_table()
144+
if (!is.null(schema)) {
145+
tab <- tab$cast(schema)
146+
}
147+
return(tab)
148+
}
142149

143150
# If any arrays are length 1, recycle them
144151
dots <- recycle_scalars(dots)

r/man/as_record_batch.Rd

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

r/man/map_batches.Rd

Lines changed: 18 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)