13 changes: 11 additions & 2 deletions cpp/perspective/src/cpp/view.cpp
@@ -607,6 +607,9 @@ View<CTX_T>::to_arrow(
     bool emit_group_by,
     bool compress
 ) const {
+    PSP_GIL_UNLOCK();
+    PSP_READ_LOCK(*get_lock());
+
     std::shared_ptr<t_data_slice<CTX_T>> data_slice =
         get_data(start_row, end_row, start_col, end_col);
     return data_slice_to_arrow(data_slice, emit_group_by, compress);
@@ -620,6 +623,8 @@ View<t_ctx2>::to_csv(
     std::int32_t start_col,
     std::int32_t end_col
 ) const {
+    PSP_GIL_UNLOCK();
+    PSP_READ_LOCK(*get_lock());
 
     // See generic instance.
     if (is_column_only() && m_ctx->unity_get_column_count() == 0) {
@@ -639,6 +644,8 @@ View<t_ctx1>::to_csv(
     std::int32_t start_col,
     std::int32_t end_col
 ) const {
+    PSP_GIL_UNLOCK();
+    PSP_READ_LOCK(*get_lock());
     std::shared_ptr<t_data_slice<t_ctx1>> data_slice =
         get_data(start_row, end_row, start_col, end_col);
     return data_slice_to_csv(data_slice);
@@ -652,9 +659,11 @@ View<CTX_T>::to_csv(
     std::int32_t start_col,
     std::int32_t end_col
 ) const {
+    PSP_GIL_UNLOCK();
+    PSP_READ_LOCK(*get_lock());
 
-    // Arrow has a big whih miscalculates CSV header size as 1 when there are no
-    // columns (and hence no rows) in the dataset, so intercept these calls/
+    // Arrow has a bug which miscalculates CSV header size as 1 when there are
+    // no columns (and hence no rows) in the dataset, so intercept these calls.
    if (m_ctx->unity_get_column_count() == 0) {
        return std::make_shared<std::string>("");
    }
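At the Python API level, the effect of these locks is that serialization calls like View.to_arrow() and View.to_csv() now release the GIL and hold the view's read lock for the duration of the call: updater threads can keep running, but can no longer interleave with an in-flight serialization. Below is a minimal sketch of the usage pattern this makes safe, assuming the same Server / new_local_client client API used by the test further down; the schema, limit, and iteration counts are illustrative only.

    import threading

    import perspective

    server = perspective.Server()
    client = server.new_local_client()
    table = client.table({"a": "integer"}, limit=10)
    view = table.view()

    def writer():
        # Concurrent updates from a second thread; before this change these
        # could interleave with an in-flight to_arrow() on the main thread.
        for _ in range(1000):
            table.update([{"a": 1}] * 10)

    t = threading.Thread(target=writer)
    t.start()
    for _ in range(1000):
        view.to_arrow()  # now serializes under a read lock, off the GIL
    t.join()

    view.delete()
    table.delete()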
@@ -84,3 +84,57 @@ def update(x):
         loop.call_soon_threadsafe(loop.stop)
         thread.join()
         loop.close()
+
+    def test_concurrent_updates_with_limit_tables_are_threadsafe(self):
+        # This is tricky to tune - at the time of writing, 1000 iterations
+        # has a >50% chance of triggering this on my dev machine.
+        TEST_ITERATIONS = 1000
+        global running
+        perspective_server = Server()
+        client = perspective_server.new_local_client()
+        table = client.table(
+            {"col{}".format(i): "integer" for i in range(100)}, limit=100
+        )
+
+        running = True
+
+        # Create an updating thread that overlaps the index a lot
+        def feed(table):
+            row = {"col{}".format(i): random.randint(0, 100) for i in range(100)}
+            while running:
+                table.update([row for _ in range(100)])
+
+        thread = threading.Thread(target=feed, args=(table,))
+        thread.start()
+
+        results = []
+
+        # Create a thread that serializes the table a lot, checking for nulls
+        def feed2(table):
+            global running
+            view = table.view()
+            while len(results) < TEST_ITERATIONS:
+                arr = view.to_arrow()
+                table2 = client.table(arr)
+                view2 = table2.view()
+                json = view2.to_json(end_row=1)
+                view2.delete()
+                table2.delete()
+                results.append(json)
+
+            view.delete()
+            running = False
+
+        thread2 = threading.Thread(target=feed2, args=(table,))
+        thread2.start()
+
+        thread.join()
+        thread2.join()
+
+        assert table.size() == 100
+        for result in results:
+            for row in result:
+                for col, val in row.items():
+                    assert val is not None
+
+        table.delete()
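A note on the test's construction: with limit=100 and 100-row update batches, every update() rewrites the same 100 slots, maximizing contention with the serializer thread - hence the final assert that table.size() == 100 regardless of how many updates ran. A minimal, single-threaded sketch of the limit-table behavior being exercised (my reading of the semantics; values are illustrative):

    import perspective

    server = perspective.Server()
    client = server.new_local_client()
    table = client.table({"x": "integer"}, limit=3)

    table.update([{"x": 1}, {"x": 2}, {"x": 3}])
    table.update([{"x": 4}])  # wraps: the new row overwrites an existing slot
    assert table.size() == 3  # size never exceeds the limit

    view = table.view()
    print(view.to_json())     # 3 rows, one of them now {"x": 4}
    view.delete()
    table.delete()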