1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 import collections
29 import warnings
30
31 from copy import deepcopy
32
33 import rospy
34
35 from sensor_msgs.msg import (
36 JointState
37 )
38 from std_msgs.msg import (
39 Float64,
40 )
41
42 import intera_dataflow
43
44 from intera_core_msgs.msg import (
45 JointCommand,
46 EndpointState,
47 CollisionDetectionState,
48 )
49 import settings
50 from robot_params import RobotParams
51
52
54 """
55 Interface class for a limb on Intera robots.
56 """
57
58
59 Point = collections.namedtuple('Point', ['x', 'y', 'z'])
60 Quaternion = collections.namedtuple('Quaternion', ['x', 'y', 'z', 'w'])
61
62 - def __init__(self, limb="right", synchronous_pub=False):
63 """
64 Constructor.
65
66 @type limb: str
67 @param limb: limb to interface
68
69 @type synchronous_pub: bool
70 @param synchronous_pub: designates the JointCommand Publisher
71 as Synchronous if True and Asynchronous if False.
72
73 Synchronous Publishing means that all joint_commands publishing to
74 the robot's joints will block until the message has been serialized
75 into a buffer and that buffer has been written to the transport
76 of every current Subscriber. This yields predicable and consistent
77 timing of messages being delivered from this Publisher. However,
78 when using this mode, it is possible for a blocking Subscriber to
79 prevent the joint_command functions from exiting. Unless you need exact
80 JointCommand timing, default to Asynchronous Publishing (False).
81
82 For more information about Synchronous Publishing see:
83 http://wiki.ros.org/rospy/Overview/Publishers%20and%20Subscribers#queue_size:_publish.28.29_behavior_and_queuing
84 """
85 params = RobotParams()
86 limb_names = params.get_limb_names()
87 if limb not in limb_names:
88 rospy.logerr("Cannot detect limb {0} on this robot."
89 " Valid limbs are {1}. Exiting Limb.init().".format(
90 limb, limb_names))
91 return
92 joint_names = params.get_joint_names(limb)
93 if not joint_names:
94 rospy.logerr("Cannot detect joint names for limb {0} on this "
95 "robot. Exiting Limb.init().".format(limb))
96 return
97 self.name = limb
98 self._joint_angle = dict()
99 self._joint_velocity = dict()
100 self._joint_effort = dict()
101 self._cartesian_pose = dict()
102 self._cartesian_velocity = dict()
103 self._cartesian_effort = dict()
104 self._joint_names = { limb: joint_names }
105 self._collision_state = False
106
107 ns = '/robot/limb/' + limb + '/'
108
109 self._command_msg = JointCommand()
110
111 self._pub_speed_ratio = rospy.Publisher(
112 ns + 'set_speed_ratio',
113 Float64,
114 latch=True,
115 queue_size=10)
116
117 queue_size = None if synchronous_pub else 1
118 with warnings.catch_warnings():
119 warnings.simplefilter("ignore")
120 self._pub_joint_cmd = rospy.Publisher(
121 ns + 'joint_command',
122 JointCommand,
123 tcp_nodelay=True,
124 queue_size=queue_size)
125
126 self._pub_joint_cmd_timeout = rospy.Publisher(
127 ns + 'joint_command_timeout',
128 Float64,
129 latch=True,
130 queue_size=10)
131
132 _cartesian_state_sub = rospy.Subscriber(
133 ns + 'endpoint_state',
134 EndpointState,
135 self._on_endpoint_states,
136 queue_size=1,
137 tcp_nodelay=True)
138
139 _collision_state_sub = rospy.Subscriber(
140 ns + 'collision_detection_state',
141 CollisionDetectionState,
142 self._on_collision_state,
143 queue_size=1,
144 tcp_nodelay=True)
145
146 joint_state_topic = 'robot/joint_states'
147 _joint_state_sub = rospy.Subscriber(
148 joint_state_topic,
149 JointState,
150 self._on_joint_states,
151 queue_size=1,
152 tcp_nodelay=True)
153
154 err_msg = ("%s limb init failed to get current joint_states "
155 "from %s") % (self.name.capitalize(), joint_state_topic)
156 intera_dataflow.wait_for(lambda: len(self._joint_angle.keys()) > 0,
157 timeout_msg=err_msg, timeout=5.0)
158 err_msg = ("%s limb init failed to get current endpoint_state "
159 "from %s") % (self.name.capitalize(), ns + 'endpoint_state')
160 intera_dataflow.wait_for(lambda: len(self._cartesian_pose.keys()) > 0,
161 timeout_msg=err_msg)
162
164 for idx, name in enumerate(msg.name):
165 if name in self._joint_names[self.name]:
166 self._joint_angle[name] = msg.position[idx]
167 self._joint_velocity[name] = msg.velocity[idx]
168 self._joint_effort[name] = msg.effort[idx]
169
171
172
173 self._cartesian_pose = {
174 'position': self.Point(
175 msg.pose.position.x,
176 msg.pose.position.y,
177 msg.pose.position.z,
178 ),
179 'orientation': self.Quaternion(
180 msg.pose.orientation.x,
181 msg.pose.orientation.y,
182 msg.pose.orientation.z,
183 msg.pose.orientation.w,
184 ),
185 }
186
187 self._cartesian_velocity = {
188 'linear': self.Point(
189 msg.twist.linear.x,
190 msg.twist.linear.y,
191 msg.twist.linear.z,
192 ),
193 'angular': self.Point(
194 msg.twist.angular.x,
195 msg.twist.angular.y,
196 msg.twist.angular.z,
197 ),
198 }
199
200 self._cartesian_effort = {
201 'force': self.Point(
202 msg.wrench.force.x,
203 msg.wrench.force.y,
204 msg.wrench.force.z,
205 ),
206 'torque': self.Point(
207 msg.wrench.torque.x,
208 msg.wrench.torque.y,
209 msg.wrench.torque.z,
210 ),
211 }
212
214 if self._collision_state != msg.collision_state:
215 self._collision_state = msg.collision_state
216
218 """
219 Return True if the specified limb has experienced a collision.
220
221 @rtype: bool
222 @return: True if the arm is in collision, False otherwise.
223 """
224 return self._collision_state
225
227 """
228 Return the names of the joints for the specified limb.
229
230 @rtype: [str]
231 @return: ordered list of joint names from proximal to distal
232 (i.e. shoulder to wrist).
233 """
234 return self._joint_names[self.name]
235
237 """
238 Return the requested joint angle.
239
240 @type joint: str
241 @param joint: name of a joint
242 @rtype: float
243 @return: angle in radians of individual joint
244 """
245 return self._joint_angle[joint]
246
248 """
249 Return all joint angles.
250
251 @rtype: dict({str:float})
252 @return: unordered dict of joint name Keys to angle (rad) Values
253 """
254 return deepcopy(self._joint_angle)
255
257 """
258 Return the requested joint velocity.
259
260 @type joint: str
261 @param joint: name of a joint
262 @rtype: float
263 @return: velocity in radians/s of individual joint
264 """
265 return self._joint_velocity[joint]
266
268 """
269 Return all joint velocities.
270
271 @rtype: dict({str:float})
272 @return: unordered dict of joint name Keys to velocity (rad/s) Values
273 """
274 return deepcopy(self._joint_velocity)
275
277 """
278 Return the requested joint effort.
279
280 @type joint: str
281 @param joint: name of a joint
282 @rtype: float
283 @return: effort in Nm of individual joint
284 """
285 return self._joint_effort[joint]
286
288 """
289 Return all joint efforts.
290
291 @rtype: dict({str:float})
292 @return: unordered dict of joint name Keys to effort (Nm) Values
293 """
294 return deepcopy(self._joint_effort)
295
297 """
298 Return Cartesian endpoint pose {position, orientation}.
299
300 @rtype: dict({str:L{Limb.Point},str:L{Limb.Quaternion}})
301 @return: position and orientation as named tuples in a dict
302
303 C{pose = {'position': (x, y, z), 'orientation': (x, y, z, w)}}
304
305 - 'position': Cartesian coordinates x,y,z in
306 namedtuple L{Limb.Point}
307 - 'orientation': quaternion x,y,z,w in named tuple
308 L{Limb.Quaternion}
309 """
310 return deepcopy(self._cartesian_pose)
311
313 """
314 Return Cartesian endpoint twist {linear, angular}.
315
316 @rtype: dict({str:L{Limb.Point},str:L{Limb.Point}})
317 @return: linear and angular velocities as named tuples in a dict
318
319 C{twist = {'linear': (x, y, z), 'angular': (x, y, z)}}
320
321 - 'linear': Cartesian velocity in x,y,z directions in
322 namedtuple L{Limb.Point}
323 - 'angular': Angular velocity around x,y,z axes in named tuple
324 L{Limb.Point}
325 """
326 return deepcopy(self._cartesian_velocity)
327
329 """
330 Return Cartesian endpoint wrench {force, torque}.
331
332 @rtype: dict({str:L{Limb.Point},str:L{Limb.Point}})
333 @return: force and torque at endpoint as named tuples in a dict
334
335 C{wrench = {'force': (x, y, z), 'torque': (x, y, z)}}
336
337 - 'force': Cartesian force on x,y,z axes in
338 namedtuple L{Limb.Point}
339 - 'torque': Torque around x,y,z axes in named tuple
340 L{Limb.Point}
341 """
342 return deepcopy(self._cartesian_effort)
343
345 """
346 Set the timeout in seconds for the joint controller
347
348 @type timeout: float
349 @param timeout: timeout in seconds
350 """
351 self._pub_joint_cmd_timeout.publish(Float64(timeout))
352
354 """
355 Clean exit from advanced control modes (joint torque or velocity).
356
357 Resets control to joint position mode with current positions.
358
359 @type timeout: float
360 @param timeout: control timeout in seconds [0.2]
361 """
362 self.set_command_timeout(timeout)
363 self.set_joint_positions(self.joint_angles())
364
366 """
367 Set ratio of max joint speed to use during joint position moves.
368
369 Set the proportion of maximum controllable velocity to use
370 during joint position control execution. The default ratio
371 is `0.3`, and can be set anywhere from [0.0-1.0] (clipped).
372 Once set, a speed ratio will persist until a new execution
373 speed is set.
374
375 @type speed: float
376 @param speed: ratio of maximum joint speed for execution
377 default= 0.3; range= [0.0-1.0]
378 """
379 self._pub_speed_ratio.publish(Float64(speed))
380
382 """
383 Commands the joints of this limb to the specified positions using
384 the commanded velocities and accelerations to extrapolate between
385 commanded positions (prior to the next position being received).
386
387 B{IMPORTANT:} Joint Trajectory control mode allows for commanding
388 joint positions, without modification, directly to the JCBs
389 (Joint Controller Boards). While this results in more unaffected
390 motions, Joint Trajectory control mode bypasses the safety system
391 modifications (e.g. collision avoidance).
392 Please use with caution.
393
394 @type names: list [str]
395 @param names: joint_names list of strings
396 @type positions: list [float]
397 @param positions: list of positions in radians
398 @type velocities: list [float]
399 @param velocities: list of velocities in radians/second
400 @type accelerations: list [float]
401 @param accelerations: list of accelerations in radians/seconds^2
402 """
403 self._command_msg.names = names
404 self._command_msg.position = positions
405 self._command_msg.velocity = velocities
406 self._command_msg.acceleration = accelerations
407 self._command_msg.mode = JointCommand.TRAJECTORY_MODE
408 self._command_msg.header.stamp = rospy.Time.now()
409 self._pub_joint_cmd.publish(self._command_msg)
410
412 """
413 Commands the joints of this limb to the specified positions.
414
415 B{IMPORTANT:} 'raw' joint position control mode allows for commanding
416 joint positions, without modification, directly to the JCBs
417 (Joint Controller Boards). While this results in more unaffected
418 motions, 'raw' joint position control mode bypasses the safety system
419 modifications (e.g. collision avoidance).
420 Please use with caution.
421
422 @type positions: dict({str:float})
423 @param positions: joint_name:angle command
424 @type raw: bool
425 @param raw: advanced, direct position control mode
426 """
427 self._command_msg.names = positions.keys()
428 self._command_msg.position = positions.values()
429 self._command_msg.mode = JointCommand.POSITION_MODE
430 self._command_msg.header.stamp = rospy.Time.now()
431 self._pub_joint_cmd.publish(self._command_msg)
432
434 """
435 Commands the joints of this limb to the specified velocities.
436
437 B{IMPORTANT}: set_joint_velocities must be commanded at a rate great
438 than the timeout specified by set_command_timeout. If the timeout is
439 exceeded before a new set_joint_velocities command is received, the
440 controller will switch modes back to position mode for safety purposes.
441
442 @type velocities: dict({str:float})
443 @param velocities: joint_name:velocity command
444 """
445 self._command_msg.names = velocities.keys()
446 self._command_msg.velocity = velocities.values()
447 self._command_msg.mode = JointCommand.VELOCITY_MODE
448 self._command_msg.header.stamp = rospy.Time.now()
449 self._pub_joint_cmd.publish(self._command_msg)
450
452 """
453 Commands the joints of this limb to the specified torques.
454
455 B{IMPORTANT}: set_joint_torques must be commanded at a rate great than
456 the timeout specified by set_command_timeout. If the timeout is
457 exceeded before a new set_joint_torques command is received, the
458 controller will switch modes back to position mode for safety purposes.
459
460 @type torques: dict({str:float})
461 @param torques: joint_name:torque command
462 """
463 self._command_msg.names = torques.keys()
464 self._command_msg.effort = torques.values()
465 self._command_msg.mode = JointCommand.TORQUE_MODE
466 self._command_msg.header.stamp = rospy.Time.now()
467 self._pub_joint_cmd.publish(self._command_msg)
468
470 """
471 Command the Limb joints to a predefined set of "neutral" joint angles.
472 From rosparam named_poses/<limb>/poses/neutral.
473
474 @type timeout: float
475 @param timeout: seconds to wait for move to finish [15]
476 @type speed: float
477 @param speed: ratio of maximum joint speed for execution
478 default= 0.3; range= [0.0-1.0]
479 """
480 try:
481 neutral_pose = rospy.get_param("named_poses/{0}/poses/neutral".format(self.name))
482 except KeyError:
483 rospy.logerr(("Get neutral pose failed, arm: \"{0}\".").format(self.name))
484 return
485 angles = dict(zip(self.joint_names(), neutral_pose))
486 self.set_joint_position_speed(speed)
487 return self.move_to_joint_positions(angles, timeout)
488
492 """
493 (Blocking) Commands the limb to the provided positions.
494
495 Waits until the reported joint state matches that specified.
496
497 This function uses a low-pass filter to smooth the movement.
498
499 @type positions: dict({str:float})
500 @param positions: joint_name:angle command
501 @type timeout: float
502 @param timeout: seconds to wait for move to finish [15]
503 @type threshold: float
504 @param threshold: position threshold in radians across each joint when
505 move is considered successful [0.008726646]
506 @param test: optional function returning True if motion must be aborted
507 """
508 cmd = self.joint_angles()
509
510 def genf(joint, angle):
511 def joint_diff():
512 return abs(angle - self._joint_angle[joint])
513 return joint_diff
514
515 diffs = [genf(j, a) for j, a in positions.items() if
516 j in self._joint_angle]
517 fail_msg = "{0} limb failed to reach commanded joint positions.".format(
518 self.name.capitalize())
519 def test_collision():
520 if self.has_collided():
521 rospy.logerr(' '.join(["Collision detected.", fail_msg]))
522 return True
523 return False
524 self.set_joint_positions(positions)
525 intera_dataflow.wait_for(
526 test=lambda: test_collision() or \
527 (callable(test) and test() == True) or \
528 (all(diff() < threshold for diff in diffs)),
529 timeout=timeout,
530 timeout_msg=fail_msg,
531 rate=100,
532 raise_on_error=False,
533 body=lambda: self.set_joint_positions(positions)
534 )
535