1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 from copy import deepcopy
16 from math import fabs, pi
17
18 import rospy
19 import tf
20
21 from std_msgs.msg import (
22 Bool
23 )
24
25 import intera_dataflow
26
27 from intera_core_msgs.msg import (
28 HeadPanCommand,
29 HeadState,
30 )
31 from intera_interface import settings
32
33
35 """
36 Interface class for the head on an Intera Robot.
37
38 Used to control the head pan angle.
39 """
41 """
42 Constructor.
43 """
44 self._state = dict()
45
46 self._pub_pan = rospy.Publisher(
47 '/robot/head/command_head_pan',
48 HeadPanCommand,
49 queue_size=10)
50
51 state_topic = '/robot/head/head_state'
52 self._sub_state = rospy.Subscriber(
53 state_topic,
54 HeadState,
55 self._on_head_state)
56
57 self._tf_listener = tf.TransformListener()
58
59 intera_dataflow.wait_for(
60 lambda: len(self._state) != 0,
61 timeout=5.0,
62 timeout_msg=("Failed to get current head state from %s" %
63 (state_topic,)),
64 )
65
67 self._state['pan'] = msg.pan
68 self._state['panning'] = msg.isTurning
69 self._state['blocked'] = msg.isBlocked
70 self._state['pan_mode'] = msg.panMode
71
73 """
74 Check if the head is currently blocked from movement.
75 Get the current pan angle of the head. This can only
76 be true if 'pan_mode' is ACTIVE_CANCELLATION_MODE.
77
78 @rtype: bool
79 @return: True if the head is currently blocked, False otherwise.
80 """
81 return self._state['blocked']
82
84 """
85 Get the mode the head is currently acting in.
86
87 @rtype: string
88 @return: current mode -
89 'PASSIVE_MODE'(0) : Compliant to user-induced external movement
90 'ACTIVE_MODE' (1) : Actively responds to absolute commanded
91 position
92 Command limits are actual joint limits.
93 'ACTIVE_CANCELLATION_MODE' (2) : Actively responds to commanded
94 head position relative to the
95 current position of robot base frame
96 """
97 pan_mode_dict = {0:'PASSIVE_MODE', 1:'ACTIVE_MODE',
98 2:'ACTIVE_CANCELLATION_MODE'}
99 return pan_mode_dict[self._state['pan_mode']]
100
102 """
103 Get the current pan angle of the head.
104
105 @rtype: float
106 @return: current angle in radians
107 """
108 return self._state['pan']
109
111 """
112 Check if the head is currently panning.
113
114 @rtype: bool
115 @return: True if the head is currently panning, False otherwise.
116 """
117 return self._state['panning']
118
119 - def set_pan(self, angle, speed=1.0, timeout=10.0,
120 active_cancellation=False):
121 """
122 Pan at the given speed to the desired angle.
123
124 @type angle: float
125 @param angle: Desired pan angle in radians.
126 @type speed: int
127 @param speed: Desired speed to pan at, range is 0-1.0 [1.0]
128 @type timeout: float
129 @param timeout: Seconds to wait for the head to pan to the
130 specified angle. If 0, just command once and
131 return. [10]
132 @param active_cancellation: Specifies if the head should aim at
133 a location in the base frame. If this is set to True,
134 the "angle" param is measured with respect to
135 the "/base" frame, rather than the actual head joint
136 value. Valid range is [-pi, pi) radians.
137 @type active_cancellation: bool
138 """
139 if speed > HeadPanCommand.MAX_SPEED_RATIO:
140 rospy.logwarn(("Commanded Speed, ({0}), faster than Max speed of"
141 " {1}. Clamping to Max.]").format(speed,
142 HeadPanCommand.MAX_SPEED_RATIO))
143 speed = HeadPanCommand.MAX_SPEED_RATIO
144 elif speed < HeadPanCommand.MIN_SPEED_RATIO:
145 rospy.logwarn(("Commanded Speed, ({0}), slower than Min speed of"
146 " {1}. Clamping to Min.]").format(speed,
147 HeadPanCommand.MIN_SPEED_RATIO))
148 speed = HeadPanCommand.MIN_SPEED_RATIO
149 if active_cancellation:
150 def get_current_euler(axis, source_frame="base",
151 target_frame="head"):
152 rate = rospy.Rate(10)
153 counter = 1
154 quat = (0,0,0,1)
155 while not rospy.is_shutdown():
156 try:
157 pos, quat = self._tf_listener.lookupTransform(
158 source_frame, target_frame,
159 self._tf_listener.getLatestCommonTime(source_frame,
160 target_frame))
161 except tf.Exception:
162 if not counter % 10:
163 rospy.logwarn(("Active Cancellation: Trying again "
164 "to lookup transform from {0} to "
165 "{1}...").format(source_frame,
166 target_frame))
167 else:
168 break
169 counter += 1
170 rate.sleep()
171 euler = tf.transformations.euler_from_quaternion(quat)
172 return euler[axis]
173 tf_angle = -pi + (angle + pi) % (2*pi)
174 mode = HeadPanCommand.SET_ACTIVE_CANCELLATION_MODE
175 stop_condition = lambda: (abs(get_current_euler(2) - tf_angle) <=
176 settings.HEAD_PAN_ANGLE_TOLERANCE)
177 else:
178 mode = HeadPanCommand.SET_ACTIVE_MODE
179 stop_condition = lambda: (abs(self.pan() - angle) <=
180 settings.HEAD_PAN_ANGLE_TOLERANCE)
181 msg = HeadPanCommand(angle, speed, mode)
182 self._pub_pan.publish(msg)
183 if not timeout == 0:
184 intera_dataflow.wait_for(
185 stop_condition,
186 timeout=timeout,
187 rate=100,
188 timeout_msg=("Failed to move head to pan"
189 " command {0}").format(angle),
190 body=lambda: self._pub_pan.publish(msg)
191 )
192