Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 More than 1 input port containing files can be safely pulled #6286

Merged
merged 11 commits into from
Sep 3, 2024
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import tempfile
import threading
from pathlib import Path
from typing import Optional
from typing import Final
from uuid import uuid4

from models_library.projects_nodes_io import SimcoreS3FileID

_TMP_SIMCOREFILES: Final[Path] = Path(tempfile.gettempdir()) / "simcorefiles"


def create_simcore_file_id(
file_path: Path,
project_id: str,
node_id: str,
*,
file_base_path: Optional[Path] = None,
file_base_path: Path | None = None,
) -> SimcoreS3FileID:
s3_file_name = file_path.name
if file_base_path:
Expand All @@ -20,12 +22,9 @@ def create_simcore_file_id(
return SimcoreS3FileID(f"{clean_path}")


_INTERNAL_DIR = Path(tempfile.gettempdir(), "simcorefiles")


def create_folder_path(key: str) -> Path:
return Path(_INTERNAL_DIR, f"{threading.get_ident()}", key)
def get_folder_path(key: str) -> Path:
return _TMP_SIMCOREFILES / f"{uuid4()}" / key
pcrespov marked this conversation as resolved.
Show resolved Hide resolved


def create_file_path(key: str, name: str) -> Path:
return Path(_INTERNAL_DIR, f"{threading.get_ident()}", key, name)
def get_file_path(key: str, name: str) -> Path:
return _TMP_SIMCOREFILES / f"{uuid4()}" / key / name
pcrespov marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def get_value_from_link(
if file_to_key_map:
file_name = next(iter(file_to_key_map))

file_path = data_items_utils.create_file_path(key, file_name)
file_path = data_items_utils.get_file_path(key, file_name)
GitHK marked this conversation as resolved.
Show resolved Hide resolved
if other_value == file_path:
# this is a corner case: in case the output key of the other node has the same name as the input key
return other_value
Expand Down Expand Up @@ -194,7 +194,7 @@ async def pull_file_from_store(
) -> Path:
log.debug("pulling file from storage %s", value)
# do not make any assumption about s3_path, it is a str containing stuff that can be anything depending on the store
local_path = data_items_utils.create_folder_path(key)
local_path = data_items_utils.get_folder_path(key)
downloaded_file = await filemanager.download_path_from_s3(
user_id=user_id,
store_id=value.store,
Expand Down Expand Up @@ -275,7 +275,7 @@ async def pull_file_from_download_link(
value.label,
)

local_path = data_items_utils.create_folder_path(key)
local_path = data_items_utils.get_folder_path(key)
downloaded_file = await filemanager.download_file_from_link(
URL(f"{value.download_link}"),
local_path,
Expand Down
10 changes: 10 additions & 0 deletions packages/simcore-sdk/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import pytest
import simcore_sdk
from helpers.utils_port_v2 import CONSTANT_UUID
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.postgres_tools import PostgresTestConfig
from pytest_simcore.helpers.typing_env import EnvVarsDict
from simcore_sdk.node_ports_common.file_io_utils import LogRedirectCB
Expand Down Expand Up @@ -74,3 +76,11 @@ async def _mocked_function(*args, **kwargs) -> None:
pass

return _mocked_function


@pytest.fixture
def mock_uuid4(mocker: MockerFixture) -> None:
GitHK marked this conversation as resolved.
Show resolved Hide resolved
mocker.patch(
"simcore_sdk.node_ports_common.data_items_utils.uuid4",
return_value=CONSTANT_UUID,
)
5 changes: 4 additions & 1 deletion packages/simcore-sdk/tests/helpers/utils_port_v2.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from typing import Any
from typing import Any, Final
from uuid import UUID

from simcore_sdk.node_ports_v2.ports_mapping import InputsList, OutputsList

CONSTANT_UUID: Final[UUID] = UUID(int=0)


def create_valid_port_config(conf_type: str, **kwargs) -> dict[str, Any]:
valid_config = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import filecmp
import os
import tempfile
import threading
from asyncio import gather
from collections.abc import Awaitable, Callable, Iterable
from pathlib import Path
Expand Down Expand Up @@ -37,6 +36,7 @@
from simcore_sdk.node_ports_v2.links import ItemConcreteValue, PortLink
from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports
from simcore_sdk.node_ports_v2.port import Port
from utils_port_v2 import CONSTANT_UUID

pytest_simcore_core_services_selection = [
"migration",
Expand Down Expand Up @@ -273,6 +273,7 @@ async def test_port_file_accessors(
e_tag: str,
option_r_clone_settings: RCloneSettings | None,
request: pytest.FixtureRequest,
mock_uuid4: None,
GitHK marked this conversation as resolved.
Show resolved Hide resolved
):

if item_value == "symlink_path":
Expand Down Expand Up @@ -307,7 +308,7 @@ async def test_port_file_accessors(
await (await PORTS.outputs)[ServicePortKey("out_34")].set(item_value)
# this is the link to S3 storage
value = (await PORTS.outputs)[ServicePortKey("out_34")].value
assert isinstance(value, (DownloadLink, PortLink, BaseFileLink))
assert isinstance(value, DownloadLink | PortLink | BaseFileLink)
received_file_link = value.dict(by_alias=True, exclude_unset=True)
assert received_file_link["store"] == s3_simcore_location
assert (
Expand All @@ -331,7 +332,7 @@ async def test_port_file_accessors(
Path(
tempfile.gettempdir(),
"simcorefiles",
f"{threading.get_ident()}",
f"{CONSTANT_UUID}",
"out_34",
)
)
Expand Down Expand Up @@ -495,6 +496,7 @@ async def test_get_file_from_previous_node(
item_value: str,
item_pytype: type,
option_r_clone_settings: RCloneSettings | None,
mock_uuid4: None,
):
config_dict, _, _ = create_2nodes_configuration(
prev_node_inputs=None,
Expand All @@ -519,7 +521,7 @@ async def test_get_file_from_previous_node(
assert file_path == Path(
tempfile.gettempdir(),
"simcorefiles",
f"{threading.get_ident()}",
f"{CONSTANT_UUID}",
"in_15",
Path(item_value).name,
)
Expand Down Expand Up @@ -551,6 +553,7 @@ async def test_get_file_from_previous_node_with_mapping_of_same_key_name(
item_alias: str,
item_pytype: type,
option_r_clone_settings: RCloneSettings | None,
mock_uuid4: None,
):
config_dict, _, this_node_uuid = create_2nodes_configuration(
prev_node_inputs=None,
Expand Down Expand Up @@ -579,7 +582,7 @@ async def test_get_file_from_previous_node_with_mapping_of_same_key_name(
assert file_path == Path(
tempfile.gettempdir(),
"simcorefiles",
f"{threading.get_ident()}",
f"{CONSTANT_UUID}",
"in_15",
item_alias,
)
Expand Down Expand Up @@ -612,6 +615,7 @@ async def test_file_mapping(
item_pytype: type,
option_r_clone_settings: RCloneSettings | None,
create_valid_file_uuid: Callable[[str, Path], SimcoreS3FileID],
mock_uuid4: None,
):
config_dict, project_id, node_uuid = create_special_configuration(
inputs=[("in_1", item_type, await create_store_link(item_value))],
Expand All @@ -638,7 +642,7 @@ async def test_file_mapping(
assert file_path == Path(
tempfile.gettempdir(),
"simcorefiles",
f"{threading.get_ident()}",
f"{CONSTANT_UUID}",
"in_1",
item_alias,
)
Expand All @@ -649,7 +653,7 @@ async def test_file_mapping(
assert file_path == Path(
tempfile.gettempdir(),
"simcorefiles",
f"{threading.get_ident()}",
f"{CONSTANT_UUID}",
"in_1",
item_alias,
)
Expand All @@ -662,7 +666,7 @@ async def test_file_mapping(
await PORTS.set_file_by_keymap(file_path)
file_id = create_valid_file_uuid("out_1", file_path)
value = (await PORTS.outputs)[ServicePortKey("out_1")].value
assert isinstance(value, (DownloadLink, PortLink, BaseFileLink))
assert isinstance(value, DownloadLink | PortLink | BaseFileLink)
received_file_link = value.dict(by_alias=True, exclude_unset=True)
assert received_file_link["store"] == s3_simcore_location
assert received_file_link["path"] == file_id
Expand Down Expand Up @@ -734,7 +738,7 @@ async def _upload_create_task(item_key: str) -> None:

# since a race condition was created when uploading values in parallel
# it is expected to find at least one mismatching value here
with pytest.raises(AssertionError) as exc_info:
with pytest.raises(AssertionError) as exc_info: # noqa: PT012
for item_key, _, _ in outputs:
assert (await PORTS.outputs)[
ServicePortKey(item_key)
Expand Down
7 changes: 3 additions & 4 deletions packages/simcore-sdk/tests/unit/test_node_ports_v2_port.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import re
import shutil
import tempfile
import threading
from collections.abc import Callable, Iterator
from dataclasses import dataclass
from pathlib import Path
Expand Down Expand Up @@ -39,7 +38,7 @@
)
from simcore_sdk.node_ports_v2.port import Port
from simcore_sdk.node_ports_v2.ports_mapping import InputsList, OutputsList
from utils_port_v2 import create_valid_port_config
from utils_port_v2 import CONSTANT_UUID, create_valid_port_config
from yarl import URL


Expand All @@ -57,7 +56,7 @@ def another_node_file_name() -> Path:


def download_file_folder_name() -> Path:
return Path(tempfile.gettempdir(), "simcorefiles", f"{threading.get_ident()}")
return Path(tempfile.gettempdir()) / "simcorefiles" / f"{CONSTANT_UUID}"


def project_id() -> str:
Expand Down Expand Up @@ -141,7 +140,7 @@ def another_node_file() -> Iterator[Path]:


@pytest.fixture
def download_file_folder() -> Iterator[Path]:
def download_file_folder(mock_uuid4: None) -> Iterator[Path]:
destination_path = download_file_folder_name()
destination_path.mkdir(parents=True, exist_ok=True)
yield destination_path
Expand Down
Loading