Package intera_interface :: Module limb
[hide private]
[frames] | no frames]

Source Code for Module intera_interface.limb

  1  # Copyright (c) 2013-2016, Rethink Robotics 
  2  # All rights reserved. 
  3  # 
  4  # Redistribution and use in source and binary forms, with or without 
  5  # modification, are permitted provided that the following conditions are met: 
  6  # 
  7  # 1. Redistributions of source code must retain the above copyright notice, 
  8  #    this list of conditions and the following disclaimer. 
  9  # 2. Redistributions in binary form must reproduce the above copyright 
 10  #    notice, this list of conditions and the following disclaimer in the 
 11  #    documentation and/or other materials provided with the distribution. 
 12  # 3. Neither the name of the Rethink Robotics nor the names of its 
 13  #    contributors may be used to endorse or promote products derived from 
 14  #    this software without specific prior written permission. 
 15  # 
 16  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 17  # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 18  # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 19  # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 20  # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 21  # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 22  # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 23  # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 24  # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 25  # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 26  # POSSIBILITY OF SUCH DAMAGE. 
 27   
 28  import collections 
 29  import warnings 
 30   
 31  from copy import deepcopy 
 32   
 33  import rospy 
 34   
 35  from sensor_msgs.msg import ( 
 36      JointState 
 37  ) 
 38  from std_msgs.msg import ( 
 39      Float64, 
 40  ) 
 41   
 42  import intera_dataflow 
 43   
 44  from intera_core_msgs.msg import ( 
 45      JointCommand, 
 46      EndpointState, 
 47      CollisionDetectionState, 
 48  ) 
 49  import settings 
 50  from robot_params import RobotParams 
 51   
 52   
