3 changes: 3 additions & 0 deletions CHANGES.md
@@ -10,6 +10,9 @@
local file path or URI of type `str` or `FileObj`.
Dropped concept of _slice factories_ entirely. [#78]

* Chunk sizes can now be `null` for a given dimension. In this case, the chunk
  size actually used is the array's size in that dimension. [#77]

* Internal refactoring: Extracted `Config` class out of `Context` and
made available via new `Context.config: Config` property.
The change concerns any usages of the `ctx: Context` argument passed to
15 changes: 11 additions & 4 deletions docs/config.md
@@ -84,10 +84,17 @@ Variable metadata.
Must be one of the following:

* Type _array_.
-  Chunk sizes in the order of the dimensions.
-  The items of the array are of type _integer_.
+  Chunk sizes for each dimension of the variable.
+  The items of the array must be one of the following:
+
+    * Type _integer_.
+      Dimension is chunked using given size.
+
+    * Disable chunking in this dimension.
+      Its value is `null`.


-* Disable chunking.
+* Disable chunking in all dimensions.
Its value is `null`.
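To illustrate the forms above, a hypothetical `variables` configuration might
combine all three; the variable names are invented, and in a JSON configuration
file you would write `null` where Python uses `None`:

```python
# Hypothetical variable encodings showing the accepted forms of "chunks".
variables = {
    "chl": {"encoding": {"chunks": [1, 512, 512]}},    # explicit size per dimension
    "tsm": {"encoding": {"chunks": [1, None, None]}},  # no chunking in 2nd and 3rd dims
    "aot": {"encoding": {"chunks": None}},             # disable chunking entirely
}
```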


@@ -243,7 +250,7 @@ Options for the filesystem given by the protocol of `temp_dir`.
## `force_new`

Type _boolean_.
Force creation of a new target dataset. An existing target dataset (and its lock) will be permanently deleted before appending of slice datasets begins. WARNING: the deletion cannot be rolled back.
Defaults to `false`.

## `disable_rollback`
57 changes: 50 additions & 7 deletions docs/guide.md
@@ -323,18 +323,39 @@ multiple variables the wildcard variable name `*` can often be of help.

#### Chunking

Chunking refers to the subdivision of multidimensional data arrays into
smaller multidimensional blocks. With the Zarr format, such blocks become
individual data files after optional [data packing](#data-packing)
and [compression](#compression). The chunk sizes set for the dimensions of
the blocks therefore determine the number of blocks per data array as well as
their size. Hence, chunk sizes have a very large impact on the I/O performance
of datasets, especially if they are persisted in remote filesystems such as S3.
Chunk sizes are specified using the `chunks` setting in the encoding of each
variable. The value of `chunks` can also be `null`, which means no chunking is
desired and the variable's data array will be persisted as one block.
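To make the relationship concrete, here is a small back-of-the-envelope
sketch; the shape and chunk sizes are made up for illustration:

```python
import math

shape = (365, 1800, 3600)  # hypothetical float32 data array
chunks = (1, 900, 900)
# Number of blocks: ceil(size / chunk size) per dimension, multiplied together.
n_blocks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))
# Uncompressed block size in MiB; float32 takes 4 bytes per value.
block_mib = math.prod(chunks) * 4 / 2**20
print(n_blocks, round(block_mib, 1))  # 2920 blocks of ~3.1 MiB each
```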

By default, the chunking of the coordinate variable corresponding to the append
-dimension will be its dimension in the first slice dataset. Often, this will be one or
-a small number. Since `xarray` loads coordinates eagerly when opening a dataset, this
-can lead to performance issues if the target dataset is served from object storage such
-as S3. This is because, a separate HTTP request is required for every single chunk. It
-is therefore very advisable to set the chunks of that variable to a larger number using
-the `chunks` setting. For other variables, the chunking within the append dimension may
-stay small if desired:
+dimension will be its dimension size in the first slice dataset. Often, the size
+will be `1` or another small number. Since `xarray` loads coordinates eagerly
+when opening a dataset, this can lead to performance issues if the target
+dataset is served from object storage such as S3. The reason for this is that a
+separate HTTP request is required for every single chunk. It is therefore
+highly advisable to set the chunks of that variable to a larger number using
+the `chunks` setting. For other variables, you can still use a small chunk size
+in the append dimension.

Here is a typical chunking configuration for the append dimension `"time"`:

```json
{
  "append_dim": "time",
  "variables": {
    "*": {
      "encoding": {
        "chunks": null
      }
    },
    "time": {
      "dims": ["time"],
      "encoding": {
@@ -351,6 +372,28 @@
}
```

Sometimes you may wish to not chunk a given dimension of a variable at all.
If you know the size of that dimension in advance, you can simply use that size
as the chunk size. However, there are situations where the final dimension size
depends on processing parameters. For example, you could define your own
[slice source](#slice-sources) that takes a geodetic bounding box parameter
`bbox` to spatially crop your variables in the `x` and `y` dimensions.
If you want such dimensions to stay unchunked, you can set their chunk sizes
to `null` (`None` in Python):

```json
{
  "variables": {
    "chl": {
      "dims": ["time", "y", "x"],
      "encoding": {
        "chunks": [1, null, null]
      }
    }
  }
}
```

#### Missing Data

To indicate missing data in a variable data array, a dedicated no-data or missing value
44 changes: 12 additions & 32 deletions tests/config/test_normalize.py
@@ -93,6 +93,14 @@ def test_it_raises_if_config_is_not_object(self):
            normalize_config(file_obj)

    def test_normalize_sequence(self):
        data_var_spec = {
            "dims": ("time", "y", "x"),
            "encoding": {
                "dtype": "float32",
                "chunks": (1, 20, 30),
                "fill_value": None,
            },
        }
        configs = (
            {
                "version": 1,
@@ -129,22 +137,8 @@ def test_normalize_sequence(self):
            },
            {
                "variables": {
-                    "chl": {
-                        "dims": ("time", "y", "x"),
-                        "encoding": {
-                            "dtype": "float32",
-                            "chunks": (1, 20, 30),
-                            "fill_value": None,
-                        },
-                    },
-                    "tsm": {
-                        "dims": ("time", "y", "x"),
-                        "encoding": {
-                            "dtype": "float32",
-                            "chunks": (1, 20, 30),
-                            "fill_value": None,
-                        },
-                    },
+                    "chl": data_var_spec,
+                    "tsm": data_var_spec,
                }
            },
        )
@@ -170,22 +164,8 @@
"dims": "time",
"encoding": {"dtype": "uint64"},
},
"chl": {
"dims": ("time", "y", "x"),
"encoding": {
"dtype": "float32",
"chunks": (1, 20, 30),
"fill_value": None,
},
},
"tsm": {
"dims": ("time", "y", "x"),
"encoding": {
"dtype": "float32",
"chunks": (1, 20, 30),
"fill_value": None,
},
},
"chl": data_var_spec,
"tsm": data_var_spec,
},
},
normalize_config(configs),
95 changes: 78 additions & 17 deletions tests/test_api.py
@@ -142,10 +142,20 @@ def test_some_slices_local_output_to_existing_dir_force_new(self):
        zappend(slices, target_dir=target_dir, force_new=True)
        self.assertEqual(False, lock_file.exists())

-    def test_some_slices_with_class_slice_source(self):
+    def test_some_slices_with_slice_source_class(self):
+        class DropTsm(SliceSource):
+            def __init__(self, slice_ds):
+                self.slice_ds = slice_ds
+
+            def get_dataset(self) -> xr.Dataset:
+                return self.slice_ds.drop_vars(["tsm"])
+
+            def dispose(self):
+                pass

        target_dir = "memory://target.zarr"
        slices = [make_test_dataset(index=3 * i) for i in range(3)]
-        zappend(slices, target_dir=target_dir, slice_source=MySliceSource)
+        zappend(slices, target_dir=target_dir, slice_source=DropTsm)
        ds = xr.open_zarr(target_dir)
        self.assertEqual({"time": 9, "y": 50, "x": 100}, ds.sizes)
        self.assertEqual({"chl"}, set(ds.data_vars))
@@ -158,13 +168,13 @@ def test_some_slices_with_class_slice_source(self):
            ds.attrs,
        )

-    def test_some_slices_with_func_slice_source(self):
-        def process_slice(slice_ds: xr.Dataset) -> SliceSource:
-            return MySliceSource(slice_ds)
+    def test_some_slices_with_slice_source_func(self):
+        def drop_tsm(slice_ds: xr.Dataset) -> xr.Dataset:
+            return slice_ds.drop_vars(["tsm"])

        target_dir = "memory://target.zarr"
        slices = [make_test_dataset(index=3 * i) for i in range(3)]
-        zappend(slices, target_dir=target_dir, slice_source=process_slice)
+        zappend(slices, target_dir=target_dir, slice_source=drop_tsm)
        ds = xr.open_zarr(target_dir)
        self.assertEqual({"time": 9, "y": 50, "x": 100}, ds.sizes)
        self.assertEqual({"chl"}, set(ds.data_vars))
@@ -177,6 +187,68 @@ def process_slice(slice_ds: xr.Dataset) -> SliceSource:
            ds.attrs,
        )

    # See https://github.com/bcdev/zappend/issues/77
    def test_some_slices_with_cropping_slice_source_no_chunks_spec(self):
        def crop_ds(slice_ds: xr.Dataset) -> xr.Dataset:
            w = slice_ds.x.size
            h = slice_ds.y.size
            return slice_ds.isel(x=slice(5, w - 5), y=slice(5, h - 5))

        target_dir = "memory://target.zarr"
        slices = [make_test_dataset(index=3 * i) for i in range(3)]
        zappend(slices, target_dir=target_dir, slice_source=crop_ds)
        ds = xr.open_zarr(target_dir)
        self.assertEqual({"time": 9, "y": 40, "x": 90}, ds.sizes)
        self.assertEqual({"chl", "tsm"}, set(ds.data_vars))
        self.assertEqual({"time", "y", "x"}, set(ds.coords))
        self.assertEqual((90,), ds.x.encoding.get("chunks"))
        self.assertEqual((40,), ds.y.encoding.get("chunks"))
        self.assertEqual((3,), ds.time.encoding.get("chunks"))
        # Chunk sizes are those of the original array, because we have not
        # specified chunks in the encoding.
        self.assertEqual((1, 25, 45), ds.chl.encoding.get("chunks"))
        self.assertEqual((1, 25, 45), ds.tsm.encoding.get("chunks"))

    # See https://github.com/bcdev/zappend/issues/77
    def test_some_slices_with_cropping_slice_source_with_chunks_spec(self):
        def crop_ds(slice_ds: xr.Dataset) -> xr.Dataset:
            w = slice_ds.x.size
            h = slice_ds.y.size
            return slice_ds.isel(x=slice(5, w - 5), y=slice(5, h - 5))

        variables = {
            "*": {
                "encoding": {
                    "chunks": None,
                }
            },
            "chl": {
                "encoding": {
                    "chunks": [1, None, None],
                }
            },
            "tsm": {
                "encoding": {
                    "chunks": [None, 25, 50],
                }
            },
        }

        target_dir = "memory://target.zarr"
        slices = [make_test_dataset(index=3 * i) for i in range(3)]
        zappend(
            slices, target_dir=target_dir, slice_source=crop_ds, variables=variables
        )
        ds = xr.open_zarr(target_dir)
        self.assertEqual({"time": 9, "y": 40, "x": 90}, ds.sizes)
        self.assertEqual({"chl", "tsm"}, set(ds.data_vars))
        self.assertEqual({"time", "y", "x"}, set(ds.coords))
        self.assertEqual((90,), ds.x.encoding.get("chunks"))
        self.assertEqual((40,), ds.y.encoding.get("chunks"))
        self.assertEqual((3,), ds.time.encoding.get("chunks"))
        self.assertEqual((1, 40, 90), ds.chl.encoding.get("chunks"))
        self.assertEqual((3, 25, 50), ds.tsm.encoding.get("chunks"))

    def test_some_slices_with_inc_append_step(self):
        target_dir = "memory://target.zarr"
        slices = [make_test_dataset(index=i, shape=(1, 50, 100)) for i in range(3)]
@@ -391,14 +463,3 @@ def test_some_slices_with_profiling(self):
        finally:
            if os.path.exists("prof.out"):
                os.remove("prof.out")
-
-
-class MySliceSource(SliceSource):
-    def __init__(self, slice_ds):
-        self.slice_ds = slice_ds
-
-    def get_dataset(self) -> xr.Dataset:
-        return self.slice_ds.drop_vars(["tsm"])
-
-    def dispose(self):
-        pass
42 changes: 42 additions & 0 deletions tests/test_metadata.py
@@ -291,6 +291,47 @@ def test_variable_encoding_from_netcdf(self):
            ).to_dict(),
        )

    def test_variable_encoding_can_deal_with_chunk_size_none(self):
        # See https://github.com/bcdev/zappend/issues/77
        a = xr.DataArray(np.zeros((2, 3, 4)), dims=("time", "y", "x"))
        b = xr.DataArray(np.zeros((2, 3, 4)), dims=("time", "y", "x"))
        self.assertEqual(
            {
                "attrs": {},
                "sizes": {"time": 2, "x": 4, "y": 3},
                "variables": {
                    "a": {
                        "attrs": {},
                        "dims": ("time", "y", "x"),
                        "encoding": {"chunks": (1, 3, 4)},
                        "shape": (2, 3, 4),
                    },
                    "b": {
                        "attrs": {},
                        "dims": ("time", "y", "x"),
                        "encoding": {"chunks": (2, 2, 3)},
                        "shape": (2, 3, 4),
                    },
                },
            },
            DatasetMetadata.from_dataset(
                xr.Dataset(
                    {
                        "a": a,
                        "b": b,
                    }
                ),
                make_config(
                    {
                        "variables": {
                            "a": {"encoding": {"chunks": [1, None, None]}},
                            "b": {"encoding": {"chunks": [None, 2, 3]}},
                        },
                    }
                ),
            ).to_dict(),
        )

    def test_variable_encoding_normalisation(self):
        def normalize(k, v):
            metadata = DatasetMetadata.from_dataset(
@@ -363,6 +404,7 @@ def test_it_raises_on_unspecified_variable(self):
            ),
        )

    # noinspection PyMethodMayBeStatic
    def test_it_raises_on_wrong_size_found_in_ds(self):
        with pytest.raises(
            ValueError,
2 changes: 1 addition & 1 deletion zappend/config/config.py
@@ -184,4 +184,4 @@ def logging(self) -> dict[str, Any] | str | bool | None:
    @property
    def profiling(self) -> dict[str, Any] | str | bool | None:
        """Profiling configuration."""
-        return self._config.get("profiling") or {}
+        return self._config.get("profiling")
18 changes: 15 additions & 3 deletions zappend/config/schema.py
@@ -73,11 +73,23 @@
"description": "Storage chunking.",
"oneOf": [
{
"description": "Chunk sizes in the order of the dimensions.",
"description": "Chunk sizes for each dimension of the variable.",
"type": "array",
"items": {"type": "integer", "minimum": 1},
"items": {
"oneOf": [
{
"description": "Dimension is chunked using given size.",
"type": "integer",
"minimum": 1,
},
{
"description": "Disable chunking in this dimension.",
"const": None,
},
]
},
},
{"description": "Disable chunking.", "const": None},
{"description": "Disable chunking in all dimensions.", "const": None},
],
},
fill_value={
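To make the new rule concrete, here is a minimal sketch that validates a few
`chunks` values against the `oneOf` fragment above; it assumes the third-party
`jsonschema` package and mirrors, rather than imports, the schema from
`zappend/config/schema.py`:

```python
# Validate example "chunks" values against the oneOf rule shown above.
import jsonschema

chunks_schema = {
    "oneOf": [
        {
            "type": "array",
            "items": {
                "oneOf": [
                    {"type": "integer", "minimum": 1},
                    {"const": None},  # None serializes to JSON null
                ]
            },
        },
        {"const": None},
    ],
}

for value in ([1, None, None], None, [0]):
    try:
        jsonschema.validate(value, chunks_schema)
        print(value, "-> valid")
    except jsonschema.ValidationError:
        print(value, "-> invalid")  # [0] violates "minimum": 1
```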