8 changes: 4 additions & 4 deletions r/R/arrowExports.R

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions r/R/dataset-format.R
@@ -390,7 +390,7 @@ ParquetFragmentScanOptions$create <- function(use_buffered_stream = FALSE,
FileWriteOptions <- R6Class("FileWriteOptions",
inherit = ArrowObject,
public = list(
update = function(table, ...) {
update = function(column_names, ...) {
check_additional_args <- function(format, passed_args) {
if (format == "parquet") {
supported_args <- names(formals(write_parquet))
@@ -437,7 +437,7 @@ FileWriteOptions <- R6Class("FileWriteOptions",
if (self$type == "parquet") {
dataset___ParquetFileWriteOptions__update(
self,
ParquetWriterProperties$create(table, ...),
ParquetWriterProperties$create(column_names, ...),
ParquetArrowWriterProperties$create(...)
)
} else if (self$type == "ipc") {
87 changes: 67 additions & 20 deletions r/R/dataset-write.R
@@ -136,41 +136,88 @@ write_dataset <- function(dataset,
if (inherits(dataset, "arrow_dplyr_query")) {
# partitioning vars need to be in the `select` schema
dataset <- ensure_group_vars(dataset)
} else if (inherits(dataset, "grouped_df")) {
force(partitioning)
# Drop the grouping metadata before writing; we've already consumed it
# now to construct `partitioning` and don't want it in the metadata$r
dataset <- dplyr::ungroup(dataset)
} else {
if (inherits(dataset, "grouped_df")) {
force(partitioning)
# Drop the grouping metadata before writing; we've already consumed it
# now to construct `partitioning` and don't want it in the metadata$r
dataset <- dplyr::ungroup(dataset)
}
dataset <- tryCatch(
as_adq(dataset),
error = function(e) {
supported <- c(
"Dataset", "RecordBatch", "Table", "arrow_dplyr_query", "data.frame"
)
stop(
"'dataset' must be a ",
oxford_paste(supported, "or", quote = FALSE),
", not ",
deparse(class(dataset)),
call. = FALSE
)
}
)
}

plan <- ExecPlan$create()
final_node <- plan$Build(dataset)
if (!is.null(final_node$sort %||% final_node$head %||% final_node$tail)) {
Member Author:

This condition probably still needs a test or two
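A test along these lines might look like the following. This is only a sketch, assuming the usual testthat + dplyr setup used in r/tests/testthat; the temp path and expectations are illustrative, not part of this PR.

```r
# Sketch of a possible test for the sort/head handling in write_dataset()
library(dplyr)
library(arrow)

test_that("write_dataset handles queries with a sort and head()", {
  tf <- tempfile()
  on.exit(unlink(tf, recursive = TRUE))

  Table$create(mtcars) %>%
    arrange(desc(mpg)) %>%
    head(10) %>%
    write_dataset(tf)

  result <- collect(open_dataset(tf))
  expect_equal(nrow(result), 10)
  expect_setequal(result$mpg, head(sort(mtcars$mpg, decreasing = TRUE), 10))
})
```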

# Because sorting and topK are only handled in the SinkNode (or in R!),
# they wouldn't get picked up in the WriteNode. So let's Run this ExecPlan
# to capture those, and then create a new plan for writing
# TODO(ARROW-15681): do sorting in WriteNode in C++
dataset <- as_adq(plan$Run(final_node))
plan <- ExecPlan$create()
final_node <- plan$Build(dataset)
}

scanner <- Scanner$create(dataset)
if (!inherits(partitioning, "Partitioning")) {
partition_schema <- scanner$schema[partitioning]
partition_schema <- final_node$schema[partitioning]
if (isTRUE(hive_style)) {
partitioning <- HivePartitioning$create(partition_schema, null_fallback = list(...)$null_fallback)
partitioning <- HivePartitioning$create(
partition_schema,
null_fallback = list(...)$null_fallback
)
} else {
partitioning <- DirectoryPartitioning$create(partition_schema)
}
}

if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
max_rows_per_group <- max_rows_per_file
}

path_and_fs <- get_path_and_filesystem(path)
options <- FileWriteOptions$create(format, table = scanner, ...)
output_schema <- final_node$schema
options <- FileWriteOptions$create(
format,
column_names = names(output_schema),
...
)

# TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
# and encapsulate this logic better
existing_data_behavior_opts <- c("delete_matching", "overwrite", "error")
existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L

validate_positive_int_value(max_partitions, "max_partitions must be a positive, non-missing integer")
validate_positive_int_value(max_open_files, "max_open_files must be a positive, non-missing integer")
validate_positive_int_value(min_rows_per_group, "min_rows_per_group must be a positive, non-missing integer")
validate_positive_int_value(max_rows_per_group, "max_rows_per_group must be a positive, non-missing integer")
if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
max_rows_per_group <- max_rows_per_file
}

dataset___Dataset__Write(
validate_positive_int_value(max_partitions)
validate_positive_int_value(max_open_files)
validate_positive_int_value(min_rows_per_group)
validate_positive_int_value(max_rows_per_group)
Comment on lines +204 to +207
Member:

The error message says non-missing and yet we have defaults for all of these properties (and line 196 seems to tolerate a missing max_rows_per_group). Are they truly required to be non-missing?

Member Author:

I believe it means "not NA" rather than "omitted", judging from the actual validation it is doing.
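For illustration, here is the condition from validate_positive_int_value() evaluated on its own (values are made up): NA is rejected, so "non-missing" effectively means "not NA", while ordinary defaults pass.

```r
# The check used in validate_positive_int_value(), in isolation
library(rlang)

check <- function(value) {
  !is_integerish(value, n = 1) || is.na(value) || value < 0
}

check(NA_integer_)  # TRUE  -> abort(): NA counts as "missing"
check(1024L)        # FALSE -> accepted (e.g. the default max_partitions)
check(-1L)          # TRUE  -> abort(): negative values are rejected
```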


new_r_meta <- get_r_metadata_from_old_schema(
output_schema,
source_data(dataset)$schema,
drop_attributes = has_aggregation(dataset)
)
if (!is.null(new_r_meta)) {
output_schema$r_metadata <- new_r_meta
}
plan$Write(
final_node, prepare_key_value_metadata(output_schema$metadata),
options, path_and_fs$fs, path_and_fs$path,
partitioning, basename_template, scanner,
partitioning, basename_template,
existing_data_behavior, max_partitions,
max_open_files, max_rows_per_file,
min_rows_per_group, max_rows_per_group
@@ -179,6 +226,6 @@ write_dataset <- function(dataset,

validate_positive_int_value <- function(value, msg) {
if (!is_integerish(value, n = 1) || is.na(value) || value < 0) {
abort(msg)
abort(paste(substitute(value), "must be a positive, non-missing integer"))
}
}
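For reference, the grouped_df branch above surfaces to users roughly as follows. This is a usage sketch only; the temp path and the example directory names are illustrative.

```r
# Usage sketch: groups from a grouped_df become the (Hive-style) partitioning,
# and the grouping metadata itself is not written into metadata$r.
library(dplyr)
library(arrow)

tf <- tempfile()
mtcars %>%
  group_by(cyl) %>%
  write_dataset(tf)

list.files(tf)
#> e.g. "cyl=4" "cyl=6" "cyl=8"

# The grouping is not restored on read
open_dataset(tf) %>% collect() %>% is_grouped_df()
#> FALSE
```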
11 changes: 9 additions & 2 deletions r/R/dplyr.R
@@ -24,7 +24,12 @@ arrow_dplyr_query <- function(.data) {
# RecordBatch, or Dataset) and the state of the user's dplyr query--things
# like selected columns, filters, and group vars.
# An arrow_dplyr_query can contain another arrow_dplyr_query in .data
gv <- dplyr::group_vars(.data) %||% character()
gv <- tryCatch(
# If dplyr is not available, or if the input doesn't have a group_vars
# method, assume no group vars
dplyr::group_vars(.data) %||% character(),
error = function(e) character()
)

if (inherits(.data, "data.frame")) {
.data <- Table$create(.data)
@@ -247,7 +252,9 @@ abandon_ship <- function(call, .data, msg) {
query_on_dataset <- function(x) inherits(source_data(x), c("Dataset", "RecordBatchReader"))

source_data <- function(x) {
if (is_collapsed(x)) {
if (!inherits(x, "arrow_dplyr_query")) {
x
} else if (is_collapsed(x)) {
source_data(x$.data)
} else {
x$.data
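The tryCatch() fallback above matters for inputs that have no group_vars() method. A sketch, assuming a build where RecordBatchReader supports dplyr verbs (as query_on_dataset() above implies) and where as_record_batch_reader() is available:

```r
# Sketch: a query built on a RecordBatchReader, which has no grouping and no
# group_vars() method; the tryCatch() above falls back to character().
library(dplyr)
library(arrow)

reader <- as_record_batch_reader(Table$create(mtcars))
reader %>%
  filter(cyl == 6) %>%
  select(mpg, cyl) %>%
  collect()
```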
22 changes: 21 additions & 1 deletion r/R/metadata.R
@@ -133,7 +133,6 @@ remove_attributes <- function(x) {
}

arrow_attributes <- function(x, only_top_level = FALSE) {

att <- attributes(x)
removed_attributes <- remove_attributes(x)

@@ -208,3 +207,24 @@ arrow_attributes <- function(x, only_top_level = FALSE) {
NULL
}
}

get_r_metadata_from_old_schema <- function(new_schema,
old_schema,
drop_attributes = FALSE) {
# TODO: do we care about other (non-R) metadata preservation?
# How would we know if it were meaningful?
Comment on lines +214 to +215
Member:

I think it depends on the source of old_schema.

In the general case, the input is a collection of files and the output is a different set of files (sometimes we explode files and sometimes we merge files). The idea of writing metadata to the output files is somewhat meaningless. So, in general, I would say no, you don't care about preservation.

In Python, users can create a dataset from a single file, and we do a little bit of work to preserve the metadata on write because we want it to feel like it "round trips".

When creating or appending to a dataset, users might want to specify general information about how the files were created, like "Origin": "Nightly update", but that is unrelated to the original metadata.

In the future the dataset write may append its own metadata (e.g. dataset statistics, or information about the dataset schema such as which columns are already sorted, etc.)
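For the R-level metadata this helper carries over, the round trip looks roughly like this. A sketch only: the attribute name and temp path are made up, and behavior assumes the column keeps its name and type.

```r
# Sketch: column attributes stored in the schema's r_metadata survive a
# write_dataset()/open_dataset() round trip.
library(dplyr)
library(arrow)

df <- data.frame(x = 1:3)
attr(df$x, "units") <- "meters"

tf <- tempfile()
write_dataset(df, tf)

roundtripped <- open_dataset(tf) %>% collect()
attr(roundtripped$x, "units")
#> [1] "meters"
```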

r_meta <- old_schema$r_metadata
if (!is.null(r_meta)) {
# Filter r_metadata$columns on columns with name _and_ type match
common_names <- intersect(names(r_meta$columns), names(new_schema))
keep <- common_names[
map_lgl(common_names, ~ old_schema[[.]] == new_schema[[.]])
]
r_meta$columns <- r_meta$columns[keep]
if (drop_attributes) {
# dplyr drops top-level attributes if you do summarize
r_meta$attributes <- NULL
}
}
r_meta
}
38 changes: 18 additions & 20 deletions r/R/parquet.R
@@ -186,7 +186,7 @@ write_parquet <- function(x,
x$schema,
sink,
properties = properties %||% ParquetWriterProperties$create(
x,
names(x),
version = version,
compression = compression,
compression_level = compression_level,
@@ -307,33 +307,33 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder",
set_version = function(version) {
parquet___WriterProperties___Builder__version(self, make_valid_version(version))
},
set_compression = function(table, compression) {
set_compression = function(column_names, compression) {
compression <- compression_from_name(compression)
assert_that(is.integer(compression))
private$.set(
table, compression,
column_names, compression,
parquet___ArrowWriterProperties___Builder__set_compressions
)
},
set_compression_level = function(table, compression_level) {
set_compression_level = function(column_names, compression_level) {
# cast to integer but keep names
compression_level <- set_names(as.integer(compression_level), names(compression_level))
private$.set(
table, compression_level,
column_names, compression_level,
parquet___ArrowWriterProperties___Builder__set_compression_levels
)
},
set_dictionary = function(table, use_dictionary) {
set_dictionary = function(column_names, use_dictionary) {
assert_that(is.logical(use_dictionary))
private$.set(
table, use_dictionary,
column_names, use_dictionary,
parquet___ArrowWriterProperties___Builder__set_use_dictionary
)
},
set_write_statistics = function(table, write_statistics) {
set_write_statistics = function(column_names, write_statistics) {
assert_that(is.logical(write_statistics))
private$.set(
table, write_statistics,
column_names, write_statistics,
parquet___ArrowWriterProperties___Builder__set_write_statistics
)
},
@@ -342,9 +342,8 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder",
}
),
private = list(
.set = function(table, value, FUN) {
.set = function(column_names, value, FUN) {
msg <- paste0("unsupported ", substitute(value), "= specification")
column_names <- names(table)
given_names <- names(value)
if (is.null(given_names)) {
if (length(value) %in% c(1L, length(column_names))) {
@@ -364,7 +363,7 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder",
)
)

ParquetWriterProperties$create <- function(table,
ParquetWriterProperties$create <- function(column_names,
version = NULL,
compression = default_parquet_compression(),
compression_level = NULL,
Expand All @@ -377,16 +376,16 @@ ParquetWriterProperties$create <- function(table,
builder$set_version(version)
}
if (!is.null(compression)) {
builder$set_compression(table, compression = compression)
builder$set_compression(column_names, compression = compression)
}
if (!is.null(compression_level)) {
builder$set_compression_level(table, compression_level = compression_level)
builder$set_compression_level(column_names, compression_level = compression_level)
}
if (!is.null(use_dictionary)) {
builder$set_dictionary(table, use_dictionary)
builder$set_dictionary(column_names, use_dictionary)
}
if (!is.null(write_statistics)) {
builder$set_write_statistics(table, write_statistics)
builder$set_write_statistics(column_names, write_statistics)
}
if (!is.null(data_page_size)) {
builder$set_data_page_size(data_page_size)
@@ -600,10 +599,9 @@ ParquetArrowReaderProperties$create <- function(use_threads = option_use_threads
parquet___arrow___ArrowReaderProperties__Make(isTRUE(use_threads))
}

calculate_chunk_size <- function(rows, columns,
target_cells_per_group = getOption("arrow.parquet_cells_per_group", 2.5e8),
max_chunks = getOption("arrow.parquet_max_chunks", 200)
) {
calculate_chunk_size <- function(rows, columns,
target_cells_per_group = getOption("arrow.parquet_cells_per_group", 2.5e8),
max_chunks = getOption("arrow.parquet_max_chunks", 200)) {

# Ensure is a float to prevent integer overflow issues
num_cells <- as.numeric(rows) * as.numeric(columns)
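The column-name matching in the `.set()` helper above is what lets users pass per-column Parquet options as named vectors. A usage sketch (the data frame and temp path are illustrative, and codec availability depends on the build):

```r
# Sketch: per-column Parquet options keyed by column name, resolved through
# ParquetWriterPropertiesBuilder's .set() matching.
library(arrow)

df <- data.frame(x = 1:5, y = letters[1:5])
tf <- tempfile(fileext = ".parquet")

write_parquet(
  df,
  tf,
  compression = c(x = "snappy", y = "uncompressed"),
  use_dictionary = c(x = FALSE, y = TRUE)
)
```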
29 changes: 11 additions & 18 deletions r/R/query-engine.R
@@ -33,26 +33,15 @@ do_exec_plan <- function(.data) {

if (ncol(tab)) {
# Apply any column metadata from the original schema, where appropriate
original_schema <- source_data(.data)$schema
# TODO: do we care about other (non-R) metadata preservation?
# How would we know if it were meaningful?
r_meta <- original_schema$r_metadata
if (!is.null(r_meta)) {
# Filter r_metadata$columns on columns with name _and_ type match
new_schema <- tab$schema
common_names <- intersect(names(r_meta$columns), names(tab))
keep <- common_names[
map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]])
]
r_meta$columns <- r_meta$columns[keep]
if (has_aggregation(.data)) {
# dplyr drops top-level attributes if you do summarize
r_meta$attributes <- NULL
}
tab$r_metadata <- r_meta
new_r_metadata <- get_r_metadata_from_old_schema(
tab$schema,
source_data(.data)$schema,
drop_attributes = has_aggregation(.data)
)
if (!is.null(new_r_metadata)) {
tab$r_metadata <- new_r_metadata
}
}

tab
}

@@ -244,6 +233,10 @@ ExecPlan <- R6Class("ExecPlan",
}
out
},
Write = function(node, ...) {
# TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ...
ExecPlan_Write(self, node, ...)
},
Stop = function() ExecPlan_StopProducing(self)
)
)