
Conversation

@jonkeane (Member) commented Nov 17, 2021

This enables streaming data back from DuckDB to Arrow via a RecordBatchReader instead of always materializing the full table (though that is still possible with an argument).

This unlocks queries like the following (silly from an analysis standpoint), where the to_arrow() step uses a RecordBatchReader as its source rather than pulling the full table into memory all at once.

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

nyc_ds <- open_dataset("path/to/nyc-taxi/", partitioning = c("year", "month"))

nyc_ds %>% 
  select(-rate_code_id) %>%
  mutate(day = wday(dropoff_at), hour = hour(dropoff_at)) %>% 
  to_duckdb() %>%
  # arrow doesn't (yet, at least in dplyr) support slicing like this, but this could be anything that one wants to do in duckdb
  group_by(year, month, day, hour) %>% 
  slice_max(tip_amount) %>% 
  to_arrow() %>%
  # but we can group_by %>% summarise()
  group_by(day, hour) %>% 
  summarise(mean_tip = mean(tip_amount))

A few notes:

  • This should only be merged after duckdb/duckdb#2957 ("Fixing wrongful release of arrow arrays") is merged.
    Without that PR we get mixed-up data when pulling to_arrow() on datasets.
    The tests are gated to only run after the next release of DuckDB (0.3.2). The failure on rhub/debian-gcc-devel:latest is because that run actually installs DuckDB from GitHub, which has that version number but not yet the patch from that PR.
  • This also slightly changes the return value of to_arrow(): instead of returning arrow_dplyr_query(Table) or arrow_dplyr_query(RecordBatchReader), we now simply return either the Table or the RecordBatchReader, and we now have dplyr methods for filter/mutate/etc. for RecordBatchReaders (see the sketch below). I can undo that change if we want to keep the wrapping, but in my experience messing with this / trying to find the source of the data corruption bug, having the RecordBatchReader was helpful.
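As a small illustration of that last point, here is a minimal sketch (assuming the duckdb and arrow branches from this PR are installed; the toy table is made up) showing dplyr verbs applied directly to the RecordBatchReader that to_arrow() returns:

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

tbl <- Table$create(x = 1:10, y = letters[1:10])

rbr <- tbl %>%
  to_duckdb() %>%
  to_arrow()  # a RecordBatchReader rather than a materialized Table

rbr %>%
  filter(x > 5) %>%
  mutate(x2 = x * 2) %>%
  collect()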

Testing locally

If one wants to test this locally, the easiest way is to install duckdb from pedro's branch with:

remotes::install_github("pdet/duckdb/tools/rpkg@rarrowstream",  build = FALSE)

And then arrow from this branch.
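For example, one (hypothetical) way to do that, where the ref is a placeholder for whatever branch this PR is built from:

# placeholder ref: substitute the branch this PR is built from
remotes::install_github("apache/arrow/r@<this-PR-branch>", build = FALSE)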

@github-actions

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@paleolimbot (Member) commented Nov 23, 2021

To my reading, the duckdb package is correctly calling arrow:::ImportRecordBatchReader() with a pointer (cast to uintptr_t, then to double) to a struct ArrowArrayStream as defined in the C API (that happens here: https://github.com/duckdb/duckdb/blob/master/tools/rpkg/src/statement.cpp#L615-L632). This gets passed to arrow_dplyr_query(), which seems to work for at least a dinky example and errors properly if you try to collect() twice. Once I do some more experimenting with streams I'll investigate this particular failure in more detail.

drv <- duckdb::duckdb()
con <- DBI::dbConnect(drv)

res <- DBI::dbSendQuery(con, "SELECT 'Hello, world!' as col1", arrow = TRUE)
record_batch_reader <- duckdb::duckdb_fetch_record_batch(res)

query <- arrow:::arrow_dplyr_query(record_batch_reader)
dplyr::collect(query)
#> # A tibble: 1 × 1
#>   col1         
#>   <chr>        
#> 1 Hello, world!
dplyr::collect(query)
#> Error: IOError: Query Stream is closed
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/c/bridge.cc:1759  StatusFromCError(stream_.get_next(&stream_, &c_array))
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.h:222  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/util/iterator.h:428  it_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:417  iterator_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:318  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:329  ReadAll(&batches)

DBI::dbDisconnect(con, shutdown = TRUE)

Created on 2021-11-23 by the reprex package (v2.0.1)

@jonkeane (Member Author) commented Dec 1, 2021

Any chance you've been able to dig more on this?

@paleolimbot (Member)

Nothing new yet, just listing the various ways this can fail.

First, intermittent success!

Details
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

example_data <- tibble::tibble(
  int = c(1:3, NA_integer_, 5:10),
  dbl = c(1:8, NA, 10) + .1,
  dbl2 = rep(5, 10),
  lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
  false = logical(10),
  chr = letters[c(1:5, NA, 7:10)],
  fct = factor(letters[c(1:4, NA, NA, 7:10)])
)

tf <- tempfile()
new_ds <- rbind(
  cbind(example_data, part = 1),
  cbind(example_data, part = 2),
  cbind(example_data, part = 3),
  cbind(example_data, part = 4)
) %>%
  mutate(row_order = 1:n())

write_dataset(new_ds, tf, partitioning = "part")

ds <- open_dataset(tf)

waldo::compare(
  ds %>%
    to_duckdb() %>%
    # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879
    select(-fct) %>%
    to_arrow() %>%
    filter(int > 5 & part > 1) %>%
    collect() %>%
    arrange(row_order) %>%
    tibble::as_tibble(),
  ds %>%
    select(-fct) %>%
    filter(int > 5 & part > 1) %>%
    collect() %>%
    arrange(row_order) %>%
    tibble::as_tibble()
)
#> ✓ No differences

Created on 2021-12-02 by the reprex package (v2.0.1)

Second, filter mismatch:

Details
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

example_data <- tibble::tibble(
  int = c(1:3, NA_integer_, 5:10),
  dbl = c(1:8, NA, 10) + .1,
  dbl2 = rep(5, 10),
  lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
  false = logical(10),
  chr = letters[c(1:5, NA, 7:10)],
  fct = factor(letters[c(1:4, NA, NA, 7:10)])
)

tf <- tempfile()
new_ds <- rbind(
  cbind(example_data, part = 1),
  cbind(example_data, part = 2),
  cbind(example_data, part = 3),
  cbind(example_data, part = 4)
) %>%
  mutate(row_order = 1:n())

write_dataset(new_ds, tf, partitioning = "part")

ds <- open_dataset(tf)

waldo::compare(
  ds %>%
    to_duckdb() %>%
    # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879
    select(-fct) %>%
    to_arrow() %>%
    filter(int > 5 & part > 1) %>%
    collect() %>%
    arrange(row_order) %>%
    tibble::as_tibble(),
  ds %>%
    select(-fct) %>%
    filter(int > 5 & part > 1) %>%
    collect() %>%
    arrange(row_order) %>%
    tibble::as_tibble()
)
#> old vs new
#>             int  dbl   lgl  chr row_order part
#> - old[1, ]    7  7.1  TRUE    g         0    3
#> + new[1, ]    6  6.1  TRUE <NA>        16    2
#> - old[2, ]    8  8.1    NA    h         0    3
#> + new[2, ]    7  7.1  TRUE    g        17    2
#> - old[3, ]    9   NA    NA    i         0    3
#> + new[3, ]    8  8.1    NA    h        18    2
#> - old[4, ]   10 10.1 FALSE    j         0    3
#> + new[4, ]    9   NA    NA    i        19    2
#> - old[5, ]    6  6.1  TRUE <NA>         4    3
#> + new[5, ]   10 10.1 FALSE    j        20    2
#> - old[6, ]    6  6.1  TRUE <NA>        16    2
#> + new[6, ]    6  6.1  TRUE <NA>        26    3
#> - old[7, ]    7  7.1  TRUE    g        17    2
#> + new[7, ]    7  7.1  TRUE    g        27    3
#> - old[8, ]    8  8.1    NA    h        18    2
#> + new[8, ]    8  8.1    NA    h        28    3
#> - old[9, ]    9   NA    NA    i        19    2
#> + new[9, ]    9   NA    NA    i        29    3
#> - old[10, ]  10 10.1 FALSE    j        20    2
#> + new[10, ]  10 10.1 FALSE    j        30    3
#>   old[11, ]   6  6.1  TRUE <NA>        36    4
#>   old[12, ]   7  7.1  TRUE    g        37    4
#>   old[13, ]   8  8.1    NA    h        38    4
#> 
#> `old$int[1:8]`: 7 8 9 10  6 6 7 8
#> `new$int[1:8]`: 6 7 8  9 10 6 7 8
#> 
#> `old$dbl[1:8]`: 7 8 NA 10  6 6 7 8
#> `new$dbl[1:8]`: 6 7  8 NA 10 6 7 8
#> 
#> `old$lgl[1:8]`: TRUE <NA> <NA> FALSE TRUE  TRUE TRUE <NA>
#> `new$lgl[1:8]`: TRUE TRUE <NA> <NA>  FALSE TRUE TRUE <NA>
#> 
#> `old$chr[1:8]`: "g" "h" "i" "j" NA  NA "g" "h"
#> `new$chr[1:8]`: NA  "g" "h" "i" "j" NA "g" "h"
#> 
#> `old$row_order[1:13]`:  0  0  0  0  4 16 17 18 19 20 and 3 more...
#> `new$row_order[1:13]`: 16 17 18 19 20 26 27 28 29 30           ...
#> 
#> `old$part[1:13]`: 3 3 3 3 3 2 2 2 2 2 and 3 more...
#> `new$part[1:13]`: 2 2 2 2 2 3 3 3 3 3           ...

Created on 2021-12-02 by the reprex package (v2.0.1)

Third, Query Stream is closed:

Details
Error: IOError: Query Stream is closed
/Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/c/bridge.cc:1759  StatusFromCError(stream_.get_next(&stream_, &c_array))
/Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.h:222  ReadNext(&batch)
/Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/util/iterator.h:428  it_.Next()
/Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:417  iterator_.Next()
/Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:326  ReadNext(&batch)
/Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:337  ReadAll(&batches) 

Fourth, segfault (has only happened once).

@paleolimbot (Member)

...and I can't get this to fail invoking the C API from the R end of things:

Details
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

example_data <- tibble::tibble(
  int = c(1:3, NA_integer_, 5:10),
  dbl = c(1:8, NA, 10) + .1,
  dbl2 = rep(5, 10),
  lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
  false = logical(10),
  chr = letters[c(1:5, NA, 7:10)],
  fct = factor(letters[c(1:4, NA, NA, 7:10)])
)

tf <- tempfile()
new_ds <- rbind(
  cbind(example_data, part = 1),
  cbind(example_data, part = 2),
  cbind(example_data, part = 3),
  cbind(example_data, part = 4)
) %>%
  mutate(row_order = 1:n()) %>% 
  select(-false, -lgl, -fct)

write_dataset(new_ds, tf, partitioning = "part")

ds <- open_dataset(tf)

# let's pop this puppy open in C and see if what's coming from the pipe in Arrow
# is correct
# https://github.com/duckdb/duckdb/blob/master/tools/rpkg/R/register.R#L71-L82

stream <- carrow:::blank_invalid_array_stream()
stream_ptr <- carrow:::xptr_addr_double(stream)
s <- Scanner$create(
  ds, 
  NULL,
  filter = arrow:::build_expr(
    "&",
    arrow:::build_expr(">", Expression$field_ref("int"), 5),
    arrow:::build_expr(">", Expression$field_ref("part"), 1)
  ),
  use_async = FALSE,
  use_threads = TRUE
)$
  ToRecordBatchReader()$
  export_to_c(stream_ptr)

# check schema
recreated_schema <- carrow::from_carrow_array(
  list(schema = carrow::carrow_array_stream_get_schema(stream)),
  arrow::Schema
)
# ding!
recreated_schema == ds$schema
#> [1] TRUE

items <- list()
while (TRUE) {
  item <- carrow::carrow_array_stream_get_next(stream)
  if (is.null(item)) {
    break
  }
  items[[length(items) + 1L]] <- carrow::from_carrow_array(item)
}

result <- tibble::as_tibble(bind_rows(items))

waldo::compare(
  result %>% 
    select(part, everything()) %>%
    mutate(part = as.double(part)) %>% 
    arrange(row_order),
  tibble::as_tibble(new_ds) %>%
    filter(int > 5 & part > 1) %>% 
    select(part, everything())
)
#> ✓ No differences

Created on 2021-12-02 by the reprex package (v2.0.1)

@paleolimbot (Member)

Putting this down for today, but narrowed down the segfault to computing the exec plan. Specifically, this line in the R package:

StopIfNotOk(plan->StartProducing());

I haven't been able to get lldb working with R, so I don't know where this occurs in the Arrow library.

I can force a segfault by creating a record batch reader from an R function (without DuckDB) as well, and I think the two are linked (because read_table() seems to work for both). I wonder if the exec plan is calling the array_stream->get_next() method from multiple threads?

Details
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

example_data <- tibble::tibble(
  int = c(1:3, NA_integer_, 5:10),
  dbl = c(1:8, NA, 10) + .1,
  dbl2 = rep(5, 10),
  lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
  false = logical(10),
  chr = letters[c(1:5, NA, 7:10)],
  fct = factor(letters[c(1:4, NA, NA, 7:10)])
)

tf <- tempfile()
new_ds <- rbind(
  cbind(example_data, part = 1),
  cbind(example_data, part = 2),
  cbind(example_data, part = 3),
  cbind(example_data, part = 4)
) %>%
  mutate(row_order = 1:n()) %>% 
  select(-false, -lgl, -fct)

write_dataset(new_ds, tf, partitioning = "part")

ds <- open_dataset(tf)

stream <- carrow:::blank_invalid_array_stream()
stream_ptr <- carrow:::xptr_addr_double(stream)
s <- Scanner$create(
  ds, 
  NULL,
  filter = TRUE,
  use_async = FALSE,
  use_threads = TRUE
)$
  ToRecordBatchReader()$
  export_to_c(stream_ptr)


# now, create an R stream based on a function that wraps the input stream
# basically, see if we can roundtrip through R
stream2 <- carrow::carrow_array_stream_function(ds$schema, function() {
  message("streeeeaming!")
  carrow::carrow_array_stream_get_next(stream)
})

rbr <- carrow::carrow_array_stream_to_arrow(stream2)

# schema OK
rbr$schema
#> Schema
#> int: int32
#> dbl: double
#> dbl2: double
#> chr: string
#> row_order: int32
#> part: int32
#> 
#> See $metadata for additional Schema metadata

# query create OK
query <- arrow:::as_adq(rbr) 

# collect() is the only thing that segfaults
# segfault is here:
# https://github.com/apache/arrow/blob/master/r/src/compute-exec.cpp#L92
# result <- dplyr::collect(query)

# ...but a manual scan is OK, as well as read_table()
# (which may explain why the streaming worked before)
# rbr$read_next_batch()
# rbr$read_next_batch()
# rbr$read_next_batch()
# rbr$read_next_batch()
# rbr$read_next_batch()
rbr$read_table()
#> streeeeaming!
#> streeeeaming!
#> streeeeaming!
#> streeeeaming!
#> streeeeaming!
#> Table
#> 40 rows x 6 columns
#> $int <int32>
#> $dbl <double>
#> $dbl2 <double>
#> $chr <string>
#> $row_order <int32>
#> $part <int32>
#> 
#> See $metadata for additional Schema metadata

Created on 2021-12-02 by the reprex package (v2.0.1)

@paleolimbot (Member)

The error might be related to the fact that the DuckDB stream errors after it's complete instead of returning 0 (success) and outputting a released/invalid array, which is how end-of-stream is signaled.

DuckDB code where this happens:

https://github.com/duckdb/duckdb/blob/master/src/common/arrow_wrapper.cpp#L103-L105

Example of a carrow stream that does this (and also causes the exec plan to segfault).

Details
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

example_data <- tibble::tibble(
  int = c(1:3, NA_integer_, 5:10),
  dbl = c(1:8, NA, 10) + .1,
  dbl2 = rep(5, 10),
  lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
  false = logical(10),
  chr = letters[c(1:5, NA, 7:10)],
  fct = factor(letters[c(1:4, NA, NA, 7:10)])
)

tf <- tempfile()
new_ds <- rbind(
  cbind(example_data, part = 1),
  cbind(example_data, part = 2),
  cbind(example_data, part = 3),
  cbind(example_data, part = 4)
) %>%
  mutate(row_order = 1:n()) %>% 
  select(-false, -lgl, -fct)

write_dataset(new_ds, tf, partitioning = "part")

ds <- open_dataset(tf)

stream <- carrow:::blank_invalid_array_stream()
stream_ptr <- carrow:::xptr_addr_double(stream)
s <- Scanner$create(
  ds, 
  NULL,
  filter = TRUE,
  use_async = FALSE,
  use_threads = TRUE
)$
  ToRecordBatchReader()$
  export_to_c(stream_ptr)


rbr <- arrow:::ImportRecordBatchReader(stream_ptr)
rbr$read_table()
#> Table
#> 40 rows x 6 columns
#> $int <int32>
#> $dbl <double>
#> $dbl2 <double>
#> $chr <string>
#> $row_order <int32>
#> $part <int32>
#> 
#> See $metadata for additional Schema metadata
rbr$read_table()
#> Table
#> 0 rows x 6 columns
#> $int <int32>
#> $dbl <double>
#> $dbl2 <double>
#> $chr <string>
#> $row_order <int32>
#> $part <int32>
#> 
#> See $metadata for additional Schema metadata

# stream that errors after it's complete
stream <- carrow:::blank_invalid_array_stream()
stream_ptr <- carrow:::xptr_addr_double(stream)
s <- Scanner$create(
  ds, 
  NULL,
  filter = TRUE,
  use_async = FALSE,
  use_threads = TRUE
)$
  ToRecordBatchReader()$
  export_to_c(stream_ptr)

stream2 <- carrow::carrow_array_stream_function(ds$schema, function() {
  carrow::carrow_array_stream_get_next(stream)
})

rbr <- carrow::carrow_array_stream_to_arrow(stream2)
rbr$read_table()
#> Table
#> 40 rows x 6 columns
#> $int <int32>
#> $dbl <double>
#> $dbl2 <double>
#> $chr <string>
#> $row_order <int32>
#> $part <int32>
#> 
#> See $metadata for additional Schema metadata
rbr$read_table()
#> Error: Invalid: function array stream is finished
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/c/bridge.cc:1759  StatusFromCError(stream_.get_next(&stream_, &c_array))
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:326  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:337  ReadAll(&batches)

Created on 2021-12-02 by the reprex package (v2.0.1)

@paleolimbot (Member)

I've tried creating a stream that never errors and this still segfaults when using dplyr::collect() (but not when using read_table()), so it's probably unrelated to this issue.

@jonkeane (Member Author) commented Dec 6, 2021

@bkietz or @westonpace might have some insight about what's going on here.

In your second-to-last comment here, the following seems odd to me. It's weird that a second read_table() would produce that empty table instead of an error about a closed stream of some sort. I wonder if that's a hint about what's going on in the DuckDB tests that seem to have garbage data in them (and/or the segfaults Dewey is running into).

rbr <- arrow:::ImportRecordBatchReader(stream_ptr)
rbr$read_table()
#> Table
#> 40 rows x 6 columns
#> $int <int32>
#> $dbl <double>
#> $dbl2 <double>
#> $chr <string>
#> $row_order <int32>
#> $part <int32>
#> 
#> See $metadata for additional Schema metadata
rbr$read_table()
#> Table
#> 0 rows x 6 columns
#> $int <int32>
#> $dbl <double>
#> $dbl2 <double>
#> $chr <string>
#> $row_order <int32>
#> $part <int32>
#> 
#> See $metadata for additional Schema metadata

@jonkeane (Member Author) commented Dec 9, 2021

@pitrou Here's one improvement to some of the DuckDB flakiness, but Dewey + I haven't yet figured out what's going wrong (sometimes) with the stream such that we can enable it.

@paleolimbot (Member)

Trying to track down the source of this, and one other theory is that it's an issue with the lifecycle of the underlying buffers (that they are getting freed out from under the struct ArrowArray). Again, just a theory! But it would explain the mix of intermittent success, corrupted data, and segfaults.

The Array is created here:

https://github.com/duckdb/duckdb/blob/048bce50e92b643e90cf7cc6bccda56a82378961/src/common/types/data_chunk.cpp#L684-L725

To my reading, there's nothing in the private data that guarantees that the underlying DataChunk won't get deleted:

https://github.com/duckdb/duckdb/blob/048bce50e92b643e90cf7cc6bccda56a82378961/src/common/types/data_chunk.cpp#L272-L297

...but perhaps I'm misreading the C++ here.

@paleolimbot (Member)

Hmm...in theory the duckdb memory is held by one of a few possible unique_ptrs.

https://github.com/duckdb/duckdb/blob/048bce50e92b643e90cf7cc6bccda56a82378961/src/common/types/data_chunk.cpp#L547-L549

I also can't fix the intermittent failure by keeping the DataChunk alive for the lifecycle of the array, so perhaps that isn't the issue.

@jonkeane (Member Author)

Here's that DuckDB PR @romainfrancois

@paleolimbot (Member)

Trying another angle...only using to_arrow(), I can't get a segfault or any weird behaviour:

Details
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

example_data <- tibble::tibble(
  int = c(1:3, NA_integer_, 5:10),
  dbl = c(1:8, NA, 10) + .1,
  dbl2 = rep(5, 10),
  lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
  false = logical(10),
  chr = letters[c(1:5, NA, 7:10)],
  fct = factor(letters[c(1:4, NA, NA, 7:10)])
)

tf <- tempfile()
new_ds <- rbind(
  cbind(example_data, part = 1),
  cbind(example_data, part = 2),
  cbind(example_data, part = 3),
  cbind(example_data, part = 4)
) %>%
  mutate(row_order = 1:n())

write_dataset(new_ds, tf, partitioning = "part")


# try using only duckdb without to_arrow()
drv <- duckdb::duckdb()
con <- DBI::dbConnect(drv)

# easier with a single parquet here
tf2 <- tempfile()
write_parquet(new_ds, tf2)

DBI::dbExecute(
  con,
  glue::glue_sql(
    "CREATE VIEW ds_view AS SELECT * FROM parquet_scan({ tf2 });",
    .con = con
  )
)
#> [1] 0

DBI::dbExecute(
  con,
  glue::glue_sql(
    "CREATE TABLE ds_tbl AS SELECT * FROM parquet_scan({ tf2 });",
    .con = con
  )
)
#> [1] 40

res <- DBI::dbSendQuery(con, "SELECT * from ds_view", arrow = TRUE)
record_batch_reader <- duckdb::duckdb_fetch_record_batch(res)

query <- arrow:::arrow_dplyr_query(record_batch_reader)
dplyr::collect(query)
#> # A tibble: 40 × 9
#>      int   dbl  dbl2 lgl   false chr   fct    part row_order
#>    <int> <dbl> <dbl> <lgl> <lgl> <chr> <chr> <dbl>     <int>
#>  1     1   1.1     5 TRUE  FALSE a     a         1         1
#>  2     2   2.1     5 NA    FALSE b     b         1         2
#>  3     3   3.1     5 FALSE FALSE c     c         1         3
#>  4    NA   4.1     5 TRUE  FALSE d     d         1         4
#>  5     5   5.1     5 TRUE  FALSE e     <NA>      1         5
#>  6     6   6.1     5 FALSE FALSE <NA>  <NA>      1         6
#>  7     7   7.1     5 FALSE FALSE g     g         1         7
#>  8     8   8.1     5 FALSE FALSE h     h         1         8
#>  9     9  NA       5 FALSE FALSE i     i         1         9
#> 10    10  10.1     5 TRUE  FALSE j     j         1        10
#> # … with 30 more rows

DBI::dbDisconnect(con, shutdown = TRUE)

I also can't get anything weird to happen if the dataset only has one file:

Details
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

example_data <- tibble::tibble(
  int = c(1:3, NA_integer_, 5:10),
  dbl = c(1:8, NA, 10) + .1,
  dbl2 = rep(5, 10),
  lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
  false = logical(10),
  chr = letters[c(1:5, NA, 7:10)],
  fct = factor(letters[c(1:4, NA, NA, 7:10)])
)

tf <- tempfile()
new_ds <- rbind(
  cbind(example_data, part = 1),
  cbind(example_data, part = 2),
  cbind(example_data, part = 3),
  cbind(example_data, part = 4)
) %>%
  mutate(row_order = 1:n())


tf2 <- tempfile()
write_parquet(new_ds, tf2)

ds <- open_dataset(tf2)

waldo::compare(
  ds %>%
    to_duckdb() %>%
    # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879
    select(-fct) %>%
    to_arrow() %>%
    filter(int > 5 & part > 1) %>%
    collect() %>%
    arrange(row_order) %>%
    tibble::as_tibble(),
  ds %>%
    select(-fct) %>%
    filter(int > 5 & part > 1) %>%
    collect() %>%
    arrange(row_order) %>%
    tibble::as_tibble()
)
#> ✓ No differences

Created on 2022-01-11 by the reprex package (v2.0.1)

@paleolimbot (Member)

If you call the garbage collector after each attempt (which calls the duckdb array stream release() callback), it won't crash (but still won't give you the right answer):

Details
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

make_duckdb_crash <- function(n = 50, gc_after = TRUE) {
  cat("Starting attempts [")
  on.exit(cat("]\n"))
  
  for (i in seq_len(n)) {

    example_data <- tibble::tibble(
      int = c(1:3, NA_integer_, 5:10),
      dbl = c(1:8, NA, 10) + .1,
      dbl2 = rep(5, 10),
      lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
      false = logical(10),
      chr = letters[c(1:5, NA, 7:10)],
      fct = factor(letters[c(1:4, NA, NA, 7:10)])
    )
    
    tf <- tempfile()
    new_ds <- rbind(
      cbind(example_data, part = 1),
      cbind(example_data, part = 2),
      cbind(example_data, part = 3),
      cbind(example_data, part = 4)
    ) %>%
      mutate(row_order = 1:n())
    
    write_dataset(new_ds, tf, partitioning = "part")
    
    ds <- open_dataset(tf)
    
    it_worked <- all.equal(
      ds %>%
        to_duckdb() %>%
        # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879
        select(-fct) %>%
        to_arrow() %>%
        filter(int > 5 & part > 1) %>%
        collect() %>%
        arrange(row_order) %>%
        tibble::as_tibble(),
      ds %>%
        select(-fct) %>%
        filter(int > 5 & part > 1) %>%
        collect() %>%
        arrange(row_order) %>%
        tibble::as_tibble()
    )
    
    if (isTRUE(it_worked)) {
      cat("!")
    } else {
      cat("X")
    }
    
    if (gc_after) gc()
  }
}


make_duckdb_crash(n = 50, gc_after = TRUE)
#> Starting attempts [!!!!!XXX!!X!X!!!!X!XX!X!X!!!!!!!!X!!!!X!!!!X!X!!!X]
make_duckdb_crash(n = 50, gc_after = FALSE)
#> Starting attempts [X!!!!!!!!!!!X!X!X!!!!
#> Error: IOError: Query Stream is closed
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/c/bridge.cc:1759  StatusFromCError(stream_.get_next(&stream_, &c_array))
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.h:233  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/util/iterator.h:428  it_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:417  iterator_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:336  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:347  ReadAll(&batches)
#> ]

Created on 2022-01-11 by the reprex package (v2.0.1)

@jonkeane (Member Author)

Iiiinteresting, on both of these fronts. In the second one, have you tried putting auto_disconnect = FALSE in the to_duckdb()? Now that we see this gc thing, I wonder if this is a consequence of our cleaning up doing something funny.

@paleolimbot (Member) commented Jan 11, 2022

Totally! Skipping the auto disconnect fixes the crash (but doesn't fix the incorrect results).

Details
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

make_duckdb_crash <- function(n = 50, auto_disconnect = TRUE) {
  cat("Starting attempts [")
  on.exit(cat("]\n"))
  
  for (i in seq_len(n)) {

    example_data <- tibble::tibble(
      int = c(1:3, NA_integer_, 5:10),
      dbl = c(1:8, NA, 10) + .1,
      dbl2 = rep(5, 10),
      lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
      false = logical(10),
      chr = letters[c(1:5, NA, 7:10)],
      fct = factor(letters[c(1:4, NA, NA, 7:10)])
    )
    
    tf <- tempfile()
    new_ds <- rbind(
      cbind(example_data, part = 1),
      cbind(example_data, part = 2),
      cbind(example_data, part = 3),
      cbind(example_data, part = 4)
    ) %>%
      mutate(row_order = 1:n())
    
    write_dataset(new_ds, tf, partitioning = "part")
    
    ds <- open_dataset(tf)
    
    it_worked <- all.equal(
      ds %>%
        to_duckdb(auto_disconnect = auto_disconnect) %>%
        # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879
        select(-fct) %>%
        to_arrow() %>%
        filter(int > 5 & part > 1) %>%
        collect() %>%
        arrange(row_order) %>%
        tibble::as_tibble(),
      ds %>%
        select(-fct) %>%
        filter(int > 5 & part > 1) %>%
        collect() %>%
        arrange(row_order) %>%
        tibble::as_tibble()
    )
    
    if (isTRUE(it_worked)) {
      cat("!")
    } else {
      cat("X")
    }
  }
}


make_duckdb_crash(n = 50, auto_disconnect = FALSE)
#> Starting attempts [X!!!X!!!!!!!!!X!!!!!X!!!!!X!!XXXX!!!XXX!!!!!!!!!XX]
make_duckdb_crash(n = 50, auto_disconnect = TRUE)
#> Starting attempts [!X!!X!X!X!!XX!
#> Error: IOError: Query Stream is closed
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/c/bridge.cc:1759  StatusFromCError(stream_.get_next(&stream_, &c_array))
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.h:233  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/util/iterator.h:428  it_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:417  iterator_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:336  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:347  ReadAll(&batches)
#> ]

Created on 2022-01-11 by the reprex package (v2.0.1)

@jonkeane (Member Author)

Interesting! I've got one theory about the segfault.

But still no clue what's going on in the other case (i.e. auto_disconnect = FALSE / gc after each = TRUE). That is still very strange! We've narrowed it down to this: if the scan starts in DuckDB, that seems to work. (Have you tried setting PRAGMA threads=N to ensure parallelism in duckdb? This should be the default now, but I don't remember which version that started in...) Have you tried running only the duckdb -> arrow bit multiple times and checking the results (like you have here with the round trip)?

@paleolimbot (Member)

Same result!

Details
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

set_threads <- function(x, threads) {
  con <- dbplyr::remote_con(x)
  DBI::dbExecute(con, sprintf("PRAGMA threads=%d", threads))
  x
}

make_duckdb_crash <- function(n = 50, auto_disconnect = TRUE, threads = 1) {
  cat("Starting attempts [")
  on.exit(cat("]\n"))
  
  for (i in seq_len(n)) {

    example_data <- tibble::tibble(
      int = c(1:3, NA_integer_, 5:10),
      dbl = c(1:8, NA, 10) + .1,
      dbl2 = rep(5, 10),
      lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
      false = logical(10),
      chr = letters[c(1:5, NA, 7:10)],
      fct = factor(letters[c(1:4, NA, NA, 7:10)])
    )
    
    tf <- tempfile()
    new_ds <- rbind(
      cbind(example_data, part = 1),
      cbind(example_data, part = 2),
      cbind(example_data, part = 3),
      cbind(example_data, part = 4)
    ) %>%
      mutate(row_order = 1:n())
    
    write_dataset(new_ds, tf, partitioning = "part")
    
    ds <- open_dataset(tf)
    
    it_worked <- all.equal(
      ds %>%
        to_duckdb(auto_disconnect = auto_disconnect) %>%
        set_threads(threads) %>% 
        # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879
        select(-fct) %>%
        to_arrow() %>%
        filter(int > 5 & part > 1) %>%
        collect() %>%
        arrange(row_order) %>%
        tibble::as_tibble(),
      ds %>%
        select(-fct) %>%
        filter(int > 5 & part > 1) %>%
        collect() %>%
        arrange(row_order) %>%
        tibble::as_tibble()
    )
    
    if (isTRUE(it_worked)) {
      cat("!")
    } else {
      cat("X")
    }
  }
}


make_duckdb_crash(n = 50, auto_disconnect = FALSE, threads = 1)
#> Starting attempts [X!X!!!X!X!!X!!!!!!!!X!X!!!XX!!!!X!X!!!XX!!!X!!!!!X]
make_duckdb_crash(n = 50, auto_disconnect = FALSE, threads = 10)
#> Starting attempts [!!!!X!!!XX!X!!X!!X!X!!X!XX!!!X!!X!!!XX!X!!X!!!!XX!]
make_duckdb_crash(n = 50, auto_disconnect = TRUE, threads = 1)
#> Starting attempts [!!!X!XX!!!!!!X!X
#> Error: IOError: Query Stream is closed
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/c/bridge.cc:1759  StatusFromCError(stream_.get_next(&stream_, &c_array))
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.h:233  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/util/iterator.h:428  it_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:417  iterator_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:336  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:347  ReadAll(&batches)
#> ]
make_duckdb_crash(n = 50, auto_disconnect = TRUE, threads = 10)
#> Starting attempts [!X!XX!XXX
#> Error: IOError: Query Stream is closed
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/c/bridge.cc:1759  StatusFromCError(stream_.get_next(&stream_, &c_array))
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.h:233  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/util/iterator.h:428  it_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:417  iterator_.Next()
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:336  ReadNext(&batch)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/record_batch.cc:347  ReadAll(&batches)
#> ]

Created on 2022-01-11 by the reprex package (v2.0.1)

@jonkeane (Member Author)

Aaah, I was thinking of adding the threading to the code you have here: #11730 (comment) (and doing that a few times, checking the results like you have in the later ones).

And/or, what happens if you do arrow -> duckdb and select from there? Does that always return the same result?

@paleolimbot (Member)

I see!

All works as long as we're not scanning a data set (with or without threading).

Details
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

make_duckdb_crash <- function(n = 50, threads = 1) {
  cat("Starting attempts [")
  on.exit(cat("]\n"))
  
  for (i in seq_len(n)) {

    # try using only duckdb without to_arrow()
    drv <- duckdb::duckdb()
    con <- DBI::dbConnect(drv)
    DBI::dbExecute(con, sprintf("PRAGMA threads=%d", threads))

    example_data <- tibble::tibble(
      int = c(1:3, NA_integer_, 5:10),
      dbl = c(1:8, NA, 10) + .1,
      dbl2 = rep(5, 10),
      lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
      false = logical(10),
      chr = letters[c(1:5, NA, 7:10)],
      fct = factor(letters[c(1:4, NA, NA, 7:10)])
    )
    
    tf <- tempfile()
    new_ds <- rbind(
      cbind(example_data, part = 1),
      cbind(example_data, part = 2),
      cbind(example_data, part = 3),
      cbind(example_data, part = 4)
    ) %>%
      mutate(row_order = 1:n())
    
    # easier with a single parquet here
    write_parquet(new_ds, tf)
    
    DBI::dbExecute(
      con,
      glue::glue_sql(
        "CREATE VIEW ds_view AS SELECT * FROM parquet_scan({ tf });",
        .con = con
      )
    )
    
    res <- DBI::dbSendQuery(con, "SELECT * from ds_view", arrow = TRUE)
    rbr <- duckdb::duckdb_fetch_record_batch(res)
    rbr_query <- arrow:::as_adq(rbr)
    
    ds <- open_dataset(tf)
    
    it_worked <- all.equal(
      rbr_query %>% 
        select(-fct) %>%
        to_arrow() %>%
        filter(int > 5 & part > 1) %>%
        collect() %>%
        arrange(row_order) %>%
        tibble::as_tibble(),
      ds %>%
        select(-fct) %>%
        filter(int > 5 & part > 1) %>%
        collect() %>%
        arrange(row_order) %>%
        tibble::as_tibble()
    )
    
    if (isTRUE(it_worked)) {
      cat("!")
    } else {
      cat("X")
    }
    
    DBI::dbDisconnect(con, shutdown = TRUE)
  }
}

make_duckdb_crash(n = 50, threads = 1)
#> Starting attempts [!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!]
make_duckdb_crash(n = 50, threads = 10)
#> Starting attempts [!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!]

Created on 2022-01-11 by the reprex package (v2.0.1)

@paleolimbot (Member)

Putting this down for the day, but I put together a setup for debugging the whole stack (you can run it off of the main branch since there were a few changes to the scanner):

Details
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

debugged_fun <- function(x) {
  force(x)
  name <- deparse(substitute(x))
  i <- 0
  function(...) {
    i <<- i + 1
    call <- match.call()
    call[[1]] <- as.symbol(name)
    cat(sprintf("%s[%d]: %s\n", name, i, format(call)))
    x(...)
  }
}

duckdb_register_arrow_mod <- function(conn, name, arrow_scannable) {
  export_fun <- function(arrow_scannable, stream_ptr, projection = NULL, filter = TRUE) {
    scanner <- arrow::Scanner$create(arrow_scannable, projection, filter)
    rbr <- scanner$ToRecordBatchReader()
    rbr$export_to_c(stream_ptr)
  }
  
  function_list <- list(
    debugged_fun(export_fun),
    debugged_fun(arrow::Expression$create), 
    debugged_fun(arrow::Expression$field_ref),
    debugged_fun(arrow::Expression$scalar)
  )
  
  .Call(
    "duckdb_register_arrow_R", 
    conn@conn_ref, 
    enc2utf8(as.character(name)), 
    function_list,
    arrow_scannable,
    PACKAGE = "duckdb"
  )
  
  invisible(TRUE)
}

to_duckdb_mod <- function(x, con) {
  table_name <- digest::digest(runif(1))
  duckdb_register_arrow_mod(con, table_name, x)
  dplyr::tbl(con, table_name)
}

to_arrow_mod <- function(x) {
  res <- DBI::dbSendQuery(
    dbplyr::remote_con(x),
    dbplyr::remote_query(x),
    arrow = TRUE
  )
  
  rbr <- duckdb::duckdb_fetch_record_batch(res)
  arrow:::arrow_dplyr_query(rbr)
}

make_crash <- function() {
  drv <- duckdb::duckdb()
  con <- DBI::dbConnect(drv)
  DBI::dbExecute(con, sprintf("PRAGMA threads=%d", 10))
  
  example_data <- tibble::tibble(
    int = c(1:3, NA_integer_, 5:10),
    dbl = c(1:8, NA, 10) + .1,
    dbl2 = rep(5, 10),
    lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
    false = logical(10),
    chr = letters[c(1:5, NA, 7:10)],
    fct = factor(letters[c(1:4, NA, NA, 7:10)])
  )
  
  tf <- tempfile()
  new_ds <- rbind(
    cbind(example_data, part = 1),
    cbind(example_data, part = 2),
    cbind(example_data, part = 3),
    cbind(example_data, part = 4)
  ) %>%
    mutate(row_order = 1:n())
  
  write_dataset(new_ds, tf, partitioning = "part")
  
  it_worked <- all.equal(
    open_dataset(tf) %>%
      to_duckdb_mod(con) %>%
      select(-fct) %>%
      to_arrow_mod() %>%
      filter(int > 5 & part > 1) %>%
      collect() %>%
      arrange(row_order) %>%
      tibble::as_tibble(),
    open_dataset(tf) %>%
      select(-fct) %>%
      filter(int > 5 & part > 1) %>%
      collect() %>%
      arrange(row_order) %>%
      tibble::as_tibble()
  )
  
  DBI::dbDisconnect(con, shutdown = TRUE)
  
  invisible(it_worked)
}

make_crash_n <- function(n = 50) {
  cat("Starting attempts [")
  on.exit(cat("]\n"))
  
  for (i in seq_len(n)) {
    if (isTRUE(make_crash())) {
      cat("!")
    } else {
      cat("X")
    }
  }
}


make_crash()
#> export_fun[1]: export_fun(<environment>, 5278113568)
#> export_fun[2]: export_fun(<environment>, 4799467424)
#> export_fun[3]: export_fun(<environment>, 5276680688)
#> export_fun[4]: export_fun(<environment>, 4756075920, c("int", "dbl", "dbl2", 
#>  export_fun[4]: "lgl", "false", "chr", "row_order", "part"), TRUE)

Created on 2022-01-11 by the reprex package (v2.0.1)

@paleolimbot (Member)

Something that came out of that last bit is that the export_fun() is called four times (I would have expected it to be called once).

@jonkeane (Member Author) commented Jan 11, 2022

I've rebased (to grab those scanner changes). I also noticed that there was a db connection validation that might have been the source of the segfaults (I didn't realize it was still there, and it can definitely do bad things: underneath it there's a simple query, and duckdb doesn't do well when multiple queries are run at the same time).

I also noticed that the differences seemed to be in the integer columns, so I tried pulling those out to see if that helps, and it seemed to (though seeing it be OK on CI would give me more confidence). Nope, it turns out this also happens without the int columns too: https://github.com/apache/arrow/runs/4779732667?check_suite_focus=true#step:7:21463

@paleolimbot (Member)

After working on this for much of the day, I can no longer get this to work (with on-disk data sets) even with $read_table()!

Details
# remotes::install_github("apache/arrow/r")
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

example_data <- tibble::tibble(
  int = c(1:3, NA_integer_, 5:10),
  dbl = c(1:8, NA, 10) + .1,
  dbl2 = rep(5, 10),
  lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
  false = logical(10),
  chr = letters[c(1:5, NA, 7:10)],
  fct = factor(letters[c(1:4, NA, NA, 7:10)])
)

tf <- tempfile()
new_ds <- rbind(
  cbind(example_data, part = 1),
  cbind(example_data, part = 2),
  cbind(example_data, part = 3),
  cbind(example_data, part = 4)
) %>%
  mutate(row_order = 1:n())

write_dataset(new_ds, tf, partitioning = "part")

ds <- open_dataset(tf)
waldo::compare(
  ds %>%
    to_duckdb(auto_disconnect = FALSE) %>%
    # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879
    select(-fct) %>%
    to_arrow() %>%
    filter(int > 5 & part > 1) %>%
    collect() %>%
    arrange(row_order) %>%
    tibble::as_tibble(),
  ds %>%
    select(-fct) %>%
    filter(int > 5 & part > 1) %>%
    collect() %>%
    arrange(row_order) %>%
    tibble::as_tibble()
)
#> `attr(old, 'row.names')[13:18]`: 13 14 15 16 17 18
#> `attr(new, 'row.names')[13:15]`: 13 14 15         
#> 
#> old vs new
#>              int  dbl dbl2   lgl false  chr row_order part
#> - old[1, ]  1347  2.1    5    NA FALSE    b         0 1347
#>   old[2, ]     6  6.1    5 FALSE FALSE <NA>        16    2
#>   old[3, ]     7  7.1    5 FALSE FALSE    g        17    2
#>   old[4, ]     8  8.1    5 FALSE FALSE    h        18    2
#> 
#> old vs new
#>              int  dbl dbl2   lgl false  chr row_order part
#>   old[14, ]    8  8.1    5 FALSE FALSE    h        38    4
#>   old[15, ]    9   NA    5  TRUE FALSE    i        39    4
#>   old[16, ]   10 10.1    5  TRUE FALSE    j        40    4
#> - old[17, ] 1347  2.1    5    NA FALSE    b      1347 1347
#> - old[18, ] 1347  2.1    5    NA FALSE    b      1347 1347
#> 
#> `old$int[1:4]`: 1347 6 7 8
#> `new$int[1:3]`:      6 7 8
#> 
#> `old$int[14:18]`: 8 9 10 1347 1347
#> `new$int[13:15]`: 8 9 10          
#> 
#> `old$dbl[1:4]`: 2.1 6.1 7.1 8.1
#> `new$dbl[1:3]`:     6.1 7.1 8.1
#> 
#> `old$dbl[14:18]`: 8.1  10.1 2.1 2.1
#> `new$dbl[13:15]`: 8.1  10.1        
#> 
#> `old$dbl2[13:18]`: 5 5 5 5 5 5
#> `new$dbl2[13:15]`: 5 5 5      
#> 
#> `old$lgl[1:4]`: <NA> FALSE FALSE FALSE
#> `new$lgl[1:3]`:      FALSE FALSE FALSE
#> 
#> `old$lgl[14:18]`: FALSE TRUE TRUE <NA> <NA>
#> `new$lgl[13:15]`: FALSE TRUE TRUE          
#> 
#> And 7 more differences ...

ds <- InMemoryDataset$create(new_ds)
waldo::compare(
  ds %>%
    to_duckdb(auto_disconnect = FALSE) %>%
    # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879
    select(-fct) %>%
    to_arrow() %>%
    filter(int > 5 & part > 1) %>%
    collect() %>%
    arrange(row_order) %>%
    tibble::as_tibble(),
  ds %>%
    select(-fct) %>%
    filter(int > 5 & part > 1) %>%
    collect() %>%
    arrange(row_order) %>%
    tibble::as_tibble()
)
#> ✓ No differences

Created on 2022-01-11 by the reprex package (v2.0.1)

@paleolimbot (Member)

I've spent some more time trying to replicate this failure with and without DuckDB and/or carrow, and I've noticed that the ArrowArrayStream.get_next() callback is getting called by DuckDB from multiple threads. For my carrow array stream it's definitely not safe to do that, so we also get some other errors and crashes. For Arrow's RecordBatchReader it probably doesn't care about being called from the R main thread, but maybe it does care about all the calls coming from the same thread? All the calls that never give corrupted data seem not to spawn any other threads. Using as_adq() %>% collect() also spawns a worker thread but only reads the data once.

Full reprex I've been working off:

Details
# remotes::install_github("apache/arrow/r")
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

example_data <- tibble::tibble(
  int = c(1:3, NA_integer_, 5:10),
  dbl = c(1:8, NA, 10) + .1,
  dbl2 = rep(5, 10),
  lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
  false = logical(10),
  chr = letters[c(1:5, NA, 7:10)]
)

tf <- tempfile()
new_ds <- rbind(
  cbind(example_data, part = 1L),
  cbind(example_data, part = 2L),
  cbind(example_data, part = 3L),
  cbind(example_data, part = 4L)
) %>%
  mutate(row_order = 1:n()) %>% 
  select(row_order, everything())

write_dataset(new_ds, tf, partitioning = "part")

# function to print the calling threadid
cpp11::cpp_source(code = '
#include <cpp11.hpp>
#include <iostream>
#include <thread>
[[cpp11::register]]
void print_thread_id(std::string lab) {
  std::cout << lab << ": " << std::this_thread::get_id() << "\\n";
}
')

as_rbr <- function(x) {
  rbr <- Scanner$create(x)$ToRecordBatchReader()
  
  batches <- list()
  while (!is.null(batch <- rbr$read_next_batch())) {
    batches[[length(batches) + 1]] <- batch
  }
  
  carrow::carrow_array_stream_to_arrow(carrow::as_carrow_array_stream(batches))
}

# collecting via carrow always seems to work
open_dataset(tf) %>% 
  as_rbr() %>% 
  carrow::as_carrow_array_stream() %>% 
  carrow::carrow_array_stream_collect() %>% 
  waldo::compare(new_ds)
#> ✓ No differences

# collecting via read_table() always seems to work
open_dataset(tf) %>% 
  as_rbr() %>% 
  (function(rbr) rbr$read_table()) %>% 
  as.data.frame() %>% 
  as.data.frame() %>% 
  waldo::compare(new_ds)
#> ✓ No differences

# collecting RecordBatchReader via duckdb gives zero rows? Maybe because it's
# reading the dataset more than once
open_dataset(tf) %>% 
  as_rbr() %>%
  to_duckdb(auto_disconnect = FALSE) %>% 
  dplyr::collect()
#> # A tibble: 0 × 8
#> # … with 8 variables: row_order <int>, int <int>, dbl <dbl>, dbl2 <dbl>,
#> #   lgl <lgl>, false <lgl>, chr <chr>, part <int>
carrow_c_carrow_array_stream(): 0x104f17d40
get_schema(): 0x104f17d40
get_next(): 0x104f17d40
get_next(): 0x104f17d40
get_next(): 0x104f17d40
get_next(): 0x104f17d40
get_next(): 0x104f17d40
carrow_c_carrow_array_stream(): 0x104f17d40
get_next(): 0x104f17d40
get_schema(): 0x104f17d40
get_next(): 0x104f17d40
get_next(): 0x104f17d40
get_next(): 0x104f17d40
get_next(): 0x104f17d40
carrow_c_carrow_array_stream(): 0x104f17d40
get_schema(): 0x104f17d40
get_next(): 0x16b7ff000
get_next(): 0x16b7ff000
get_next(): 0x16b7ff000
get_next(): 0x16b7ff000
get_next(): 0x16b7ff000
get_next(): 0x16b88b000
get_next(): 0x16b9a3000
get_next(): 0x16ba2f000

@westonpace (Member)

For Arrow's RecordBatchReader it probably doesn't care about being called from the R main thread but maybe does care about all the calls coming from the same thread?

Generally speaking, iterators, readers, etc. (anything that keeps some kind of internal pointer to a position in some source stream) are not going to be thread safe in Arrow-C++ unless they specifically say otherwise (and I can't think of any that do). That being said, access from multiple threads is probably OK as long as you protect the access with a mutex.

@westonpace (Member) commented Jan 19, 2022

I dug into this a little bit today. I used Dewey's latest reprex but for reading I just used this (to avoid carrow at the moment):

  x <- open_dataset(tf) %>%
    to_duckdb(auto_disconnect = FALSE) %>%
    to_arrow() %>%
    dplyr::collect()
  print(x, max_footer_lines=100, n=100)

Warning, the following is a barrage of facts with a smattering of assumptions / conclusions at the end.

I found a few oddities. In that one query I expected to see two exec plans get created. The first plan sources from files and sinks into duckdb (crossing the bridge into duckdb). The second plan sources from duckdb (crossing the bridge back into arrow) and sinks into R.

In reality there were five exec plans created. Four of these came from calls to RArrowTabularStreamFactory::Produce (duckdb code). All four calls to Produce had the same factory_p. All four of these yield 4 batches (as expected) although the first three scans don't appear to be consumed. The first three calls had an empty project_columns and a null filters. The fourth call had a valid project_columns and a valid filters.

Across the run 12 instances of ExportedArrayStream are created. Only 4 of these are released. I don't think this is a problem but figured I'd record it. I believe the private data here is a shared_ptr so these could just be copies but for something that has a Release method I expected parity between creation and Release. Each of the first three calls to RArrowTabularStreamFactory::Produce generates 2 ExportedArrayStream instances and releases 1. None of the ExportedArrayStream's created by these calls ever yields any actual arrays.

The final call to RArrowTabularStreamFactory::Produce generates 6 instances of ExportedArrayStream and these generate 9 arrays each. All of the arrays are released (although rather quickly). So there are a total of 36 calls to send an array from Arrow to DuckDB and all 36 are released.

All 36 of these arrays seem valid and to be containing the correct data.

The final exec plan (which reads from duckdb's bridge and converts back to R) yields four batches. Some of these batches are repeated and/or contain garbage information.

My theory is that the arrays are being released by DuckDb before they are finished on the R side

I did a bit more investigation to try and come up with some conclusive evidence to this regard.

If I put a breakpoint on Arrow's release method I find that the array data is being released at duckdb/src/function/table/arrow.cpp:1078 which seems incorrect. Basically DuckDb is holding onto the batch until the next batch is read in and then it discards the old batch.

One possible theory is that DuckDb is making a copy of the array data and so it is ok to release the array data on the Arrow side after this copy is made. However, if I look at the buffer pointers that are exported (from Arrow) and the buffer pointers that are imported (from DuckDb) many of them match. If this is a "use after free" scenario it makes sense that not all buffer pointers are overwritten. However, if this is a "DuckDb makes a copy" scenario then I would expect all of the buffer pointers to be distinct. Furthermore, the buffers imported from DuckDb do not appear valid. I have more NULLs than I would expect and some of them are pointers to parts of memory that are not on the heap.

@paleolimbot (Member)

Thank you for digging into this!

A few bits from the duckdb source that might be relevant:

@jonkeane (Member Author)

Thank you so much for all this info. I chatted briefly with @pdet about this. Here are some notes from that conversation (though please correct me if I'm misrepresenting any of this!)

In reality there were five exec plans created. Four of these came from calls to RArrowTabularStreamFactory::Produce (duckdb code). All four calls to Produce had the same factory_p. All four of these yield 4 batches (as expected) although the first three scans don't appear to be consumed. The first three calls had an empty project_columns and a null filters. The fourth call had a valid project_columns and a valid filters.

Across the run 12 instances of ExportedArrayStream are created. Only 4 of these are released. I don't think this is a problem but figured I'd record it. I believe the private data here is a shared_ptr so these could just be copies but for something that has a Release method I expected parity between creation and Release. Each of the first three calls to RArrowTabularStreamFactory::Produce generates 2 ExportedArrayStream instances and releases 1. None of the ExportedArrayStream's created by these calls ever yields any actual arrays.

@pdet these extra calls are for getting the schema information, yeah?

The other source in DuckDB that might be relevant (and would be good for us to check from a using-Arrow's-C-interface-correctly perspective) is: https://github.com/duckdb/duckdb/blob/master/src/main/query_result.cpp

@pdet commented Jan 20, 2022

I've checked @westonpace's comments, and by releasing (in the unique_ptr sense) the chunk pointers when scanning arrow objects, instead of deleting them, the error is fixed. So basically DuckDB was deleting arrow structs while their pointers were still referenced by the scan.
This is fine when reading data directly into duckdb, because with the vectorized execution the data actually gets processed properly, but when the data needs to be exported to arrow again, it causes arrow to read deleted memory.

I will make a fix for this on the duckdb side soon and will ping you to test it, but that seems to be it.

@wesm (Member) commented Jan 24, 2022

I was looking at the Jira for this and it wasn't obvious to me what is meant by "true duckdb streaming" -- could you add a PR description to explain what this issue is about?

@jonkeane (Member Author)

Yes, absolutely. I'm working on cleaning this up and will add a description and flag it as ready. Long story short: there was an issue with DuckDB that was causing the corruption we saw here, which is being resolved in duckdb/duckdb#2957, so this PR will effectively provide a path to come back from DuckDB via RecordBatchReaders without materializing the full table.

@paleolimbot (Member) left a comment

I really like committing to RecordBatchReader as the output type (and the corresponding changes that automatically create a dplyr query when it's needed). The last bit also supports RecordBatchReaders imported from Python (which you previously couldn't pipe into dplyr functions).

One of the things that came out of the debugging from everybody was that the input (to_duckdb()) can't be a RecordBatchReader since it's read four times. I think we probably need to read_table() on the input if the input is a RecordBatchReader.
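A minimal sketch of the guard I have in mind (the helper name and where it would live are assumptions, not part of this PR):

# hypothetical helper: materialize a RecordBatchReader before registering it
# with DuckDB, since DuckDB may scan the registered source more than once and
# a reader can only be consumed once
ensure_scannable <- function(.data) {
  if (inherits(.data, "RecordBatchReader")) {
    .data$read_table()
  } else {
    .data
  }
}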

r/R/duckdb.R Outdated
res <- DBI::dbSendQuery(dbplyr::remote_con(.data), dbplyr::remote_query(.data), arrow = TRUE)
out <- duckdb::duckdb_fetch_record_batch(res)

if (stream) {
Member

I would drop the stream argument, personally, and force a user to collect() the result (now that this is implemented for the RecordBatchReader above).

Member Author

Good call. One can collect(), but could also do to_arrow()$read_table() if one absolutely needed to.

I was mostly keeping the option here as an escape hatch in case we ran into another bug like we saw here, but we've got that anyway when returning a RBR.

r/R/duckdb.R Outdated

# TODO: we shouldn't need $read_table(), but we get segfaults when we do.
arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res)$read_table())
duckdb::duckdb_fetch_record_batch(res)
Member

If you keep the arrow_dplyr_query() wrapping around here, then you don't need to add all of the RecordBatchReader methods.

Member Author

Right, that's what I had originally. But I found that (especially for testing) being able to return a RecordBatchReader gave more flexibility (and to be able to start a dplyr query with a RecordBatchReader without needing to reach in and wrap arrow_dplyr_query() was also nice).

Besides the extra characters, I don't see an issue with allowing RBRs to be used like this — but LMK if I'm wrong about that

Member

It's fine to add the RBR methods, though I'm not sure how useful they are in practice outside of this. The other concern is: suppose someone ends their pipeline with to_arrow() and they get an RBR back. RBR doesn't have many useful methods defined for it (print(), names(), etc.) yet, so it will be awkward to work with if you don't know that you can dplyr things on it. Maybe those methods should be added. Seems like scope creep here though.
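A rough sketch of the kind of convenience methods being alluded to (purely hypothetical, not part of this PR); both can be derived from the reader's schema:

# hypothetical S3 methods for RecordBatchReader
names.RecordBatchReader <- function(x) names(x$schema)

print.RecordBatchReader <- function(x, ...) {
  cat("RecordBatchReader\n")
  print(x$schema)
  invisible(x)
}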

Member

Perhaps the issue is that to_arrow() is too general...something like as.data.frame(), as_record_batch_reader(), as_arrow_table(), etc. would be more specific. as_arrow_dplyr_query() is a bit of a mouthful and the name might change with the substrait stuff, so maybe to_arrow() is a good choice for that one (which would then push the record batch reader stuff to a separate ticket).
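For instance, a hypothetical sketch of what those more specific coercions could look like (the generic names are the ones floated above; the tbl_duckdb_connection class and the method bodies are assumptions, not an actual implementation):

as_record_batch_reader <- function(x, ...) UseMethod("as_record_batch_reader")

# send the lazy duckdb query and fetch the result as a RecordBatchReader
as_record_batch_reader.tbl_duckdb_connection <- function(x, ...) {
  res <- DBI::dbSendQuery(
    dbplyr::remote_con(x), dbplyr::remote_query(x), arrow = TRUE
  )
  duckdb::duckdb_fetch_record_batch(res)
}

as_arrow_table <- function(x, ...) UseMethod("as_arrow_table")

# materialize the whole result as an Arrow Table
as_arrow_table.tbl_duckdb_connection <- function(x, ...) {
  as_record_batch_reader(x)$read_table()
}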

Member

I opened ARROW-15168 about potentially adding some as_*() S3 generics, which may be another place to push some of this.

Member

FTR as_arrow_dplyr_query() exists as as_adq() to avoid the mouthfulness. But it is currently neither generic nor exported.

Member

I think the "is to_arrow() too general" is a separate question from what I was raising, but it is a valid concern.

@jonkeane (Member Author)

I've kept the various RecordBatchReader methods, though I've re-enabled the default for wrapping in arrow_dplyr_query since that is a bit friendlier. I've opened https://issues.apache.org/jira/browse/ARROW-15489 to talk about how we might make RecordBatchReaders just as friendly.

Comment on lines +160 to +164
if (as_arrow_query) {
arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res))
} else {
duckdb::duckdb_fetch_record_batch(res)
}
Member

Suggested change
if (as_arrow_query) {
arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res))
} else {
duckdb::duckdb_fetch_record_batch(res)
}
out <- duckdb::duckdb_fetch_record_batch(res)
if (as_arrow_query) {
out <- arrow_dplyr_query(out)
}
out

#' to_arrow() %>%
#' collect()
to_arrow <- function(.data, stream = TRUE) {
to_arrow <- function(.data, as_arrow_query = TRUE) {
Member

How useful is this argument?

Member Author

Probably (and hopefully) not very, but I wanted to have an escape route in case we see another issue like the one at the start of this PR, where duckdb::duckdb_fetch_record_batch(res) fails but duckdb::duckdb_fetch_record_batch(res)$read_table() works (over time the DuckDB master branch got into a state where both failed consistently, but at the beginning reading the table worked just fine while accessing the RBR did not). And since we are at the mercy of both of our release cycles for fixing this, the cost of having this escape hatch doesn't seem so bad to me, but I can remove it.

It also helps a bit when debugging / testing (one doesn't have to recreate to_arrow() without the wrapper).

Member

You can get the RBR from $.data in the query object; is that sufficient for debugging purposes?

Member

Oh nm, it gets wrapped in an InMemoryDataset. Alright, I don't like this but this is fine, we can prune it later.

Member Author

I could also mark it as a temporary workaround for when we simplify this in the future? It's a little silly to effectively say "this is deprecated" when we introduce it, but I'm not sure when we'll get to doing the simplification + improvements so that this can simply emit RBRs, and those have the right signaling/methods to make it clear that they are a good thing to use in dplyr queries.

Member

I think to_arrow() is a good fit...it gives the thing that a user probably wants. A possible future as_record_batch_reader() would be the right incantation for a user who wants it!

@jonkeane jonkeane closed this in 858470d Feb 7, 2022
@ursabot commented Feb 7, 2022

Benchmark runs are scheduled for baseline = 501d92e and contender = 858470d. 858470d is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Finished ⬇️0.26% ⬆️0.0%] test-mac-arm
[Finished ⬇️1.79% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.04% ⬆️0.0%] ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
