Skip to content

Commit

Permalink
fix bayesian optimization suggestion (#251)
Browse files Browse the repository at this point in the history
* fix bayesian optimization suggestion

Signed-off-by: YujiOshima <[email protected]>

* add bayseopt-example

Signed-off-by: YujiOshima <[email protected]>

* reset x_train in burn-in

Signed-off-by: YujiOshima <[email protected]>

* validate parameters

Signed-off-by: YujiOshima <[email protected]>
  • Loading branch information
YujiOshima authored and k8s-ci-robot committed Dec 4, 2018
1 parent 72a0fc0 commit 1104524
Show file tree
Hide file tree
Showing 6 changed files with 327 additions and 140 deletions.
2 changes: 2 additions & 0 deletions cmd/suggestion/bayesianoptimization/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
grpcio
duecredit
cloudpickle==0.5.6
numpy>=1.13.3
scikit-learn>=0.19.0
scipy>=0.19.1
Expand Down
67 changes: 67 additions & 0 deletions examples/bayseopt-example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Example StudyJob manifest that selects the "bayesianoptimization"
# suggestion algorithm (see suggestionSpec near the end of this file).
apiVersion: "kubeflow.org/v1alpha1"
kind: StudyJob
metadata:
namespace: kubeflow
labels:
controller-tools.k8s.io: "1.0"
name: bayseopt-example
spec:
# NOTE(review): studyName is "random-example" although this resource is named
# "bayseopt-example" -- possibly carried over from another example; confirm.
studyName: random-example
owner: crd
optimizationtype: maximize
objectivevaluename: Validation-accuracy
optimizationgoal: 0.99
requestcount: 10
metricsnames:
- accuracy
# Search space: one double, one int and one categorical parameter.
# The min/max bounds are given as quoted strings.
parameterconfigs:
- name: --lr
parametertype: double
feasible:
min: "0.01"
max: "0.03"
- name: --num-layers
parametertype: int
feasible:
min: "1"
max: "4"
- name: --optimizer
parametertype: categorical
feasible:
list:
- sgd
- adam
- ftrl
workerSpec:
goTemplate:
rawTemplate: |-
apiVersion: batch/v1
kind: Job
metadata:
name: {{.WorkerID}}
namespace: kubeflow
spec:
template:
spec:
imagePullSecrets:
- name: gitlabregcred
containers:
- name: {{.WorkerID}}
image: katib/mxnet-mnist-example
command:
- "python"
- "/mxnet/example/image-classification/train_mnist.py"
- "--batch-size=32"
{{- with .HyperParameters}}
{{- range .}}
- "{{.Name}}={{.Value}}"
{{- end}}
{{- end}}
restartPolicy: Never
suggestionSpec:
# "burn_in" is passed as a string here; presumably it is the number of
# completed trials to collect before model-based suggestions start --
# confirm against the suggestion service's parameter parsing.
suggestionAlgorithm: "bayesianoptimization"
suggestionParameters:
-
name: "burn_in"
value: "5"
requestNumber: 3
273 changes: 159 additions & 114 deletions pkg/suggestion/bayesian_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
from pkg.api.python import api_pb2_grpc
from pkg.suggestion.bayesianoptimization.src.bayesian_optimization_algorithm import BOAlgorithm
from pkg.suggestion.bayesianoptimization.src.algorithm_manager import AlgorithmManager
import logging
from logging import getLogger, StreamHandler, INFO, DEBUG


class BayesianService(api_pb2_grpc.SuggestionServicer):
def __init__(self):
def __init__(self, logger=None):
# {
# study_id:[
# {
Expand All @@ -21,134 +23,177 @@ def __init__(self):
# }
# ]
# }
self.trial_hist = {}
# {
# study_id:{
# N:
# }
# }
self.service_params = {}

def GenerateTrials(self, request, context):
if request.study_id not in self.trial_hist.keys():
self.trial_hist[request.study_id] = []
X_train = []
y_train = []
self.manager_addr = "vizier-core"
self.manager_port = 6789
if logger == None:
self.logger = getLogger(__name__)
FORMAT = '%(asctime)-15s StudyID %(studyid)s %(message)s'
logging.basicConfig(format=FORMAT)
handler = StreamHandler()
handler.setLevel(INFO)
self.logger.setLevel(INFO)
self.logger.addHandler(handler)
self.logger.propagate = False
else:
self.logger = logger

for x in request.completed_trials:
for trial in self.trial_hist[x.study_id]:
if trial["trial_id"] == x.trial_id:
trial["metric"] = x.objective_value

for x in self.trial_hist[request.study_id]:
if x["metric"] is not None:
X_train.append(x["parameters"])
y_train.append(x["metric"])
def GetSuggestions(self, request, context):
service_params = self.parseParameters(request.param_id)
study_conf = self.getStudyConfig(request.study_id)
X_train, y_train = self.getEvalHistory(request.study_id, study_conf.objective_value_name, service_params["burn_in"])

algo_manager = AlgorithmManager(
study_id=request.study_id,
study_config=request.configs,
X_train=X_train,
y_train=y_train,
study_id = request.study_id,
study_config = study_conf,
X_train = X_train,
y_train = y_train,
logger = self.logger,
)

lowerbound = np.array(algo_manager.lower_bound)
upperbound = np.array(algo_manager.upper_bound)
# print("lowerbound", lowerbound)
# print("upperbound", upperbound)
self.logger.debug("lowerbound: %r", lowerbound, extra={"StudyID": request.study_id})
self.logger.debug("upperbound: %r", upperbound, extra={"StudyID": request.study_id})
alg = BOAlgorithm(
dim=algo_manager.dim,
N=int(self.service_params[request.study_id]["N"]),
N=int(service_params["N"]),
lowerbound=lowerbound,
upperbound=upperbound,
X_train=algo_manager.X_train,
y_train=algo_manager.y_train,
mode=self.service_params[request.study_id]["mode"],
trade_off=self.service_params[request.study_id]["trade_off"],
mode=service_params["mode"],
trade_off=service_params["trade_off"],
# todo: support length_scale with array type
length_scale=self.service_params[request.study_id]["length_scale"],
noise=self.service_params[request.study_id]["noise"],
nu=self.service_params[request.study_id]["nu"],
kernel_type=self.service_params[request.study_id]["kernel_type"],
n_estimators=self.service_params[request.study_id]["n_estimators"],
max_features=self.service_params[request.study_id]["max_features"],
model_type=self.service_params[request.study_id]["model_type"],
length_scale=service_params["length_scale"],
noise=service_params["noise"],
nu=service_params["nu"],
kernel_type=service_params["kernel_type"],
n_estimators=service_params["n_estimators"],
max_features=service_params["max_features"],
model_type=service_params["model_type"],
logger=self.logger,
)
x_next = alg.get_suggestion().squeeze()

# todo: maybe there is a better way to generate a trial_id
trial_id = ''.join(random.sample(string.ascii_letters + string.digits, 12))
self.trial_hist[request.study_id].append(dict({
"trial_id": trial_id,
"parameters": x_next,
"metric": None,
}))
# print(x_next)

x_next = algo_manager.parse_x_next(x_next)
x_next = algo_manager.convert_to_dict(x_next)
trial = api_pb2.Trial(
trial_id=trial_id,
study_id=request.study_id,
parameter_set=[
api_pb2.Parameter(
name=x["name"],
value=str(x["value"]),
parameter_type=x["type"],
) for x in x_next
],
status=api_pb2.PENDING,
eval_logs=[],
trials = []
x_next_list = alg.get_suggestion(request.request_number)
for x_next in x_next_list:
x_next = x_next.squeeze()
self.logger.debug("xnext: %r ", x_next, extra={"StudyID": request.study_id})
x_next = algo_manager.parse_x_next(x_next)
x_next = algo_manager.convert_to_dict(x_next)
trials.append(api_pb2.Trial(
study_id=request.study_id,
parameter_set=[
api_pb2.Parameter(
name=x["name"],
value=str(x["value"]),
parameter_type=x["type"],
) for x in x_next
]
)
)
trials = self.registerTrials(trials)
return api_pb2.GetSuggestionsReply(
trials=trials
)
# print(self.trial_hist)
def getStudyConfig(self, studyID):
    """Return the study configuration stored by the manager for ``studyID``.

    Opens an insecure gRPC channel to ``self.manager_addr`` /
    ``self.manager_port``, issues a ``GetStudy`` request and hands back the
    ``study_config`` field of the reply.  The trailing ``10`` passed to the
    call is presumably the timeout in seconds -- TODO confirm.
    """
    manager_channel = grpc.beta.implementations.insecure_channel(
        self.manager_addr, self.manager_port)
    study_request = api_pb2.GetStudyRequest(study_id=studyID)
    with api_pb2.beta_create_Manager_stub(manager_channel) as manager:
        reply = manager.GetStudy(study_request, 10)
        return reply.study_config

return api_pb2.GenerateTrialsReply(
trials=[trial],
completed=False,
)
def getEvalHistory(self, studyID, obj_name, burn_in):
    """Collect the training history (parameters and objective values) of a study.

    Queries the manager for the full worker info of ``studyID`` (latest
    metric log only) and, for every worker whose status is ``COMPLETED``,
    records its parameter set together with the last reported value of the
    metric named ``obj_name``.

    Args:
        studyID: identifier of the study to query.
        obj_name: name of the objective metric to extract from the logs.
        burn_in: number of completed trials that must be exceeded before
            the history is returned.

    Returns:
        A ``(x_train, y_train)`` tuple of lists.  ``x_train`` holds the
        ``parameter_set`` of each usable completed worker and ``y_train``
        the matching objective value as ``float``.  Both lists are empty
        while ``len(x_train) <= burn_in``.
    """
    channel = grpc.beta.implementations.insecure_channel(self.manager_addr, self.manager_port)
    with api_pb2.beta_create_Manager_stub(channel) as client:
        gwfrep = client.GetWorkerFullInfo(
            api_pb2.GetWorkerFullInfoRequest(study_id=studyID, only_latest_log=True), 10)
        worker_hist = gwfrep.worker_full_infos

    x_train = []
    y_train = []
    for w in worker_hist:
        if w.Worker.status != api_pb2.COMPLETED:
            continue
        for ml in w.metrics_logs:
            # A matching log without any recorded value cannot contribute a
            # training point; skipping it avoids an IndexError on values[-1].
            if ml.name == obj_name and ml.values:
                y_train.append(float(ml.values[-1].value))
                x_train.append(w.parameter_set)
                break

    self.logger.info("%d completed trials are found.", len(x_train), extra={"StudyID": studyID})
    # NOTE(review): the comparison is `<=`, so exactly `burn_in` completed
    # trials still yields an empty history -- confirm this is intended.
    if len(x_train) <= burn_in:
        x_train = []
        y_train = []
        self.logger.info("Trials will be sampled until %d trials for burn-in are completed.", burn_in, extra={"StudyID": studyID})
    else:
        self.logger.debug("Completed trials: %r", x_train, extra={"StudyID": studyID})

    return x_train, y_train

def registerTrials(self, trials):
    """Register every trial with the manager and record the assigned ids.

    Each element of ``trials`` is sent in a ``CreateTrial`` request; the
    ``trial_id`` from the reply is written back onto that same trial
    object.  The (mutated) input list is returned.
    """
    manager_channel = grpc.beta.implementations.insecure_channel(
        self.manager_addr, self.manager_port)
    with api_pb2.beta_create_Manager_stub(manager_channel) as manager:
        for trial in trials:
            create_request = api_pb2.CreateTrialRequest(trial=trial)
            reply = manager.CreateTrial(create_request, 10)
            trial.trial_id = reply.trial_id
    return trials

def parseParameters(self, paramID):
    """Fetch the suggestion parameters for ``paramID`` and validate them.

    The parameters are requested from the manager via
    ``GetSuggestionParameters``.  Every recognised parameter that carries a
    valid value overrides the corresponding default; anything else is
    reported with a warning and the default is kept.

    Validation rules:
        * ``length_scale``, ``noise``, ``nu``, ``trade_off`` must parse as
          ``float``.
        * ``N``, ``n_estimators``, ``burn_in`` must parse as ``int``.
        * ``kernel_type``, ``mode`` and ``model_type`` must be one of the
          values listed in ``choices`` below.

    Returns:
        dict mapping parameter name to its validated (or default) value.
    """
    channel = grpc.beta.implementations.insecure_channel(self.manager_addr, self.manager_port)
    with api_pb2.beta_create_Manager_stub(channel) as client:
        gsprep = client.GetSuggestionParameters(
            api_pb2.GetSuggestionParametersRequest(param_id=paramID), 10)
        params = gsprep.suggestion_parameters

    parsed_service_params = {
        "N": 100,
        "model_type": "gp",
        "max_features": "auto",
        "length_scale": 0.5,
        "noise": 0.0005,
        "nu": 1.5,
        "kernel_type": "matern",
        "n_estimators": 50,
        "mode": "pi",
        "trade_off": 0.01,
        "trial_hist": "",
        "burn_in": 10,
    }
    float_params = ("length_scale", "noise", "nu", "trade_off")
    int_params = ("N", "n_estimators", "burn_in")
    # NOTE(review): "lcb" is not listed as a mode here although the previous
    # validation code mentioned it -- confirm whether it should be accepted.
    choices = {
        "kernel_type": ("matern", "rbf"),
        "mode": ("pi", "ei"),
        "model_type": ("gp", "rf"),
    }

    for param in params:
        name = param.name
        value = param.value
        if name not in parsed_service_params:
            self.logger.warning("Unknown Parameter name: %s ", name)
        elif name in float_params:
            try:
                parsed_service_params[name] = float(value)
            except ValueError:
                self.logger.warning("Parameter must be float for %s: %s back to default value", name, value)
        elif name in int_params:
            try:
                parsed_service_params[name] = int(value)
            except ValueError:
                self.logger.warning("Parameter must be int for %s: %s back to default value", name, value)
        elif name in choices:
            # Accept only known values; the earlier checks were inverted and
            # stored exactly the values that were NOT recognised.
            if value in choices[name]:
                parsed_service_params[name] = value
            else:
                self.logger.warning("Unknown Parameter for %s: %s back to default value", name, value)
        # "max_features" and "trial_hist" have no parsing rule here, so their
        # defaults are always kept.

    return parsed_service_params

def SetSuggestionParameters(self, request, context):
if request.study_id not in self.service_params.keys():
self.service_params[request.study_id] = {
"N": None,
"length_scale": None,
"noise": None,
"nu": None,
"kernel_type": None,
"mode": None,
"trade_off": None,
"n_estimators": None,
"max_features": None,
"model_type": None,
}
for param in request.suggestion_parameters:
if param.name not in self.service_params[request.study_id].keys():
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("unknown suggestion parameter: "+param.name)
return api_pb2.SetSuggestionParametersReply()
if param.name == "length_scale" or param.name == "noise" or param.name == "nu" or param.name == "trade_off":
self.service_params[request.study_id][param.name] = float(param.value)
elif param.name == "N" or param.name == "n_estimators":
self.service_params[request.study_id][param.name] = int(param.value)
elif param.name == "kernel_type":
if param.value != "rbf" and param.value != "matern":
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("unknown kernel type: " + param.value)
self.service_params[request.study_id][param.name] = param.value
elif param.name == "mode":
if param.value != "lcb" and param.value != "ei" and param.value != "pi":
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("unknown acquisition mode: " + param.name)
self.service_params[request.study_id][param.name] = param.value
elif param.name == "model_type":
if param.value != "rf" and param.value != "gp":
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details("unknown model_type: " + param.name)

return api_pb2.SetSuggestionParametersReply()

def StopSuggestion(self, request, context):
if request.study_id in self.service_params.keys():
del self.service_params[request.study_id]
del self.trial_hist[request.study_id]
return api_pb2.StopStudyReply()
return parsed_service_params
Loading

0 comments on commit 1104524

Please sign in to comment.