# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# NOTICE: This file has been modified from the original version.
# Modifications copyright 2024 Kyoung Whan Choe
# Original file: https://github.com/huggingface/lerobot/blob/main/lerobot/scripts/train.py
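"""Train an imitation-learning policy (ACT, Diffusion Policy, or BC-RNN) on a
LeRobot-format dataset and periodically evaluate checkpoints in a MimicGen
simulation environment. Adapted from lerobot's `lerobot/scripts/train.py`;
online training and resuming from a checkpoint are not implemented here."""
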
import logging
import time
from contextlib import nullcontext
from pathlib import Path
from pprint import pformat

import hydra
import torch
from omegaconf import DictConfig, OmegaConf
from torch import nn
from torch.cuda.amp import GradScaler

from lerobot.common.datasets.factory import make_dataset
from lerobot.common.datasets.lerobot_dataset import MultiLeRobotDataset
from lerobot.common.datasets.sampler import EpisodeAwareSampler
from lerobot.common.datasets.utils import cycle
from lerobot.common.logger import Logger, log_output_dir
from lerobot.common.utils.utils import (
    format_big_number,
    get_safe_torch_device,
    init_logging,
    set_global_seed,
)
from lerobot.scripts.eval import eval_policy
from lerobot.scripts.train import (
    log_eval_info,
    log_train_info,
    update_policy,
)

from mg2hfbot import IMAGE_OBS_SIZE
from mg2hfbot.env import make_mimicgen_env
from mg2hfbot.utils import load_states_from_hdf5, make_dataset_from_local
from policies.factory import make_policy, make_optimizer_and_scheduler


def validate_config(cfg: DictConfig):
    if cfg.training.online_steps > 0:  # and isinstance(cfg.dataset_repo_id, ListConfig):
        raise NotImplementedError("Online training not implemented.")

    if cfg.eval.batch_size > cfg.eval.n_episodes:
        raise ValueError(
            "The eval batch size is greater than the number of eval episodes "
            f"({cfg.eval.batch_size} > {cfg.eval.n_episodes}). As a result, {cfg.eval.batch_size} "
            f"eval environments will be instantiated, but only {cfg.eval.n_episodes} will be used. "
            "This might significantly slow down evaluation. To fix this, you should update your command "
            f"to increase the number of episodes to match the batch size (e.g. `eval.n_episodes={cfg.eval.batch_size}`), "
            f"or lower the batch size (e.g. `eval.batch_size={cfg.eval.n_episodes}`)."
        )

    # Check if the env config matches the policy config
    image_obs_shape = cfg.policy.input_shapes["observation.images.agentview"]
    if cfg.policy.name == "act":
        if cfg.env.use_delta_action:
            logging.warning(
                "ACT should NOT use delta actions. Setting cfg.env.use_delta_action to False."
            )
            cfg.env.use_delta_action = False
        assert (
            image_obs_shape[1] == IMAGE_OBS_SIZE[0] and image_obs_shape[2] == IMAGE_OBS_SIZE[1]
        ), "Image obs shape does not match image obs size"
    elif cfg.policy.name == "diffusion":
        if cfg.env.use_delta_action:
            logging.warning(
                "Diffusion should NOT use delta actions. Setting cfg.env.use_delta_action to False."
            )
            cfg.env.use_delta_action = False
        assert (
            image_obs_shape[1] == IMAGE_OBS_SIZE[0] and image_obs_shape[2] == IMAGE_OBS_SIZE[1]
        ), "Image obs shape does not match image obs size"
    elif cfg.policy.name.startswith("bc"):
        if not cfg.env.use_delta_action:
            logging.warning(
                "BC-RNN should use delta actions. Setting cfg.env.use_delta_action to True."
            )
            cfg.env.use_delta_action = True
        assert (
            image_obs_shape[1] == IMAGE_OBS_SIZE[0] and image_obs_shape[2] == IMAGE_OBS_SIZE[1]
        ), "Image obs shape does not match image obs size"
    else:
        raise NotImplementedError(f"Unsupported policy: {cfg.policy.name}")


def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = None):
    if out_dir is None:
        raise NotImplementedError()
    if job_name is None:
        raise NotImplementedError()

    validate_config(cfg)

    init_logging()
    logging.info(pformat(OmegaConf.to_container(cfg)))

    if cfg.resume:
        raise NotImplementedError("Resuming training not implemented.")
    elif Logger.get_last_checkpoint_dir(out_dir).exists():
        raise RuntimeError(
            f"The configured output directory {Logger.get_last_checkpoint_dir(out_dir)} already exists. If "
            "you meant to resume training, please use `resume=true` in your command or yaml configuration."
        )

    # log metrics to terminal and wandb
    logger = Logger(cfg, out_dir, wandb_job_name=job_name)

    set_global_seed(cfg.seed)

    # Check device is available
    device = get_safe_torch_device(cfg.device, log=True)
    torch.backends.cudnn.benchmark = True
    torch.backends.cuda.matmul.allow_tf32 = True
    logging.info("make_dataset")
    try:
        offline_dataset = make_dataset(cfg)
    except:  # noqa
        offline_dataset = make_dataset_from_local(cfg)

    if isinstance(offline_dataset, MultiLeRobotDataset):
        logging.info(
            "Multiple datasets were provided. Applied the following index mapping to the provided datasets: "
            f"{pformat(offline_dataset.repo_id_to_index, indent=2)}"
        )

    # Create environment used for evaluating checkpoints during training on simulation data.
    # On real-world data, no need to create an environment as evaluations are done outside train.py,
    # using the eval.py instead, with gym_dora environment and dora-rs.
    eval_env = None
    if cfg.training.eval_freq > 0:
        use_training_episodes = cfg.eval.get("use_training_episodes", False)
        eval_init_states = None
        if use_training_episodes:
            # NOTE: load the init states from the meta data dir
            # once we feed in the initial states, the same states will be used throughout the eval
            eval_init_states = load_states_from_hdf5(
                Path(cfg.dataset_repo_id) / "meta_data" / "init_states.hdf5"
            )
            eval_init_states = eval_init_states[: cfg.eval.n_episodes]

        logging.info("make_env")
        eval_env = make_mimicgen_env(cfg, eval_init_states=eval_init_states)
    logging.info("make_policy")
    policy = make_policy(
        hydra_cfg=cfg,
        dataset_stats=offline_dataset.stats if not cfg.resume else None,
        pretrained_policy_name_or_path=str(logger.last_pretrained_model_dir)
        if cfg.resume
        else None,
    )
    assert isinstance(policy, nn.Module)

    # Create optimizer and scheduler
    # Temporary hack to move optimizer out of policy
    optimizer, lr_scheduler = make_optimizer_and_scheduler(cfg, policy)
    grad_scaler = GradScaler(enabled=cfg.use_amp)

    step = 0  # number of policy updates (forward + backward + optim)
    # if cfg.resume:
    #     step = logger.load_last_training_state(optimizer, lr_scheduler)

    num_learnable_params = sum(p.numel() for p in policy.parameters() if p.requires_grad)
    num_total_params = sum(p.numel() for p in policy.parameters())

    log_output_dir(out_dir)
    logging.info(f"{cfg.env.task=}")
    logging.info(f"{cfg.training.offline_steps=} ({format_big_number(cfg.training.offline_steps)})")
    logging.info(f"{cfg.training.online_steps=}")
    logging.info(
        f"{offline_dataset.num_samples=} ({format_big_number(offline_dataset.num_samples)})"
    )
    logging.info(f"{offline_dataset.num_episodes=}")
    logging.info(f"{num_learnable_params=} ({format_big_number(num_learnable_params)})")
    logging.info(f"{num_total_params=} ({format_big_number(num_total_params)})")
    # Note: this helper will be used in offline and online training loops.
    def evaluate_and_checkpoint_if_needed(step, is_online):
        _num_digits = max(6, len(str(cfg.training.offline_steps + cfg.training.online_steps)))
        step_identifier = f"{step:0{_num_digits}d}"

        # Checkpointing first
        if cfg.training.save_checkpoint and (
            step % cfg.training.save_freq == 0
            or step == cfg.training.offline_steps + cfg.training.online_steps
        ):
            logging.info(f"Checkpoint policy after step {step}")
            # Note: Save with step as the identifier, and format it to have at least 6 digits but more if
            # needed (choose 6 as a minimum for consistency without being overkill).
            logger.save_checkpoint(
                step,
                policy,
                optimizer,
                lr_scheduler,
                identifier=step_identifier,
            )
            logging.info("Resume training")

        if cfg.training.eval_freq > 0 and step % cfg.training.eval_freq == 0:
            logging.info(f"Eval policy at step {step}")
            with (
                torch.no_grad(),
                torch.autocast(device_type=device.type) if cfg.use_amp else nullcontext(),
            ):
                assert eval_env is not None
                eval_info = eval_policy(
                    eval_env,
                    policy,
                    cfg.eval.n_episodes,
                    videos_dir=Path(out_dir) / "eval" / f"videos_step_{step_identifier}",
                    max_episodes_rendered=4,
                    start_seed=cfg.seed,
                )
            log_eval_info(
                logger, eval_info["aggregated"], step, cfg, offline_dataset, is_online=is_online
            )
            if cfg.wandb.enable:
                logger.log_video(eval_info["video_paths"][0], step, mode="eval")
            logging.info("Resume training")
    # create dataloader for offline training
    if cfg.training.get("drop_n_last_frames"):
        # A custom sampler and shuffle=True are mutually exclusive in DataLoader,
        # so shuffling is delegated to the EpisodeAwareSampler instead.
        shuffle = False
        sampler = EpisodeAwareSampler(
            offline_dataset.episode_data_index,
            drop_n_last_frames=cfg.training.drop_n_last_frames,
            shuffle=True,
        )
    else:
        shuffle = True
        sampler = None
    dataloader = torch.utils.data.DataLoader(
        offline_dataset,
        num_workers=cfg.training.num_workers,
        batch_size=cfg.training.batch_size,
        shuffle=shuffle,
        sampler=sampler,
        pin_memory=device.type != "cpu",
        drop_last=False,
    )
    dl_iter = cycle(dataloader)
    policy.train()
    offline_step = 0
    for _ in range(step, cfg.training.offline_steps):
        if offline_step == 0:
            logging.info("Start offline training on a fixed dataset")

        start_time = time.perf_counter()
        batch = next(dl_iter)
        dataloading_s = time.perf_counter() - start_time

        # Modify the action and image obs to match the config
        if cfg.env.use_delta_action:
            batch["action"] = batch["action_delta"]

        for key in batch:
            batch[key] = batch[key].to(device, non_blocking=True)

        train_info = update_policy(
            policy,
            batch,
            optimizer,
            cfg.training.grad_clip_norm,
            grad_scaler=grad_scaler,
            lr_scheduler=lr_scheduler,
            use_amp=cfg.use_amp,
        )

        train_info["dataloading_s"] = dataloading_s

        if step % cfg.training.log_freq == 0:
            log_train_info(logger, train_info, step, cfg, offline_dataset, is_online=False)

        # Note: evaluate_and_checkpoint_if_needed happens **after** the `step`th training update has completed,
        # so we pass in step + 1.
        evaluate_and_checkpoint_if_needed(step + 1, is_online=False)

        step += 1
        offline_step += 1  # noqa: SIM113

    if cfg.training.online_steps == 0:
        if eval_env:
            eval_env.close()
        logging.info("End of training")
        return
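

# Example invocation (sketch): the script is driven by Hydra, so config values are
# overridden on the command line. The task name below is a placeholder and the exact
# override keys depend on the YAML files under ./configs, e.g.:
#   python train.py env.task=<task_name> training.offline_steps=100000 eval.n_episodes=10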
@hydra.main(version_base="1.2", config_name="default", config_path="./configs")
def train_cli(cfg: dict):
    train(
        cfg,
        out_dir=hydra.core.hydra_config.HydraConfig.get().run.dir,
        job_name=hydra.core.hydra_config.HydraConfig.get().job.name,
    )


if __name__ == "__main__":
    train_cli()