1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 import collections
16 import warnings
17
18 from copy import deepcopy
19
20 import rospy
21
22 from sensor_msgs.msg import (
23 JointState
24 )
25 from std_msgs.msg import (
26 Float64,
27 )
28
29 import intera_dataflow
30
31 from intera_core_msgs.msg import (
32 JointCommand,
33 EndpointState,
34 CollisionDetectionState,
35 )
36 import settings
37 from robot_params import RobotParams
38
39
41 """
42 Interface class for a limb on Intera robots.
43 """
44
45
46 Point = collections.namedtuple('Point', ['x', 'y', 'z'])
47 Quaternion = collections.namedtuple('Quaternion', ['x', 'y', 'z', 'w'])
48
49 - def __init__(self, limb="right", synchronous_pub=False):
50 """
51 Constructor.
52
53 @type limb: str
54 @param limb: limb to interface
55
56 @type synchronous_pub: bool
57 @param synchronous_pub: designates the JointCommand Publisher
58 as Synchronous if True and Asynchronous if False.
59
60 Synchronous Publishing means that all joint_commands publishing to
61 the robot's joints will block until the message has been serialized
62 into a buffer and that buffer has been written to the transport
63 of every current Subscriber. This yields predicable and consistent
64 timing of messages being delivered from this Publisher. However,
65 when using this mode, it is possible for a blocking Subscriber to
66 prevent the joint_command functions from exiting. Unless you need exact
67 JointCommand timing, default to Asynchronous Publishing (False).
68
69 For more information about Synchronous Publishing see:
70 http://wiki.ros.org/rospy/Overview/Publishers%20and%20Subscribers#queue_size:_publish.28.29_behavior_and_queuing
71 """
72 params = RobotParams()
73 limb_names = params.get_limb_names()
74 if limb not in limb_names:
75 rospy.logerr("Cannot detect limb {0} on this robot."
76 " Valid limbs are {1}. Exiting Limb.init().".format(
77 limb, limb_names))
78 return
79 joint_names = params.get_joint_names(limb)
80 if not joint_names:
81 rospy.logerr("Cannot detect joint names for limb {0} on this "
82 "robot. Exiting Limb.init().".format(limb))
83 return
84 self.name = limb
85 self._joint_angle = dict()
86 self._joint_velocity = dict()
87 self._joint_effort = dict()
88 self._cartesian_pose = dict()
89 self._cartesian_velocity = dict()
90 self._cartesian_effort = dict()
91 self._joint_names = { limb: joint_names }
92 self._collision_state = False
93
94 ns = '/robot/limb/' + limb + '/'
95
96 self._command_msg = JointCommand()
97
98 self._pub_speed_ratio = rospy.Publisher(
99 ns + 'set_speed_ratio',
100 Float64,
101 latch=True,
102 queue_size=10)
103
104 queue_size = None if synchronous_pub else 1
105 with warnings.catch_warnings():
106 warnings.simplefilter("ignore")
107 self._pub_joint_cmd = rospy.Publisher(
108 ns + 'joint_command',
109 JointCommand,
110 tcp_nodelay=True,
111 queue_size=queue_size)
112
113 self._pub_joint_cmd_timeout = rospy.Publisher(
114 ns + 'joint_command_timeout',
115 Float64,
116 latch=True,
117 queue_size=10)
118
119 _cartesian_state_sub = rospy.Subscriber(
120 ns + 'endpoint_state',
121 EndpointState,
122 self._on_endpoint_states,
123 queue_size=1,
124 tcp_nodelay=True)
125
126 _collision_state_sub = rospy.Subscriber(
127 ns + 'collision_detection_state',
128 CollisionDetectionState,
129 self._on_collision_state,
130 queue_size=1,
131 tcp_nodelay=True)
132
133 joint_state_topic = 'robot/joint_states'
134 _joint_state_sub = rospy.Subscriber(
135 joint_state_topic,
136 JointState,
137 self._on_joint_states,
138 queue_size=1,
139 tcp_nodelay=True)
140
141 err_msg = ("%s limb init failed to get current joint_states "
142 "from %s") % (self.name.capitalize(), joint_state_topic)
143 intera_dataflow.wait_for(lambda: len(self._joint_angle.keys()) > 0,
144 timeout_msg=err_msg, timeout=5.0)
145 err_msg = ("%s limb init failed to get current endpoint_state "
146 "from %s") % (self.name.capitalize(), ns + 'endpoint_state')
147 intera_dataflow.wait_for(lambda: len(self._cartesian_pose.keys()) > 0,
148 timeout_msg=err_msg)
149
151 for idx, name in enumerate(msg.name):
152 if name in self._joint_names[self.name]:
153 self._joint_angle[name] = msg.position[idx]
154 self._joint_velocity[name] = msg.velocity[idx]
155 self._joint_effort[name] = msg.effort[idx]
156
158
159
160 self._cartesian_pose = {
161 'position': self.Point(
162 msg.pose.position.x,
163 msg.pose.position.y,
164 msg.pose.position.z,
165 ),
166 'orientation': self.Quaternion(
167 msg.pose.orientation.x,
168 msg.pose.orientation.y,
169 msg.pose.orientation.z,
170 msg.pose.orientation.w,
171 ),
172 }
173
174 self._cartesian_velocity = {
175 'linear': self.Point(
176 msg.twist.linear.x,
177 msg.twist.linear.y,
178 msg.twist.linear.z,
179 ),
180 'angular': self.Point(
181 msg.twist.angular.x,
182 msg.twist.angular.y,
183 msg.twist.angular.z,
184 ),
185 }
186
187 self._cartesian_effort = {
188 'force': self.Point(
189 msg.wrench.force.x,
190 msg.wrench.force.y,
191 msg.wrench.force.z,
192 ),
193 'torque': self.Point(
194 msg.wrench.torque.x,
195 msg.wrench.torque.y,
196 msg.wrench.torque.z,
197 ),
198 }
199
201 if self._collision_state != msg.collision_state:
202 self._collision_state = msg.collision_state
203
205 """
206 Return True if the specified limb has experienced a collision.
207
208 @rtype: bool
209 @return: True if the arm is in collision, False otherwise.
210 """
211 return self._collision_state
212
214 """
215 Return the names of the joints for the specified limb.
216
217 @rtype: [str]
218 @return: ordered list of joint names from proximal to distal
219 (i.e. shoulder to wrist).
220 """
221 return self._joint_names[self.name]
222
224 """
225 Return the requested joint angle.
226
227 @type joint: str
228 @param joint: name of a joint
229 @rtype: float
230 @return: angle in radians of individual joint
231 """
232 return self._joint_angle[joint]
233
235 """
236 Return all joint angles.
237
238 @rtype: dict({str:float})
239 @return: unordered dict of joint name Keys to angle (rad) Values
240 """
241 return deepcopy(self._joint_angle)
242
244 """
245 Return the requested joint velocity.
246
247 @type joint: str
248 @param joint: name of a joint
249 @rtype: float
250 @return: velocity in radians/s of individual joint
251 """
252 return self._joint_velocity[joint]
253
255 """
256 Return all joint velocities.
257
258 @rtype: dict({str:float})
259 @return: unordered dict of joint name Keys to velocity (rad/s) Values
260 """
261 return deepcopy(self._joint_velocity)
262
264 """
265 Return the requested joint effort.
266
267 @type joint: str
268 @param joint: name of a joint
269 @rtype: float
270 @return: effort in Nm of individual joint
271 """
272 return self._joint_effort[joint]
273
275 """
276 Return all joint efforts.
277
278 @rtype: dict({str:float})
279 @return: unordered dict of joint name Keys to effort (Nm) Values
280 """
281 return deepcopy(self._joint_effort)
282
284 """
285 Return Cartesian endpoint pose {position, orientation}.
286
287 @rtype: dict({str:L{Limb.Point},str:L{Limb.Quaternion}})
288 @return: position and orientation as named tuples in a dict
289
290 C{pose = {'position': (x, y, z), 'orientation': (x, y, z, w)}}
291
292 - 'position': Cartesian coordinates x,y,z in
293 namedtuple L{Limb.Point}
294 - 'orientation': quaternion x,y,z,w in named tuple
295 L{Limb.Quaternion}
296 """
297 return deepcopy(self._cartesian_pose)
298
300 """
301 Return Cartesian endpoint twist {linear, angular}.
302
303 @rtype: dict({str:L{Limb.Point},str:L{Limb.Point}})
304 @return: linear and angular velocities as named tuples in a dict
305
306 C{twist = {'linear': (x, y, z), 'angular': (x, y, z)}}
307
308 - 'linear': Cartesian velocity in x,y,z directions in
309 namedtuple L{Limb.Point}
310 - 'angular': Angular velocity around x,y,z axes in named tuple
311 L{Limb.Point}
312 """
313 return deepcopy(self._cartesian_velocity)
314
316 """
317 Return Cartesian endpoint wrench {force, torque}.
318
319 @rtype: dict({str:L{Limb.Point},str:L{Limb.Point}})
320 @return: force and torque at endpoint as named tuples in a dict
321
322 C{wrench = {'force': (x, y, z), 'torque': (x, y, z)}}
323
324 - 'force': Cartesian force on x,y,z axes in
325 namedtuple L{Limb.Point}
326 - 'torque': Torque around x,y,z axes in named tuple
327 L{Limb.Point}
328 """
329 return deepcopy(self._cartesian_effort)
330
332 """
333 Set the timeout in seconds for the joint controller
334
335 @type timeout: float
336 @param timeout: timeout in seconds
337 """
338 self._pub_joint_cmd_timeout.publish(Float64(timeout))
339
341 """
342 Clean exit from advanced control modes (joint torque or velocity).
343
344 Resets control to joint position mode with current positions.
345
346 @type timeout: float
347 @param timeout: control timeout in seconds [0.2]
348 """
349 self.set_command_timeout(timeout)
350 self.set_joint_positions(self.joint_angles())
351
353 """
354 Set ratio of max joint speed to use during joint position moves.
355
356 Set the proportion of maximum controllable velocity to use
357 during joint position control execution. The default ratio
358 is `0.3`, and can be set anywhere from [0.0-1.0] (clipped).
359 Once set, a speed ratio will persist until a new execution
360 speed is set.
361
362 @type speed: float
363 @param speed: ratio of maximum joint speed for execution
364 default= 0.3; range= [0.0-1.0]
365 """
366 self._pub_speed_ratio.publish(Float64(speed))
367
369 """
370 Commands the joints of this limb to the specified positions using
371 the commanded velocities and accelerations to extrapolate between
372 commanded positions (prior to the next position being received).
373
374 B{IMPORTANT:} Joint Trajectory control mode allows for commanding
375 joint positions, without modification, directly to the JCBs
376 (Joint Controller Boards). While this results in more unaffected
377 motions, Joint Trajectory control mode bypasses the safety system
378 modifications (e.g. collision avoidance).
379 Please use with caution.
380
381 @type names: list [str]
382 @param names: joint_names list of strings
383 @type positions: list [float]
384 @param positions: list of positions in radians
385 @type velocities: list [float]
386 @param velocities: list of velocities in radians/second
387 @type accelerations: list [float]
388 @param accelerations: list of accelerations in radians/seconds^2
389 """
390 self._command_msg.names = names
391 self._command_msg.position = positions
392 self._command_msg.velocity = velocities
393 self._command_msg.acceleration = accelerations
394 self._command_msg.mode = JointCommand.TRAJECTORY_MODE
395 self._command_msg.header.stamp = rospy.Time.now()
396 self._pub_joint_cmd.publish(self._command_msg)
397
399 """
400 Commands the joints of this limb to the specified positions.
401
402 B{IMPORTANT:} 'raw' joint position control mode allows for commanding
403 joint positions, without modification, directly to the JCBs
404 (Joint Controller Boards). While this results in more unaffected
405 motions, 'raw' joint position control mode bypasses the safety system
406 modifications (e.g. collision avoidance).
407 Please use with caution.
408
409 @type positions: dict({str:float})
410 @param positions: joint_name:angle command
411 @type raw: bool
412 @param raw: advanced, direct position control mode
413 """
414 self._command_msg.names = positions.keys()
415 self._command_msg.position = positions.values()
416 self._command_msg.mode = JointCommand.POSITION_MODE
417 self._command_msg.header.stamp = rospy.Time.now()
418 self._pub_joint_cmd.publish(self._command_msg)
419
421 """
422 Commands the joints of this limb to the specified velocities.
423
424 B{IMPORTANT}: set_joint_velocities must be commanded at a rate great
425 than the timeout specified by set_command_timeout. If the timeout is
426 exceeded before a new set_joint_velocities command is received, the
427 controller will switch modes back to position mode for safety purposes.
428
429 @type velocities: dict({str:float})
430 @param velocities: joint_name:velocity command
431 """
432 self._command_msg.names = velocities.keys()
433 self._command_msg.velocity = velocities.values()
434 self._command_msg.mode = JointCommand.VELOCITY_MODE
435 self._command_msg.header.stamp = rospy.Time.now()
436 self._pub_joint_cmd.publish(self._command_msg)
437
439 """
440 Commands the joints of this limb to the specified torques.
441
442 B{IMPORTANT}: set_joint_torques must be commanded at a rate great than
443 the timeout specified by set_command_timeout. If the timeout is
444 exceeded before a new set_joint_torques command is received, the
445 controller will switch modes back to position mode for safety purposes.
446
447 @type torques: dict({str:float})
448 @param torques: joint_name:torque command
449 """
450 self._command_msg.names = torques.keys()
451 self._command_msg.effort = torques.values()
452 self._command_msg.mode = JointCommand.TORQUE_MODE
453 self._command_msg.header.stamp = rospy.Time.now()
454 self._pub_joint_cmd.publish(self._command_msg)
455
457 """
458 Command the Limb joints to a predefined set of "neutral" joint angles.
459 From rosparam named_poses/<limb>/poses/neutral.
460
461 @type timeout: float
462 @param timeout: seconds to wait for move to finish [15]
463 @type speed: float
464 @param speed: ratio of maximum joint speed for execution
465 default= 0.3; range= [0.0-1.0]
466 """
467 try:
468 neutral_pose = rospy.get_param("named_poses/{0}/poses/neutral".format(self.name))
469 except KeyError:
470 rospy.logerr(("Get neutral pose failed, arm: \"{0}\".").format(self.name))
471 return
472 angles = dict(zip(self.joint_names(), neutral_pose))
473 self.set_joint_position_speed(speed)
474 return self.move_to_joint_positions(angles, timeout)
475
479 """
480 (Blocking) Commands the limb to the provided positions.
481
482 Waits until the reported joint state matches that specified.
483
484 This function uses a low-pass filter to smooth the movement.
485
486 @type positions: dict({str:float})
487 @param positions: joint_name:angle command
488 @type timeout: float
489 @param timeout: seconds to wait for move to finish [15]
490 @type threshold: float
491 @param threshold: position threshold in radians across each joint when
492 move is considered successful [0.008726646]
493 @param test: optional function returning True if motion must be aborted
494 """
495 cmd = self.joint_angles()
496
497 def genf(joint, angle):
498 def joint_diff():
499 return abs(angle - self._joint_angle[joint])
500 return joint_diff
501
502 diffs = [genf(j, a) for j, a in positions.items() if
503 j in self._joint_angle]
504 fail_msg = "{0} limb failed to reach commanded joint positions.".format(
505 self.name.capitalize())
506 def test_collision():
507 if self.has_collided():
508 rospy.logerr(' '.join(["Collision detected.", fail_msg]))
509 return True
510 return False
511 self.set_joint_positions(positions)
512 intera_dataflow.wait_for(
513 test=lambda: test_collision() or \
514 (callable(test) and test() == True) or \
515 (all(diff() < threshold for diff in diffs)),
516 timeout=timeout,
517 timeout_msg=fail_msg,
518 rate=100,
519 raise_on_error=False,
520 body=lambda: self.set_joint_positions(positions)
521 )
522