Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/cmake_modules/DefineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")

define_option(ARROW_COMPUTE "Build the Arrow Compute Modules" ON)

define_option(ARROW_DATASET "Build the Arrow Dataset Modules" ON)

define_option(ARROW_FLIGHT
"Build the Arrow Flight RPC System (requires GRPC, Protocol Buffers)" OFF)

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@ add_subdirectory(io)
add_subdirectory(util)
add_subdirectory(vendored)

if(ARROW_DATASET)
add_subdirectory(dataset)
endif()

if(ARROW_FLIGHT)
add_subdirectory(flight)
endif()
Expand Down
53 changes: 53 additions & 0 deletions cpp/src/arrow/dataset/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

add_custom_target(arrow_dataset)

# Headers: top level
arrow_install_all_headers("arrow/dataset")

set(ARROW_DATASET_SRCS scanner.cc)

# Produces the arrow_dataset_shared / arrow_dataset_static targets
# (referenced below); target names are collected in
# ARROW_DATASET_LIBRARIES for post-processing.
add_arrow_lib(arrow_dataset
              OUTPUTS
              ARROW_DATASET_LIBRARIES
              SOURCES
              ${ARROW_DATASET_SRCS}
              SHARED_LINK_LIBS
              arrow_shared
              STATIC_LINK_LIBS
              arrow_static)

# Quote the expansion so the comparison is made against the literal
# value: unquoted, an unlucky value that names another variable would
# be re-dereferenced (CMP0054 footgun), and it reads more clearly when
# the variable is unset.
if("${ARROW_DATASET_TEST_LINKAGE}" STREQUAL "static")
  set(ARROW_DATASET_TEST_LINK_LIBS arrow_dataset_static ${ARROW_TEST_STATIC_LINK_LIBS})
else()
  set(ARROW_DATASET_TEST_LINK_LIBS arrow_dataset_shared ${ARROW_TEST_SHARED_LINK_LIBS})
endif()

# While building the library itself, symbols are exported rather than
# imported (see visibility.h / ARROW_DS_EXPORT).
foreach(LIB_TARGET ${ARROW_DATASET_LIBRARIES})
  target_compile_definitions(${LIB_TARGET} PRIVATE ARROW_DS_EXPORTING)
endforeach()

# NOTE(review): the test is skipped on Windows -- presumably a porting
# gap; confirm and record the reason here.
if(NOT WIN32)
  add_arrow_test(file_test
                 EXTRA_LINK_LIBS
                 ${ARROW_DATASET_TEST_LINK_LIBS}
                 PREFIX
                 "arrow-dataset"
                 LABELS
                 "arrow_dataset")
endif()
31 changes: 31 additions & 0 deletions cpp/src/arrow/dataset/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Arrow C++ Datasets

The `arrow::dataset` subcomponent provides an API to read and write
semantic datasets stored in different locations and formats. It
facilitates parallel processing of datasets spread across different
physical files and serialization formats. Other concerns such as
partitioning, filtering (partition- and column-level), and schema
normalization are also addressed.

## Development Status

Pre-alpha as of June 2019. API subject to change without notice.
26 changes: 26 additions & 0 deletions cpp/src/arrow/dataset/api.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "arrow/dataset/dataset.h"
#include "arrow/dataset/discovery.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/file_csv.h"
#include "arrow/dataset/file_feather.h"
#include "arrow/dataset/file_parquet.h"
#include "arrow/dataset/scanner.h"
114 changes: 114 additions & 0 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>
#include <vector>

#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"

namespace arrow {
namespace dataset {

/// \brief A granular piece of a Dataset, such as an individual file,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand what this class is used for. Can the user do something with it, apart from inspecting its properties?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a data structure for representing the internal structure of the dataset. Users in general will not have to interact with this data structure unless they wish to explore the physical topology of the dataset (e.g. iterating through partitions and listing files)

/// which can be read/scanned separately from other fragments
class ARROW_DS_EXPORT DataFragment {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the new macro?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This produces libarrow_dataset.so, which is a different shared library, so needs a different export header. One of the reasons to have a different shared library is that it needs to link to libparquet.so

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... The dependency relationships between Arrow and Parquet are already complicated (PyArrow depends on Parquet which depends on Arrow).

What if we make file formats pluggable (with some kind of registry for file extensions for example, or something else) and so Parquet would extend the datasets registry in libparquet.so without Arrow knowing about it upfront?

public:
virtual ~DataFragment() = default;

/// \brief Return true if the fragment can benefit from parallel
/// scanning
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what that means. Scanning == reading the whole fragment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Placeholder. Let's implement some splittable DataFragments (e.g. Parquet) and then figure out what APIs we need

virtual bool splittable() const = 0;

/// \brief Partition options to use when scanning this fragment. May be
/// nullptr
virtual std::shared_ptr<ScanOptions> scan_options() const = 0;
};

/// \brief Criteria applied when reading a dataset: which fragments to
/// include or exclude, which rows to filter out, and so on.
struct DataSelector {
  // Filters evaluated during the read; semantics of each element are
  // defined by the concrete Filter implementations (declared elsewhere).
  std::vector<std::shared_ptr<Filter>> filters;

  // TODO(wesm): Select specific partition keys, file path globs, or
  // other common desirable selections
};

/// \brief A basic component of a Dataset which yields zero or more
/// DataFragments
class ARROW_DS_EXPORT DataSource {
 public:
  virtual ~DataSource() = default;

