From 4a6b8114f57756ac084fe17227e122ed4106f9d0 Mon Sep 17 00:00:00 2001 From: JosepSampe Date: Sat, 11 May 2024 21:29:39 +0200 Subject: [PATCH 1/3] [Monitor] Reduce localhost monitoring_interval and wait_dur_sec --- lithops/constants.py | 7 +++-- lithops/executors.py | 10 +++---- lithops/localhost/v2/localhost.py | 18 +++--------- lithops/monitor.py | 26 +++++++++++------ lithops/tests/test_map_reduce.py | 46 ++++++++++++++++++++----------- lithops/wait.py | 11 ++++---- 6 files changed, 67 insertions(+), 51 deletions(-) diff --git a/lithops/constants.py b/lithops/constants.py index b74adcd00..55606ee44 100644 --- a/lithops/constants.py +++ b/lithops/constants.py @@ -36,7 +36,8 @@ MODE_DEFAULT = SERVERLESS MONITORING_DEFAULT = 'storage' -MONITORING_INTERVAL = 2 +MONITORING_INTERVAL = 2 # seconds +MONITORING_INTERVAL_LH = 0.1 # seconds SERVERLESS_BACKEND_DEFAULT = 'aws_lambda' STANDALONE_BACKEND_DEFAULT = 'aws_ec2' @@ -47,8 +48,8 @@ LOGS_PREFIX = "lithops.logs" RUNTIMES_PREFIX = "lithops.runtimes" -EXECUTION_TIMEOUT_DEFAULT = 1800 -EXECUTION_TIMEOUT_LOCALHOST_DEFAULT = 3600 +EXECUTION_TIMEOUT_DEFAULT = 1800 # seconds +EXECUTION_TIMEOUT_LOCALHOST_DEFAULT = 3600 # seconds LOCALHOST_RUNTIME_DEFAULT = os.path.basename(sys.executable) LOCALHOST_SERVICE_IDLE_TIMEOUT = 3 diff --git a/lithops/executors.py b/lithops/executors.py index b7511af6f..e42e33cf7 100644 --- a/lithops/executors.py +++ b/lithops/executors.py @@ -30,7 +30,7 @@ from lithops.future import ResponseFuture from lithops.invokers import create_invoker from lithops.storage import InternalStorage -from lithops.wait import wait, ALL_COMPLETED, THREADPOOL_SIZE, WAIT_DUR_SEC, ALWAYS +from lithops.wait import wait, ALL_COMPLETED, THREADPOOL_SIZE, ALWAYS from lithops.job import create_map_job, create_reduce_job from lithops.config import default_config, \ extract_localhost_config, extract_standalone_config, \ @@ -399,7 +399,7 @@ def wait( download_results: Optional[bool] = False, timeout: Optional[int] = None, 
threadpool_size: Optional[int] = THREADPOOL_SIZE, - wait_dur_sec: Optional[int] = WAIT_DUR_SEC, + wait_dur_sec: Optional[int] = None, show_progressbar: Optional[bool] = True ) -> Tuple[FuturesList, FuturesList]: """ @@ -416,7 +416,7 @@ def wait( :param download_results: Download results. Default false (Only get statuses) :param timeout: Timeout of waiting for results :param threadpool_size: Number of threads to use. Default 64 - :param wait_dur_sec: Time interval between each check + :param wait_dur_sec: Time interval between each check. Default 1 second :param show_progressbar: whether or not to show the progress bar. :return: `(fs_done, fs_notdone)` where `fs_done` is a list of futures that have @@ -470,7 +470,7 @@ def get_result( throw_except: Optional[bool] = True, timeout: Optional[int] = None, threadpool_size: Optional[int] = THREADPOOL_SIZE, - wait_dur_sec: Optional[int] = WAIT_DUR_SEC, + wait_dur_sec: Optional[int] = None, show_progressbar: Optional[bool] = True ): """ @@ -480,7 +480,7 @@ def get_result( :param throw_except: Reraise exception if call raised. Default True. :param timeout: Timeout for waiting for results. :param threadpool_size: Number of threads to use. Default 128 - :param wait_dur_sec: Time interval between each check. + :param wait_dur_sec: Time interval between each check. Default 1 second :param show_progressbar: whether or not to show the progress bar. 
:return: The result of the future/s diff --git a/lithops/localhost/v2/localhost.py b/lithops/localhost/v2/localhost.py index b57224921..1f2790fa9 100644 --- a/lithops/localhost/v2/localhost.py +++ b/lithops/localhost/v2/localhost.py @@ -33,7 +33,6 @@ from lithops.constants import ( JOBS_DIR, LOCALHOST_RUNTIME_DEFAULT, - RN_LOG_FILE, TEMP_DIR, LITHOPS_TEMP_DIR, COMPUTE_CLI_MSG, @@ -299,8 +298,7 @@ def run_task(self, job_key, call_id): task_filename = os.path.join(JOBS_DIR, job_key, call_id + '.task') cmd = [self.runtime_name, RUNNER_FILE, 'run_job', task_filename] - log = open(RN_LOG_FILE, 'a') - process = sp.Popen(cmd, stdout=log, stderr=log, start_new_session=True) + process = sp.Popen(cmd, start_new_session=True) self.task_processes[job_key_call_id] = process process.communicate() # blocks until the process finishes del self.task_processes[job_key_call_id] @@ -394,12 +392,8 @@ def start(self): cmd += f'--rm -v {tmp_path}:/tmp -it --detach ' cmd += f'--entrypoint=/bin/bash {self.runtime_name}' - log = open(RN_LOG_FILE, 'a') - self.container_process = sp.Popen( - shlex.split(cmd), stdout=log, - stderr=log, start_new_session=True - ) - self.container_process.communicate() + self.container_process = sp.Popen(shlex.split(cmd), start_new_session=True) + self.container_process.communicate() # blocks until the process finishes super().start() @@ -415,11 +409,7 @@ def run_task(self, job_key, call_id): cmd += f'"python3 /tmp/{USER_TEMP_DIR}/localhost-runner.py ' cmd += f'run_job {docker_task_filename}"' - log = open(RN_LOG_FILE, 'a') - process = sp.Popen( - shlex.split(cmd), stdout=log, - stderr=log, start_new_session=True - ) + process = sp.Popen(shlex.split(cmd), start_new_session=True) self.task_processes[job_key_call_id] = process process.communicate() # blocks until the process finishes del self.task_processes[job_key_call_id] diff --git a/lithops/monitor.py b/lithops/monitor.py index a81bf28ea..e04e7c32c 100644 --- a/lithops/monitor.py +++ b/lithops/monitor.py @@ 
-25,7 +25,10 @@ import threading import concurrent.futures as cf from tblib import pickling_support -from lithops.constants import MONITORING_INTERVAL +from lithops.constants import ( + MONITORING_INTERVAL, + MONITORING_INTERVAL_LH +) pickling_support.install() @@ -464,23 +467,30 @@ def __init__(self, executor_id, internal_storage, config=None): self.executor_id = executor_id self.internal_storage = internal_storage self.config = config - self.backend = self.config['lithops']['monitoring'].lower() if config else 'storage' + self.backend_type = self.config['lithops']['monitoring'].lower() if config else 'storage' + self.storage_backend = self.internal_storage.backend self.token_bucket_q = queue.Queue() self.monitor = None self.job_chunksize = {} self.MonitorClass = getattr( lithops.monitor, - f'{self.backend.capitalize()}Monitor' + f'{self.backend_type.capitalize()}Monitor' ) def start(self, fs, job_id=None, chunksize=None, generate_tokens=False): - if self.backend == 'storage': - mi = self.config['lithops'].get('monitoring_interval', MONITORING_INTERVAL) \ - if self.config else MONITORING_INTERVAL - bk_config = {'monitoring_interval': mi} + if self.backend_type == 'storage': + monitoring_interval = None + if self.config and 'lithops' in self.config: + monitoring_interval = self.config['lithops'].get('monitoring_interval') + if not monitoring_interval: + if self.storage_backend == 'localhost': + monitoring_interval = MONITORING_INTERVAL_LH + else: + monitoring_interval = MONITORING_INTERVAL + bk_config = {'monitoring_interval': monitoring_interval} else: - bk_config = self.config.get(self.backend) + bk_config = self.config.get(self.backend_type) if job_id: self.job_chunksize[job_id] = chunksize diff --git a/lithops/tests/test_map_reduce.py b/lithops/tests/test_map_reduce.py index 6bc39c1c6..dd3af1be7 100644 --- a/lithops/tests/test_map_reduce.py +++ b/lithops/tests/test_map_reduce.py @@ -114,24 +114,21 @@ def test_url_processing(self): result = fexec.get_result() 
assert result == self.words_in_files - def test_chunks_bucket(self): + def test_bucket_chunk_size(self): """tests the ability to create a separate function invocation based on the following parameters: chunk_size creates [file_size//chunk_size] - invocations to process each chunk_size bytes, of a given object. chunk_number - creates 'chunk_number' invocations that process [file_size//chunk_number] bytes each. + invocations to process each chunk_size bytes, of a given object. """ - - logger.info('Testing chunks on a bucket') OBJ_CHUNK_SIZE = 1 * 800 ** 2 # create a new invocation - OBJ_CHUNK_NUMBER = 2 activations = 0 data_prefix = self.storage_backend + '://' + self.bucket + '/' + DATASET_PREFIX + '/' fexec = lithops.FunctionExecutor(config=pytest.lithops_config) - futures = fexec.map_reduce(my_map_function_obj, data_prefix, - my_reduce_function, - obj_chunk_size=OBJ_CHUNK_SIZE) + futures = fexec.map_reduce( + my_map_function_obj, data_prefix, + my_reduce_function, obj_chunk_size=OBJ_CHUNK_SIZE + ) result = fexec.get_result(futures) assert result == self.words_in_files @@ -140,27 +137,34 @@ def test_chunks_bucket(self): assert len(futures) == activations + 1 # +1 due to the reduce function + def test_bucket_chunk_number(self): + """tests the ability to create a separate function invocation + based on the following parameters: chunk_number + creates 'chunk_number' invocations that process [file_size//chunk_number] bytes each. 
+ """ + OBJ_CHUNK_NUMBER = 2 + + data_prefix = self.storage_backend + '://' + self.bucket + '/' + DATASET_PREFIX + '/' + fexec = lithops.FunctionExecutor(config=pytest.lithops_config) - futures = fexec.map_reduce(my_map_function_obj, data_prefix, - my_reduce_function, obj_chunk_number=OBJ_CHUNK_NUMBER) + futures = fexec.map_reduce( + my_map_function_obj, data_prefix, + my_reduce_function, obj_chunk_number=OBJ_CHUNK_NUMBER + ) result = fexec.get_result(futures) assert result == self.words_in_files assert len(futures) == len(TEST_FILES_URLS) * OBJ_CHUNK_NUMBER + 1 - def test_chunks_bucket_one_reducer_per_object(self): + def test_bucket_chunk_size_one_reducer_per_object(self): """tests the ability to create a separate function invocation based on the following parameters, as well as create a separate invocation of a reduce function for each object: chunk_size creates [file_size//chunk_size] invocations to process each chunk_size bytes, of a given object. hunk_number creates 'chunk_number' invocations that process [file_size//chunk_number] bytes each. """ - - logger.info('Testing chunks on a bucket with one reducer per object') OBJ_CHUNK_SIZE = 1 * 1024 ** 2 - OBJ_CHUNK_NUMBER = 2 activations = 0 - data_prefix = self.storage_backend + '://' + self.bucket + '/' + DATASET_PREFIX + '/' fexec = lithops.FunctionExecutor(config=pytest.lithops_config) @@ -179,6 +183,16 @@ def test_chunks_bucket_one_reducer_per_object(self): # + len(TEST_FILES_URLS) due to map_reduce activation per object assert len(futures) == activations + len(TEST_FILES_URLS) + def test_bucket_chunk_number_one_reducer_per_object(self): + """tests the ability to create a separate function invocation based + on the following parameters, as well as create a separate invocation + of a reduce function for each object: chunk_size creates [file_size//chunk_size] + invocations to process each chunk_size bytes, of a given object. 
chunk_number + creates 'chunk_number' invocations that process [file_size//chunk_number] bytes each. + """ + OBJ_CHUNK_NUMBER = 2 + data_prefix = self.storage_backend + '://' + self.bucket + '/' + DATASET_PREFIX + '/' + fexec = lithops.FunctionExecutor(config=pytest.lithops_config) futures = fexec.map_reduce( my_map_function_obj, diff --git a/lithops/wait.py b/lithops/wait.py index 97213097b..d63885f26 100644 --- a/lithops/wait.py +++ b/lithops/wait.py @@ -49,7 +49,7 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]], download_results: Optional[bool] = False, timeout: Optional[int] = None, threadpool_size: Optional[int] = THREADPOOL_SIZE, - wait_dur_sec: Optional[int] = WAIT_DUR_SEC, + wait_dur_sec: Optional[int] = None, show_progressbar: Optional[bool] = True, futures_from_executor_wait: Optional[bool] = False) -> Tuple[FuturesList, FuturesList]: """ @@ -68,7 +68,7 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]], :param download_results: Download results. Default false (Only get statuses) :param timeout: Timeout of waiting for results. :param threadpool_size: Number of threads to use. Default 64 - :param wait_dur_sec: Time interval between each check. + :param wait_dur_sec: Time interval between each check. Default 1 second :param show_progressbar: whether or not to show the progress bar. 
:return: `(fs_done, fs_notdone)` @@ -131,7 +131,8 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]], internal_storage=executor_data.internal_storage) job_monitor.start(fs=executor_data.futures) - sleep_sec = wait_dur_sec if job_monitor.backend == 'storage' else 0.3 + sleep_sec = wait_dur_sec or WAIT_DUR_SEC if job_monitor.backend_type == 'storage' \ + and job_monitor.storage_backend != 'localhost' else 0.1 if return_when == ALWAYS: for executor_data in executors_data: @@ -186,7 +187,7 @@ def get_result(fs: Optional[Union[ResponseFuture, FuturesList, List[ResponseFutu throw_except: Optional[bool] = True, timeout: Optional[int] = None, threadpool_size: Optional[int] = THREADPOOL_SIZE, - wait_dur_sec: Optional[int] = WAIT_DUR_SEC, + wait_dur_sec: Optional[int] = None, show_progressbar: Optional[bool] = True): """ For getting the results from all function activations @@ -196,7 +197,7 @@ def get_result(fs: Optional[Union[ResponseFuture, FuturesList, List[ResponseFutu :param throw_except: Reraise exception if call raised. Default True. :param timeout: Timeout for waiting for results. :param threadpool_size: Number of threads to use. Default 128 - :param wait_dur_sec: Time interval between each check. + :param wait_dur_sec: Time interval between each check. Default 1 second :param show_progressbar: whether or not to show the progress bar. 
:return: The result of the future/s From 7d8ce5da7962b60dcdefa344f17d45f27eef8cb3 Mon Sep 17 00:00:00 2001 From: JosepSampe Date: Sat, 11 May 2024 21:40:43 +0200 Subject: [PATCH 2/3] Update test map reduce --- .github/workflows/on_pull_request.yml | 1 + lithops/tests/test_map_reduce.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/on_pull_request.yml b/.github/workflows/on_pull_request.yml index 352de13f9..5001d1b75 100644 --- a/.github/workflows/on_pull_request.yml +++ b/.github/workflows/on_pull_request.yml @@ -15,6 +15,7 @@ jobs: localhost_tests: runs-on: ubuntu-22.04 + timeout-minutes: 5 strategy: fail-fast: false matrix: diff --git a/lithops/tests/test_map_reduce.py b/lithops/tests/test_map_reduce.py index dd3af1be7..279508eca 100644 --- a/lithops/tests/test_map_reduce.py +++ b/lithops/tests/test_map_reduce.py @@ -117,7 +117,7 @@ def test_url_processing(self): def test_bucket_chunk_size(self): """tests the ability to create a separate function invocation based on the following parameters: chunk_size creates [file_size//chunk_size] - invocations to process each chunk_size bytes, of a given object. + invocations to process each chunk_size bytes, of a given object. 
""" OBJ_CHUNK_SIZE = 1 * 800 ** 2 # create a new invocation activations = 0 From d75e342fbf0210a797c1b1ab94b19cf7ac422944 Mon Sep 17 00:00:00 2001 From: JosepSampe Date: Sat, 11 May 2024 21:54:59 +0200 Subject: [PATCH 3/3] Update localhost start_manager position --- lithops/localhost/v1/localhost.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lithops/localhost/v1/localhost.py b/lithops/localhost/v1/localhost.py index e30354cf0..a1c55504a 100644 --- a/lithops/localhost/v1/localhost.py +++ b/lithops/localhost/v1/localhost.py @@ -134,12 +134,11 @@ def invoke(self, job_payload): logger.debug(f'ExecutorID {executor_id} | JobID {job_id} - Putting job into localhost queue') - self.start_manager() - job_filename = self.env.prepare_job_file(job_payload) - self.job_queue.put((job_payload, job_filename)) + self.start_manager() + def get_runtime_key(self, runtime_name, *args): """ Generate the runtime key that identifies the runtime