Commit 5a26a1e ("init")
mengyuest committed Dec 22, 2023 (0 parents)
Showing 225 changed files with 30,949 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -0,0 +1,6 @@
.vscode/*
.idea/*
.DS_Store
__pycache__/*
exps_stl
scripts
45 changes: 45 additions & 0 deletions README.md
@@ -0,0 +1,45 @@
# Signal Temporal Logic Neural Predictive Control

[![Journal](https://img.shields.io/badge/RA--L2023-Accepted-success)](https://ieeexplore.ieee.org/iel7/7083369/7339444/10251585.pdf)
[![Conference](https://img.shields.io/badge/ICRA2024-Present-success)](https://2024.ieee-icra.org/index.html)

<!-- [![Arxiv](http://img.shields.io/badge/arxiv-cs:2309.05131-B31B1B.svg)](https://arxiv.org/abs/2309.05131.pdf) -->

[<ins>Reliable Autonomous Systems Lab @ MIT (REALM)</ins>](https://aeroastro.mit.edu/realm/)

[<ins>Yue Meng</ins>](https://mengyuest.github.io/), [<ins>Chuchu Fan</ins>](https://chuchu.mit.edu/)

![Overview teaser figure](ral2023_teaser_v1.png)

> A differentiable learning framework for specifying task requirements and learning robot control policies.

This repository contains the original code and tutorial for our RA-L 2023 paper, "Signal Temporal Logic Neural Predictive Control" (presented at ICRA 2024). [[link]](https://arxiv.org/abs/2309.05131.pdf)


```
@article{meng2023signal,
title={Signal Temporal Logic Neural Predictive Control},
author={Meng, Yue and Fan, Chuchu},
journal={IEEE Robotics and Automation Letters},
year={2023},
publisher={IEEE}
}
```

![Result animation](ral2023.gif)

## Prerequisites
Ubuntu 20.04 (an NVIDIA GPU such as the RTX 2080 Ti is recommended)

Packages (steps 1 and 2 are enough if you only want to use our STL library; see the [tutorial](tutorial.ipynb)):
1. NumPy and Matplotlib: `conda install numpy matplotlib`
2. PyTorch v1.13.1 [[link]](https://pytorch.org/get-started/previous-versions/): `conda install pytorch==1.13.1 torchvision==0.14.1 torchaudio==0.13.1 pytorch-cuda=11.7 -c pytorch -c nvidia` (other versions may also work; a quick sanity check is sketched below)
3. CasADi, Gurobi, and RL libraries: `pip install casadi gurobipy stable-baselines3 && cd mbrl_bsl && pip install -e . && cd -`
4. (Only for the manipulation task) `pip install pytorch_kinematics mujoco forwardkinematics pybullet && sudo apt-get install libffi7`
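As an optional sanity check (not part of the original setup steps), the following snippet confirms that the installed PyTorch build can see the GPU; it uses only standard PyTorch APIs:

```python
# Quick environment check: prints the PyTorch version and CUDA availability.
import torch

print("PyTorch version:", torch.__version__)          # e.g. 1.13.1
print("CUDA available :", torch.cuda.is_available())  # True if the GPU/driver are usable
if torch.cuda.is_available():
    print("GPU:", torch.cuda.get_device_name(0))
```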

## Tutorial
Basic usage is covered in our tutorial Jupyter notebook [here](tutorial.ipynb).
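For intuition only, here is a minimal, self-contained sketch of the core idea behind differentiable STL robustness, written in plain PyTorch rather than with this repository's STL library (the actual classes and operators are shown in the notebook): the hard min over time used by the "always" operator is replaced by a softmin so gradients can flow through the robustness score.

```python
import torch

def always_inside_box(traj_x, lo, hi, tau=10.0):
    """Smooth robustness of "always: lo <= x <= hi" over a trajectory.

    traj_x: (batch, T) tensor of a 1-D state over time.
    tau:    temperature; larger values approach the true (hard) min.
    Toy illustration only, not the API of this repository's STL library.
    """
    # Per-step robustness: distance to the nearer box face.
    margins = torch.minimum(traj_x - lo, hi - traj_x)       # (batch, T)
    # Soft minimum over time: -logsumexp(-tau * m) / tau.
    return -torch.logsumexp(-tau * margins, dim=1) / tau     # (batch,)

traj = torch.linspace(0.2, 0.8, 20).repeat(2, 1).requires_grad_(True)
score = always_inside_box(traj, lo=0.0, hi=1.0)
score.sum().backward()            # gradients flow back to the trajectory
print(score, traj.grad.shape)
```

The `tau` temperature trades smoothness against tightness, playing a role analogous to the `smoothing_factor` passed to the STL formula in `envs/base_env.py`.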

## Experimental results
Please refer to [`exp_scripts.sh`](exp_scripts.sh) to reproduce the full set of experiments.
179 changes: 179 additions & 0 deletions envs/base_env.py
@@ -0,0 +1,179 @@
from abc import abstractmethod
from gym import Env
import numpy as np
import torch
from utils import to_np, to_torch
import csv

class BaseEnv(Env):
def __init__(self, args):
super(BaseEnv, self).__init__()
self.args = args
self.pid = None
self.sample_idx = 0
# TODO obs space and action space
self.reward_list = []
self.stl_reward_list = []
self.acc_reward_list = []
self.history = []
if hasattr(args, "write_csv") and args.write_csv:
self.epi = 0
self.csvfile = open('%s/monitor_full.csv'%(args.exp_dir_full), 'w', newline='')
self.csvwriter = csv.writer(self.csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
self.reward_fn = self.generate_reward_batch_fn()
self.reward_fn_torch = self.wrap_reward_fn_torch(self.reward_fn)

@abstractmethod
def next_state(self, x, u):
pass

# @abstractmethod
def dynamics(self, x0, u, include_first=False):
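# Roll out the one-step `next_state` model over a control sequence
# u of shape (batch, T, du), starting from x0; returns the visited states
# stacked along dim=1 (optionally including x0 itself).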
args = self.args
t = u.shape[1]
x = x0.clone()
segs = []
if include_first:
segs.append(x)
for ti in range(t):
new_x = self.next_state(x, u[:, ti])
segs.append(new_x)
x = new_x
return torch.stack(segs, dim=1)

@abstractmethod
def init_x_cycle(self):
pass

@abstractmethod
def init_x(self):
pass

@abstractmethod
def generate_stl(self):
pass

@abstractmethod
def generate_heur_loss(self):
pass

@abstractmethod
def visualize(self):
pass

def transform(self, seg):
# Hook for cases where the state trajectory needs to be augmented
# before reward/STL evaluation (e.g., in the Panda manipulation environment).
return seg

#@abstractmethod
def step(self):
pass

def write_to_csv(self, env_steps):
r_rs = self.get_rewards()
r_rs = np.array(r_rs, dtype=np.float32)
r_avg = np.mean(r_rs[0])
rs_avg = np.mean(r_rs[1])
racc_avg = np.mean(r_rs[2])
self.csvwriter.writerow([self.epi, env_steps, r_avg, rs_avg, racc_avg])
self.csvfile.flush()
print("epi:%06d step:%06d r:%.3f %.3f %.3f"%(self.epi, env_steps, r_avg, rs_avg, racc_avg))
self.epi += 1

#@abstractmethod
# def reset(self):
# pass
def reset(self):
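# Gym-style reset: every `num_samples` resets, draw a fresh batch of initial
# states via init_x and shuffle it; hand out one state per episode. Before
# clearing the history, log the dense / STL / accuracy rewards of the episode
# that just finished (consumed by get_rewards and write_to_csv).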
N = self.args.num_samples
if self.sample_idx % N == 0:
self.x0 = self.init_x(N)
self.indices = torch.randperm(N)
self.state = to_np(self.x0[self.indices[self.sample_idx % N]])
self.sample_idx += 1
self.t = 0
if len(self.history)>self.args.nt:
segs_np = np.stack(self.history, axis=0)
segs = to_torch(segs_np[None, :])
seg_aug = self.transform(segs)
seg_aug_np = to_np(seg_aug)
self.reward_list.append(np.sum(self.generate_reward_batch(seg_aug_np.squeeze())))
self.stl_reward_list.append(self.stl_reward(seg_aug)[0, 0])
self.acc_reward_list.append(self.acc_reward(seg_aug)[0, 0])
self.history = [np.array(self.state)]
return self.state

def get_rewards(self):
if len(self.reward_list)==0:
return 0, 0, 0
else:
return self.reward_list[-1], self.stl_reward_list[-1], self.acc_reward_list[-1]

def generate_reward_batch(self, state): # (n, 7)
return self.reward_fn(None, state)

def wrap_reward_fn_torch(self, reward_fn):
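# Adapt a numpy-based reward function to torch: detach the inputs to numpy,
# evaluate, and return a (batch, 1) float tensor on the same device as `state`.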
def reward_fn_torch(act, state):
act_np = act.detach().cpu().numpy()
state_np = state.detach().cpu().numpy()
reward_np = reward_fn(act_np, state_np)
return torch.from_numpy(reward_np).float()[:, None].to(state.device)
return reward_fn_torch

@abstractmethod
def generate_reward_batch_fn(self):
pass

#@abstractmethod
def generate_reward(self, state):
if self.args.stl_reward or self.args.acc_reward:
last_one = (self.t+1) >= self.args.nt
if last_one:
segs = to_torch(np.stack(self.history, axis=0)[None, :])
segs_aug = self.transform(segs)
if self.args.stl_reward:
return self.stl_reward(segs_aug)[0, 0]
elif self.args.acc_reward:
return self.acc_reward(segs_aug)[0, 0]
else:
raise NotImplementedError
else:
return np.zeros_like(0)
else:
return self.generate_reward_batch(state[None, :])[0]

def stl_reward(self, segs):
score = self.stl(segs, self.args.smoothing_factor)[:, :1]
reward = to_np(score)
return reward

def acc_reward(self, segs):
score = (self.stl(segs, self.args.smoothing_factor, d={"hard":True})[:, :1]>=0).float()
reward = 100 * to_np(score)
return reward

def print_stl(self):
print(self.stl)
self.stl.update_format("word")
print(self.stl)

def my_render(self):
if self.pid==0:
self.render(None)

def test(self):
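# Skeleton evaluation loop: `solve`, `self.num_trials`, `self.nt`, and the
# `test_*` helpers are expected to be provided by subclasses or calling code.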
for trial_i in range(self.num_trials):
obs = self.test_reset()
trajs = [self.test_state()]
for ti in range(self.nt):
u = solve(obs)
obs, reward, done, di = self.test_step(u)
trajs.append(self.test_state())

# save metrics result
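For context on how `next_state` and `dynamics` fit together, here is a small standalone sketch (plain PyTorch, with hypothetical single-integrator dynamics, not importing this repository's code) of the same batched rollout pattern that `BaseEnv.dynamics` implements:

```python
import torch

def next_state(x, u, dt=0.1):
    # Hypothetical single-integrator dynamics: x' = x + dt * u (batched over dim 0).
    return x + dt * u

def rollout(x0, u, include_first=False):
    # Same pattern as BaseEnv.dynamics: apply next_state once per control step
    # in u (batch, T, du) and stack the visited states along dim=1.
    x = x0.clone()
    segs = [x0.clone()] if include_first else []
    for ti in range(u.shape[1]):
        x = next_state(x, u[:, ti])
        segs.append(x)
    return torch.stack(segs, dim=1)

x0 = torch.zeros(4, 2)      # batch of 4 states in R^2
u = torch.randn(4, 15, 2)   # 15-step control sequences
traj = rollout(x0, u, include_first=True)
print(traj.shape)           # torch.Size([4, 16, 2])
```

The shapes mirror what `dynamics` expects: controls of shape `(batch, T, du)` and a stacked trajectory of shape `(batch, T(+1), dx)`.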