Skip to content

Commit cda7e3a

Browse files
authored
ForkSession: deepcopy underlying Session objects. (#1248)
1 parent 56f3caa commit cda7e3a

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

icechunk-python/python/icechunk/session.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,11 @@ def fork(self) -> "ForkSession":
387387
"You should not need to fork a read-only session. Read-only sessions can be pickled and transmitted directly."
388388
)
389389
self._allow_changes = True
390-
return ForkSession(self._session)
390+
# force a deep-copy of the underlying Session,
391+
# so that multiple forks can be created and
392+
# used independently in a local session.
393+
# See test_dask.py::test_fork_session_deep_copies for an example
394+
return ForkSession(PySession.from_bytes(self._session.as_bytes()))
391395

392396

393397
class ForkSession(Session):

icechunk-python/tests/test_dask.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,25 @@ def test_xarray_to_icechunk_nested_pickling(scheduler) -> None:
9090
session.commit("wrote another commit.")
9191
with xr.open_zarr(session.store, consolidated=False) as actual:
9292
assert_identical(actual, newds)
93+
94+
95+
@pytest.mark.parametrize("scheduler", ["threads", "processes"])
def test_fork_session_deep_copies(scheduler: str) -> None:
    """Check that several forks of one writable session can be used side by side.

    One fork is created per index along ``dim3``; each fork writes its own
    single-index slice with ``region="auto"``. All forks are then merged back
    into the parent session and committed, and the committed data is compared
    against the source dataset.

    Parameters
    ----------
    scheduler
        Name of the dask scheduler to run under (supplied by ``parametrize``).
    """
    with dask.config.set(scheduler=scheduler):
        ds = create_test_data(dim_sizes=(2, 3, 4)).drop_encoding().chunk(dim3=1)
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository.create(local_filesystem_storage(tmpdir))

            # First commit: lay out the store without computing any data
            # (compute=False), so the later region writes have a target.
            session = repo.writable_session("main")
            ds.to_zarr(session.store, mode="w", compute=False)
            session.commit("expand store in prep for region writes")

            # Second commit: one fork per dim3 index, all created from the
            # same parent session before any of them is written to.
            session = repo.writable_session("main")
            forks = [session.fork() for _ in range(ds.sizes["dim3"])]
            for t, fork in enumerate(forks):
                to_icechunk(ds.isel(dim3=[t]), fork, region="auto")
            session.merge(*forks)
            session.commit("yaya writes succeeded")

            # Read back inside a context manager so the dataset is closed
            # before the temporary directory is removed.
            with xr.open_dataset(
                repo.readonly_session("main").store, engine="zarr"
            ) as actual:
                xr.testing.assert_identical(actual, ds)

0 commit comments

Comments
 (0)