53 -class Limb(object):
54 """ 55 Interface class for a limb on Intera robots. 56 """ 57 58 # Containers 59 Point = collections.namedtuple('Point', ['x', 'y', 'z']) 60 Quaternion = collections.namedtuple('Quaternion', ['x', 'y', 'z', 'w']) 61
62 - def __init__(self, limb="right", synchronous_pub=False):
63 """ 64 Constructor. 65 66 @type limb: str 67 @param limb: limb to interface 68 69 @type synchronous_pub: bool 70 @param synchronous_pub: designates the JointCommand Publisher 71 as Synchronous if True and Asynchronous if False. 72 73 Synchronous Publishing means that all joint_commands publishing to 74 the robot's joints will block until the message has been serialized 75 into a buffer and that buffer has been written to the transport 76 of every current Subscriber. This yields predicable and consistent 77 timing of messages being delivered from this Publisher. However, 78 when using this mode, it is possible for a blocking Subscriber to 79 prevent the joint_command functions from exiting. Unless you need exact 80 JointCommand timing, default to Asynchronous Publishing (False). 81 82 For more information about Synchronous Publishing see: 83 http://wiki.ros.org/rospy/Overview/Publishers%20and%20Subscribers#queue_size:_publish.28.29_behavior_and_queuing 84 """ 85 params = RobotParams() 86 limb_names = params.get_limb_names() 87 if limb not in limb_names: 88 rospy.logerr("Cannot detect limb {0} on this robot." 89 " Valid limbs are {1}. Exiting Limb.init().".format( 90 limb, limb_names)) 91 return 92 joint_names = params.get_joint_names(limb) 93 if not joint_names: 94 rospy.logerr("Cannot detect joint names for limb {0} on this " 95 "robot. Exiting Limb.init().".format(limb)) 96 return 97 self.name = limb 98 self._joint_angle = dict() 99 self._joint_velocity = dict() 100 self._joint_effort = dict() 101 self._cartesian_pose = dict() 102 self._cartesian_velocity = dict() 103 self._cartesian_effort = dict() 104 self._joint_names = { limb: joint_names } 105 self._collision_state = False 106 107 ns = '/robot/limb/' + limb + '/' 108 109 self._command_msg = JointCommand() 110 111 self._pub_speed_ratio = rospy.Publisher( 112 ns + 'set_speed_ratio', 113 Float64, 114 latch=True, 115 queue_size=10) 116 117 queue_size = None if synchronous_pub else 1 118 with warnings.catch_warnings(): 119 warnings.simplefilter("ignore") 120 self._pub_joint_cmd = rospy.Publisher( 121 ns + 'joint_command', 122 JointCommand, 123 tcp_nodelay=True, 124 queue_size=queue_size) 125 126 self._pub_joint_cmd_timeout = rospy.Publisher( 127 ns + 'joint_command_timeout', 128 Float64, 129 latch=True, 130 queue_size=10) 131 132 _cartesian_state_sub = rospy.Subscriber( 133 ns + 'endpoint_state', 134 EndpointState, 135 self._on_endpoint_states, 136 queue_size=1, 137 tcp_nodelay=True) 138 139 _collision_state_sub = rospy.Subscriber( 140 ns + 'collision_detection_state', 141 CollisionDetectionState, 142 self._on_collision_state, 143 queue_size=1, 144 tcp_nodelay=True) 145 146 joint_state_topic = 'robot/joint_states' 147 _joint_state_sub = rospy.Subscriber( 148 joint_state_topic, 149 JointState, 150 self._on_joint_states, 151 queue_size=1, 152 tcp_nodelay=True) 153 154 err_msg = ("%s limb init failed to get current joint_states " 155 "from %s") % (self.name.capitalize(), joint_state_topic) 156 intera_dataflow.wait_for(lambda: len(self._joint_angle.keys()) > 0, 157 timeout_msg=err_msg, timeout=5.0) 158 err_msg = ("%s limb init failed to get current endpoint_state " 159 "from %s") % (self.name.capitalize(), ns + 'endpoint_state') 160 intera_dataflow.wait_for(lambda: len(self._cartesian_pose.keys()) > 0, 161 timeout_msg=err_msg)
162
163 - def _on_joint_states(self, msg):
164 for idx, name in enumerate(msg.name): 165 if name in self._joint_names[self.name]: 166 self._joint_angle[name] = msg.position[idx] 167 self._joint_velocity[name] = msg.velocity[idx] 168 self._joint_effort[name] = msg.effort[idx]
169
170 - def _on_endpoint_states(self, msg):
171 # Comments in this private method are for documentation purposes. 172 # _pose = {'position': (x, y, z), 'orientation': (x, y, z, w)} 173 self._cartesian_pose = { 174 'position': self.Point( 175 msg.pose.position.x, 176 msg.pose.position.y, 177 msg.pose.position.z, 178 ), 179 'orientation': self.Quaternion( 180 msg.pose.orientation.x, 181 msg.pose.orientation.y, 182 msg.pose.orientation.z, 183 msg.pose.orientation.w, 184 ), 185 } 186 # _twist = {'linear': (x, y, z), 'angular': (x, y, z)} 187 self._cartesian_velocity = { 188 'linear': self.Point( 189 msg.twist.linear.x, 190 msg.twist.linear.y, 191 msg.twist.linear.z, 192 ), 193 'angular': self.Point( 194 msg.twist.angular.x, 195 msg.twist.angular.y, 196 msg.twist.angular.z, 197 ), 198 } 199 # _wrench = {'force': (x, y, z), 'torque': (x, y, z)} 200 self._cartesian_effort = { 201 'force': self.Point( 202 msg.wrench.force.x, 203 msg.wrench.force.y, 204 msg.wrench.force.z, 205 ), 206 'torque': self.Point( 207 msg.wrench.torque.x, 208 msg.wrench.torque.y, 209 msg.wrench.torque.z, 210 ), 211 }
212
213 - def _on_collision_state(self, msg):
214 if self._collision_state != msg.collision_state: 215 self._collision_state = msg.collision_state
216
217 - def has_collided(self):
218 """ 219 Return True if the specified limb has experienced a collision. 220 221 @rtype: bool 222 @return: True if the arm is in collision, False otherwise. 223 """ 224 return self._collision_state
225
226 - def joint_names(self):
227 """ 228 Return the names of the joints for the specified limb. 229 230 @rtype: [str] 231 @return: ordered list of joint names from proximal to distal 232 (i.e. shoulder to wrist). 233 """ 234 return self._joint_names[self.name]
235
236 - def joint_angle(self, joint):
237 """ 238 Return the requested joint angle. 239 240 @type joint: str 241 @param joint: name of a joint 242 @rtype: float 243 @return: angle in radians of individual joint 244 """ 245 return self._joint_angle[joint]
246
247 - def joint_angles(self):
248 """ 249 Return all joint angles. 250 251 @rtype: dict({str:float}) 252 @return: unordered dict of joint name Keys to angle (rad) Values 253 """ 254 return deepcopy(self._joint_angle)
255
256 - def joint_velocity(self, joint):
257 """ 258 Return the requested joint velocity. 259 260 @type joint: str 261 @param joint: name of a joint 262 @rtype: float 263 @return: velocity in radians/s of individual joint 264 """ 265 return self._joint_velocity[joint]
266
267 - def joint_velocities(self):
268 """ 269 Return all joint velocities. 270 271 @rtype: dict({str:float}) 272 @return: unordered dict of joint name Keys to velocity (rad/s) Values 273 """ 274 return deepcopy(self._joint_velocity)
275
276 - def joint_effort(self, joint):
277 """ 278 Return the requested joint effort. 279 280 @type joint: str 281 @param joint: name of a joint 282 @rtype: float 283 @return: effort in Nm of individual joint 284 """ 285 return self._joint_effort[joint]
286
287 - def joint_efforts(self):
288 """ 289 Return all joint efforts. 290 291 @rtype: dict({str:float}) 292 @return: unordered dict of joint name Keys to effort (Nm) Values 293 """ 294 return deepcopy(self._joint_effort)
295
296 - def endpoint_pose(self):
297 """ 298 Return Cartesian endpoint pose {position, orientation}. 299 300 @rtype: dict({str:L{Limb.Point},str:L{Limb.Quaternion}}) 301 @return: position and orientation as named tuples in a dict 302 303 C{pose = {'position': (x, y, z), 'orientation': (x, y, z, w)}} 304 305 - 'position': Cartesian coordinates x,y,z in 306 namedtuple L{Limb.Point} 307 - 'orientation': quaternion x,y,z,w in named tuple 308 L{Limb.Quaternion} 309 """ 310 return deepcopy(self._cartesian_pose)
311
312 - def endpoint_velocity(self):
313 """ 314 Return Cartesian endpoint twist {linear, angular}. 315 316 @rtype: dict({str:L{Limb.Point},str:L{Limb.Point}}) 317 @return: linear and angular velocities as named tuples in a dict 318 319 C{twist = {'linear': (x, y, z), 'angular': (x, y, z)}} 320 321 - 'linear': Cartesian velocity in x,y,z directions in 322 namedtuple L{Limb.Point} 323 - 'angular': Angular velocity around x,y,z axes in named tuple 324 L{Limb.Point} 325 """ 326 return deepcopy(self._cartesian_velocity)
327
328 - def endpoint_effort(self):
329 """ 330 Return Cartesian endpoint wrench {force, torque}. 331 332 @rtype: dict({str:L{Limb.Point},str:L{Limb.Point}}) 333 @return: force and torque at endpoint as named tuples in a dict 334 335 C{wrench = {'force': (x, y, z), 'torque': (x, y, z)}} 336 337 - 'force': Cartesian force on x,y,z axes in 338 namedtuple L{Limb.Point} 339 - 'torque': Torque around x,y,z axes in named tuple 340 L{Limb.Point} 341 """ 342 return deepcopy(self._cartesian_effort)
343
344 - def set_command_timeout(self, timeout):
345 """ 346 Set the timeout in seconds for the joint controller 347 348 @type timeout: float 349 @param timeout: timeout in seconds 350 """ 351 self._pub_joint_cmd_timeout.publish(Float64(timeout))
352
353 - def exit_control_mode(self, timeout=0.2):
354 """ 355 Clean exit from advanced control modes (joint torque or velocity). 356 357 Resets control to joint position mode with current positions. 358 359 @type timeout: float 360 @param timeout: control timeout in seconds [0.2] 361 """ 362 self.set_command_timeout(timeout) 363 self.set_joint_positions(self.joint_angles())
364
365 - def set_joint_position_speed(self, speed=0.3):
366 """ 367 Set ratio of max joint speed to use during joint position moves. 368 369 Set the proportion of maximum controllable velocity to use 370 during joint position control execution. The default ratio 371 is `0.3`, and can be set anywhere from [0.0-1.0] (clipped). 372 Once set, a speed ratio will persist until a new execution 373 speed is set. 374 375 @type speed: float 376 @param speed: ratio of maximum joint speed for execution 377 default= 0.3; range= [0.0-1.0] 378 """ 379 self._pub_speed_ratio.publish(Float64(speed))
380
381 - def set_joint_trajectory(self, names, positions, velocities, accelerations):
382 """ 383 Commands the joints of this limb to the specified positions using 384 the commanded velocities and accelerations to extrapolate between 385 commanded positions (prior to the next position being received). 386 387 B{IMPORTANT:} Joint Trajectory control mode allows for commanding 388 joint positions, without modification, directly to the JCBs 389 (Joint Controller Boards). While this results in more unaffected 390 motions, Joint Trajectory control mode bypasses the safety system 391 modifications (e.g. collision avoidance). 392 Please use with caution. 393 394 @type names: list [str] 395 @param names: joint_names list of strings 396 @type positions: list [float] 397 @param positions: list of positions in radians 398 @type velocities: list [float] 399 @param velocities: list of velocities in radians/second 400 @type accelerations: list [float] 401 @param accelerations: list of accelerations in radians/seconds^2 402 """ 403 self._command_msg.names = names 404 self._command_msg.position = positions 405 self._command_msg.velocity = velocities 406 self._command_msg.acceleration = accelerations 407 self._command_msg.mode = JointCommand.TRAJECTORY_MODE 408 self._command_msg.header.stamp = rospy.Time.now() 409 self._pub_joint_cmd.publish(self._command_msg)
410
411 - def set_joint_positions(self, positions):
412 """ 413 Commands the joints of this limb to the specified positions. 414 415 B{IMPORTANT:} 'raw' joint position control mode allows for commanding 416 joint positions, without modification, directly to the JCBs 417 (Joint Controller Boards). While this results in more unaffected 418 motions, 'raw' joint position control mode bypasses the safety system 419 modifications (e.g. collision avoidance). 420 Please use with caution. 421 422 @type positions: dict({str:float}) 423 @param positions: joint_name:angle command 424 @type raw: bool 425 @param raw: advanced, direct position control mode 426 """ 427 self._command_msg.names = positions.keys() 428 self._command_msg.position = positions.values() 429 self._command_msg.mode = JointCommand.POSITION_MODE 430 self._command_msg.header.stamp = rospy.Time.now() 431 self._pub_joint_cmd.publish(self._command_msg)
432
433 - def set_joint_velocities(self, velocities):
434 """ 435 Commands the joints of this limb to the specified velocities. 436 437 B{IMPORTANT}: set_joint_velocities must be commanded at a rate great 438 than the timeout specified by set_command_timeout. If the timeout is 439 exceeded before a new set_joint_velocities command is received, the 440 controller will switch modes back to position mode for safety purposes. 441 442 @type velocities: dict({str:float}) 443 @param velocities: joint_name:velocity command 444 """ 445 self._command_msg.names = velocities.keys() 446 self._command_msg.velocity = velocities.values() 447 self._command_msg.mode = JointCommand.VELOCITY_MODE 448 self._command_msg.header.stamp = rospy.Time.now() 449 self._pub_joint_cmd.publish(self._command_msg)
450
451 - def set_joint_torques(self, torques):
452 """ 453 Commands the joints of this limb to the specified torques. 454 455 B{IMPORTANT}: set_joint_torques must be commanded at a rate great than 456 the timeout specified by set_command_timeout. If the timeout is 457 exceeded before a new set_joint_torques command is received, the 458 controller will switch modes back to position mode for safety purposes. 459 460 @type torques: dict({str:float}) 461 @param torques: joint_name:torque command 462 """ 463 self._command_msg.names = torques.keys() 464 self._command_msg.effort = torques.values() 465 self._command_msg.mode = JointCommand.TORQUE_MODE 466 self._command_msg.header.stamp = rospy.Time.now() 467 self._pub_joint_cmd.publish(self._command_msg)
468
469 - def move_to_neutral(self, timeout=15.0, speed=0.3):
470 """ 471 Command the Limb joints to a predefined set of "neutral" joint angles. 472 From rosparam named_poses/<limb>/poses/neutral. 473 474 @type timeout: float 475 @param timeout: seconds to wait for move to finish [15] 476 @type speed: float 477 @param speed: ratio of maximum joint speed for execution 478 default= 0.3; range= [0.0-1.0] 479 """ 480 try: 481 neutral_pose = rospy.get_param("named_poses/{0}/poses/neutral".format(self.name)) 482 except KeyError: 483 rospy.logerr(("Get neutral pose failed, arm: \"{0}\".").format(self.name)) 484 return 485 angles = dict(zip(self.joint_names(), neutral_pose)) 486 self.set_joint_position_speed(speed) 487 return self.move_to_joint_positions(angles, timeout)
488
489 - def move_to_joint_positions(self, positions, timeout=15.0, 490 threshold=settings.JOINT_ANGLE_TOLERANCE, 491 test=None):
492 """ 493 (Blocking) Commands the limb to the provided positions. 494 495 Waits until the reported joint state matches that specified. 496 497 This function uses a low-pass filter to smooth the movement. 498 499 @type positions: dict({str:float}) 500 @param positions: joint_name:angle command 501 @type timeout: float 502 @param timeout: seconds to wait for move to finish [15] 503 @type threshold: float 504 @param threshold: position threshold in radians across each joint when 505 move is considered successful [0.008726646] 506 @param test: optional function returning True if motion must be aborted 507 """ 508 cmd = self.joint_angles() 509 510 def genf(joint, angle): 511 def joint_diff(): 512 return abs(angle - self._joint_angle[joint])
513 return joint_diff
514 515 diffs = [genf(j, a) for j, a in positions.items() if 516 j in self._joint_angle] 517 fail_msg = "{0} limb failed to reach commanded joint positions.".format( 518 self.name.capitalize()) 519 def test_collision(): 520 if self.has_collided(): 521 rospy.logerr(' '.join(["Collision detected.", fail_msg])) 522 return True 523 return False 524 self.set_joint_positions(positions) 525 intera_dataflow.wait_for( 526 test=lambda: test_collision() or \ 527 (callable(test) and test() == True) or \ 528 (all(diff() < threshold for diff in diffs)), 529 timeout=timeout, 530 timeout_msg=fail_msg, 531 rate=100, 532 raise_on_error=False, 533 body=lambda: self.set_joint_positions(positions) 534 ) 535