Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: executor to sagemaker custom container #6046

Merged
merged 19 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ jobs:
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_singleton.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_parameters_as_pydantic.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_streaming.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/sagemaker/test_sagemaker.py
echo "flag it as jina for codeoverage"
echo "codecov_flag=jina" >> $GITHUB_OUTPUT
timeout-minutes: 45
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ jobs:
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_singleton.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_parameters_as_pydantic.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_streaming.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/sagemaker/test_sagemaker.py
echo "flag it as jina for codeoverage"
echo "codecov_flag=jina" >> $GITHUB_OUTPUT
timeout-minutes: 45
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ repos:
args:
- -S
- repo: https://github.com/pycqa/isort
rev: 5.10.1
rev: 5.12.0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this update, I keep getting an error during commit. SO link

RuntimeError: The Poetry configuration is invalid:
            - [extras.pipfile_deprecated_finder.2] 'pip-shims<=0.3.4' does not match '^[a-zA-Z-_.0-9]+$'

hooks:
- id: isort
exclude: ^(jina/helloworld/|jina/proto/pb/jina_pb2.py|jina/proto/pb/jina_pb2_grpc.py|jina/proto/pb2/jina_pb2.py|jina/proto/pb2/jina_pb2_grpc.py|docs/|jina/resources/|jina/proto/docarray_v1|jina/proto/docarray_v2)
Expand Down
7 changes: 7 additions & 0 deletions jina/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ class WebsocketSubProtocols(str, Enum):
BYTES = 'bytes'


class ProviderType(BetterEnum):
"""Provider type."""

NONE = 0 #: no provider
SAGEMAKER = 1 #: AWS SageMaker


