Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions c_glib/arrow-glib/input-stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@

#include <arrow/io/interfaces.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>

#include <arrow-glib/buffer.hpp>
#include <arrow-glib/error.hpp>
#include <arrow-glib/file.hpp>
#include <arrow-glib/input-stream.hpp>
#include <arrow-glib/readable.hpp>
#include <arrow-glib/tensor.hpp>

G_BEGIN_DECLS

Expand Down Expand Up @@ -253,6 +255,36 @@ garrow_seekable_input_stream_read_at(GArrowSeekableInputStream *input_stream,
}
}

/**
* garrow_seekable_input_stream_read_tensor:
* @input_stream: A #GArrowSeekableInputStream.
* @position: The read start position.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: (transfer full) (nullable):
* #GArrowTensor on success, %NULL on error.
*
* Since: 0.4.0
*/
GArrowTensor *
garrow_seekable_input_stream_read_tensor(GArrowSeekableInputStream *input_stream,
gint64 position,
GError **error)
{
auto arrow_random_access_file =
garrow_seekable_input_stream_get_raw(input_stream);

std::shared_ptr<arrow::Tensor> arrow_tensor;
auto status = arrow::ipc::ReadTensor(position,
arrow_random_access_file.get(),
&arrow_tensor);
if (garrow_error_check(error, status, "[seekable-input-stream][read-tensor]")) {
return garrow_tensor_new_raw(&arrow_tensor);
} else {
return NULL;
}
}


G_DEFINE_TYPE(GArrowBufferInputStream, \
garrow_buffer_input_stream, \
Expand Down
4 changes: 4 additions & 0 deletions c_glib/arrow-glib/input-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#pragma once

#include <arrow-glib/buffer.h>
#include <arrow-glib/tensor.h>

G_BEGIN_DECLS

Expand Down Expand Up @@ -123,6 +124,9 @@ GArrowBuffer *garrow_seekable_input_stream_read_at(GArrowSeekableInputStream *in
gint64 position,
gint64 n_bytes,
GError **error);
GArrowTensor *garrow_seekable_input_stream_read_tensor(GArrowSeekableInputStream *input_stream,
gint64 position,
GError **error);


#define GARROW_TYPE_BUFFER_INPUT_STREAM \
Expand Down
33 changes: 32 additions & 1 deletion c_glib/arrow-glib/output-stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
# include <config.h>
#endif

#include <arrow/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/writer.h>

#include <arrow-glib/buffer.hpp>
#include <arrow-glib/error.hpp>
#include <arrow-glib/file.hpp>
#include <arrow-glib/output-stream.hpp>
#include <arrow-glib/tensor.hpp>
#include <arrow-glib/writeable.hpp>

G_BEGIN_DECLS
Expand Down Expand Up @@ -168,6 +169,36 @@ garrow_output_stream_class_init(GArrowOutputStreamClass *klass)
g_object_class_install_property(gobject_class, PROP_OUTPUT_STREAM, spec);
}

/**
* garrow_output_stream_write_tensor:
* @stream: A #GArrowWriteable.
* @tensor: A #GArrowTensor to be written.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: The number of written bytes on success, -1 on error.
*
* Since: 0.4.0
*/
gint64
garrow_output_stream_write_tensor(GArrowOutputStream *stream,
GArrowTensor *tensor,
GError **error)
{
auto arrow_tensor = garrow_tensor_get_raw(tensor);
auto arrow_stream = garrow_output_stream_get_raw(stream);
int32_t metadata_length;
int64_t body_length;
auto status = arrow::ipc::WriteTensor(*arrow_tensor,
arrow_stream.get(),
&metadata_length,
&body_length);
if (garrow_error_check(error, status, "[output-stream][write-tensor]")) {
return metadata_length + body_length;
} else {
return -1;
}
}


G_DEFINE_TYPE(GArrowFileOutputStream,
garrow_file_output_stream,
Expand Down
5 changes: 5 additions & 0 deletions c_glib/arrow-glib/output-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <glib-object.h>

#include <arrow-glib/buffer.h>
#include <arrow-glib/tensor.h>

G_BEGIN_DECLS

Expand Down Expand Up @@ -71,6 +72,10 @@ struct _GArrowOutputStreamClass

GType garrow_output_stream_get_type(void) G_GNUC_CONST;

gint64 garrow_output_stream_write_tensor(GArrowOutputStream *stream,
GArrowTensor *tensor,
GError **error);


#define GARROW_TYPE_FILE_OUTPUT_STREAM \
(garrow_file_output_stream_get_type())
Expand Down
9 changes: 9 additions & 0 deletions c_glib/test/test-tensor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,13 @@ def test_column_major?
not @tensor.column_major?
end
end

def test_io
buffer = Arrow::PoolBuffer.new
output = Arrow::BufferOutputStream.new(buffer)
output.write_tensor(@tensor)
input = Arrow::BufferInputStream.new(buffer)
assert_equal(@tensor,
input.read_tensor(0))
end
end