30 changes: 30 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -595,6 +595,10 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter));
pre_filtered = true;
if (row_groups.empty()) return MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
if (options->start_offset != kDefaultStartOffset) {
ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRangeRowGroups(
row_groups, options->start_offset, options->length));
}
}
// Open the reader and pay the real IO cost.
auto make_generator =
@@ -607,6 +611,10 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
// row groups were not already filtered; do this now
ARROW_ASSIGN_OR_RAISE(row_groups,
parquet_fragment->FilterRowGroups(options->filter));
if (options->start_offset != kDefaultStartOffset) {
ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRangeRowGroups(
row_groups, options->start_offset, options->length));
}
if (row_groups.empty()) return MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
}
ARROW_ASSIGN_OR_RAISE(auto column_projection,
@@ -881,6 +889,28 @@ Result<std::vector<int>> ParquetFileFragment::FilterRowGroups(
return row_groups;
}

Result<std::vector<int>> ParquetFileFragment::FilterRangeRowGroups(
    std::vector<int> filtered_row_groups, int64_t start_offset, int64_t length) {
  std::vector<int> row_groups;
  for (int row_group : filtered_row_groups) {
    auto rg_metadata = metadata_->RowGroup(row_group);
    // The row group starts at its first data page, or at the dictionary page
    // when one precedes it.
    std::shared_ptr<parquet::ColumnChunkMetaData> cc0 = rg_metadata->ColumnChunk(0);
    int64_t r_start = cc0->data_page_offset();
    if (cc0->has_dictionary_page() && r_start > cc0->dictionary_page_offset()) {
      r_start = cc0->dictionary_page_offset();
    }
    // Approximate the row group's extent by summing the compressed sizes of
    // all of its column chunks.
    int64_t r_bytes = 0;
    for (int col_id = 0; col_id < rg_metadata->num_columns(); col_id++) {
      r_bytes += rg_metadata->ColumnChunk(col_id)->total_compressed_size();
    }
    // Keep the row group iff its midpoint falls inside
    // [start_offset, start_offset + length), so that disjoint ranges never
    // select the same row group twice.
    int64_t midpoint = r_start + r_bytes / 2;
    if (midpoint >= start_offset && midpoint < (start_offset + length)) {
      row_groups.push_back(row_group);
    }
  }
  return row_groups;
}

Result<std::vector<compute::Expression>> ParquetFileFragment::TestRowGroups(
compute::Expression predicate) {
auto lock = physical_schema_mutex_.Lock();
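The midpoint rule above is what keeps adjacent scan ranges disjoint: each row group is assigned to whichever range contains the midpoint of its byte extent, so contiguous, non-overlapping ranges covering the file claim every pre-filtered row group exactly once. Below is a minimal standalone sketch of that rule; the offsets and sizes are hypothetical and the helper names (RowGroupExtent, InRange) are illustrative, not part of Arrow.

#include <cstdint>
#include <iostream>
#include <vector>

// One row group's byte extent, as derived from Parquet metadata: the offset of
// its first page and the sum of its column chunks' compressed sizes.
struct RowGroupExtent {
  int64_t start;
  int64_t bytes;
};

// Mirrors the midpoint rule: keep a row group iff its midpoint falls in
// [start_offset, start_offset + length).
bool InRange(const RowGroupExtent& rg, int64_t start_offset, int64_t length) {
  int64_t midpoint = rg.start + rg.bytes / 2;
  return midpoint >= start_offset && midpoint < start_offset + length;
}

int main() {
  // Three hypothetical 1000-byte row groups starting at offset 4.
  std::vector<RowGroupExtent> row_groups = {{4, 1000}, {1004, 1000}, {2004, 1000}};
  // Two adjacent 2048-byte ranges cover the file; each row group matches
  // exactly one of them.
  for (int64_t offset : {int64_t{0}, int64_t{2048}}) {
    std::cout << "range [" << offset << ", " << offset + 2048 << "):";
    for (size_t i = 0; i < row_groups.size(); ++i) {
      if (InRange(row_groups[i], offset, 2048)) std::cout << ' ' << i;
    }
    std::cout << '\n';  // prints "0 1" for the first range, "2" for the second
  }
  return 0;
}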
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
@@ -194,6 +194,8 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {

/// Return a filtered subset of row group indices.
Result<std::vector<int>> FilterRowGroups(compute::Expression predicate);
  /// Return the subset of the given row groups whose byte-range midpoint falls
  /// within [start_offset, start_offset + length).
  Result<std::vector<int>> FilterRangeRowGroups(
      std::vector<int> pre_filter, int64_t start_offset, int64_t length);
/// Simplify the predicate against the statistics of each row group.
Result<std::vector<compute::Expression>> TestRowGroups(compute::Expression predicate);
/// Try to count rows matching the predicate using metadata. Expects
23 changes: 23 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
@@ -607,6 +607,29 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdown) {
kNumRowGroups - 5);
}

TEST_P(TestParquetFileFormatScan, RangeScan) {
  constexpr int64_t kNumRowGroups = 16;
  // Row group i of the fixture holds i rows, so the file has 1 + 2 + ... + 16
  // rows in total.
  constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2;
[Review comment, Collaborator] any specific reason for this exact number?
[Reply, Author] following existing tests
[Reply, Collaborator] Noted.

auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups);
auto source = GetFileSource(reader.get());

SetSchema(reader->schema()->fields());
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));

SetFilter(literal(true));
SetRange(0, std::numeric_limits<int64_t>::max());
CountRowsAndBatchesInScan(fragment, kTotalNumRows, kNumRowGroups);

SetFilter(literal(true));
SetRange(0, 0);
CountRowsAndBatchesInScan(fragment, 0, 0);

  SetFilter(literal(true));
  // The first three row groups (1 + 2 + 3 = 6 rows, one batch per row group)
  // are the ones whose byte-range midpoints fall within the first 2048 bytes
  // of this file.
  SetRange(0, 2048);
  CountRowsAndBatchesInScan(fragment, 6, 3);
}

TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragments) {
constexpr int64_t kNumRowGroups = 16;

16 changes: 16 additions & 0 deletions cpp/src/arrow/dataset/scanner.cc
@@ -935,6 +935,22 @@ Status ScannerBuilder::BatchSize(int64_t batch_size) {
return Status::OK();
}

Status ScannerBuilder::StartOffset(int64_t start_offset) {
  // Offset 0 (the start of the file) is the natural first split, so only
  // negative offsets are rejected.
  if (start_offset < 0) {
    return Status::Invalid("StartOffset must be greater than or equal to 0, got ",
                           start_offset);
  }
  scan_options_->start_offset = start_offset;
  return Status::OK();
}

Status ScannerBuilder::Length(int64_t length) {
if (length <= 0) {
return Status::Invalid("Length must be greater than 0, got ", length);
}
scan_options_->length = length;
return Status::OK();
}

Status ScannerBuilder::BatchReadahead(int32_t batch_readahead) {
if (batch_readahead < 0) {
return Status::Invalid("BatchReadahead must be greater than or equal 0, got ",
21 changes: 21 additions & 0 deletions cpp/src/arrow/dataset/scanner.h
@@ -55,6 +55,7 @@ constexpr int64_t kDefaultBatchSize = 1 << 17;  // 128Ki rows
constexpr int32_t kDefaultBatchReadahead = 16;
constexpr int32_t kDefaultFragmentReadahead = 4;
constexpr int32_t kDefaultBytesReadahead = 1 << 25; // 32MiB
constexpr int64_t kDefaultStartOffset = -1;

/// Scan-specific options, which can be changed between scans of the same dataset.
struct ARROW_DS_EXPORT ScanOptions {
@@ -136,6 +137,14 @@ struct ARROW_DS_EXPORT ScanOptions {
/// Parameters which control when the plan should pause for a slow consumer
acero::BackpressureOptions backpressure =
acero::BackpressureOptions::DefaultBackpressure();

  /// Byte offset at which the scan starts.
  /// Used together with length to split a large file across multiple
  /// scanners; the default (kDefaultStartOffset) disables range scanning.
  int64_t start_offset = kDefaultStartOffset;

  /// Number of bytes the scan covers, starting at start_offset.
  /// Only consulted when start_offset has been set.
  int64_t length = -1;
};

/// Scan-specific options, which can be changed between scans of the same dataset.
@@ -496,6 +505,18 @@ class ARROW_DS_EXPORT ScannerBuilder {
/// Schema.
Status Filter(const compute::Expression& filter);

  /// \brief Set the byte offset at which the scan starts
  /// \param[in] start_offset start of the scan range, in bytes
  ///
  /// \return Failure if start_offset is negative
  Status StartOffset(int64_t start_offset);
[Review comment, Member] For changes in the dataset api, I'd request @westonpace to take a look.

  /// \brief Set the length of the scan range, in bytes
  /// \param[in] length length of the scan range, in bytes
  ///
  /// \return Failure if the length is not greater than 0
Status Length(int64_t length);

/// \brief Indicate if the Scanner should make use of the available
/// ThreadPool found in ScanOptions;
Status UseThreads(bool use_threads = true);
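A range scan would be configured through these two setters before Finish(). The sketch below assumes an already-constructed dataset exposing the new API; ScanRange and the two-way split are illustrative, not part of Arrow.

#include <memory>

#include "arrow/dataset/api.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"

// Materialize only the row groups whose byte-range midpoints fall within
// [start_offset, start_offset + length) of each file.
arrow::Result<std::shared_ptr<arrow::Table>> ScanRange(
    const std::shared_ptr<arrow::dataset::Dataset>& dataset, int64_t start_offset,
    int64_t length) {
  ARROW_ASSIGN_OR_RAISE(auto builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(builder->StartOffset(start_offset));
  ARROW_RETURN_NOT_OK(builder->Length(length));
  ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
  return scanner->ToTable();
}

// Two workers could then split a hypothetical ~2 GiB file down the middle:
//   worker 0: ScanRange(dataset, 0, 1LL << 30);
//   worker 1: ScanRange(dataset, 1LL << 30, 1LL << 30);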
5 changes: 5 additions & 0 deletions cpp/src/arrow/dataset/test_util_internal.h
@@ -510,6 +510,11 @@ class FileFormatFixtureMixin : public ::testing::Test {
ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema));
}

void SetRange(int64_t start_offset, int64_t length) {
opts_->start_offset = start_offset;
opts_->length = length;
}

void Project(std::vector<std::string> names) {
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::FromNames(
std::move(names), *opts_->dataset_schema));
37 changes: 37 additions & 0 deletions java/dataset/src/main/cpp/jni_wrapper.cc
@@ -528,6 +528,43 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_jni_JniWrapper
* Method: createRangeScanner
* Signature: (J[Ljava/lang/String;Ljava/lang/String;JJJJ)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createRangeScanner(
JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns, jstring filter,
jlong batch_size, jlong start_offset, jlong length, jlong memory_pool_id) {
JNI_METHOD_START
arrow::MemoryPool* pool = reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
if (pool == nullptr) {
JniThrow("Memory pool does not exist or has been closed");
}
std::shared_ptr<arrow::dataset::Dataset> dataset =
RetrieveNativeInstance<arrow::dataset::Dataset>(dataset_id);
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
JniGetOrThrow(dataset->NewScan());
JniAssertOkOrThrow(scanner_builder->Pool(pool));
if (columns != nullptr) {
std::vector<std::string> column_vector = ToStringVector(env, columns);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
  if (filter != nullptr) {
    // TODO: the filter is accepted but not yet applied; see
    // https://github.com/apache/arrow/issues/32625
  }
JniAssertOkOrThrow(scanner_builder->StartOffset(start_offset));
JniAssertOkOrThrow(scanner_builder->Length(length));
JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size));

auto scanner = JniGetOrThrow(scanner_builder->Finish());
std::shared_ptr<DisposableScannerAdaptor> scanner_adaptor =
JniGetOrThrow(DisposableScannerAdaptor::Create(scanner));
jlong id = CreateNativeRef(scanner_adaptor);
return id;
JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_jni_JniWrapper
* Method: closeScanner
java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
@@ -82,6 +82,22 @@ private JniWrapper() {
public native long createScanner(long datasetId, String[] columns, ByteBuffer substraitProjection,
ByteBuffer substraitFilter, long batchSize, long memoryPool);

  /**
   * Create a range Scanner from a Dataset and get the native pointer of the Scanner.
   * @param datasetId the native pointer of the arrow::dataset::Dataset instance.
   * @param columns desired column names.
   *                Columns not in this list will not be emitted when performing the
   *                scan. Null means "all columns".
* @param filter the filter to apply on the scan.
* @param batchSize batch size of scanned record batches.
* @param startOffset start offset of the range scan.
* @param length length of the range scan.
* @param memoryPool identifier of memory pool used in the native scanner.
* @return the native pointer of the arrow::dataset::Scanner instance.
*/
public native long createRangeScanner(long datasetId, String[] columns, String filter, long batchSize,
long startOffset, long length, long memoryPool);

/**
* Get a serialized schema from native instance of a Scanner.
*
java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
@@ -49,6 +49,21 @@ public synchronized NativeScanner newScan(ScanOptions options) {
return new NativeScanner(context, scannerId);
}

/**
* Create a new scan with range options.
* @param options scan options with offset and length
* @return newly created native scanner
*/
public synchronized NativeScanner newRangeScan(ScanOptions options) {
if (closed) {
throw new NativeInstanceReleasedException();
}
long scannerId = JniWrapper.get().createRangeScanner(datasetId, options.getColumns().orElse(null),
options.getFilter().orElse(null), options.getBatchSize(), options.getStartOffset(), options.getLength(),
context.getMemoryPool().getNativeInstanceId());
return new NativeScanner(context, scannerId);
}

@Override
public synchronized void close() {
if (closed) {
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
@@ -27,7 +27,10 @@
*/
public class ScanOptions {
private final long batchSize;
private final long startOffset;
private final long length;
private final Optional<String[]> columns;
private final Optional<String> filter;
private final Optional<ByteBuffer> substraitProjection;
private final Optional<ByteBuffer> substraitFilter;

@@ -56,11 +59,27 @@ public ScanOptions(String[] columns, long batchSize) {
* Only columns present in the Array will be scanned.
*/
public ScanOptions(long batchSize, Optional<String[]> columns) {
this(batchSize, columns, -1, -1, Optional.empty());
}

  /**
   * Constructor.
   * @param batchSize Maximum row count of each returned batch
   * @param columns (Optional) Projected columns
   * @param startOffset start offset of the range scan, in bytes
   * @param length length of the range scan, in bytes
   * @param filter (Optional) filter to apply with the scan
   */
public ScanOptions(long batchSize, Optional<String[]> columns,
long startOffset, long length, Optional<String> filter) {
Preconditions.checkNotNull(columns);
this.batchSize = batchSize;
this.columns = columns;
this.substraitProjection = Optional.empty();
this.substraitFilter = Optional.empty();
this.startOffset = startOffset;
this.length = length;
this.filter = filter;
}

public ScanOptions(long batchSize) {
@@ -71,6 +90,18 @@ public Optional<String[]> getColumns() {
return columns;
}

public Optional<String> getFilter() {
return filter;
}

public long getStartOffset() {
return startOffset;
}

public long getLength() {
return length;
}

public long getBatchSize() {
return batchSize;
}