Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Monitor] Reduce localhost monitoring_interval and wait_dur_sec #1345

Merged
merged 3 commits into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/on_pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ jobs:

localhost_tests:
runs-on: ubuntu-22.04
timeout-minutes: 5
strategy:
fail-fast: false
matrix:
Expand Down
7 changes: 4 additions & 3 deletions lithops/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
MODE_DEFAULT = SERVERLESS

MONITORING_DEFAULT = 'storage'
MONITORING_INTERVAL = 2
MONITORING_INTERVAL = 2 # seconds
MONITORING_INTERVAL_LH = 0.1 # seconds

SERVERLESS_BACKEND_DEFAULT = 'aws_lambda'
STANDALONE_BACKEND_DEFAULT = 'aws_ec2'
Expand All @@ -47,8 +48,8 @@
LOGS_PREFIX = "lithops.logs"
RUNTIMES_PREFIX = "lithops.runtimes"

EXECUTION_TIMEOUT_DEFAULT = 1800
EXECUTION_TIMEOUT_LOCALHOST_DEFAULT = 3600
EXECUTION_TIMEOUT_DEFAULT = 1800 # seconds
EXECUTION_TIMEOUT_LOCALHOST_DEFAULT = 3600 # seconds

LOCALHOST_RUNTIME_DEFAULT = os.path.basename(sys.executable)
LOCALHOST_SERVICE_IDLE_TIMEOUT = 3
Expand Down
10 changes: 5 additions & 5 deletions lithops/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from lithops.future import ResponseFuture
from lithops.invokers import create_invoker
from lithops.storage import InternalStorage
from lithops.wait import wait, ALL_COMPLETED, THREADPOOL_SIZE, WAIT_DUR_SEC, ALWAYS
from lithops.wait import wait, ALL_COMPLETED, THREADPOOL_SIZE, ALWAYS
from lithops.job import create_map_job, create_reduce_job
from lithops.config import default_config, \
extract_localhost_config, extract_standalone_config, \
Expand Down Expand Up @@ -399,7 +399,7 @@ def wait(
download_results: Optional[bool] = False,
timeout: Optional[int] = None,
threadpool_size: Optional[int] = THREADPOOL_SIZE,
wait_dur_sec: Optional[int] = WAIT_DUR_SEC,
wait_dur_sec: Optional[int] = None,
show_progressbar: Optional[bool] = True
) -> Tuple[FuturesList, FuturesList]:
"""
Expand All @@ -416,7 +416,7 @@ def wait(
:param download_results: Download results. Default false (Only get statuses)
:param timeout: Timeout of waiting for results
:param threadpool_size: Number of threads to use. Default 64
:param wait_dur_sec: Time interval between each check
:param wait_dur_sec: Time interval between each check. Default 1 second
:param show_progressbar: whether or not to show the progress bar.

:return: `(fs_done, fs_notdone)` where `fs_done` is a list of futures that have
Expand Down Expand Up @@ -470,7 +470,7 @@ def get_result(
throw_except: Optional[bool] = True,
timeout: Optional[int] = None,
threadpool_size: Optional[int] = THREADPOOL_SIZE,
wait_dur_sec: Optional[int] = WAIT_DUR_SEC,
wait_dur_sec: Optional[int] = None,
show_progressbar: Optional[bool] = True
):
"""
Expand All @@ -480,7 +480,7 @@ def get_result(
:param throw_except: Reraise exception if call raised. Default True.
:param timeout: Timeout for waiting for results.
:param threadpool_size: Number of threads to use. Default 128
:param wait_dur_sec: Time interval between each check.
:param wait_dur_sec: Time interval between each check. Default 1 second
:param show_progressbar: whether or not to show the progress bar.

:return: The result of the future/s
Expand Down
5 changes: 2 additions & 3 deletions lithops/localhost/v1/localhost.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,11 @@ def invoke(self, job_payload):

logger.debug(f'ExecutorID {executor_id} | JobID {job_id} - Putting job into localhost queue')

self.start_manager()

job_filename = self.env.prepare_job_file(job_payload)

self.job_queue.put((job_payload, job_filename))

self.start_manager()

def get_runtime_key(self, runtime_name, *args):
"""
Generate the runtime key that identifies the runtime
Expand Down
18 changes: 4 additions & 14 deletions lithops/localhost/v2/localhost.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from lithops.constants import (
JOBS_DIR,
LOCALHOST_RUNTIME_DEFAULT,
RN_LOG_FILE,
TEMP_DIR,
LITHOPS_TEMP_DIR,
COMPUTE_CLI_MSG,
Expand Down Expand Up @@ -299,8 +298,7 @@ def run_task(self, job_key, call_id):
task_filename = os.path.join(JOBS_DIR, job_key, call_id + '.task')

cmd = [self.runtime_name, RUNNER_FILE, 'run_job', task_filename]
log = open(RN_LOG_FILE, 'a')
process = sp.Popen(cmd, stdout=log, stderr=log, start_new_session=True)
process = sp.Popen(cmd, start_new_session=True)
self.task_processes[job_key_call_id] = process
process.communicate() # blocks until the process finishes
del self.task_processes[job_key_call_id]
Expand Down Expand Up @@ -394,12 +392,8 @@ def start(self):
cmd += f'--rm -v {tmp_path}:/tmp -it --detach '
cmd += f'--entrypoint=/bin/bash {self.runtime_name}'

log = open(RN_LOG_FILE, 'a')
self.container_process = sp.Popen(
shlex.split(cmd), stdout=log,
stderr=log, start_new_session=True
)
self.container_process.communicate()
self.container_process = sp.Popen(shlex.split(cmd), start_new_session=True)
self.container_process.communicate() # blocks until the process finishes

super().start()

Expand All @@ -415,11 +409,7 @@ def run_task(self, job_key, call_id):
cmd += f'"python3 /tmp/{USER_TEMP_DIR}/localhost-runner.py '
cmd += f'run_job {docker_task_filename}"'

log = open(RN_LOG_FILE, 'a')
process = sp.Popen(
shlex.split(cmd), stdout=log,
stderr=log, start_new_session=True
)
process = sp.Popen(shlex.split(cmd), start_new_session=True)
self.task_processes[job_key_call_id] = process
process.communicate() # blocks until the process finishes
del self.task_processes[job_key_call_id]
Expand Down
26 changes: 18 additions & 8 deletions lithops/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import threading
import concurrent.futures as cf
from tblib import pickling_support
from lithops.constants import MONITORING_INTERVAL
from lithops.constants import (
MONITORING_INTERVAL,
MONITORING_INTERVAL_LH
)

pickling_support.install()

Expand Down Expand Up @@ -464,23 +467,30 @@ def __init__(self, executor_id, internal_storage, config=None):
self.executor_id = executor_id
self.internal_storage = internal_storage
self.config = config
self.backend = self.config['lithops']['monitoring'].lower() if config else 'storage'
self.backend_type = self.config['lithops']['monitoring'].lower() if config else 'storage'
self.storage_backend = self.internal_storage.backend
self.token_bucket_q = queue.Queue()
self.monitor = None
self.job_chunksize = {}

self.MonitorClass = getattr(
lithops.monitor,
f'{self.backend.capitalize()}Monitor'
f'{self.backend_type.capitalize()}Monitor'
)

def start(self, fs, job_id=None, chunksize=None, generate_tokens=False):
if self.backend == 'storage':
mi = self.config['lithops'].get('monitoring_interval', MONITORING_INTERVAL) \
if self.config else MONITORING_INTERVAL
bk_config = {'monitoring_interval': mi}
if self.backend_type == 'storage':
monitoring_interval = None
if self.config and 'lithops' in self.config:
monitoring_interval = self.config['lithops'].get('monitoring_interval')
if not monitoring_interval:
if self.storage_backend == 'localhost':
monitoring_interval = MONITORING_INTERVAL_LH
else:
monitoring_interval = MONITORING_INTERVAL
bk_config = {'monitoring_interval': monitoring_interval}
else:
bk_config = self.config.get(self.backend)
bk_config = self.config.get(self.backend_type)

if job_id:
self.job_chunksize[job_id] = chunksize
Expand Down
46 changes: 30 additions & 16 deletions lithops/tests/test_map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,24 +114,21 @@ def test_url_processing(self):
result = fexec.get_result()
assert result == self.words_in_files

def test_chunks_bucket(self):
def test_bucket_chunk_size(self):
"""tests the ability to create a separate function invocation
based on the following parameters: chunk_size creates [file_size//chunk_size]
invocations to process each chunk_size bytes, of a given object. chunk_number
creates 'chunk_number' invocations that process [file_size//chunk_number] bytes each.
invocations to process each chunk_size bytes, of a given object.
"""

logger.info('Testing chunks on a bucket')
OBJ_CHUNK_SIZE = 1 * 800 ** 2 # create a new invocation
OBJ_CHUNK_NUMBER = 2
activations = 0

data_prefix = self.storage_backend + '://' + self.bucket + '/' + DATASET_PREFIX + '/'

fexec = lithops.FunctionExecutor(config=pytest.lithops_config)
futures = fexec.map_reduce(my_map_function_obj, data_prefix,
my_reduce_function,
obj_chunk_size=OBJ_CHUNK_SIZE)
futures = fexec.map_reduce(
my_map_function_obj, data_prefix,
my_reduce_function, obj_chunk_size=OBJ_CHUNK_SIZE
)
result = fexec.get_result(futures)
assert result == self.words_in_files

Expand All @@ -140,27 +137,34 @@ def test_chunks_bucket(self):

assert len(futures) == activations + 1 # +1 due to the reduce function

def test_bucket_chunk_number(self):
"""tests the ability to create a separate function invocation
based on the following parameters: chunk_number
creates 'chunk_number' invocations that process [file_size//chunk_number] bytes each.
"""
OBJ_CHUNK_NUMBER = 2

data_prefix = self.storage_backend + '://' + self.bucket + '/' + DATASET_PREFIX + '/'

fexec = lithops.FunctionExecutor(config=pytest.lithops_config)
futures = fexec.map_reduce(my_map_function_obj, data_prefix,
my_reduce_function, obj_chunk_number=OBJ_CHUNK_NUMBER)
futures = fexec.map_reduce(
my_map_function_obj, data_prefix,
my_reduce_function, obj_chunk_number=OBJ_CHUNK_NUMBER
)
result = fexec.get_result(futures)
assert result == self.words_in_files

assert len(futures) == len(TEST_FILES_URLS) * OBJ_CHUNK_NUMBER + 1

def test_chunks_bucket_one_reducer_per_object(self):
def test_bucket_chunk_size_one_reducer_per_object(self):
"""tests the ability to create a separate function invocation based
on the following parameters, as well as create a separate invocation
of a reduce function for each object: chunk_size creates [file_size//chunk_size]
invocations to process each chunk_size bytes of a given object. chunk_number
creates 'chunk_number' invocations that process [file_size//chunk_number] bytes each.
"""

logger.info('Testing chunks on a bucket with one reducer per object')
OBJ_CHUNK_SIZE = 1 * 1024 ** 2
OBJ_CHUNK_NUMBER = 2
activations = 0

data_prefix = self.storage_backend + '://' + self.bucket + '/' + DATASET_PREFIX + '/'

fexec = lithops.FunctionExecutor(config=pytest.lithops_config)
Expand All @@ -179,6 +183,16 @@ def test_chunks_bucket_one_reducer_per_object(self):
# + len(TEST_FILES_URLS) due to map_reduce activation per object
assert len(futures) == activations + len(TEST_FILES_URLS)

def test_bucket_chunk_number_one_reducer_per_object(self):
"""tests the ability to create a separate function invocation based
on the following parameters, as well as create a separate invocation
of a reduce function for each object: chunk_size creates [file_size//chunk_size]
invocations to process each chunk_size bytes of a given object. chunk_number
creates 'chunk_number' invocations that process [file_size//chunk_number] bytes each.
"""
OBJ_CHUNK_NUMBER = 2
data_prefix = self.storage_backend + '://' + self.bucket + '/' + DATASET_PREFIX + '/'

fexec = lithops.FunctionExecutor(config=pytest.lithops_config)
futures = fexec.map_reduce(
my_map_function_obj,
Expand Down
11 changes: 6 additions & 5 deletions lithops/wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]],
download_results: Optional[bool] = False,
timeout: Optional[int] = None,
threadpool_size: Optional[int] = THREADPOOL_SIZE,
wait_dur_sec: Optional[int] = WAIT_DUR_SEC,
wait_dur_sec: Optional[int] = None,
show_progressbar: Optional[bool] = True,
futures_from_executor_wait: Optional[bool] = False) -> Tuple[FuturesList, FuturesList]:
"""
Expand All @@ -68,7 +68,7 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]],
:param download_results: Download results. Default false (Only get statuses)
:param timeout: Timeout of waiting for results.
:param threadpool_size: Number of threads to use. Default 64
:param wait_dur_sec: Time interval between each check.
:param wait_dur_sec: Time interval between each check. Default 1 second
:param show_progressbar: whether or not to show the progress bar.

:return: `(fs_done, fs_notdone)`
Expand Down Expand Up @@ -131,7 +131,8 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]],
internal_storage=executor_data.internal_storage)
job_monitor.start(fs=executor_data.futures)

sleep_sec = wait_dur_sec if job_monitor.backend == 'storage' else 0.3
sleep_sec = wait_dur_sec or WAIT_DUR_SEC if job_monitor.backend_type == 'storage' \
and job_monitor.storage_backend != 'localhost' else 0.1

if return_when == ALWAYS:
for executor_data in executors_data:
Expand Down Expand Up @@ -186,7 +187,7 @@ def get_result(fs: Optional[Union[ResponseFuture, FuturesList, List[ResponseFutu
throw_except: Optional[bool] = True,
timeout: Optional[int] = None,
threadpool_size: Optional[int] = THREADPOOL_SIZE,
wait_dur_sec: Optional[int] = WAIT_DUR_SEC,
wait_dur_sec: Optional[int] = None,
show_progressbar: Optional[bool] = True):
"""
For getting the results from all function activations
Expand All @@ -196,7 +197,7 @@ def get_result(fs: Optional[Union[ResponseFuture, FuturesList, List[ResponseFutu
:param throw_except: Reraise exception if call raised. Default True.
:param timeout: Timeout for waiting for results.
:param threadpool_size: Number of threads to use. Default 128
:param wait_dur_sec: Time interval between each check.
:param wait_dur_sec: Time interval between each check. Default 1 second
:param show_progressbar: whether or not to show the progress bar.

:return: The result of the future/s
Expand Down