diff --git a/jina/checker.py b/jina/checker.py
index 7a396b1af8b74..de3d5caadaef6 100644
--- a/jina/checker.py
+++ b/jina/checker.py
@@ -31,15 +31,15 @@ def __init__(self, args: 'argparse.Namespace'):
         ) as tc:
             if args.target == 'executor':
                 hostname, port, protocol, _ = parse_host_scheme(args.host)
-                r = WorkerRuntime.is_ready(f'{hostname}:{port}')
+                r = WorkerRuntime.is_ready(ctrl_address=f'{hostname}:{port}')
             elif args.target == 'gateway':
                 hostname, port, protocol, _ = parse_host_scheme(args.host)
                 r = GatewayRuntime.is_ready(
                     f'{hostname}:{port}',
-                    protocol=GatewayProtocolType.from_string(protocol),
+                    protocol=GatewayProtocolType.from_string(protocol)
                 )
             elif args.target == 'flow':
-                r = Client(host=args.host).is_flow_ready(timeout=args.timeout)
+                r = Client(host=args.host).is_flow_ready(timeout=args.timeout / 1000)
             if not r:
                 default_logger.warning(
                     'not responding, attempt (%d/%d) in 1s'
diff --git a/jina/clients/base/helper.py b/jina/clients/base/helper.py
index 5f1c8b07b04c1..710119d0b7272 100644
--- a/jina/clients/base/helper.py
+++ b/jina/clients/base/helper.py
@@ -155,7 +155,9 @@ async def send_dry_run(self, **kwargs):
         :param kwargs: keyword arguments to make sure compatible API with other clients
         :return: send get message
         """
-        return await self.session.get(url=self.url).__aenter__()
+        return await self.session.get(
+            url=self.url, timeout=kwargs.get('timeout', None)
+        ).__aenter__()
 
     async def recv_message(self):
         """Receive message for HTTP (sleep)
diff --git a/jina/resources/k8s/template/deployment-executor.yml b/jina/resources/k8s/template/deployment-executor.yml
index 4c03706a1ff01..b6f2642387f0c 100644
--- a/jina/resources/k8s/template/deployment-executor.yml
+++ b/jina/resources/k8s/template/deployment-executor.yml
@@ -50,7 +50,7 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.name
-          readinessProbe:
+          startupProbe:
             exec:
               command:
                 - jina
@@ -58,7 +58,17 @@ spec:
                 - executor
                 - 127.0.0.1:{port}
             initialDelaySeconds: 5
-            periodSeconds: 20
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port}
+            initialDelaySeconds: 30
+            periodSeconds: 5
             timeoutSeconds: 10
           lifecycle:
             preStop:
diff --git a/jina/resources/k8s/template/deployment-gateway.yml b/jina/resources/k8s/template/deployment-gateway.yml
index 2b39083ec8f21..4c860e62f7a12 100644
--- a/jina/resources/k8s/template/deployment-gateway.yml
+++ b/jina/resources/k8s/template/deployment-gateway.yml
@@ -49,7 +49,7 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.name
-          readinessProbe:
+          startupProbe:
             exec:
               command:
                 - jina
@@ -57,7 +57,17 @@ spec:
                 - gateway
                 - {protocol}://127.0.0.1:{port}
             initialDelaySeconds: 5
-            periodSeconds: 20
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - gateway
+                - {protocol}://127.0.0.1:{port}
+            initialDelaySeconds: 30
+            periodSeconds: 5
             timeoutSeconds: 10
           lifecycle:
             preStop:
diff --git a/jina/resources/k8s/template/deployment-uses-after.yml b/jina/resources/k8s/template/deployment-uses-after.yml
index 80ef94f7b6742..8fff1b977e182 100644
--- a/jina/resources/k8s/template/deployment-uses-after.yml
+++ b/jina/resources/k8s/template/deployment-uses-after.yml
@@ -50,12 +50,26 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.name
-
-          readinessProbe:
-            tcpSocket:
-              port: {port}
+          startupProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port}
             initialDelaySeconds: 5
-            periodSeconds: 10
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port}
+            initialDelaySeconds: 30
+            periodSeconds: 5
+            timeoutSeconds: 10
           lifecycle:
             preStop:
               exec:
@@ -85,8 +99,7 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.name
-
-          readinessProbe:
+          startupProbe:
             exec:
               command:
                 - jina
@@ -94,7 +107,17 @@ spec:
                 - executor
                 - 127.0.0.1:{port_uses_after}
             initialDelaySeconds: 5
-            periodSeconds: 20
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port_uses_after}
+            initialDelaySeconds: 30
+            periodSeconds: 5
             timeoutSeconds: 10
           lifecycle:
             preStop:
diff --git a/jina/resources/k8s/template/deployment-uses-before-after.yml b/jina/resources/k8s/template/deployment-uses-before-after.yml
index b9a60b88e67fd..74a7d8e041dab 100644
--- a/jina/resources/k8s/template/deployment-uses-before-after.yml
+++ b/jina/resources/k8s/template/deployment-uses-before-after.yml
@@ -50,12 +50,26 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.name
-
-          readinessProbe:
-            tcpSocket:
-              port: {port}
+          startupProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port_uses}
             initialDelaySeconds: 5
-            periodSeconds: 10
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port}
+            initialDelaySeconds: 30
+            periodSeconds: 5
+            timeoutSeconds: 10
           lifecycle:
             preStop:
               exec:
@@ -85,8 +99,7 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.name
-
-          readinessProbe:
+          startupProbe:
             exec:
               command:
                 - jina
@@ -94,7 +107,17 @@ spec:
                 - executor
                 - 127.0.0.1:{port_uses_before}
             initialDelaySeconds: 5
-            periodSeconds: 20
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port_uses_before}
+            initialDelaySeconds: 30
+            periodSeconds: 5
             timeoutSeconds: 10
           lifecycle:
             preStop:
@@ -125,7 +148,7 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.name
-          readinessProbe:
+          startupProbe:
             exec:
               command:
                 - jina
@@ -133,7 +156,17 @@ spec:
                 - executor
                 - 127.0.0.1:{port_uses_after}
             initialDelaySeconds: 5
-            periodSeconds: 20
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port_uses_after}
+            initialDelaySeconds: 30
+            periodSeconds: 5
             timeoutSeconds: 10
           lifecycle:
             preStop:
diff --git a/jina/resources/k8s/template/deployment-uses-before.yml b/jina/resources/k8s/template/deployment-uses-before.yml
index 21bb6878f9896..c4631802395d9 100644
--- a/jina/resources/k8s/template/deployment-uses-before.yml
+++ b/jina/resources/k8s/template/deployment-uses-before.yml
@@ -50,11 +50,26 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.name
-          readinessProbe:
-            tcpSocket:
-              port: {port}
+          startupProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port}
             initialDelaySeconds: 5
-            periodSeconds: 10
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port}
+            initialDelaySeconds: 30
+            periodSeconds: 5
+            timeoutSeconds: 10
           lifecycle:
             preStop:
               exec:
@@ -84,8 +99,7 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.name
-
-          readinessProbe:
+          startupProbe:
             exec:
               command:
                 - jina
@@ -93,7 +107,17 @@ spec:
                 - executor
                 - 127.0.0.1:{port_uses_before}
             initialDelaySeconds: 5
-            periodSeconds: 20
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port_uses_before}
+            initialDelaySeconds: 30
+            periodSeconds: 5
             timeoutSeconds: 10
           lifecycle:
             preStop:
diff --git a/jina/resources/k8s/template/statefulset-executor.yml b/jina/resources/k8s/template/statefulset-executor.yml
index d5809e8e00143..66ed11a9723c8 100644
--- a/jina/resources/k8s/template/statefulset-executor.yml
+++ b/jina/resources/k8s/template/statefulset-executor.yml
@@ -50,7 +50,7 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.name
-          readinessProbe:
+          startupProbe:
             exec:
               command:
                 - jina
@@ -58,7 +58,17 @@ spec:
                 - executor
                 - 127.0.0.1:{port}
             initialDelaySeconds: 5
-            periodSeconds: 20
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port}
+            initialDelaySeconds: 30
+            periodSeconds: 5
             timeoutSeconds: 10
           lifecycle:
             preStop:
diff --git a/jina/serve/networking.py b/jina/serve/networking.py
index 17278c5bf2bd8..d752f40d6e652 100644
--- a/jina/serve/networking.py
+++ b/jina/serve/networking.py
@@ -975,6 +975,8 @@ async def _handle_aiorpcerror(
             error.code() != grpc.StatusCode.UNAVAILABLE
             and error.code() != grpc.StatusCode.CANCELLED
             and error.code() != grpc.StatusCode.DEADLINE_EXCEEDED
+            and error.code() != grpc.StatusCode.UNKNOWN
+            and error.code() != grpc.StatusCode.INTERNAL
         ):
             return error
         elif (
diff --git a/jina/serve/runtimes/asyncio.py b/jina/serve/runtimes/asyncio.py
index 401642b49e6be..d49834e87a16a 100644
--- a/jina/serve/runtimes/asyncio.py
+++ b/jina/serve/runtimes/asyncio.py
@@ -161,34 +161,25 @@ async def async_run_forever(self):
 
     # Static methods used by the Pod to communicate with the `Runtime` in the separate process
 
     @staticmethod
-    def activate(**kwargs):
-        """
-        Activate the runtime, does not apply to these runtimes
-
-        :param kwargs: extra keyword arguments
-        """
-        # does not apply to this types of runtimes
-        pass
-
-    @staticmethod
-    def is_ready(ctrl_address: str, **kwargs) -> bool:
+    def is_ready(ctrl_address: str, timeout: float = 1.0, **kwargs) -> bool:
         """
         Check if status is ready.
 
         :param ctrl_address: the address where the control request needs to be sent
+        :param timeout: timeout of the health check in seconds
         :param kwargs: extra keyword arguments
         :return: True if status is ready else False.
         """
-
         try:
             from grpc_health.v1 import health_pb2, health_pb2_grpc
 
             response = GrpcConnectionPool.send_health_check_sync(
-                ctrl_address, timeout=1.0
+                ctrl_address, timeout=timeout
+            )
+            return (
+                response.status == health_pb2.HealthCheckResponse.ServingStatus.SERVING
             )
-            # TODO: Get the proper value of the ServingStatus SERVING KEY
-            return response.status == 1
         except RpcError:
             return False
diff --git a/jina/serve/runtimes/gateway/__init__.py b/jina/serve/runtimes/gateway/__init__.py
index bb9c5f3b7de40..8ce8b0fbb9e03 100644
--- a/jina/serve/runtimes/gateway/__init__.py
+++ b/jina/serve/runtimes/gateway/__init__.py
@@ -132,12 +132,18 @@ async def async_run_forever(self):
         self.is_cancel.set()
 
     @staticmethod
-    def is_ready(ctrl_address: str, protocol: Optional[str] = 'grpc', **kwargs) -> bool:
+    def is_ready(
+        ctrl_address: str,
+        protocol: Optional[str] = 'grpc',
+        timeout: float = 1.0,
+        **kwargs,
+    ) -> bool:
         """
         Check if status is ready.
 
         :param ctrl_address: the address where the control request needs to be sent
         :param protocol: protocol of the gateway runtime
+        :param timeout: timeout of grpc call in seconds
         :param kwargs: extra keyword arguments
         :return: True if status is ready else False.
@@ -151,7 +157,9 @@ def is_ready(ctrl_address: str, protocol: Optional[str] = 'grpc', **kwargs) -> b
             res = AsyncNewLoopRuntime.is_ready(ctrl_address)
         else:
             try:
-                conn = urllib.request.urlopen(url=f'http://{ctrl_address}')
+                conn = urllib.request.urlopen(
+                    url=f'http://{ctrl_address}', timeout=timeout
+                )
                 res = conn.code == HTTPStatus.OK
             except:
                 res = False
diff --git a/jina/serve/runtimes/worker/__init__.py b/jina/serve/runtimes/worker/__init__.py
index 0ee5900e82bb7..5964dbecdc468 100644
--- a/jina/serve/runtimes/worker/__init__.py
+++ b/jina/serve/runtimes/worker/__init__.py
@@ -313,3 +313,25 @@ async def _status(self, empty, context) -> jina_pb2.JinaInfoProto:
         for k, v in env_info.items():
             info_proto.envs[k] = str(v)
         return info_proto
+
+    async def Check(
+        self, request: health_pb2.HealthCheckRequest, context
+    ) -> health_pb2.HealthCheckResponse:
+        """Calls the underlying HealthServicer.Check method with the same arguments
+        :param request: grpc request
+        :param context: grpc request context
+        :returns: the grpc HealthCheckResponse
+        """
+        self.logger.debug(f'Receive Check request')
+        return await self._health_servicer.Check(request, context)
+
+    async def Watch(
+        self, request: health_pb2.HealthCheckRequest, context
+    ) -> health_pb2.HealthCheckResponse:
+        """Calls the underlying HealthServicer.Watch method with the same arguments
+        :param request: grpc request
+        :param context: grpc request context
+        :returns: the grpc HealthCheckResponse
+        """
+        self.logger.debug(f'Receive Watch request')
+        return await self._health_servicer.Watch(request, context)
diff --git a/tests/k8s/test_k8s.py b/tests/k8s/test_k8s.py
index f932c313cee95..5d49d6b10d422 100644
--- a/tests/k8s/test_k8s.py
+++ b/tests/k8s/test_k8s.py
@@ -279,11 +279,6 @@ async def test_flow_with_monitoring(logger, tmpdir, docker_images, port_generato
 
     flow.to_kubernetes_yaml(dump_path, k8s_namespace=namespace)
 
-    from kubernetes import client
-
-    api_client = client.ApiClient()
-    core_client = client.CoreV1Api(api_client=api_client)
-    app_client = client.AppsV1Api(api_client=api_client)
     await create_all_flow_deployments_and_wait_ready(
         dump_path,
         namespace=namespace,
diff --git a/tests/k8s/test_k8s_failures.py b/tests/k8s/test_k8s_failures.py
index 46532dfc12e61..ba4b7a9a23646 100644
--- a/tests/k8s/test_k8s_failures.py
+++ b/tests/k8s/test_k8s_failures.py
@@ -229,79 +229,6 @@ def inject_failures(k8s_cluster, logger):
         raise Exception(f"Injecting failures failed with {returncode}")
 
 
-def watch_k8s_resources(namespace):
-    import os
-    import subprocess
-    import time
-
-    while True:
-        pod_metadata = subprocess.check_output(
-            (
-                'kubectl',
-                '-n',
-                namespace,
-                'get',
-                'pods',
-                '-l',
-                'app=executor0',
-                '-o',
-                "jsonpath=\"{$.items[*]['.metadata.name', '.metadata.uid']}\"",
-            ),
-            env=os.environ,
-        )
-        print('pod metadata:')
-        print(pod_metadata.decode())
-        endpoints = subprocess.check_output(
-            (
-                'kubectl',
-                '-n',
-                namespace,
-                'get',
-                'endpoints',
-                '-l',
-                'app=executor0',
-                '-o',
-                "jsonpath=\"{$.items[*].subsets[*].addresses[*]['targetRef.name', 'ip']}\"",
-            ),
-            env=os.environ,
-        )
-        print('endpoints:')
-        print(endpoints.decode())
-        container_statuses = subprocess.check_output(
-            (
-                'kubectl',
-                '-n',
-                namespace,
-                'get',
-                'pods',
-                '-l',
-                'app=executor0',
-                '-o',
-                "jsonpath=\"{$.items[*].status.containerStatuses[*].ready}\"",
-            ),
-            env=os.environ,
-        )
-        print('container statuses:')
-        print(container_statuses.decode())
-        print()
-        time.sleep(5)
-
-
-def print_services(namespace):
-    services = subprocess.check_output(
-        (
-            'kubectl',
-            '-n',
-            namespace,
-            'get',
-            'service',
-        ),
-        env=os.environ,
-    )
-    print('services:')
-    print(services.decode())
-
-
 @pytest.mark.asyncio
 @pytest.mark.timeout(3600)
 @pytest.mark.parametrize(
@@ -316,126 +243,115 @@ async def test_failure_scenarios(logger, docker_images, tmpdir, k8s_cluster):
     api_client = client.ApiClient()
     core_client = client.CoreV1Api(api_client=api_client)
     app_client = client.AppsV1Api(api_client=api_client)
-    k8s_endpoints_watcher = multiprocessing.Process(
-        target=watch_k8s_resources, args=(namespace,), daemon=True
+    flow = Flow(prefetch=100).add(replicas=2, uses=f'docker://{docker_images[0]}')
+
+    dump_path = os.path.join(str(tmpdir), namespace)
+    flow.to_kubernetes_yaml(dump_path, k8s_namespace=namespace)
+
+    await create_all_flow_deployments_and_wait_ready(
+        dump_path,
+        namespace=namespace,
+        api_client=api_client,
+        app_client=app_client,
+        core_client=core_client,
+        deployment_replicas_expected={
+            'gateway': 1,
+            'executor0': 2,
+        },
+        logger=logger,
     )
-    k8s_endpoints_watcher.start()
-    try:
-        flow = Flow(prefetch=100).add(replicas=2, uses=f'docker://{docker_images[0]}')
-
-        dump_path = os.path.join(str(tmpdir), namespace)
-        flow.to_kubernetes_yaml(dump_path, k8s_namespace=namespace)
-
-        await create_all_flow_deployments_and_wait_ready(
-            dump_path,
+    stop_event = asyncio.Event()
+    send_task = asyncio.create_task(
+        run_test_until_event(
+            flow=flow,
             namespace=namespace,
-            api_client=api_client,
-            app_client=app_client,
             core_client=core_client,
-            deployment_replicas_expected={
-                'gateway': 1,
-                'executor0': 2,
-            },
+            endpoint='/',
+            stop_event=stop_event,
             logger=logger,
         )
-        print_services(namespace)
-
-        stop_event = asyncio.Event()
-        send_task = asyncio.create_task(
-            run_test_until_event(
-                flow=flow,
-                namespace=namespace,
-                core_client=core_client,
-                endpoint='/',
-                stop_event=stop_event,
-                logger=logger,
-                sleep_time=None,
-            )
-        )
-        logger.info(f' Sending task has been scheduled')
-        await asyncio.sleep(5.0)
-        # Scale down the Executor to 1 replicas
-        await scale(
-            deployment_name='executor0',
-            desired_replicas=1,
-            core_client=core_client,
-            app_client=app_client,
-            k8s_namespace=namespace,
-            logger=logger,
-        )
-        logger.info(f' Scaling to 1 replicas has been done')
-        await asyncio.sleep(5.0)
-        # Scale back up to 2 replicas
-        await scale(
-            deployment_name='executor0',
-            desired_replicas=2,
-            core_client=core_client,
-            app_client=app_client,
-            k8s_namespace=namespace,
-            logger=logger,
-        )
-        logger.info(f' Scaling to 2 replicas has been done')
-        await asyncio.sleep(5.0)
-        # restart all pods in the deployment
-        await restart_deployment(
-            deployment='executor0',
-            app_client=app_client,
-            core_client=core_client,
-            k8s_namespace=namespace,
-            logger=logger,
-        )
-        logger.info(f' Restarting deployment has been done')
-        await asyncio.sleep(5.0)
-        await delete_pod(
-            deployment='executor0',
+    )
+    logger.info(f' Sending task has been scheduled')
+    await asyncio.sleep(5.0)
+    # Scale down the Executor to 1 replicas
+    await scale(
+        deployment_name='executor0',
+        desired_replicas=1,
+        core_client=core_client,
+        app_client=app_client,
+        k8s_namespace=namespace,
+        logger=logger,
+    )
+    logger.info(f' Scaling to 1 replicas has been done')
+    await asyncio.sleep(5.0)
+    # Scale back up to 2 replicas
+    await scale(
+        deployment_name='executor0',
+        desired_replicas=2,
+        core_client=core_client,
+        app_client=app_client,
+        k8s_namespace=namespace,
+        logger=logger,
+    )
+    logger.info(f' Scaling to 2 replicas has been done')
+    await asyncio.sleep(5.0)
+    # restart all pods in the deployment
+    await restart_deployment(
+        deployment='executor0',
+        app_client=app_client,
+        core_client=core_client,
+        k8s_namespace=namespace,
+        logger=logger,
+    )
+    logger.info(f' Restarting deployment has been done')
+    await asyncio.sleep(5.0)
+    await delete_pod(
+        deployment='executor0',
+        core_client=core_client,
+        k8s_namespace=namespace,
+        logger=logger,
+    )
+    logger.info(f'Deleting pod has been done')
+    await asyncio.sleep(5.0)
+
+    stop_event.set()
+    responses, sent_ids = await send_task
+    logger.info(f'Sending task has finished')
+    logger.info(f'Sending task has finished: {len(sent_ids)} vs {len(responses)}')
+    assert len(sent_ids) == len(responses)
+    doc_ids = set()
+    pod_ids = set()
+    logger.info(f'Collecting doc and pod ids from responses...')
+    assert len(sent_ids) == len(responses)
+    for response in responses:
+        doc = response.docs[0]
+        doc_id, pod_id = doc.id, doc.tags['replica_uid']
+        doc_ids.add(doc_id)
+        pod_ids.add(pod_id)
+    assert len(sent_ids) == len(doc_ids)
+    logger.info(f'pod_ids {pod_ids}')
+    assert len(pod_ids) >= 2  # 2 original + 2 restarted + 1 scaled up + 1 deleted
+
+    # do the random failure test
+    # start sending again
+    logger.info('Start sending for random failure test')
+    stop_event.clear()
+    send_task = asyncio.create_task(
+        run_test_until_event(
+            flow=flow,
+            namespace=namespace,
             core_client=core_client,
-            k8s_namespace=namespace,
+            endpoint='/',
+            stop_event=stop_event,
             logger=logger,
         )
-        logger.info(f'Deleting pod has been done')
-        await asyncio.sleep(5.0)
-
-        stop_event.set()
-        responses, sent_ids = await send_task
-        logger.info(f'Sending task has finished')
-        logger.info(f'Sending task has finished: {len(sent_ids)} vs {len(responses)}')
-        assert len(sent_ids) == len(responses)
-        doc_ids = set()
-        pod_ids = set()
-        logger.info(f'Collecting doc and pod ids from responses...')
-        assert len(sent_ids) == len(responses)
-        for response in responses:
-            doc = response.docs[0]
-            doc_id, pod_id = doc.id, doc.tags['replica_uid']
-            doc_ids.add(doc_id)
-            pod_ids.add(pod_id)
-        assert len(sent_ids) == len(doc_ids)
-        logger.info(f'pod_ids {pod_ids}')
-        assert len(pod_ids) >= 2  # 2 original + 2 restarted + 1 scaled up + 1 deleted
-
-        # do the random failure test
-        # start sending again
-        logger.info('Start sending for random failure test')
-        stop_event.clear()
-        send_task = asyncio.create_task(
-            run_test_until_event(
-                flow=flow,
-                namespace=namespace,
-                core_client=core_client,
-                endpoint='/',
-                stop_event=stop_event,
-                logger=logger,
-            )
-        )
-        # inject failures
-        inject_failures(k8s_cluster, logger)
-        # wait a bit
-        await asyncio.sleep(5.0)
-        # check that no message was lost
-        stop_event.set()
-        responses, sent_ids = await send_task
-        assert len(sent_ids) == len(responses)
-    except Exception as exc:
-        raise exc
-    finally:
-        k8s_endpoints_watcher.terminate()
+    )
+    # inject failures
+    inject_failures(k8s_cluster, logger)
+    # wait a bit
+    await asyncio.sleep(5.0)
+    # check that no message was lost
+    stop_event.set()
+    responses, sent_ids = await send_task
+    assert len(sent_ids) == len(responses)
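For reference, a minimal usage sketch of the timeout-aware readiness checks introduced above. This is not part of the patch; the import paths follow the modules touched in this diff, while the addresses, port numbers, and timeout values are placeholders.

# Illustrative sketch only; assumes the jina modules changed in this diff are importable as shown.
from jina.enums import GatewayProtocolType
from jina.serve.runtimes.gateway import GatewayRuntime
from jina.serve.runtimes.worker import WorkerRuntime

# Executor health check with an explicit 2-second timeout (placeholder address).
executor_ready = WorkerRuntime.is_ready(ctrl_address='127.0.0.1:8081', timeout=2.0)

# Gateway health check; per the diff, non-gRPC gateways fall back to a plain
# urlopen call that now honours the same timeout argument.
gateway_ready = GatewayRuntime.is_ready(
    '127.0.0.1:8080',
    protocol=GatewayProtocolType.from_string('http'),
    timeout=2.0,
)
print(executor_ready, gateway_ready)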