13 changes: 12 additions & 1 deletion CHANGES.md
@@ -1,9 +1,20 @@
## Version 0.5.2 (in development)
## Version 0.6.0 (in development)

* Added configuration setting `force_new`, which forces creation of a new
  target dataset. An existing target dataset (and its lock) will be
  permanently deleted before appending of slice datasets begins. [#72]

* Simplified writing of custom slice sources for users. The configuration setting
  `slice_source` can now be a `SliceSource` class or any function that returns a
  _slice item_: an `xarray.Dataset` object, a `SliceSource` object, or a
  local file path or URI of type `str` or `FileObj`.
  The concept of _slice factories_ has been dropped entirely. [#78]

* Internal refactoring: Extracted the `Config` class out of `Context` and
  made it available via the new `Context.config: Config` property.
  The change concerns any usage of the `ctx: Context` argument passed to
  user slice factories. [#74]

## Version 0.5.1 (2024-02-23)

* Fixed rollback for situations where writing to Zarr fails shortly after the
21 changes: 6 additions & 15 deletions docs/api.md
@@ -8,18 +8,6 @@ All described objects can be imported from the `zappend.api` module.
    options:
      show_root_heading: true

## Function `to_slice_factories()`

::: zappend.api.to_slice_factories
    options:
      show_root_heading: true

## Function `to_slice_factory()`

::: zappend.api.to_slice_factory
    options:
      show_root_heading: true

## Class `SliceSource`

::: zappend.api.SliceSource
@@ -28,21 +16,24 @@ All described objects can be imported from the `zappend.api` module.

::: zappend.api.Context

## Class `Config`

::: zappend.api.Config

## Class `FileObj`

::: zappend.api.FileObj

## Types

::: zappend.api.SliceObj
::: zappend.api.SliceItem
    options:
      show_root_heading: true

::: zappend.api.SliceFactory
::: zappend.api.SliceCallable
    options:
      show_root_heading: true


::: zappend.api.ConfigItem
    options:
      show_root_heading: true
214 changes: 113 additions & 101 deletions docs/guide.md
@@ -534,9 +38,38 @@ passed using the optional `target_storage_options` setting.

### Slice Datasets

If the slice paths passed to the `zappend` tool are given as URIs,
additional storage options may be provided for the filesystem given by the
URI's protocol. They may be specified using the `slice_storage_options` setting.
A _slice dataset_ is the dataset that is appended for every slice item passed
to `zappend`. Slice datasets can be provided in various ways.

* When using the [zappend CLI command](cli.md), slice items are passed as
  command arguments, each pointing to a slice dataset by a local file path or URI.

* When using the [zappend Python function](api.md), slice items are passed
  using the `slices` argument, which is a Python iterable. You can pass a list or tuple
  of slice items, or a Python generator that yields slice items.

Each slice item can be a local file path or URI of type `str` or `FileObj`,
a dataset of type `xarray.Dataset`, or a `SliceSource` object, explained in more detail
below.
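
For example, here is a minimal sketch of passing mixed slice items to the `zappend`
Python function (file names and the S3 bucket are hypothetical):

```python
import numpy as np
import xarray as xr
from zappend.api import FileObj, zappend

# An in-memory slice dataset with dummy data, for illustration only
in_memory_slice = xr.Dataset(
    {"sst": (("time", "y", "x"), np.zeros((1, 10, 20)))}
)

zappend(
    [
        "slice-1.nc",                              # local file path
        FileObj("s3://my-bucket/slice-2.zarr",
                storage_options={"anon": True}),   # URI with storage options
        in_memory_slice,                           # xarray.Dataset
    ],
    target_dir="target.zarr",
)
```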

#### Paths and URIs

A slice item of type `str` is interpreted as a local file path, or as a URI if the
path has a protocol prefix such as `s3://`, as described above.

In the majority of `zappend` use cases, the slice datasets to be appended to a target
dataset are passed as local file paths or URIs. A slice URI starts with a protocol
prefix, such as `s3://` or `memory://`. Additional storage options may be required
for the filesystem given by the URI's protocol. They may be specified using the
`slice_storage_options` setting:

```json
{
"slice_storage_options": {
"anon": true
}
}
```

Sometimes, the slice datasets to be processed are not yet available, e.g.,
because another process is currently generating them. For such cases, the
@@ -563,82 +592,29 @@ Or use default polling:
}
```


### Slice Sources

A _slice source_ is an object that provides a slice dataset of type `xarray.Dataset`
for given parameters of any type.

The optional `slice_source` configuration setting is used to specify a custom
slice source. If not specified, `zappend` selects the slice source based on the type
of a given slice object. These types are described in following subsections.

If given, the value of the `slice_source` setting is a class derived from
`zappend.api.SliceSource`, or a function that creates an instance of
`zappend.api.SliceSource`, or the fully qualified name of the aforementioned.
In the case `slice_source` is given, the _slices_ argument passed to the CLI
command and Python function become parameters to the specified class constructor
or factory function.
The individual slice items in the `SLICES` arguments of the `zappend` CLI
command are of type `str`, typically interpreted as file paths or URIs.
The individual slice items passed in the `slices` argument of the
`zappend.api.zappend()` function can be of any type, but the `tuple`, `list`,
and `dict` types have a special meaning:

* `tuple`: a pair of the form `(args, kwargs)`, where `args` is a list
or tuple of positional arguments and `kwargs` is a dictionary of keyword
arguments;
* `list`: positional arguments only;
* `dict`: keyword arguments only;
* Any other type is interpreted as single positional argument.

In addition, your class constructor or factory function specified by `slice_source`
may specify a positional or keyword argument named `ctx`, which will receive the
current processing context of type `zappend.api.Context`.

If the `slice_source` setting is _not_ specified, the slice items passed as `slices`
argument to the [`zappend`](api.md) Python function can be one of the types described
in the following subsections.

#### Types `str` and `FileObj`

A slice object of type `str` is interpreted as local file path or URI, in the case
the path has a protocol prefix, such as `s3://`.

An alternative to providing the slice dataset as path or URI is using the
`zappend.api.FileObj` class, which combines a URI with dedicated filesystem
storage options.

```python
from zappend.api import FileObj

slice_obj = FileObj(slice_uri, storage_options=dict(...))
```

#### Type `Dataset`

In-memory slice objects can be passed as [`xarray.Dataset`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.html)
objects. Such objects may originate from opening datasets from some storage

```python
import xarray as xr
#### Dataset Objects

slice_obj = xr.open_dataset(slice_store, ...)
```
You can also use dataset objects of type
[`xarray.Dataset`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.html)
as slice items. Such objects may originate from opening datasets from some storage, e.g.,
via `xarray.open_dataset(slice_store, ...)`, or from composing, aggregating, or resampling
other datasets and data variables.
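
As a minimal sketch (the input file name is hypothetical), such a computed slice
might be produced like this:

```python
import xarray as xr

# Open a source dataset and aggregate it to daily means; the resulting
# dataset can then be passed to zappend as a slice item.
source_ds = xr.open_dataset("input-2024-01.nc")
slice_ds = source_ds.resample(time="1D").mean()
```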

or by composing, aggregating, resampling slice datasets from other datasets and
data variables. To allow for out-of-core computation of large datasets [Dask arrays](https://docs.dask.org/en/stable/array.html)
are used by both `xarray` and `zarr`. As a dask array may represent complex and/or
Chunked data arrays of an `xarray.Dataset` are usually instances of
[Dask arrays](https://docs.dask.org/en/stable/array.html), to allow for out-of-core
computation of large datasets. As a dask array may represent complex and/or
expensive processing graphs, high CPU loads and memory consumption are common issues
for computed slice datasets, especially if the specified target dataset chunking is
different from the slice dataset chunking. This may cause Dask graphs to be
computed multiple times if the source chunking overlaps multiple target chunks,
potentially causing large resource overheads while recomputing and/or reloading the same
source chunks multiple times.

In such cases it can help to "terminate" such computations for each slice by
persisting the computed dataset first and then to reopen it. This can be specified
using the `persist_mem_slice` setting:
source chunks multiple times. In such cases it can help to "terminate" such
computations for each slice by persisting the computed dataset first and then
reopening it. This can be specified using the `persist_mem_slice` setting:

```json
{
@@ -650,34 +626,82 @@ If the flag is set, in-memory slices will be persisted to a temporary Zarr befor
appending them to the target dataset. It may prevent expensive re-computation of chunks
at the cost of additional I/O. It therefore defaults to `false`.

#### Type `SliceSource`
#### Slice Sources

If you need some custom cleanup after a slice has been processed and appended to the
target dataset, you can use an instance of `zappend.api.SliceSource` as a slice item.
Your own `SliceSource` class requires you to implement two methods:

Often you want to perform some custom cleanup after a slice has been processed and
appended to the target dataset. In this case you can write your own
`zappend.api.SliceSource` by implementing its `get_dataset()` and `dispose()`
methods.
* `get_dataset()` to return the slice dataset of type `xarray.Dataset`, and
* `dispose()` to perform any resource cleanup tasks.

Slice source instances are supposed to be created by _slice factories_, see
subsection below.
Here is the template code for your own slice source implementation:

#### Type `SliceFactory`
```python
import xarray as xr
from zappend.api import SliceSource

A slice factory is a 1-argument function that receives a processing context of type
`zappend.api.Context` and yields a slice dataset object of one of the types
described above. Since a slice factory cannot have additional arguments, it is
normally defined as a [closure](https://en.wikipedia.org/wiki/Closure_(computer_programming))
to capture slice-specific information.
class MySliceSource(SliceSource):
    # Pass any positional and keyword arguments that you need
    # to the constructor. `path` is just an example.
    def __init__(self, path: str):
        self.path = path
        self.ds = None

    def get_dataset(self) -> xr.Dataset:
        # Write code here that obtains the dataset.
        self.ds = xr.open_dataset(self.path)
        # You can put any processing here.
        return self.ds

    def dispose(self):
        # Write code here that performs cleanup.
        if self.ds is not None:
            self.ds.close()
            self.ds = None
```

In the following example, the actual slice dataset is computed from averaging another
dataset. A `SliceSource` is used to close the datasets after the slice has been
processed. Slice factories are created from the custom slice source and the slice paths
using the utility function [to_slice_factories()][zappend.slice.factory.to_slice_factories]:
Instead of providing instances of `SliceSource` directly as slice items, it is often
easier to pass your `SliceSource` class and let `zappend` pass each slice item as
argument(s) to your `SliceSource`'s constructor. This can be achieved using
the `slice_source` configuration setting:

```json
{
"slice_source": "mymodule.MySliceSource"
}
```

The `slice_source` setting can actually be **any Python function** that returns a
valid slice item as described above.
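
For instance, here is a minimal sketch of a plain function used as slice source
(file names are hypothetical):

```python
import xarray as xr
from zappend.api import zappend

def open_slice(path: str) -> xr.Dataset:
    # Receives the slice item (here a file path) and returns a valid
    # slice item, in this case an xarray.Dataset.
    return xr.open_dataset(path)

zappend(
    ["slice-1.nc", "slice-2.nc"],
    target_dir="target.zarr",
    slice_source=open_slice,
)
```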

If a slice source is configured, each slice item passed to `zappend` is passed as
argument(s) to your slice source:

* Slices passed to the `zappend` CLI command become slice source arguments
  of type `str`.
* Slice items passed to the `zappend` function via the `slices` argument can be of
  any type, but the `tuple`, `list`, and `dict` types have a special meaning,
  as illustrated by the sketch after this list:

    - `tuple`: a pair of the form `(args, kwargs)`, where `args` is a list
      or tuple of positional arguments and `kwargs` is a dictionary of keyword
      arguments;
    - `list`: positional arguments only;
    - `dict`: keyword arguments only;
    - Any other type is interpreted as a single positional argument.
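
Here is a minimal sketch of what this looks like, assuming a hypothetical
`MySliceSource` with one positional and one keyword constructor argument:

```python
import xarray as xr
from zappend.api import SliceSource, zappend

# Hypothetical slice source taking one positional and one keyword argument.
class MySliceSource(SliceSource):
    def __init__(self, path: str, engine: str = "netcdf4"):
        self.path = path
        self.engine = engine
        self.ds = None

    def get_dataset(self) -> xr.Dataset:
        self.ds = xr.open_dataset(self.path, engine=self.engine)
        return self.ds

    def dispose(self):
        if self.ds is not None:
            self.ds.close()

zappend(
    [
        "slice-1.nc",                              # any other type: single positional argument
        ["slice-2.nc"],                            # list: positional arguments only
        {"path": "slice-3.nc"},                    # dict: keyword arguments only
        (["slice-4.nc"], {"engine": "h5netcdf"}),  # tuple: (args, kwargs) pair
    ],
    target_dir="target.zarr",
    slice_source=MySliceSource,
)
```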

In addition, your slice source function or class constructor specified by `slice_source`
may define a first positional argument or a keyword argument named `ctx`,
which will receive the current processing context of type `zappend.api.Context`.
This can be useful if you need to read configuration settings.
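
A minimal sketch of a slice source class that receives the context (the constructor
signature is hypothetical):

```python
import xarray as xr
from zappend.api import Context, SliceSource

class MyCtxSliceSource(SliceSource):
    # `ctx` is the current processing context provided by zappend;
    # `path` comes from the slice item.
    def __init__(self, ctx: Context, path: str):
        self.ctx = ctx  # configuration settings are available via self.ctx.config
        self.path = path
        self.ds = None

    def get_dataset(self) -> xr.Dataset:
        self.ds = xr.open_dataset(self.path)
        return self.ds

    def dispose(self):
        if self.ds is not None:
            self.ds.close()
```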

Here is a more advanced example of a slice source that opens a dataset from a given
file path and first averages its values along the time dimension:

```python
import numpy as np
import xarray as xr
from zappend.api import SliceSource
from zappend.api import to_slice_factories
from zappend.api import zappend

def get_mean_time(slice_ds: xr.Dataset) -> xr.DataArray:
@@ -690,37 +714,25 @@ def get_mean_time(slice_ds: xr.Dataset) -> xr.DataArray:

def get_mean_slice(slice_ds: xr.Dataset) -> xr.Dataset:
    mean_slice_ds = slice_ds.mean("time")
    # Re-introduce time dimension of size one
    mean_slice_ds = mean_slice_ds.expand_dims("time", axis=0)
    mean_slice_ds.coords["time"] = get_mean_time(slice_ds)
    return mean_slice_ds

class MySliceSource(SliceSource):
    def __init__(self, ctx, slice_path):
        super().__init__(ctx)
    def __init__(self, slice_path):
        self.slice_path = slice_path
        self.ds = None
        self.mean_ds = None

    def get_dataset(self):
        self.ds = xr.open_dataset(self.slice_path)
        self.mean_ds = get_mean_slice(self.ds)
        return self.mean_ds
        return get_mean_slice(self.ds)

    def dispose(self):
        if self.ds is not None:
            self.ds.close()
            self.ds = None
        if self.mean_ds is not None:
            self.mean_ds.close()
            self.mean_ds = None

zappend(to_slice_factories(MySliceSource, ["slice-1.nc", "slice-2.nc", "slice-3.nc"]),
target_dir="target.zarr")
```

Note that the above example can be simplified by using the `slice_source` setting directly:

```python
zappend(["slice-1.nc", "slice-2.nc", "slice-3.nc"],
        target_dir="target.zarr",
        slice_source=MySliceSource)