diff --git a/jina/checker.py b/jina/checker.py
index 7a396b1af8b74..de3d5caadaef6 100644
--- a/jina/checker.py
+++ b/jina/checker.py
@@ -31,15 +31,15 @@ def __init__(self, args: 'argparse.Namespace'):
                 ) as tc:
                     if args.target == 'executor':
                         hostname, port, protocol, _ = parse_host_scheme(args.host)
-                        r = WorkerRuntime.is_ready(f'{hostname}:{port}')
+                        r = WorkerRuntime.is_ready(ctrl_address=f'{hostname}:{port}')
                     elif args.target == 'gateway':
                         hostname, port, protocol, _ = parse_host_scheme(args.host)
                         r = GatewayRuntime.is_ready(
                             f'{hostname}:{port}',
-                            protocol=GatewayProtocolType.from_string(protocol),
+                            protocol=GatewayProtocolType.from_string(protocol)
                         )
                     elif args.target == 'flow':
-                        r = Client(host=args.host).is_flow_ready(timeout=args.timeout)
+                        r = Client(host=args.host).is_flow_ready(timeout=args.timeout / 1000)
                     if not r:
                         default_logger.warning(
                             'not responding, attempt (%d/%d) in 1s'
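
Reviewer note: the CLI's `--timeout` flag is in milliseconds, while `Client.is_flow_ready` takes seconds, hence the `/ 1000` conversion above. A minimal sketch of that unit conversion, with an illustrative argparse flag (the real wiring lives in jina's CLI parser, not in this diff; the default shown is only an assumption):

```python
import argparse

# Hypothetical flag mirroring `jina ping --timeout` (milliseconds).
parser = argparse.ArgumentParser()
parser.add_argument('--timeout', type=int, default=3000,
                    help='timeout in milliseconds')
args = parser.parse_args(['--timeout', '2000'])

timeout_seconds = args.timeout / 1000  # 2000 ms -> 2.0 s
assert timeout_seconds == 2.0
```
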
diff --git a/jina/clients/base/helper.py b/jina/clients/base/helper.py
index 5f1c8b07b04c1..710119d0b7272 100644
--- a/jina/clients/base/helper.py
+++ b/jina/clients/base/helper.py
@@ -155,7 +155,9 @@ async def send_dry_run(self, **kwargs):
         :param kwargs: keyword arguments to make sure compatible API with other clients
         :return: send get message
         """
-        return await self.session.get(url=self.url).__aenter__()
+        return await self.session.get(
+            url=self.url, timeout=kwargs.get('timeout', None)
+        ).__aenter__()
 
     async def recv_message(self):
         """Receive message for HTTP (sleep)
diff --git a/jina/resources/k8s/template/deployment-executor.yml b/jina/resources/k8s/template/deployment-executor.yml
index 4c03706a1ff01..b6f2642387f0c 100644
--- a/jina/resources/k8s/template/deployment-executor.yml
+++ b/jina/resources/k8s/template/deployment-executor.yml
@@ -50,7 +50,7 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.name
-          readinessProbe:
+          startupProbe:
             exec:
               command:
                 - jina
@@ -58,7 +58,17 @@ spec:
                 - executor
                 - 127.0.0.1:{port}
             initialDelaySeconds: 5
-            periodSeconds: 20
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port}
+            initialDelaySeconds: 30
+            periodSeconds: 5
             timeoutSeconds: 10
           lifecycle:
             preStop:
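
Reviewer note: the single `readinessProbe` is split into a faster-polling `startupProbe` (protects the container while it boots) and a `livenessProbe` (restarts a container whose `jina ping` stops answering); with no `readinessProbe` left, a container counts as Ready as soon as its startupProbe has succeeded. The same split is applied uniformly in the gateway, uses-before/uses-after, and StatefulSet templates below. Since the templates leave `failureThreshold` at Kubernetes' default of 3, the rough failure windows can be sketched as follows (back-of-the-envelope only):

```python
# Approximate seconds until the kubelet declares a probe failed, assuming
# the default failureThreshold of 3 (not overridden in the templates).
def failure_window(initial_delay: int, period: int, timeout: int,
                   failure_threshold: int = 3) -> int:
    # first attempt after initial_delay, then one attempt per period;
    # the final attempt may itself take up to `timeout` seconds
    return initial_delay + (failure_threshold - 1) * period + timeout

print(failure_window(5, 5, 10))   # startupProbe: ~25 s for the Executor to come up
print(failure_window(30, 5, 10))  # livenessProbe: ~50 s before a restart
```
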
diff --git a/jina/resources/k8s/template/deployment-gateway.yml b/jina/resources/k8s/template/deployment-gateway.yml
index 2b39083ec8f21..4c860e62f7a12 100644
--- a/jina/resources/k8s/template/deployment-gateway.yml
+++ b/jina/resources/k8s/template/deployment-gateway.yml
@@ -49,7 +49,7 @@ spec:
               valueFrom:
                 fieldRef:
                   fieldPath: metadata.name
-          readinessProbe:
+          startupProbe:
             exec:
               command:
                 - jina
@@ -57,7 +57,17 @@ spec:
                 - gateway
                 - {protocol}://127.0.0.1:{port}
             initialDelaySeconds: 5
-            periodSeconds: 20
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - gateway
+                - {protocol}://127.0.0.1:{port}
+            initialDelaySeconds: 30
+            periodSeconds: 5
             timeoutSeconds: 10
           lifecycle:
             preStop:
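
Reviewer note: unlike the Executor probe, the gateway ping carries the scheme (`{protocol}://`). A sketch of what `jina ping gateway <scheme>://host:port` boils down to, mirroring the jina/checker.py hunk above (import paths are assumed from where these symbols are used in the repo; they are not shown in this diff):

```python
from jina.enums import GatewayProtocolType
from jina.helper import parse_host_scheme
from jina.serve.runtimes.gateway import GatewayRuntime

# parse_host_scheme splits 'http://127.0.0.1:8080' into its parts
hostname, port, protocol, _ = parse_host_scheme('http://127.0.0.1:8080')
ready = GatewayRuntime.is_ready(
    f'{hostname}:{port}',
    protocol=GatewayProtocolType.from_string(protocol),
    timeout=1.0,  # the new per-check timeout in seconds
)
print(ready)
```
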
diff --git a/jina/resources/k8s/template/deployment-uses-after.yml b/jina/resources/k8s/template/deployment-uses-after.yml
index 80ef94f7b6742..8fff1b977e182 100644
--- a/jina/resources/k8s/template/deployment-uses-after.yml
+++ b/jina/resources/k8s/template/deployment-uses-after.yml
@@ -50,12 +50,26 @@ spec:
           valueFrom:
             fieldRef:
               fieldPath: metadata.name
-
-        readinessProbe:
-          tcpSocket:
-            port: {port}
+        startupProbe:
+          exec:
+            command:
+              - jina
+              - ping
+              - executor
+              - 127.0.0.1:{port}
           initialDelaySeconds: 5
-          periodSeconds: 10
+          periodSeconds: 5
+          timeoutSeconds: 10
+        livenessProbe:
+          exec:
+            command:
+              - jina
+              - ping
+              - executor
+              - 127.0.0.1:{port}
+          initialDelaySeconds: 30
+          periodSeconds: 5
+          timeoutSeconds: 10
         lifecycle:
           preStop:
             exec:
@@ -85,8 +99,7 @@ spec:
           valueFrom:
             fieldRef:
               fieldPath: metadata.name
-
-        readinessProbe:
+        startupProbe:
           exec:
             command:
               - jina
@@ -94,7 +107,17 @@ spec:
               - executor
               - 127.0.0.1:{port_uses_after}
           initialDelaySeconds: 5
-          periodSeconds: 20
+          periodSeconds: 5
+          timeoutSeconds: 10
+        livenessProbe:
+          exec:
+            command:
+              - jina
+              - ping
+              - executor
+              - 127.0.0.1:{port_uses_after}
+          initialDelaySeconds: 30
+          periodSeconds: 5
           timeoutSeconds: 10
         lifecycle:
           preStop:
diff --git a/jina/resources/k8s/template/deployment-uses-before-after.yml b/jina/resources/k8s/template/deployment-uses-before-after.yml
index b9a60b88e67fd..74a7d8e041dab 100644
--- a/jina/resources/k8s/template/deployment-uses-before-after.yml
+++ b/jina/resources/k8s/template/deployment-uses-before-after.yml
@@ -50,12 +50,26 @@ spec:
           valueFrom:
             fieldRef:
               fieldPath: metadata.name
-
-        readinessProbe:
-          tcpSocket:
-            port: {port}
+        startupProbe:
+          exec:
+            command:
+              - jina
+              - ping
+              - executor
+              - 127.0.0.1:{port}
           initialDelaySeconds: 5
-          periodSeconds: 10
+          periodSeconds: 5
+          timeoutSeconds: 10
+        livenessProbe:
+          exec:
+            command:
+              - jina
+              - ping
+              - executor
+              - 127.0.0.1:{port}
+          initialDelaySeconds: 30
+          periodSeconds: 5
+          timeoutSeconds: 10
         lifecycle:
           preStop:
             exec:
@@ -85,8 +99,7 @@ spec:
           valueFrom:
             fieldRef:
               fieldPath: metadata.name
-
-        readinessProbe:
+        startupProbe:
           exec:
             command:
               - jina
@@ -94,7 +107,17 @@ spec:
               - executor
               - 127.0.0.1:{port_uses_before}
           initialDelaySeconds: 5
-          periodSeconds: 20
+          periodSeconds: 5
+          timeoutSeconds: 10
+        livenessProbe:
+          exec:
+            command:
+              - jina
+              - ping
+              - executor
+              - 127.0.0.1:{port_uses_before}
+          initialDelaySeconds: 30
+          periodSeconds: 5
           timeoutSeconds: 10
         lifecycle:
           preStop:
@@ -125,7 +148,7 @@ spec:
           valueFrom:
             fieldRef:
               fieldPath: metadata.name
-        readinessProbe:
+        startupProbe:
           exec:
             command:
               - jina
@@ -133,7 +156,17 @@ spec:
               - executor
               - 127.0.0.1:{port_uses_after}
           initialDelaySeconds: 5
-          periodSeconds: 20
+          periodSeconds: 5
+          timeoutSeconds: 10
+        livenessProbe:
+          exec:
+            command:
+              - jina
+              - ping
+              - executor
+              - 127.0.0.1:{port_uses_after}
+          initialDelaySeconds: 30
+          periodSeconds: 5
           timeoutSeconds: 10
         lifecycle:
           preStop:
diff --git a/jina/resources/k8s/template/deployment-uses-before.yml b/jina/resources/k8s/template/deployment-uses-before.yml
index 21bb6878f9896..c4631802395d9 100644
--- a/jina/resources/k8s/template/deployment-uses-before.yml
+++ b/jina/resources/k8s/template/deployment-uses-before.yml
@@ -50,11 +50,26 @@ spec:
           valueFrom:
             fieldRef:
               fieldPath: metadata.name
-        readinessProbe:
-          tcpSocket:
-            port: {port}
+        startupProbe:
+          exec:
+            command:
+              - jina
+              - ping
+              - executor
+              - 127.0.0.1:{port}
           initialDelaySeconds: 5
-          periodSeconds: 10
+          periodSeconds: 5
+          timeoutSeconds: 10
+        livenessProbe:
+          exec:
+            command:
+              - jina
+              - ping
+              - executor
+              - 127.0.0.1:{port}
+          initialDelaySeconds: 30
+          periodSeconds: 5
+          timeoutSeconds: 10
         lifecycle:
           preStop:
             exec:
@@ -84,8 +99,7 @@ spec:
           valueFrom:
             fieldRef:
               fieldPath: metadata.name
-
-        readinessProbe:
+        startupProbe:
           exec:
             command:
               - jina
@@ -93,7 +107,17 @@ spec:
               - executor
               - 127.0.0.1:{port_uses_before}
           initialDelaySeconds: 5
-          periodSeconds: 20
+          periodSeconds: 5
+          timeoutSeconds: 10
+        livenessProbe:
+          exec:
+            command:
+              - jina
+              - ping
+              - executor
+              - 127.0.0.1:{port_uses_before}
+          initialDelaySeconds: 30
+          periodSeconds: 5
           timeoutSeconds: 10
         lifecycle:
           preStop:
diff --git a/jina/resources/k8s/template/statefulset-executor.yml b/jina/resources/k8s/template/statefulset-executor.yml
index d5809e8e00143..66ed11a9723c8 100644
--- a/jina/resources/k8s/template/statefulset-executor.yml
+++ b/jina/resources/k8s/template/statefulset-executor.yml
@@ -50,7 +50,7 @@ spec:
               valueFrom:
                 fieldRef:
                   fieldPath: metadata.name
-          readinessProbe:
+          startupProbe:
             exec:
               command:
                 - jina
@@ -58,7 +58,17 @@ spec:
                 - executor
                 - 127.0.0.1:{port}
             initialDelaySeconds: 5
-            periodSeconds: 20
+            periodSeconds: 5
+            timeoutSeconds: 10
+          livenessProbe:
+            exec:
+              command:
+                - jina
+                - ping
+                - executor
+                - 127.0.0.1:{port}
+            initialDelaySeconds: 30
+            periodSeconds: 5
             timeoutSeconds: 10
           lifecycle:
             preStop:
diff --git a/jina/serve/networking.py b/jina/serve/networking.py
index 17278c5bf2bd8..d752f40d6e652 100644
--- a/jina/serve/networking.py
+++ b/jina/serve/networking.py
@@ -975,6 +975,8 @@ async def _handle_aiorpcerror(
             error.code() != grpc.StatusCode.UNAVAILABLE
             and error.code() != grpc.StatusCode.CANCELLED
             and error.code() != grpc.StatusCode.DEADLINE_EXCEEDED
+            and error.code() != grpc.StatusCode.UNKNOWN
+            and error.code() != grpc.StatusCode.INTERNAL
         ):
             return error
         elif (
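
Reviewer note: with this change, UNKNOWN and INTERNAL join UNAVAILABLE, CANCELLED, and DEADLINE_EXCEEDED as retryable codes instead of being surfaced to the caller immediately. The chained inequalities are equivalent to a set-membership test; a sketch of the same predicate (not the PR's code):

```python
import grpc

RETRYABLE_CODES = {
    grpc.StatusCode.UNAVAILABLE,
    grpc.StatusCode.CANCELLED,
    grpc.StatusCode.DEADLINE_EXCEEDED,
    grpc.StatusCode.UNKNOWN,
    grpc.StatusCode.INTERNAL,
}

def surface_immediately(error: 'grpc.aio.AioRpcError') -> bool:
    # Any code outside the retryable set is returned to the caller as-is.
    return error.code() not in RETRYABLE_CODES
```
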
diff --git a/jina/serve/runtimes/asyncio.py b/jina/serve/runtimes/asyncio.py
index 401642b49e6be..d49834e87a16a 100644
--- a/jina/serve/runtimes/asyncio.py
+++ b/jina/serve/runtimes/asyncio.py
@@ -161,34 +161,25 @@ async def async_run_forever(self):
     # Static methods used by the Pod to communicate with the `Runtime` in the separate process
 
     @staticmethod
-    def activate(**kwargs):
-        """
-        Activate the runtime, does not apply to these runtimes
-
-        :param kwargs: extra keyword arguments
-        """
-        # does not apply to this types of runtimes
-        pass
-
-    @staticmethod
-    def is_ready(ctrl_address: str, **kwargs) -> bool:
+    def is_ready(ctrl_address: str, timeout: float = 1.0, **kwargs) -> bool:
         """
         Check if status is ready.
 
         :param ctrl_address: the address where the control request needs to be sent
+        :param timeout: timeout of the health check in seconds
         :param kwargs: extra keyword arguments
 
         :return: True if status is ready else False.
         """
-
         try:
             from grpc_health.v1 import health_pb2, health_pb2_grpc
 
             response = GrpcConnectionPool.send_health_check_sync(
-                ctrl_address, timeout=1.0
+                ctrl_address, timeout=timeout
+            )
+            return (
+                response.status == health_pb2.HealthCheckResponse.ServingStatus.SERVING
             )
-            # TODO: Get the proper value of the ServingStatus SERVING KEY
-            return response.status == 1
         except RpcError:
             return False
 
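
Reviewer note: the removed TODO is resolved by comparing against the named enum member. For reference, the gRPC health-checking protocol defines `ServingStatus` as UNKNOWN = 0, SERVING = 1, NOT_SERVING = 2, SERVICE_UNKNOWN = 3, which is why the old `response.status == 1` happened to work. A quick check, assuming `grpcio-health-checking` is installed:

```python
from grpc_health.v1 import health_pb2

serving = health_pb2.HealthCheckResponse.ServingStatus.SERVING
assert serving == 1  # the magic number the old code compared against
```
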
diff --git a/jina/serve/runtimes/gateway/__init__.py b/jina/serve/runtimes/gateway/__init__.py
index bb9c5f3b7de40..8ce8b0fbb9e03 100644
--- a/jina/serve/runtimes/gateway/__init__.py
+++ b/jina/serve/runtimes/gateway/__init__.py
@@ -132,12 +132,18 @@ async def async_run_forever(self):
         self.is_cancel.set()
 
     @staticmethod
-    def is_ready(ctrl_address: str, protocol: Optional[str] = 'grpc', **kwargs) -> bool:
+    def is_ready(
+        ctrl_address: str,
+        protocol: Optional[str] = 'grpc',
+        timeout: float = 1.0,
+        **kwargs,
+    ) -> bool:
         """
         Check if status is ready.
 
         :param ctrl_address: the address where the control request needs to be sent
         :param protocol: protocol of the gateway runtime
+        :param timeout: timeout of grpc call in seconds
         :param kwargs: extra keyword arguments
 
         :return: True if status is ready else False.
@@ -151,7 +157,9 @@ def is_ready(ctrl_address: str, protocol: Optional[str] = 'grpc', **kwargs) -> b
             res = AsyncNewLoopRuntime.is_ready(ctrl_address)
         else:
             try:
-                conn = urllib.request.urlopen(url=f'http://{ctrl_address}')
+                conn = urllib.request.urlopen(
+                    url=f'http://{ctrl_address}', timeout=timeout
+                )
                 res = conn.code == HTTPStatus.OK
             except:
                 res = False
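
Reviewer note: only the HTTP/WebSocket branch gains the timeout here; as the context above shows, the gRPC branch still calls `AsyncNewLoopRuntime.is_ready(ctrl_address)` without forwarding it. A minimal sketch of the HTTP readiness path with the new timeout (seconds), using `except Exception` rather than the bare `except:` in the hunk:

```python
import urllib.request
from http import HTTPStatus

def http_gateway_ready(ctrl_address: str, timeout: float = 1.0) -> bool:
    try:
        # the timeout bounds the whole blocking call, so a hung gateway
        # can no longer block the caller indefinitely
        conn = urllib.request.urlopen(url=f'http://{ctrl_address}', timeout=timeout)
        return conn.code == HTTPStatus.OK
    except Exception:
        return False

# e.g. http_gateway_ready('127.0.0.1:8080', timeout=2.0)
```
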
diff --git a/jina/serve/runtimes/worker/__init__.py b/jina/serve/runtimes/worker/__init__.py
index 0ee5900e82bb7..5964dbecdc468 100644
--- a/jina/serve/runtimes/worker/__init__.py
+++ b/jina/serve/runtimes/worker/__init__.py
@@ -313,3 +313,25 @@ async def _status(self, empty, context) -> jina_pb2.JinaInfoProto:
         for k, v in env_info.items():
             info_proto.envs[k] = str(v)
         return info_proto
+
+    async def Check(
+        self, request: health_pb2.HealthCheckRequest, context
+    ) -> health_pb2.HealthCheckResponse:
+        """Calls the underlying HealthServicer.Check method with the same arguments
+        :param request: grpc request
+        :param context: grpc request context
+        :returns: the grpc HealthCheckResponse
+        """
+        self.logger.debug('Received Check request')
+        return await self._health_servicer.Check(request, context)
+
+    async def Watch(
+        self, request: health_pb2.HealthCheckRequest, context
+    ) -> health_pb2.HealthCheckResponse:
+        """Calls the underlying HealthServicer.Watch method with the same arguments
+        :param request: grpc request
+        :param context: grpc request context
+        :returns: the stream of grpc HealthCheckResponse objects
+        """
+        self.logger.debug('Received Watch request')
+        return await self._health_servicer.Watch(request, context)
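
Reviewer note: delegating `Check` and `Watch` to the underlying `HealthServicer` means the worker now speaks the standard gRPC health-checking protocol, so any stock health client (including the `jina ping` exec probes in the k8s templates above) can query it. A hedged client-side sketch, assuming `grpcio-health-checking` is installed and a worker listens on the given address:

```python
import grpc
from grpc_health.v1 import health_pb2, health_pb2_grpc

def check_worker(address: str, timeout: float = 1.0) -> bool:
    with grpc.insecure_channel(address) as channel:
        stub = health_pb2_grpc.HealthStub(channel)
        # an empty service name in the request asks for overall server health
        response = stub.Check(health_pb2.HealthCheckRequest(), timeout=timeout)
        return (
            response.status
            == health_pb2.HealthCheckResponse.ServingStatus.SERVING
        )

# e.g. check_worker('127.0.0.1:12345')
```
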
diff --git a/tests/k8s/test_k8s.py b/tests/k8s/test_k8s.py
index f932c313cee95..5d49d6b10d422 100644
--- a/tests/k8s/test_k8s.py
+++ b/tests/k8s/test_k8s.py
@@ -279,11 +279,6 @@ async def test_flow_with_monitoring(logger, tmpdir, docker_images, port_generato
 
         flow.to_kubernetes_yaml(dump_path, k8s_namespace=namespace)
 
-        from kubernetes import client
-
-        api_client = client.ApiClient()
-        core_client = client.CoreV1Api(api_client=api_client)
-        app_client = client.AppsV1Api(api_client=api_client)
         await create_all_flow_deployments_and_wait_ready(
             dump_path,
             namespace=namespace,
diff --git a/tests/k8s/test_k8s_failures.py b/tests/k8s/test_k8s_failures.py
index 46532dfc12e61..ba4b7a9a23646 100644
--- a/tests/k8s/test_k8s_failures.py
+++ b/tests/k8s/test_k8s_failures.py
@@ -229,79 +229,6 @@ def inject_failures(k8s_cluster, logger):
         raise Exception(f"Injecting failures failed with {returncode}")
 
 
-def watch_k8s_resources(namespace):
-    import os
-    import subprocess
-    import time
-
-    while True:
-        pod_metadata = subprocess.check_output(
-            (
-                'kubectl',
-                '-n',
-                namespace,
-                'get',
-                'pods',
-                '-l',
-                'app=executor0',
-                '-o',
-                "jsonpath=\"{$.items[*]['.metadata.name', '.metadata.uid']}\"",
-            ),
-            env=os.environ,
-        )
-        print('pod metadata:')
-        print(pod_metadata.decode())
-        endpoints = subprocess.check_output(
-            (
-                'kubectl',
-                '-n',
-                namespace,
-                'get',
-                'endpoints',
-                '-l',
-                'app=executor0',
-                '-o',
-                "jsonpath=\"{$.items[*].subsets[*].addresses[*]['targetRef.name', 'ip']}\"",
-            ),
-            env=os.environ,
-        )
-        print('endpoints:')
-        print(endpoints.decode())
-        container_statuses = subprocess.check_output(
-            (
-                'kubectl',
-                '-n',
-                namespace,
-                'get',
-                'pods',
-                '-l',
-                'app=executor0',
-                '-o',
-                "jsonpath=\"{$.items[*].status.containerStatuses[*].ready}\"",
-            ),
-            env=os.environ,
-        )
-        print('container statuses:')
-        print(container_statuses.decode())
-        print()
-        time.sleep(5)
-
-
-def print_services(namespace):
-    services = subprocess.check_output(
-        (
-            'kubectl',
-            '-n',
-            namespace,
-            'get',
-            'service',
-        ),
-        env=os.environ,
-    )
-    print('services:')
-    print(services.decode())
-
-
 @pytest.mark.asyncio
 @pytest.mark.timeout(3600)
 @pytest.mark.parametrize(
@@ -316,126 +243,115 @@ async def test_failure_scenarios(logger, docker_images, tmpdir, k8s_cluster):
     api_client = client.ApiClient()
     core_client = client.CoreV1Api(api_client=api_client)
     app_client = client.AppsV1Api(api_client=api_client)
-    k8s_endpoints_watcher = multiprocessing.Process(
-        target=watch_k8s_resources, args=(namespace,), daemon=True
+    flow = Flow(prefetch=100).add(replicas=2, uses=f'docker://{docker_images[0]}')
+
+    dump_path = os.path.join(str(tmpdir), namespace)
+    flow.to_kubernetes_yaml(dump_path, k8s_namespace=namespace)
+
+    await create_all_flow_deployments_and_wait_ready(
+        dump_path,
+        namespace=namespace,
+        api_client=api_client,
+        app_client=app_client,
+        core_client=core_client,
+        deployment_replicas_expected={
+            'gateway': 1,
+            'executor0': 2,
+        },
+        logger=logger,
     )
-    k8s_endpoints_watcher.start()
-    try:
-        flow = Flow(prefetch=100).add(replicas=2, uses=f'docker://{docker_images[0]}')
-
-        dump_path = os.path.join(str(tmpdir), namespace)
-        flow.to_kubernetes_yaml(dump_path, k8s_namespace=namespace)
 
-        await create_all_flow_deployments_and_wait_ready(
-            dump_path,
+    stop_event = asyncio.Event()
+    send_task = asyncio.create_task(
+        run_test_until_event(
+            flow=flow,
             namespace=namespace,
-            api_client=api_client,
-            app_client=app_client,
             core_client=core_client,
-            deployment_replicas_expected={
-                'gateway': 1,
-                'executor0': 2,
-            },
+            endpoint='/',
+            stop_event=stop_event,
             logger=logger,
         )
-        print_services(namespace)
-
-        stop_event = asyncio.Event()
-        send_task = asyncio.create_task(
-            run_test_until_event(
-                flow=flow,
-                namespace=namespace,
-                core_client=core_client,
-                endpoint='/',
-                stop_event=stop_event,
-                logger=logger,
-                sleep_time=None,
-            )
-        )
-        logger.info(f' Sending task has been scheduled')
-        await asyncio.sleep(5.0)
-        # Scale down the Executor to 1 replicas
-        await scale(
-            deployment_name='executor0',
-            desired_replicas=1,
-            core_client=core_client,
-            app_client=app_client,
-            k8s_namespace=namespace,
-            logger=logger,
-        )
-        logger.info(f' Scaling to 1 replicas has been done')
-        await asyncio.sleep(5.0)
-        # Scale back up to 2 replicas
-        await scale(
-            deployment_name='executor0',
-            desired_replicas=2,
-            core_client=core_client,
-            app_client=app_client,
-            k8s_namespace=namespace,
-            logger=logger,
-        )
-        logger.info(f' Scaling to 2 replicas has been done')
-        await asyncio.sleep(5.0)
-        # restart all pods in the deployment
-        await restart_deployment(
-            deployment='executor0',
-            app_client=app_client,
-            core_client=core_client,
-            k8s_namespace=namespace,
-            logger=logger,
-        )
-        logger.info(f' Restarting deployment has been done')
-        await asyncio.sleep(5.0)
-        await delete_pod(
-            deployment='executor0',
+    )
+    logger.info(f' Sending task has been scheduled')
+    await asyncio.sleep(5.0)
+    # Scale down the Executor to 1 replica
+    await scale(
+        deployment_name='executor0',
+        desired_replicas=1,
+        core_client=core_client,
+        app_client=app_client,
+        k8s_namespace=namespace,
+        logger=logger,
+    )
+    logger.info(f' Scaling to 1 replica has been done')
+    await asyncio.sleep(5.0)
+    # Scale back up to 2 replicas
+    await scale(
+        deployment_name='executor0',
+        desired_replicas=2,
+        core_client=core_client,
+        app_client=app_client,
+        k8s_namespace=namespace,
+        logger=logger,
+    )
+    logger.info(f' Scaling to 2 replicas has been done')
+    await asyncio.sleep(5.0)
+    # restart all pods in the deployment
+    await restart_deployment(
+        deployment='executor0',
+        app_client=app_client,
+        core_client=core_client,
+        k8s_namespace=namespace,
+        logger=logger,
+    )
+    logger.info(f' Restarting deployment has been done')
+    await asyncio.sleep(5.0)
+    await delete_pod(
+        deployment='executor0',
+        core_client=core_client,
+        k8s_namespace=namespace,
+        logger=logger,
+    )
+    logger.info(f'Deleting pod has been done')
+    await asyncio.sleep(5.0)
+
+    stop_event.set()
+    responses, sent_ids = await send_task
+    logger.info(f'Sending task has finished')
+    logger.info(f'Sending task has finished: {len(sent_ids)} vs {len(responses)}')
+    assert len(sent_ids) == len(responses)
+    doc_ids = set()
+    pod_ids = set()
+    logger.info(f'Collecting doc and pod ids from responses...')
+    assert len(sent_ids) == len(responses)
+    for response in responses:
+        doc = response.docs[0]
+        doc_id, pod_id = doc.id, doc.tags['replica_uid']
+        doc_ids.add(doc_id)
+        pod_ids.add(pod_id)
+    assert len(sent_ids) == len(doc_ids)
+    logger.info(f'pod_ids {pod_ids}')
+    assert len(pod_ids) >= 2  # conservative lower bound: replicas churn through scale down/up, restart and deletion
+
+    # do the random failure test
+    # start sending again
+    logger.info('Start sending for random failure test')
+    stop_event.clear()
+    send_task = asyncio.create_task(
+        run_test_until_event(
+            flow=flow,
+            namespace=namespace,
             core_client=core_client,
-            k8s_namespace=namespace,
+            endpoint='/',
+            stop_event=stop_event,
             logger=logger,
         )
-        logger.info(f'Deleting pod has been done')
-        await asyncio.sleep(5.0)
-
-        stop_event.set()
-        responses, sent_ids = await send_task
-        logger.info(f'Sending task has finished')
-        logger.info(f'Sending task has finished: {len(sent_ids)} vs {len(responses)}')
-        assert len(sent_ids) == len(responses)
-        doc_ids = set()
-        pod_ids = set()
-        logger.info(f'Collecting doc and pod ids from responses...')
-        assert len(sent_ids) == len(responses)
-        for response in responses:
-            doc = response.docs[0]
-            doc_id, pod_id = doc.id, doc.tags['replica_uid']
-            doc_ids.add(doc_id)
-            pod_ids.add(pod_id)
-        assert len(sent_ids) == len(doc_ids)
-        logger.info(f'pod_ids {pod_ids}')
-        assert len(pod_ids) >= 2  # 2 original + 2 restarted + 1 scaled up + 1 deleted
-
-        # do the random failure test
-        # start sending again
-        logger.info('Start sending for random failure test')
-        stop_event.clear()
-        send_task = asyncio.create_task(
-            run_test_until_event(
-                flow=flow,
-                namespace=namespace,
-                core_client=core_client,
-                endpoint='/',
-                stop_event=stop_event,
-                logger=logger,
-            )
-        )
-        # inject failures
-        inject_failures(k8s_cluster, logger)
-        # wait a bit
-        await asyncio.sleep(5.0)
-        # check that no message was lost
-        stop_event.set()
-        responses, sent_ids = await send_task
-        assert len(sent_ids) == len(responses)
-    except Exception as exc:
-        raise exc
-    finally:
-        k8s_endpoints_watcher.terminate()
+    )
+    # inject failures
+    inject_failures(k8s_cluster, logger)
+    # wait a bit
+    await asyncio.sleep(5.0)
+    # check that no message was lost
+    stop_event.set()
+    responses, sent_ids = await send_task
+    assert len(sent_ids) == len(responses)
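
Reviewer note: the refactor drops the debug watcher subprocess and the enclosing try/finally, leaving a flat test driven by a reusable `asyncio.Event` (set to stop a send round, clear to rearm it for the next round). A minimal, self-contained sketch of that pattern in isolation (the sender body is a stand-in, not the test's request loop):

```python
import asyncio

async def sender(stop_event: asyncio.Event) -> int:
    sent = 0
    while not stop_event.is_set():
        sent += 1          # stand-in for sending one request
        await asyncio.sleep(0.01)
    return sent

async def main():
    stop_event = asyncio.Event()
    task = asyncio.create_task(sender(stop_event))
    await asyncio.sleep(0.1)   # ... perturb the cluster here ...
    stop_event.set()           # stop the first round
    print(await task)

    stop_event.clear()         # reuse the same event for the next round
    task = asyncio.create_task(sender(stop_event))
    await asyncio.sleep(0.1)   # ... inject failures here ...
    stop_event.set()
    print(await task)

asyncio.run(main())
```
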