Package intera_interface :: Module head
[hide private]
[frames] | no frames]

Source Code for Module intera_interface.head

  1  # Copyright (c) 2013-2016, Rethink Robotics 
  2  # All rights reserved. 
  3  # 
  4  # Redistribution and use in source and binary forms, with or without 
  5  # modification, are permitted provided that the following conditions are met: 
  6  # 
  7  # 1. Redistributions of source code must retain the above copyright notice, 
  8  #    this list of conditions and the following disclaimer. 
  9  # 2. Redistributions in binary form must reproduce the above copyright 
 10  #    notice, this list of conditions and the following disclaimer in the 
 11  #    documentation and/or other materials provided with the distribution. 
 12  # 3. Neither the name of the Rethink Robotics nor the names of its 
 13  #    contributors may be used to endorse or promote products derived from 
 14  #    this software without specific prior written permission. 
 15  # 
 16  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 17  # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 18  # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 19  # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 20  # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 21  # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 22  # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 23  # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 24  # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 25  # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 26  # POSSIBILITY OF SUCH DAMAGE. 
 27   
 28  from copy import deepcopy 
 29  from math import fabs, pi 
 30   
 31  import rospy 
 32  import tf 
 33   
 34  from std_msgs.msg import ( 
 35      Bool 
 36  ) 
 37   
 38  import intera_dataflow 
 39   
 40  from intera_core_msgs.msg import ( 
 41     HeadPanCommand, 
 42     HeadState, 
 43  ) 
 44  from intera_interface import settings 
 45   
 46   
47 -class Head(object):
48 """ 49 Interface class for the head on an Intera Robot. 50 51 Used to control the head pan angle. 52 """
53 - def __init__(self):
54 """ 55 Constructor. 56 """ 57 self._state = dict() 58 59 self._pub_pan = rospy.Publisher( 60 '/robot/head/command_head_pan', 61 HeadPanCommand, 62 queue_size=10) 63 64 state_topic = '/robot/head/head_state' 65 self._sub_state = rospy.Subscriber( 66 state_topic, 67 HeadState, 68 self._on_head_state) 69 70 self._tf_listener = tf.TransformListener() 71 72 intera_dataflow.wait_for( 73 lambda: len(self._state) != 0, 74 timeout=5.0, 75 timeout_msg=("Failed to get current head state from %s" % 76 (state_topic,)), 77 )
78
79 - def _on_head_state(self, msg):
80 self._state['pan'] = msg.pan 81 self._state['panning'] = msg.isTurning 82 self._state['blocked'] = msg.isBlocked 83 self._state['pan_mode'] = msg.panMode
84
85 - def blocked(self):
86 """ 87 Check if the head is currently blocked from movement. 88 Get the current pan angle of the head. This can only 89 be true if 'pan_mode' is ACTIVE_CANCELLATION_MODE. 90 91 @rtype: bool 92 @return: True if the head is currently blocked, False otherwise. 93 """ 94 return self._state['blocked']
95
96 - def pan_mode(self):
97 """ 98 Get the mode the head is currently acting in. 99 100 @rtype: string 101 @return: current mode - 102 'PASSIVE_MODE'(0) : Compliant to user-induced external movement 103 'ACTIVE_MODE' (1) : Actively responds to absolute commanded 104 position 105 Command limits are actual joint limits. 106 'ACTIVE_CANCELLATION_MODE' (2) : Actively responds to commanded 107 head position relative to the 108 current position of robot base frame 109 """ 110 pan_mode_dict = {0:'PASSIVE_MODE', 1:'ACTIVE_MODE', 111 2:'ACTIVE_CANCELLATION_MODE'} 112 return pan_mode_dict[self._state['pan_mode']]
113
114 - def pan(self):
115 """ 116 Get the current pan angle of the head. 117 118 @rtype: float 119 @return: current angle in radians 120 """ 121 return self._state['pan']
122
123 - def panning(self):
124 """ 125 Check if the head is currently panning. 126 127 @rtype: bool 128 @return: True if the head is currently panning, False otherwise. 129 """ 130 return self._state['panning']
131
132 - def set_pan(self, angle, speed=1.0, timeout=10.0, 133 active_cancellation=False):
134 """ 135 Pan at the given speed to the desired angle. 136 137 @type angle: float 138 @param angle: Desired pan angle in radians. 139 @type speed: int 140 @param speed: Desired speed to pan at, range is 0-1.0 [1.0] 141 @type timeout: float 142 @param timeout: Seconds to wait for the head to pan to the 143 specified angle. If 0, just command once and 144 return. [10] 145 @param active_cancellation: Specifies if the head should aim at 146 a location in the base frame. If this is set to True, 147 the "angle" param is measured with respect to 148 the "/base" frame, rather than the actual head joint 149 value. Valid range is [-pi, pi) radians. 150 @type active_cancellation: bool 151 """ 152 if speed > HeadPanCommand.MAX_SPEED_RATIO: 153 rospy.logwarn(("Commanded Speed, ({0}), faster than Max speed of" 154 " {1}. Clamping to Max.]").format(speed, 155 HeadPanCommand.MAX_SPEED_RATIO)) 156 speed = HeadPanCommand.MAX_SPEED_RATIO 157 elif speed < HeadPanCommand.MIN_SPEED_RATIO: 158 rospy.logwarn(("Commanded Speed, ({0}), slower than Min speed of" 159 " {1}. Clamping to Min.]").format(speed, 160 HeadPanCommand.MIN_SPEED_RATIO)) 161 speed = HeadPanCommand.MIN_SPEED_RATIO 162 if active_cancellation: 163 def get_current_euler(axis, source_frame="base", 164 target_frame="head"): 165 rate = rospy.Rate(10) # Hz 166 counter = 1 167 quat = (0,0,0,1) 168 while not rospy.is_shutdown(): 169 try: 170 pos, quat = self._tf_listener.lookupTransform( 171 source_frame, target_frame, 172 self._tf_listener.getLatestCommonTime(source_frame, 173 target_frame)) 174 except tf.Exception: 175 if not counter % 10: # essentially throttle 1.0 sec 176 rospy.logwarn(("Active Cancellation: Trying again " 177 "to lookup transform from {0} to " 178 "{1}...").format(source_frame, 179 target_frame)) 180 else: 181 break 182 counter += 1 183 rate.sleep() 184 euler = tf.transformations.euler_from_quaternion(quat) 185 return euler[axis]
186 tf_angle = -pi + (angle + pi) % (2*pi) 187 mode = HeadPanCommand.SET_ACTIVE_CANCELLATION_MODE 188 stop_condition = lambda: (abs(get_current_euler(2) - tf_angle) <= 189 settings.HEAD_PAN_ANGLE_TOLERANCE) 190 else: 191 mode = HeadPanCommand.SET_ACTIVE_MODE 192 stop_condition = lambda: (abs(self.pan() - angle) <= 193 settings.HEAD_PAN_ANGLE_TOLERANCE) 194 msg = HeadPanCommand(angle, speed, mode) 195 self._pub_pan.publish(msg) 196 if not timeout == 0: 197 intera_dataflow.wait_for( 198 stop_condition, 199 timeout=timeout, 200 rate=100, 201 timeout_msg=("Failed to move head to pan" 202 " command {0}").format(angle), 203 body=lambda: self._pub_pan.publish(msg) 204 )
205