Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] import-url: cloud versioned imports #8164

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dvc/config_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class RelPath(str):
"checksum_jobs": All(Coerce(int), Range(1)),
"jobs": All(Coerce(int), Range(1)),
Optional("no_traverse"): Bool, # obsoleted
Optional("version_aware"): Bool,
}
LOCAL_COMMON = {
"type": supported_cache_type,
Expand Down
76 changes: 73 additions & 3 deletions dvc/dependency/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from typing import Type
from collections import defaultdict
from copy import copy
from typing import TYPE_CHECKING, Dict, Optional, Set, Type

from dvc.exceptions import DvcException
from dvc.output import Output
from dvc_data.hashfile.meta import Meta

if TYPE_CHECKING:
from dvc_data.hashfile.hash_info import HashInfo
from dvc_objects.db import ObjectDB


class DependencyDoesNotExistError(DvcException):
Expand Down Expand Up @@ -30,5 +37,68 @@ class Dependency(Output):
) # type: Type[DvcException]
IsStageFileError = DependencyIsStageFileError # type: Type[DvcException]

def update(self, rev=None):
pass
def get_used_objs(
    self, **kwargs
) -> Dict[Optional["ObjectDB"], Set["HashInfo"]]:
    """Collect the object ids this dependency needs, grouped by store.

    Dependencies without a recorded version id defer to the parent
    implementation.  Otherwise ``build`` is run for ``self.fs_path``
    against the local ODB, and the resulting ids are reported under a
    read-only copy of the object store that ``build`` returned.

    Raises:
        DvcException: if ``build`` fails with ``FileNotFoundError`` or
            ``TreeError``.
    """
    from dvc_data.build import build
    from dvc_data.objects.tree import Tree, TreeError

    if not self.meta.version_id:
        return super().get_used_objs(**kwargs)

    odb = self.repo.odb.local
    try:
        store, _, built = build(
            odb, self.fs_path, self.fs, odb.fs.PARAM_CHECKSUM
        )
    except (FileNotFoundError, TreeError) as exc:
        raise DvcException(
            f"The path '{self.fs_path}' does not exist in the remote."
        ) from exc

    # TODO: support versioned import writes (push)
    # Flag a shallow copy as read-only rather than the object that
    # build() handed back.
    store = copy(store)
    store.read_only = True

    result: Dict[Optional["ObjectDB"], Set["HashInfo"]] = defaultdict(set)
    bucket = result[store]
    bucket.add(built.hash_info)
    if isinstance(built, Tree):
        # Iterating a Tree yields 3-tuples; only the last item is kept.
        bucket.update(oid for _, _, oid in built)
    return result

def workspace_status(self):
    """Report whether a different version of this dependency exists.

    Without a recorded version id this defers to the parent
    implementation.  Otherwise the id stored in ``self.meta`` is compared
    with the one ``self.fs.info()`` reports for the path rebuilt with a
    ``None`` version.

    Returns:
        ``{str(self): "update available"}`` when the ids differ,
        otherwise an empty dict.
    """
    recorded = self.meta.version_id
    if not recorded:
        return super().workspace_status()

    probe_path = self.fs.path.version_path(self.fs_path, None)
    reported = self.fs.info(probe_path)[Meta.PARAM_VERSION_ID]
    return {str(self): "update available"} if recorded != reported else {}

def status(self):
    """Return the status of this dependency.

    Uses ``workspace_status()`` when a version id is recorded in
    ``self.meta``; otherwise defers to the parent implementation.
    """
    if self.meta.version_id:
        return self.workspace_status()
    return super().status()

def update(self, rev: Optional[str] = None):
    """Move this dependency to another version id.

    Does nothing when no version id is currently recorded.  A truthy
    ``rev`` becomes the new version id; otherwise the id is taken from
    ``self.fs.info()``.  ``self.fs_path`` is then rebuilt with the new id.
    """
    if not self.meta.version_id:
        return

    if not rev:
        # No explicit revision given: use the id the filesystem reports.
        lookup_path = self.fs.path.version_path(self.fs_path, rev)
        rev = self.fs.info(lookup_path)[Meta.PARAM_VERSION_ID]

    self.meta.version_id = rev
    self.fs_path = self.fs.path.version_path(self.fs_path, rev)
