Package intera_interface :: Module limb
[hide private]
[frames] | no frames]

Source Code for Module intera_interface.limb

  1  # Copyright (c) 2013-2017, Rethink Robotics Inc. 
  2  # 
  3  # Licensed under the Apache License, Version 2.0 (the "License"); 
  4  # you may not use this file except in compliance with the License. 
  5  # You may obtain a copy of the License at 
  6  # 
  7  #     http://www.apache.org/licenses/LICENSE-2.0 
  8  # 
  9  # Unless required by applicable law or agreed to in writing, software 
 10  # distributed under the License is distributed on an "AS IS" BASIS, 
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 12  # See the License for the specific language governing permissions and 
 13  # limitations under the License. 
 14   
 15  import collections 
 16  import warnings 
 17   
 18  from copy import deepcopy 
 19   
 20  import rospy 
 21   
 22  from sensor_msgs.msg import ( 
 23      JointState 
 24  ) 
 25  from std_msgs.msg import ( 
 26      Float64, 
 27  ) 
 28   
 29  import intera_dataflow 
 30   
 31  from intera_core_msgs.msg import ( 
 32      JointCommand, 
 33      EndpointState, 
 34      CollisionDetectionState, 
 35  ) 
 36  import settings 
 37  from robot_params import RobotParams 
 38   
 39   