def replace_enum_to_str(obj):
"""
Transform BetterEnum type into string.
Expand Down
36 changes: 28 additions & 8 deletions jina/orchestrate/deployments/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import asyncio
import copy
import json
import multiprocessing
import os
import platform
import re
import subprocess
import threading
import multiprocessing
import platform
import sys
import threading
import time
import warnings
from argparse import Namespace
from collections import defaultdict
from contextlib import ExitStack
Expand All @@ -29,7 +30,13 @@
__docker_host__,
__windows__,
)
from jina.enums import DeploymentRoleType, PodRoleType, PollingType, ProtocolType
from jina.enums import (
DeploymentRoleType,
PodRoleType,
PollingType,
ProtocolType,
ProviderType,
)
from jina.helper import (
ArgNamespace,
parse_host_scheme,
Expand Down Expand Up @@ -281,6 +288,7 @@ def __init__(
port_monitoring: Optional[int] = None,
prefer_platform: Optional[str] = None,
protocol: Optional[Union[str, List[str]]] = ['GRPC'],
provider: Optional[str] = ['NONE'],
py_modules: Optional[List[str]] = None,
quiet: Optional[bool] = False,
quiet_error: Optional[bool] = False,
Expand Down Expand Up @@ -380,6 +388,7 @@ def __init__(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param py_modules: The customized python modules need to be imported before loading the executor

Note that the recommended way is to only import a single module - a simple python file, if your
Expand Down Expand Up @@ -469,6 +478,21 @@ def __init__(
args = ArgNamespace.kwargs2namespace(kwargs, parser, True)
self.args = args
self._gateway_load_balancer = False
if self.args.provider == ProviderType.SAGEMAKER:
if self._gateway_kwargs.get('port', 0) == 8080:
raise ValueError(
f'Port 8080 is reserved for Sagemaker deployment. Please use another port'
)
if self.args.port != [8080]:
warnings.warn(
f'Port is changed to 8080 for Sagemaker deployment. Port {self.args.port} is ignored'
)
self.args.port = [8080]
if self.args.protocol != [ProtocolType.HTTP]:
warnings.warn(
f'Protocol is changed to HTTP for Sagemaker deployment. Protocol {self.args.protocol} is ignored'
)
self.args.protocol = [ProtocolType.HTTP]
if self._include_gateway and ProtocolType.HTTP in self.args.protocol:
self._gateway_load_balancer = True
log_config = kwargs.get('log_config')
Expand Down Expand Up @@ -1304,7 +1328,6 @@ def _roundrobin_cuda_device(device_str: Optional[str], replicas: int):

selected_devices = []
if device_str[2:]:

for device in Deployment._parse_devices(device_str[2:], num_devices):
selected_devices.append(device)
else:
Expand Down Expand Up @@ -1446,7 +1469,6 @@ def _set_pod_args(self) -> Dict[int, List[Namespace]]:

@staticmethod
def _set_uses_before_after_args(args: Namespace, entity_type: str) -> Namespace:

_args = copy.deepcopy(args)
_args.pod_role = PodRoleType.WORKER
_args.host = _args.host[0] or __default_host__
Expand Down Expand Up @@ -1648,7 +1670,6 @@ def _reload_deployment(changed_file):
watch_changes = self.args.reload

if watch_changes and self._is_executor_from_yaml:

with ImportExtensions(
required=True,
help_text='''reload requires watchfiles dependency to be installed. You can run `pip install
Expand Down Expand Up @@ -1692,7 +1713,6 @@ def _get_summary_table(self, all_panels: List[Panel]):
swagger_ui_link = None
redoc_link = None
for _port, _protocol in zip(_ports, _protocols):

address_table.add_row(':chains:', 'Protocol', _protocol)

_protocol = _protocol.lower()
Expand Down
9 changes: 9 additions & 0 deletions jina/orchestrate/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def __init__(
port_monitoring: Optional[int] = None,
prefetch: Optional[int] = 1000,
protocol: Optional[Union[str, List[str]]] = ['GRPC'],
provider: Optional[str] = ['NONE'],
proxy: Optional[bool] = False,
py_modules: Optional[List[str]] = None,
quiet: Optional[bool] = False,
Expand Down Expand Up @@ -272,6 +273,7 @@ def __init__(

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway

Expand Down Expand Up @@ -462,6 +464,7 @@ def __init__(

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway

Expand Down Expand Up @@ -866,6 +869,7 @@ def add(
port_monitoring: Optional[int] = None,
prefer_platform: Optional[str] = None,
protocol: Optional[Union[str, List[str]]] = ['GRPC'],
provider: Optional[str] = ['NONE'],
py_modules: Optional[List[str]] = None,
quiet: Optional[bool] = False,
quiet_error: Optional[bool] = False,
Expand Down Expand Up @@ -965,6 +969,7 @@ def add(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param py_modules: The customized python modules need to be imported before loading the executor

Note that the recommended way is to only import a single module - a simple python file, if your
Expand Down Expand Up @@ -1127,6 +1132,7 @@ def add(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param py_modules: The customized python modules need to be imported before loading the executor

Note that the recommended way is to only import a single module - a simple python file, if your
Expand Down Expand Up @@ -1319,6 +1325,7 @@ def config_gateway(
port_monitoring: Optional[int] = None,
prefetch: Optional[int] = 1000,
protocol: Optional[Union[str, List[str]]] = ['GRPC'],
provider: Optional[str] = ['NONE'],
proxy: Optional[bool] = False,
py_modules: Optional[List[str]] = None,
quiet: Optional[bool] = False,
Expand Down Expand Up @@ -1389,6 +1396,7 @@ def config_gateway(

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway

Expand Down Expand Up @@ -1488,6 +1496,7 @@ def config_gateway(

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway

Expand Down
38 changes: 24 additions & 14 deletions jina/parsers/orchestrate/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from dataclasses import dataclass
from typing import Dict

from jina.enums import PodRoleType, ProtocolType
from jina.enums import PodRoleType, ProtocolType, ProviderType
from jina.helper import random_port
from jina.parsers.helper import (
_SHOW_ALL_ARGS,
CastToIntAction,
CastPeerPorts,
CastToIntAction,
KVAppendAction,
add_arg_group,
)
Expand Down Expand Up @@ -52,7 +52,7 @@ def mixin_pod_parser(parser, pod_type: str = 'worker'):
type=int,
default=600000,
help='The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting '
'forever',
'forever',
)

gp.add_argument(
Expand All @@ -68,15 +68,17 @@ def mixin_pod_parser(parser, pod_type: str = 'worker'):
action=KVAppendAction,
metavar='KEY: VALUE',
nargs='*',
help='The map of environment variables that are read from kubernetes cluster secrets' if _SHOW_ALL_ARGS
help='The map of environment variables that are read from kubernetes cluster secrets'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)
gp.add_argument(
'--image-pull-secrets',
type=str,
nargs='+',
default=None,
help='List of ImagePullSecrets that the Kubernetes Pods need to have access to in order to pull the image. Used in `to_kubernetes_yaml`' if _SHOW_ALL_ARGS
help='List of ImagePullSecrets that the Kubernetes Pods need to have access to in order to pull the image. Used in `to_kubernetes_yaml`'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)

Expand Down Expand Up @@ -106,7 +108,7 @@ def mixin_pod_parser(parser, pod_type: str = 'worker'):
action='store_true',
default=False,
help='If set, starting a Pod/Deployment does not block the thread/process. It then relies on '
'`wait_start_success` at outer function for the postpone check.'
'`wait_start_success` at outer function for the postpone check.'
if _SHOW_ALL_ARGS
else argparse.SUPPRESS,
)
Expand All @@ -116,7 +118,7 @@ def mixin_pod_parser(parser, pod_type: str = 'worker'):
action='store_true',
default=False,
help='If set, the current Pod/Deployment can not be further chained, '
'and the next `.add()` will chain after the last Pod/Deployment not this current one.',
'and the next `.add()` will chain after the last Pod/Deployment not this current one.',
)

gp.add_argument(
Expand All @@ -134,9 +136,9 @@ def mixin_pod_parser(parser, pod_type: str = 'worker'):
action='store_true',
default=False,
help='If set, the Executor will restart while serving if YAML configuration source or Executor modules '
'are changed. If YAML configuration is changed, the whole deployment is reloaded and new '
'processes will be restarted. If only Python modules of the Executor have changed, they will be '
'reloaded to the interpreter without restarting process.',
'are changed. If YAML configuration is changed, the whole deployment is reloaded and new '
'processes will be restarted. If only Python modules of the Executor have changed, they will be '
'reloaded to the interpreter without restarting process.',
)
gp.add_argument(
'--install-requirements',
Expand Down Expand Up @@ -195,6 +197,14 @@ def mixin_pod_runtime_args_parser(arg_group, pod_type='worker'):
help=f'Communication protocol of the server exposed by the {server_name}. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: {[protocol.to_string() for protocol in list(ProtocolType)]}.',
)

arg_group.add_argument(
'--provider',
type=ProviderType.from_string,
choices=list(ProviderType),
default=[ProviderType.NONE],
help=f'If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: {[provider.to_string() for provider in list(ProviderType)]}.',
)

arg_group.add_argument(
'--monitoring',
action='store_true',
Expand Down Expand Up @@ -225,7 +235,7 @@ def mixin_pod_runtime_args_parser(arg_group, pod_type='worker'):
action='store_true',
default=False,
help='If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. '
'Otherwise a no-op implementation will be provided.',
'Otherwise a no-op implementation will be provided.',
)

arg_group.add_argument(
Expand All @@ -247,7 +257,7 @@ def mixin_pod_runtime_args_parser(arg_group, pod_type='worker'):
action='store_true',
default=False,
help='If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. '
'Otherwise a no-op implementation will be provided.',
'Otherwise a no-op implementation will be provided.',
)

arg_group.add_argument(
Expand Down Expand Up @@ -283,8 +293,8 @@ def mixin_stateful_parser(parser):
type=str,
default=None,
help='When using --stateful option, it is required to tell the cluster what are the cluster configuration. This is important'
'when the Deployment is restarted. It indicates the ports to which each replica of the cluster binds.'
' It is expected to be a single list if shards == 1 or a dictionary if shards > 1.',
'when the Deployment is restarted. It indicates the ports to which each replica of the cluster binds.'
' It is expected to be a single list if shards == 1 or a dictionary if shards > 1.',
action=CastPeerPorts,
nargs='+',
)
Loading