Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,7 @@ Result<FragmentIterator> InMemoryDataset::GetFragmentsImpl(compute::Expression)

auto create_fragment =
[schema](std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
if (!batch->schema()->Equals(schema)) {
return Status::TypeError("yielded batch had schema ", *batch->schema(),
" which did not match InMemorySource's: ", *schema);
}

RETURN_NOT_OK(CheckProjectable(*schema, *batch->schema()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like this could be a construction-time check to avoid repeated checking except there is no way to return a Status there, unfortunately. (Not a big deal, though.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that, but would have to change this to a ::Make() method and didn't want to go that far here.

return std::make_shared<InMemoryFragment>(RecordBatchVector{std::move(batch)});
};

Expand Down
50 changes: 46 additions & 4 deletions cpp/src/arrow/dataset/dataset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,13 @@ TEST_F(TestInMemoryDataset, ReplaceSchema) {
schema_, RecordBatchVector{static_cast<size_t>(kNumberBatches), batch});

// drop field
ASSERT_OK(dataset->ReplaceSchema(schema({field("i32", int32())})).status());
auto new_schema = schema({field("i32", int32())});
ASSERT_OK_AND_ASSIGN(auto new_dataset, dataset->ReplaceSchema(new_schema));
AssertDatasetHasSchema(new_dataset, new_schema);
// add field (will be materialized as null during projection)
ASSERT_OK(dataset->ReplaceSchema(schema({field("str", utf8())})).status());
new_schema = schema({field("str", utf8())});
ASSERT_OK_AND_ASSIGN(new_dataset, dataset->ReplaceSchema(new_schema));
AssertDatasetHasSchema(new_dataset, new_schema);
// incompatible type
ASSERT_RAISES(TypeError,
dataset->ReplaceSchema(schema({field("i32", utf8())})).status());
Expand Down Expand Up @@ -107,6 +111,40 @@ TEST_F(TestInMemoryDataset, InMemoryFragment) {
AssertSchemaEqual(batch->schema(), schema);
}

TEST_F(TestInMemoryDataset, HandlesDifferingSchemas) {
constexpr int64_t kBatchSize = 1024;

// These schemas can be merged
SetSchema({field("i32", int32()), field("f64", float64())});
auto batch1 = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
SetSchema({field("i32", int32())});
auto batch2 = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
RecordBatchVector batches{batch1, batch2};

auto dataset = std::make_shared<InMemoryDataset>(schema_, batches);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually want this to be valid though? I would expect the batches of a dataset to have a consistent schema

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In file fragments, it's totally normal to have a physical schema that is different from the dataset schema.

This came up when I realized we could create a union dataset out of filesystem ones but not in-memory ones if the schemas differed.

The other way (arguably) would be to have ReplaceSchema project the batches (though that is a lot more work).

I thought about that, but then are we materializing the projected batches before any scan is started? It seems more efficient for the projection to happen as part of the scan.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, good point about the fragments.

I was thinking InMemoryDataset already has all the data in memory, so it's not a big deal anyways. But yes, that's unnecessary work compared to this.


ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());
ASSERT_EQ(*table->schema(), *schema_);
ASSERT_EQ(table->num_rows(), 2 * kBatchSize);

// These cannot be merged
SetSchema({field("i32", int32()), field("f64", float64())});
batch1 = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
SetSchema({field("i32", struct_({field("x", date32())}))});
batch2 = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
batches = RecordBatchVector({batch1, batch2});

dataset = std::make_shared<InMemoryDataset>(schema_, batches);

ASSERT_OK_AND_ASSIGN(scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
EXPECT_RAISES_WITH_MESSAGE_THAT(
TypeError, testing::HasSubstr("fields had matching names but differing types"),
scanner->ToTable());
}

class TestUnionDataset : public DatasetFixtureMixin {};

TEST_F(TestUnionDataset, ReplaceSchema) {
Expand All @@ -131,9 +169,13 @@ TEST_F(TestUnionDataset, ReplaceSchema) {
AssertDatasetEquals(reader.get(), dataset.get());

// drop field
ASSERT_OK(dataset->ReplaceSchema(schema({field("i32", int32())})).status());
auto new_schema = schema({field("i32", int32())});
ASSERT_OK_AND_ASSIGN(auto new_dataset, dataset->ReplaceSchema(new_schema));
AssertDatasetHasSchema(new_dataset, new_schema);
// add nullable field (will be materialized as null during projection)
ASSERT_OK(dataset->ReplaceSchema(schema({field("str", utf8())})).status());
new_schema = schema({field("str", utf8())});
ASSERT_OK_AND_ASSIGN(new_dataset, dataset->ReplaceSchema(new_schema));
AssertDatasetHasSchema(new_dataset, new_schema);
// incompatible type
ASSERT_RAISES(TypeError,
dataset->ReplaceSchema(schema({field("i32", utf8())})).status());
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/arrow/dataset/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,13 @@ TEST_F(TestFileSystemDataset, ReplaceSchema) {
FileSystemDataset::Make(schm, literal(true), format, nullptr, {}));

// drop field
ASSERT_OK(dataset->ReplaceSchema(schema({field("i32", int32())})).status());
auto new_schema = schema({field("i32", int32())});
ASSERT_OK_AND_ASSIGN(auto new_dataset, dataset->ReplaceSchema(new_schema));
AssertDatasetHasSchema(new_dataset, new_schema);
// add nullable field (will be materialized as null during projection)
ASSERT_OK(dataset->ReplaceSchema(schema({field("str", utf8())})).status());
new_schema = schema({field("str", utf8())});
ASSERT_OK_AND_ASSIGN(new_dataset, dataset->ReplaceSchema(new_schema));
AssertDatasetHasSchema(new_dataset, new_schema);
// incompatible type
ASSERT_RAISES(TypeError,
dataset->ReplaceSchema(schema({field("i32", utf8())})).status());
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ using compute::project;

using fs::internal::GetAbstractPathExtension;

/// \brief Assert a dataset produces data with the schema
void AssertDatasetHasSchema(std::shared_ptr<Dataset> ds, std::shared_ptr<Schema> schema) {
ASSERT_OK_AND_ASSIGN(auto scanner_builder, ds->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());
ASSERT_EQ(*table->schema(), *schema);
}

class FileSourceFixtureMixin : public ::testing::Test {
public:
std::unique_ptr<FileSource> GetSource(std::shared_ptr<Buffer> buffer) {
Expand Down
18 changes: 18 additions & 0 deletions r/tests/testthat/test-dataset.R
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,24 @@ test_that("UnionDataset can merge schemas", {
expect_equal(actual, expected)
})

test_that("UnionDataset handles InMemoryDatasets", {
  # Two tables sharing column `x` but otherwise differing in schema.
  left_table <- Table$create(
    x = Array$create(c(1, 2, 3)),
    y = Array$create(c("a", "b", "c"))
  )
  right_table <- Table$create(
    x = Array$create(c(4, 5)),
    z = Array$create(c("d", "e"))
  )

  # Wrap each table in an in-memory dataset, then union them with c().
  union_ds <- c(
    InMemoryDataset$create(left_table),
    InMemoryDataset$create(right_table)
  )

  # Collecting the union should be equivalent to concatenating the tables.
  collected <- collect(union_ds, as_data_frame = FALSE)
  expect_equal(collected, concat_tables(left_table, right_table))
})

test_that("map_batches", {
ds <- open_dataset(dataset_dir, partitioning = "part")

Expand Down