40 -class Limb(object):
41 """ 42 Interface class for a limb on Intera robots. 43 """ 44 45 # Containers 46 Point = collections.namedtuple('Point', ['x', 'y', 'z']) 47 Quaternion = collections.namedtuple('Quaternion', ['x', 'y', 'z', 'w']) 48
49 - def __init__(self, limb="right", synchronous_pub=False):
50 """ 51 Constructor. 52 53 @type limb: str 54 @param limb: limb to interface 55 56 @type synchronous_pub: bool 57 @param synchronous_pub: designates the JointCommand Publisher 58 as Synchronous if True and Asynchronous if False. 59 60 Synchronous Publishing means that all joint_commands publishing to 61 the robot's joints will block until the message has been serialized 62 into a buffer and that buffer has been written to the transport 63 of every current Subscriber. This yields predicable and consistent 64 timing of messages being delivered from this Publisher. However, 65 when using this mode, it is possible for a blocking Subscriber to 66 prevent the joint_command functions from exiting. Unless you need exact 67 JointCommand timing, default to Asynchronous Publishing (False). 68 69 For more information about Synchronous Publishing see: 70 http://wiki.ros.org/rospy/Overview/Publishers%20and%20Subscribers#queue_size:_publish.28.29_behavior_and_queuing 71 """ 72 params = RobotParams() 73 limb_names = params.get_limb_names() 74 if limb not in limb_names: 75 rospy.logerr("Cannot detect limb {0} on this robot." 76 " Valid limbs are {1}. Exiting Limb.init().".format( 77 limb, limb_names)) 78 return 79 joint_names = params.get_joint_names(limb) 80 if not joint_names: 81 rospy.logerr("Cannot detect joint names for limb {0} on this " 82 "robot. Exiting Limb.init().".format(limb)) 83 return 84 self.name = limb 85 self._joint_angle = dict() 86 self._joint_velocity = dict() 87 self._joint_effort = dict() 88 self._cartesian_pose = dict() 89 self._cartesian_velocity = dict() 90 self._cartesian_effort = dict() 91 self._joint_names = { limb: joint_names } 92 self._collision_state = False 93 94 ns = '/robot/limb/' + limb + '/' 95 96 self._command_msg = JointCommand() 97 98 self._pub_speed_ratio = rospy.Publisher( 99 ns + 'set_speed_ratio', 100 Float64, 101 latch=True, 102 queue_size=10) 103 104 queue_size = None if synchronous_pub else 1 105 with warnings.catch_warnings(): 106 warnings.simplefilter("ignore") 107 self._pub_joint_cmd = rospy.Publisher( 108 ns + 'joint_command', 109 JointCommand, 110 tcp_nodelay=True, 111 queue_size=queue_size) 112 113 self._pub_joint_cmd_timeout = rospy.Publisher( 114 ns + 'joint_command_timeout', 115 Float64, 116 latch=True, 117 queue_size=10) 118 119 _cartesian_state_sub = rospy.Subscriber( 120 ns + 'endpoint_state', 121 EndpointState, 122 self._on_endpoint_states, 123 queue_size=1, 124 tcp_nodelay=True) 125 126 _collision_state_sub = rospy.Subscriber( 127 ns + 'collision_detection_state', 128 CollisionDetectionState, 129 self._on_collision_state, 130 queue_size=1, 131 tcp_nodelay=True) 132 133 joint_state_topic = 'robot/joint_states' 134 _joint_state_sub = rospy.Subscriber( 135 joint_state_topic, 136 JointState, 137 self._on_joint_states, 138 queue_size=1, 139 tcp_nodelay=True) 140 141 err_msg = ("%s limb init failed to get current joint_states " 142 "from %s") % (self.name.capitalize(), joint_state_topic) 143 intera_dataflow.wait_for(lambda: len(self._joint_angle.keys()) > 0, 144 timeout_msg=err_msg, timeout=5.0) 145 err_msg = ("%s limb init failed to get current endpoint_state " 146 "from %s") % (self.name.capitalize(), ns + 'endpoint_state') 147 intera_dataflow.wait_for(lambda: len(self._cartesian_pose.keys()) > 0, 148 timeout_msg=err_msg)
149
150 - def _on_joint_states(self, msg):
151 for idx, name in enumerate(msg.name): 152 if name in self._joint_names[self.name]: 153 self._joint_angle[name] = msg.position[idx] 154 self._joint_velocity[name] = msg.velocity[idx] 155 self._joint_effort[name] = msg.effort[idx]
156
157 - def _on_endpoint_states(self, msg):
158 # Comments in this private method are for documentation purposes. 159 # _pose = {'position': (x, y, z), 'orientation': (x, y, z, w)} 160 self._cartesian_pose = { 161 'position': self.Point( 162 msg.pose.position.x, 163 msg.pose.position.y, 164 msg.pose.position.z, 165 ), 166 'orientation': self.Quaternion( 167 msg.pose.orientation.x, 168 msg.pose.orientation.y, 169 msg.pose.orientation.z, 170 msg.pose.orientation.w, 171 ), 172 } 173 # _twist = {'linear': (x, y, z), 'angular': (x, y, z)} 174 self._cartesian_velocity = { 175 'linear': self.Point( 176 msg.twist.linear.x, 177 msg.twist.linear.y, 178 msg.twist.linear.z, 179 ), 180 'angular': self.Point( 181 msg.twist.angular.x, 182 msg.twist.angular.y, 183 msg.twist.angular.z, 184 ), 185 } 186 # _wrench = {'force': (x, y, z), 'torque': (x, y, z)} 187 self._cartesian_effort = { 188 'force': self.Point( 189 msg.wrench.force.x, 190 msg.wrench.force.y, 191 msg.wrench.force.z, 192 ), 193 'torque': self.Point( 194 msg.wrench.torque.x, 195 msg.wrench.torque.y, 196 msg.wrench.torque.z, 197 ), 198 }
199
200 - def _on_collision_state(self, msg):
201 if self._collision_state != msg.collision_state: 202 self._collision_state = msg.collision_state
203
204 - def has_collided(self):
205 """ 206 Return True if the specified limb has experienced a collision. 207 208 @rtype: bool 209 @return: True if the arm is in collision, False otherwise. 210 """ 211 return self._collision_state
212
213 - def joint_names(self):
214 """ 215 Return the names of the joints for the specified limb. 216 217 @rtype: [str] 218 @return: ordered list of joint names from proximal to distal 219 (i.e. shoulder to wrist). 220 """ 221 return self._joint_names[self.name]
222
223 - def joint_angle(self, joint):
224 """ 225 Return the requested joint angle. 226 227 @type joint: str 228 @param joint: name of a joint 229 @rtype: float 230 @return: angle in radians of individual joint 231 """ 232 return self._joint_angle[joint]
233
234 - def joint_angles(self):
235 """ 236 Return all joint angles. 237 238 @rtype: dict({str:float}) 239 @return: unordered dict of joint name Keys to angle (rad) Values 240 """ 241 return deepcopy(self._joint_angle)
242
243 - def joint_velocity(self, joint):
244 """ 245 Return the requested joint velocity. 246 247 @type joint: str 248 @param joint: name of a joint 249 @rtype: float 250 @return: velocity in radians/s of individual joint 251 """ 252 return self._joint_velocity[joint]
253
254 - def joint_velocities(self):
255 """ 256 Return all joint velocities. 257 258 @rtype: dict({str:float}) 259 @return: unordered dict of joint name Keys to velocity (rad/s) Values 260 """ 261 return deepcopy(self._joint_velocity)
262
263 - def joint_effort(self, joint):
264 """ 265 Return the requested joint effort. 266 267 @type joint: str 268 @param joint: name of a joint 269 @rtype: float 270 @return: effort in Nm of individual joint 271 """ 272 return self._joint_effort[joint]
273
274 - def joint_efforts(self):
275 """ 276 Return all joint efforts. 277 278 @rtype: dict({str:float}) 279 @return: unordered dict of joint name Keys to effort (Nm) Values 280 """ 281 return deepcopy(self._joint_effort)
282
283 - def endpoint_pose(self):
284 """ 285 Return Cartesian endpoint pose {position, orientation}. 286 287 @rtype: dict({str:L{Limb.Point},str:L{Limb.Quaternion}}) 288 @return: position and orientation as named tuples in a dict 289 290 C{pose = {'position': (x, y, z), 'orientation': (x, y, z, w)}} 291 292 - 'position': Cartesian coordinates x,y,z in 293 namedtuple L{Limb.Point} 294 - 'orientation': quaternion x,y,z,w in named tuple 295 L{Limb.Quaternion} 296 """ 297 return deepcopy(self._cartesian_pose)
298
299 - def endpoint_velocity(self):
300 """ 301 Return Cartesian endpoint twist {linear, angular}. 302 303 @rtype: dict({str:L{Limb.Point},str:L{Limb.Point}}) 304 @return: linear and angular velocities as named tuples in a dict 305 306 C{twist = {'linear': (x, y, z), 'angular': (x, y, z)}} 307 308 - 'linear': Cartesian velocity in x,y,z directions in 309 namedtuple L{Limb.Point} 310 - 'angular': Angular velocity around x,y,z axes in named tuple 311 L{Limb.Point} 312 """ 313 return deepcopy(self._cartesian_velocity)
314
315 - def endpoint_effort(self):
316 """ 317 Return Cartesian endpoint wrench {force, torque}. 318 319 @rtype: dict({str:L{Limb.Point},str:L{Limb.Point}}) 320 @return: force and torque at endpoint as named tuples in a dict 321 322 C{wrench = {'force': (x, y, z), 'torque': (x, y, z)}} 323 324 - 'force': Cartesian force on x,y,z axes in 325 namedtuple L{Limb.Point} 326 - 'torque': Torque around x,y,z axes in named tuple 327 L{Limb.Point} 328 """ 329 return deepcopy(self._cartesian_effort)
330
331 - def set_command_timeout(self, timeout):
332 """ 333 Set the timeout in seconds for the joint controller 334 335 @type timeout: float 336 @param timeout: timeout in seconds 337 """ 338 self._pub_joint_cmd_timeout.publish(Float64(timeout))
339
340 - def exit_control_mode(self, timeout=0.2):
341 """ 342 Clean exit from advanced control modes (joint torque or velocity). 343 344 Resets control to joint position mode with current positions. 345 346 @type timeout: float 347 @param timeout: control timeout in seconds [0.2] 348 """ 349 self.set_command_timeout(timeout) 350 self.set_joint_positions(self.joint_angles())
351
352 - def set_joint_position_speed(self, speed=0.3):
353 """ 354 Set ratio of max joint speed to use during joint position moves. 355 356 Set the proportion of maximum controllable velocity to use 357 during joint position control execution. The default ratio 358 is `0.3`, and can be set anywhere from [0.0-1.0] (clipped). 359 Once set, a speed ratio will persist until a new execution 360 speed is set. 361 362 @type speed: float 363 @param speed: ratio of maximum joint speed for execution 364 default= 0.3; range= [0.0-1.0] 365 """ 366 self._pub_speed_ratio.publish(Float64(speed))
367
368 - def set_joint_trajectory(self, names, positions, velocities, accelerations):
369 """ 370 Commands the joints of this limb to the specified positions using 371 the commanded velocities and accelerations to extrapolate between 372 commanded positions (prior to the next position being received). 373 374 B{IMPORTANT:} Joint Trajectory control mode allows for commanding 375 joint positions, without modification, directly to the JCBs 376 (Joint Controller Boards). While this results in more unaffected 377 motions, Joint Trajectory control mode bypasses the safety system 378 modifications (e.g. collision avoidance). 379 Please use with caution. 380 381 @type names: list [str] 382 @param names: joint_names list of strings 383 @type positions: list [float] 384 @param positions: list of positions in radians 385 @type velocities: list [float] 386 @param velocities: list of velocities in radians/second 387 @type accelerations: list [float] 388 @param accelerations: list of accelerations in radians/seconds^2 389 """ 390 self._command_msg.names = names 391 self._command_msg.position = positions 392 self._command_msg.velocity = velocities 393 self._command_msg.acceleration = accelerations 394 self._command_msg.mode = JointCommand.TRAJECTORY_MODE 395 self._command_msg.header.stamp = rospy.Time.now() 396 self._pub_joint_cmd.publish(self._command_msg)
397
398 - def set_joint_positions(self, positions):
399 """ 400 Commands the joints of this limb to the specified positions. 401 402 B{IMPORTANT:} 'raw' joint position control mode allows for commanding 403 joint positions, without modification, directly to the JCBs 404 (Joint Controller Boards). While this results in more unaffected 405 motions, 'raw' joint position control mode bypasses the safety system 406 modifications (e.g. collision avoidance). 407 Please use with caution. 408 409 @type positions: dict({str:float}) 410 @param positions: joint_name:angle command 411 @type raw: bool 412 @param raw: advanced, direct position control mode 413 """ 414 self._command_msg.names = positions.keys() 415 self._command_msg.position = positions.values() 416 self._command_msg.mode = JointCommand.POSITION_MODE 417 self._command_msg.header.stamp = rospy.Time.now() 418 self._pub_joint_cmd.publish(self._command_msg)
419
420 - def set_joint_velocities(self, velocities):
421 """ 422 Commands the joints of this limb to the specified velocities. 423 424 B{IMPORTANT}: set_joint_velocities must be commanded at a rate great 425 than the timeout specified by set_command_timeout. If the timeout is 426 exceeded before a new set_joint_velocities command is received, the 427 controller will switch modes back to position mode for safety purposes. 428 429 @type velocities: dict({str:float}) 430 @param velocities: joint_name:velocity command 431 """ 432 self._command_msg.names = velocities.keys() 433 self._command_msg.velocity = velocities.values() 434 self._command_msg.mode = JointCommand.VELOCITY_MODE 435 self._command_msg.header.stamp = rospy.Time.now() 436 self._pub_joint_cmd.publish(self._command_msg)
437
438 - def set_joint_torques(self, torques):
439 """ 440 Commands the joints of this limb to the specified torques. 441 442 B{IMPORTANT}: set_joint_torques must be commanded at a rate great than 443 the timeout specified by set_command_timeout. If the timeout is 444 exceeded before a new set_joint_torques command is received, the 445 controller will switch modes back to position mode for safety purposes. 446 447 @type torques: dict({str:float}) 448 @param torques: joint_name:torque command 449 """ 450 self._command_msg.names = torques.keys() 451 self._command_msg.effort = torques.values() 452 self._command_msg.mode = JointCommand.TORQUE_MODE 453 self._command_msg.header.stamp = rospy.Time.now() 454 self._pub_joint_cmd.publish(self._command_msg)
455
456 - def move_to_neutral(self, timeout=15.0, speed=0.3):
457 """ 458 Command the Limb joints to a predefined set of "neutral" joint angles. 459 From rosparam named_poses/<limb>/poses/neutral. 460 461 @type timeout: float 462 @param timeout: seconds to wait for move to finish [15] 463 @type speed: float 464 @param speed: ratio of maximum joint speed for execution 465 default= 0.3; range= [0.0-1.0] 466 """ 467 try: 468 neutral_pose = rospy.get_param("named_poses/{0}/poses/neutral".format(self.name)) 469 except KeyError: 470 rospy.logerr(("Get neutral pose failed, arm: \"{0}\".").format(self.name)) 471 return 472 angles = dict(zip(self.joint_names(), neutral_pose)) 473 self.set_joint_position_speed(speed) 474 return self.move_to_joint_positions(angles, timeout)
475
476 - def move_to_joint_positions(self, positions, timeout=15.0, 477 threshold=settings.JOINT_ANGLE_TOLERANCE, 478 test=None):
479 """ 480 (Blocking) Commands the limb to the provided positions. 481 482 Waits until the reported joint state matches that specified. 483 484 This function uses a low-pass filter to smooth the movement. 485 486 @type positions: dict({str:float}) 487 @param positions: joint_name:angle command 488 @type timeout: float 489 @param timeout: seconds to wait for move to finish [15] 490 @type threshold: float 491 @param threshold: position threshold in radians across each joint when 492 move is considered successful [0.008726646] 493 @param test: optional function returning True if motion must be aborted 494 """ 495 cmd = self.joint_angles() 496 497 def genf(joint, angle): 498 def joint_diff(): 499 return abs(angle - self._joint_angle[joint])
500 return joint_diff
501 502 diffs = [genf(j, a) for j, a in positions.items() if 503 j in self._joint_angle] 504 fail_msg = "{0} limb failed to reach commanded joint positions.".format( 505 self.name.capitalize()) 506 def test_collision(): 507 if self.has_collided(): 508 rospy.logerr(' '.join(["Collision detected.", fail_msg])) 509 return True 510 return False 511 self.set_joint_positions(positions) 512 intera_dataflow.wait_for( 513 test=lambda: test_collision() or \ 514 (callable(test) and test() == True) or \ 515 (all(diff() < threshold for diff in diffs)), 516 timeout=timeout, 517 timeout_msg=fail_msg, 518 rate=100, 519 raise_on_error=False, 520 body=lambda: self.set_joint_positions(positions) 521 ) 522