Skip to content

Commit

Permalink
worktree push: separate worktree from version_aware
Browse files (browse the repository at this point in the history)
  • Loading branch information
pmrowla committed Nov 28, 2022
1 parent 5b90893 commit a9b961a
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 20 deletions.
2 changes: 1 addition & 1 deletion dvc/repo/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def push(
from dvc.repo.worktree import push_worktree

_remote = self.cloud.get_remote(name=remote)
if _remote.worktree:
if _remote.worktree or _remote.fs.version_aware:
return push_worktree(self, _remote)

used_run_cache = (
Expand Down
83 changes: 64 additions & 19 deletions dvc/repo/worktree.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import TYPE_CHECKING, Optional
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional

from dvc.fs.callbacks import Callback

Expand All @@ -9,12 +10,17 @@
from dvc.repo import Repo
from dvc.stage import Stage
from dvc.types import TargetType
from dvc_data.hashfile.meta import Meta
from dvc_data.index import DataIndexView

logger = logging.getLogger(__name__)


def worktree_view(
index: "Index",
targets: Optional["TargetType"] = None,
push: bool = False,
latest_only: bool = True,
**kwargs,
) -> "IndexView":
"""Return view of data that can be stored in worktree remotes.
Expand All @@ -38,6 +44,10 @@ def outs_filter(out: "Output") -> bool:
or (push and not out.can_push)
):
return False
# If we are not enforcing a push of the latest version and this out
# already has a version ID, we assume that version still exists and
# skip pushing it
if push and not latest_only and out.meta.version_id is not None:
return False
return True

return index.targets_view(
Expand Down Expand Up @@ -68,32 +78,63 @@ def fetch_worktree(repo: "Repo", remote: "Remote") -> int:


def push_worktree(repo: "Repo", remote: "Remote") -> int:
from dvc_data.index import checkout
from dvc_data.index.save import build_tree

view = worktree_view(repo.index, push=True)
index = view.data["repo"]
total = len(index)
from dvc_data.index import build, checkout
from dvc_data.index import view as data_view

view = worktree_view(repo.index, push=True, latest_only=remote.worktree)
new_index = view.data["repo"]
if remote.worktree:
logger.debug("Indexing latest worktree for '%s'", remote.path)
old_index = build(remote.path, remote.fs)
old_view: Optional["DataIndexView"] = data_view(
old_index, lambda key: key in new_index
)
logger.debug("Pushing worktree changes to '%s'", remote.path)
else:
old_view = None
logger.debug("Pushing version-aware files to '%s'", remote.path)

if remote.worktree:

# for files, if our version's checksum (etag) matches the latest remote
# checksum, we do not need to push, even if the version IDs don't match
def _checksum(meta: "Meta") -> Any:
if not meta or meta.isdir:
return meta
return getattr(meta, remote.fs.PARAM_CHECKSUM)

diff_kwargs: Dict[str, Any] = {
"meta_only": True,
"meta_cmp_key": _checksum,
}
else:
diff_kwargs = {}

total = len(new_index)
with Callback.as_tqdm_callback(
unit="file", desc="Push", disable=total == 0
) as cb:
cb.set_size(total)
pushed = checkout(
index,
new_index,
remote.path,
remote.fs,
latest_only=False,
old=old_view,
delete=remote.worktree,
callback=cb,
latest_only=remote.worktree,
**diff_kwargs,
)
if pushed:
_update_pushed_meta(repo, view)
return pushed

for stage in view.stages:
for out in stage.outs:
if not out.use_cache:
continue

if not out.is_in_repo:
continue
def _update_pushed_meta(repo: "Repo", view: "IndexView"):
from dvc_data.index.save import build_tree

for stage in view.stages:
for out in stage.outs:
workspace, key = out.index_key
index = repo.index.data[workspace]
entry = index[key]
Expand All @@ -105,17 +146,21 @@ def push_worktree(repo: "Repo", remote: "Remote") -> int:
if entry.meta.isdir:
continue
fs_path = repo.fs.path.join(repo.root_dir, *subkey)
_, hash_info = old_tree.get(
meta, hash_info = old_tree.get(
repo.fs.path.relparts(fs_path, out.fs_path)
)
entry.hash_info = hash_info
if meta.version_id is not None:
# preserve existing version IDs for unchanged files in
# this dir (entry will have the latest remote version
# ID after checkout)
entry.meta = meta
tree_meta, new_tree = build_tree(index, key)
out.obj = new_tree
out.hash_info = new_tree.hash_info
out.meta = tree_meta
else:
out.hash_info = entry.hash_info
out.meta = entry.meta
if out.meta.version_id is None:
out.meta = entry.meta
stage.dvcfile.dump(stage, with_files=True, update_pipeline=False)

return pushed

0 comments on commit a9b961a

Please sign in to comment.