jvanstraten (Contributor) commented Jul 4, 2022

The min/max aggregate compute kernels seemed to discard their state between partitions, so they would only aggregate the last partition they see (in each thread).

This is the simplest change I could come up with to fix this, but honestly I'm not sure why the local variable even exists. It seems to me it could just be replaced with this->state directly, since there doesn't seem to be any failure path where this->state isn't updated from local. Am I missing something?

ETA: I tried to make a test case for this, only to find that there is already a test case for this. In that case however, it seems that the merging of the partition results is done by Merge'ing the result of separate Consume calls, rather than chaining multiple Consume calls. I'm not sure how to trigger the latter behavior from a normal C++ test case.

lidavidm (Member) left a comment

Hmm, don't think you're missing anything.

It seems you should be able to hit this by sending multiple batches through the plan, but I would have thought that's already tested.

jvanstraten (Contributor, Author) commented
Chunked arrays are tested here, but the call pattern there (cleaned up from my debug prints for chunked_input1) is:

auto min_max_impl_1 = MinMaxImpl(...)
auto min_max_impl_2 = MinMaxImpl(...)
min_max_impl_2.Consume([5,1,2,3,4]) -> (1, 5)
min_max_impl_1.MergeFrom(min_max_impl_2) -> (1, 5)
auto min_max_impl_3 = MinMaxImpl(...)
min_max_impl_3.Consume([9,1,null,3,4]) -> (1, 9)
min_max_impl_1.MergeFrom(min_max_impl_3) -> (1, 9)
min_max_impl_1.Finalize() -> (1, 9)

for which it does not matter whether Consume() overwrites the previous state. The test cases aren't great anyway, since the last chunk holds the min and max values in each of them, but even if I swapped one of them around you'd still get:

auto min_max_impl_1 = MinMaxImpl(...)
auto min_max_impl_2 = MinMaxImpl(...)
min_max_impl_2.Consume([9,1,null,3,4]) -> (1, 9)
min_max_impl_1.MergeFrom(min_max_impl_2) -> (1, 9)
auto min_max_impl_3 = MinMaxImpl(...)
min_max_impl_3.Consume([5,1,2,3,4]) -> (1, 5)
min_max_impl_1.MergeFrom(min_max_impl_3) -> (1, 9)
min_max_impl_1.Finalize() -> (1, 9)

I don't know why it's doing it this way. Seems rather inefficient to me. Evidently though, for larger-scale workloads it does call Consume() more than once before merging and throwing away each MinMaxImpl instance.

lidavidm (Member) commented Jul 5, 2022

My bad, I was looking at the hash aggregate tests!

I suppose the 'right' way to test it is to construct an ExecPlan and feed the data through. There are some tests in plan_test.cc but it doesn't have much coverage of the kernels themselves. We may need some parameterization/a helper to test "both ways" of calling aggregates in much the same way hash_aggregate_test.cc does.

lidavidm (Member) commented Jul 5, 2022

The behavior you're seeing stems from this:

class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
 public:
  Status Init(KernelContext* ctx, KernelInitArgs args) override {
    input_descrs_ = &args.inputs;
    options_ = args.options;
    return KernelExecutorImpl<ScalarAggregateKernel>::Init(ctx, args);
  }

  Status Execute(const ExecBatch& args, ExecListener* listener) override {
    return ExecuteImpl(args.values, listener);
  }

  Status ExecuteImpl(const std::vector<Datum>& args, ExecListener* listener) {
    ARROW_ASSIGN_OR_RAISE(
        batch_iterator_, ExecBatchIterator::Make(args, exec_context()->exec_chunksize()));
    ExecBatch batch;
    while (batch_iterator_->Next(&batch)) {
      // TODO: implement parallelism
      if (batch.length > 0) {
        RETURN_NOT_OK(Consume(batch));
      }
    }
    Datum out;
    RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &out));
    RETURN_NOT_OK(listener->OnResult(std::move(out)));
    return Status::OK();
  }

  Datum WrapResults(const std::vector<Datum>&,
                    const std::vector<Datum>& outputs) override {
    DCHECK_EQ(1, outputs.size());
    return outputs[0];
  }

 private:
  Status Consume(const ExecBatch& batch) {
    // FIXME(ARROW-11840) don't merge *any* aggegates for every batch
    ARROW_ASSIGN_OR_RAISE(
        auto batch_state,
        kernel_->init(kernel_ctx_, {kernel_, *input_descrs_, options_}));
    if (batch_state == nullptr) {
      return Status::Invalid("ScalarAggregation requires non-null kernel state");
    }
    KernelContext batch_ctx(exec_context());
    batch_ctx.SetState(batch_state.get());
    RETURN_NOT_OK(kernel_->consume(&batch_ctx, batch));
    RETURN_NOT_OK(kernel_->merge(kernel_ctx_, std::move(*batch_state), state()));
    return Status::OK();
  }
};

