Skip to content

Commit ec579df

Browse files
authored
ARROW-17287: [C++] Create scan node that doesn't rely on the merged generator (#13782)
**Primary Goal:** Create a scanner that "cancels" properly. In other words, when the scan node is marked finished then all scan-related thread tasks will be finished. This is different than the current model where I/O tasks are allowed to keep parts of the scan alive via captures of shared_ptr state. **Secondary Goal:** Remove our dependency on the merged generator and make the scanner more accessible. The merged generator is complicated and does not support cancellation, and it currently only understood by a very small set of people. **Secondary Goal:** Add interfaces for schema evolution. This wasn't originally a goal but arose from my attempt to codify and normalize what we are currently doing. These interfaces should eventually allow for things like filling a missing field with a default value or using the parquet column id for field resolution. Performance isn't a goal for this rework but ideally this should not degrade performance. Authored-by: Weston Pace <[email protected]> Signed-off-by: Weston Pace <[email protected]>
1 parent 89c0214 commit ec579df

29 files changed

+2104
-135
lines changed

cpp/src/arrow/compute/exec/exec_plan.cc

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@
3232
#include "arrow/datum.h"
3333
#include "arrow/record_batch.h"
3434
#include "arrow/result.h"
35+
#include "arrow/table.h"
3536
#include "arrow/util/async_generator.h"
3637
#include "arrow/util/checked_cast.h"
3738
#include "arrow/util/key_value_metadata.h"
3839
#include "arrow/util/logging.h"
3940
#include "arrow/util/tracing_internal.h"
41+
#include "arrow/util/vector.h"
4042

4143
namespace arrow {
4244

@@ -555,6 +557,61 @@ bool Declaration::IsValid(ExecFactoryRegistry* registry) const {
555557
return !this->factory_name.empty() && this->options != nullptr;
556558
}
557559

560+
Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration,
561+
ExecContext* exec_context) {
562+
std::shared_ptr<std::shared_ptr<Table>> output_table =
563+
std::make_shared<std::shared_ptr<Table>>();
564+
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
565+
ExecPlan::Make(exec_context));
566+
Declaration with_sink = Declaration::Sequence(
567+
{declaration, {"table_sink", TableSinkNodeOptions(output_table.get())}});
568+
ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get()));
569+
ARROW_RETURN_NOT_OK(exec_plan->StartProducing());
570+
return exec_plan->finished().Then([exec_plan, output_table] { return *output_table; });
571+
}
572+
573+
Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration,
574+
ExecContext* exec_context) {
575+
return DeclarationToTableAsync(std::move(declaration), exec_context).result();
576+
}
577+
578+
Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
579+
Declaration declaration, ExecContext* exec_context) {
580+
return DeclarationToTableAsync(std::move(declaration), exec_context)
581+
.Then([](const std::shared_ptr<Table>& table) {
582+
return TableBatchReader(table).ToRecordBatches();
583+
});
584+
}
585+
586+
Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
587+
Declaration declaration, ExecContext* exec_context) {
588+
return DeclarationToBatchesAsync(std::move(declaration), exec_context).result();
589+
}
590+
591+
Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(Declaration declaration,
592+
ExecContext* exec_context) {
593+
AsyncGenerator<std::optional<ExecBatch>> sink_gen;
594+
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
595+
ExecPlan::Make(exec_context));
596+
Declaration with_sink =
597+
Declaration::Sequence({declaration, {"sink", SinkNodeOptions(&sink_gen)}});
598+
ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get()));
599+
ARROW_RETURN_NOT_OK(exec_plan->StartProducing());
600+
auto collected_fut = CollectAsyncGenerator(sink_gen);
601+
return AllComplete({exec_plan->finished(), Future<>(collected_fut)})
602+
.Then([collected_fut, exec_plan]() -> Result<std::vector<ExecBatch>> {
603+
ARROW_ASSIGN_OR_RAISE(auto collected, collected_fut.result());
604+
return ::arrow::internal::MapVector(
605+
[](std::optional<ExecBatch> batch) { return std::move(*batch); },
606+
std::move(collected));
607+
});
608+
}
609+
610+
Result<std::vector<ExecBatch>> DeclarationToExecBatches(Declaration declaration,
611+
ExecContext* exec_context) {
612+
return DeclarationToExecBatchesAsync(std::move(declaration), exec_context).result();
613+
}
614+
558615
namespace internal {
559616

560617
void RegisterSourceNode(ExecFactoryRegistry*);

cpp/src/arrow/compute/exec/exec_plan.h

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,39 @@ struct ARROW_EXPORT Declaration {
483483
std::string label;
484484
};
485485

486+
/// \brief Utility method to run a declaration and collect the results into a table
487+
///
488+
/// This method will add a sink node to the declaration to collect results into a
489+
/// table. It will then create an ExecPlan from the declaration, start the exec plan,
490+
/// block until the plan has finished, and return the created table.
491+
ARROW_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(
492+
Declaration declaration, ExecContext* exec_context = default_exec_context());
493+
494+
/// \brief Asynchronous version of \see DeclarationToTable
495+
ARROW_EXPORT Future<std::shared_ptr<Table>> DeclarationToTableAsync(
496+
Declaration declaration, ExecContext* exec_context = default_exec_context());
497+
498+
/// \brief Utility method to run a declaration and collect the results into ExecBatch
499+
/// vector
500+
///
501+
/// \see DeclarationToTable for details
502+
ARROW_EXPORT Result<std::vector<ExecBatch>> DeclarationToExecBatches(
503+
Declaration declaration, ExecContext* exec_context = default_exec_context());
504+
505+
/// \brief Asynchronous version of \see DeclarationToExecBatches
506+
ARROW_EXPORT Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(
507+
Declaration declaration, ExecContext* exec_context = default_exec_context());
508+
509+
/// \brief Utility method to run a declaration and collect the results into a vector
510+
///
511+
/// \see DeclarationToTable for details
512+
ARROW_EXPORT Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
513+
Declaration declaration, ExecContext* exec_context = default_exec_context());
514+
515+
/// \brief Asynchronous version of \see DeclarationToBatches
516+
ARROW_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
517+
Declaration declaration, ExecContext* exec_context = default_exec_context());
518+
486519
/// \brief Wrap an ExecBatch generator in a RecordBatchReader.
487520
///
488521
/// The RecordBatchReader does not impose any ordering on emitted batches.

