Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 18 additions & 59 deletions cpp/src/arrow/ipc/metadata_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,7 @@ Status ConcreteTypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
case flatbuf::Type::Timestamp: {
auto ts_type = static_cast<const flatbuf::Timestamp*>(type_data);
TimeUnit::type unit = FromFlatbufferUnit(ts_type->unit());
if (ts_type->timezone() != 0 && ts_type->timezone()->Length() > 0) {
*out = timestamp(unit, ts_type->timezone()->str());
} else {
*out = timestamp(unit);
}
*out = timestamp(unit, StringFromFlatbuffers(ts_type->timezone()));
return Status::OK();
}
case flatbuf::Type::Duration: {
Expand Down Expand Up @@ -369,10 +365,7 @@ static Status TypeFromFlatbuffer(const flatbuf::Field* field,
const KeyValueMetadata* field_metadata,
std::shared_ptr<DataType>* out) {
auto type_data = field->type();
if (type_data == nullptr) {
return Status::IOError(
"Type-pointer in custom metadata of flatbuffer-encoded Field is null.");
}
CHECK_FLATBUFFERS_NOT_NULL(type_data, "Field.type");
RETURN_NOT_OK(ConcreteTypeFromFlatbuffer(field->type_type(), type_data, children, out));

// Look for extension metadata in custom_metadata field
Expand Down Expand Up @@ -474,14 +467,8 @@ Status KeyValueMetadataFromFlatbuffer(const KVVector* fb_metadata,

metadata->reserve(fb_metadata->size());
for (const auto& pair : *fb_metadata) {
if (pair->key() == nullptr) {
return Status::IOError(
"Key-pointer in custom metadata of flatbuffer-encoded Schema is null.");
}
if (pair->value() == nullptr) {
return Status::IOError(
"Value-pointer in custom metadata of flatbuffer-encoded Schema is null.");
}
CHECK_FLATBUFFERS_NOT_NULL(pair->key(), "custom_metadata.key");
CHECK_FLATBUFFERS_NOT_NULL(pair->value(), "custom_metadata.value");
metadata->Append(pair->key()->str(), pair->value()->str());
}

Expand Down Expand Up @@ -776,16 +763,16 @@ Status FieldFromFlatbuffer(const flatbuf::Field* field, DictionaryMemo* dictiona

// Reconstruct the data type
auto children = field->children();
if (children == nullptr) {
return Status::IOError("Children-pointer of flatbuffer-encoded Field is null.");
}
CHECK_FLATBUFFERS_NOT_NULL(children, "Field.children");
std::vector<std::shared_ptr<Field>> child_fields(children->size());
for (int i = 0; i < static_cast<int>(children->size()); ++i) {
RETURN_NOT_OK(
FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i]));
}
RETURN_NOT_OK(TypeFromFlatbuffer(field, child_fields, metadata.get(), &type));

auto field_name = StringFromFlatbuffers(field->name());

const flatbuf::DictionaryEncoding* encoding = field->dictionary();

if (encoding != nullptr) {
Expand All @@ -794,22 +781,14 @@ Status FieldFromFlatbuffer(const flatbuf::Field* field, DictionaryMemo* dictiona
// dictionary_memo
std::shared_ptr<DataType> index_type;
auto int_data = encoding->indexType();
if (int_data == nullptr) {
return Status::IOError(
"indexType-pointer in custom metadata of flatbuffer-encoded DictionaryEncoding "
"is null.");
}
CHECK_FLATBUFFERS_NOT_NULL(int_data, "DictionaryEncoding.indexType");
RETURN_NOT_OK(IntFromFlatbuffer(int_data, &index_type));
ARROW_ASSIGN_OR_RAISE(type,
DictionaryType::Make(index_type, type, encoding->isOrdered()));
*out = ::arrow::field(field->name()->str(), type, field->nullable(), metadata);
*out = ::arrow::field(field_name, type, field->nullable(), metadata);
RETURN_NOT_OK(dictionary_memo->AddField(encoding->id(), *out));
} else {
auto name = field->name();
if (name == nullptr) {
return Status::IOError("Name-pointer of flatbuffer-encoded Field is null.");
}
*out = ::arrow::field(name->str(), type, field->nullable(), metadata);
*out = ::arrow::field(field_name, type, field->nullable(), metadata);
}
return Status::OK();
}
Expand Down Expand Up @@ -1183,17 +1162,15 @@ Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dicti
Status GetSchema(const void* opaque_schema, DictionaryMemo* dictionary_memo,
std::shared_ptr<Schema>* out) {
auto schema = static_cast<const flatbuf::Schema*>(opaque_schema);
if (schema->fields() == nullptr) {
return Status::IOError("Fields-pointer of flatbuffer-encoded Schema is null.");
}
CHECK_FLATBUFFERS_NOT_NULL(schema, "schema");
CHECK_FLATBUFFERS_NOT_NULL(schema->fields(), "Schema.fields");
int num_fields = static_cast<int>(schema->fields()->size());

std::vector<std::shared_ptr<Field>> fields(num_fields);
for (int i = 0; i < num_fields; ++i) {
const flatbuf::Field* field = schema->fields()->Get(i);
if (field == nullptr) {
return Status::IOError("Field-pointer of flatbuffer-encoded Schema is null.");
}
// XXX I don't think this check is necessary (AP)
CHECK_FLATBUFFERS_NOT_NULL(field, "DictionaryEncoding.indexType");
RETURN_NOT_OK(FieldFromFlatbuffer(field, dictionary_memo, &fields[i]));
}

Expand Down Expand Up @@ -1225,12 +1202,7 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type
auto dim = tensor->shape()->Get(i);

shape->push_back(dim->size());
auto fb_name = dim->name();
if (fb_name == 0) {
dim_names->push_back("");
} else {
dim_names->push_back(fb_name->str());
}
dim_names->push_back(StringFromFlatbuffers(dim->name()));
}

if (tensor->strides() && tensor->strides()->size() > 0) {
Expand All @@ -1239,11 +1211,7 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type
}
}

auto type_data = tensor->type();
if (type_data == nullptr) {
return Status::IOError(
"Type-pointer in custom metadata of flatbuffer-encoded Tensor is null.");
}
auto type_data = tensor->type(); // Required
return ConcreteTypeFromFlatbuffer(tensor->type_type(), type_data, {}, type);
}

