     local_filesystem_storage,
     s3_storage,
 )
-from icechunk.xarray import is_dask_collection, to_icechunk
+from icechunk.xarray import to_icechunk
 from xarray.testing import assert_identical
 
 # needed otherwise not discovered
@@ -140,9 +140,8 @@ def create_zarr_target(self) -> Generator[IcechunkStore]:
             pytest.skip("v2 not supported")
 
         with tempfile.TemporaryDirectory() as tmpdir:
-            storage = local_filesystem_storage(tmpdir)
-            self._repo = Repository.create(storage)
-            session = self._repo.writable_session("main")
+            repo = Repository.create(local_filesystem_storage(tmpdir))
+            session = repo.writable_session("main")
             yield session.store
 
     @contextlib.contextmanager
@@ -154,47 +153,15 @@ def create(self):
154153 {"test" : xr .DataArray (data , dims = ("x" , "y" ), coords = {"x" : x , "y" : y })}
155154 )
156155 with tempfile .TemporaryDirectory () as tmpdir :
157- storage = local_filesystem_storage (tmpdir )
158- self ._repo = Repository .create (storage )
159- session = self ._repo .writable_session ("main" )
156+ repo = Repository .create (local_filesystem_storage (tmpdir ))
157+ session = repo .writable_session ("main" )
160158 self .save (session .store , ds )
161159 session .commit ("initial commit" )
162- yield self . _repo .writable_session ("main" ).store , ds
160+ yield repo .writable_session ("main" ).store , ds
163161
164162 def save (self , target , ds , ** kwargs ):
165163 # not really important here
166164 kwargs .pop ("compute" , None )
167-
168- # Check if we have dask arrays
169- has_dask = any (is_dask_collection (var .data ) for var in ds .variables .values ())
170-
171- # Special handling for dask arrays to support multiple writes to the same store.
172- #
173- # Context: Some xarray tests (e.g., test_dataset_to_zarr_align_chunks_true) call
174- # save() multiple times on the same store object to test append/region writes.
175- #
176- # Background: to_icechunk() handles dask vs non-dask arrays differently:
177- # - Non-dask: Writes directly to the session (allows uncommitted changes)
178- # - Dask: Uses fork/merge for parallel workers, which:
179- # 1. session.fork() creates a child session
180- # 2. Workers write chunks, changes are merged back to parent session
181- # 3. After merge, the parent session has uncommitted changes
182- #
183- # Problem: to_icechunk() only allows dask writes on clean sessions (safety check
184- # to avoid consistency issues during fork/merge). A second dask write fails with:
185- # "Calling `to_icechunk` is not allowed on a Session with uncommitted changes"
186- #
187- # Solution: Before each dask write, if there are uncommitted changes from a previous
188- # write, commit them and refresh the session for the next fork/merge cycle.
189- if has_dask and hasattr (self , "_repo" ):
190- session = target .session
191- if session .has_uncommitted_changes :
192- session .commit ("intermediate commit" )
193- # After commit, session is read-only. Get a fresh writable session and
194- # update the store so subsequent operations see the committed data.
195- new_store = self ._repo .writable_session ("main" ).store
196- target ._store = new_store ._store
197-
198165 to_icechunk (ds , session = target .session , ** kwargs )
199166
200167 def test_zarr_append_chunk_partial (self ):
@@ -216,3 +183,8 @@ def test_zarr_region_chunk_partial_offset(self):
         pytest.skip(
             "this test requires multiple saves, and is meant to exercise Xarray logic."
         )
+
+    def test_dataset_to_zarr_align_chunks_true(self, tmp_store) -> None:  # noqa: F811
+        pytest.skip(
+            "this test requires multiple saves, and is meant to exercise Xarray logic."
+        )
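
For context, a minimal sketch (not part of the commit) of the commit-and-refresh cycle the deleted comment block describes: a dask-backed to_icechunk write leaves uncommitted changes on the session after fork/merge, so a second dask write needs a commit followed by a fresh writable session. Every API call here (Repository.create, local_filesystem_storage, writable_session, has_uncommitted_changes, commit, to_icechunk) appears in the diff above; the toy dataset is illustrative only.

import tempfile

import dask.array as da
import xarray as xr
from icechunk import Repository, local_filesystem_storage
from icechunk.xarray import to_icechunk

with tempfile.TemporaryDirectory() as tmpdir:
    repo = Repository.create(local_filesystem_storage(tmpdir))
    session = repo.writable_session("main")

    # First dask write: to_icechunk forks the session for parallel workers
    # and merges their changes back, leaving the session dirty.
    ds = xr.Dataset({"test": (("x",), da.zeros(10, chunks=5))})
    to_icechunk(ds, session=session)

    # A second dask write requires a clean session: commit, then refresh.
    if session.has_uncommitted_changes:
        session.commit("intermediate commit")
        session = repo.writable_session("main")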