-
Notifications
You must be signed in to change notification settings - Fork 15
Add necessary support for arrow-backed table UDFs #75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ import ( | |
| "context" | ||
| "database/sql" | ||
| "database/sql/driver" | ||
| "fmt" | ||
| "sync" | ||
| "testing" | ||
|
|
||
|
|
@@ -315,3 +316,95 @@ func TestArrowClosedConn(t *testing.T) { | |
| }) | ||
| require.Error(t, err) | ||
| } | ||
|
|
||
| func TestArrowTableUDF(t *testing.T) { | ||
| db := openDbWrapper(t, ``) | ||
| defer closeDbWrapper(t, db) | ||
|
|
||
| conn := openConnWrapper(t, db, context.Background()) | ||
| defer closeConnWrapper(t, conn) | ||
|
|
||
| c := newConnectorWrapper(t, ``, nil) | ||
| defer closeConnectorWrapper(t, c) | ||
|
|
||
| innerConn := openDriverConnWrapper(t, c) | ||
| defer closeDriverConnWrapper(t, &innerConn) | ||
|
|
||
| ar, err := NewArrowFromConn(innerConn) | ||
| require.NoError(t, err) | ||
|
|
||
| // Create an arrow array of type Float64 buffered in memory | ||
| schema := arrow.NewSchema([]arrow.Field{ | ||
| {Name: "col0", Type: arrow.PrimitiveTypes.Float64}, | ||
| }, nil) | ||
| alloc := memory.NewGoAllocator() | ||
| builder := array.NewFloat64Builder(alloc) | ||
| defer builder.Release() | ||
|
|
||
| // Add values > data chunk size to test multiple chunks | ||
| for range 10000 { | ||
| builder.Append(float64(0.5)) | ||
| } | ||
|
|
||
| arr := builder.NewArray() | ||
| rb := array.NewRecordBatch(schema, []arrow.Array{arr}, int64(arr.Len())) | ||
| tbl := array.NewTableFromRecords(schema, []arrow.RecordBatch{rb}) | ||
|
|
||
| RegisterTableUDF(conn, "get_arrow", ChunkTableFunction{ | ||
| BindArguments: func(named map[string]any, args ...any) (ChunkTableSource, error) { | ||
| return &arrowTableUdf{tbl: tbl, ar: ar}, nil | ||
| }, | ||
| }) | ||
|
|
||
| res, err := db.QueryContext(context.Background(), `SELECT * FROM get_arrow()`) | ||
| require.NoError(t, err) | ||
| defer closeRowsWrapper(t, res) | ||
|
|
||
| var rowCount int | ||
| for res.Next() { | ||
| var val float64 | ||
| require.NoError(t, res.Scan(&val)) | ||
| require.Equal(t, 0.5, val) | ||
| rowCount++ | ||
| } | ||
| require.Equal(t, 10000, rowCount) | ||
| } | ||
|
|
||
| // Define a table UDF | ||
| type arrowTableUdf struct { | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It could make sense to expose a utility UDF struct for this, such as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it would be nice to have a separate registration method related to the Arrow and separate UDF type for the arrow data, Could you please wrap the existing chunked table UDF or create a new one? |
||
| ar *Arrow | ||
| tbl arrow.Table | ||
| rdr *array.TableReader | ||
| } | ||
|
|
||
| func (u *arrowTableUdf) Init() { | ||
| u.rdr = array.NewTableReader(u.tbl, int64(GetDataChunkCapacity())) | ||
| } | ||
|
|
||
| func (u *arrowTableUdf) ColumnInfos() []ColumnInfo { | ||
| t, _ := NewTypeInfo(TYPE_DOUBLE) | ||
| return []ColumnInfo{{ | ||
| Name: "col0", | ||
| T: t, | ||
| }} | ||
| } | ||
|
|
||
| func (u *arrowTableUdf) Cardinality() *CardinalityInfo { | ||
| return &CardinalityInfo{ | ||
| Cardinality: uint(u.tbl.NumRows()), | ||
| Exact: true, | ||
| } | ||
| } | ||
|
|
||
| func (u *arrowTableUdf) FillChunk(chunk DataChunk) error { | ||
| if u.rdr.Next() { | ||
| b := u.rdr.RecordBatch() | ||
| defer b.Release() | ||
| if err := u.ar.MoveArrowToDataChunk(b, chunk); err != nil { | ||
| return fmt.Errorf("failed to move arrow to data chunk: %w", err) | ||
| } | ||
| chunk.SetSize(int(b.NumRows())) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.