  // Presumably a short identifier for the concrete source kind --
  // TODO(review) confirm against implementations.
  virtual std::string type() const = 0;

  // Produce an iterator over the fragments that match the selector.
  virtual std::unique_ptr<DataFragmentIterator> GetFragments(
      const DataSelector& selector) = 0;
};

/// \brief A DataSource consisting of a flat sequence of DataFragments
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand the difference, since DataSource is already able to give out a DataFragment iterator.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation is not yet complete -- this is intended as a concrete subclass. Note that DataSource has pure virtual functions while this one's is marked "override"

class ARROW_DS_EXPORT SimpleDataSource : public DataSource {
public:
std::unique_ptr<DataFragmentIterator> GetFragments(
const DataSelector& selector) override;

private:
DataFragmentVector fragments_;
};

/// \brief Top-level interface for a Dataset with fragments coming
/// from possibly multiple sources
class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
 public:
  /// \param[in] source a single input data source
  /// \param[in] schema a known schema to conform to, may be nullptr
  explicit Dataset(std::shared_ptr<DataSource> source,
                   std::shared_ptr<Schema> schema = NULLPTR);

  /// \param[in] sources one or more input data sources
  /// \param[in] schema a known schema to conform to, may be nullptr
  explicit Dataset(const std::vector<std::shared_ptr<DataSource>>& sources,
                   std::shared_ptr<Schema> schema = NULLPTR);

  virtual ~Dataset() = default;

  /// \brief Begin to build a new Scan operation against this Dataset
  ScannerBuilder NewScan() const;

  const std::vector<std::shared_ptr<DataSource>>& sources() const { return sources_; }

  std::shared_ptr<Schema> schema() const { return schema_; }

  /// \brief Compute consensus schema from input data sources
  ///
  /// NOTE(review): interaction with a schema passed to the constructor
  /// is not yet defined; schema inference may eventually move to the
  /// scanner layer -- confirm once implemented.
  Status InferSchema(std::shared_ptr<Schema>* out);

  /// \brief Return a copy of Dataset with a new target schema
  Status ReplaceSchema(std::shared_ptr<Schema> schema, std::unique_ptr<Dataset>* out);

 protected:
  // The data sources must conform their output to this schema (with
  // projections and filters taken into account)
  std::shared_ptr<Schema> schema_;

  std::vector<std::shared_ptr<DataSource>> sources_;
};

} // namespace dataset
} // namespace arrow
45 changes: 45 additions & 0 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Logic for automatically determining the structure of multi-file
/// dataset with possible partitioning according to available
/// partition schemes

#pragma once

#include <memory>
#include <string>

#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/util/macros.h"

namespace arrow {
namespace dataset {

/// \brief Options for DiscoverSource
///
/// Both members default to null; presumably null means "determine
/// automatically" -- TODO(review) confirm against the DiscoverSource
/// implementation.
struct ARROW_DS_EXPORT DiscoveryOptions {
// Expected file format of the discovered files; may be left null
std::shared_ptr<FileFormat> format = NULLPTR;
// Scheme describing how paths encode partitions; may be left null
std::shared_ptr<PartitionScheme> partition_scheme = NULLPTR;
};

/// \brief Using a root directory
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a URI scheme at some point?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly, I'm not sure. If you have a FileSystem* is that sufficient? I think this detail will resolve itself as we implement things and generalize for multiple file systems and different storage layouts

ARROW_DS_EXPORT
Status DiscoverSource(const std::string& path, fs::FileSystem* filesystem,
const DiscoveryOptions& options, std::shared_ptr<DataSource>* out);

} // namespace dataset
} // namespace arrow
56 changes: 56 additions & 0 deletions cpp/src/arrow/dataset/disk_store.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>

#include "arrow/dataset/dataset.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/type_fwd.h"

namespace arrow {
namespace dataset {

/// \brief Loads a previously-written collection of Arrow protocol
/// files and exposes them in a way that can be consumed as a Dataset
/// source
///
/// NOTE(review): the base was spelled "DatasetSource", which is not
/// declared anywhere in this patch; dataset.h declares DataSource, so
/// this looks like a typo -- corrected here (the full definition comes
/// from arrow/dataset/dataset.h). The inherited pure virtual methods
/// are not yet overridden; placeholder per the PR discussion.
class ARROW_DS_EXPORT DiskStoreReader : public DataSource {
 public:
  /// \param[in] path root path of the stored files
  /// \param[in] filesystem filesystem holding path; not owned --
  /// presumably must outlive this reader, confirm
  DiskStoreReader(const std::string& path, fs::FileSystem* filesystem);

 private:
  class DiskStoreReaderImpl;
  std::unique_ptr<DiskStoreReaderImpl> impl_;

  std::string path_;
  fs::FileSystem* filesystem_;

  DiskStoreReader() {}
};

/// \brief
class ARROW_DS_EXPORT DiskStoreWriter {
public:
Status Write(const RecordBatch& batch);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs a Close method too. Perhaps Abort as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is purely placeholder. I wanted to express the desire to be able to dump Scan output to disk and then memory map that later


private:
DiskStoreWriter() {}
};

} // namespace dataset
} // namespace arrow
Loading