Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ CUDA=11.0.3
DASK=latest
DOTNET=6.0
GCC_VERSION=""
GO=1.16
GO=1.17
STATICCHECK=v0.2.2
HDFS=3.2.1
JDK=8
Expand Down
24 changes: 12 additions & 12 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.16, 1.18]
go: [1.17, 1.18]
include:
- go: 1.16
- go: 1.17
staticcheck: v0.2.2
- go: 1.18
staticcheck: latest
Expand Down Expand Up @@ -86,9 +86,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.16, 1.18]
go: [1.17, 1.18]
include:
- go: 1.16
- go: 1.17
staticcheck: v0.2.2
- go: 1.18
staticcheck: latest
Expand Down Expand Up @@ -123,9 +123,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.16, 1.18]
go: [1.17, 1.18]
include:
- go: 1.16
- go: 1.17
staticcheck: v0.2.2
- go: 1.18
staticcheck: latest
Expand Down Expand Up @@ -158,9 +158,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.16, 1.18]
go: [1.17, 1.18]
include:
- go: 1.16
- go: 1.17
staticcheck: v0.2.2
- go: 1.18
staticcheck: latest
Expand Down Expand Up @@ -193,9 +193,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.16, 1.18]
go: [1.17, 1.18]
include:
- go: 1.16
- go: 1.17
staticcheck: v0.2.2
- go: 1.18
staticcheck: latest
Expand Down Expand Up @@ -228,9 +228,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.16, 1.18]
go: [1.17, 1.18]
include:
- go: 1.16
- go: 1.17
staticcheck: v0.2.2
- go: 1.18
staticcheck: latest
Expand Down
2 changes: 1 addition & 1 deletion ci/docker/debian-10-go.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

ARG arch=amd64
ARG go=1.16
ARG go=1.17
ARG staticcheck=v0.2.2
FROM ${arch}/golang:${go}-buster

Expand Down
2 changes: 1 addition & 1 deletion ci/docker/debian-11-go.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

ARG arch=amd64
ARG go=1.16
ARG go=1.17
ARG staticcheck=v0.2.2
FROM ${arch}/golang:${go}-bullseye

Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/go_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ set -ex
source_dir=${1}/go
ARCH=`uname -m`

