1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 from copy import deepcopy
29 from math import fabs, pi
30
31 import rospy
32 import tf
33
34 from std_msgs.msg import (
35 Bool
36 )
37
38 import intera_dataflow
39
40 from intera_core_msgs.msg import (
41 HeadPanCommand,
42 HeadState,
43 )
44 from intera_interface import settings
45
46
48 """
49 Interface class for the head on an Intera Robot.
50
51 Used to control the head pan angle.
52 """
54 """
55 Constructor.
56 """
57 self._state = dict()
58
59 self._pub_pan = rospy.Publisher(
60 '/robot/head/command_head_pan',
61 HeadPanCommand,
62 queue_size=10)
63
64 state_topic = '/robot/head/head_state'
65 self._sub_state = rospy.Subscriber(
66 state_topic,
67 HeadState,
68 self._on_head_state)
69
70 self._tf_listener = tf.TransformListener()
71
72 intera_dataflow.wait_for(
73 lambda: len(self._state) != 0,
74 timeout=5.0,
75 timeout_msg=("Failed to get current head state from %s" %
76 (state_topic,)),
77 )
78
80 self._state['pan'] = msg.pan
81 self._state['panning'] = msg.isTurning
82 self._state['blocked'] = msg.isBlocked
83 self._state['pan_mode'] = msg.panMode
84
86 """
87 Check if the head is currently blocked from movement.
88 Get the current pan angle of the head. This can only
89 be true if 'pan_mode' is ACTIVE_CANCELLATION_MODE.
90
91 @rtype: bool
92 @return: True if the head is currently blocked, False otherwise.
93 """
94 return self._state['blocked']
95
97 """
98 Get the mode the head is currently acting in.
99
100 @rtype: string
101 @return: current mode -
102 'PASSIVE_MODE'(0) : Compliant to user-induced external movement
103 'ACTIVE_MODE' (1) : Actively responds to absolute commanded
104 position
105 Command limits are actual joint limits.
106 'ACTIVE_CANCELLATION_MODE' (2) : Actively responds to commanded
107 head position relative to the
108 current position of robot base frame
109 """
110 pan_mode_dict = {0:'PASSIVE_MODE', 1:'ACTIVE_MODE',
111 2:'ACTIVE_CANCELLATION_MODE'}
112 return pan_mode_dict[self._state['pan_mode']]
113
115 """
116 Get the current pan angle of the head.
117
118 @rtype: float
119 @return: current angle in radians
120 """
121 return self._state['pan']
122
124 """
125 Check if the head is currently panning.
126
127 @rtype: bool
128 @return: True if the head is currently panning, False otherwise.
129 """
130 return self._state['panning']
131
132 - def set_pan(self, angle, speed=1.0, timeout=10.0,
133 active_cancellation=False):
134 """
135 Pan at the given speed to the desired angle.
136
137 @type angle: float
138 @param angle: Desired pan angle in radians.
139 @type speed: int
140 @param speed: Desired speed to pan at, range is 0-1.0 [1.0]
141 @type timeout: float
142 @param timeout: Seconds to wait for the head to pan to the
143 specified angle. If 0, just command once and
144 return. [10]
145 @param active_cancellation: Specifies if the head should aim at
146 a location in the base frame. If this is set to True,
147 the "angle" param is measured with respect to
148 the "/base" frame, rather than the actual head joint
149 value. Valid range is [-pi, pi) radians.
150 @type active_cancellation: bool
151 """
152 if speed > HeadPanCommand.MAX_SPEED_RATIO:
153 rospy.logwarn(("Commanded Speed, ({0}), faster than Max speed of"
154 " {1}. Clamping to Max.]").format(speed,
155 HeadPanCommand.MAX_SPEED_RATIO))
156 speed = HeadPanCommand.MAX_SPEED_RATIO
157 elif speed < HeadPanCommand.MIN_SPEED_RATIO:
158 rospy.logwarn(("Commanded Speed, ({0}), slower than Min speed of"
159 " {1}. Clamping to Min.]").format(speed,
160 HeadPanCommand.MIN_SPEED_RATIO))
161 speed = HeadPanCommand.MIN_SPEED_RATIO
162 if active_cancellation:
163 def get_current_euler(axis, source_frame="base",
164 target_frame="head"):
165 rate = rospy.Rate(10)
166 counter = 1
167 quat = (0,0,0,1)
168 while not rospy.is_shutdown():
169 try:
170 pos, quat = self._tf_listener.lookupTransform(
171 source_frame, target_frame,
172 self._tf_listener.getLatestCommonTime(source_frame,
173 target_frame))
174 except tf.Exception:
175 if not counter % 10:
176 rospy.logwarn(("Active Cancellation: Trying again "
177 "to lookup transform from {0} to "
178 "{1}...").format(source_frame,
179 target_frame))
180 else:
181 break
182 counter += 1
183 rate.sleep()
184 euler = tf.transformations.euler_from_quaternion(quat)
185 return euler[axis]
186 tf_angle = -pi + (angle + pi) % (2*pi)
187 mode = HeadPanCommand.SET_ACTIVE_CANCELLATION_MODE
188 stop_condition = lambda: (abs(get_current_euler(2) - tf_angle) <=
189 settings.HEAD_PAN_ANGLE_TOLERANCE)
190 else:
191 mode = HeadPanCommand.SET_ACTIVE_MODE
192 stop_condition = lambda: (abs(self.pan() - angle) <=
193 settings.HEAD_PAN_ANGLE_TOLERANCE)
194 msg = HeadPanCommand(angle, speed, mode)
195 self._pub_pan.publish(msg)
196 if not timeout == 0:
197 intera_dataflow.wait_for(
198 stop_condition,
199 timeout=timeout,
200 rate=100,
201 timeout_msg=("Failed to move head to pan"
202 " command {0}").format(angle),
203 body=lambda: self._pub_pan.publish(msg)
204 )
205