[R] Troubles with using augmented columns #33464

@asfimport

We can project to add augmented fields like __filename, but there are a few catches. Given:

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

ds <- InMemoryDataset$create(mtcars) %>%
  mutate(f = add_filename())

show_query(ds)
#> ExecPlan with 3 nodes:
#> 2:SinkNode{}
#>   1:ProjectNode{projection=[mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb, "f": __filename]}
#>     0:SourceNode{}

collect(ds)
#>     mpg cyl  disp  hp drat    wt  qsec vs am gear carb         f
#> 1  21.0   6 160.0 110 3.90 2.620 16.46  0  1    4    4 in-memory
#> 2  21.0   6 160.0 110 3.90 2.875 17.02  0  1    4    4 in-memory
#> 3  22.8   4 108.0  93 3.85 2.320 18.61  1  1    4    1 in-memory
...

Issue #1: you can't filter on that column. My theory, based on the evidence, is that the ScanNode takes both a projection and a filter, but the filter is not evaluated against the augmented schema, so it doesn't find __filename. This seems fixable in C++.

ds %>%
  filter(f == "in-memory") %>%
  collect()
#> Error in `collect()`:
#> ! Invalid: No match for FieldRef.Name(__filename) in mpg: double
#> cyl: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#>  `add_filename()` or use of the `__filename` augmented field can only be used with with Dataset objects, and can only be added before doing an aggregation or a join.

#> Backtrace:
#>      ▆
#>   1. ├─ds %>% filter(f == "in-memory") %>% collect()
#>   2. ├─dplyr::collect(.)
#>   3. └─arrow:::collect.arrow_dplyr_query(.)
#>   4.   └─base::tryCatch(...)
#>   5.     └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>   6.       └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>   7.         └─value[[3L]](cond)
#>   8.           └─arrow:::augment_io_error_msg(e, call, schema = x$.data$schema)
#>   9.             └─arrow:::handle_augmented_field_misuse(msg, call)
#>  10.               └─rlang::abort(msg, call = call)

Proof that it is in the ScanNode: if we collapse() the query after projecting to include the filename but before the filter, the filter is not pushed into the ScanNode; it is only applied afterwards, as a FilterNode. This works:

ds %>%
  collapse() %>%
  filter(f == "in-memory") %>%
  collect()
#>     mpg cyl  disp  hp drat    wt  qsec vs am gear carb         f
#> 1  21.0   6 160.0 110 3.90 2.620 16.46  0  1    4    4 in-memory
#> 2  21.0   6 160.0 110 3.90 2.875 17.02  0  1    4    4 in-memory
#> 3  22.8   4 108.0  93 3.85 2.320 18.61  1  1    4    1 in-memory
...
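To check where the filter lands in the collapse() workaround above, the plan can be printed with show_query() instead of collecting. This is a minimal sketch that reuses only calls already shown in this issue; the variable name `q` is just for illustration, and the exact plan text may differ between arrow versions.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Same setup as above: project the augmented field into column `f`
ds <- InMemoryDataset$create(mtcars) %>%
  mutate(f = add_filename())

# collapse() closes off the projection, so the filter that follows
# belongs to a new, outer query rather than to the scan
q <- ds %>%
  collapse() %>%
  filter(f == "in-memory")

# Print the plan; the filter on `f` should show up as its own node
# above the projection rather than inside the scan
show_query(q)
```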

A related failure mode: you have to project first to include the augmented column; you can't just reference it directly in a filter:

InMemoryDataset$create(mtcars) %>%
  filter(add_filename() == "in-memory") %>%
  collect()
#> Error in `collect()`:
#> ! Invalid: No match for FieldRef.Name(__filename) in mpg: double
#> cyl: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#>  `add_filename()` or use of the `__filename` augmented field can only be used with with Dataset objects, and can only be added before doing an aggregation or a join.

#> Backtrace:
#>      ▆
#>   1. ├─... %>% collect()
#>   2. ├─dplyr::collect(.)
#>   3. └─arrow:::collect.arrow_dplyr_query(.)
#>   4.   └─base::tryCatch(...)
#>   5.     └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>   6.       └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>   7.         └─value[[3L]](cond)
#>   8.           └─arrow:::augment_io_error_msg(e, call, schema = x$.data$schema)
#>   9.             └─arrow:::handle_augmented_field_misuse(msg, call)
#>  10.               └─rlang::abort(msg, call = call)
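Until this is fixed, the two observations above (project first, then collapse() before filtering) can be combined into a small wrapper. This is only a sketch of a workaround: `filter_on_filename()` and the `filename` column name are hypothetical and not part of arrow, and it assumes the collapse() behaviour shown earlier in this issue.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical helper (not part of arrow): filter a Dataset by source file
filter_on_filename <- function(dataset, value) {
  dataset %>%
    # 1. project the augmented field while still at the scan
    mutate(filename = add_filename()) %>%
    # 2. collapse so that the filter is not pushed into the scan
    collapse() %>%
    # 3. filter on the now-ordinary column; `!!` inlines the value
    filter(filename == !!value)
}

res <- InMemoryDataset$create(mtcars) %>%
  filter_on_filename("in-memory") %>%
  collect()
```

The cost of this approach is that the filter can no longer be pushed down into the scan, so it is a stopgap rather than a replacement for the C++ fix.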

Issue #2, following on from that: you can only add the augmented fields at the start of the query, in the part that goes into the ScanNode. This seems like something we would have to catch in R, raising an error at the time add_filename() is called. That could probably be covered in ARROW-17356.

InMemoryDataset$create(mtcars) %>%
  collapse() %>%
  collapse() %>%
  filter(add_filename() == "in-memory") %>%
  collect()
#> Error in `collect()`:
#> ! Invalid: No match for FieldRef.Name(__filename) in mpg: double
#> cyl: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#>  `add_filename()` or use of the `__filename` augmented field can only be used with with Dataset objects, and can only be added before doing an aggregation or a join.

#> Backtrace:
#>      ▆
#>   1. ├─... %>% collect()
#>   2. ├─dplyr::collect(.)
#>   3. └─arrow:::collect.arrow_dplyr_query(.)
#>   4.   └─base::tryCatch(...)
#>   5.     └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#>   6.       └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#>   7.         └─value[[3L]](cond)
#>   8.           └─arrow:::augment_io_error_msg(e, call, schema = x$.data$schema)
#>   9.             └─arrow:::handle_augmented_field_misuse(msg, call)
#>  10.               └─rlang::abort(msg, call = call)

Reporter: Neal Richardson / @nealrichardson

Note: This issue was originally created as ARROW-18286. Please see the migration documentation for further details.