Expand Down Expand Up @@ -1283,12 +1251,7 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>
}

if (dim_names) {
auto fb_name = dim->name();
if (fb_name == 0) {
dim_names->push_back("");
} else {
dim_names->push_back(fb_name->str());
}
dim_names->push_back(StringFromFlatbuffers(dim->name()));
}
}
}
Expand Down Expand Up @@ -1324,11 +1287,7 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>
}
}

auto type_data = sparse_tensor->type();
if (type_data == nullptr) {
return Status::IOError(
"Type-pointer in custom metadata of flatbuffer-encoded SparseTensor is null.");
}
auto type_data = sparse_tensor->type(); // Required
if (type) {
return ConcreteTypeFromFlatbuffer(sparse_tensor->type_type(), type_data, {}, type);
} else {
Expand Down
25 changes: 19 additions & 6 deletions cpp/src/arrow/ipc/metadata_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,16 @@
#include "arrow/buffer.h"
#include "arrow/ipc/dictionary.h" // IYWU pragma: keep
#include "arrow/ipc/message.h"
#include "arrow/memory_pool.h"
#include "arrow/sparse_tensor.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"

#include "generated/Message_generated.h"
#include "generated/Schema_generated.h"

namespace arrow {

class DataType;
class Schema;
class Tensor;
class SparseTensor;

namespace flatbuf = org::apache::arrow::flatbuf;

namespace io {
Expand Down Expand Up @@ -92,6 +88,23 @@ struct FileBlock {
int64_t body_length;
};

// Low-level utilities to help with reading Flatbuffers data.

#define CHECK_FLATBUFFERS_NOT_NULL(fb_value, name) \
if ((fb_value) == NULLPTR) { \
return Status::IOError("Unexpected null field ", name, \
" in flatbuffer-encoded metadata"); \
}

template <typename T>
inline uint32_t FlatBuffersVectorSize(const flatbuffers::Vector<T>* vec) {
return (vec == NULLPTR) ? 0 : vec->size();
}

inline std::string StringFromFlatbuffers(const flatbuffers::String* s) {
return (s == NULLPTR) ? "" : s->str();
}

// Read interface classes. We do not fully deserialize the flatbuffers so that
// individual fields metadata can be retrieved from very large schema without
//
Expand Down
21 changes: 9 additions & 12 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,7 @@ class IpcComponentSource {

Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
auto buffers = metadata_->buffers();
if (buffers == nullptr) {
return Status::IOError(
"Buffers-pointer of flatbuffer-encoded RecordBatch is null.");
}
CHECK_FLATBUFFERS_NOT_NULL(buffers, "RecordBatch.buffers");
if (buffer_index >= static_cast<int>(buffers->size())) {
return Status::IOError("buffer_index out of range.");
}
Expand All @@ -127,9 +124,7 @@ class IpcComponentSource {

Status GetFieldMetadata(int field_index, ArrayData* out) {
auto nodes = metadata_->nodes();
if (nodes == nullptr) {
return Status::IOError("Nodes-pointer of flatbuffer-encoded Table is null.");
}
CHECK_FLATBUFFERS_NOT_NULL(nodes, "Table.nodes");
// pop off a field
if (field_index >= static_cast<int>(nodes->size())) {
return Status::Invalid("Ran out of field metadata, likely malformed");
Expand Down Expand Up @@ -441,6 +436,7 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
// The dictionary is embedded in a record batch with a single column
std::shared_ptr<RecordBatch> batch;
auto batch_meta = dictionary_batch->data();
CHECK_FLATBUFFERS_NOT_NULL(batch_meta, "DictionaryBatch.data");
RETURN_NOT_OK(ReadRecordBatch(batch_meta, ::arrow::schema({value_field}),
dictionary_memo, options, file, &batch));
if (batch->num_columns() != 1) {
Expand Down Expand Up @@ -475,9 +471,6 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
}
CHECK_MESSAGE_TYPE(Message::SCHEMA, message->type());
CHECK_HAS_NO_BODY(*message);
if (message->header() == nullptr) {
return Status::IOError("Header-pointer of flatbuffer-encoded Message is null.");
}
return internal::GetSchema(message->header(), &dictionary_memo_, &schema_);
}

Expand Down Expand Up @@ -663,9 +656,13 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
return Status::OK();
}

int num_dictionaries() const { return footer_->dictionaries()->size(); }
int num_dictionaries() const {
return static_cast<int>(internal::FlatBuffersVectorSize(footer_->dictionaries()));
}

int num_record_batches() const { return footer_->recordBatches()->size(); }
int num_record_batches() const {
return static_cast<int>(internal::FlatBuffersVectorSize(footer_->recordBatches()));
}

MetadataVersion version() const {
return internal::GetMetadataVersion(footer_->version());
Expand Down