34 changes: 30 additions & 4 deletions dvc/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,15 @@ def __init__(
repo=None,
):
self.repo = stage.repo if not repo and stage else repo
fs_cls, fs_config, fs_path = get_cloud_fs(self.repo, url=path)
meta = Meta.from_dict(info)
# NOTE: when version_aware is not passed into get_cloud_fs, it will be
# set based on whether or not path is versioned
fs_kwargs = {"version_aware": True} if meta.version_id else {}
fs_cls, fs_config, fs_path = get_cloud_fs(
self.repo,
url=path,
**fs_kwargs,
)
self.fs = fs_cls(**fs_config)

if (
Expand Down Expand Up @@ -320,7 +328,7 @@ def __init__(
# By resolved path, which contains actual location,
# should be absolute and don't contain remote:// refs.
self.stage = stage
self.meta = Meta.from_dict(info)
self.meta = meta
self.hash_info = HashInfo.from_dict(info)
self.use_cache = False if self.IS_DEPENDENCY else cache
self.metric = False if self.IS_DEPENDENCY else metric
Expand All @@ -335,6 +343,17 @@ def __init__(

self.remote = remote

if self.fs.version_aware:
self.def_path, version_id = self.fs.path.coalesce_version(
self.def_path, self.meta.version_id
)
self.fs_path = self.fs.path.version_path(self.fs_path, version_id)
self.meta.version_id = version_id
elif self.meta.version_id:
raise DvcException(
"Version ID unsupported for non-versioned filesystem"
)

def _parse_path(self, fs, fs_path):
parsed = urlparse(self.def_path)
if (
Expand All @@ -360,6 +379,10 @@ def __repr__(self):

def __str__(self):
if self.fs.protocol != "local":
if self.meta.version_id:
return self.fs.path.version_path(
self.def_path, self.meta.version_id
)
return self.def_path

if (
Expand Down Expand Up @@ -942,7 +965,7 @@ def get_used_objs(
if not self.use_cache:
return {}

if self.stage.is_repo_import:
if self.stage.is_repo_import or self.stage.is_versioned_import:
return self.get_used_external(**kwargs)

if not self.hash_info:
Expand Down Expand Up @@ -994,7 +1017,9 @@ def _named_obj_ids(self, obj):
def get_used_external(
self, **kwargs
) -> Dict[Optional["ObjectDB"], Set["HashInfo"]]:
if not self.use_cache or not self.stage.is_repo_import:
if not self.use_cache or not (
self.stage.is_repo_import or self.stage.is_versioned_import
):
return {}

(dep,) = self.stage.deps
Expand Down Expand Up @@ -1095,6 +1120,7 @@ def is_plot(self) -> bool:
Meta.PARAM_SIZE: int,
Meta.PARAM_NFILES: int,
Meta.PARAM_ISEXEC: bool,
Meta.PARAM_VERSION_ID: str,
}

SCHEMA = {
Expand Down
5 changes: 4 additions & 1 deletion dvc/repo/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Iterable,
List,
Optional,
Tuple,
TypedDict,
cast,
)
Expand Down Expand Up @@ -204,7 +205,9 @@ def _diff_head_to_index(
repo: "Repo", head: str = "HEAD", **kwargs: Any
) -> Dict[str, List[str]]:
# we need to store objects from index and the HEAD to diff later
objs: Dict[str, Dict[str, "HashFile"]] = defaultdict(dict)
objs: Dict[str, Dict[str, Tuple["HashFile", "HashInfo"]]] = defaultdict(
dict
)
staged_diff = defaultdict(list)
for rev in repo.brancher(revs=[head]):
for out in repo.index.outs:
Expand Down
11 changes: 10 additions & 1 deletion dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@ def is_repo_import(self):

return isinstance(self.deps[0], RepoDependency)

@property
def is_versioned_import(self):
    """Whether this is an import stage whose first dependency carries a
    version id (``meta.version_id`` is not ``None``)."""
    if self.is_import:
        first_dep = self.deps[0]
        return first_dep.meta.version_id is not None
    return False

@property
def is_checkpoint(self):
"""
Expand Down Expand Up @@ -621,7 +628,9 @@ def _checkout(out, **kwargs):
@rwlocked(read=["deps", "outs"])
def status(self, check_updates=False, filter_info=None):
ret = []
show_import = self.is_repo_import and check_updates
show_import = (
self.is_repo_import or self.is_versioned_import
) and check_updates

if not self.frozen or show_import:
self._status_deps(ret)
Expand Down