From 43a0d68d42c09085945d52bd6dfd84a575fb7b64 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 11 Dec 2023 13:10:50 +1000 Subject: [PATCH] General Clean Up --- armer/armer.py | 2 +- armer/cython/collision_handler.html | 1 - armer/robots/ROSRobot.py | 142 ++++++++++++---------------- 3 files changed, 60 insertions(+), 85 deletions(-) diff --git a/armer/armer.py b/armer/armer.py index bd1b77a..f2ddc95 100644 --- a/armer/armer.py +++ b/armer/armer.py @@ -310,7 +310,7 @@ def global_collision_check(self, robot: ROSRobot): dim=4, ) end = timeit.default_timer() - print(f"[KD Setup] full collision check: {1/(end-start)} hz") + # print(f"[KD Setup] full collision check: {1/(end-start)} hz") # Output collision debugging information (RVIZ) if enabled # NOTE: disabled by default diff --git a/armer/cython/collision_handler.html b/armer/cython/collision_handler.html index d5beb2f..a6f0918 100644 --- a/armer/cython/collision_handler.html +++ b/armer/cython/collision_handler.html @@ -305,7 +305,6 @@ .cython .cs { color: #3D7B7B; font-style: italic } /* Comment.Special */ .cython .gd { color: #A00000 } /* Generic.Deleted */ .cython .ge { font-style: italic } /* Generic.Emph */ -.cython .ges { font-weight: bold; font-style: italic } /* Generic.EmphStrong */ .cython .gr { color: #E40000 } /* Generic.Error */ .cython .gh { color: #000080; font-weight: bold } /* Generic.Heading */ .cython .gi { color: #008400 } /* Generic.Inserted */ diff --git a/armer/robots/ROSRobot.py b/armer/robots/ROSRobot.py index 0a52dad..67a4964 100644 --- a/armer/robots/ROSRobot.py +++ b/armer/robots/ROSRobot.py @@ -307,20 +307,14 @@ def __init__(self, self.custom_configs: List[str] = [] self.__load_config() - # --- TEST GHOST PUBLISHER --- # + # --- Publishes trajectory to run as marker list --- # self.display_traj_publisher: rospy.Publisher = rospy.Publisher( '{}/trajectory_display'.format(self.name.lower()), Marker, queue_size=1 ) - # --- TEST RVIZ MARKER PUBLISHER --- # - # self.display_rviz_marker_publisher: rospy.Publisher = rospy.Publisher( - # '{}/dynamic_marker_display'.format(self.name.lower()), - # MarkerArray, - # queue_size=1 - # ) - + # --- Publishes marker shapes as current target for checking collisions --- # self.collision_debug_publisher: rospy.Publisher = rospy.Publisher( '{}/collision_debug_display'.format(self.name.lower()), MarkerArray, @@ -555,18 +549,25 @@ def __init__(self, self.get_named_pose_configs_cb ) + # ------ Collision Checking Services ----- # rospy.Service( '{}/add_collision_object'.format(self.name.lower()), AddCollisionObject, self.add_collision_obj_cb ) + rospy.Service( + '{}/remove_collision_object'.format(self.name.lower()), + AddCollisionObject, + self.remove_collision_obj_cb + ) + rospy.Service( '{}/get_collision_objects'.format(self.name.lower()), GetCollisionObjects, self.get_collision_obj_cb ) - + rospy.Service( '{}/enable_collision_debug'.format(self.name.lower()), SetBool, @@ -594,7 +595,7 @@ def velocity_cb(self, msg: TwistStamped) -> None: :type msg: TwistStamped """ if self._controller_mode == ControlMode.ERROR: - rospy.logerr(f"[CART VELOCITY CB] -> [{self.name}] in Error Control Mode...") + rospy.logerr_throttle(msg=f"[CART VELOCITY CB] -> [{self.name}] in Error Control Mode...", period=1) return None if self.moving: @@ -613,12 +614,12 @@ def joint_velocity_cb(self, msg: JointVelocity) -> None: :type msg: JointVelocity """ if self._controller_mode == ControlMode.ERROR: - rospy.logerr(f"[JOINT VELOCITY CB] -> [{self.name}] in Error Control Mode...") + rospy.logerr_throttle(msg=f"[JOINT VELOCITY CB] -> [{self.name}] in Error Control Mode...", period=1) return None # Check for vector length and terminate if invalid if len(msg.joints) != len(self.j_v): - rospy.logerr(f"[JOINT VELOCITY CB] -> [{self.name}] provided input vector length invalid [{len(msg.joints)}]. Expecting len [{len(self.j_v)}]") + rospy.logerr_throttle(msg=f"[JOINT VELOCITY CB] -> [{self.name}] provided input vector length invalid [{len(msg.joints)}]. Expecting len [{len(self.j_v)}]", period=1) return None if self.moving: @@ -674,7 +675,7 @@ def servo_cb(self, msg) -> None: to generate velocities at each timestep. """ if self._controller_mode == ControlMode.ERROR: - rospy.logerr(f"SERVO CB: [{self.name}] in Error Control Mode...") + rospy.logerr_throttle(msg=f"SERVO CB: [{self.name}] in Error Control Mode...", period=1) return None # Safely stop any current motion of the arm @@ -1967,23 +1968,41 @@ def add_collision_obj_cb(self, req: AddCollisionObjectRequest) -> AddCollisionOb # NOTE: handle to not add if overwrite requested # NOTE: wait for shape to be added to backend if interactive_marker_handle: - rospy.sleep(rospy.Duration(secs=1)) + rospy.sleep(rospy.Duration(secs=0.5)) self.interactive_marker_creation() return AddCollisionObjectResponse(success=True) - def update_collision_obj_cb(self, req: AddCollisionObjectRequest) -> AddCollisionObjectResponse: + def remove_collision_obj_cb(self, req: AddCollisionObjectRequest) -> AddCollisionObjectResponse: """ - This will take a given key and (if it exists) updates the object's pose with given pose value + This will take a given key and (if it exists) and removes said key object as a collision shape NOTE: currently expects the following - -> TODO: name (string) to access object in question - -> TODO: pose (Pose) an updated pose to provide to the object - - TODO: currently this method is a service (for testing purposes) but could be a topic callback for dynamic updating - TODO: for completeness, should have its own service request and response + -> name (string) to access object in question """ + # Handle early termination on input error for name and type + if req.name == None or req.name == '': + rospy.logerr(f"[REMOVE COLLISION OBJ CB] -> Remove collision object service input error: name [{req.name}] | type [{req.type}]") + return AddCollisionObjectResponse(success=False) - return AddCollisionObjectResponse(success=True) + if req.name in self.collision_dict.keys(): + # Stage Backend to Remove Object + # NOTE: swift backend issue will not visually remove (but does correctly remove) + removed_obj = self.dynamic_collision_dict.pop(req.name) + self.dynamic_collision_removal_dict[req.name] = removed_obj + + # Remove from Collision Dict + self.collision_dict.pop(req.name) + + # Re-characterise overlaps after removal + self.characterise_collision_overlaps() + + # Remove interactive marker + self.interactive_marker_server.erase(name=req.name) + self.interactive_marker_server.applyChanges() + + return AddCollisionObjectResponse(success=True) + else: + rospy.logerr(f"[REMOVE COLLISION OBJ CB] -> Unknown name [{req.name}] requested; not in collision dictionary") def get_collision_obj_cb(self, req: GetCollisionObjectsRequest) -> GetCollisionObjectsResponse: """ @@ -2266,7 +2285,7 @@ def trajectory_collision_checker(self, traj) -> bool: marker_traj.id = 0 marker_traj.type = Marker.LINE_STRIP marker_traj.scale.x = 0.01 - # Default to Green Markers + # Default to Green Markers (Updated if in Collision) marker_traj.color.g = 1.0 marker_traj.color.a = 1.0 @@ -2276,6 +2295,16 @@ def trajectory_collision_checker(self, traj) -> bool: rospy.loginfo(f"[TRAJECTORY COLLISION CHECK] -> Traj len: {len(traj.s)} | with step value: {step_value}") go = True + + # Quick end state check prior to full trajectory check (in steps) + # NOTE: the rest of the trajectory is still parsed to display in RVIZ for debugging + if self.check_collision_per_state(q=traj.s[-1]) == False: + go=False + # Update colour to RED for error + marker_traj.color.g = 0.0 + marker_traj.color.r = 1.0 + + # Goal state is clear, check rest of trajectory in steps with Timer("Full Trajectory Collision Check", enabled=True): for idx in range(0,len(traj.s),step_value): @@ -2292,13 +2321,15 @@ def trajectory_collision_checker(self, traj) -> bool: if self.check_collision_per_state(q=traj.s[idx]) == False: # Terminate on collision check failure - # TODO: add red component to trail to signify collision # NOTE: currently terminates on collision for speed and efficiency # could continue (regardless) to show 'full' trajectory with collision # component highlighted? Note, that speed is a function of # step_thresh (currently yields approx. 30Hz on calculation # for a 500 sample size traj at a step_tresh of 10) go=False + # Update colour to RED for error + marker_traj.color.g = 0.0 + marker_traj.color.r = 1.0 break # Publish marker (regardless of failure case for visual identification) @@ -2342,7 +2373,7 @@ def check_collision_per_state(self, q: list() = []) -> bool(): # print(f"Checking [{link}] -> links in collision: {links_in_collision}") if len(links_in_collision) > 0: - rospy.logerr(f"[COLLISION PER STATE CHECK] -> Collision at state {self.robot_ghost.q} in trajectory between -> [{link}] and {[link_n for link_n in links_in_collision]}") + rospy.logerr(f"[COLLISION PER STATE CHECK] -> Collision in trajectory between -> [{link}] and {[link_n for link_n in links_in_collision]}") go_signal = False break @@ -3282,62 +3313,6 @@ def interactive_marker_creation(self): self.interactive_marker_server.insert(marker=int_marker, feedback_cb=self.int_marker_cb) self.interactive_marker_server.applyChanges() - - def marker_publisher(self): - """ - DEPRECATED -> see interactive_marker_creation method - Gets any existing dynamic objects (if added) and publishes them for debugging/display in RVIZ - Currently handles only: - - Sphere - - Cylinder - - Cuboid - """ - # Iterate through and add to marker publish if ready - marker_array = [] - for obj in list(self.dynamic_collision_dict.values()): - # Ensure the object has been successfully added to the backend first - if obj.is_added: - # Default setup of marker header - marker = Marker() - marker.header.frame_id = self.base_link.name - marker.header.stamp = rospy.Time.now() - - if obj.shape.stype == 'sphere': - marker.type = Marker.SPHERE - - # Expects diameter (m) - marker.scale.x = obj.shape.radius * 2 - marker.scale.y = obj.shape.radius * 2 - marker.scale.z = obj.shape.radius * 2 - elif obj.shape.stype == 'cylinder': - marker.type = Marker.CYLINDER - - # Expects diameter (m) - marker.scale.x = obj.shape.radius * 2 - marker.scale.y = obj.shape.radius * 2 - marker.scale.z = obj.shape.length - elif obj.shape.stype == 'cuboid': - marker.type = Marker.CUBE - - # Expects diameter (m) - marker.scale.x = obj.shape.scale[0] - marker.scale.y = obj.shape.scale[1] - marker.scale.z = obj.shape.scale[2] - else: - break - - marker.id = obj.id - marker.pose = obj.pose - marker.color.r = obj.shape.color[0] - marker.color.g = obj.shape.color[1] - marker.color.b = obj.shape.color[2] - marker.color.a = obj.shape.color[3] - - marker_array.append(marker) - - # Publish array of markers - self.display_rviz_marker_publisher.publish(marker_array) - def step(self, dt: float = 0.01) -> None: # pylint: disable=unused-argument """ Updates the robot joints (robot.q) used in computing kinematics @@ -3403,8 +3378,9 @@ def step(self, dt: float = 0.01) -> None: # pylint: disable=unused-argument rospy.logwarn('No valid transform found between %s and %s', self.base_link.name, self.e_v_frame) self.preempt() - # apply desired joint velocity to robot - # NOTE: this should be allowed to run even in ControlMode.ERROR to recover (i.e., Home action) + # Conduct a Trajectory Motion + # NOTE: the executor step is run internally for armer (invariant of ros_control mechanism) + # NOTE: if actually on a real robot, the hw_controlled flag (in the ROS backend) handles the joint trajectory controller execution if self.executor and not self.hw_controlled: self.j_v = self.executor.step(dt)