diff --git a/app.py b/app.py
index 479c424..c469127 100644
--- a/app.py
+++ b/app.py
@@ -12,6 +12,7 @@
 )
 from vectorizer import Vectorizer, VectorInput
 from meta import Meta
+import torch
 
 logger = getLogger("uvicorn")
 
@@ -21,6 +22,8 @@
 get_bearer_token = HTTPBearer(auto_error=False)
 allowed_tokens: List[str] = None
+current_worker = 0
+available_workers = 1
 
 
 def is_authorized(auth: Optional[HTTPAuthorizationCredentials]) -> bool:
@@ -31,10 +34,20 @@ def is_authorized(auth: Optional[HTTPAuthorizationCredentials]) -> bool:
     return True
 
 
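+# get_worker() hands out replica indices round-robin so concurrent requests
+# are spread across all visible CUDA devices.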
+def get_worker():
+    global current_worker
+    worker = current_worker % available_workers
+    current_worker += 1
+    return worker
+
+
 async def lifespan(app: FastAPI):
     global vec
     global meta_config
     global allowed_tokens
+    global available_workers
 
     allowed_tokens = get_allowed_tokens()
 
@@ -98,7 +111,12 @@ def log_info_about_onnx(onnx_runtime: bool):
         cuda_support = True
         cuda_core = os.getenv("CUDA_CORE")
         if cuda_core is None or cuda_core == "":
-            cuda_core = "cuda:0"
+            if use_sentence_transformers_vectorizer and torch.cuda.is_available():
+                # One device string per visible GPU, e.g. "cuda:0,cuda:1".
+                available_workers = torch.cuda.device_count()
+                cuda_core = ",".join([f"cuda:{i}" for i in range(available_workers)])
+            else:
+                cuda_core = "cuda:0"
         logger.info(f"CUDA_CORE set to {cuda_core}")
     else:
         logger.info("Running on CPU")
@@ -132,6 +150,7 @@ def log_info_about_onnx(onnx_runtime: bool):
         use_sentence_transformers_multi_process,
         model_name,
         trust_remote_code,
+        available_workers,
     )
 
     yield
@@ -166,7 +185,7 @@ async def vectorize(
 ):
     if is_authorized(auth):
         try:
-            vector = await vec.vectorize(item.text, item.config)
+            vector = await vec.vectorize(item.text, item.config, get_worker())
             return {"text": item.text, "vector": vector.tolist(), "dim": len(vector)}
         except Exception as e:
             logger.exception("Something went wrong while vectorizing data.")
diff --git a/smoke_test.py b/smoke_test.py
index f7d8f21..72c37e8 100755
--- a/smoke_test.py
+++ b/smoke_test.py
@@ -1,9 +1,43 @@
 import time
 import unittest
 import requests
+import threading
 
 
 class SmokeTest(unittest.TestCase):
+    sentences = [
+        "Python is easy to learn.",
+        "AI can enhance decision-making processes.",
+        "SpaceX is revolutionizing space travel.",
+        "Python excels in data analysis.",
+        "AI algorithms learn from data.",
+        "Mars rovers explore new terrains.",
+        "Python supports multiple programming paradigms.",
+        "AI improves user experience on websites.",
+        "The International Space Station orbits Earth.",
+        "Python's syntax is very readable.",
+        "AI in healthcare can predict outcomes.",
+        "Astronauts conduct experiments in space.",
+        "Python is widely used in web development.",
+        "Machine learning is a subset of AI.",
+        "NASA aims to return humans to the Moon.",
+        "Python libraries simplify complex tasks.",
+        "Autonomous vehicles rely on AI technologies.",
+        "Voyager 1 has left our solar system.",
+        "Python is open-source and community-driven.",
+        "Voice assistants use AI to understand speech.",
+        "Telescopes help in observing distant galaxies.",
+        "Python's popularity grows each year.",
+        "AI can identify patterns in big data.",
+        "Satellites provide crucial weather data.",
+        "Python can run on many operating systems.",
+        "Neural networks mimic human brain functions.",
+        "Space debris is a growing concern in orbit.",
+        "Python scripts automate repetitive tasks.",
+        "AI ethics is a growing field of study.",
+        "The Hubble Space Telescope has changed our view of the cosmos.",
+    ]
+
     def setUp(self):
         self.url = "http://localhost:8000"
 
@@ -20,6 +54,25 @@ def setUp(self):
             raise Exception("did not start up")
 
+    def _get_req_body(self, text: str, task_type: str = ""):
+        req_body = {"text": text}
+        if task_type != "":
+            req_body["config"] = {"task_type": task_type}
+        return req_body
+
+    def _try_to_vectorize(self, url: str, text: str, task_type: str = ""):
+        req_body = self._get_req_body(text, task_type)
+
+        res = requests.post(url, json=req_body)
+        resBody = res.json()
+
+        self.assertEqual(200, res.status_code)
+
+        # below tests that what we deem a reasonable vector is returned. We are
+        # aware of 384 and 768 dim vectors, which should both fall in that
+        # range
+        self.assertTrue(len(resBody["vector"]) > 100)
+
     def test_well_known_ready(self):
         res = requests.get(self.url + "/.well-known/ready")
 
@@ -37,29 +90,40 @@ def test_meta(self):
         self.assertIsInstance(res.json(), dict)
 
     def test_vectorizing(self):
-        def get_req_body(task_type: str = ""):
-            req_body = {"text": "The London Eye is a ferris wheel at the River Thames."}
-            if task_type != "":
-                req_body["config"] = {"task_type": task_type}
-            return req_body
-
-        def try_to_vectorize(url, task_type: str = ""):
-            print(f"url: {url}")
-            req_body = get_req_body(task_type)
-
-            res = requests.post(url, json=req_body)
-            resBody = res.json()
-
-            self.assertEqual(200, res.status_code)
-
-            # below tests that what we deem a reasonable vector is returned. We are
-            # aware of 384 and 768 dim vectors, which should both fall in that
-            # range
-            self.assertTrue(len(resBody["vector"]) > 100)
-            print(f"vector dimensions are: {len(resBody['vector'])}")
-
-        try_to_vectorize(self.url + "/vectors/")
-        try_to_vectorize(self.url + "/vectors")
+        self._try_to_vectorize(
+            self.url + "/vectors/",
+            "The London Eye is a ferris wheel at the River Thames.",
+        )
+        self._try_to_vectorize(
+            self.url + "/vectors",
+            "The London Eye is a ferris wheel at the River Thames.",
+        )
+
+    def _test_vectorizing_sentences(self):
+        # Underscore prefix keeps unittest from collecting this helper as a
+        # test; it is driven by the two timing tests below.
+        for sentence in self.sentences:
+            self._try_to_vectorize(self.url + "/vectors/", sentence)
+            self._try_to_vectorize(self.url + "/vectors", sentence)
+
+    def test_vectorizing_sentences_parallel(self):
+        start = time.time()
+        threads = []
+        for _ in range(10):
+            t = threading.Thread(target=self._test_vectorizing_sentences)
+            threads.append(t)
+            t.start()
+        for t in threads:
+            t.join()
+        end = time.time()
+        print(f"test_vectorizing_sentences_parallel took: {end - start}s")
+
+    def test_vectorizing_sentences(self):
+        start = time.time()
+        for _ in range(10):
+            self._test_vectorizing_sentences()
+        end = time.time()
+        print(f"test_vectorizing_sentences took: {end - start}s")
 
 
 if __name__ == "__main__":
diff --git a/vectorizer.py b/vectorizer.py
index f8e3546..4e872d8 100644
--- a/vectorizer.py
+++ b/vectorizer.py
@@ -2,7 +2,7 @@
 import math
 from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
-from typing import Optional, Any, Literal
+from typing import Optional, Any, Literal, List
 from logging import getLogger, Logger
 
 import nltk
@@ -55,6 +55,7 @@ def __init__(
         use_sentence_transformers_multi_process: bool,
         model_name: str,
         trust_remote_code: bool,
+        workers: int | None,
     ):
         self.executor = ThreadPoolExecutor()
         if onnx_runtime:
@@ -67,6 +68,7 @@ def __init__(
                 cuda_core,
                 trust_remote_code,
                 use_sentence_transformers_multi_process,
+                workers,
             )
         else:
             self.vectorizer = HuggingFaceVectorizer(
@@ -80,14 +82,22 @@ def __init__(
             trust_remote_code,
         )
 
-    async def vectorize(self, text: str, config: VectorInputConfig):
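+    # The SentenceTransformer path forwards a replica index so each request
+    # runs on a specific device; other vectorizers keep the original path.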
+    async def vectorize(self, text: str, config: VectorInputConfig, worker: int = 0):
+        if isinstance(self.vectorizer, SentenceTransformerVectorizer):
+            return await asyncio.wrap_future(
+                self.executor.submit(self.vectorizer.vectorize, text, config, worker)
+            )
+
         return await asyncio.wrap_future(
             self.executor.submit(self.vectorizer.vectorize, text, config)
         )
 
 
 class SentenceTransformerVectorizer:
-    model: SentenceTransformer
+    workers: List[SentenceTransformer]
+    available_devices: List[Optional[str]]
     cuda_core: str
     use_sentence_transformers_multi_process: bool
     pool: dict[Literal["input", "output", "processes"], Any]
@@ -100,24 +110,35 @@ def __init__(
         cuda_core: str,
         trust_remote_code: bool,
         use_sentence_transformers_multi_process: bool,
+        workers: int | None,
     ):
         self.logger = getLogger("uvicorn")
         self.cuda_core = cuda_core
         self.use_sentence_transformers_multi_process = (
             use_sentence_transformers_multi_process
         )
-        self.logger.info(
-            f"Sentence transformer vectorizer running with model_name={model_name}, cache_folder={model_path} device:{self.get_device()} trust_remote_code:{trust_remote_code} use_sentence_transformers_multi_process: {self.use_sentence_transformers_multi_process}"
+        self.available_devices = self.get_devices(
+            workers, self.use_sentence_transformers_multi_process
         )
-        self.model = SentenceTransformer(
-            model_name,
-            cache_folder=model_path,
-            device=self.get_device(),
-            trust_remote_code=trust_remote_code,
+        self.logger.info(
+            f"Sentence transformer vectorizer running with model_name={model_name}, cache_folder={model_path} available_devices:{self.available_devices} trust_remote_code:{trust_remote_code} use_sentence_transformers_multi_process: {self.use_sentence_transformers_multi_process}"
         )
-        self.model.eval()  # make sure we're in inference mode, not training
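+        # Instantiate one model replica per device so concurrent requests can
+        # run on different GPUs without contending for a single instance.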
+        self.workers = []
+        for device in self.available_devices:
+            model = SentenceTransformer(
+                model_name,
+                cache_folder=model_path,
+                device=device,
+                trust_remote_code=trust_remote_code,
+            )
+            model.eval()  # make sure we're in inference mode, not training
+            self.workers.append(model)
+
+        self.logger.info(f"Initialized {len(self.workers)} model worker(s)")
 
         if self.use_sentence_transformers_multi_process:
-            self.pool = self.model.start_multi_process_pool()
+            self.pool = self.workers[0].start_multi_process_pool()
             self.logger.info(
                 "Sentence transformer vectorizer is set to use all available devices"
             )
@@ -125,25 +146,34 @@ def __init__(
             f"Created pool of {len(self.pool['processes'])} available {'CUDA' if torch.cuda.is_available() else 'CPU'} devices"
         )
 
-    def get_device(self) -> Optional[str]:
+    def get_devices(
+        self,
+        workers: int | None,
+        use_sentence_transformers_multi_process: bool,
+    ) -> List[str | None]:
         if (
-            not self.use_sentence_transformers_multi_process
+            not use_sentence_transformers_multi_process
             and self.cuda_core is not None
             and self.cuda_core != ""
         ):
-            return self.cuda_core
-        return None
+            return self.cuda_core.split(",")
+        if use_sentence_transformers_multi_process or workers is None or workers < 1:
+            return [None]
+        return [None] * workers
 
-    def vectorize(self, text: str, config: VectorInputConfig):
+    def vectorize(self, text: str, config: VectorInputConfig, worker: int = 0):
         if self.use_sentence_transformers_multi_process:
-            embedding = self.model.encode_multi_process(
+            embedding = self.workers[0].encode_multi_process(
                 [text], pool=self.pool, normalize_embeddings=True
            )
             return embedding[0]
 
-        embedding = self.model.encode(
+        self.logger.debug(
+            f"vectorize: worker {worker} on device {self.available_devices[worker]} of {len(self.available_devices)} available"
+        )
+        embedding = self.workers[worker].encode(
             [text],
-            device=self.get_device(),
+            device=self.available_devices[worker],
             convert_to_tensor=False,
             convert_to_numpy=True,
             normalize_embeddings=True,