ARROW-16444: [R] Implement user-defined scalar functions in R bindings #13397
Conversation
r/src/compute.cpp
Just curious about the function docs: will users pass the function docs explicitly?
I'm not aware of any way to access the value of function documentation in the R bindings, so it would be a little weird to document it in the query engine itself (more likely would be that the package registering the function would provide their own R wrapper and document that).
@paleolimbot I am not quite sure about extracting the function doc in R either. I was merely referring to a string passed explicitly as the function docs, if required. In Python I was looking into the inspect API and it can help with this. For Python UDFs, I did a quick check on whether we can use it to extract all the required values to register a UDF, and it seems possible. I was just curious how it is planned to handle such things in the R API.
In what circumstance would the function documentation at the query engine level be useful? (Unless there is such a circumstance, I think including it in the R bindings is not necessary).
In my understanding, the function docs are used at the function registration step. I am not quite sure if the query engine cares much about it. It would be the compute API which is the interested party. For example this is how it is used in Python UDFs.
This might be out of scope, but what should be done when an R user wants to register a custom function? What is the workflow? I am working on the Python UDFs and it would be great to learn a few things from this PR.
I did see that the function docs are a required argument when creating a function and I saw the example in the Python UDFs that creates the documentation object. I think that the function documentation might be helpful for internal functions but for R and Python UDFs I think it unlikely that the documentation will ever be used. Until the feature is requested I'm inclined to leave the dummy function documentation instead of requiring a user to create it.
> what should be done when an R user wants to register a custom function?
Give me another day or so to work on this...I'll ping you when I have a better idea!
Sounds good! Will look into this.
By the way, the idea is to auto-extract everything so that the user just writes the function.
This JIRA is a very naive attempt to see if it could work.
> I did see that the function docs are a required argument when creating a function and I saw the example in the Python UDFs that creates the documentation object. I think that the function documentation might be helpful for internal functions but for R and Python UDFs I think it unlikely that the documentation will ever be used. Until the feature is requested I'm inclined to leave the dummy function documentation instead of requiring a user to create it.
A similar deferral conclusion was reached here, though see this and this post for where things may develop.
Another comment on the function registration is that if you try to run the test cases twice, you will get a test failure saying the function is already registered. In R, is there a way to scope the test case such that the changed function registry doesn't affect the following test cases? I guess at some point the function unregistration needs to be handled. There is an interesting discussion going on about it in another PR.
I experienced this in an interactive session.
All good points and thank you for taking a look! Your edit did the trick, although the segfault may have been due to something about turning a Scalar into an Array:

```r
# remotes::install_github("apache/arrow/r#13397")
library(arrow, warn.conflicts = FALSE)

fun <- arrow:::arrow_scalar_function(
  int32(),
  int64(),
  function(x, y) {
    y[[1]]$cast(int64())
  }
)
arrow:::register_scalar_function("my_test_scalar_function", fun)

# works!
call_function("my_test_scalar_function", Array$create(1L))
#> Array
#> <int64>
#> [
#>   1
#> ]

# segfaults!
# call_function("my_test_scalar_function", Scalar$create(1L))
```

Created on 2022-06-20 by the reprex package (v2.0.1)
It seems there's no way to unregister a function...if there were, I could unregister it in the test. I changed the …
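One stopgap while there is no unregister API could be to register lazily in the test; a minimal sketch, assuming `list_compute_functions()` reports user-registered names:

```r
# Sketch only: skip registration if the name already exists in the registry,
# so re-running the test file doesn't trigger the "already registered" error.
if (!"my_test_scalar_function" %in% list_compute_functions()) {
  arrow:::register_scalar_function("my_test_scalar_function", fun)
}
```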
I probably shouldn't be turning a Scalar into an Array anyway (i.e., I should probably just do what the Python UDFs do and pass a …).
r/src/compute.cpp
Something about this is giving me a segfault...is this the proper way to get a shared pointer to a Scalar from an ExecValue?
When we execute line 599, I think it is making a copy, right? If so, can we instead use the given experimental function in the Scalar API?
```cpp
std::shared_ptr<arrow::Scalar> scalar = v.scalar->Copy();
```
Oh that's perfect!
I'm still getting a segfault and traced it to here: https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec.cc#L907-L911, but it looks like it's because I'm returning an Array result when there were only Scalar inputs. I think that can be handled in the C++ wrapper code.
Ah yes, as far as I understand, it is checking if the registered function shape is maintained by the kernel's execution (the returned output). I guess it is a matter of controlling that within the function.
I still need to write tests for all the possibilities but I think I have a workable approach!
Let me also look into this one a little bit closely.
Yes, we don't have such a function yet. There is a discussion about supporting that, or about keeping a scoped function registry that can be used when functions are needed in different scopes. For instance, in the UDF case, the functions could get registered in a scoped function registry that gets cleaned up when the UDF scope exits. The aforementioned links include a discussion and PRs associated with that. At least it is discussed in the Substrait space. I think we could use it for the R API as well.
I think we can consider a Scalar as an array of length 1 and keep all inputs represented as a list, which could be converted to an Array in a straightforward manner (I assume the Arrow R C++ API has utils for that). In the Python UDF PR, I modified it based on some of the suggestions, with a bit deeper context as far as compute API usage is concerned.
More details about the nested/scoped registries approach in this post.
Thanks! It sounds like there are use-cases for a more advanced registry...I think a simple addition of RemoveFunction() would allow users to work around the issue until a clean solution can be arrived at (but outside the scope of this PR for sure).
Note that support for a nested/scoped function registry has already been pushed, so you could consider merging that into this PR.
r/src/compute.cpp
Please do this a different way (e.g. put the data that you need to persist in a KernelState instead — I also added the option to have static kernel-level data in #13398). ArrayKernelExec is a std::function and is meant to have the semantics of a function pointer, not a generic callable class. I actually plan to make ArrayKernelExec a function pointer because debugging with std::function adds ~4 levels to the call stack when using gdb, which is very tedious.
Thank you for the PR reference...that did the trick! I think I've done this without misusing the API but feel free to let me know if I'm doing anything else against the spirit of current or future query engine plans.
@vibhatha See above. We should probably make this change on the python side eventually as well.
@westonpace I think we haven't adopted the KernelState in the Python UDFs; we only have a separate callable like the updated PR contains. I guess the KernelState integration is what we need to adopt. Please correct me if I misinterpreted this conversation.
Either way, @wesm has been kind to include this already 🙏 . Do we need to include further changes to this?
cc @westonpace
I hadn't realized it had been updated, sorry. The python UDF appears to be doing the right thing.
Learned a few things here. 👍
Ok! We now can use the functions in both the query engine and … Part of the change is that I had to update the query plan execution, because for this to work we need the whole query plan to execute so that the …
nealrichardson left a comment:
This is cool. I have some questions in the R code and tests; I didn't look at the cpp.
r/R/query-engine.R
Why do we need this argument? You can always consume an RBR into a Table.
In order for the user-defined function call from the query engine into R to work, the entire evaluation has to occur during one R call into C++ (specifically, within one call to RunWithCapturedR()). This is why some changes in this file and compute-exec.cpp were needed...it's not all that clean-feeling to me, but I don't know that there's a better way to do it.
(Either way I'll leave a comment here in the code explaining why it's necessary!)
Hmm. I think this is a problem. It sounds like UDFs will just fail if you try to stream an RBR into something else (currently I'm thinking of the duckdb integration, but there are potentially other applications). I don't think we want to impose that limitation if we can avoid it--just thinking of the cognitive overhead added every time we have to explain "well it usually works if you do X but not in cases Y and Z", not to mention the constraint on ways you can compose Arrow.
It's definitely a problem...what I have here is the easy way, the hard way being to make sure that we have an event loop running at all times just in case there's a background thread that might want to execute R code.
The way this is coded now you can still stream via RecordBatchReader (at the R level), you just can't do that and use a UDF. For example:
```r
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

fun <- arrow_base_scalar_function(
  int32(), int64(),
  function(context, args) {
    args[[1]] + 1L
  }
)
register_scalar_function("my_test_scalar_function", fun)

record_batch(a = 2L) |>
  mutate(b = my_test_scalar_function(a)) |>
  collect()
#> # A tibble: 1 × 2
#>       a     b
#>   <int> <int>
#> 1     2     3

record_batch(a = 2L) |>
  mutate(b = my_test_scalar_function(a)) |>
  to_duckdb() |>
  collect()
#> Error in duckdb_execute(res): duckdb_execute_R: Failed to run query
#> Error: Invalid Input Error: arrow_scan: get_next failed(): NotImplemented: Call to R from a non-R thread without calling RunWithCapturedR
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/kernel.cc:391  resolver_(ctx, args)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec.cc:696  kernel_->signature->out_type().Resolve(kernel_ctx_, args.inputs)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/expression.cc:602  executor->Init(&kernel_context, {kernel, descrs, options})
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/project_node.cc:92  ExecuteScalarExpression(simplified_expr, target, plan()->exec_context())
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/dataset/scanner.cc:79  delegate_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/c/bridge.cc:1651  reader()->ReadNext(&batch)
```

Created on 2022-07-06 by the reprex package (v2.0.1)
> The way this is coded now you can still stream via RecordBatchReader (at the R level), you just can't do that and use a UDF.
Yeah that's what worries me: it's surprising when something works in some circumstances and not in others, and we don't want to surprise our users like that. Maybe the exposure is low enough here that it's not a huge problem, I'm not sure, but it gives me pause.
I'll see if I can improve the failure mode for to_duckdb(), which is as far as I know the only feature that wouldn't work with a UDF. A warning message along the lines of "user-defined functions are not supported with to_duckdb()" and collecting to a Table first would probably be ok?
Other options include (1) detecting whether or not a plan contains a UDF in as_record_batch_reader.arrow_dplyr_query() or (2) try to figure out an event loop situation that will work when the plan is executing in the background. Both of those will take quite some time.
With the improvements to SafeCallIntoR() the failure mode is a little better (at least it mentions the user-defined function).
```r
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

fun <- arrow_scalar_function(
  function(x) x + 1,
  int32(), int64()
)
register_user_defined_function(fun, "plus_one")

record_batch(a = 2L) |>
  mutate(b = plus_one(a)) |>
  to_duckdb() |>
  collect()
#> Error in duckdb_execute(res): duckdb_execute_R: Failed to run query
#> Error: Invalid Input Error: arrow_scan: get_next failed(): NotImplemented: Call to R (resolve scalar user-defined function output data type) from a non-R thread from an unsupported context
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/kernel.cc:391  resolver_(ctx, args)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec.cc:696  kernel_->signature->out_type().Resolve(kernel_ctx_, args.inputs)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/expression.cc:602  executor->Init(&kernel_context, {kernel, descrs, options})
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/project_node.cc:92  ExecuteScalarExpression(simplified_expr, target, plan()->exec_context())
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/dataset/scanner.cc:79  delegate_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/c/bridge.cc:1651  reader()->ReadNext(&batch)
```

I could attempt something more automatic in this PR (like read_table() or write_dataset()) when this error occurs in to_duckdb()?
Related to the above comment: if we can identify that a UDF is used in the query before we run it, we could handle this more gracefully (either error informatively, or have to_duckdb() evaluate to a Table and send that to DuckDB).
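A speculative sketch of that detection idea (none of this is in the PR; it assumes registered UDF names are tracked at registration time in a hypothetical `udf_names` vector, and that each projected `Expression` can be rendered to text with `ToString()`):

```r
# Hypothetical: `udf_names` would be appended to by register_scalar_function()
udf_names <- c("my_test_scalar_function")

query_uses_udf <- function(query) {
  # Render each projected expression to text and look for a UDF name in it
  exprs_text <- vapply(
    query$selected_columns,
    function(expr) expr$ToString(),
    character(1)
  )
  any(vapply(
    udf_names,
    function(nm) any(grepl(nm, exprs_text, fixed = TRUE)),
    logical(1)
  ))
}
```

If a check like this returned `TRUE`, to_duckdb() could warn and evaluate to a Table first instead of streaming.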
r/tests/testthat/test-compute.R
The function has 2 args but the schema you provided only has 1, shouldn't this error?
The "base" scalar function always has exactly two arguments...the kernel execution context and the list of Scalar/Array objects. (This example needs better names, though, to make sure that's clear).
I changed this to arrow_advanced_scalar_function()...another option would be to not export it and see if anybody requests it?
r/R/compute.R
What does "base" mean here? I'm not sure this is the most evocative name
There's almost certainly a better name...the 'base' version only deals with Arrow objects and gives some additional information from the execution context; the non 'base' version converts to R vectors and back automatically and is probably what most users want. For the geoarrow use-case, I specifically want to avoid R vectors and do.call(), which adds some additional overhead (which I should probably measure before adding the confusion of two scalar function constructors...).
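For the overhead question, a rough measurement sketch could compare a built-in kernel against the R round trip (written against the final API that this PR ends up with, shown further down; `bench` is an assumed helper package, not something this PR uses):

```r
library(arrow, warn.conflicts = FALSE)

# An illustrative auto-converting UDF equivalent to multiplying by 32
register_scalar_function(
  "times_32",
  function(context, x) x * 32,
  in_type = float64(),
  out_type = float64(),
  auto_convert = TRUE
)

arr <- Array$create(runif(1e6))

# Compare the native kernel against the UDF's convert-call-convert round trip
bench::mark(
  native = call_function("multiply_checked", arr, Scalar$create(32)),
  udf = call_function("times_32", arr),
  check = FALSE
)
```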
(whoops, I meant to post that above comment here):
I changed this to arrow_advanced_scalar_function()...another option would be to not export it and see if anybody requests it?
r/R/compute.R
Could you annotate these functions with some code comments where it's not obvious what's happening? It took me a while to figure out why you were calling as.vector on the args.
Done!
Thinking about this some more: what if the UDF I want to write doesn't require the data to be pulled into R data structures? Maybe I've got some more complicated math that can be expressed with Arrow compute functions, but I don't want to type it out every time, so I want to define a function. Or there are things I want to use in Python (or some future world where we call to Rust, or Julia) and I want to send the Arrow data via the C interface. Is there a path for doing this? If so, what does it look like?
The first one is ARROW-14071...this PR doesn't help with that although register_user_defined_function() could be used for that too (maybe it would look like register_user_defined_function(arrow_compute_binding(function(x) sin(x) ^ 2), "sin_squared")? Probably with a better name.)
The second one you can do with arrow_advanced_scalar_function()...it's a bit clunkier but I figured anybody that invested in evaluation overhead/type fidelity would be ok with some extra mental effort. It currently looks like this:
```r
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.

fun_wrapper <- arrow_advanced_scalar_function(
  function(kernel_context, args) {
    sin(args[[1]]) ^ 2
  },
  in_type = float64(),
  out_type = float64()
)
register_user_defined_function(fun_wrapper, "sin_squared")

record_batch(x = runif(10)) |>
  dplyr::mutate(sin_squared(x)) |>
  dplyr::collect()
#> # A tibble: 10 × 2
#>        x `sin_squared(x)`
#>    <dbl>            <dbl>
#>  1 0.425            0.170
#>  2 0.720            0.435
#>  3 0.613            0.331
#>  4 0.536            0.261
#>  5 0.780            0.495
#>  6 0.397            0.150
#>  7 0.948            0.659
#>  8 0.752            0.466
#>  9 0.551            0.274
#> 10 0.370            0.131
```

Created on 2022-07-07 by the reprex package (v2.0.1)
r/R/compute.R
nit: why rlang:: here? We generally importFrom the namespace and don't use ::
r/R/compute.R
Can you note why you have to register twice?
I added some comments about what each step does (registering in Arrow C++ enables use in Expression$create(); registering the binding enables use in mutate() and filter()).
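To make the two effects concrete, here is a small sketch using the final API from later in this thread (function name illustrative):

```r
library(arrow, warn.conflicts = FALSE)

register_scalar_function(
  "plus_one",
  function(context, x) x + 1L,
  in_type = int32(),
  out_type = int32(),
  auto_convert = TRUE
)

# Registered in the Arrow C++ function registry: usable via call_function()
# (and Expression$create())
call_function("plus_one", Array$create(1:3))

# Registered as a dplyr binding: usable inside mutate()/filter()
record_batch(a = 1:3) |>
  dplyr::mutate(b = plus_one(a)) |>
  dplyr::collect()
```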
r/R/compute.R
What's the use case where name and registry_name should be different?
I was thinking about how both Joris and I are planning to register geospatial functions and they could collide in the registry (although I suppose if they do the exact same thing it won't matter if the evaluation is forked off to Python). Perhaps it's premature to anticipate that use case though.
Would be interesting to validate whether that is indeed a concern or not (IIUC not, because R and Python would be running different runtimes and thus Acero/function registries).
Other unrelated thoughts:
- register_scalar_function() is not an obvious name; I'd expect something with "udf" in the name. What does pyarrow call this?
- I wonder if the signature could be `function(scalar_function, name = substitute(scalar_function))`, with appropriate validation of `name`. That would let you do something like:
```r
my_cool_udf <- function(x, y) ...
register_scalar_function(my_cool_udf)
```
As I understand it we get separate registries when static linking (i.e., 99.99% of R usage) but shared registries when using a .so (e.g., conda, fedora36, most Arrow developers)...see ARROW-16688 which is the issue I opened after the shared object thing broke GDAL's reading of some files with extension types. That said I think we can safely remove the argument since it's a little confusing and only add it back in when somebody reports a problem (which might be never).
register_scalar_function() is indeed the Python incantation ( https://github.com/vibhatha/arrow/blob/master/python/pyarrow/_compute.pyx#L2429-L2441 )...maybe register_user_defined_function() is better? (The 'scalar' can be inferred from the class of the argument I think).
(+1 to inferring the name automatically...we might be able to make a decent output type guess too by evaluating the function with zero-length dummy arguments)
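That guess could look something like this sketch (not implemented in the PR; it assumes `Array$create()` can build a zero-length array of each declared input type and that `infer_type()` is available to map the R result back to an Arrow type):

```r
guess_out_type <- function(fun, in_schema) {
  # One zero-length R vector per input field, matching the declared types
  dummy_args <- lapply(in_schema$fields, function(f) {
    as.vector(Array$create(vector(), type = f$type))
  })
  dummy_context <- list(batch_size = 0L)

  # Evaluate the UDF on the dummies and infer the Arrow type of the result
  result <- do.call(fun, c(list(dummy_context), dummy_args))
  infer_type(result)
}
```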
For argument's sake I changed this to register_user_defined_function(). I tried the automatically-infer-the-name thing but it doesn't quite make sense...in almost all cases a user/package dev will have to create a small wrapper function, and so the call will look like register_user_defined_function(fun_wrapper, "fun"). I could squish together arrow_scalar_function() and register_user_defined_function() like Python does, but I quite like the separation for testing (and it scales better to a future where we support vector functions and aggregate functions).
For reference, I think the reason that in pyarrow we decided to use "scalar" in the name of the register function is that in the future we also want to allow other types of UDFs, and for example a vector function could have the same signature as a scalar function (in terms of input/output data types), so I am not sure you can guess the kernel type based on the arguments.
r/tests/testthat/test-compute.R
Why is the first argument to `base_fun` `list()`?
Would a more interesting test be to have an array with more than one element, to demonstrate the vectorization?
list() here is a dummy kernel execution context (but clearly needs a better name)
Why would the dummy kernel execution context be anything other than an empty list?
I gave it a name at least (dummy_kernel_context <- list()) so that it's clear why it's there in the test.
r/tests/testthat/test-compute.R
Can we have a test function be something that is clearly not in Arrow?
I changed it to function(x) x * 32 but I get how that's not ideal either. Arrow's coverage is pretty impressive, which is why times_32 is the best I came up with. Any ideas? I also considered the hyperbolic sine/cosine, but you can even do those by chaining together some things.
Could do something wild and bespoke, like generate fitted values from a regression model run in R (stats::predict.lm()). I guess even that could be decomposed into functions we support in arrow, but it saves a bunch of code a user would have to write, and it makes for a better story.
Oh that's a great example! I'll update the user-facing example to that...it's only a tiny bit bespoke - I never thought about registering a single-use kind of function (maybe specific to some model) but I could see doing a filter() on predictions from an ML model or something.
r/tests/testthat/test-compute.R
Would it be good to do a test with a dataset with multiple files? Particularly if we're concerned about R thread safety, we might need more than 1 row in a RecordBatch to confirm it's behaving.
Done!
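For reference, such a test could look roughly like this (a sketch against the final API, reusing the illustrative `times_32` function from the earlier sketch; paths are illustrative):

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Write a dataset partitioned into one file per cyl value
tf <- tempfile()
mtcars |>
  group_by(cyl) |>
  write_dataset(tf)

# Each file becomes at least one batch, so the UDF is exercised across
# several batches (and potentially several threads)
open_dataset(tf) |>
  mutate(mpg32 = times_32(mpg)) |>
  collect()
```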
Thank you everybody for the reviews! There are a lot of threads here so I'm going to try to summarise which bits were unresolved and how I think I resolved them (or maybe not). The CI failures on r-devel are because of a recent commit to r-devel; the CI failure on Windows is a lintr error that I don't know how to fix (cyclomatic complexity of an R6 object?).
A concern I have that never came up is that there is probably more overhead than is needed for each call into R. We can probably get it down to a single unwind protect...I think that would be best handled in a follow-up PR with a narrow scope, but I could in theory try here too. Example usage with the final API:

```r
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

some_model <- lm(mpg ~ disp + cyl, data = mtcars)

register_scalar_function(
  "mtcars_predict_mpg",
  function(context, disp, cyl) {
    predict(some_model, newdata = data.frame(disp, cyl))
  },
  in_type = schema(disp = float64(), cyl = float64()),
  out_type = float64(),
  auto_convert = TRUE
)

as_arrow_table(mtcars) %>%
  transmute(mpg, mpg_predicted = mtcars_predict_mpg(disp, cyl)) %>%
  collect() %>%
  head()
#>    mpg mpg_predicted
#> 1 21.0      21.84395
#> 2 21.0      21.84395
#> 3 22.8      26.08886
#> 4 21.4      19.82676
#> 5 18.7      14.55267
#> 6 18.1      20.50602
```

Created on 2022-07-15 by the reprex package (v2.0.1)
nealrichardson left a comment:
My general take right now is that this is really awesome, and while I still have some concerns about the special-casing to get it to work, I'm thinking we should merge this before 9.0 but not make a huge deal out of it in the release announcement, or at least note it as very experimental. I think we're going to need some more "real world" testing to shake out some of the tricky parts, but the best way to get that is to get it merged and try to use it.
```r
#' a single argument (`types`), which is a `list()` of [DataType]s. If a
#' function it must return a [DataType].
#' @param fun An R function or rlang-style lambda expression. The function
#'   will be called with a first argument `context` which is a `list()`
```
This first argument `context` feels like a real 🦶🔫. A few questions:

- Does it need to be first?
- Does it need to be called `context`?
- What does failure look like if I forget to include `context` as an arg to my function? (I'm guessing it's not pretty.) Can we detect up front if someone has forgotten to put context in the function? Something like checking that `length(formals(fun)) == length(as_schema(in_type)) + 1` and raise a useful error message if the check fails?
- You explain what it contains, but what do I do with it? Is there something I would do with batch_size or output_type?
- You say above that functions need to be stateless, but what happens if I assign something into `context` in my function?
A previous version of this PR didn't require the context argument when what was the equivalent of auto_convert was TRUE, but the comment was raised of "why two APIs" (and I agree...one wrapper function scheme is easier to remember).
In its current form, the context argument provides the information needed for auto_convert to do its magic. When auto_convert is TRUE, you could also use it to do something like runif(n = context$batch_size). The python version also provides the memory pool here but we don't provide a way to use the memory pool for constructing arrays, so I didn't add it to the context object.
Because it's a list(), assignments won't have any effect outside fun. A future version may be an environment to avoid the extra unwind protects needed to allocate a new list for each call (but could be one with an overridden [[<- to prevent modification).
I added some text to the documentation for fun and disallowed lambdas for now, since a potential future workaround could be to not pass the context argument for an rlang/purrr style lambda (e.g., ~.x + .y would be the equivalent of function(context, x, y) x + y). I hesitate to add too much convenience functionality in this PR since it's already rather unwieldy.
I added a length(formals(fun)) check...you're right that the error message was awful.
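That check could be as small as this sketch (names are illustrative, not the PR's exact internals; it assumes `in_type` has already been coerced to a Schema):

```r
check_udf_arity <- function(fun, in_type) {
  # `fun` must accept `context` plus one argument per input field
  n_expected <- length(in_type$names) + 1L
  if (length(formals(fun)) != n_expected) {
    stop(
      sprintf(
        "`fun` must accept %d arguments (`context` plus one per input field), not %d",
        n_expected,
        length(formals(fun))
      ),
      call. = FALSE
    )
  }
}
```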
```r
#' `auto_convert` is `TRUE`, subsequent arguments are converted to
#' R vectors before being passed to `fun` and the output is automatically
#' constructed with the expected output type via [as_arrow_array()].
#' @param auto_convert Use `TRUE` to convert inputs before passing to `fun`
```
Should TRUE be default?
I envision it being a lot more common to use auto_convert = TRUE and went back and forth on the default value a few times. I went with this because (1) it's what the Python bindings do and (2) forcing a user to "opt-in" to the auto-convert behaviour at least clues them in that there's something magical going on, even if they don't understand exactly what it is. I don't really have strong feelings about this, I guess FALSE just seemed like a safer default.
r/R/compute.R
```r
as_scalar_function_out_type <- function(x) {
```
This function is odd to me, can you add a comment explaining it? I also wonder if it's named badly, or actually whether both of these functions are: as_scalar_function_in_type returns a Schema and as_scalar_function_out_type returns a function.
I added a comment and updated the name to out_type_as_function()...I use these to simplify the amount of branches that have to exist in the C++ code (which is already rather complicated).
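For illustration, the shape of that helper is roughly the following (a sketch; the PR's actual implementation may differ):

```r
out_type_as_function <- function(out_type) {
  if (inherits(out_type, "DataType")) {
    # Wrap a fixed DataType so the C++ layer always sees a resolver function
    force(out_type)
    function(types) out_type
  } else {
    # Already a function(types) -> DataType resolver; pass it through
    out_type
  }
}
```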
r/R/compute.R
```r
as_scalar_function_in_type <- function(x) {
```
Any reason these shouldn't be as_schema.Field and as_schema.DataType, in which case this function goes away?
It's a good point...I hesitate to add as_schema.(DataType|Field)() because I don't know that there's anywhere else that a DataType should be interpreted as a Schema. as_schema() might get used to sanitize arguments, in which case I would expect an error for something that can't be interpreted in this way (as opposed to, say, a substrait schema, which should be coerced and used).
I also updated the name...I don't really have strong opinions about as_schema.Field()/data type, this PR just seemed like the wrong place to have that discussion.
Make a JIRA for that? Seems reasonable/good-first-issue material
Done! ARROW-17179
r/R/query-engine.R
```diff
- # as_record_batch_reader() will build and run an ExecPlan
- node <- self$SourceNode(as_record_batch_reader(.data$.data))
+ # as_arrow_table() will build and run an ExecPlan
+ node <- self$SourceNode(as_arrow_table(.data$.data))
```
@nealrichardson Did I do something horrible here? I think what this change does is force the result of a head() into memory (suboptimal but probably ok); however, I want to be very sure that I'm not forcing the source of the head() into memory here (which would be very very bad).
Yeah this would force you to consume the whole RBR to take the head, no bueno. Can you detect whether a UDF is used in .data$.data and only do as_arrow_table in that case? (Maybe even just whether there are any UDFs registered is close enough?)
(Side note: this line will need special handling in #13541 too, it's a sneaky place where $Build() actually starts running the query, which we don't want there.)
I see...it was indeed very very bad! I reverted this change...unfortunately it means that %>% head() %>% collect() doesn't work with a UDF in the preceding pipeline. The workarounds are %>% head() %>% (then something with a udf) %>% collect() %>% head() and %>% collect() %>% head(). I think it's probably better to keep that explicit rather than magically ingest the whole query (or wait until we can properly figure out how to execute UDFs in an R-level record batch reader to merge this).
Can you make a followup JIRA for this? I think there are a few options, but agree that we should just error for now, no need to grow the scope even further at this point.
Done! (ARROW-17178)
```r
  rbr$read_table()
}

if (identical(tolower(Sys.info()[["sysname"]]), "windows")) {
```
Sorry, maybe I'm just fatigued and not understanding: this runs successfully on windows and errors elsewhere?
Nope: you're right and it's very weird. I tried to structure the test such that it either errored properly OR worked (but didn't sweep anything under the rug). To me it suggests that there's a route to conditionally evaluating on POSIX, too (I just don't know what it is yet).
Co-authored-by: Neal Richardson <[email protected]>
I'm on 📱 now and can't find the Review button, but I hereby +1 this, please merge when ready. Thanks for the great work and discussion here!
Benchmark runs are scheduled for baseline = 791e5bd and contender = 010b592. 010b592 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
…Hub issue numbers (#34260)

Rewrite the Jira issue numbers to the GitHub issue numbers, so that the GitHub issue numbers are automatically linked to the issues by pkgdown's auto-linking feature. Issue numbers have been rewritten based on the following correspondence. Also, the pkgdown settings have been changed and updated to link to GitHub. I generated the Changelog page using the `pkgdown::build_news()` function and verified that the links work correctly.

ARROW-6338 → #5198, ARROW-6364 → #5201, ARROW-6323 → #5169, ARROW-6278 → #5141, ARROW-6360 → #5329, ARROW-6533 → #5450, ARROW-6348 → #5223, ARROW-6337 → #5399, ARROW-10850 → #9128, ARROW-10624 → #9092, ARROW-10386 → #8549, ARROW-6994 → #23308, ARROW-12774 → #10320, ARROW-12670 → #10287, ARROW-16828 → #13484, ARROW-14989 → #13482, ARROW-16977 → #13514, ARROW-13404 → #10999, ARROW-16887 → #13601, ARROW-15906 → #13206, ARROW-15280 → #13171, ARROW-16144 → #13183, ARROW-16511 → #13105, ARROW-16085 → #13088, ARROW-16715 → #13555, ARROW-16268 → #13550, ARROW-16700 → #13518, ARROW-16807 → #13583, ARROW-16871 → #13517, ARROW-16415 → #13190, ARROW-14821 → #12154, ARROW-16439 → #13174, ARROW-16394 → #13118, ARROW-16516 → #13163, ARROW-16395 → #13627, ARROW-14848 → #12589, ARROW-16407 → #13196, ARROW-16653 → #13506, ARROW-14575 → #13160, ARROW-15271 → #13170, ARROW-16703 → #13650, ARROW-16444 → #13397, ARROW-15016 → #13541, ARROW-16776 → #13563, ARROW-15622 → #13090, ARROW-18131 → #14484, ARROW-18305 → #14581, ARROW-18285 → #14615

* Closes: #33631

Authored-by: SHIMA Tatsuya <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>