From 935f442b7496bf8cce098a34896daed10cf340e3 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Wed, 8 Mar 2023 13:57:19 -0800 Subject: [PATCH 01/17] Add env var Signed-off-by: Archit Kulkarni --- python/ray/_private/runtime_env/packaging.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/ray/_private/runtime_env/packaging.py b/python/ray/_private/runtime_env/packaging.py index a042821cb5be..db3a354fed75 100644 --- a/python/ray/_private/runtime_env/packaging.py +++ b/python/ray/_private/runtime_env/packaging.py @@ -246,6 +246,9 @@ def match(p: Path): def _get_gitignore(path: Path) -> Optional[Callable]: + if os.environ.get("RAY_RUNTIME_ENV_SKIP_GITIGNORE"): + return None + path = path.absolute() ignore_file = path / ".gitignore" if ignore_file.is_file(): From 05c1219252b3dc0957f508ca2c06bb5c65734aac Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Wed, 8 Mar 2023 14:15:36 -0800 Subject: [PATCH 02/17] Use Ray Constants file Signed-off-by: Archit Kulkarni --- python/ray/_private/ray_constants.py | 3 +++ python/ray/_private/runtime_env/packaging.py | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 3494cbdf2efa..b66516b0720f 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -82,6 +82,9 @@ def env_set_by_user(key): # the local working_dir and py_modules to be uploaded, or these files might get # garbage collected before the job starts. RAY_RUNTIME_ENV_URI_PIN_EXPIRATION_S_DEFAULT = 10 * 60 +# If set to 1, then `.gitignore` files will not be parsed and loaded into "excludes" +# when using a local working_dir or py_modules. +RAY_RUNTIME_ENV_SKIP_GITIGNORE = "RAY_RUNTIME_ENV_SKIP_GITIGNORE" RAY_STORAGE_ENVIRONMENT_VARIABLE = "RAY_STORAGE" # Hook for running a user-specified runtime-env hook. This hook will be called # unconditionally given the runtime_env dict passed for ray.init. 
It must return diff --git a/python/ray/_private/runtime_env/packaging.py b/python/ray/_private/runtime_env/packaging.py index db3a354fed75..4bf99ffb585b 100644 --- a/python/ray/_private/runtime_env/packaging.py +++ b/python/ray/_private/runtime_env/packaging.py @@ -15,6 +15,7 @@ from ray._private.ray_constants import ( RAY_RUNTIME_ENV_URI_PIN_EXPIRATION_S_DEFAULT, RAY_RUNTIME_ENV_URI_PIN_EXPIRATION_S_ENV_VAR, + RAY_RUNTIME_ENV_SKIP_GITIGNORE, ) from ray._private.gcs_utils import GcsAioClient from ray._private.thirdparty.pathspec import PathSpec @@ -246,7 +247,8 @@ def match(p: Path): def _get_gitignore(path: Path) -> Optional[Callable]: - if os.environ.get("RAY_RUNTIME_ENV_SKIP_GITIGNORE"): + skip_gitignore = os.environ.get(RAY_RUNTIME_ENV_SKIP_GITIGNORE, "0") == "1" + if skip_gitignore: return None path = path.absolute() From 42e2e3a41c050865c167ef0d5e4f602b69938170 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Wed, 8 Mar 2023 14:24:33 -0800 Subject: [PATCH 03/17] Add docstring Signed-off-by: Archit Kulkarni --- python/ray/_private/runtime_env/packaging.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python/ray/_private/runtime_env/packaging.py b/python/ray/_private/runtime_env/packaging.py index 4bf99ffb585b..f18d64b8f5e8 100644 --- a/python/ray/_private/runtime_env/packaging.py +++ b/python/ray/_private/runtime_env/packaging.py @@ -247,6 +247,16 @@ def match(p: Path): def _get_gitignore(path: Path) -> Optional[Callable]: + """Returns a function that returns True if the path should be excluded. + + Returns None if there is no .gitignore file in the path. + + Args: + path: The path to the directory to check for a .gitignore file. + + Returns: + A function that returns True if the path should be excluded. 
+ """ skip_gitignore = os.environ.get(RAY_RUNTIME_ENV_SKIP_GITIGNORE, "0") == "1" if skip_gitignore: return None From c8f5a48685e6e7e7ad9ae3f9c73c854a166aa53c Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Wed, 8 Mar 2023 14:24:37 -0800 Subject: [PATCH 04/17] Add unit test Signed-off-by: Archit Kulkarni --- python/ray/tests/test_runtime_env_packaging.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python/ray/tests/test_runtime_env_packaging.py b/python/ray/tests/test_runtime_env_packaging.py index 4816460b0263..7ec25833e80a 100644 --- a/python/ray/tests/test_runtime_env_packaging.py +++ b/python/ray/tests/test_runtime_env_packaging.py @@ -30,6 +30,7 @@ remove_dir_from_filepaths, unzip_package, upload_package_if_needed, + _get_gitignore, ) from ray.experimental.internal_kv import ( _initialize_internal_kv, @@ -510,6 +511,13 @@ def test_parse_gcs_uri(self, gcs_uri): assert package_name == gcs_uri.split("/")[-1] +def test_get_gitignore(tmp_path): + gitignore_path = tmp_path / ".gitignore" + gitignore_path.write_text("*.pyc") + assert _get_gitignore(tmp_path)(Path(tmp_path / "foo.pyc")) is True + assert _get_gitignore(tmp_path)(Path(tmp_path / "foo.py")) is False + + @pytest.mark.skipif(sys.platform == "win32", reason="Fails on windows") def test_travel(tmp_path): dir_paths = set() From 5c7848ac42a9af1b7d9e37c43e2292220c66782b Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Wed, 8 Mar 2023 14:26:42 -0800 Subject: [PATCH 05/17] Add doc Signed-off-by: Archit Kulkarni --- doc/source/ray-core/handling-dependencies.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/source/ray-core/handling-dependencies.rst b/doc/source/ray-core/handling-dependencies.rst index e15f2c0d4c63..718cc9e3f44b 100644 --- a/doc/source/ray-core/handling-dependencies.rst +++ b/doc/source/ray-core/handling-dependencies.rst @@ -304,7 +304,7 @@ The ``runtime_env`` is a Python dictionary or a Python class :class:`ray.runtime Note: Setting a local 
directory per-task or per-actor is currently unsupported; it can only be set per-job (i.e., in ``ray.init()``). - Note: If your local directory contains a ``.gitignore`` file, the files and paths specified therein will not be uploaded to the cluster. + Note: If your local directory contains a ``.gitignore`` file, the files and paths specified therein will not be uploaded to the cluster. This can be disabled by setting the environment variable `RAY_RUNTIME_ENV_SKIP_GITIGNORE=1` on the machine doing the uploading. - ``py_modules`` (List[str|module]): Specifies Python modules to be available for import in the Ray workers. (For more ways to specify packages, see also the ``pip`` and ``conda`` fields below.) Each entry must be either (1) a path to a local directory, (2) a URI to a remote zip file (see :ref:`remote-uris` for details), (3) a Python module object, or (4) a path to a local `.whl` file. @@ -325,7 +325,7 @@ The ``runtime_env`` is a Python dictionary or a Python class :class:`ray.runtime Note: Setting options (1) and (3) per-task or per-actor is currently unsupported, it can only be set per-job (i.e., in ``ray.init()``). - Note: For option (1), if your local directory contains a ``.gitignore`` file, the files and paths specified therein will not be uploaded to the cluster. + Note: For option (1), if your local directory contains a ``.gitignore`` file, the files and paths specified therein will not be uploaded to the cluster. This can be disabled by setting the environment variable `RAY_RUNTIME_ENV_SKIP_GITIGNORE=1` on the machine doing the uploading. Note: This feature is currently limited to modules that are packages with a single directory containing an ``__init__.py`` file. For single-file modules, you may use ``working_dir``. 
From bfea1fddb1e27940cac34259216288e330dce925 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Wed, 8 Mar 2023 14:42:40 -0800 Subject: [PATCH 06/17] Add broader unit test Signed-off-by: Archit Kulkarni --- .../ray/tests/test_runtime_env_packaging.py | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/test_runtime_env_packaging.py b/python/ray/tests/test_runtime_env_packaging.py index 7ec25833e80a..81e03eb35f5a 100644 --- a/python/ray/tests/test_runtime_env_packaging.py +++ b/python/ray/tests/test_runtime_env_packaging.py @@ -12,7 +12,10 @@ import pytest from ray._private.gcs_utils import GcsClient -from ray._private.ray_constants import KV_NAMESPACE_PACKAGE +from ray._private.ray_constants import ( + KV_NAMESPACE_PACKAGE, + RAY_RUNTIME_ENV_SKIP_GITIGNORE, +) from ray._private.runtime_env.packaging import ( GCS_STORAGE_MAX_SIZE, MAC_OS_ZIP_HIDDEN_DIR_NAME, @@ -518,14 +521,20 @@ def test_get_gitignore(tmp_path): assert _get_gitignore(tmp_path)(Path(tmp_path / "foo.py")) is False +@pytest.mark.parametrize("skip_gitignore", [True, False]) @pytest.mark.skipif(sys.platform == "win32", reason="Fails on windows") -def test_travel(tmp_path): +def test_travel(tmp_path, skip_gitignore, monkeypatch): dir_paths = set() file_paths = set() item_num = 0 excludes = [] root = tmp_path / "test" + if skip_gitignore: + monkeypatch.setenv(RAY_RUNTIME_ENV_SKIP_GITIGNORE, "1") + else: + monkeypatch.delenv(RAY_RUNTIME_ENV_SKIP_GITIGNORE, raising=False) + def construct(path, excluded=False, depth=0): nonlocal item_num path.mkdir(parents=True) @@ -561,6 +570,18 @@ def construct(path, excluded=False, depth=0): file_paths.add((str(path / uid), str(v))) item_num += 1 + # Add gitignore file + gitignore = root / ".gitignore" + gitignore.write_text("*.pyc") + file_paths.add((str(gitignore), "*.pyc")) + + # Add file that should be ignored by gitignore + with (root / "foo.pyc").open("w") as f: + f.write("foo") + if skip_gitignore: + # If 
skip_gitignore is True, then the file should be visited + file_paths.add((str(root / "foo.pyc"), "foo")) + construct(root) exclude_spec = _get_excludes(root, excludes) visited_dir_paths = set() From d65eb72713416613dcfe4275257e8a9a5e62869a Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Wed, 8 Mar 2023 14:48:25 -0800 Subject: [PATCH 07/17] Fix docstring Signed-off-by: Archit Kulkarni --- python/ray/_private/runtime_env/packaging.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/_private/runtime_env/packaging.py b/python/ray/_private/runtime_env/packaging.py index f18d64b8f5e8..9ad0684a1568 100644 --- a/python/ray/_private/runtime_env/packaging.py +++ b/python/ray/_private/runtime_env/packaging.py @@ -249,7 +249,8 @@ def match(p: Path): def _get_gitignore(path: Path) -> Optional[Callable]: """Returns a function that returns True if the path should be excluded. - Returns None if there is no .gitignore file in the path. + Returns None if there is no .gitignore file in the path, or if the + RAY_RUNTIME_ENV_SKIP_GITIGNORE environment variable is set to 1. Args: path: The path to the directory to check for a .gitignore file. 
From 51dc3ade445434acb8c141dbdafd65c767b68cf8 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Fri, 10 Mar 2023 13:32:13 -0800 Subject: [PATCH 08/17] skip_gitignore -> ignore_gitignore Signed-off-by: Archit Kulkarni --- doc/source/ray-core/handling-dependencies.rst | 4 ++-- python/ray/_private/ray_constants.py | 2 +- python/ray/_private/runtime_env/packaging.py | 8 ++++---- python/ray/tests/test_runtime_env_packaging.py | 16 ++++++++-------- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/doc/source/ray-core/handling-dependencies.rst b/doc/source/ray-core/handling-dependencies.rst index 718cc9e3f44b..a0bf74b399c6 100644 --- a/doc/source/ray-core/handling-dependencies.rst +++ b/doc/source/ray-core/handling-dependencies.rst @@ -304,7 +304,7 @@ The ``runtime_env`` is a Python dictionary or a Python class :class:`ray.runtime Note: Setting a local directory per-task or per-actor is currently unsupported; it can only be set per-job (i.e., in ``ray.init()``). - Note: If your local directory contains a ``.gitignore`` file, the files and paths specified therein will not be uploaded to the cluster. This can be disabled by setting the environment variable `RAY_RUNTIME_ENV_SKIP_GITIGNORE=1` on the machine doing the uploading. + Note: If your local directory contains a ``.gitignore`` file, the files and paths specified therein will not be uploaded to the cluster. This can be disabled by setting the environment variable `RAY_RUNTIME_ENV_IGNORE_GITIGNORE=1` on the machine doing the uploading. - ``py_modules`` (List[str|module]): Specifies Python modules to be available for import in the Ray workers. (For more ways to specify packages, see also the ``pip`` and ``conda`` fields below.) Each entry must be either (1) a path to a local directory, (2) a URI to a remote zip file (see :ref:`remote-uris` for details), (3) a Python module object, or (4) a path to a local `.whl` file. 
@@ -325,7 +325,7 @@ The ``runtime_env`` is a Python dictionary or a Python class :class:`ray.runtime Note: Setting options (1) and (3) per-task or per-actor is currently unsupported, it can only be set per-job (i.e., in ``ray.init()``). - Note: For option (1), if your local directory contains a ``.gitignore`` file, the files and paths specified therein will not be uploaded to the cluster. This can be disabled by setting the environment variable `RAY_RUNTIME_ENV_SKIP_GITIGNORE=1` on the machine doing the uploading. + Note: For option (1), if your local directory contains a ``.gitignore`` file, the files and paths specified therein will not be uploaded to the cluster. This can be disabled by setting the environment variable `RAY_RUNTIME_ENV_IGNORE_GITIGNORE=1` on the machine doing the uploading. Note: This feature is currently limited to modules that are packages with a single directory containing an ``__init__.py`` file. For single-file modules, you may use ``working_dir``. diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index b66516b0720f..402d2d2a59f5 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -84,7 +84,7 @@ def env_set_by_user(key): RAY_RUNTIME_ENV_URI_PIN_EXPIRATION_S_DEFAULT = 10 * 60 # If set to 1, then `.gitignore` files will not be parsed and loaded into "excludes" # when using a local working_dir or py_modules. -RAY_RUNTIME_ENV_SKIP_GITIGNORE = "RAY_RUNTIME_ENV_SKIP_GITIGNORE" +RAY_RUNTIME_ENV_IGNORE_GITIGNORE = "RAY_RUNTIME_ENV_IGNORE_GITIGNORE" RAY_STORAGE_ENVIRONMENT_VARIABLE = "RAY_STORAGE" # Hook for running a user-specified runtime-env hook. This hook will be called # unconditionally given the runtime_env dict passed for ray.init. 
It must return diff --git a/python/ray/_private/runtime_env/packaging.py b/python/ray/_private/runtime_env/packaging.py index 9ad0684a1568..d591843337cf 100644 --- a/python/ray/_private/runtime_env/packaging.py +++ b/python/ray/_private/runtime_env/packaging.py @@ -15,7 +15,7 @@ from ray._private.ray_constants import ( RAY_RUNTIME_ENV_URI_PIN_EXPIRATION_S_DEFAULT, RAY_RUNTIME_ENV_URI_PIN_EXPIRATION_S_ENV_VAR, - RAY_RUNTIME_ENV_SKIP_GITIGNORE, + RAY_RUNTIME_ENV_IGNORE_GITIGNORE, ) from ray._private.gcs_utils import GcsAioClient from ray._private.thirdparty.pathspec import PathSpec @@ -250,7 +250,7 @@ def _get_gitignore(path: Path) -> Optional[Callable]: """Returns a function that returns True if the path should be excluded. Returns None if there is no .gitignore file in the path, or if the - RAY_RUNTIME_ENV_SKIP_GITIGNORE environment variable is set to 1. + RAY_RUNTIME_ENV_IGNORE_GITIGNORE environment variable is set to 1. Args: path: The path to the directory to check for a .gitignore file. @@ -258,8 +258,8 @@ def _get_gitignore(path: Path) -> Optional[Callable]: Returns: A function that returns True if the path should be excluded. 
""" - skip_gitignore = os.environ.get(RAY_RUNTIME_ENV_SKIP_GITIGNORE, "0") == "1" - if skip_gitignore: + ignore_gitignore = os.environ.get(RAY_RUNTIME_ENV_IGNORE_GITIGNORE, "0") == "1" + if ignore_gitignore: return None path = path.absolute() diff --git a/python/ray/tests/test_runtime_env_packaging.py b/python/ray/tests/test_runtime_env_packaging.py index 81e03eb35f5a..96dcd53c1cfd 100644 --- a/python/ray/tests/test_runtime_env_packaging.py +++ b/python/ray/tests/test_runtime_env_packaging.py @@ -14,7 +14,7 @@ from ray._private.gcs_utils import GcsClient from ray._private.ray_constants import ( KV_NAMESPACE_PACKAGE, - RAY_RUNTIME_ENV_SKIP_GITIGNORE, + RAY_RUNTIME_ENV_IGNORE_GITIGNORE, ) from ray._private.runtime_env.packaging import ( GCS_STORAGE_MAX_SIZE, @@ -521,19 +521,19 @@ def test_get_gitignore(tmp_path): assert _get_gitignore(tmp_path)(Path(tmp_path / "foo.py")) is False -@pytest.mark.parametrize("skip_gitignore", [True, False]) +@pytest.mark.parametrize("ignore_gitignore", [True, False]) @pytest.mark.skipif(sys.platform == "win32", reason="Fails on windows") -def test_travel(tmp_path, skip_gitignore, monkeypatch): +def test_travel(tmp_path, ignore_gitignore, monkeypatch): dir_paths = set() file_paths = set() item_num = 0 excludes = [] root = tmp_path / "test" - if skip_gitignore: - monkeypatch.setenv(RAY_RUNTIME_ENV_SKIP_GITIGNORE, "1") + if ignore_gitignore: + monkeypatch.setenv(RAY_RUNTIME_ENV_IGNORE_GITIGNORE, "1") else: - monkeypatch.delenv(RAY_RUNTIME_ENV_SKIP_GITIGNORE, raising=False) + monkeypatch.delenv(RAY_RUNTIME_ENV_IGNORE_GITIGNORE, raising=False) def construct(path, excluded=False, depth=0): nonlocal item_num @@ -578,8 +578,8 @@ def construct(path, excluded=False, depth=0): # Add file that should be ignored by gitignore with (root / "foo.pyc").open("w") as f: f.write("foo") - if skip_gitignore: - # If skip_gitignore is True, then the file should be visited + if ignore_gitignore: + # If ignore_gitignore is True, then the file should be visited 
file_paths.add((str(root / "foo.pyc"), "foo")) construct(root) From 32329255b76dc82df5729d9def36afc06d90fb87 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Fri, 10 Mar 2023 14:23:35 -0800 Subject: [PATCH 09/17] Update doc/source/ray-core/handling-dependencies.rst Co-authored-by: angelinalg <122562471+angelinalg@users.noreply.github.com> Signed-off-by: Archit Kulkarni --- doc/source/ray-core/handling-dependencies.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/ray-core/handling-dependencies.rst b/doc/source/ray-core/handling-dependencies.rst index a0bf74b399c6..d0efc97763b8 100644 --- a/doc/source/ray-core/handling-dependencies.rst +++ b/doc/source/ray-core/handling-dependencies.rst @@ -325,7 +325,7 @@ The ``runtime_env`` is a Python dictionary or a Python class :class:`ray.runtime Note: Setting options (1) and (3) per-task or per-actor is currently unsupported, it can only be set per-job (i.e., in ``ray.init()``). - Note: For option (1), if your local directory contains a ``.gitignore`` file, the files and paths specified therein will not be uploaded to the cluster. This can be disabled by setting the environment variable `RAY_RUNTIME_ENV_IGNORE_GITIGNORE=1` on the machine doing the uploading. + Note: For option (1), if the local directory contains a ``.gitignore`` file, the files and paths specified there are not uploaded to the cluster. You can disable this by setting the environment variable ``RAY_RUNTIME_ENV_IGNORE_GITIGNORE=1`` on the machine doing the uploading. Note: This feature is currently limited to modules that are packages with a single directory containing an ``__init__.py`` file. For single-file modules, you may use ``working_dir``. 
From bda5766defb095ff370271648b3e8589685e39ce Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Fri, 10 Mar 2023 14:23:44 -0800 Subject: [PATCH 10/17] Update doc/source/ray-core/handling-dependencies.rst Co-authored-by: angelinalg <122562471+angelinalg@users.noreply.github.com> Signed-off-by: Archit Kulkarni --- doc/source/ray-core/handling-dependencies.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/ray-core/handling-dependencies.rst b/doc/source/ray-core/handling-dependencies.rst index d0efc97763b8..9bdd069e6509 100644 --- a/doc/source/ray-core/handling-dependencies.rst +++ b/doc/source/ray-core/handling-dependencies.rst @@ -304,7 +304,7 @@ The ``runtime_env`` is a Python dictionary or a Python class :class:`ray.runtime Note: Setting a local directory per-task or per-actor is currently unsupported; it can only be set per-job (i.e., in ``ray.init()``). - Note: If your local directory contains a ``.gitignore`` file, the files and paths specified therein will not be uploaded to the cluster. This can be disabled by setting the environment variable `RAY_RUNTIME_ENV_IGNORE_GITIGNORE=1` on the machine doing the uploading. + Note: If the local directory contains a ``.gitignore`` file, the files and paths specified there are not uploaded to the cluster. You can disable this by setting the environment variable ``RAY_RUNTIME_ENV_IGNORE_GITIGNORE=1`` on the machine doing the uploading. - ``py_modules`` (List[str|module]): Specifies Python modules to be available for import in the Ray workers. (For more ways to specify packages, see also the ``pip`` and ``conda`` fields below.) Each entry must be either (1) a path to a local directory, (2) a URI to a remote zip file (see :ref:`remote-uris` for details), (3) a Python module object, or (4) a path to a local `.whl` file. 
From ef1f8169a5cf12ee0e90191f150c6ac501f222bb Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Mon, 13 Mar 2023 12:00:11 -0700 Subject: [PATCH 11/17] Pipe `added_num` through info client Signed-off-by: Archit Kulkarni --- dashboard/modules/job/common.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/dashboard/modules/job/common.py b/dashboard/modules/job/common.py index a5d927541016..e558842bc4e5 100644 --- a/dashboard/modules/job/common.py +++ b/dashboard/modules/job/common.py @@ -189,13 +189,27 @@ def __init__(self, gcs_aio_client: GcsAioClient): self._gcs_aio_client = gcs_aio_client assert _internal_kv_initialized() - async def put_info(self, job_id: str, job_info: JobInfo): - await self._gcs_aio_client.internal_kv_put( + async def put_info( + self, job_id: str, job_info: JobInfo, overwrite: bool = True + ) -> int: + """Put job info to the internal kv store. + + Args: + job_id: The job id. + job_info: The job info. + overwrite: Whether to overwrite the existing job info. + + Returns: + The number of keys added. 1 if the key was added, 0 if the key + already exists and overwrite is False. 
+ """ + added_num = await self._gcs_aio_client.internal_kv_put( self.JOB_DATA_KEY.format(job_id=job_id).encode(), json.dumps(job_info.to_json()).encode(), - True, + overwrite, namespace=ray_constants.KV_NAMESPACE_JOB, ) + return added_num async def get_info(self, job_id: str, timeout: int = 30) -> Optional[JobInfo]: serialized_info = await self._gcs_aio_client.internal_kv_get( From ea5b75ac73e0364a9d5a76012d90a93f613f4572 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Mon, 13 Mar 2023 12:00:22 -0700 Subject: [PATCH 12/17] Add docstring for added_num Signed-off-by: Archit Kulkarni --- python/ray/_private/gcs_utils.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index 7e1dd51181e2..86faf4600132 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -520,6 +520,20 @@ async def internal_kv_put( namespace: Optional[bytes], timeout: Optional[float] = None, ) -> int: + """Put a key-value pair into the GCS. + + Args: + key: The key to put. + value: The value to put. + overwrite: Whether to overwrite the value if the key already exists. + namespace: The namespace to put the key-value pair into. + timeout: The timeout in seconds. + + Returns: + The number of keys added. If overwrite is True, this will be 1 if the + key was added and 0 if the key was updated. If overwrite is False, + this will be 1 if the key was added and 0 if the key already exists. 
+ """ logger.debug(f"internal_kv_put {key!r} {value!r} {overwrite} {namespace!r}") req = gcs_service_pb2.InternalKVPutRequest( namespace=namespace, From bec92fd711ec0e6cd0508076ed9ce47ddd3613e3 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Mon, 13 Mar 2023 12:01:04 -0700 Subject: [PATCH 13/17] Make "check existence and put new info" atomic Signed-off-by: Archit Kulkarni --- dashboard/modules/job/job_manager.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py index fbe608507a46..df5878a89864 100644 --- a/dashboard/modules/job/job_manager.py +++ b/dashboard/modules/job/job_manager.py @@ -802,8 +802,6 @@ async def submit_job( entrypoint_num_gpus = 0 if submission_id is None: submission_id = generate_job_id() - elif await self._job_info_client.get_status(submission_id) is not None: - raise RuntimeError(f"Job {submission_id} already exists.") logger.info(f"Starting job with submission_id: {submission_id}") job_info = JobInfo( @@ -816,7 +814,14 @@ async def submit_job( entrypoint_num_gpus=entrypoint_num_gpus, entrypoint_resources=entrypoint_resources, ) - await self._job_info_client.put_info(submission_id, job_info) + num_added = await self._job_info_client.put_info( + submission_id, job_info, overwrite=False + ) + if num_added == 0: + raise ValueError( + f"Job with submission_id {submission_id} already exists. " + "Please use a different submission_id." 
+ ) # Wait for the actor to start up asynchronously so this call always # returns immediately and we can catch errors with the actor starting From ddc2c92ec34ae4fb417193beb2ba82266b6eed2c Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Mon, 13 Mar 2023 12:01:10 -0700 Subject: [PATCH 14/17] Add unit test Signed-off-by: Archit Kulkarni --- .../modules/job/tests/test_job_manager.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py index 5e0d841d22bb..ce4dfa73548d 100644 --- a/dashboard/modules/job/tests/test_job_manager.py +++ b/dashboard/modules/job/tests/test_job_manager.py @@ -332,6 +332,39 @@ async def test_pass_job_id(job_manager): ) +@pytest.mark.asyncio +async def test_simultaneous_submit_job(job_manager): + """Test that we can submit multiple jobs at once.""" + job_ids = await asyncio.gather( + job_manager.submit_job(entrypoint="echo hello"), + job_manager.submit_job(entrypoint="echo hello"), + job_manager.submit_job(entrypoint="echo hello"), + ) + + for job_id in job_ids: + await async_wait_for_condition_async_predicate( + check_job_succeeded, job_manager=job_manager, job_id=job_id + ) + + +@pytest.mark.asyncio +async def test_simultaneous_with_same_id(job_manager): + """Test that we can submit multiple jobs at once with the same id. + + The second job should raise a friendly error. + """ + with pytest.raises(ValueError) as excinfo: + await asyncio.gather( + job_manager.submit_job(entrypoint="echo hello", submission_id="1"), + job_manager.submit_job(entrypoint="echo hello", submission_id="1"), + ) + assert "Job with submission_id 1 already exists" in str(excinfo.value) + # Check that the (first) job can still succeed. 
+ await async_wait_for_condition_async_predicate( + check_job_succeeded, job_manager=job_manager, job_id="1" + ) + + @pytest.mark.asyncio class TestShellScriptExecution: async def test_submit_basic_echo(self, job_manager): From cfc425f373849e245993da5e1840ebf0466f1153 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Mon, 13 Mar 2023 12:09:05 -0700 Subject: [PATCH 15/17] Update docstring Signed-off-by: Archit Kulkarni --- dashboard/modules/job/common.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dashboard/modules/job/common.py b/dashboard/modules/job/common.py index e558842bc4e5..9a4e56561d79 100644 --- a/dashboard/modules/job/common.py +++ b/dashboard/modules/job/common.py @@ -200,8 +200,9 @@ async def put_info( overwrite: Whether to overwrite the existing job info. Returns: - The number of keys added. 1 if the key was added, 0 if the key - already exists and overwrite is False. + The number of keys added. If overwrite is True, this will be 1 if the + key was added and 0 if the key was updated. If overwrite is False, + this will be 1 if the key was added and 0 if the key already exists. 
""" added_num = await self._gcs_aio_client.internal_kv_put( self.JOB_DATA_KEY.format(job_id=job_id).encode(), From 9bc357af44a335c6af1603b3459d632f0a2b77d0 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Mon, 13 Mar 2023 13:26:14 -0700 Subject: [PATCH 16/17] Return bool `new_key_added` Signed-off-by: Archit Kulkarni --- dashboard/modules/job/common.py | 8 +++----- dashboard/modules/job/job_manager.py | 4 ++-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/dashboard/modules/job/common.py b/dashboard/modules/job/common.py index 9a4e56561d79..0bc1d0c8f26f 100644 --- a/dashboard/modules/job/common.py +++ b/dashboard/modules/job/common.py @@ -191,7 +191,7 @@ def __init__(self, gcs_aio_client: GcsAioClient): async def put_info( self, job_id: str, job_info: JobInfo, overwrite: bool = True - ) -> int: + ) -> bool: """Put job info to the internal kv store. Args: @@ -200,9 +200,7 @@ async def put_info( overwrite: Whether to overwrite the existing job info. Returns: - The number of keys added. If overwrite is True, this will be 1 if the - key was added and 0 if the key was updated. If overwrite is False, - this will be 1 if the key was added and 0 if the key already exists. + True if a new key is added. 
""" added_num = await self._gcs_aio_client.internal_kv_put( self.JOB_DATA_KEY.format(job_id=job_id).encode(), @@ -210,7 +208,7 @@ async def put_info( overwrite, namespace=ray_constants.KV_NAMESPACE_JOB, ) - return added_num + return added_num == 1 async def get_info(self, job_id: str, timeout: int = 30) -> Optional[JobInfo]: serialized_info = await self._gcs_aio_client.internal_kv_get( diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py index df5878a89864..f05bdff21044 100644 --- a/dashboard/modules/job/job_manager.py +++ b/dashboard/modules/job/job_manager.py @@ -814,10 +814,10 @@ async def submit_job( entrypoint_num_gpus=entrypoint_num_gpus, entrypoint_resources=entrypoint_resources, ) - num_added = await self._job_info_client.put_info( + new_key_added = await self._job_info_client.put_info( submission_id, job_info, overwrite=False ) - if num_added == 0: + if not new_key_added: raise ValueError( f"Job with submission_id {submission_id} already exists. " "Please use a different submission_id." From 722a104dcd5fefe5b71734bd15db6c54951707eb Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Wed, 15 Mar 2023 10:50:03 -0700 Subject: [PATCH 17/17] Change RuntimeError to ValueError to fix test Signed-off-by: Archit Kulkarni --- dashboard/modules/job/tests/test_job_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py index ce4dfa73548d..952caa96c26c 100644 --- a/dashboard/modules/job/tests/test_job_manager.py +++ b/dashboard/modules/job/tests/test_job_manager.py @@ -326,7 +326,7 @@ async def test_pass_job_id(job_manager): ) # Check that the same job_id is rejected. - with pytest.raises(RuntimeError): + with pytest.raises(ValueError): await job_manager.submit_job( entrypoint="echo hello", submission_id=submission_id )