cpp/src/arrow/compute/exec/expression.cc

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "arrow/chunked_array.h"
2525
#include "arrow/compute/api_vector.h"
2626
#include "arrow/compute/exec/expression_internal.h"
27+
#include "arrow/compute/exec/util.h"
2728
#include "arrow/compute/exec_internal.h"
2829
#include "arrow/compute/function_internal.h"
2930
#include "arrow/io/memory.h"
@@ -77,9 +78,15 @@ Expression call(std::string function, std::vector<Expression> arguments,
7778
return Expression(std::move(call));
7879
}
7980

80-
const Datum* Expression::literal() const { return std::get_if<Datum>(impl_.get()); }
81+
const Datum* Expression::literal() const {
82+
if (impl_ == nullptr) return nullptr;
83+
84+
return std::get_if<Datum>(impl_.get());
85+
}
8186

8287
const Expression::Parameter* Expression::parameter() const {
88+
if (impl_ == nullptr) return nullptr;
89+
8390
return std::get_if<Parameter>(impl_.get());
8491
}
8592

@@ -91,6 +98,8 @@ const FieldRef* Expression::field_ref() const {
9198
}
9299

93100
const Expression::Call* Expression::call() const {
101+
if (impl_ == nullptr) return nullptr;
102+
94103
return std::get_if<Call>(impl_.get());
95104
}
96105

@@ -654,7 +663,7 @@ bool ExpressionHasFieldRefs(const Expression& expr) {
654663
}
655664

