Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 1.x] Adding boolean argument to ML Commons API #155

Merged
merged 1 commit into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opensearch_py_ml/ml_commons/ml_common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
MODEL_UPLOAD_CHUNK_SIZE = 10_000_000
MODEL_MAX_SIZE = 4_000_000_000
BUF_SIZE = 65536  # let's read stuff in 64 KB chunks!
TIMEOUT = 120 # timeout for synchronous method calls in seconds
87 changes: 53 additions & 34 deletions opensearch_py_ml/ml_commons/ml_commons_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from opensearchpy import OpenSearch

from opensearch_py_ml.ml_commons.ml_common_utils import ML_BASE_URI
from opensearch_py_ml.ml_commons.ml_common_utils import ML_BASE_URI, TIMEOUT
from opensearch_py_ml.ml_commons.model_uploader import ModelUploader


Expand All @@ -30,6 +30,8 @@ def upload_model(
model_path: str,
model_config_path: str,
isVerbose: bool = False,
load_model: bool = True,
wait_until_loaded: bool = True,
) -> str:
"""
This method uploads model into opensearch cluster using ml-common plugin's api.
Expand All @@ -56,19 +58,30 @@ def upload_model(
:type model_config_path: string
:param isVerbose: if isVerbose is true method will print more messages. default False
:type isVerbose: boolean
:param load_model: Whether to load the model in memory using uploaded model chunks
:type load_model: bool
:param wait_until_loaded: If load_model is true, whether to wait until the model is loaded into memory
:type wait_until_loaded: bool
:return: returns the model_id so that we can use this for further operation.
:rtype: string
"""
return self._model_uploader._upload_model(
model_id = self._model_uploader._upload_model(
model_path, model_config_path, isVerbose
)

# loading the model chunks from model index
if load_model:
self.load_model(model_id, wait_until_loaded=wait_until_loaded)

return model_id

def upload_pretrained_model(
self,
model_name: str,
model_version: str,
model_format: str,
load_model: bool = True,
wait_until_loaded: bool = True,
):
"""
This method uploads a pretrained model into opensearch cluster using ml-common plugin's api.
Expand All @@ -84,6 +97,8 @@ def upload_pretrained_model(
:type model_format: string
:param load_model: Whether to load the model in memory using uploaded model chunks
:type load_model: bool
:param wait_until_loaded: If load_model is true, whether to wait until the model is loaded into memory
:type wait_until_loaded: bool
:return: returns the model_id so that we can use this for further operation
:rtype: string
"""
Expand All @@ -97,20 +112,7 @@ def upload_pretrained_model(

# loading the model chunks from model index
if load_model:
self.load_model(model_id)
for i in range(120): # timeout is 120 seconds
time.sleep(1)
ml_model_status = self.get_model_info(model_id)
model_state = ml_model_status.get("model_state")
if model_state != "LOADING":
break

if model_state == "LOADED":
print("Model loaded into memory successfully")
elif model_state == "PARTIALLY_LOADED":
print("Model was loaded into memory only partially")
else:
raise Exception("Model load failed")
self.load_model(model_id, wait_until_loaded=wait_until_loaded)

return model_id

Expand All @@ -128,7 +130,7 @@ def _send_model_info(self, model_meta_json: dict):
url=f"{ML_BASE_URI}/models/_upload",
body=model_meta_json,
)
end = time.time() + 120 # timeout seconds
end = time.time() + TIMEOUT # timeout seconds
task_flag = False
while not task_flag or time.time() < end:
time.sleep(1)
Expand All @@ -142,22 +144,42 @@ def _send_model_info(self, model_meta_json: dict):
print("Model was uploaded successfully. Model Id: ", status["model_id"])
return status["model_id"]

def load_model(self, model_id: str) -> object:
def load_model(self, model_id: str, wait_until_loaded: bool = True) -> object:
"""
This method loads model into opensearch cluster using ml-common plugin's load model api

:param model_id: unique id of the model
:type model_id: string
:param wait_until_loaded: Whether to wait until the model is loaded into memory
:type wait_until_loaded: bool
:return: returns a json object, with task_id and status key.
:rtype: object
"""

API_URL = f"{ML_BASE_URI}/models/{model_id}/_load"

return self._client.transport.perform_request(
method="POST",
url=API_URL,
)
task_id = self._client.transport.perform_request(method="POST", url=API_URL)[
"task_id"
]

if wait_until_loaded:
# Wait until loaded
for i in range(TIMEOUT):
ml_model_status = self.get_model_info(model_id)
model_state = ml_model_status.get("model_state")
if model_state in ["LOADED", "PARTIALLY_LOADED"]:
break
time.sleep(1)

# Check the model status
if model_state == "LOADED":
print("Model loaded into memory successfully")
elif model_state == "PARTIALLY_LOADED":
print("Model was loaded into memory only partially")
else:
raise Exception("Model load failed")

return self._get_task_info(task_id)

def get_task_info(self, task_id: str, wait_until_task_done: bool = False) -> object:
"""
Expand All @@ -173,7 +195,7 @@ def get_task_info(self, task_id: str, wait_until_task_done: bool = False) -> obj
:rtype: object
"""
if wait_until_task_done:
end = time.time() + 120 # timeout seconds
end = time.time() + TIMEOUT # timeout seconds
task_flag = False
while not task_flag or time.time() < end:
time.sleep(1)
Expand Down Expand Up @@ -248,18 +270,15 @@ def unload_model(self, model_id: str, node_ids: List[str] = []) -> object:

API_URL = f"{ML_BASE_URI}/models/{model_id}/_unload"

API_BODY = {"node_ids": node_ids}
API_BODY = {}
if len(node_ids) > 0:
return self._client.transport.perform_request(
method="POST",
url=API_URL,
body=API_BODY,
)
else:
return self._client.transport.perform_request(
method="POST",
url=API_URL,
)
API_BODY["node_ids"] = node_ids

return self._client.transport.perform_request(
method="POST",
url=API_URL,
body=API_BODY,
)

def delete_model(self, model_id: str) -> object:
"""
Expand Down
49 changes: 19 additions & 30 deletions tests/ml_commons/test_ml_commons_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import os
import shutil
import time
from os.path import exists

from opensearchpy import OpenSearch
Expand Down Expand Up @@ -45,8 +44,6 @@
PRETRAINED_MODEL_VERSION = "1.0.1"
PRETRAINED_MODEL_FORMAT = "TORCH_SCRIPT"

UNLOAD_TIMEOUT = 300 # in seconds


def clean_test_folder(TEST_FOLDER):
if os.path.exists(TEST_FOLDER):
Expand Down Expand Up @@ -84,7 +81,10 @@ def test_integration_pretrained_model_upload_unload_delete():
model_version=PRETRAINED_MODEL_VERSION,
model_format=PRETRAINED_MODEL_FORMAT,
load_model=True,
wait_until_loaded=True,
)
ml_model_status = ml_client.get_model_info(model_id)
assert ml_model_status.get("model_state") != "LOAD_FAILED"
except: # noqa: E722
raised = True
assert raised == False, "Raised Exception during pretrained model upload and load"
Expand All @@ -102,11 +102,6 @@ def test_integration_pretrained_model_upload_unload_delete():
raised = False
try:
ml_client.unload_model(model_id)
for i in range(int(UNLOAD_TIMEOUT / 10)):
ml_model_status = ml_client.get_model_info(model_id)
if ml_model_status.get("model_state") == "UNLOADED":
break
time.sleep(10)
ml_model_status = ml_client.get_model_info(model_id)
assert ml_model_status.get("model_state") == "UNLOADED"
except: # noqa: E722
Expand Down Expand Up @@ -142,7 +137,7 @@ def test_integration_model_train_upload_full_cycle():
task_id = ""
try:
model_id = ml_client.upload_model(
MODEL_PATH, MODEL_CONFIG_FILE_PATH, isVerbose=True
MODEL_PATH, MODEL_CONFIG_FILE_PATH, load_model=False, isVerbose=True
)
print("Model_id:", model_id)
except: # noqa: E722
Expand All @@ -152,10 +147,12 @@ def test_integration_model_train_upload_full_cycle():
if model_id:
raised = False
try:
ml_load_status = ml_client.load_model(model_id)
# assert ml_load_status.get("status") == "CREATED"
ml_load_status = ml_client.load_model(model_id, wait_until_loaded=False)
task_id = ml_load_status.get("task_id")
assert task_id != "" or task_id is not None

ml_model_status = ml_client.get_model_info(model_id)
assert ml_model_status.get("model_state") != "LOAD_FAILED"
except: # noqa: E722
raised = True
assert raised == False, "Raised Exception in loading model"
Expand Down Expand Up @@ -186,20 +183,17 @@ def test_integration_model_train_upload_full_cycle():
# This test is flaky. Sometimes it passes and sometimes it fails with a 500 error
# due to the memory circuit breaker.
# Todo: We need to revisit this test.
if ml_task_status.get("state") == "LOADED":
try:
raised = False
sentences = ["First test sentence", "Second test sentence"]
embedding_result = ml_client.generate_embedding(
model_id, sentences
)
print(embedding_result)
assert len(embedding_result.get("inference_results")) == 2
except: # noqa: E722
raised = True
assert (
raised == False
), "Raised Exception in generating sentence embedding"
try:
raised = False
sentences = ["First test sentence", "Second test sentence"]
embedding_result = ml_client.generate_embedding(model_id, sentences)
print(embedding_result)
assert len(embedding_result.get("inference_results")) == 2
except: # noqa: E722
raised = True
assert (
raised == False
), "Raised Exception in generating sentence embedding"

try:
delete_task_obj = ml_client.delete_task(task_id)
Expand All @@ -210,11 +204,6 @@ def test_integration_model_train_upload_full_cycle():

try:
ml_client.unload_model(model_id)
for i in range(int(UNLOAD_TIMEOUT / 10)):
ml_model_status = ml_client.get_model_info(model_id)
if ml_model_status.get("model_state") == "UNLOADED":
break
time.sleep(10)
ml_model_status = ml_client.get_model_info(model_id)
assert ml_model_status.get("model_state") == "UNLOADED"
except: # noqa: E722
Expand Down