Merge pull request #24 from State-of-The-MLOps/feature/atmos_train
Feature/atmos train
ehddnr301 authored Oct 5, 2021
2 parents 439715d + dc240c9 commit 02ba16d
Showing 20 changed files with 998 additions and 213 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -2,4 +2,5 @@
*.pkl
__pycache__
tf_model/**/*
log.txt
log.txt
experiments/**/temp/
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
@@ -1,6 +1,6 @@
repos:

- repo: https://github.com/pre-commit/mirrors-autopep8
rev: 'v1.5.7'
- repo: https://github.com/psf/black
rev: 20.8b1
hooks:
- id: autopep8
- id: black
language_version: python3
6 changes: 5 additions & 1 deletion README.md
@@ -1,2 +1,6 @@
# MLOps
👊 Build MLOps system step by step 👊
👊 Build MLOps system step by step 👊

## Documentation

- [API DOCS](./docs/api-list.md)
63 changes: 39 additions & 24 deletions app/api/router/predict.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
from typing import List


import numpy as np
from fastapi import APIRouter
from starlette.concurrency import run_in_threadpool
@@ -16,30 +17,26 @@


router = APIRouter(
prefix="/predict",
tags=["predict"],
responses={404: {"description": "Not Found"}}
prefix="/predict", tags=["predict"], responses={404: {"description": "Not Found"}}
)


@router.put("/insurance")
async def predict_insurance(info: ModelCorePrediction, model_name: str):
"""
Get information and predict insurance fee
param:
info:
# Temporarily built to accept int values
# A preprocessing step will be added later
age: int
sex: int
bmi: float
children: int
smoker: int
region: int
return:
insurance_fee: float
Takes the input information and returns the predicted insurance fee.
Args:
info(dict): receives the following values: age(int), sex(int), bmi(float), children(int), smoker(int), region(int)
Returns:
insurance_fee(float): the predicted insurance fee.
"""

def sync_call(info, model_name):
"""
A function for making the non-sync work synchronous; its inputs and outputs are the same as the parent function.
"""
model = ScikitLearnModel(model_name)
model.load_model()

@@ -48,30 +45,48 @@ def sync_call(info, model_name):

pred = model.predict_target(test_set)
return {"result": pred.tolist()[0]}

try:
result = await run_in_threadpool(sync_call, info, model_name)
L.info(
f"Predict Args info: {info}\n\tmodel_name: {model_name}\n\tPrediction Result: {result}")
f"Predict Args info: {info}\n\tmodel_name: {model_name}\n\tPrediction Result: {result}"
)
return result

except Exception as e:
L.error(e)
return {'error': str(e)}
return {"error": str(e)}


@router.put("/atmos")
async def predict_temperature(time_series: List[float]):
"""
์˜จ๋„ 1์‹œ๊ฐ„ ๊ฐ„๊ฒฉ ์‹œ๊ณ„์—ด์„ ์ž…๋ ฅ๋ฐ›์•„ ์ดํ›„ 24์‹œ๊ฐ„ ๋™์•ˆ์˜ ์˜จ๋„๋ฅผ 1์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์˜ ์‹œ๊ณ„์—ด๋กœ ์˜ˆ์ธกํ•ฉ๋‹ˆ๋‹ค.
Args:
time_series(List): 72์‹œ๊ฐ„ ๋™์•ˆ์˜ 1์‹œ๊ฐ„ ๊ฐ„๊ฒฉ ์˜จ๋„ ์‹œ๊ณ„์—ด ์ž…๋‹ˆ๋‹ค. 72๊ฐœ์˜ ์›์†Œ๋ฅผ ๊ฐ€์ ธ์•ผ ํ•ฉ๋‹ˆ๋‹ค.
Returns:
List[float]: ์ž…๋ ฅ๋ฐ›์€ ์‹œ๊ฐ„ ์ดํ›„ 24์‹œ๊ฐ„ ๋™์•ˆ์˜ 1์‹œ๊ฐ„ ๊ฐ„๊ฒฉ ์˜จ๋„ ์˜ˆ์ธก ์‹œ๊ณ„์—ด ์ž…๋‹ˆ๋‹ค.
"""
if len(time_series) != 72:
L.error(f'input time_series: {time_series} is not valid')
L.error(f"input time_series: {time_series} is not valid")
return "time series must have 72 values"

try:
tf_model = my_model.model
def sync_pred_ts(time_series):
"""
A function for making the non-sync work synchronous; its inputs and outputs are the same as the parent function.
"""
time_series = np.array(time_series).reshape(1, -1, 1)
result = tf_model.predict(time_series)

result = my_model.predict_target(time_series)
L.info(
f"Predict Args info: {time_series.flatten().tolist()}\n\tmodel_name: {tf_model}\n\tPrediction Result: {result.tolist()[0]}")
f"Predict Args info: {time_series.flatten().tolist()}\n\tmodel_name: {my_model.model_name}\n\tPrediction Result: {result.tolist()[0]}"
)

return result

try:
result = await run_in_threadpool(sync_pred_ts, time_series)
return result.tolist()

except Exception as e:
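For context, here is a minimal client-side sketch of how the two updated prediction endpoints could be called once the app is running. The base URL, port, and payload values are assumptions for illustration, not part of this commit; the parameter mapping (Pydantic body plus `model_name` query parameter for `/predict/insurance`, raw JSON list body for `/predict/atmos`) follows the signatures shown above.

```python
import requests

BASE_URL = "http://127.0.0.1:8000"  # assumed local dev address

# /predict/insurance: ModelCorePrediction fields go in the JSON body,
# model_name is a plain query parameter.
payload = {"age": 40, "sex": 1, "bmi": 28.5, "children": 2, "smoker": 0, "region": 3}
resp = requests.put(
    f"{BASE_URL}/predict/insurance",
    params={"model_name": "insurance_fee_model"},
    json=payload,
)
print(resp.json())  # e.g. {"result": <predicted fee>}

# /predict/atmos: the body is a JSON array of exactly 72 hourly temperatures.
time_series = [20.0 + 0.1 * i for i in range(72)]
resp = requests.put(f"{BASE_URL}/predict/atmos", json=time_series)
print(resp.json())  # 24 hourly predictions on success
```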
124 changes: 88 additions & 36 deletions app/api/router/train.py
@@ -1,59 +1,111 @@
# -*- coding: utf-8 -*-
import multiprocessing
import os
import re
import subprocess


from fastapi import APIRouter

from app.utils import write_yml
from app.utils import NniWatcher, ExprimentOwl, base_dir, check_expr_over, get_free_port, write_yml
from logger import L


router = APIRouter(
prefix="/train",
tags=["train"],
responses={404: {"description": "Not Found"}}
prefix="/train", tags=["train"], responses={404: {"description": "Not Found"}}
)


@router.put("/")
@router.put("/insurance")
def train_insurance(
PORT: int = 8080,
experiment_sec: int = 20,
experiment_name: str = 'exp1',
experimenter: str = 'DongUk',
model_name: str = 'insurance_fee_model',
version: float = 0.1
experiment_name: str = "exp1",
experimenter: str = "DongUk",
model_name: str = "insurance_fee_model",
version: float = 0.1,
):
"""
API for launching insurance-related training.
Args:
PORT (int): PORT to run NNi. Defaults to 8080
experiment_sec (int): Express the experiment time in seconds Defaults to 20
experiment_name (str): experiment name Defaults to exp1
experimeter (str): experimenter (author) Defaults to DongUk
model_name (str): model name Defaults to insurance_fee_model
version (float): version of experiment Defaults to 0.1
experiment_name (str): name of the experiment. Default: exp1
experimenter (str): name of the experimenter. Default: DongUk
model_name (str): name of the model. Default: insurance_fee_model
version (float): version of the experiment. Default: 0.1
Returns:
msg: Regardless of success or not, return address values including PORT.
msg: Regardless of whether the experiment launch succeeds, returns the NNI dashboard address including the port number.
Note:
The final result of the experiment is not returned.
"""
PORT = get_free_port()
L.info(
f"Train Args info\n\texperiment_sec: {experiment_sec}\n\texperiment_name: {experiment_name}\n\texperimenter: {experimenter}\n\tmodel_name: {model_name}\n\tversion: {version}")
path = 'experiments/insurance/'
f"Train Args info\n\texperiment_name: {experiment_name}\n\texperimenter: {experimenter}\n\tmodel_name: {model_name}\n\tversion: {version}"
)
path = "experiments/insurance/"
try:
L.info("Start NNi")
write_yml(
path,
experiment_name,
experimenter,
model_name,
version
)
subprocess.Popen(
"nnictl create --port {} --config {}/{}.yml && timeout {} && nnictl stop --port {}".format(
PORT, path, model_name, experiment_sec, PORT),
shell=True,
write_yml(path, experiment_name, experimenter, model_name, version)
nni_create_result = subprocess.getoutput(
"nnictl create --port {} --config {}/{}.yml".format(PORT, path, model_name)
)
sucs_msg = "Successfully started experiment!"

if sucs_msg in nni_create_result:
p = re.compile(r"The experiment id is ([a-zA-Z0-9]+)\n")
expr_id = p.findall(nni_create_result)[0]
nni_watcher = NniWatcher(expr_id, experiment_name, experimenter, version)
m_process = multiprocessing.Process(target=nni_watcher.excute)
m_process.start()

L.info(nni_create_result)
return nni_create_result

except Exception as e:
L.error(e)
return {'error': str(e)}
return {"error": str(e)}


@router.put("/atmos")
def train_atmos(expr_name: str):
"""
์˜จ๋„ ์‹œ๊ณ„์—ด๊ณผ ๊ด€๋ จ๋œ ํ•™์Šต์„ ์‹คํ–‰ํ•˜๊ธฐ ์œ„ํ•œ API์ž…๋‹ˆ๋‹ค.
Args:
expr_name(str): NNI๊ฐ€ ์‹คํ–‰ํ•  ์‹คํ—˜์˜ ์ด๋ฆ„ ์ž…๋‹ˆ๋‹ค. ์ด ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ project_dir/experiments/[expr_name] ๊ฒฝ๋กœ๋กœ ์ฐพ์•„๊ฐ€ config.yml์„ ์ด์šฉํ•˜์—ฌ NNI๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.
return {"msg": f'Check out http://127.0.0.1:{PORT}'}
Returns:
str: NNI์‹คํ—˜์ด ์‹คํ–‰๋œ ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•˜๊ฑฐ๋‚˜ ์‹คํ–‰๊ณผ์ •์—์„œ ๋ฐœ์ƒํ•œ ์—๋Ÿฌ ๋ฉ”์„ธ์ง€๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.
Note:
์‹คํ—˜์˜ ์ตœ์ข… ๊ฒฐ๊ณผ๋ฅผ ๋ฐ˜ํ™˜ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.
"""

nni_port = get_free_port()
expr_path = os.path.join(base_dir, "experiments", expr_name)

try:
nni_create_result = subprocess.getoutput(
"nnictl create --port {} --config {}/config.yml".format(nni_port, expr_path)
)
sucs_msg = "Successfully started experiment!"

if sucs_msg in nni_create_result:
p = re.compile(r"The experiment id is ([a-zA-Z0-9]+)\n")
expr_id = p.findall(nni_create_result)[0]
check_expr = ExprimentOwl(expr_id, expr_name, expr_path)
check_expr.add("update_tfmodeldb")
check_expr.add("modelfile_cleaner")

m_process = multiprocessing.Process(
target=check_expr.execute
)
m_process.start()

L.info(nni_create_result)
return nni_create_result

else:
L.error(nni_create_result)
return {"error": nni_create_result}

except Exception as e:
L.error(e)
return {"error": str(e)}
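Both training routes follow the same launch pattern: run `nnictl create`, confirm the success message, pull the experiment id out of the CLI output with a regex, and hand it to a watcher running in a separate process. Below is a minimal sketch of that id-extraction and hand-off step, using a stand-in output string and a hypothetical `watch_experiment` function rather than the project's `NniWatcher`/`ExprimentOwl` classes.

```python
import multiprocessing
import re

# Stand-in for subprocess.getoutput("nnictl create ...");
# the exact wording is assumed from the checks in the router above.
launch_output = (
    "INFO: Starting restful server...\n"
    "The experiment id is AbC123xy\n"
    "Successfully started experiment!\n"
)


def watch_experiment(expr_id: str) -> None:
    """Hypothetical watcher: poll and clean up the experiment in the background."""
    print(f"watching NNI experiment {expr_id}")


if __name__ == "__main__":
    if "Successfully started experiment!" in launch_output:
        # Same pattern as the router: alphanumeric id terminated by a newline.
        expr_id = re.compile(r"The experiment id is ([a-zA-Z0-9]+)\n").findall(launch_output)[0]
        proc = multiprocessing.Process(target=watch_experiment, args=(expr_id,))
        proc.start()
        proc.join()
```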
12 changes: 12 additions & 0 deletions app/api/schemas.py
@@ -6,6 +6,18 @@ class ModelCoreBase(BaseModel):


class ModelCorePrediction(BaseModel):
"""
A pydantic class for validating the input values of the predict_insurance API.
Attributes:
age(int)
sex(int)
bmi(float)
children(int)
smoker(int)
region(int)
"""

age: int
sex: int
bmi: float
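The schemas.py hunk is truncated after `bmi: float`; based on the attribute list in the new docstring, the complete model plausibly looks like the sketch below (the fields past `bmi` are inferred, not visible in the diff). Pydantic then rejects request bodies whose fields or types do not match.

```python
from pydantic import BaseModel, ValidationError


class ModelCorePrediction(BaseModel):
    """Input validation for the predict_insurance API (fields per the docstring)."""

    age: int
    sex: int
    bmi: float
    children: int  # inferred from the docstring; truncated out of the diff
    smoker: int    # inferred
    region: int    # inferred


ok = ModelCorePrediction(age=40, sex=1, bmi=28.5, children=2, smoker=0, region=3)
print(ok.dict())

try:
    ModelCorePrediction(age="forty", sex=1, bmi=28.5, children=2, smoker=0, region=3)
except ValidationError as err:
    print(err)  # age is not a valid integer
```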
26 changes: 23 additions & 3 deletions app/database.py
@@ -1,23 +1,43 @@
import os


from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

load_dotenv(verbose=True)


def connect(db):
"""
A function for connecting to the database.
Args:
db(str): receives the name of the database to use.
Returns:
created database engine: returns an engine object connected to the database.
Examples:
>>> engine = connect("my_db")
>>> query = "SHOW timezone;"
>>> engine.execute(query).fetchall()
[('Asia/Seoul',)]
>>> print(engine)
Engine(postgresql://postgres:***@127.0.0.1:5432/my_db)
"""
print(db)

POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_PORT = os.getenv("POSTGRES_PORT")
POSTGRES_SERVER = os.getenv("POSTGRES_SERVER")

SQLALCHEMY_DATABASE_URL = \
f'postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@' +\
f'{POSTGRES_SERVER}:{POSTGRES_PORT}/{db}'
SQLALCHEMY_DATABASE_URL = (
f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@"
+ f"{POSTGRES_SERVER}:{POSTGRES_PORT}/{db}"
)

connection = create_engine(SQLALCHEMY_DATABASE_URL)

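The database.py hunk cuts off right after `create_engine`, so the sketch below shows, under the assumption that `connect` simply returns that engine, how it is typically paired with the `sessionmaker` import this diff adds. Environment-variable names mirror the ones in the diff; the database name passed in is hypothetical.

```python
import os

from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

load_dotenv(verbose=True)


def connect(db: str):
    """Build a SQLAlchemy engine for the given database name (mirrors the diff)."""
    url = (
        f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@"
        f"{os.getenv('POSTGRES_SERVER')}:{os.getenv('POSTGRES_PORT')}/{db}"
    )
    return create_engine(url)  # assumption: the truncated function ends by returning this


# Assumed usage: one engine per target database, sessions created on demand.
engine = connect("model_core")  # hypothetical database name
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
```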