656665
Result<Expression> FoldConstants(Expression expr) {
657-
return Modify(
666+
return ModifyExpression(
658667
std::move(expr), [](Expression expr) { return expr; },
659668
[](Expression expr, ...) -> Result<Expression> {
660669
auto call = CallNotNull(expr);
@@ -799,7 +808,7 @@ Result<Expression> ReplaceFieldsWithKnownValues(const KnownFieldValues& known_va
799808
"ReplaceFieldsWithKnownValues called on an unbound Expression");
800809
}
801810

802-
return Modify(
811+
return ModifyExpression(
803812
std::move(expr),
804813
[&known_values](Expression expr) -> Result<Expression> {
805814
if (auto ref = expr.field_ref()) {
@@ -870,7 +879,7 @@ Result<Expression> Canonicalize(Expression expr, compute::ExecContext* exec_cont
870879
}
871880
} AlreadyCanonicalized;
872881

873-
return Modify(
882+
return ModifyExpression(
874883
std::move(expr),
875884
[&AlreadyCanonicalized, exec_context](Expression expr) -> Result<Expression> {
876885
auto call = expr.call();
@@ -1112,7 +1121,7 @@ Result<Expression> SimplifyIsValidGuarantee(Expression expr,
11121121
const Expression::Call& guarantee) {
11131122
if (guarantee.function_name != "is_valid") return expr;
11141123

1115-
return Modify(
1124+
return ModifyExpression(
11161125
std::move(expr), [](Expression expr) { return expr; },
11171126
[&](Expression expr, ...) -> Result<Expression> {
11181127
auto call = expr.call();
@@ -1154,7 +1163,7 @@ Result<Expression> SimplifyWithGuarantee(Expression expr,
11541163

11551164
if (auto inequality = Inequality::ExtractOne(guarantee)) {
11561165
ARROW_ASSIGN_OR_RAISE(auto simplified,
1157-
Modify(
1166+
ModifyExpression(
11581167
std::move(expr), [](Expression expr) { return expr; },
11591168
[&](Expression expr, ...) -> Result<Expression> {
11601169
return inequality->Simplify(std::move(expr));

cpp/src/arrow/compute/exec/expression.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ class ARROW_EXPORT Expression {
100100
// XXX someday
101101
// Result<PipelineGraph> GetPipelines();
102102

103+
bool is_valid() const { return impl_ != NULLPTR; }
104+
103105
/// Access a Call or return nullptr if this expression is not a call
104106
const Call* call() const;
105107
/// Access a Datum or return nullptr if this expression is not a literal

cpp/src/arrow/compute/exec/expression_internal.h

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -287,52 +287,5 @@ inline Result<std::shared_ptr<compute::Function>> GetFunction(
287287
return GetCastFunction(*to_type);
288288
}
289289

290-
/// Modify an Expression with pre-order and post-order visitation.
291-
/// `pre` will be invoked on each Expression. `pre` will visit Calls before their
292-
/// arguments, `post_call` will visit Calls (and no other Expressions) after their
293-
/// arguments. Visitors should return the Identical expression to indicate no change; this
294-
/// will prevent unnecessary construction in the common case where a modification is not
295-
/// possible/necessary/...
296-
///
297-
/// If an argument was modified, `post_call` visits a reconstructed Call with the modified
298-
/// arguments but also receives a pointer to the unmodified Expression as a second
299-
/// argument. If no arguments were modified the unmodified Expression* will be nullptr.
300-
template <typename PreVisit, typename PostVisitCall>
301-
Result<Expression> Modify(Expression expr, const PreVisit& pre,
302-
const PostVisitCall& post_call) {
303-
ARROW_ASSIGN_OR_RAISE(expr, Result<Expression>(pre(std::move(expr))));
304-
305-
auto call = expr.call();
306-
if (!call) return expr;
307-
308-
bool at_least_one_modified = false;
309-
std::vector<Expression> modified_arguments;
310-
311-
for (size_t i = 0; i < call->arguments.size(); ++i) {
312-
ARROW_ASSIGN_OR_RAISE(auto modified_argument,
313-
Modify(call->arguments[i], pre, post_call));
314-
315-
if (Identical(modified_argument, call->arguments[i])) {
316-
continue;
317-
}
318-
319-
if (!at_least_one_modified) {
320-
modified_arguments = call->arguments;
321-
at_least_one_modified = true;
322-
}
323-
324-
modified_arguments[i] = std::move(modified_argument);
325-
}
326-
327-
if (at_least_one_modified) {
328-
// reconstruct the call expression with the modified arguments
329-
auto modified_call = *call;
330-
modified_call.arguments = std::move(modified_arguments);
331-
return post_call(Expression(std::move(modified_call)), &expr);
332-
}
333-
334-
return post_call(std::move(expr), nullptr);
335-
}
336-
337290
} // namespace compute
338291
} // namespace arrow

cpp/src/arrow/compute/exec/filter_node.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ namespace {
3838
class FilterNode : public MapNode {
3939
public:
4040
FilterNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
41-
std::shared_ptr<Schema> output_schema, Expression filter, bool async_mode)
42-
: MapNode(plan, std::move(inputs), std::move(output_schema), async_mode),
41+
std::shared_ptr<Schema> output_schema, Expression filter)
42+
: MapNode(plan, std::move(inputs), std::move(output_schema)),
4343
filter_(std::move(filter)) {}
4444

4545
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
@@ -61,8 +61,7 @@ class FilterNode : public MapNode {
6161
filter_expression.type()->ToString());
6262
}
6363
return plan->EmplaceNode<FilterNode>(plan, std::move(inputs), std::move(schema),
64-
std::move(filter_expression),
65-
filter_options.async_mode);
64+
std::move(filter_expression));
6665
}
6766

6867
const char* kind_name() const override { return "FilterNode"; }

cpp/src/arrow/compute/exec/map_node.cc

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,10 @@ namespace arrow {
3434
namespace compute {
3535

3636
MapNode::MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
37-
std::shared_ptr<Schema> output_schema, bool async_mode)
37+
std::shared_ptr<Schema> output_schema)
3838
: ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"},
3939
std::move(output_schema),
40-
/*num_outputs=*/1) {
41-
if (async_mode) {
42-
executor_ = plan_->exec_context()->executor();
43-
} else {
44-
executor_ = nullptr;
45-
}
46-
}
40+
/*num_outputs=*/1) {}
4741

4842
void MapNode::ErrorReceived(ExecNode* input, Status error) {
4943
DCHECK_EQ(input, inputs_[0]);
@@ -82,9 +76,6 @@ void MapNode::StopProducing(ExecNode* output) {
8276

8377
void MapNode::StopProducing() {
8478
EVENT(span_, "StopProducing");
85-
if (executor_) {
86-
this->stop_source_.RequestStop();
87-
}
8879
if (input_counter_.Cancel()) {
8980
this->Finish();
9081
}

cpp/src/arrow/compute/exec/map_node.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ namespace compute {
4545
class ARROW_EXPORT MapNode : public ExecNode {
4646
public:
4747
MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
48-
std::shared_ptr<Schema> output_schema, bool async_mode);
48+
std::shared_ptr<Schema> output_schema);
4949

5050
void ErrorReceived(ExecNode* input, Status error) override;
5151

@@ -69,11 +69,6 @@ class ARROW_EXPORT MapNode : public ExecNode {
6969
protected:
7070
// Counter for the number of batches received
7171
AtomicCounter input_counter_;
72-
73-
::arrow::internal::Executor* executor_;
74-
75-
// Variable used to cancel remaining tasks in the executor
76-
StopSource stop_source_;
7772
};
7873

7974
} // namespace compute

cpp/src/arrow/compute/exec/options.h

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,10 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
8484
/// excluded in the batch emitted by this node.
8585
class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions {
8686
public:
87-
explicit FilterNodeOptions(Expression filter_expression, bool async_mode = true)
88-
: filter_expression(std::move(filter_expression)), async_mode(async_mode) {}
87+
explicit FilterNodeOptions(Expression filter_expression)
88+
: filter_expression(std::move(filter_expression)) {}
8989

9090
Expression filter_expression;
91-
bool async_mode;
9291
};
9392

9493
/// \brief Make a node which executes expressions on input batches, producing new batches.
@@ -100,14 +99,11 @@ class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions {
10099
class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions {
101100
public:
102101
explicit ProjectNodeOptions(std::vector<Expression> expressions,
103-
std::vector<std::string> names = {}, bool async_mode = true)
104-
: expressions(std::move(expressions)),
105-
names(std::move(names)),
106-
async_mode(async_mode) {}
102+
std::vector<std::string> names = {})
103+
: expressions(std::move(expressions)), names(std::move(names)) {}
107104

108105
std::vector<Expression> expressions;
109106
std::vector<std::string> names;
110-
bool async_mode;
111107
};
112108

113109
/// \brief Make a node which aggregates input batches, optionally grouped by keys.

cpp/src/arrow/compute/exec/project_node.cc

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,8 @@ namespace {
4141
class ProjectNode : public MapNode {
4242
public:
4343
ProjectNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
44-
std::shared_ptr<Schema> output_schema, std::vector<Expression> exprs,
45-
bool async_mode)
46-
: MapNode(plan, std::move(inputs), std::move(output_schema), async_mode),
44+
std::shared_ptr<Schema> output_schema, std::vector<Expression> exprs)
45+
: MapNode(plan, std::move(inputs), std::move(output_schema)),
4746
exprs_(std::move(exprs)) {}
4847

4948
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
@@ -72,8 +71,7 @@ class ProjectNode : public MapNode {
7271
++i;
7372
}
7473
return plan->EmplaceNode<ProjectNode>(plan, std::move(inputs),
75-
schema(std::move(fields)), std::move(exprs),
76-
project_options.async_mode);
74+
schema(std::move(fields)), std::move(exprs));
7775
}
7876

7977
const char* kind_name() const override { return "ProjectNode"; }

0 commit comments

Comments
 (0)