ARROW-16968: [C++] Expand Python-UDF support to Arrow Substrait #13500
Thanks for opening a pull request! If this is not a minor PR. Could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW Opening JIRAs ahead of time contributes to the Openness of the Apache Arrow project. Then could you also rename pull request title in the following format? or See also: |
|
This PR can be usefully reviewed despite it being close to but not entirely ready due to these reasons:
Also, please ignore all recent commits except the last one. |
| /* | ||
| for (const auto& ext : plan.extensions()) { | ||
| switch (ext.mapping_type_case()) { | ||
| case substrait::extensions::SimpleExtensionDeclaration::kExtensionFunction: { | ||
| const auto& fn = ext.extension_function(); | ||
| if (fn.has_udf()) { | ||
| const auto& udf = fn.udf(); | ||
| const auto& in_types = udf.input_types(); | ||
| int size = in_types.size(); | ||
| std::vector<std::pair<std::shared_ptr<DataType>, bool>> input_types; | ||
| for (int i=0; i<size; i++) { | ||
| ARROW_ASSIGN_OR_RAISE(auto input_type, FromProto(in_types.Get(i), ext_set)); | ||
| input_types.push_back(std::move(input_type)); | ||
| } | ||
| ARROW_ASSIGN_OR_RAISE(auto output_type, FromProto(udf.output_type(), ext_set)); | ||
| decls.push_back(std::move(UdfDeclaration{ | ||
| fn.name(), | ||
| udf.code(), | ||
| udf.summary(), | ||
| udf.description(), | ||
| std::move(input_types), | ||
| std::move(output_type), | ||
| })); | ||
| } | ||
| break; | ||
| } | ||
| default: { | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| */ |
Is this not used?
This is the commented-out code-block explained here.
|
|
||
| import pytest | ||
|
|
||
| import numpy as np |
Why is this important?
I think this is a leftover from my removal of the unit tests, as explained here. I can remove this line but may need it back when the unit tests are included.
| if pyarrow_is_extension_id_registry(registry): | ||
| reg = <ExtensionIdRegistry>(registry) | ||
| return reg.sp_registry | ||
|
|
nit: we can get rid of this empty line.
| if pyarrow_is_function_registry(registry): | ||
| reg = <BaseFunctionRegistry>(registry) | ||
| return reg.registry | ||
|
|
nit: Maybe remove this empty line?
| import pyarrow as pa | ||
| from pyarrow.lib import tobytes | ||
| from pyarrow.lib import ArrowInvalid | ||
| from pyarrow.substrait import make_extension_id_registry |
nit: This should probably come after the try-catch block for checking non-erroneous substrait import.
It's not useful at all, is it? Can just call substrait.make_extension_id_registry?
| cdef public shared_ptr[CRecordBatch] pyarrow_unwrap_batch(object batch) | ||
| cdef public shared_ptr[CTable] pyarrow_unwrap_table(object table) | ||
|
|
||
| cdef public CFunctionRegistry* pyarrow_unwrap_function_registry(object registry) |
Should we consider using a shared_ptr instead?
cc @lidavidm
This crossed my mind while I was working on the code for this PR. I also think a shared_ptr would make things easier, but it would require changes across Py/Arrow that are likely better handled in a dedicated PR.
| ---------- | ||
| plan : Buffer | ||
| The serialized Substrait plan to execute. | ||
| extid_registry : ExtensionIdRegistry |
nit: Maybe ext_id_registry?
| from pyarrow._compute cimport FunctionRegistry | ||
|
|
||
|
|
||
| from pyarrow._exec_plan cimport is_supported_execplan_output_type, execplan | ||
| from pyarrow._compute import make_function_registry |
nit: order of imports?
|
@rtpsw I went through the PR at a high level and will look into it again. Wouldn't it be better to wait until the Substrait-related factors are finalized? cc @westonpace
Despite the Substrait agreement being pending, I posted this PR due to a chicken-and-egg problem: if the overall approach in this PR is rejected, then the Substrait agreement I'm currently pursuing could be irrelevant. At this time, I'm seeking only an approval of the approach here rather than a detailed review. Assuming this approval is given, I'll work to reach the Substrait agreement and get back to this PR. |
|
@westonpace, could you quickly check to confirm the general approach in this PR is acceptable, for the purpose noted here? |
|
The approach, if I'm understanding correctly, is to use C++ to make two passes through the plan (or maybe it's one pass). The first pass gets all the UDFs out of the plan. PyArrow then unpickles and registers those UDFs. The second pass actually consumes the plan, using a registry that contains those unpickled functions. This wouldn't be my first approach. I think I'd prefer adding another callback, like the consumer_factory, for UDF handling. This would make it easier to handle situations where there are alternative UDF handlers, or, for example, a C++ or R user who still wants to be able to run Python UDFs. However, I'm not opposed to this approach. The end PyArrow interface to the user is still just "substrait in->data out", so if we wanted to move to a different approach in the future, that would be fine.
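The two-pass flow described in this comment can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the PR's implementation: the plan is a plain dict rather than a Substrait message, the registry is a plain dict rather than an Arrow function registry, and `UdfRecord`, `extract_udfs`, `register_udfs`, and `run_plan` are hypothetical names.

```python
import math
import pickle
from dataclasses import dataclass


@dataclass
class UdfRecord:
    """Hypothetical per-UDF data, loosely mirroring the fields discussed in this PR."""
    name: str
    code: bytes          # pickled callable
    input_types: list
    output_type: str


def extract_udfs(plan):
    """First pass: collect UDF records without executing anything."""
    records = []
    for ext in plan["extensions"]:
        udf = ext.get("udf")
        if udf is not None:
            records.append(UdfRecord(ext["name"], udf["code"],
                                     udf["input_types"], udf["output_type"]))
    return records


def register_udfs(records, registry):
    """Unpickle each UDF and register it under its declared name."""
    for rec in records:
        # pickle.loads can run arbitrary code; only do this for trusted plans.
        registry[rec.name] = pickle.loads(rec.code)


def run_plan(plan, registry):
    """Second pass: evaluate the plan's calls using the populated registry."""
    return [registry[call["function"]](*call["args"]) for call in plan["calls"]]


# A toy plan: one non-UDF extension and one UDF carrying pickled code.
plan = {
    "extensions": [
        {"name": "add"},  # no "udf" key, so the first pass skips it
        {"name": "my_sqrt",
         "udf": {"code": pickle.dumps(math.sqrt),
                 "input_types": ["fp64"], "output_type": "fp64"}},
    ],
    "calls": [{"function": "my_sqrt", "args": [9.0]}],
}

registry = {"add": lambda a, b: a + b}  # stand-in for a function registry
register_udfs(extract_udfs(plan), registry)
result = run_plan(plan, registry)
```

In this sketch the user-facing shape stays "plan in, data out": the caller only sees `run_plan`, so the extraction step could later be replaced by a callback without changing that interface.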
This is a fair description. For the purpose of alignment with my corresponding Substrait proposal, could you confirm the data associated with each UDF is appropriate/acceptable? If so, I'll ensure it gets expressed in the Substrait plan, even if it ends up being organized differently there.
The current approach does not block using a UDF handler. I think the only real difference is that in my approach the data for all UDFs is packed together and crosses the C++/Python boundary once. Given this data, one can write a loop that calls any UDF handler on any of the UDF records, with optional record filtering and other such enhancements if needed. This would be an alternative to the current behavior you described as "Pyarrow then unpickles and registers those UDFs"; I don't think this needs to be implemented right away, but I'm open to arguments in favor. |
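The handler loop mentioned here could look roughly like the following. It is only a sketch: `UdfRecord`, `handle_udfs`, and the `keep` filter are hypothetical names, the records are toy values, and the handler merely stores the payload instead of unpickling and registering it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UdfRecord:
    """Hypothetical record; field names follow the UDF data discussed in this thread."""
    name: str
    code: bytes
    summary: str = ""
    description: str = ""


def handle_udfs(records, handler, keep=None):
    """Call `handler` on every record that passes the optional `keep` filter.

    Returns the names of the records that were handled, in order.
    """
    handled = []
    for rec in records:
        if keep is not None and not keep(rec):
            continue
        handler(rec)
        handled.append(rec.name)
    return handled


# All records arrive together, as if they had crossed the boundary in one call.
records = [
    UdfRecord("py_double", b"\x01"),
    UdfRecord("r_scale", b"\x02"),
    UdfRecord("py_square", b"\x03"),
]

registered = {}


def python_handler(rec):
    # Stand-in for "unpickle and register"; here we only record the payload.
    registered[rec.name] = rec.code


handled = handle_udfs(records, python_handler,
                      keep=lambda rec: rec.name.startswith("py_"))
```

Because the handler is just a callable, a different handler (or a different filter) can be swapped in without touching the code that extracts the records.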
|
For the purposes of a consumer (Acero), I would say summary and description are superfluous. I can see them being useful to other components (e.g. a plan visualizer), though. I don't think we need to be perfectly aligned with Substrait to start with, so this is fine. New features should start as extensions and move into Substrait once there is some proven usage.
|
Great, I'll shift my focus to the Substrait PR. Feel free to review at a lower priority until I notify here that I'm done with that PR. |
pitrou
left a comment
I'm not sure I understand whether this PR is finished or not (I see some code commented out). If not, can you please make it draft?
I am not an expert in this code, but here are some comments.
| namespace compute { | ||
|
|
||
| std::unique_ptr<FunctionRegistry> MakeFunctionRegistry() { | ||
| return FunctionRegistry::Make(GetFunctionRegistry()); |
What is the point of this mostly trivial function? Why not let the user call FunctionRegistry::Make directly?
| std::shared_ptr<Table>* output_table; | ||
| }; | ||
|
|
||
| /// @} |
You shouldn't remove this, this matches the opening brace in \addtogroup execnode-options above.
| const substrait::Plan& plan, | ||
| const ExtensionIdRegistry* registry = default_extension_id_registry()); | ||
| const ExtensionIdRegistry* registry = default_extension_id_registry(), | ||
| bool exclude_functions = false); |
Can you add documentation for this parameter in the docstring above?
Also, as a nit, double negatives are not terrific, so I would instead suggest bool include_functions = true.
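To illustrate the naming point, here is a small sketch of a positively named flag with a default of true. It is written in Python for brevity rather than C++, and `get_extension_names` together with the dict-based plan are hypothetical stand-ins, not the signature under review.

```python
def get_extension_names(plan, include_functions=True):
    """Return the names of the plan's extensions.

    Parameters
    ----------
    plan : dict
        Toy stand-in for a plan; each extension has a "kind" and a "name".
    include_functions : bool, default True
        Whether extensions of kind "function" are included in the result.
    """
    names = []
    for ext in plan["extensions"]:
        if ext["kind"] == "function" and not include_functions:
            continue
        names.append(ext["name"])
    return names


plan = {"extensions": [{"kind": "type", "name": "uuid"},
                       {"kind": "function", "name": "add"}]}

everything = get_extension_names(plan)
types_only = get_extension_names(plan, include_functions=False)
```

With the positive name, the call site reads as "include functions: no" instead of "exclude functions: yes", and the default keeps the existing behavior for callers that do not pass the flag.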
| Result<std::vector<FieldRef>> FromProto( | ||
| const google::protobuf::RepeatedPtrField<substrait::Expression>& exprs, | ||
| const std::string& what) { | ||
| std::vector<FieldRef> fields; |
May want to presize this?
| int size = exprs.size(); | ||
| for (int i = 0; i < size; i++) { | ||
| ARROW_ASSIGN_OR_RAISE(FieldRef field, FromProto(exprs[i], what)); | ||
| fields.push_back(field); |
| fields.push_back(field); | |
| fields.push_back(std::move(field)); |
| from pyarrow._compute import make_function_registry | ||
|
|
||
|
|
||
| def make_extension_id_registry(): |
Public APIs should get a docstring.
| return execplan([], output_type, c_decls, True, c_func_registry) | ||
|
|
||
|
|
||
| def run_query(plan, extid_registry, func_registry): |
Should func_registry be optional?
| def run_query(plan, extid_registry, func_registry): | |
| def run_query(plan, extid_registry, func_registry=None): |
Also, what about extid_registry? Are there cases where it could be omitted? @westonpace
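A minimal sketch of the suggested signature, assuming a `None` default that falls back to a shared registry. The dict-based registries, `DEFAULT_FUNC_REGISTRY`, and the toy plan are hypothetical stand-ins and not pyarrow's implementation; `extid_registry` is left as a required argument because the question about omitting it is still open.

```python
# Stand-in for a process-wide default function registry.
DEFAULT_FUNC_REGISTRY = {"negate": lambda x: -x}


def run_query(plan, extid_registry, func_registry=None):
    """Run a toy plan, resolving its function through the given registries.

    Parameters
    ----------
    plan : dict
        Toy stand-in for a serialized plan.
    extid_registry : dict
        Maps extension ids used in the plan to function names.
    func_registry : dict, optional
        Maps function names to callables. If omitted or None, the default
        registry is used.
    """
    if func_registry is None:
        func_registry = DEFAULT_FUNC_REGISTRY
    func_name = extid_registry[plan["function_id"]]
    return func_registry[func_name](plan["arg"])


plan = {"function_id": 1, "arg": 5}
extid_registry = {1: "negate"}

default_result = run_query(plan, extid_registry)
explicit_none_result = run_query(plan, extid_registry, None)
custom_result = run_query(plan, extid_registry, {"negate": lambda x: x * -10})
```

Calling with the argument omitted and with an explicit `None` exercises the same fallback, which is the pair of cases a test for the optional parameter would want to cover.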
| include "table.pxi" | ||
|
|
||
| # Compute registries | ||
| include "compute.pxi" |
For the record, why is this addition necessary? Can Substrait instead directly import these declarations?
| reader = substrait.run_query(buf) | ||
| extid_registry = substrait.make_extension_id_registry() | ||
| func_registry = substrait.make_function_registry() | ||
| reader = substrait.run_query(buf, extid_registry, func_registry) |
Can there also be a test with func_registry omitted or None?
I converted this to a draft. However, this is fairly mature code in the sense that I tested it locally. What mostly keeps this PR from being ready for review is pending changes elsewhere that it depends on. See this explanation post, including about the commented-out code.
|
For reference, the following issues were created as a follow-up: |
|
Thank you for your contribution. Unfortunately, this pull request has been marked as stale because it has had no activity in the past 365 days. Please remove the stale label or comment below, or this PR will be closed in 14 days. Feel free to re-open this if it has been closed in error. If you do not have repository permissions to reopen the PR, please tag a maintainer. |
See https://issues.apache.org/jira/browse/ARROW-16968