Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/docs #19

Merged
merged 2 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions app/api/router/predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,18 @@
@router.put("/insurance")
async def predict_insurance(info: ModelCorePrediction, model_name: str):
"""
Get information and predict insurance fee
param:
info:
# 임시로 int형태를 받도록 제작
# preprocess 단계를 거치도록 만들 예정
age: int
sex: int
bmi: float
children: int
smoker: int
region: int
return:
insurance_fee: float
정보를 입력받아 보험료를 예측하여 반환합니다.

Args:
info(dict): 다음의 값들을 입력받습니다. age(int), sex(int), bmi(float), children(int), smoker(int), region(int)

Returns:
insurance_fee(float): 보험료 예측값입니다.
"""
def sync_call(info, model_name):
"""
none sync 함수를 sync로 만들어 주기 위한 함수이며 입출력은 부모 함수와 같습니다.
"""
model = ScikitLearnModel(model_name)
model.load_model()

Expand All @@ -61,16 +58,27 @@ def sync_call(info, model_name):

@router.put("/atmos")
async def predict_temperature(time_series: List[float]):
"""
온도 1시간 간격 시계열을 입력받아 이후 24시간 동안의 온도를 1시간 간격의 시계열로 예측합니다.

Args:
time_series(List): 72시간 동안의 1시간 간격 온도 시계열 입니다. 72개의 원소를 가져야 합니다.

Returns:
List[float]: 입력받은 시간 이후 24시간 동안의 1시간 간격 온도 예측 시계열 입니다.
"""
if len(time_series) != 72:
L.error(f'input time_series: {time_series} is not valid')
return "time series must have 72 values"

def sync_pred_ts(time_series):
tf_model = my_model.model
"""
none sync 함수를 sync로 만들어 주기 위한 함수이며 입출력은 부모 함수와 같습니다.
"""
time_series = np.array(time_series).reshape(1, -1, 1)
result = tf_model.predict(time_series)
result = my_model.predict_target(time_series)
L.info(
f"Predict Args info: {time_series.flatten().tolist()}\n\tmodel_name: {tf_model}\n\tPrediction Result: {result.tolist()[0]}")
f"Predict Args info: {time_series.flatten().tolist()}\n\tmodel_name: {my_model.model_name}\n\tPrediction Result: {result.tolist()[0]}")

return result

