diff --git a/README.md b/README.md
index 69a78ec..70510bc 100644
--- a/README.md
+++ b/README.md
@@ -25,7 +25,7 @@ For Bert, Google reports a [super-large Bert with 481B parameters](https://mlcom
 ### Installation
 ``` bash
 $ git clone https://github.com/hpcaitech/ColossalAI-Inference.git
-$ python setup.py install
+$ pip install .
 ```
 
 ### Huggingface GPT2 Generation Task Case
diff --git a/energon/__init__.py b/energon/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/energon/server/batch_manager.py b/energon/server/batch_manager.py
index d26f1b2..ccb125a 100644
--- a/energon/server/batch_manager.py
+++ b/energon/server/batch_manager.py
@@ -103,7 +103,7 @@ def __init__(self,
         self.tokenizer.pad_token = pad_token  # GPT2Tokenizer.eos_token
         self.running_flag = True
         self.publisher = redis.StrictRedis('localhost', 6379, charset="utf-8", decode_responses=True)
-        self.pool = ThreadPoolExecutor(max_workers=16)
+        self.pool = ThreadPoolExecutor(max_workers=pp+1)  # a small thread pool keeps the message queue from becoming congested
         self.main_thread = threading.Thread(target=self.processing_batch)
         self.main_thread.start()
 
@@ -300,7 +300,7 @@ def processing_batch(self):
                 # self.publish_result(output, target_batch, start_time)
                 # pub_thread = threading.Thread(target=self.publish_result, args=(output, target_batch, start_time))
                 # pub_thread.start()
-            time.sleep(0.08)
+            time.sleep(0.05)
 
     def publish_result(self, output, target_batch):
         """
diff --git a/setup.py b/setup.py
index be86050..9b1812e 100644
--- a/setup.py
+++ b/setup.py
@@ -145,24 +145,29 @@ def cuda_ext_helper(name, sources, extra_cuda_flags):
 #                                      ['get_ncclid.cpp'],
 #                                      extra_cuda_flags + cc_flag))
 
-setup(name='energon',
-      version='0.0.1-beta',
-      packages=find_packages(exclude=(
+setup(
+    name='energon',
+    version='0.0.1b0',
+    packages=find_packages(
+        exclude=(
             'benchmark',
             'docker',
             'tests',
             'docs',
-            'example',
+            'examples',
             'tests',
             'scripts',
             'requirements',
             '*.egg-info',
+            'dist',
+            'build',
         )),
       description='Large-scale Model Inference',
+    license='Apache Software License 2.0',
       ext_modules=ext_modules,
       cmdclass={'build_ext': BuildExtension} if ext_modules else {},
       install_requires=fetch_requirements('requirements.txt'),
-      entry_points='''
-        [console_scripts]
-        energon=energon.cli:typer_click_object
-      ''')
+    entry_points={
+        'console_scripts': ['energon=energon.cli:typer_click_object'],
+    },
+)
diff --git a/tests/test_server/test_batch_server.py b/tests/test_server/test_batch_server.py
index 0020af3..d45a60b 100644
--- a/tests/test_server/test_batch_server.py
+++ b/tests/test_server/test_batch_server.py
@@ -1,5 +1,8 @@
+from concurrent.futures import ThreadPoolExecutor
+
 import requests
 import threading
+import math
 import torch
 import random
 import os
@@ -7,6 +10,7 @@
 import time
 
 latency = []
+finish_time = []
 
 
 def setup_seed(seed):
@@ -24,6 +28,7 @@ def generate_test_dataset(text_num: int = 100, max_len: int = 1024):
     else:
         tmp_str = "test "
         len_list = torch.randint(low=1, high=max_len, size=(1, text_num))
+        # len_list = [math.floor(random.uniform(1, max_len)) for _ in range(text_num)]
         res_text_list = [(tmp_str * len_list[0][i]) + "\n" for i in range(text_num)]
         f = open(file_name, "w")
         f.writelines(res_text_list)
@@ -31,32 +36,60 @@
     return res_text_list
 
 
-def send_request(input_: str, url_: str, port: str):
+def generate_raising_dataset(text_num: int = 100, max_len: int = 1024):
+    file_name = "raising_set_{}_{}.txt".format(text_num, max_len)
+    if os.path.exists(file_name):
+        f = open(file_name)
+        res_text_list = f.readlines()
+    else:
+        tmp_str = "test "
+        # len_list = torch.randint(low=1, high=max_len, size=(1, text_num))
+        len_list = [1024 - i for i in range(text_num)]
+        res_text_list = [(tmp_str * len_list[i]) + "\n" for i in range(text_num)]
+        f = open(file_name, "w")
+        f.writelines(res_text_list)
+    res_text_list = [i.replace(" \n", "").replace('\n', '') for i in res_text_list]
+    return res_text_list
+
+
+def send_request(input_: str, url_: str, port: str, num: int, record=False):
     global latency
-    url_ = url_ + ":" + port + "/model_with_padding"
+    url_ = url_ + ":" + port + "/model_with_padding_naive"
+    # url_ = url_ + ":" + port + "/model_with_padding"
+    # url_ = url_ + ":" + port + "/server_random"
     params = {"input_str": input_}
     start_ = time.time()
     response = requests.post(url=url_, json=params).text
-    latency.append(time.time() - start_)
+    if record:
+        lat = time.time() - start_
+        latency.append(lat)
+        # print("latency: {}, {}".format(num, lat))
+        finish_time.append(time.time())
     print(response)
 
 
 def test_batch():
     global latency
+    setup_seed(42)
+    pool = ThreadPoolExecutor(max_workers=64)
     ip_ = "http://127.0.0.1"
     port_ = "8020"
-    req_num = 50
-    seq_len = 64
+    req_num = 512
+    seq_len = 256
     # req_list = ["test " * 10 for _ in range(req_num)]
     req_list = generate_test_dataset(req_num, seq_len)
+    print(min([len(k) for k in req_list]))
+    # req_list = generate_raising_dataset(req_num, seq_len)
+    # send_request(req_list[0], ip_, port_, -1)
+    send_request('1', ip_, port_, -1)
+    time.sleep(1)
+    st__ = time.time()
     for i in range(req_num):
-        time.sleep(0.005)
-        temp_thread = threading.Thread(target=send_request, args=(req_list[i], ip_, port_))
-        temp_thread.start()
-    time.sleep(20)
+        pool.submit(send_request, req_list[i], ip_, port_, i, True)
+    time.sleep(40)
     print(np.mean(latency))
+    print(req_num / (max(finish_time) - st__))
 
 
 if __name__ == "__main__":
     test_batch()
-
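The revised `test_batch()` above submits every request through a `ThreadPoolExecutor`, records per-request latency and finish timestamps, and reports throughput as `req_num / (max(finish_time) - st__)`. Below is a minimal, self-contained sketch of that measurement pattern (not part of the patch): the port `8020` and the `/model_with_padding_naive` endpoint are taken from the test, while waiting on the submitted futures is an assumption used here in place of the fixed `time.sleep(40)`.

```python
# Minimal sketch of the benchmark pattern used by the revised test (assumes a
# server from this repo is already running on localhost:8020 and exposes the
# /model_with_padding_naive endpoint).
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import requests

URL = "http://127.0.0.1:8020/model_with_padding_naive"  # assumed endpoint, as in the test
latency = []      # per-request round-trip times
finish_time = []  # per-request completion timestamps


def send_request(text: str) -> None:
    start = time.time()
    requests.post(url=URL, json={"input_str": text})
    latency.append(time.time() - start)
    finish_time.append(time.time())


def run_benchmark(texts) -> None:
    pool = ThreadPoolExecutor(max_workers=64)
    start = time.time()
    futures = [pool.submit(send_request, t) for t in texts]
    for f in futures:
        f.result()  # wait for every request instead of sleeping a fixed 40 s
    print("mean latency (s):", np.mean(latency))
    print("throughput (req/s):", len(texts) / (max(finish_time) - start))


if __name__ == "__main__":
    run_benchmark(["test " * 32 for _ in range(128)])
```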