Skip to content

Commit e0ccfa1

Browse files
jvanstraten, drin, and westonpace
authored
ARROW-16700: [C++][R][Datasets] aggregates on partitioning columns (#13518)
This updates the Scanner node such that it will use the guarantee expression to fill out columns missing from the dataset but guaranteed to be some constant with appropriate scalars, rather than just inserting a null placeholder column. In case both are available, the dataset constructor prefers using the scalar from the guarantee expression over the actual data, since the latter would probably be an array that unnecessarily repeats the constant value. This is the other part of what was uncovered while analyzing ARROW-16700, the more direct cause being a duplicate of ARROW-16904 (see also #13509 for my fix for that). Lead-authored-by: Jeroen van Straten <[email protected]> Co-authored-by: Aldrin M <[email protected]> Co-authored-by: octalene <[email protected]> Co-authored-by: Aldrin Montana <[email protected]> Co-authored-by: Weston Pace <[email protected]> Signed-off-by: Weston Pace <[email protected]>
1 parent b3ce0fa commit e0ccfa1

File tree

4 files changed

+73
-53
lines changed

4 files changed

+73
-53
lines changed

cpp/src/arrow/compute/exec/expression.cc

Lines changed: 20 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -456,17 +456,31 @@ Result<Expression> Expression::Bind(const Schema& in_schema,
456456
return BindImpl(*this, in_schema, exec_context);
457457
}
458458

459-
Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial) {
459+
Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial,
460+
Expression guarantee) {
460461
ExecBatch out;
461462

462463
if (partial.kind() == Datum::RECORD_BATCH) {
463464
const auto& partial_batch = *partial.record_batch();
465+
out.guarantee = std::move(guarantee);
464466
out.length = partial_batch.num_rows();
465467

468+
ARROW_ASSIGN_OR_RAISE(auto known_field_values,
469+
ExtractKnownFieldValues(out.guarantee));
470+
466471
for (const auto& field : full_schema.fields()) {
467-
ARROW_ASSIGN_OR_RAISE(auto column,
468-
FieldRef(field->name()).GetOneOrNone(partial_batch));
472+
auto field_ref = FieldRef(field->name());
473+
474+
// If we know what the value must be from the guarantee, prefer to use that value
475+
// than the data from the record batch (if it exists at all -- probably it doesn't),
476+
// because this way it will be a scalar.
477+
auto known_field_value = known_field_values.map.find(field_ref);
478+
if (known_field_value != known_field_values.map.end()) {
479+
out.values.emplace_back(known_field_value->second);
480+
continue;
481+
}
469482

483+
ARROW_ASSIGN_OR_RAISE(auto column, field_ref.GetOneOrNone(partial_batch));
470484
if (column) {
471485
if (!column->type()->Equals(field->type())) {
472486
// Referenced field was present but didn't have the expected type.
@@ -490,13 +504,14 @@ Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial)
490504
ARROW_ASSIGN_OR_RAISE(auto partial_batch,
491505
RecordBatch::FromStructArray(partial.make_array()));
492506

493-
return MakeExecBatch(full_schema, partial_batch);
507+
return MakeExecBatch(full_schema, partial_batch, std::move(guarantee));
494508
}
495509

496510
if (partial.is_scalar()) {
497511
ARROW_ASSIGN_OR_RAISE(auto partial_array,
498512
MakeArrayFromScalar(*partial.scalar(), 1));
499-
ARROW_ASSIGN_OR_RAISE(auto out, MakeExecBatch(full_schema, partial_array));
513+
ARROW_ASSIGN_OR_RAISE(
514+
auto out, MakeExecBatch(full_schema, partial_array, std::move(guarantee)));
500515

501516
for (Datum& value : out.values) {
502517
if (value.is_scalar()) continue;

cpp/src/arrow/compute/exec/expression.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -226,7 +226,8 @@ Result<Expression> SimplifyWithGuarantee(Expression,
226226
/// RecordBatch which may have missing or incorrectly ordered columns.
227227
/// Missing fields will be replaced with null scalars.
228228
ARROW_EXPORT Result<ExecBatch> MakeExecBatch(const Schema& full_schema,
229-
const Datum& partial);
229+
const Datum& partial,
230+
Expression guarantee = literal(true));
230231

231232
/// Execute a scalar expression against the provided state and input ExecBatch. This
232233
/// expression must be bound.

cpp/src/arrow/dataset/scanner.cc

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -900,9 +900,6 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
900900
std::move(batch_gen),
901901
[scan_options](const EnumeratedRecordBatch& partial)
902902
-> Result<util::optional<compute::ExecBatch>> {
903-
ARROW_ASSIGN_OR_RAISE(util::optional<compute::ExecBatch> batch,
904-
compute::MakeExecBatch(*scan_options->dataset_schema,
905-
partial.record_batch.value));
906903
// TODO(ARROW-13263) fragments may be able to attach more guarantees to batches
907904
// than this, for example parquet's row group stats. Failing to do this leaves
908905
// perf on the table because row group stats could be used to skip kernel execs in
@@ -911,7 +908,12 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
911908
// Additionally, if a fragment failed to perform projection pushdown there may be
912909
// unnecessarily materialized columns in batch. We could drop them now instead of
913910
// letting them coast through the rest of the plan.
914-
batch->guarantee = partial.fragment.value->partition_expression();
911+
auto guarantee = partial.fragment.value->partition_expression();
912+
913+
ARROW_ASSIGN_OR_RAISE(
914+
util::optional<compute::ExecBatch> batch,
915+
compute::MakeExecBatch(*scan_options->dataset_schema,
916+
partial.record_batch.value, guarantee));
915917

916918
// tag rows with fragment- and batch-of-origin
917919
batch->values.emplace_back(partial.fragment.index);

cpp/src/arrow/dataset/scanner_test.cc

Lines changed: 45 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
#include "arrow/compute/api_vector.h"
2828
#include "arrow/compute/cast.h"
2929
#include "arrow/compute/exec/exec_plan.h"
30+
#include "arrow/compute/exec/expression_internal.h"
3031
#include "arrow/dataset/plan.h"
3132
#include "arrow/dataset/test_util.h"
3233
#include "arrow/record_batch.h"
@@ -1371,16 +1372,19 @@ DatasetAndBatches DatasetAndBatchesFromJSON(
13711372
const std::shared_ptr<Schema>& dataset_schema,
13721373
const std::shared_ptr<Schema>& physical_schema,
13731374
const std::vector<std::vector<std::string>>& fragment_batch_strs,
1374-
const std::vector<compute::Expression>& guarantees,
1375-
std::function<void(compute::ExecBatch*, const RecordBatch&)> make_exec_batch = {}) {
1375+
const std::vector<compute::Expression>& guarantees) {
1376+
// If guarantees are provided we must have one for each batch
13761377
if (!guarantees.empty()) {
13771378
EXPECT_EQ(fragment_batch_strs.size(), guarantees.size());
13781379
}
1380+
13791381
RecordBatchVector record_batches;
13801382
FragmentVector fragments;
13811383
fragments.reserve(fragment_batch_strs.size());
1382-
for (size_t i = 0; i < fragment_batch_strs.size(); i++) {
1383-
const auto& batch_strs = fragment_batch_strs[i];
1384+
1385+
// construct fragments first
1386+
for (size_t frag_ndx = 0; frag_ndx < fragment_batch_strs.size(); frag_ndx++) {
1387+
const auto& batch_strs = fragment_batch_strs[frag_ndx];
13841388
RecordBatchVector fragment_batches;
13851389
fragment_batches.reserve(batch_strs.size());
13861390
for (const auto& batch_str : batch_strs) {
@@ -1390,37 +1394,40 @@ DatasetAndBatches DatasetAndBatchesFromJSON(
13901394
fragment_batches.end());
13911395
fragments.push_back(std::make_shared<InMemoryFragment>(
13921396
physical_schema, std::move(fragment_batches),
1393-
guarantees.empty() ? literal(true) : guarantees[i]));
1397+
guarantees.empty() ? literal(true) : guarantees[frag_ndx]));
13941398
}
13951399

1400+
// then construct ExecBatches that can reference fields from constructed Fragments
13961401
std::vector<compute::ExecBatch> batches;
13971402
auto batch_it = record_batches.begin();
1398-
for (size_t fragment_index = 0; fragment_index < fragment_batch_strs.size();
1399-
++fragment_index) {
1400-
for (size_t batch_index = 0; batch_index < fragment_batch_strs[fragment_index].size();
1401-
++batch_index) {
1403+
for (size_t frag_ndx = 0; frag_ndx < fragment_batch_strs.size(); ++frag_ndx) {
1404+
size_t frag_batch_count = fragment_batch_strs[frag_ndx].size();
1405+
1406+
for (size_t batch_index = 0; batch_index < frag_batch_count; ++batch_index) {
14021407
const auto& batch = *batch_it++;
14031408

14041409
// the scanned ExecBatches will begin with physical columns
14051410
batches.emplace_back(*batch);
14061411

1407-
// allow customizing the ExecBatch (e.g. to fill in placeholders for partition
1408-
// fields)
1409-
if (make_exec_batch) {
1410-
make_exec_batch(&batches.back(), *batch);
1412+
// augment scanned ExecBatch with columns for this fragment's guarantee
1413+
if (!guarantees.empty()) {
1414+
EXPECT_OK_AND_ASSIGN(auto known_fields,
1415+
ExtractKnownFieldValues(guarantees[frag_ndx]));
1416+
for (const auto& known_field : known_fields.map) {
1417+
batches.back().values.emplace_back(known_field.second);
1418+
}
14111419
}
14121420

14131421
// scanned batches will be augmented with fragment and batch indices
1414-
batches.back().values.emplace_back(static_cast<int>(fragment_index));
1422+
batches.back().values.emplace_back(static_cast<int>(frag_ndx));
14151423
batches.back().values.emplace_back(static_cast<int>(batch_index));
14161424

14171425
// ... and with the last-in-fragment flag
1418-
batches.back().values.emplace_back(batch_index ==
1419-
fragment_batch_strs[fragment_index].size() - 1);
1420-
batches.back().values.emplace_back(fragments[fragment_index]->ToString());
1426+
batches.back().values.emplace_back(batch_index == frag_batch_count - 1);
1427+
batches.back().values.emplace_back(fragments[frag_ndx]->ToString());
14211428

14221429
// each batch carries a guarantee inherited from its Fragment's partition expression
1423-
batches.back().guarantee = fragments[fragment_index]->partition_expression();
1430+
batches.back().guarantee = fragments[frag_ndx]->partition_expression();
14241431
}
14251432
}
14261433

@@ -1437,31 +1444,26 @@ DatasetAndBatches MakeBasicDataset() {
14371444

14381445
const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"});
14391446

1440-
return DatasetAndBatchesFromJSON(
1441-
dataset_schema, physical_schema,
1442-
{
1443-
{
1444-
R"([{"a": 1, "b": null},
1445-
{"a": 2, "b": true}])",
1446-
R"([{"a": null, "b": true},
1447-
{"a": 3, "b": false}])",
1448-
},
1449-
{
1450-
R"([{"a": null, "b": true},
1451-
{"a": 4, "b": false}])",
1452-
R"([{"a": 5, "b": null},
1453-
{"a": 6, "b": false},
1454-
{"a": 7, "b": false}])",
1455-
},
1456-
},
1457-
{
1458-
equal(field_ref("c"), literal(23)),
1459-
equal(field_ref("c"), literal(47)),
1460-
},
1461-
[](compute::ExecBatch* batch, const RecordBatch&) {
1462-
// a placeholder will be inserted for partition field "c"
1463-
batch->values.emplace_back(std::make_shared<Int32Scalar>());
1464-
});
1447+
return DatasetAndBatchesFromJSON(dataset_schema, physical_schema,
1448+
{
1449+
{
1450+
R"([{"a": 1, "b": null},
1451+
{"a": 2, "b": true}])",
1452+
R"([{"a": null, "b": true},
1453+
{"a": 3, "b": false}])",
1454+
},
1455+
{
1456+
R"([{"a": null, "b": true},
1457+
{"a": 4, "b": false}])",
1458+
R"([{"a": 5, "b": null},
1459+
{"a": 6, "b": false},
1460+
{"a": 7, "b": false}])",
1461+
},
1462+
},
1463+
{
1464+
equal(field_ref("c"), literal(23)),
1465+
equal(field_ref("c"), literal(47)),
1466+
});
14651467
}
14661468

14671469
DatasetAndBatches MakeNestedDataset() {

0 commit comments

Comments (0)