diff --git a/dvc/dependency/__init__.py b/dvc/dependency/__init__.py index e84f54b8d12..43513d31587 100644 --- a/dvc/dependency/__init__.py +++ b/dvc/dependency/__init__.py @@ -1,13 +1,11 @@ from collections import defaultdict from typing import Any, Mapping -from dvc.fs import get_cloud_fs from dvc.output import ARTIFACT_SCHEMA, Output from .base import Dependency from .param import ParamsDependency from .repo import RepoDependency -from .versioned import VersionedDependency # NOTE: schema for dependencies is basically the same as for outputs, but # without output-specific entries like 'cache' (whether or not output is @@ -29,11 +27,8 @@ def _get(stage, p, info): params = info.pop(ParamsDependency.PARAM_PARAMS) return ParamsDependency(stage, p, params) - _, fs_config, _ = get_cloud_fs(stage.repo, url=p) - if fs_config.get("version_aware") or info.get(Output.PARAM_VERSION_ID): - version_id = info.pop("version_id", None) - return VersionedDependency(stage, p, info=info, version_id=version_id) - return Dependency(stage, p, info) + version_id = info.pop(Output.PARAM_VERSION_ID, None) + return Dependency(stage, p, info, version_id=version_id) def loadd_from(stage, d_list): diff --git a/dvc/dependency/base.py b/dvc/dependency/base.py index b08435653cf..b38af858f3d 100644 --- a/dvc/dependency/base.py +++ b/dvc/dependency/base.py @@ -1,8 +1,14 @@ -from typing import Type +from collections import defaultdict +from copy import copy +from typing import TYPE_CHECKING, Dict, Optional, Set, Type from dvc.exceptions import DvcException from dvc.output import Output +if TYPE_CHECKING: + from dvc_data.hashfile.hash_info import HashInfo + from dvc_objects.db import ObjectDB + class DependencyDoesNotExistError(DvcException): def __init__(self, path): @@ -30,5 +36,66 @@ class Dependency(Output): ) # type: Type[DvcException] IsStageFileError = DependencyIsStageFileError # type: Type[DvcException] - def update(self, rev=None): - pass + def get_used_objs( + self, **kwargs + ) -> Dict[Optional["ObjectDB"], Set["HashInfo"]]: + from dvc_data.build import build + from dvc_data.objects.tree import Tree, TreeError + + if not self.version_id: + return super().get_used_objs(**kwargs) + + used_obj_ids: Dict[ + Optional["ObjectDB"], Set["HashInfo"] + ] = defaultdict(set) + local_odb = self.repo.odb.local + try: + object_store, _, obj = build( + local_odb, + self.fs_path, + self.fs, + local_odb.fs.PARAM_CHECKSUM, + ) + except (FileNotFoundError, TreeError) as exc: + raise DvcException( + f"The path '{self.fs_path}' does not exist in the remote." + ) from exc + # TODO: support versioned import writes (push) + object_store = copy(object_store) + object_store.read_only = True + + used_obj_ids[object_store].add(obj.hash_info) + if isinstance(obj, Tree): + used_obj_ids[object_store].update(oid for _, _, oid in obj) + return used_obj_ids + + def workspace_status(self): + if not self.version_id: + return super().workspace_status() + + current = self.version_id + fs_path = self.fs.path.version_path(self.fs_path, None) + updated = self.fs.info(fs_path)["version_id"] + + if current != updated: + return {str(self): "update available"} + + return {} + + def status(self): + if not self.version_id: + return super().status() + + return self.workspace_status() + + def update(self, rev: Optional[str] = None): + if not self.version_id: + return + + if rev: + self.version_id = rev + else: + fs_path = self.fs.path.version_path(self.fs_path, rev) + details = self.fs.info(fs_path) + self.version_id = details["version_id"] + self.fs_path = self.fs.path.version_path(self.fs_path, self.version_id) diff --git a/dvc/dependency/versioned.py b/dvc/dependency/versioned.py deleted file mode 100644 index 032945ae446..00000000000 --- a/dvc/dependency/versioned.py +++ /dev/null @@ -1,79 +0,0 @@ -import logging -from collections import defaultdict -from copy import copy -from typing import TYPE_CHECKING, Dict, Optional, Set - -from dvc.exceptions import DvcException - -from .base import Dependency - -if TYPE_CHECKING: - from dvc_data.hashfile.hash_info import HashInfo - from dvc_objects.db import ObjectDB - -logger = logging.getLogger(__name__) - - -class VersionedDependency(Dependency): - def get_used_objs( - self, **kwargs - ) -> Dict[Optional["ObjectDB"], Set["HashInfo"]]: - from dvc_data.build import build - from dvc_data.objects.tree import Tree, TreeError - - used_obj_ids: Dict[ - Optional["ObjectDB"], Set["HashInfo"] - ] = defaultdict(set) - local_odb = self.repo.odb.local - try: - object_store, _, obj = build( - local_odb, - self.fs_path, - self.fs, - local_odb.fs.PARAM_CHECKSUM, - ) - logger.debug( - "Staged versioned-import dvc-data reference to '%s://%s'", - obj.fs.protocol, - obj.path, - ) - except (FileNotFoundError, TreeError) as exc: - raise DvcException( - f"The path '{self.fs_path}' does not exist in the remote." - ) from exc - # TODO: support versioned import writes (push) - object_store = copy(object_store) - object_store.read_only = True - - used_obj_ids[object_store].add(obj.hash_info) - if isinstance(obj, Tree): - used_obj_ids[object_store].update(oid for _, _, oid in obj) - return used_obj_ids - - def workspace_status(self): - current = self.version_id - fs_path = self.fs.path.version_path(self.fs_path, None) - updated = self.fs.info(fs_path)["version_id"] - - if current != updated: - return {str(self): "update available"} - - return {} - - def status(self): - return self.workspace_status() - - def update(self, rev: Optional[str] = None): - """Update dependency to the specified version. - - Arguments: - rev: Version ID. rev=None will update to the latest file - version. - """ - if rev: - self.version_id = rev - else: - fs_path = self.fs.path.version_path(self.fs_path, rev) - details = self.fs.info(fs_path) - self.version_id = details["version_id"] - self.fs_path = self.fs.path.version_path(self.fs_path, self.version_id) diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 0838018e089..d4ef146618c 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -265,9 +265,7 @@ def is_versioned_import(self): if not self.is_import: return False - from dvc.dependency import VersionedDependency - - return isinstance(self.deps[0], VersionedDependency) + return self.deps[0].version_id is not None @property def is_checkpoint(self):