Skip to content
Merged
9 changes: 6 additions & 3 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## Version 0.7.0 (in development)

* Made writing custom slice sources easier: (#82)
* Made writing custom slice sources easier and more flexible: (#82)

- Slice items can now be a `contextlib.AbstractContextManager`
so custom slice functions can now be used with
Expand All @@ -11,9 +11,12 @@
is applicable. Deprecated `SliceSource.dispose()`.

- Introduced new optional configuration setting `slice_source_kwargs` that
contains keyword-arguments, which are passed to a configured `slice_source` together with
each slice item.
contains keyword-arguments, which are passed to a configured `slice_source`
together with each slice item.

- Introduced optional configuration setting `extra` that holds additional
configuration not validated by default. Intended use is by a `slice_source` that
expects an argument named `ctx` and therefore can access the configuration.

## Version 0.6.0 (from 2024-03-12)

Expand Down
10 changes: 10 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ Options for the filesystem given by the URI of `target_dir`.
Type _string_.
The fully qualified name of a class or function that receives a slice item as argument(s) and provides the slice dataset. If a class is given, it must be derived from `zappend.api.SliceSource`. If the function is a context manager, it must yield an `xarray.Dataset`. If a plain function is given, it must return any valid slice item type. Refer to the user guide for more information.

## `slice_source_kwargs`

Type _object_.
Extra keyword-arguments passed to a configured `slice_source` together with each slice item.

## `slice_engine`

Type _string_.
Expand Down Expand Up @@ -408,3 +413,8 @@ Type _boolean_.
If `true`, log only what would have been done, but don't apply any changes.
Defaults to `false`.

## `extra`

Type _object_.
Extra settings. Intended use is by a `slice_source` that expects an argument named `ctx` to access the extra settings and other configuration.

202 changes: 113 additions & 89 deletions docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,13 @@ You can also use dataset objects of type
[`xarray.Dataset`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.html)
as slice item. Such objects may originate from opening datasets from some storage, e.g.,
`xarray.open_dataset(slice_store, ...)` or by composing, aggregating, resampling
slice datasets from other datasets and data variables.
slice datasets from other datasets and dataset variables.

!!! note "Datasets are not closed automatically"
If you pass [`xarray.Dataset`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.html) objects to `zappend` they will not be
automatically closed. This may become be issue, if you have many datasets
and each one binds resources such as open file handles. Consider using a
_slice source_ then, see below.

Chunked data arrays of an `xarray.Dataset` are usually instances of
[Dask arrays](https://docs.dask.org/en/stable/array.html), to allow for out-of-core
Expand All @@ -676,9 +682,9 @@ for computed slice datasets, especially if the specified target dataset chunking
different from the slice dataset chunking. This may cause Dask graphs to be
computed multiple times if the source chunking overlaps multiple target chunks,
potentially causing large resource overheads while recomputing and/or reloading the same
source chunks multiple times. In such cases it can help to "terminate"
computations for each slice by persisting the computed dataset first and then to
reopen it. This can be specified using the `persist_mem_slice` setting:
source chunks multiple times. In such cases it can help to "terminate" computations for
each slice by persisting the computed dataset first and then to reopen it. This can be
specified using the `persist_mem_slice` setting:

```json
{
Expand All @@ -692,61 +698,96 @@ at the cost of additional i/o. It therefore defaults to `false`.

#### Slice Sources

If you need some custom cleanup after a slice has been processed and appended to the
target dataset, you can use instances of `zappend.api.SliceSource` as slice items.
The `SliceSource` methods with special meaning are:
A slice source gives you full control about how a slice dataset is created, loaded,
or generated and how its bound resources, if any, are released. In its simplest form,
a slice source is a plain Python function that can take any arguments and returns
an `xarray.Dataset`:

```python
import xarray as xr

# Slice source argument `path` is just an example.
def get_dataset(path: str) -> xr.Dataset:
# Provide dataset here. No matter how, e.g.:
return xr.open_dataset(path)
```

If you need cleanup code that is executed after the slice dataset has been appended,
you can turn your slice source function into a
[context manager](https://docs.python.org/3/library/contextlib.html)
(new in zappend v0.7):

```python
from contextlib import contextmanager
import xarray as xr

# Slice source argument `path` is just an example.
@contextmanager
def get_dataset(path: str) -> xr.Dataset:
# Bind any resources and provide dataset here, e.g.:
dataset = xr.open_dataset(path)
try:
# Yield (not return!) the dataset
yield dataset
finally:
# Cleanup code here, release any bound resources, e.g.:
dataset.close()
```

You can also implement your slice source as a class derived from the abstract
`zappend.api.SliceSource` class. Its interface methods are:

* `get_dataset()`: a zero-argument method that returns the slice dataset of type
`xarray.Dataset`. You must implement this abstract method.
* `close()`: perform any resource cleanup tasks
* `close()`: Optional method. Put your cleanup code here.
(in zappend < v0.7, the `close` method was called `dispose`).
* `__init__()`: optional constructor that receives any arguments passed to the
slice source.

Here is the template code for your own slice source implementation:

```python
import xarray as xr
from zappend.api import SliceSource

class MySliceSource(SliceSource):
# Pass any positional and keyword arguments that you need
# to the constructor. `path` is just an example.
# Slice source argument `path` is just an example.
def __init__(self, path: str):
self.path = path
self.ds = None
self.dataset = None

def get_dataset(self) -> xr.Dataset:
# Write code here that obtains the dataset.
self.ds = xr.open_dataset(self.path)
# You can put any processing here.
return self.ds
# Bind any resources and provide dataset here, e.g.:
self.dataset = xr.open_dataset(self.path)
return self.dataset

def close(self):
# Write code here that performs cleanup.
if self.ds is not None:
self.ds.close()
self.ds = None
# Cleanup code here, release any bound resources, e.g.:
if self.dataset is not None:
self.dataset.close()
```

Instead of providing instances of `SliceSource` as slice items, it is often
easier to pass your `SliceSource` class and let `zappend` pass the slice item as
arguments(s) to your `SliceSource`'s constructor. This can be achieved using the
the `slice_source` configuration setting. If you need to access configuration
settings, it is even required to use the `slice_source` setting.
You may prefer implementing a class because your slice source is complex and you want
to split its logic into separate methods. You may also just prefer classes as a matter
of your personal taste. Another advantage of using a class is that you can pass
instances of it as slice items to the `zappend` function without further configuration.
However, the intended use of a slice source is to configure it by specifying the
`slice_source` setting. In a JSON or YAML configuration file it specifies the fully
qualified name of the slice source function or class:

```json
{
"slice_source": "mymodule.MySliceSource"
}
```

The `slice_source` setting can actually be **any Python function** that returns a
valid slice item as described above such as a file path or URI, or
an `xarray.Dataset`.
If you use the `zappend` function, you can pass the function or class directly:

```python
zappend(["slice-1.nc", "slice-2.nc", "slice-3.nc"],
target_dir="target.zarr",
slice_source=MySliceSource)
```

If a slice source is configured, each slice item passed to `zappend` is passed as
argument to your slice source.
If the slice source setting is used, each slice item passed to `zappend` is passed as
argument(s) to your slice source.

* Slices passed to the `zappend` CLI command become slice source arguments
of type `str`.
Expand All @@ -762,90 +803,73 @@ argument to your slice source.

You can also pass extra keyword arguments to your slice source using the
`slice_source_kwargs` setting. Keyword arguments passed as slice items take
precedence, that is, they overwrite arguments passed by `slice_source_kawrgs`.
precedence, that is, they overwrite arguments passed by `slice_source_kwargs`.

If your slice source has many parameters that stay the same for all slices you may
prefer providing parameters as configuration settings, rather than function or class
arguments. This can be achieved using the `extra` setting:

In addition, your slice source function or class constructor specified by
`slice_source` may define a 1st positional argument or keyword argument
named `ctx`, which will receive the current processing context of type
`zappend.api.Context`. This can be useful if you need to read configuration
settings.
```json
{
"extra": {
"quantiles": [0.1, 0.5, 0.9],
"use_threshold": true,
"filter": "gauss"
}
}
```

To access the settings in `extra` your slice source function or class constructor
must define a special argument named `ctx`. It must be a 1ˢᵗ positional argument or
a keyword argument. The argument `ctx` is the current processing context of type
`zappend.api.Context` that also contains the configuration. The settings in `extra`
can be accessed using the dictionary returned from `ctx.config.extra`.

Here is a more advanced example of a slice source that opens datasets from a given
file path and averages the values along the time dimension:

```python
import numpy as np
import xarray as xr
from zappend.api import Context
from zappend.api import SliceSource
from zappend.api import zappend

def get_mean_time(slice_ds: xr.Dataset) -> xr.DataArray:
time = slice_ds.time
t0 = time[0]
dt = time[-1] - t0
return xr.DataArray(np.array([t0 + dt / 2],
dtype=slice_ds.time.dtype),
dims="time")

def get_mean_slice(slice_ds: xr.Dataset) -> xr.Dataset:
mean_slice_ds = slice_ds.mean("time")
# Re-introduce time dimension of size one
mean_slice_ds = mean_slice_ds.expand_dims("time", axis=0)
mean_slice_ds.coords["time"] = get_mean_time(slice_ds)
return mean_slice_ds

class MySliceSource(SliceSource):
def __init__(self, slice_path):
def __init__(self, ctx: Context, slice_path: str):
self.quantiles = ctx.config.extra.get("quantiles", [0.5])
self.slice_path = slice_path
self.ds = None

def get_dataset(self):
self.ds = xr.open_dataset(self.slice_path)
return get_mean_slice(self.ds)
return self.get_agg_slice(self.ds)

def close(self):
if self.ds is not None:
self.ds.close()
self.ds = None

def get_agg_slice(self, slice_ds: xr.Dataset) -> xr.Dataset:
agg_slice_ds = slice_ds.quantile(self.quantiles, dim="time")
# Re-introduce time dimension of size one
agg_slice_ds = agg_slice_ds.expand_dims("time", axis=0)
agg_slice_ds.coords["time"] = self.get_mean_time(slice_ds)
return agg_slice_ds

@classmethod
def get_mean_time(cls, slice_ds: xr.Dataset) -> xr.DataArray:
time = slice_ds.time
t0 = time[0]
dt = time[-1] - t0
return xr.DataArray(np.array([t0 + dt / 2],
dtype=slice_ds.time.dtype),
dims="time")

zappend(["slice-1.nc", "slice-2.nc", "slice-3.nc"],
target_dir="target.zarr",
slice_source=MySliceSource)
```

Since zappend 0.7, a slice source can also be written as a Python
[context manager](https://docs.python.org/3/library/contextlib.html),
which allows you implementing the `get_dataset()` and `close()`
methods in one single function, instead of a class. Here is the above example
written as context manager.

```python
from contextlib import contextmanager
import numpy as np
import xarray as xr
from zappend.api import zappend

# Same as above here

@contextmanager
def get_slice_dataset(slice_path):
# allocate resources here
ds = xr.open_dataset(slice_path)
mean_ds = get_mean_slice(ds)
try:
# yield (!) the slice dataset
# so it can be appended
yield mean_ds
finally:
# after slice dataset has been appended
# release resources here
ds.close()

zappend(["slice-1.nc", "slice-2.nc", "slice-3.nc"],
target_dir="target.zarr",
slice_source=get_slice_dataset)
```

## Profiling

Runtime profiling is very important for understanding program runtime behavior
Expand Down
16 changes: 16 additions & 0 deletions tests/config/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,22 @@ def test_slice_source_kwargs(self):
{"a": 1, "b": True, "c": "nearest"}, config.slice_source_kwargs
)

def test_extra(self):
config = Config(
{
"target_dir": "memory://target.zarr",
}
)
self.assertEqual({}, config.extra)

config = Config(
{
"target_dir": "memory://target.zarr",
"extra": {"a": 1, "b": True, "c": "nearest"},
}
)
self.assertEqual({"a": 1, "b": True, "c": "nearest"}, config.extra)


def new_custom_slice_source(ctx: Context, index: int):
return CustomSliceSource(ctx, index)
Expand Down
1 change: 1 addition & 0 deletions tests/config/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def test_get_config_schema(self):
"disable_rollback",
"dry_run",
"excluded_variables",
"extra",
"force_new",
"fixed_dims",
"included_variables",
Expand Down
8 changes: 8 additions & 0 deletions zappend/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ def dry_run(self) -> bool:
"""Whether to run in dry mode."""
return bool(self._config.get("dry_run"))

@property
def extra(self) -> dict[str, Any]:
"""Extra settings.
Intended use is by a `slice_source` that expects an argument
named `ctx` to access the extra settings and other configuration.
"""
return self._config.get("extra") or {}

@property
def logging(self) -> dict[str, Any] | str | bool | None:
"""Logging configuration."""
Expand Down
9 changes: 9 additions & 0 deletions zappend/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,15 @@
"type": "boolean",
"default": False,
},
extra={
"description": (
"Extra settings."
" Intended use is by a `slice_source` that expects an argument"
" named `ctx` to access the extra settings and other configuration."
),
"type": "object",
"additionalProperties": True,
},
),
"additionalProperties": False,
}
Expand Down
Loading