Skip to content

Commit

Permalink
index: introduce basic push
Browse files Browse the repository at this point in the history
Very basic implementation to start with. Will be tweaked later.

Prerequisite for iterative/dvc#9333
  • Loading branch information
efiop committed Jul 30, 2023
1 parent adbc8a3 commit 473b2dc
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 0 deletions.
76 changes: 76 additions & 0 deletions src/dvc_data/index/push.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import logging
from typing import TYPE_CHECKING, Optional

from dvc_objects.fs.callbacks import DEFAULT_CALLBACK

from dvc_data.hashfile.db import get_index
from dvc_data.hashfile.transfer import transfer

from .build import build
from .checkout import apply, compare
from .fetch import _log_missing
from .index import ObjectStorage

if TYPE_CHECKING:
from dvc_objects.fs.callbacks import Callback


logger = logging.getLogger(__name__)


def push(
    idxs,
    callback: "Callback" = DEFAULT_CALLBACK,
    jobs: Optional[int] = None,
):
    """Push data from each index's cache storage to its data (remote) storage.

    Args:
        idxs: Iterable of data indexes to push. Each index's root storage
            entry must provide both ``cache`` (the source) and ``data``
            (the destination).
        callback: Progress callback. A non-default callback is wrapped in
            a per-index tqdm progress bar; the default is used as-is.
        jobs: Number of parallel transfer jobs (``None`` uses the default).

    Returns:
        Tuple ``(pushed, failed)``: the number of files successfully
        transferred and the number of failed transfers.
    """
    pushed, failed = 0, 0
    for fs_index in idxs:
        data = fs_index.storage_map[()].data
        cache = fs_index.storage_map[()].cache

        if callback != DEFAULT_CALLBACK:
            cb = callback.as_tqdm_callback(
                unit="file",
                total=len(fs_index),
                desc=f"Pushing to {data.fs.protocol}",
            )
        else:
            cb = callback

        with cb:
            if isinstance(cache, ObjectStorage) and isinstance(
                data, ObjectStorage
            ):
                # Both ends are object storages: do a direct odb-to-odb
                # transfer of every hashed entry in the index.
                result = transfer(
                    cache.odb,
                    data.odb,
                    [
                        entry.hash_info
                        for _, entry in fs_index.iteritems()
                        if entry.hash_info
                    ],
                    jobs=jobs,
                    src_index=get_index(cache.odb),
                    cache_odb=data.odb,
                    verify=cache.odb.verify,
                    validate_status=_log_missing,
                    callback=cb,
                )
                pushed += len(result.transferred)
                failed += len(result.failed)
            else:
                # Generic destination: diff the index against what already
                # exists at the destination path and apply only the changes.
                old = build(data.path, data.fs)
                diff = compare(old, fs_index)
                data.fs.makedirs(data.fs.path.parent(data.path), exist_ok=True)
                apply(
                    diff,
                    data.path,
                    data.fs,
                    update_meta=False,
                    storage="cache",
                    jobs=jobs,
                    callback=cb,
                )
                # NOTE: only "added" entries are counted here; modified files
                # applied by the diff are not reflected in the return value.
                pushed += len(diff.changes.get("added", []))

    return pushed, failed
54 changes: 54 additions & 0 deletions tests/index/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,60 @@ def test_fetch(tmp_upath, make_odb, odb, as_filesystem):
}


def test_push(tmp_upath, make_odb, odb, as_filesystem):
    from dvc_data.index.collect import collect
    from dvc_data.index.push import push

    # Build an index with one file entry and one directory entry.
    foo_entry = DataIndexEntry(
        key=("foo",),
        meta=Meta(),
        hash_info=HashInfo(
            name="md5", value="d3b07384d113edec49eaa6238ad5ff00"
        ),
    )
    dir_entry = DataIndexEntry(
        key=("data",),
        meta=Meta(isdir=True),
        hash_info=HashInfo(
            name="md5",
            value="1f69c66028c35037e8bf67e5bc4ceb6a.dir",
        ),
    )
    idx = DataIndex({("foo",): foo_entry, ("data",): dir_entry})

    remote_odb = make_odb()
    idx.storage_map.add_cache(ObjectStorage((), odb))
    idx.storage_map.add_remote(ObjectStorage((), remote_odb))

    # Push everything collected for the remote, then wipe the local cache:
    # the remote must now hold the objects, the cache must be empty.
    push(collect([idx], "remote"))
    odb.clear()
    assert not list(odb.all())
    assert list(remote_odb.all())

    # Check out from the remote and verify the restored tree contents.
    out = tmp_upath / "checkout"
    diff = checkout.compare(None, idx)
    checkout.apply(
        diff,
        str(out),
        as_filesystem(tmp_upath.fs),
        storage="remote",
    )
    assert (out / "foo").read_text() == "foo\n"
    assert (out / "data").is_dir()
    assert (out / "data" / "bar").read_text() == "bar\n"
    assert (out / "data" / "baz").read_text() == "baz\n"
    assert set(out.iterdir()) == {out / "foo", out / "data"}
    assert set((out / "data").iterdir()) == {
        out / "data" / "bar",
        out / "data" / "baz",
    }


@pytest.mark.parametrize(
"write, read",
[
Expand Down

0 comments on commit 473b2dc

Please sign in to comment.