# Arm64 CI is triggered by travis and run in arm64v8/golang:1.16-bullseye
# Arm64 CI is triggered by travis and run in arm64v8/golang:1.17-bullseye
if [ "aarch64" == "$ARCH" ]; then
# Install `staticcheck`
GO111MODULE=on go install honnef.co/go/tools/cmd/[email protected]
Expand Down
2 changes: 1 addition & 1 deletion dev/release/verify-release-candidate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ install_go() {
return 0
fi

local version=1.16.12
local version=1.17.13
show_info "Installing go version ${version}..."

local arch="$(uname -m)"
Expand Down
4 changes: 2 additions & 2 deletions dev/tasks/tasks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1449,13 +1449,13 @@ tasks:
ci: github
template: r/github.linux.revdepcheck.yml

test-debian-11-go-1.16:
test-debian-11-go-1.17:
ci: azure
template: docker-tests/azure.linux.yml
params:
env:
DEBIAN: 11
GO: 1.16
GO: 1.17
image: debian-go

test-ubuntu-default-docs:
Expand Down
18 changes: 12 additions & 6 deletions go/arrow/cdata/cdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ package cdata
// int stream_get_schema(struct ArrowArrayStream* st, struct ArrowSchema* out) { return st->get_schema(st, out); }
// int stream_get_next(struct ArrowArrayStream* st, struct ArrowArray* out) { return st->get_next(st, out); }
// const char* stream_get_last_error(struct ArrowArrayStream* st) { return st->get_last_error(st); }
// struct ArrowArray* get_arr() { return (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); }
// struct ArrowArray* get_arr() {
// struct ArrowArray* out = (struct ArrowArray*)(malloc(sizeof(struct ArrowArray)));
// memset(out, 0, sizeof(struct ArrowArray));
// return out;
// }
// struct ArrowArrayStream* get_stream() { return (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); }
//
import "C"
Expand Down Expand Up @@ -655,18 +659,22 @@ func importCArrayAsType(arr *CArrowArray, dt arrow.DataType) (imp *cimporter, er
func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream) {
rdr.stream = C.get_stream()
C.ArrowArrayStreamMove(stream, rdr.stream)
rdr.arr = C.get_arr()
runtime.SetFinalizer(rdr, func(r *nativeCRecordBatchReader) {
if r.cur != nil {
r.cur.Release()
}
C.ArrowArrayStreamRelease(r.stream)
C.ArrowArrayRelease(r.arr)
C.free(unsafe.Pointer(r.stream))
C.free(unsafe.Pointer(r.arr))
})
}

// Record Batch reader that conforms to arrio.Reader for the ArrowArrayStream interface
type nativeCRecordBatchReader struct {
stream *CArrowArrayStream
arr *CArrowArray
schema *arrow.Schema

cur arrow.Record
Expand Down Expand Up @@ -713,18 +721,16 @@ func (n *nativeCRecordBatchReader) next() error {
n.cur = nil
}

arr := C.get_arr()
defer C.free(unsafe.Pointer(arr))
errno := C.stream_get_next(n.stream, arr)
errno := C.stream_get_next(n.stream, n.arr)
if errno != 0 {
return n.getError(int(errno))
}

if C.ArrowArrayIsReleased(arr) == 1 {
if C.ArrowArrayIsReleased(n.arr) == 1 {
return io.EOF
}

rec, err := ImportCRecordBatchWithSchema(arr, n.schema)
rec, err := ImportCRecordBatchWithSchema(n.arr, n.schema)
if err != nil {
return err
}
Expand Down
27 changes: 26 additions & 1 deletion go/arrow/cdata/cdata_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"encoding/binary"
"fmt"
"reflect"
"runtime/cgo"
"strings"
"unsafe"

Expand Down Expand Up @@ -362,7 +363,9 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
out.buffers = (*unsafe.Pointer)(unsafe.Pointer(&buffers[0]))
}

out.private_data = unsafe.Pointer(storeData(arr.Data()))
arr.Data().Retain()
h := cgo.NewHandle(arr.Data())
out.private_data = unsafe.Pointer(&h)
out.release = (*[0]byte)(C.goReleaseArray)
switch arr := arr.(type) {
case *array.List:
Expand Down Expand Up @@ -400,3 +403,25 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
out.children = nil
}
}

type cRecordReader struct {
rdr array.RecordReader
}

func (rr cRecordReader) getSchema(out *CArrowSchema) int {
ExportArrowSchema(rr.rdr.Schema(), out)
return 0
}

func (rr cRecordReader) next(out *CArrowArray) int {
if rr.rdr.Next() {
ExportArrowRecordBatch(rr.rdr.Record(), out, nil)
return 0
}
releaseArr(out)
return 0
}

func (rr cRecordReader) release() {
rr.rdr.Release()
}
35 changes: 35 additions & 0 deletions go/arrow/cdata/cdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
"errors"
"io"
"runtime"
"runtime/cgo"
"testing"
"time"
"unsafe"

"github.com/apache/arrow/go/v10/arrow"
"github.com/apache/arrow/go/v10/arrow/array"
"github.com/apache/arrow/go/v10/arrow/decimal128"
"github.com/apache/arrow/go/v10/arrow/internal/arrdata"
"github.com/apache/arrow/go/v10/arrow/memory"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -659,3 +661,36 @@ func TestRecordReaderStream(t *testing.T) {
assert.Equal(t, "baz", rec.Column(1).(*array.String).Value(2))
}
}

func TestExportRecordReaderStream(t *testing.T) {
reclist := arrdata.Records["primitives"]
rdr, _ := array.NewRecordReader(reclist[0].Schema(), reclist)

out := createTestStreamObj()
ExportRecordReader(rdr, out)

assert.NotNil(t, out.get_schema)
assert.NotNil(t, out.get_next)
assert.NotNil(t, out.get_last_error)
assert.NotNil(t, out.release)
assert.NotNil(t, out.private_data)

h := *(*cgo.Handle)(out.private_data)
assert.Same(t, rdr, h.Value().(cRecordReader).rdr)

importedRdr := ImportCArrayStream(out, nil)
i := 0
for {
rec, err := importedRdr.Read()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
assert.NoError(t, err)
}

assert.Truef(t, array.RecordEqual(reclist[i], rec), "expected: %s\ngot: %s", reclist[i], rec)
i++
}
assert.EqualValues(t, len(reclist), i)
}
10 changes: 9 additions & 1 deletion go/arrow/cdata/cdata_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ package cdata
//
// void setup_array_stream_test(const int n_batches, struct ArrowArrayStream* out);
// struct ArrowArray* get_test_arr() { return (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); }
// struct ArrowArrayStream* get_test_stream() { return (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); }
// struct ArrowArrayStream* get_test_stream() {
// struct ArrowArrayStream* out = (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream));
// memset(out, 0, sizeof(struct ArrowArrayStream));
// return out;
// }
//
// void release_test_arr(struct ArrowArray* arr) {
// for (int i = 0; i < arr->n_buffers; ++i) {
Expand Down Expand Up @@ -251,6 +255,10 @@ func createCArr(arr arrow.Array) *CArrowArray {
return carr
}

func createTestStreamObj() *CArrowArrayStream {
return C.get_test_stream()
}

func arrayStreamTest() *CArrowArrayStream {
st := C.get_test_stream()
C.setup_array_stream_test(2, st)
Expand Down
Loading