We could/should fix that up too (perhaps in a separate JIRA)

lidavidm (Member) commented Jul 5, 2022

I suppose it's done that way because of the "// TODO: implement parallelism", but I wonder whether we want to just say "use Acero for that" or actually go implement parallelism.

westonpace (Member) commented

I suppose the 'right' way to test it is to construct an ExecPlan and feed the data through. There are some tests in plan_test.cc but it doesn't have much coverage of the kernels themselves. We may need some parameterization/a helper to test "both ways" of calling aggregates in much the same way hash_aggregate_test.cc does.

@drin is working on this for min/max.

I think there is probably more interest/priority in making the exec plan case work well vs the chunked array case. If you are doing compute on chunked arrays and the entire chunked array fits in memory, then it is probably sufficient to just concatenate the chunked array into a single array at the beginning of your compute work.

@jvanstraten jvanstraten changed the title ARROW-16700: [C++][R][Datasets] aggregates on partitioning columns ARROW-16904: [C++] min/max not deterministic if Parquet files have multiple row groups Jul 5, 2022
github-actions bot commented Jul 5, 2022

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

jvanstraten (Contributor, Author) commented

I've relinked this to ARROW-16904 since it's more indicative of what's being fixed and so ARROW-16700 can be used for the issue related to guarantee expressions, but feel free to overrule this PR with @drin's version once they've worked out test coverage.

drin (Contributor) commented Jul 5, 2022

Sorry, I just caught up to the various threads leading here.

I can just add onto this PR, if you don't mind @jvanstraten .

jvanstraten (Contributor, Author) commented

Also fine with me. I assume you mean to PR into my branch so it ends up in here?

drin (Contributor) commented Jul 5, 2022

yep!

drin (Contributor) commented Jul 7, 2022

opened a draft PR to add to this one:

https://github.com/jvanstraten/arrow/pull/5

It looks a bit messy because of rebases, not sure how to easily improve that

lidavidm (Member) commented Jul 8, 2022

Something is very off with the merge here. Can we do something like cherry-pick the commits onto a fresh branch and then force-push?

jvanstraten and others added 7 commits July 8, 2022 17:15
Based on multiple locations fixed in 85789a9, it seemed better to simplify a few conditions so that updating `this->state` from `local` is consolidated and more maintainable. This should make it easier to understand that the only difference is in how the local state is updated when consuming input data; there is no difference in how the aggregate's state is updated from the local state.

This R test captures the code from ARROW-16904 that reproduces this bug. It uses a scalar aggregate, min, on a dataset (1 column) that produces several exec batches.

This test exercises the bug found in ARROW-16904 by creating a ScalarAggregateNode for the "min_max" function. Previously, there was no unit test for scalar aggregate nodes.

This minor improvement splits the min and max values into different chunks of chunked_input1. This doesn't improve coverage given how the scalar aggregate executes on a chunked array, but it seemed a nice extra thing to include.

Added brief documentation for AggregateNodeOptions that provides insight into how the `keys` attribute affects how the `aggregates` attribute is used (or rather, how inputs are delegated to those aggregates).
@jvanstraten jvanstraten force-pushed the ARROW-16700-aggregates-on-partitioning-columns branch from 9dffe78 to 695e29b on July 8, 2022 15:16
jvanstraten (Contributor, Author) commented

Should be fixed. I guess a merge commit was lost somewhere along the line and git(hub) got confused.

Comment on lines +113 to +115
/// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is
/// expected to be a HashAggregate function. If the keys attribute is an empty vector,
/// then each aggregate is assumed to be a ScalarAggregate function.
Member:

✔️ thank you!


auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1);
auto minmax_opts = std::make_shared<ScalarAggregateOptions>();
auto expected_value = StructScalar::Make(
Member:

nit, but doesn't ScalarFromJSON handle StructScalar directly?

Contributor:

I tried looking at the function and searching for usages but I couldn't figure it out. If you know how to do it, I can update it. I wasn't sure if a StructScalar is essentially an object (e.g. { -8, 12} would be a 2 field struct)

Member:

// Append a JSON value that is either an array of N elements in order
// or an object mapping struct names to values (omitted struct members
// are mapped to null).

should be {"min": -8, "max": 12}

lidavidm (Member) commented Jul 8, 2022

Note the R lints https://github.com/apache/arrow/runs/7253994871?check_suite_focus=true


> lintr::lint_package('/arrow/r')
Warning: file=tests/testthat/test-dataset.R,line=622,col=30,[infix_spaces_linter] Put spaces around all infix operators.
INFO:archery:Running Docker linter
Warning: file=tests/testthat/test-dataset.R,line=624,col=27,[function_left_parentheses_linter] Remove spaces before the left parenthesis in a function call.
Warning: file=tests/testthat/test-dataset.R,line=629,col=8,[pipe_continuation_linter] `%>%` should always have a space before it and a new line after it, unless the full pipeline fits on one line.
Warning: file=tests/testthat/test-dataset.R,line=632,col=26,[single_quotes_linter] Only use double-quotes.
Warning: file=tests/testthat/test-dataset.R,line=632,col=51,[infix_spaces_linter] Put spaces around all infix operators.
Warning: file=tests/testthat/test-dataset.R,line=632,col=52,[single_quotes_linter] Only use double-quotes.
Warning: file=tests/testthat/test-dataset.R,line=635,col=27,[function_left_parentheses_linter] Remove spaces before the left parenthesis in a function call.
Warning: file=tests/testthat/test-dataset.R,line=640,col=8,[pipe_continuation_linter] `%>%` should always have a space before it and a new line after it, unless the full pipeline fits on one line.

drin (Contributor) commented Jul 8, 2022

Note the R lints https://github.com/apache/arrow/runs/7253994871?check_suite_focus=true

> lintr::lint_package('/arrow/r')
Warning: file=tests/testthat/test-dataset.R,line=622,col=30,[infix_spaces_linter] Put spaces around all infix operators.
...
Warning: file=tests/testthat/test-dataset.R,line=640,col=8,[pipe_continuation_linter] `%>%` should always have a space before it and a new line after it, unless the full pipeline fits on one line.

Thanks. I tried to match style because I had trouble running the linter. I'll fix these and try to get it running.

drin added 2 commits July 8, 2022 12:17
Used `ScalarFromJSON` to construct the StructScalar instead of
`StructScalar::Make`.

Also fixed style using clang-format
drin and others added 2 commits July 8, 2022 12:50
simplifying test body by specifying `chunk_size` to `write_parquet` and using `replicate` instead of `sapply`.

Adding other style changes for readability.

Co-authored-by: Neal Richardson <[email protected]>
@lidavidm lidavidm merged commit 66c66d0 into apache:master Jul 11, 2022
@drin drin deleted the ARROW-16700-aggregates-on-partitioning-columns branch July 11, 2022 18:10
westonpace added a commit that referenced this pull request Jul 22, 2022
…13518)

This updates the Scanner node such that it will use the guarantee expression to fill out columns missing from the dataset but guaranteed to be some constant with appropriate scalars, rather than just inserting a null placeholder column. In case both are available, the dataset constructor prefers using the scalar from the guarantee expression over the actual data, since the latter would probably be an array that unnecessarily repeats the constant value.

This is the other part of what was uncovered while analyzing ARROW-16700, the more direct cause being a duplicate of ARROW-16904 (see also #13509 for my fix for that).

Lead-authored-by: Jeroen van Straten <[email protected]>
Co-authored-by: Aldrin M <[email protected]>
Co-authored-by: octalene <[email protected]>
Co-authored-by: Aldrin Montana <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Signed-off-by: Weston Pace <[email protected]>