"""
Helper functions for train, infer, and eval modules.
"""
import os
import re
import glob
import numpy as np
from gym_pcgrl import wrappers
from stable_baselines import PPO2
from stable_baselines.bench import Monitor
from stable_baselines.common.vec_env import SubprocVecEnv, DummyVecEnv

class RenderMonitor(Monitor):
    """
    Wrapper for the environment to save data in .csv files.
    """
    def __init__(self, env, rank, log_dir, **kwargs):
        self.log_dir = log_dir
        self.rank = rank
        self.render_gui = kwargs.get('render', False)
        self.render_rank = kwargs.get('render_rank', 0)
        if log_dir is not None:
            log_dir = os.path.join(log_dir, str(rank))
        Monitor.__init__(self, env, log_dir)

    def step(self, action):
        if self.render_gui and self.rank == self.render_rank:
            self.render()
        return Monitor.step(self, action)

def get_action(obs, env, model, action_type=True):
    """
    Pick an action for the given observation: action_type 0 uses the model's
    prediction, 1 samples from the model's action probabilities, and any other
    value samples the action space at random. The default of True is
    equivalent to 1, so actions are sampled from the policy by default.
    """
    action = None
    if action_type == 0:
        action, _ = model.predict(obs)
    elif action_type == 1:
        action_prob = model.action_probability(obs)[0]
        action = np.random.choice(a=list(range(len(action_prob))), size=1, p=action_prob)
    else:
        action = np.array([env.action_space.sample()])
    return action

def make_env(env_name, representation, rank=0, log_dir=None, **kwargs):
    '''
    Return a function that will initialize the environment when called.
    '''
    max_step = kwargs.get('max_step', None)
    render = kwargs.get('render', False)
    def _thunk():
        if representation == 'wide':
            env = wrappers.ActionMapImagePCGRLWrapper(env_name, **kwargs)
        else:
            crop_size = kwargs.get('cropped_size', 28)
            env = wrappers.CroppedImagePCGRLWrapper(env_name, crop_size, **kwargs)
        # RenderMonitor must come last
        if render or (log_dir is not None and len(log_dir) > 0):
            env = RenderMonitor(env, rank, log_dir, **kwargs)
        return env
    return _thunk

def make_vec_envs(env_name, representation, log_dir, n_cpu, **kwargs):
    '''
    Prepare a vectorized environment using a list of 'make_env' functions.
    '''
    if n_cpu > 1:
        env_lst = []
        for i in range(n_cpu):
            env_lst.append(make_env(env_name, representation, i, log_dir, **kwargs))
        env = SubprocVecEnv(env_lst)
    else:
        env = DummyVecEnv([make_env(env_name, representation, 0, log_dir, **kwargs)])
    return env
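
def example_train_sketch():
    """
    Illustrative sketch only, not part of the original module: shows how
    make_vec_envs is typically combined with PPO2. The environment id, log
    directory, worker count, and 'MlpPolicy' below are placeholder
    assumptions; the project's own train script supplies its actual policy
    and hyperparameters.
    """
    log_dir = './runs/example_log'              # hypothetical location
    os.makedirs(log_dir, exist_ok=True)
    env = make_vec_envs('binary-narrow-v0', 'narrow', log_dir, 4,
                        render=False, cropped_size=28)
    model = PPO2('MlpPolicy', env, verbose=1)   # stand-in policy, not the project's
    model.learn(total_timesteps=100000)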

def get_exp_name(game, representation, experiment, **kwargs):
    """
    Build the experiment name '<game>_<representation>' with an optional
    '_<experiment>' suffix.
    """
    exp_name = '{}_{}'.format(game, representation)
    if experiment is not None:
        exp_name = '{}_{}'.format(exp_name, experiment)
    return exp_name

def max_exp_idx(exp_name):
    """
    Return the highest numeric run index among existing './runs/<exp_name>*'
    directories, or 0 if there are none.
    """
    log_dir = os.path.join("./runs", exp_name)
    log_files = glob.glob('{}*'.format(log_dir))
    if len(log_files) == 0:
        return 0
    # compare indices numerically (string comparison would put '9' after '10')
    matches = [re.search(r'_(\d+)', f) for f in log_files]
    log_ns = [int(m.group(1)) for m in matches if m is not None]
    return max(log_ns) if log_ns else 0

def load_model(log_dir):
    """
    Load a saved PPO2 model from log_dir, preferring 'latest_model' over
    'best_model' and '.pkl' over '.zip'.
    """
    model_path = os.path.join(log_dir, 'latest_model.pkl')
    if not os.path.exists(model_path):
        model_path = os.path.join(log_dir, 'latest_model.zip')
    if not os.path.exists(model_path):
        model_path = os.path.join(log_dir, 'best_model.pkl')
    if not os.path.exists(model_path):
        model_path = os.path.join(log_dir, 'best_model.zip')
    if not os.path.exists(model_path):
        files = [f for f in os.listdir(log_dir) if '.pkl' in f or '.zip' in f]
        if len(files) > 0:
            # fall back to a randomly chosen saved model file
            model_path = os.path.join(log_dir, np.random.choice(files))
        else:
            raise Exception('No models are saved')
    model = PPO2.load(model_path)
    return model
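
# Illustrative sketch (assumptions, not part of the original module): load a
# previously trained model and roll it out with get_action. The environment id
# and log directory below are hypothetical placeholders.
if __name__ == '__main__':
    demo_log_dir = './runs/binary_narrow_1_log'   # hypothetical path to saved models
    env = make_vec_envs('binary-narrow-v0', 'narrow', None, 1)
    model = load_model(demo_log_dir)
    obs = env.reset()
    for _ in range(100):
        # action_type=0 takes the model's predicted action directly
        action = get_action(obs, env, model, action_type=0)
        obs, rewards, dones, info = env.step(action)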