Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
## Version 0.5.2 (in development)


* Added configuration setting `force_new`, which forces creation of a new
target dataset. An existing target dataset (and its lock) will be
permanently deleted before appending of slice datasets begins. [#72]

## Version 0.5.1 (2024-02-23)

Expand Down
4 changes: 4 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ Options:
the previous ones.
-t, --target TARGET Target Zarr dataset path or URI. Overrides the
'target_dir' configuration field.
--force-new Force creation of a new target dataset. An existing
target dataset (and its lock) will be permanently
deleted before appending of slice datasets begins.
WARNING: the deletion cannot be rolled back.
--dry-run Run the tool without creating, changing, or deleting
any files.
--traceback Show Python traceback on error.
Expand Down
6 changes: 6 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ The URI or local path of the directory that will be used to temporarily store ro
Type _object_.
Options for the filesystem given by the protocol of `temp_dir`.

## `force_new`

Type _boolean_.
Force creation of a new target dataset. An existing target dataset (and its lock) will be permanently deleted before appending of slice datasets begins. WARNING: the deletion cannot be rolled back.
Defaults to `false`.

## `disable_rollback`

Type _boolean_.
Expand Down
1 change: 1 addition & 0 deletions tests/config/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def test_get_config_schema(self):
"disable_rollback",
"dry_run",
"excluded_variables",
"force_new",
"fixed_dims",
"included_variables",
"logging",
Expand Down
34 changes: 34 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from zappend.api import FileObj
from zappend.api import SliceSource
from zappend.api import zappend
from zappend.fsutil.transaction import Transaction
from .helpers import clear_memory_fs
from .helpers import make_test_dataset

Expand Down Expand Up @@ -108,6 +109,39 @@ def test_some_slices_local_output_to_non_existing_dir(self):
for slice_dir in slices:
shutil.rmtree(slice_dir, ignore_errors=True)

def test_some_slices_local_output_to_existing_dir_force_new(self):
    """Verify that ``force_new`` wipes an existing target (and its lock
    file) before appending, but is a no-op when no target exists yet or
    when ``dry_run`` is also set.
    """
    target_dir = "memory://target.zarr"
    slices = [f"memory://slice-{index}.zarr" for index in range(4)]
    for slice_uri in slices:
        make_test_dataset(uri=slice_uri)

    # No target exists yet, so force_new has nothing to delete;
    # the first slice simply creates the target as usual.
    zappend(slices[:1], target_dir=target_dir, force_new=True)
    self.assertEqual(
        {"time": 3, "y": 50, "x": 100}, xr.open_zarr(target_dir).sizes
    )

    # The target now exists, so force_new must delete it first;
    # only the last three slices (3 * 3 time steps) remain.
    zappend(slices[1:], target_dir=target_dir, force_new=True)
    self.assertEqual(
        {"time": 9, "y": 50, "x": 100}, xr.open_zarr(target_dir).sizes
    )

    # dry_run wins over force_new: nothing is deleted or appended.
    zappend(slices, target_dir=target_dir, force_new=True, dry_run=True)
    self.assertEqual(
        {"time": 9, "y": 50, "x": 100}, xr.open_zarr(target_dir).sizes
    )

    # A leftover lock file must be removed along with the target.
    lock_file = Transaction.get_lock_file(FileObj(target_dir))
    lock_file.write("")
    self.assertTrue(lock_file.exists())
    zappend(slices, target_dir=target_dir, force_new=True)
    self.assertFalse(lock_file.exists())

def test_some_slices_with_class_slice_source(self):
target_dir = "memory://target.zarr"
slices = [make_test_dataset(index=3 * i) for i in range(3)]
Expand Down
8 changes: 7 additions & 1 deletion tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

import unittest

import pytest
import numpy as np
import pytest
import xarray as xr

from zappend.context import Context
Expand Down Expand Up @@ -35,6 +35,12 @@ def test_with_existing_target(self):
self.assertEqual(target_dir, ctx.target_dir.uri)
self.assertIsInstance(ctx.target_metadata, DatasetMetadata)

def test_force_new(self):
    """``force_new`` defaults to False and mirrors the configured value."""
    base_config = {"target_dir": "memory://target.zarr"}
    self.assertEqual(False, Context(base_config).force_new)
    self.assertEqual(True, Context(dict(base_config, force_new=True)).force_new)

def test_append_dim(self):
ctx = Context({"target_dir": "memory://target.zarr"})
self.assertEqual("time", ctx.append_dim)
Expand Down
35 changes: 28 additions & 7 deletions zappend/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,35 @@
"-c",
metavar="CONFIG",
multiple=True,
help="Configuration JSON or YAML file."
" If multiple are passed, subsequent configurations"
" are incremental to the previous ones.",
help=(
"Configuration JSON or YAML file."
" If multiple are passed, subsequent configurations"
" are incremental to the previous ones."
),
)
@click.option(
"--target",
"-t",
metavar="TARGET",
help="Target Zarr dataset path or URI."
" Overrides the 'target_dir' configuration field.",
help=(
"Target Zarr dataset path or URI."
" Overrides the 'target_dir' configuration field."
),
)
@click.option(
"--force-new",
is_flag=True,
help=(
"Force creation of a new target dataset."
" An existing target dataset (and its lock) will be"
" permanently deleted before appending of slice datasets"
" begins. WARNING: the deletion cannot be rolled back."
),
)
@click.option(
"--dry-run",
is_flag=True,
help="Run the tool without creating, changing," " or deleting any files.",
help="Run the tool without creating, changing, or deleting any files.",
)
@click.option(
"--traceback",
Expand All @@ -49,6 +63,7 @@ def zappend(
slices: tuple[str, ...],
config: tuple[str, ...],
target: str | None,
force_new: bool,
dry_run: bool,
traceback: bool,
version: bool,
Expand Down Expand Up @@ -78,7 +93,13 @@ def zappend(

# noinspection PyBroadException
try:
zappend(slices, config=config, target_dir=target, dry_run=dry_run)
zappend(
slices,
config=config,
target_dir=target,
force_new=force_new,
dry_run=dry_run,
)
except BaseException as e:
if traceback:
import traceback as tb
Expand Down
10 changes: 10 additions & 0 deletions zappend/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,16 @@
"type": "object",
"additionalProperties": True,
},
force_new={
"description": (
"Force creation of a new target dataset. "
" An existing target dataset (and its lock) will be"
" permanently deleted before appending of slice datasets"
" begins. WARNING: the deletion cannot be rolled back."
),
"type": "boolean",
"default": False,
},
disable_rollback={
"description": (
"Disable rolling back dataset changes on failure."
Expand Down
5 changes: 5 additions & 0 deletions zappend/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ def persist_mem_slices(self) -> bool:
"""Whether to persist in-memory slice datasets."""
return self._config.get("persist_mem_slices", False)

@property
def force_new(self) -> bool:
    """Whether an existing target dataset (and its lock file) shall be
    permanently deleted before appending of slice datasets begins.
    Defaults to ``False``.
    """
    return self._config.get("force_new", False)

@property
def disable_rollback(self) -> bool:
"""Whether to disable transaction rollbacks."""
Expand Down
7 changes: 5 additions & 2 deletions zappend/fsutil/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,18 @@ def __init__(
):
transaction_id = f"zappend-{uuid.uuid4()}"
rollback_dir = temp_dir / transaction_id
lock_file = target_dir.parent / (target_dir.filename + LOCK_EXT)
self._id = transaction_id
self._rollback_dir = rollback_dir
self._rollback_file = rollback_dir / ROLLBACK_FILE
self._target_dir = target_dir
self._lock_file = lock_file
self._lock_file = self.get_lock_file(target_dir)
self._disable_rollback = disable_rollback
self._entered_ctx = False

@classmethod
def get_lock_file(cls, file_obj: FileObj) -> FileObj:
    """Return the lock file associated with *file_obj*.

    The lock file is a sibling of *file_obj* whose name is the
    filename of *file_obj* with ``LOCK_EXT`` appended.
    """
    lock_name = file_obj.filename + LOCK_EXT
    return file_obj.parent / lock_name

@property
def target_dir(self) -> FileObj:
"""Target directory that is subject to this transaction."""
Expand Down
39 changes: 33 additions & 6 deletions zappend/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,27 @@
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.

from typing import Iterable, Any
import collections.abc
from typing import Iterable, Any

import numpy as np
import xarray as xr
import zarr.attrs
import zarr.convenience

from .config import ConfigLike
from .config import exclude_from_config
from .config import normalize_config
from .config import validate_config
from .config import eval_dyn_config_attrs
from .config import exclude_from_config
from .config import get_dyn_config_attrs_env
from .config import has_dyn_config_attrs
from .config import normalize_config
from .config import validate_config
from .context import Context
from .fsutil.transaction import Transaction
from .fsutil import FileObj
from .fsutil.transaction import RollbackCallback
from .log import logger
from .fsutil.transaction import Transaction
from .log import configure_logging
from .log import logger
from .profiler import Profiler
from .rollbackstore import RollbackStore
from .slice import SliceObj
Expand Down Expand Up @@ -53,6 +54,12 @@ def __init__(self, config: ConfigLike = None, **kwargs):
configure_logging(config.get("logging"))
self._profiler = Profiler(config.get("profiling"))
self._config = config
if config.get("force_new"):
logger.warning(
f"Setting 'force_new' is enabled. This will"
f" permanently delete existing targets (no rollback)."
)
delete_target_permanently(config)

def process_slices(self, slices: Iterable[SliceObj]):
"""Process the given `slices`.
Expand Down Expand Up @@ -119,6 +126,26 @@ def process_slice(self, slice_obj: SliceObj, slice_index: int = 0):
update_target_from_slice(ctx, slice_dataset, rollback_callback)


def delete_target_permanently(config: dict[str, Any]):
    """Permanently delete the configured target dataset and its lock file.

    Each deletion is logged as a warning. If ``dry_run`` is set in
    *config*, deletions are logged but not performed.
    """
    # TODO: I'm not happy with the config being a dict here, because it
    #   implies and hence duplicates definition of default values.
    #   Make Processor constructor turn config dict into config object,
    #   Pass config object to Context and publish via ctx.config property.
    dry_run = config.get("dry_run", False)
    target_dir = FileObj(
        config.get("target_dir"),
        storage_options=config.get("target_storage_options"),
    )
    lock_file = Transaction.get_lock_file(target_dir)
    # Delete the dataset first, then its lock, each only if present.
    for victim, recursive in ((target_dir, True), (lock_file, False)):
        if victim.exists():
            logger.warning(f"Permanently deleting {victim}")
            if not dry_run:
                if recursive:
                    victim.delete(recursive=True)
                else:
                    victim.delete()


def create_target_from_slice(
ctx: Context, slice_ds: xr.Dataset, rollback_cb: RollbackCallback
):
Expand Down