forked from robotology-legacy/gym-ignition
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathabc.py
133 lines (97 loc) · 3.69 KB
/
abc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright (C) 2020 Istituto Italiano di Tecnologia (IIT). All rights reserved.
# This software may be modified and distributed under the terms of the
# GNU Lesser General Public License v2.1 or any later version.
import abc
import gym_ignition.base.task
from scenario import core as scenario_core
class TaskRandomizer(abc.ABC):
@abc.abstractmethod
def randomize_task(self, task: gym_ignition.base.task.Task, **kwargs) -> None:
"""
Randomize a :py:class:`~gym_ignition.base.task.Task` instance.
Args:
task: the task to randomize.
Note:
Note that each task has a :py:attr:`~gym_ignition.base.task.Task.world`
property that provides access to the simulated
:py:class:`scenario.bindings.core.World`.
"""
pass
class PhysicsRandomizer(abc.ABC):
"""
Abstract class that provides the machinery for randomizing the physics of a Task.
Args:
randomize_after_rollouts_num: defines after many rollouts physics should be
randomized (i.e. the amount of times :py:meth:`gym.Env.reset` is called).
"""
def __init__(self, randomize_after_rollouts_num: int = 0):
self._rollout_counter = randomize_after_rollouts_num
self.randomize_after_rollouts_num = randomize_after_rollouts_num
@abc.abstractmethod
def randomize_physics(self, task: gym_ignition.base.task.Task, **kwargs) -> None:
"""
Method that insert and configures the physics of a Task's world.
By default this method loads a plugin that uses DART with no randomizations.
Randomizing physics engine parameters or changing physics engine backend could be
done by redefining this method and passing it to
:py:class:`~gym_ignition.runtimes.gazebo_runtime.GazeboRuntime`.
Args:
task: A task containing a world object without physics.
"""
pass
@abc.abstractmethod
def get_engine(self):
"""
Return the physics engine to use for the rollout.
Note:
Supported physics engines:
- :py:const:`scenario.bindings.gazebo.PhysicsEngine_dart`
Return:
The desired physics engine to set in the world.
"""
pass
def increase_rollout_counter(self) -> None:
"""
Increase the rollouts counter.
"""
if self.randomize_after_rollouts_num != 0:
assert self._rollout_counter != 0
self._rollout_counter -= 1
def physics_expired(self) -> bool:
"""
Checks if the physics needs to be randomized.
Return:
True if the physics has expired, false otherwise.
"""
if self.randomize_after_rollouts_num == 0:
return False
if self._rollout_counter == 0:
self._rollout_counter = self.randomize_after_rollouts_num
return True
return False
class ModelRandomizer(abc.ABC):
@abc.abstractmethod
def randomize_model(
self, task: gym_ignition.base.task.Task, **kwargs
) -> scenario_core.Model:
"""
Randomize the model.
Args:
task: The task that operates on the model to randomize.
Return:
The randomized model.
"""
pass
class ModelDescriptionRandomizer(abc.ABC):
@abc.abstractmethod
def randomize_model_description(
self, task: gym_ignition.base.task.Task, **kwargs
) -> str:
"""
Randomize the model description.
Args:
task: The task that operates on the model description to randomize.
Return:
A string with the randomized model description.
"""
pass