This repository has been archived by the owner on Oct 16, 2023. It is now read-only.

some details #67

Merged
merged 1 commit into from
May 19, 2022
2 changes: 1 addition & 1 deletion README.md
@@ -25,7 +25,7 @@ For Bert, Google reports a [super-large Bert with 481B parameters](https://mlcom
### Installation
``` bash
$ git clone https://github.com/hpcaitech/ColossalAI-Inference.git
$ python setup.py install
$ pip install .
```

### Huggingface GPT2 Generation Task Case
Empty file added energon/__init__.py
Empty file.
4 changes: 2 additions & 2 deletions energon/server/batch_manager.py
@@ -103,7 +103,7 @@ def __init__(self,
self.tokenizer.pad_token = pad_token # GPT2Tokenizer.eos_token
self.running_flag = True
self.publisher = redis.StrictRedis('localhost', 6379, charset="utf-8", decode_responses=True)
self.pool = ThreadPoolExecutor(max_workers=16)
self.pool = ThreadPoolExecutor(max_workers=pp+1) # a small thread pool keeps the message queue from becoming crowded
self.main_thread = threading.Thread(target=self.processing_batch)
self.main_thread.start()

@@ -300,7 +300,7 @@ def processing_batch(self):
# self.publish_result(output, target_batch, start_time)
# pub_thread = threading.Thread(target=self.publish_result, args=(output, target_batch, start_time))
# pub_thread.start()
time.sleep(0.08)
time.sleep(0.05)

def publish_result(self, output, target_batch):
"""
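The new pool size ties the number of worker threads to the pipeline depth instead of a fixed 16. As a rough illustration (not the project's code; `pp` is assumed here to be the pipeline-parallel depth the batch manager is constructed with), the idea is that only about `pp` batches can be in flight through the pipeline at once, so `pp + 1` workers are enough and surplus requests are not pulled off the message queue early:

```python
from concurrent.futures import ThreadPoolExecutor

# Illustrative sketch only: `pp` stands in for the assumed pipeline-parallel depth.
pp = 4

# With at most `pp` micro-batches moving through the pipeline at a time,
# pp + 1 workers (one spare for publishing results) keep the executor busy
# without letting submitted-but-idle work crowd the queue.
pool = ThreadPoolExecutor(max_workers=pp + 1)

def fake_stage(batch_id: int) -> str:
    # Placeholder for forwarding one batch through the pipeline.
    return f"batch {batch_id} done"

futures = [pool.submit(fake_stage, i) for i in range(8)]
print([f.result() for f in futures])
```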
21 changes: 13 additions & 8 deletions setup.py
@@ -145,24 +145,29 @@ def cuda_ext_helper(name, sources, extra_cuda_flags):
# ['get_ncclid.cpp'],
# extra_cuda_flags + cc_flag))

setup(name='energon',
version='0.0.1-beta',
packages=find_packages(exclude=(
setup(
name='energon',
version='0.0.1b0',
packages=find_packages(
exclude=(
'benchmark',
'docker',
'tests',
'docs',
'example',
'examples',
'tests',
'scripts',
'requirements',
'*.egg-info',
'dist',
'build',
)),
description='Large-scale Model Inference',
license='Apache Software License 2.0',
ext_modules=ext_modules,
cmdclass={'build_ext': BuildExtension} if ext_modules else {},
install_requires=fetch_requirements('requirements.txt'),
entry_points='''
[console_scripts]
energon=energon.cli:typer_click_object
''')
entry_points={
'console_scripts': ['energon=energon.cli:typer_click_object',],
},
)
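The entry point moves from the ini-style string to the dictionary form; both register a console script named `energon` that calls `energon.cli:typer_click_object` once the package is installed with `pip install .`. Roughly speaking (a sketch of what setuptools generates, not the actual wrapper script), the installed `energon` command behaves like:

```python
# Approximate behaviour of the generated `energon` console script
# (illustrative only; the real wrapper is produced by setuptools/pip).
import sys
from energon.cli import typer_click_object

if __name__ == "__main__":
    sys.exit(typer_click_object())
```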
53 changes: 43 additions & 10 deletions tests/test_server/test_batch_server.py
@@ -1,12 +1,16 @@
from concurrent.futures import ThreadPoolExecutor

import requests
import threading
import math
import torch
import random
import os
import numpy as np
import time

latency = []
finish_time = []


def setup_seed(seed):
@@ -24,39 +28,68 @@ def generate_test_dataset(text_num: int = 100, max_len: int = 1024):
else:
tmp_str = "test "
len_list = torch.randint(low=1, high=max_len, size=(1, text_num))
# len_list = [math.floor(random.uniform(1, max_len)) for _ in range(text_num)]
res_text_list = [(tmp_str * len_list[0][i]) + "\n" for i in range(text_num)]
f = open(file_name, "w")
f.writelines(res_text_list)
res_text_list = [i.replace(" \n", "").replace('\n', '') for i in res_text_list]
return res_text_list


def send_request(input_: str, url_: str, port: str):
def generate_raising_dataset(text_num: int = 100, max_len: int = 1024):
file_name = "raising_set_{}_{}.txt".format(text_num, max_len)
if os.path.exists(file_name):
f = open(file_name)
res_text_list = f.readlines()
else:
tmp_str = "test "
# len_list = torch.randint(low=1, high=max_len, size=(1, text_num))
len_list = [1024 - i for i in range(text_num)]
res_text_list = [(tmp_str * len_list[i]) + "\n" for i in range(text_num)]
f = open(file_name, "w")
f.writelines(res_text_list)
res_text_list = [i.replace(" \n", "").replace('\n', '') for i in res_text_list]
return res_text_list


def send_request(input_: str, url_: str, port: str, num: int, record=False):
global latency
url_ = url_ + ":" + port + "/model_with_padding"
url_ = url_ + ":" + port + "/model_with_padding_naive"
# url_ = url_ + ":" + port + "/model_with_padding"
# url_ = url_ + ":" + port + "/server_random"
params = {"input_str": input_}
start_ = time.time()
response = requests.post(url=url_, json=params).text
latency.append(time.time() - start_)
if record:
lat = time.time() - start_
latency.append(lat)
# print("latency: {}, {}".format(num, lat))
finish_time.append(time.time())
print(response)


def test_batch():
global latency
setup_seed(42)
pool = ThreadPoolExecutor(max_workers=64)
ip_ = "http://127.0.0.1"
port_ = "8020"
req_num = 50
seq_len = 64
req_num = 512
seq_len = 256
# req_list = ["test " * 10 for _ in range(req_num)]
req_list = generate_test_dataset(req_num, seq_len)
print(min([len(k) for k in req_list]))
# req_list = generate_raising_dataset(req_num, seq_len)
# send_request(req_list[0], ip_, port_, -1)
send_request('1', ip_, port_, -1)
time.sleep(1)
st__ = time.time()
for i in range(req_num):
time.sleep(0.005)
temp_thread = threading.Thread(target=send_request, args=(req_list[i], ip_, port_))
temp_thread.start()
time.sleep(20)
pool.submit(send_request, req_list[i], ip_, port_, i, True)
time.sleep(40)
print(np.mean(latency))
print(req_num / (max(finish_time) - st__))


if __name__ == "__main__":
test_batch()
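The reworked test drives the server through a thread pool instead of raw threads, warms it up with a single request, then submits `req_num` requests at roughly 5 ms intervals and reports the mean latency and overall throughput. A minimal recomputation of those two numbers, using made-up values in place of real measurements:

```python
import numpy as np

# Made-up measurements standing in for what the test collects:
# per-request latencies in seconds, absolute finish timestamps,
# and the time at which submission started.
latency = [0.42, 0.55, 0.61, 0.58]
finish_time = [100.5, 100.9, 101.2, 101.4]
st__ = 100.0

print(np.mean(latency))                               # mean per-request latency (s)
print(len(finish_time) / (max(finish_time) - st__))   # throughput (requests per second)
```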