-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-15238: [C++] ARROW_ENGINE module with substrait consumer #11707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ff51e35
906aacf
5fc6c34
151cf72
1cf679b
6dce17b
53c7c7d
cd919f3
c079359
5c69fb1
2a77d28
a5c25de
873ec23
05be909
7309964
27af6b6
37a62de
8397ee5
61feb19
1eb4bc0
e63da1e
f37084a
2725ed7
11e7f3f
96727e1
07b259c
885836a
0b94f3a
98c74a8
6fd2e73
0034651
b5e6fa4
e7184f5
bf93511
2573bc5
095560f
24517ff
c4d9877
f2e0e71
b09d372
1557f4f
839826b
d12545f
9dd9c70
9a569c9
ee32bb5
cf33bd1
2fc0123
9468920
4da0939
1487b37
d237377
5e0d6e5
cd22ef0
1dc4c9d
ed5b0d5
5d61724
37b5673
5b362f9
b1a9bba
b6499ae
1473dc2
107c5b7
5012be6
2c73334
65c2ee8
6d8ebc7
adfe196
0a965c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -352,6 +352,7 @@ endif() | |
|
|
||
| # Enabling ARROW_ENGINE force-enables the compute and dataset modules as well. | ||
| if(ARROW_ENGINE) | ||
| set(ARROW_COMPUTE ON) | ||
| set(ARROW_DATASET ON) | ||
| endif() | ||
|
|
||
| if(ARROW_SKYHOOK) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| *_generated* | ||
| *generated/substrait/*.pb.* | ||
| *.grpc.fb.* | ||
| *arrowExports.cpp* | ||
| *parquet_constants.* | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| # - Find Arrow Engine (arrow/engine/api.h, libarrow_engine.a, libarrow_engine.so) | ||
| # | ||
| # This module requires Arrow from which it uses | ||
| # arrow_find_package() | ||
| # | ||
| # This module defines | ||
| # ARROW_ENGINE_FOUND, whether Arrow Engine has been found | ||
| # ARROW_ENGINE_IMPORT_LIB, | ||
| # path to libarrow_engine's import library (Windows only) | ||
| # ARROW_ENGINE_INCLUDE_DIR, directory containing headers | ||
| # ARROW_ENGINE_LIB_DIR, directory containing Arrow Engine libraries | ||
| # ARROW_ENGINE_SHARED_LIB, path to libarrow_engine's shared library | ||
| # ARROW_ENGINE_STATIC_LIB, path to libarrow_engine.a | ||
|
|
||
| # Skip the whole search when ARROW_ENGINE_FOUND is already defined; this file sets | ||
| # it itself after a completed search, so repeated inclusion becomes a no-op. | ||
| if(DEFINED ARROW_ENGINE_FOUND) | ||
| return() | ||
| endif() | ||
|
|
||
| # Forward the version, REQUIRED and QUIET settings of the current find_package() | ||
| # call to the find_package() calls for the Arrow and Parquet dependencies. | ||
| set(find_package_arguments) | ||
| if(${CMAKE_FIND_PACKAGE_NAME}_FIND_VERSION) | ||
| list(APPEND find_package_arguments "${${CMAKE_FIND_PACKAGE_NAME}_FIND_VERSION}") | ||
| endif() | ||
| if(${CMAKE_FIND_PACKAGE_NAME}_FIND_REQUIRED) | ||
| list(APPEND find_package_arguments REQUIRED) | ||
| endif() | ||
| if(${CMAKE_FIND_PACKAGE_NAME}_FIND_QUIETLY) | ||
| list(APPEND find_package_arguments QUIET) | ||
| endif() | ||
| find_package(Arrow ${find_package_arguments}) | ||
| find_package(Parquet ${find_package_arguments}) | ||
|
|
||
| # Search for Arrow Engine only once both Arrow and Parquet have been found. | ||
| if(ARROW_FOUND AND PARQUET_FOUND) | ||
| arrow_find_package(ARROW_ENGINE | ||
| "${ARROW_HOME}" | ||
| arrow_engine | ||
| arrow/engine/api.h | ||
| ArrowEngine | ||
| arrow-engine) | ||
| # Fall back to Arrow's own version when the search did not set one. | ||
| if(NOT ARROW_ENGINE_VERSION) | ||
| set(ARROW_ENGINE_VERSION "${ARROW_VERSION}") | ||
| endif() | ||
| endif() | ||
|
|
||
| # Record whether the Arrow Engine version equals the Arrow version. The result is | ||
| # one of the REQUIRED_VARS passed to find_package_handle_standard_args() below. | ||
| if("${ARROW_ENGINE_VERSION}" VERSION_EQUAL "${ARROW_VERSION}") | ||
| set(ARROW_ENGINE_VERSION_MATCH TRUE) | ||
| else() | ||
| set(ARROW_ENGINE_VERSION_MATCH FALSE) | ||
| endif() | ||
|
|
||
| # Mark the result variables as advanced so that their cache entries (if any) are | ||
| # hidden from the default view of the CMake GUIs. | ||
| mark_as_advanced(ARROW_ENGINE_IMPORT_LIB | ||
| ARROW_ENGINE_INCLUDE_DIR | ||
| ARROW_ENGINE_LIBS | ||
| ARROW_ENGINE_LIB_DIR | ||
| ARROW_ENGINE_SHARED_IMP_LIB | ||
| ARROW_ENGINE_SHARED_LIB | ||
| ARROW_ENGINE_STATIC_LIB | ||
| ARROW_ENGINE_VERSION | ||
| ARROW_ENGINE_VERSION_MATCH) | ||
|
|
||
| # Report the outcome through the standard CMake helper: the package counts as found | ||
| # only if the include dir, the library dir and the version-match flag are all set. | ||
| # The result is then mirrored into the upper-case ARROW_ENGINE_FOUND variable. | ||
| find_package_handle_standard_args( | ||
| ArrowEngine | ||
| REQUIRED_VARS ARROW_ENGINE_INCLUDE_DIR ARROW_ENGINE_LIB_DIR ARROW_ENGINE_VERSION_MATCH | ||
| VERSION_VAR ARROW_ENGINE_VERSION) | ||
| set(ARROW_ENGINE_FOUND ${ArrowEngine_FOUND}) | ||
|
|
||
| # Print a summary of what was found, unless the caller asked for quiet operation. | ||
| if(ArrowEngine_FOUND AND NOT ArrowEngine_FIND_QUIETLY) | ||
| message(STATUS "Found the Arrow Engine by ${ARROW_ENGINE_FIND_APPROACH}") | ||
| message(STATUS "Found the Arrow Engine shared library: ${ARROW_ENGINE_SHARED_LIB}") | ||
| message(STATUS "Found the Arrow Engine import library: ${ARROW_ENGINE_IMPORT_LIB}") | ||
| message(STATUS "Found the Arrow Engine static library: ${ARROW_ENGINE_STATIC_LIB}") | ||
| endif() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,187 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| #include <arrow/api.h> | ||
| #include <arrow/compute/api.h> | ||
| #include <arrow/compute/exec/options.h> | ||
| #include <arrow/engine/substrait/serde.h> | ||
|
|
||
| #include <cstdlib> | ||
| #include <functional> | ||
| #include <iostream> | ||
| #include <memory> | ||
| #include <string> | ||
| #include <utility> | ||
| #include <vector> | ||
|
|
||
| namespace eng = ::arrow::engine; | ||
| namespace cp = ::arrow::compute; | ||
|
|
||
| // Evaluates `expr`, which must yield an arrow::Status. If the status is not OK, its | ||
| // message is printed to stderr and the process is aborted. | ||
| // | ||
| // The expansion deliberately does NOT end in a semicolon: the caller's own `;` | ||
| // completes the do/while(0), so `ABORT_ON_FAILURE(x);` is exactly one statement and | ||
| // remains valid inside an unbraced if/else. | ||
| #define ABORT_ON_FAILURE(expr) \ | ||
|   do { \ | ||
|     arrow::Status status_ = (expr); \ | ||
|     if (!status_.ok()) { \ | ||
|       std::cerr << status_.message() << std::endl; \ | ||
|       abort(); \ | ||
|     } \ | ||
|   } while (0) | ||
|
|
||
| arrow::Future<std::shared_ptr<arrow::Buffer>> GetSubstraitFromServer(); | ||
|
|
||
| // Sink consumer for this example. It does no real work with the batches it is | ||
| // handed; it only reports progress on stdout, prefixed with "-<tag>". | ||
| class IgnoringConsumer : public cp::SinkNodeConsumer { | ||
|   // Number identifying this consumer in the printed output. | ||
|   size_t id_; | ||
|  public: | ||
|   explicit IgnoringConsumer(size_t tag) : id_(tag) {} | ||
|   // Accepts one batch of data and prints its row count to stdout. | ||
|   arrow::Status Consume(cp::ExecBatch batch) override { | ||
|     const auto row_count = batch.length; | ||
|     std::cout << "-" << id_ << " consumed " << row_count << " rows" << std::endl; | ||
|     return arrow::Status::OK(); | ||
|   } | ||
|   // Signals that the last batch has been delivered; Consume is guaranteed not to | ||
|   // be called again afterwards. The returned future should only finish once all | ||
|   // outstanding tasks have completed. This consumer has none, so the future is | ||
|   // already finished when it is returned. | ||
|   arrow::Future<> Finish() override { | ||
|     std::cout << "-" << id_ << " finished" << std::endl; | ||
|     return arrow::Future<>::MakeFinished(); | ||
|   } | ||
| }; | ||
|
|
||
| // Example driver: obtains a serialized Substrait plan, prints it as JSON, | ||
| // deserializes it into compute Declarations, assembles those into an ExecPlan and | ||
| // runs the plan to completion. Any failing step prints the error message and aborts | ||
| // the process via ABORT_ON_FAILURE. Returns EXIT_SUCCESS otherwise. | ||
| int main(int argc, char** argv) { | ||
|   // Separator printed in front of each section header. std::string's fill | ||
|   // constructor takes (count, character); with the arguments swapped, ('#', 50), | ||
|   // it yields 35 copies of '2' rather than 50 '#' characters. | ||
|   const std::string banner(50, '#'); | ||
|   // Plans arrive at the consumer serialized in a substrait-formatted Buffer | ||
|   auto maybe_serialized_plan = GetSubstraitFromServer().result(); | ||
|   ABORT_ON_FAILURE(maybe_serialized_plan.status()); | ||
|   std::shared_ptr<arrow::Buffer> serialized_plan = | ||
|       std::move(maybe_serialized_plan).ValueOrDie(); | ||
|   // Print the received plan to stdout as JSON | ||
|   arrow::Result<std::string> maybe_plan_json = | ||
|       eng::internal::SubstraitToJSON("Plan", *serialized_plan); | ||
|   ABORT_ON_FAILURE(maybe_plan_json.status()); | ||
|   std::cout << banner << " received substrait::Plan:" << std::endl; | ||
|   std::cout << maybe_plan_json.ValueOrDie() << std::endl; | ||
|   // Deserializing a plan requires a factory for consumers: each time a sink node is | ||
|   // deserialized, a consumer is constructed into which its batches will be piped. | ||
|   // The factory captures `consumers` by reference, so it must not outlive this scope. | ||
|   std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers; | ||
|   std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] { | ||
|     // All batches produced by the plan will be fed into IgnoringConsumers, | ||
|     // tagged with their index in `consumers`. | ||
|     auto tag = consumers.size(); | ||
|     consumers.push_back(std::make_shared<IgnoringConsumer>(tag)); | ||
|     return consumers.back(); | ||
|   }; | ||
|   // NOTE Although most of the Deserialize functions require a const ExtensionSet& to | ||
|   // resolve extension references, a Plan is what we use to construct that | ||
|   // ExtensionSet. (It should be an optional output later.) In particular, neither | ||
|   // the ExtensionSet nor the serialized plan needs to be kept alive: none of the | ||
|   // arrow:: objects in the output will contain references to memory owned by either. | ||
|   auto maybe_decls = eng::DeserializePlan(*serialized_plan, consumer_factory); | ||
|   ABORT_ON_FAILURE(maybe_decls.status()); | ||
|   std::vector<cp::Declaration> decls = std::move(maybe_decls).ValueOrDie(); | ||
|   // It's safe to drop the serialized plan; we don't leave references to its memory | ||
|   serialized_plan.reset(); | ||
|   // Construct an empty plan (note: configure Function registry and ThreadPool here) | ||
|   auto maybe_plan = cp::ExecPlan::Make(); | ||
|   ABORT_ON_FAILURE(maybe_plan.status()); | ||
|   std::shared_ptr<cp::ExecPlan> plan = std::move(maybe_plan).ValueOrDie(); | ||
|   for (const cp::Declaration& decl : decls) { | ||
|     // Add decl to plan (note: configure ExecNode registry here) | ||
|     ABORT_ON_FAILURE(decl.AddToPlan(plan.get()).status()); | ||
|   } | ||
|   // Validate the plan and print it to stdout | ||
|   ABORT_ON_FAILURE(plan->Validate()); | ||
|   std::cout << banner << " produced arrow::ExecPlan:" << std::endl; | ||
|   std::cout << plan->ToString() << std::endl; | ||
|   // Start the plan... | ||
|   std::cout << banner << " consuming batches:" << std::endl; | ||
|   ABORT_ON_FAILURE(plan->StartProducing()); | ||
|   // ... and wait for it to finish | ||
|   ABORT_ON_FAILURE(plan->finished().status()); | ||
|   return EXIT_SUCCESS; | ||
| } | ||
|
|
||
| // Stand-in for a real server round trip: the plan is built from the hard-coded JSON | ||
| // below by eng::internal::SubstraitFromJSON, and the result of that call is returned. | ||
| // | ||
| // The JSON describes a single read relation over two local Parquet URIs | ||
| // (file:///tmp/dat1.parquet and file:///tmp/dat2.parquet) with an i64 column "i" and | ||
| // a bool column "b", filtered on struct field 1, followed by one extension URI and | ||
| // three extension declarations (a type, a type variation and a function). | ||
| arrow::Future<std::shared_ptr<arrow::Buffer>> GetSubstraitFromServer() { | ||
| // Emulate server interaction by parsing hard coded JSON | ||
| return eng::internal::SubstraitFromJSON("Plan", R"({ | ||
| "relations": [ | ||
| {"rel": { | ||
| "read": { | ||
| "base_schema": { | ||
| "struct": { | ||
| "types": [ {"i64": {}}, {"bool": {}} ] | ||
| }, | ||
| "names": ["i", "b"] | ||
| }, | ||
| "filter": { | ||
| "selection": { | ||
| "directReference": { | ||
| "structField": { | ||
| "field": 1 | ||
| } | ||
| } | ||
| } | ||
| }, | ||
| "local_files": { | ||
| "items": [ | ||
| { | ||
| "uri_file": "file:///tmp/dat1.parquet", | ||
| "format": "FILE_FORMAT_PARQUET" | ||
| }, | ||
| { | ||
| "uri_file": "file:///tmp/dat2.parquet", | ||
| "format": "FILE_FORMAT_PARQUET" | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| }} | ||
| ], | ||
| "extension_uris": [ | ||
| { | ||
| "extension_uri_anchor": 7, | ||
| "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml" | ||
| } | ||
| ], | ||
| "extensions": [ | ||
| {"extension_type": { | ||
| "extension_uri_reference": 7, | ||
| "type_anchor": 42, | ||
| "name": "null" | ||
| }}, | ||
| {"extension_type_variation": { | ||
| "extension_uri_reference": 7, | ||
| "type_variation_anchor": 23, | ||
| "name": "u8" | ||
| }}, | ||
| {"extension_function": { | ||
| "extension_uri_reference": 7, | ||
| "function_anchor": 42, | ||
| "name": "add" | ||
| }} | ||
| ] | ||
| })"); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -722,6 +722,10 @@ if(ARROW_COMPUTE) | |
| add_subdirectory(compute) | ||
| endif() | ||
|
|
||
| # Include the engine subdirectory in the build only when ARROW_ENGINE is enabled. | ||
| if(ARROW_ENGINE) | ||
|
||
| add_subdirectory(engine) | ||
| endif() | ||
|
|
||
| if(ARROW_CUDA) | ||
| add_subdirectory(gpu) | ||
| endif() | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.