Expand Down
39 changes: 27 additions & 12 deletions app/api/router/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,21 @@ def train_insurance(
version: float = 0.1
):
"""
insurance와 관련된 학습을 실행하기 위한 API입니다.

Args:
PORT (int): PORT to run NNi. Defaults to 8080
experiment_sec (int): Express the experiment time in seconds Defaults to 20
experiment_name (str): experiment name Defaults to exp1
experimeter (str): experimenter (author) Defaults to DongUk
model_name (str): model name Defaults to insurance_fee_model
version (float): version of experiment Defaults to 0.1
PORT (int): NNI가 실행될 포트번호. 기본 값: 8080
experiment_sec (int): 최대 실험시간(초단위). 기본 값: 20
experiment_name (str): 실험이름. 기본 값: exp1
experimeter (str): 실험자의 이름. 기본 값: DongUk
model_name (str): 모델의 이름. 기본 값: insurance_fee_model
version (float): 실험의 버전. 기본 값: 0.1

Returns:
msg: Regardless of success or not, return address values including PORT.
msg: 실험 실행의 성공과 상관없이 포트번호를 포함한 NNI Dashboard의 주소를 반환합니다.

Note:
실험의 최종 결과를 반환하지 않습니다.
"""
L.info(
f"Train Args info\n\texperiment_sec: {experiment_sec}\n\texperiment_name: {experiment_name}\n\texperimenter: {experimenter}\n\tmodel_name: {model_name}\n\tversion: {version}")
Expand Down Expand Up @@ -64,10 +70,21 @@ def train_insurance(

@router.put("/atmos")
def train_atmos(expr_name: str):
"""
온도 시계열과 관련된 학습을 실행하기 위한 API입니다.

Args:
expr_name(str): NNI가 실행할 실험의 이름 입니다. 이 파라미터를 기반으로 project_dir/experiments/[expr_name] 경로로 찾아가 config.yml을 이용하여 NNI를 실행합니다.

Returns:
str: NNI실험이 실행된 결과값을 반환하거나 실행과정에서 발생한 에러 메세지를 반환합니다.

Note:
실험의 최종 결과를 반환하지 않습니다.
"""
nni_port = get_free_port()
expr_path = os.path.join(base_dir, 'experiments', expr_name)

# subprocess로 nni실행
try:
nni_create_result = subprocess.getoutput(
"nnictl create --port {} --config {}/config.yml".format(
Expand All @@ -81,7 +98,7 @@ def train_atmos(expr_name: str):
target = check_expr_over,
args = (expr_id, expr_name, expr_path)
)
m_process.start()# 자식 프로세스 분리(nni 실험 진행상황 감시 및 모델 저장)
m_process.start()

L.info(nni_create_result)
return nni_create_result
Expand All @@ -92,6 +109,4 @@ def train_atmos(expr_name: str):

except Exception as e:
L.error(e)
return {'error': str(e)}

# 코드는 바이너리로 저장하는건 별로인가?(버전관리 차원에서 score랑 같은 행에...)
return {'error': str(e)}
11 changes: 11 additions & 0 deletions app/api/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ class ModelCoreBase(BaseModel):


class ModelCorePrediction(BaseModel):
"""
predict_insurance API의 입력 값 검증을 위한 pydantic 클래스입니다.

Attributes:
age(int)
sex(int)
bmi(float)
children(int)
smoker(int)
region(int)
"""
age: int
sex: int
bmi: float
Expand Down
17 changes: 17 additions & 0 deletions app/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,23 @@
load_dotenv(verbose=True)

def connect(db):
"""
database와의 연결을 위한 함수 입니다.

Args:
db(str): 사용할 데이터베이스의 이름을 전달받습니다.

Returns:
created database engine: 데이터베이스에 연결된 객체를 반환합니다.

Examples:
>>> engine = connect("my_db")
>>> query = "SHOW timezone;"
>>> engine.execute(query).fetchall()
[('Asia/Seoul',)]
>>> print(engine)
Engine(postgresql://postgres:***@127.0.0.1:5432/my_db)
"""
print(db)

POSTGRES_USER = os.getenv("POSTGRES_USER")
Expand Down
108 changes: 99 additions & 9 deletions app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@


class CoreModel:

"""
predict API 호출을 받았을 때 사용될 ML 모델을 로드하는 클래스입니다.

Attributes:
model_name(str): 예측을 실행할 모델의 이름
model(obj): 모델이 저장될 인스턴스 변수
query(str): 입력받은 모델이름을 기반으로 데이터베이스에서 모델을 불러오는 SQL query입니다.
"""
def __init__(self, model_name):
self.model_name = model_name
self.model = None
Expand All @@ -35,17 +42,40 @@ def __init__(self, model_name):
""".format(self.model_name)

def load_model(self):
"""
본 클래스를 상속받았을 때 이 함수를 구현하지 않으면 예외를 발생시킵니다.
"""
raise Exception

def predict_target(self, target_data):
"""
데이터베이스에서 불러와 인스턴스 변수에 저장된 모델을 기반으로 예측을 수행합니다.

Args:
target_data: predict API 호출 시 입력받은 값입니다. 자료형은 모델에 따라 다릅니다.

Returns:
예측된 값을 반환 합니다. 자료형은 모델에 따라 다릅니다.
"""
return self.model.predict(target_data)


class ScikitLearnModel(CoreModel):
"""
Scikit learn 라이브러리 기반의 모델을 불러오기 위한 클래스입니다.
Examples:
>>> sk_model = ScikitLearnModel("my_model")
>>> sk_model.load_model()
>>> sk_model.predict_target(target)
predict result
"""
def __init__(self, *args):
super().__init__(*args)

def load_model(self):
"""
모델을 데이터베이스에서 불러와 인스턴스 변수에 저장하는 함수 입니다. 상속받은 부모클래스의 인스턴스 변수를 이용하며, 반환 값은 없습니다.
"""
_model = engine.execute(self.query).fetchone()
if _model is None:
raise ValueError('Model Not Found!')
Expand All @@ -56,10 +86,21 @@ def load_model(self):


class TensorFlowModel(CoreModel):
"""
Tensorflow 라이브러리 기반의 모델을 불러오기 위한 클래스입니다.
Examples:
>>> tf_model = TensorflowModel("my_model")
>>> tf_model.load_model()
>>> tf_model.predict_target(target)
predict result
"""
def __init__(self, *args):
super().__init__(*args)

def load_model(self):
"""
모델을 데이터베이스에서 불러와 인스턴스 변수에 저장하는 함수 입니다. 상속받은 부모클래스의 인스턴스 변수를 이용하며, 반환 값은 없습니다.
"""
_model = engine.execute(self.query).fetchone()
if _model is None:
raise ValueError('Model Not Found!')
Expand All @@ -82,6 +123,19 @@ def write_yml(
model_name,
version
):
"""
NNI 실험을 시작하기 위한 config.yml파일을 작성하는 함수 입니다.

Args:
path(str): 실험의 경로
experiment_name(str): 실험의 이름
experimenter(str): 실험자의 이름
model_name(str): 모델의 이름
version(float): 버전

Returns:
반환 값은 없으며 입력받은 경로로 yml파일이 작성됩니다.
"""
with open('{}/{}.yml'.format(path, model_name), 'w') as yml_config_file:
yaml.dump({
'authorName': f'{experimenter}',
Expand Down Expand Up @@ -111,7 +165,20 @@ def write_yml(

return


def zip_model(model_path):
"""
입력받은 모델의 경로를 찾아가 모델을 압축하여 메모리 버퍼를 반환합니다.

Args:
model_path(str): 모델이 있는 경로입니다.

Returns:
memory buffer: 모델을 압축한 메모리 버퍼를 반환합니다.

Note:
모델을 보조기억장치에 파일로 저장하지 않습니다.
"""
model_buffer = io.BytesIO()

basedir = os.path.basename(model_path)
Expand All @@ -128,18 +195,41 @@ def zip_model(model_path):

return model_buffer


def get_free_port():
"""
호출 즉시 사용가능한 포트번호를 반환합니다.

Returns:
현재 사용가능한 포트번호

Examples:
>>> avail_port = get_free_port() # 사용 가능한 포트, 그때그때 다름
>>> print(avail_port)
45675
"""
with socketserver.TCPServer(("localhost", 0), None) as s:
free_port = s.server_address[1]
return free_port


def check_expr_over(experiment_id, experiment_name, experiment_path):
"""
train API에서 사용되기 위하여 만들어 졌습니다. experiment_id를 입력받아 해당 id를 가진 nni 실험을 모니터링 합니다. 현재 추상화되어있지 않아 코드 재사용성이 부족하며 개선이 필요합니다.
* 파일로 생성되는 모델이 너무 많아지지 않도록 유지합니다.(3개 이상 모델이 생성되면 성능순으로 3위 미만은 삭제)
* nnictl experiment list(shell command)를 주기적으로 호출하여 실험이 현제 진행중인지 파악합니다.
* 실험의 상태가 DONE으로 변경되면 최고점수 모델을 데이터베이스에 저장하고 nnictl stop experiment_id를 실행하여 실험을 종료한 후 프로세스가 종료됩니다.

Args:
experiment_id(str)
experiment_name(str)
experiment_path(str)
"""
minute = 60

while True:
time.sleep(1*minute)

# 실험이 끝났는지 확인
expr_list = subprocess.getoutput("nnictl experiment list")

running_expr = [expr for expr in expr_list.split('\n') if experiment_id in expr]
Expand All @@ -148,32 +238,32 @@ def check_expr_over(experiment_id, experiment_name, experiment_path):
stop_expr = subprocess.getoutput("nnictl stop {}".format(
experiment_id))
L.info(stop_expr)
break # 실험이 끝나면 무한루프 종료
break

elif experiment_id not in expr_list:
L.info(expr_list)
break # 갑자기 누군가가 nnictl stop으로 다 꺼버렸을 상황에 대비
break

else:
model_path = os.path.join(experiment_path,
"temp",
"*_{}*".format(experiment_name))
exprs = glob.glob(model_path)
if len(exprs) > 3: # 모델파일이 너무 많아지지 않게 3개 넘으면 삭제
if len(exprs) > 3:
exprs.sort()
[shutil.rmtree(_) for _ in exprs[3:]]
# 모델저장


model_path = os.path.join(experiment_path,
"temp",
"*_{}*".format(experiment_name))
exprs = glob.glob(model_path)
if not exprs: # 모델파일이 하나도 없을 경우 그냥 종료
if not exprs:
return 0

exprs.sort()
exprs = exprs[0]
metrics = os.path.basename(exprs).split("_")[:2] # metric 개수
metrics = os.path.basename(exprs).split("_")[:2]
metrics = [float(metric) for metric in metrics]

score_sql = """SELECT mae
Expand Down