11 changes: 10 additions & 1 deletion dvc/commands/data.py
@@ -2,7 +2,7 @@

from funcy import chunks, compact, log_durations

from dvc.cli import formatter
from dvc.cli import completion, formatter
from dvc.cli.command import CmdBase
from dvc.cli.utils import append_doc_link
from dvc.log import logger
@@ -108,6 +108,7 @@ def _show_status(cls, status: "DataStatus") -> int:  # noqa: C901
def run(self) -> int:
with log_durations(logger.trace, "in data_status"):
status = self.repo.data_status(
targets=self.args.targets,
granular=self.args.granular,
untracked_files=self.args.untracked_files,
not_in_remote=self.args.not_in_remote,
@@ -147,6 +148,14 @@ def add_parser(subparsers, parent_parser):
formatter_class=formatter.RawDescriptionHelpFormatter,
help=DATA_STATUS_HELP,
)
data_status_parser.add_argument(
"targets",
nargs="*",
help=(
"Limit command scope to these tracked files/directories, "
".dvc files and stage names."
),
).complete = completion.FILE # type: ignore[attr-defined]
data_status_parser.add_argument(
"--json",
action="store_true",
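
The new positional `targets` argument flows straight through to `repo.data_status` (see the `run` method above). A minimal sketch of the parsing behavior, using a standalone argparse parser rather than DVC's real CLI wiring:

    import argparse

    parser = argparse.ArgumentParser(prog="dvc data status")
    # Mirrors the new argument: zero or more positional targets.
    parser.add_argument("targets", nargs="*", help="Limit scope to these paths")
    parser.add_argument("--granular", action="store_true")

    args = parser.parse_args(["dir/data.csv", "model.dvc"])
    assert args.targets == ["dir/data.csv", "model.dvc"]

    # With no targets given, nargs="*" yields an empty list, which the
    # status() implementation treats as "no filtering".
    assert parser.parse_args([]).targets == []
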
171 changes: 142 additions & 29 deletions dvc/repo/data.py
@@ -1,17 +1,17 @@
import os
import posixpath
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, TypedDict, Union
from typing import TYPE_CHECKING, Optional, TypedDict, Union

from dvc.fs.callbacks import DEFAULT_CALLBACK
from dvc.repo.worktree import worktree_view
from dvc.ui import ui
from dvc_data.index.view import DataIndexView

if TYPE_CHECKING:
from dvc.fs.callbacks import Callback
from dvc.repo import Repo
from dvc.scm import Git, NoSCM
from dvc_data.index import DataIndex
from dvc_data.index import BaseDataIndex, DataIndex, DataIndexKey
from dvc_data.index.diff import Change


@@ -47,12 +47,13 @@ def _adapt_path(change: "Change") -> str:


def _diff(
old: "DataIndex",
new: "DataIndex",
old: "BaseDataIndex",
new: "BaseDataIndex",
*,
granular: bool = False,
not_in_cache: bool = False,
callback: "Callback" = DEFAULT_CALLBACK,
filter_keys: Optional[list["DataIndexKey"]] = None,
) -> dict[str, list[str]]:
from dvc_data.index.diff import UNCHANGED, UNKNOWN, diff

@@ -74,6 +75,16 @@ def _add_change(typ, change):
with_unknown=True,
callback=callback,
):
# The index is a trie, so even when we filter by a specific path like
# `dir/file`, all parent nodes leading to that path (e.g., `dir/`) still
# appear in the view. Skip such entries to avoid reporting the root of a
# tracked directory when only something inside it matched the filter.
if filter_keys and not any(
change.key[: len(filter_key)] == filter_key for filter_key in filter_keys
):
continue

if (
change.typ == UNCHANGED
and (not change.old or not change.old.hash_info)
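
The skip condition above relies on index keys being tuples of path parts, so "is under a filter key" reduces to a tuple-prefix comparison. A small illustration with made-up keys:

    filter_keys = [("dir", "file")]

    # The filtered view still yields parent nodes needed to reach the match:
    view_keys = [("dir",), ("dir", "file")]

    kept = [k for k in view_keys if any(k[: len(fk)] == fk for fk in filter_keys)]

    # ("dir",) is only a parent of the match, not under it, so it is skipped.
    assert kept == [("dir", "file")]
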
@@ -136,7 +147,45 @@ def _git_info(scm: Union["Git", "NoSCM"], untracked_files: str = "all") -> GitIn
)


def _diff_index_to_wtree(repo: "Repo", **kwargs: Any) -> dict[str, list[str]]:
def filter_index(
index: Union["DataIndex", "DataIndexView"],
filter_keys: Optional[list["DataIndexKey"]] = None,
) -> "BaseDataIndex":
if not filter_keys:
return index

if isinstance(index, DataIndexView):
orig_index = index._index
parent_filter_fn = index.filter_fn
else:
orig_index = index
parent_filter_fn = None

def filter_fn(key: "DataIndexKey") -> bool:
if parent_filter_fn is not None and not parent_filter_fn(key):
return False

for filter_key in filter_keys:
# eg: if key is "dir/file" and filter_key is "dir/", return True
if key[: len(filter_key)] == filter_key:
return True
# eg: if key is `dir/` and filter_key is `dir/file`, also return True.
# This keeps the parent prefixes that must be retained to reach nested keys.
if filter_key[: len(key)] == key:
return True
return False

from dvc_data.index import view

return view(orig_index, filter_fn=filter_fn)
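
Note that `filter_fn` keeps keys in both directions: descendants of a filter key (the entries we actually want) and ancestors of it (the trie nodes required to traverse down to those entries). A self-contained sketch of the predicate, assuming keys are tuples of path parts:

    def keep(key: tuple[str, ...], filter_keys: list[tuple[str, ...]]) -> bool:
        for fk in filter_keys:
            if key[: len(fk)] == fk:   # key is the filter key itself or below it
                return True
            if fk[: len(key)] == key:  # key is an ancestor needed for traversal
                return True
        return False

    fks = [("dir", "sub", "file")]
    assert keep(("dir",), fks)                        # ancestor prefix: kept
    assert keep(("dir", "sub", "file"), fks)          # the target itself: kept
    assert keep(("dir", "sub", "file", "part"), fks)  # nested under the target: kept
    assert not keep(("dir", "other"), fks)            # sibling branch: dropped

The parent entries admitted here are exactly the ones the `_diff` loop above filters back out with its `filter_keys` check.
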


def _diff_index_to_wtree(
repo: "Repo",
filter_keys: Optional[list["DataIndexKey"]] = None,
granular: bool = False,
) -> dict[str, list[str]]:
from .index import build_data_index

with ui.progress(desc="Building workspace index", unit="entry") as pb:
@@ -147,30 +196,45 @@ def _diff_index_to_wtree(repo: "Repo", **kwargs: Any) -> dict[str, list[str]]:
compute_hash=True,
callback=pb.as_callback(),
)
workspace_view = filter_index(workspace, filter_keys=filter_keys)

with ui.progress(
desc="Calculating diff between index/workspace",
unit="entry",
) as pb:
index = repo.index.data["repo"]
view = filter_index(index, filter_keys=filter_keys)
return _diff(
repo.index.data["repo"],
workspace,
view,
workspace_view,
filter_keys=filter_keys,
granular=granular,
not_in_cache=True,
callback=pb.as_callback(),
**kwargs,
)


def _diff_head_to_index(
repo: "Repo", head: str = "HEAD", **kwargs: Any
repo: "Repo",
head: str = "HEAD",
filter_keys: Optional[list["DataIndexKey"]] = None,
granular: bool = False,
) -> dict[str, list[str]]:
index = repo.index.data["repo"]
index_view = filter_index(index, filter_keys=filter_keys)

with repo.switch(head):
head_index = repo.index.data["repo"]
head_view = filter_index(head_index, filter_keys=filter_keys)

with ui.progress(desc="Calculating diff between head/index", unit="entry") as pb:
return _diff(head_index, index, callback=pb.as_callback(), **kwargs)
return _diff(
head_view,
index_view,
filter_keys=filter_keys,
granular=granular,
callback=pb.as_callback(),
)


class Status(TypedDict):
@@ -204,46 +268,87 @@ def _transform_git_paths_to_dvc(repo: "Repo", files: Iterable[str]) -> list[str]

def _get_entries_not_in_remote(
repo: "Repo",
filter_keys: Optional[list["DataIndexKey"]] = None,
granular: bool = False,
remote_refresh: bool = False,
) -> list[str]:
"""Get entries that are not in remote storage."""
from dvc.repo.worktree import worktree_view
from dvc_data.index import StorageKeyError

# View into the index, with only pushable entries
view = worktree_view(repo.index, push=True).data["repo"]
index = worktree_view(repo.index, push=True)
data_index = index.data["repo"]

missing_entries = []
for key, entry in view.iteritems(shallow=not granular):
if not (entry and entry.hash_info):
continue
view = filter_index(data_index, filter_keys=filter_keys) # type: ignore[arg-type]

k = (*key, "") if entry.meta and entry.meta.isdir else key
try:
if not view.storage_map.remote_exists(entry, refresh=remote_refresh):
missing_entries.append(os.path.sep.join(k))
except StorageKeyError:
pass
missing_entries = []
with ui.progress(desc="Checking remote", unit="entry") as pb:
for key, entry in view.iteritems(shallow=not granular):
if not (entry and entry.hash_info):
continue

# The index is a trie, so even when we filter by a specific path like
# `dir/file`, all parent nodes leading to that path (e.g., `dir/`) still
# appear in the view. Skip such entries to avoid reporting the root of a
# tracked directory when only something inside it matched the filter.
if filter_keys and not any(
key[: len(filter_key)] == filter_key for filter_key in filter_keys
):
continue

k = (*key, "") if entry.meta and entry.meta.isdir else key
try:
if not view.storage_map.remote_exists(entry, refresh=remote_refresh):
missing_entries.append(os.path.sep.join(k))
pb.update()
except StorageKeyError:
pass

return missing_entries
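
The `k = (*key, "")` trick appends an empty path component so that directory entries render with a trailing separator once joined. A quick illustration:

    import os

    key = ("data", "raw")

    # Directory entry: the extra "" makes the join end with a separator.
    assert os.path.sep.join((*key, "")) == f"data{os.sep}raw{os.sep}"

    # File entry: joined as-is.
    assert os.path.sep.join(key) == f"data{os.sep}raw"
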


def _matches_target(p: str, targets: Iterable[str]) -> bool:
sep = os.sep
return any(p == t or p.startswith(t + sep) for t in targets)
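
`_matches_target` accepts either the exact path or anything strictly inside it; matching against `t + sep` prevents a sibling like `dir2` from matching a `dir` target. Restating the helper to make that concrete:

    import os

    def _matches_target(p: str, targets) -> bool:
        sep = os.sep
        return any(p == t or p.startswith(t + sep) for t in targets)

    assert _matches_target("dir", ["dir"])                      # exact path
    assert _matches_target(os.sep.join(["dir", "f"]), ["dir"])  # inside dir
    assert not _matches_target("dir2", ["dir"])                 # sibling: no match
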


def _prune_keys(filter_keys: list["DataIndexKey"]) -> list["DataIndexKey"]:
sorted_keys = sorted(set(filter_keys), key=len)
result: list[DataIndexKey] = []

for key in sorted_keys:
if not any(key[: len(prefix)] == prefix for prefix in result):
result.append(key)
return result
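
`_prune_keys` sorts the deduplicated keys by length so every prefix is considered before the keys it covers, then drops any key already covered by an accepted prefix. A worked example with hypothetical keys:

    def _prune_keys(filter_keys):
        sorted_keys = sorted(set(filter_keys), key=len)
        result = []
        for key in sorted_keys:
            if not any(key[: len(prefix)] == prefix for prefix in result):
                result.append(key)
        return result

    pruned = _prune_keys([("dir", "file"), ("dir",), ("dir",), ("other",)])
    # ("dir", "file") is covered by the shorter ("dir",) prefix and is dropped.
    assert set(pruned) == {("dir",), ("other",)}
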


def status(
repo: "Repo",
targets: Optional[Iterable[Union[os.PathLike[str], str]]] = None,
*,
granular: bool = False,
untracked_files: str = "no",
not_in_remote: bool = False,
remote_refresh: bool = False,
granular: bool = False,
head: str = "HEAD",
) -> Status:
from dvc.scm import NoSCMError, SCMError

uncommitted_diff = _diff_index_to_wtree(repo, granular=granular)
unchanged = set(uncommitted_diff.pop("unchanged", []))
targets = targets or []
filter_keys: list[DataIndexKey] = [repo.fs.relparts(os.fspath(t)) for t in targets]
# Drop duplicate keys and keys already covered by a shorter prefix
filter_keys = _prune_keys(filter_keys)

uncommitted_diff = _diff_index_to_wtree(
repo, filter_keys=filter_keys, granular=granular
)
unchanged = set(uncommitted_diff.pop("unchanged", []))
entries_not_in_remote = (
_get_entries_not_in_remote(
repo,
filter_keys=filter_keys,
granular=granular,
remote_refresh=remote_refresh,
)
@@ -252,16 +357,24 @@ def status(
)

try:
committed_diff = _diff_head_to_index(repo, head=head, granular=granular)
committed_diff = _diff_head_to_index(
repo, filter_keys=filter_keys, head=head, granular=granular
)
except (SCMError, NoSCMError):
committed_diff = {}
else:
unchanged &= set(committed_diff.pop("unchanged", []))

git_info = _git_info(repo.scm, untracked_files=untracked_files)
untracked = git_info.get("untracked", [])
untracked = _transform_git_paths_to_dvc(repo, untracked)

scm_filter_targets = {
os.path.relpath(os.path.abspath(t), repo.scm.root_dir) for t in targets
}
untracked_it: Iterable[str] = git_info.get("untracked", [])
if scm_filter_targets:
untracked_it = (
f for f in untracked_it if _matches_target(f, scm_filter_targets)
)
untracked = _transform_git_paths_to_dvc(repo, untracked_it)
# order matters here
return Status(
not_in_cache=uncommitted_diff.pop("not_in_cache", []),
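
End to end, targets are made relative to the repo root and split into key tuples before pruning, and the git-reported untracked paths are filtered with `_matches_target`. A rough sketch of the key conversion, with a hypothetical `relparts` standing in for `repo.fs.relparts`:

    import os

    def relparts(path, start="."):
        # Hypothetical stand-in for repo.fs.relparts: the path relative
        # to the repo root, split into its components.
        return tuple(os.path.relpath(os.fspath(path), start).split(os.sep))

    targets = [os.path.join("dir", "file"), "dir"]
    filter_keys = [relparts(t) for t in targets]
    assert filter_keys == [("dir", "file"), ("dir",)]

    # _prune_keys() then collapses this to [("dir",)]: the broader "dir"
    # target already covers "dir/file", so each entry is matched only once.
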