
Commit 264710d
Merge pull request #103 from weaviate/pool-of-workers
Worker pool implementation
antas-marcin authored Jan 21, 2025
2 parents 6c9ce5b + 9439c86 commit 264710d
Showing 3 changed files with 152 additions and 44 deletions.
20 changes: 18 additions & 2 deletions app.py
@@ -12,6 +12,7 @@
)
from vectorizer import Vectorizer, VectorInput
from meta import Meta
import torch


logger = getLogger("uvicorn")
@@ -21,6 +21,8 @@

get_bearer_token = HTTPBearer(auto_error=False)
allowed_tokens: List[str] = None
current_worker = 0
available_workers = 1


def is_authorized(auth: Optional[HTTPAuthorizationCredentials]) -> bool:
@@ -31,10 +34,18 @@ def is_authorized(auth: Optional[HTTPAuthorizationCredentials]) -> bool:
return True


def get_worker():
global current_worker
worker = current_worker % available_workers
current_worker += 1
return worker


async def lifespan(app: FastAPI):
global vec
global meta_config
global allowed_tokens
global available_workers

allowed_tokens = get_allowed_tokens()

@@ -98,7 +109,11 @@ def log_info_about_onnx(onnx_runtime: bool):
cuda_support = True
cuda_core = os.getenv("CUDA_CORE")
if cuda_core is None or cuda_core == "":
cuda_core = "cuda:0"
if use_sentence_transformers_vectorizer and torch.cuda.is_available():
available_workers = torch.cuda.device_count()
cuda_core = ",".join([f"cuda:{i}" for i in range(available_workers)])
else:
cuda_core = "cuda:0"
logger.info(f"CUDA_CORE set to {cuda_core}")
else:
logger.info("Running on CPU")
@@ -132,6 +147,7 @@ def log_info_about_onnx(onnx_runtime: bool):
use_sentence_transformers_multi_process,
model_name,
trust_remote_code,
available_workers,
)
yield

@@ -166,7 +182,7 @@ async def vectorize(
):
if is_authorized(auth):
try:
vector = await vec.vectorize(item.text, item.config)
vector = await vec.vectorize(item.text, item.config, get_worker())
return {"text": item.text, "vector": vector.tolist(), "dim": len(vector)}
except Exception as e:
logger.exception("Something went wrong while vectorizing data.")
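The heart of the app.py change is the round-robin `get_worker()` helper: each request is handed the next worker index in turn, and `available_workers` is set to `torch.cuda.device_count()` when multiple GPUs are available. A minimal sketch of the same dispatch idea follows; the class and method names are illustrative (not from the PR), and the lock is an added assumption for multi-threaded callers — the PR's bare global counter is fine under a single asyncio event loop:

```python
# Illustrative sketch of round-robin worker dispatch (names are not from
# the PR). The lock is an assumption for multi-threaded callers; under a
# single asyncio event loop, a bare counter like the PR's works too.
import itertools
import threading


class RoundRobinDispatcher:
    def __init__(self, available_workers: int):
        self._counter = itertools.count()
        self._workers = max(1, available_workers)
        self._lock = threading.Lock()

    def next_worker(self) -> int:
        with self._lock:
            return next(self._counter) % self._workers


dispatcher = RoundRobinDispatcher(available_workers=4)
assert [dispatcher.next_worker() for _ in range(5)] == [0, 1, 2, 3, 0]
```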
110 changes: 87 additions & 23 deletions smoke_test.py
@@ -1,9 +1,44 @@
import time
import unittest
import requests
import threading
import time


class SmokeTest(unittest.TestCase):
sentences = [
"Python is easy to learn.",
"AI can enhance decision-making processes.",
"SpaceX is revolutionizing space travel.",
"Python excels in data analysis.",
"AI algorithms learn from data.",
"Mars rovers explore new terrains.",
"Python supports multiple programming paradigms.",
"AI improves user experience on websites.",
"The International Space Station orbits Earth.",
"Python's syntax is very readable.",
"AI in healthcare can predict outcomes.",
"Astronauts conduct experiments in space.",
"Python is widely used in web development.",
"Machine learning is a subset of AI.",
"NASA aims to return humans to the Moon.",
"Python libraries simplify complex tasks.",
"Autonomous vehicles rely on AI technologies.",
"Voyager 1 has left our solar system.",
"Python is open-source and community-driven.",
"Voice assistants use AI to understand speech.",
"Telescopes help in observing distant galaxies.",
"Python's popularity grows each year.",
"AI can identify patterns in big data.",
"Satellites provide crucial weather data.",
"Python can run on many operating systems.",
"Neural networks mimic human brain functions.",
"Space debris is a growing concern in orbit.",
"Python scripts automate repetitive tasks.",
"AI ethics is a growing field of study.",
"The Hubble Space Telescope has changed our view of the cosmos.",
]

def setUp(self):
self.url = "http://localhost:8000"

@@ -20,6 +55,26 @@ def setUp(self):

raise Exception("did not start up")

def _get_req_body(self, text: str, task_type: str = ""):
req_body = {"text": text}
if task_type != "":
req_body["config"] = {"task_type": task_type}
return req_body

def _try_to_vectorize(self, url: str, text: str, task_type: str = ""):
req_body = self._get_req_body(text, task_type)

res = requests.post(url, json=req_body)
resBody = res.json()

self.assertEqual(200, res.status_code)

# below tests that what we deem a reasonable vector is returned. We are
# aware of 384 and 768 dim vectors, which should both fall in that
# range
self.assertTrue(len(resBody["vector"]) > 100)
# print(f"vector dimensions are: {len(resBody['vector'])}")

def test_well_known_ready(self):
res = requests.get(self.url + "/.well-known/ready")

@@ -37,29 +92,38 @@ def test_meta(self):
self.assertIsInstance(res.json(), dict)

def test_vectorizing(self):
def get_req_body(task_type: str = ""):
req_body = {"text": "The London Eye is a ferris wheel at the River Thames."}
if task_type != "":
req_body["config"] = {"task_type": task_type}
return req_body

def try_to_vectorize(url, task_type: str = ""):
print(f"url: {url}")
req_body = get_req_body(task_type)

res = requests.post(url, json=req_body)
resBody = res.json()

self.assertEqual(200, res.status_code)

# below tests that what we deem a reasonable vector is returned. We are
# aware of 384 and 768 dim vectors, which should both fall in that
# range
self.assertTrue(len(resBody["vector"]) > 100)
print(f"vector dimensions are: {len(resBody['vector'])}")

try_to_vectorize(self.url + "/vectors/")
try_to_vectorize(self.url + "/vectors")
self._try_to_vectorize(
self.url + "/vectors/",
"The London Eye is a ferris wheel at the River Thames.",
)
self._try_to_vectorize(
self.url + "/vectors",
"The London Eye is a ferris wheel at the River Thames.",
)

def _test_vectorizing_sentences(self):
for sentence in self.sentences:
self._try_to_vectorize(self.url + "/vectors/", sentence)
self._try_to_vectorize(self.url + "/vectors", sentence)

def test_vectorizing_sentences_parallel(self):
start = time.time()
threads = []
for _ in range(10):
t = threading.Thread(target=self._test_vectorizing_sentences)
threads.append(t)
t.start()
for t in threads:
t.join()
end = time.time()
print(f"test_vectorizing_sentences_parallel took: {end - start}s")

def test_vectorizing_sentences(self):
start = time.time()
for _ in range(10):
self._test_vectorizing_sentences()
end = time.time()
print(f"test_vectorizing_sentences took: {end - start}s")


if __name__ == "__main__":
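The refactored helpers boil down to one POST per sentence, issued against both the `/vectors` and `/vectors/` spellings; the new parallel test then runs ten threads doing this concurrently to exercise the worker pool. For reference, a minimal standalone client making the same request, assuming the inference container is reachable at http://localhost:8000 as in the test's setUp:

```python
# Minimal sketch of the request the smoke test issues; assumes the
# container is already serving at http://localhost:8000 (see setUp above).
import requests

res = requests.post(
    "http://localhost:8000/vectors",
    json={"text": "Python is easy to learn."},
)
res.raise_for_status()
body = res.json()
# The endpoint echoes the text and returns the embedding and its length.
print(body["dim"], body["vector"][:3])
```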
66 changes: 47 additions & 19 deletions vectorizer.py
@@ -2,7 +2,7 @@
import math
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Optional, Any, Literal
from typing import Optional, Any, Literal, List
from logging import getLogger, Logger

import nltk
@@ -55,6 +55,7 @@ def __init__(
use_sentence_transformers_multi_process: bool,
model_name: str,
trust_remote_code: bool,
workers: int | None,
):
self.executor = ThreadPoolExecutor()
if onnx_runtime:
@@ -67,6 +68,7 @@
cuda_core,
trust_remote_code,
use_sentence_transformers_multi_process,
workers,
)
else:
self.vectorizer = HuggingFaceVectorizer(
@@ -80,14 +82,22 @@ def __init__(
trust_remote_code,
)

async def vectorize(self, text: str, config: VectorInputConfig):
async def vectorize(self, text: str, config: VectorInputConfig, worker: int = 0):
if isinstance(self.vectorizer, SentenceTransformerVectorizer):
loop = asyncio.get_event_loop()
f = loop.run_in_executor(
self.executor, self.vectorizer.vectorize, text, config, worker
)
return await asyncio.wrap_future(f)

return await asyncio.wrap_future(
self.executor.submit(self.vectorizer.vectorize, text, config)
)


class SentenceTransformerVectorizer:
model: SentenceTransformer
workers: List[SentenceTransformer]
available_devices: List[str]
cuda_core: str
use_sentence_transformers_multi_process: bool
pool: dict[Literal["input", "output", "processes"], Any]
@@ -100,50 +110,68 @@ def __init__(
cuda_core: str,
trust_remote_code: bool,
use_sentence_transformers_multi_process: bool,
workers: int | None,
):
self.logger = getLogger("uvicorn")
self.cuda_core = cuda_core
self.use_sentence_transformers_multi_process = (
use_sentence_transformers_multi_process
)
self.logger.info(
f"Sentence transformer vectorizer running with model_name={model_name}, cache_folder={model_path} device:{self.get_device()} trust_remote_code:{trust_remote_code} use_sentence_transformers_multi_process: {self.use_sentence_transformers_multi_process}"
self.available_devices = self.get_devices(
workers, self.use_sentence_transformers_multi_process
)
self.model = SentenceTransformer(
model_name,
cache_folder=model_path,
device=self.get_device(),
trust_remote_code=trust_remote_code,
self.logger.info(
f"Sentence transformer vectorizer running with model_name={model_name}, cache_folder={model_path} available_devices:{self.available_devices} trust_remote_code:{trust_remote_code} use_sentence_transformers_multi_process: {self.use_sentence_transformers_multi_process}"
)
self.model.eval() # make sure we're in inference mode, not training
self.workers = []
for device in self.available_devices:
model = SentenceTransformer(
model_name,
cache_folder=model_path,
device=device,
trust_remote_code=trust_remote_code,
)
model.eval() # make sure we're in inference mode, not training
self.workers.append(model)

print(f"have a list of {len(self.workers)}")
if self.use_sentence_transformers_multi_process:
self.pool = self.model.start_multi_process_pool()
self.pool = self.workers[0].start_multi_process_pool()
self.logger.info(
"Sentence transformer vectorizer is set to use all available devices"
)
self.logger.info(
f"Created pool of {len(self.pool['processes'])} available {'CUDA' if torch.cuda.is_available() else 'CPU'} devices"
)

def get_device(self) -> Optional[str]:
def get_devices(
self,
workers: int | None,
use_sentence_transformers_multi_process: bool,
) -> List[str | None]:
if (
not self.use_sentence_transformers_multi_process
and self.cuda_core is not None
and self.cuda_core != ""
):
return self.cuda_core
return None
return self.cuda_core.split(",")
if use_sentence_transformers_multi_process or workers is None or workers < 1:
return [None]
return [None] * workers

def vectorize(self, text: str, config: VectorInputConfig):
def vectorize(self, text: str, config: VectorInputConfig, worker: int = 0):
if self.use_sentence_transformers_multi_process:
embedding = self.model.encode_multi_process(
embedding = self.workers[0].encode_multi_process(
[text], pool=self.pool, normalize_embeddings=True
)
return embedding[0]

embedding = self.model.encode(
print(
f"trying to vectorize: worker {worker} using device: {self.available_devices[worker]} available: {len(self.available_devices)}"
)
embedding = self.workers[worker].encode(
[text],
device=self.get_device(),
device=self.available_devices[worker],
convert_to_tensor=False,
convert_to_numpy=True,
normalize_embeddings=True,
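Taken together, the vectorizer now loads one SentenceTransformer instance per entry in `available_devices` and indexes into that list with the worker id chosen in app.py, while the multi-process path keeps using a single model plus `start_multi_process_pool()`. A condensed sketch of the per-device pattern, with the model name as a placeholder assumption:

```python
# Condensed sketch of the per-device worker pool from vectorizer.py.
# The model name is a placeholder assumption; the image loads whatever
# model it was built with.
from typing import List, Optional

import torch
from sentence_transformers import SentenceTransformer

devices: List[Optional[str]] = (
    [f"cuda:{i}" for i in range(torch.cuda.device_count())]
    if torch.cuda.is_available()
    else [None]  # a single CPU-backed worker
)

workers: List[SentenceTransformer] = []
for device in devices:
    model = SentenceTransformer("all-MiniLM-L6-v2", device=device)
    model.eval()  # inference mode, as in the PR
    workers.append(model)


def encode(text: str, worker: int) -> list:
    # `worker` is the round-robin index handed down from app.py.
    return workers[worker % len(workers)].encode(
        [text],
        convert_to_numpy=True,
        normalize_embeddings=True,
    )[0].tolist()
```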
