Package intera_interface :: Module head
[hide private]
[frames] | no frames]

Source Code for Module intera_interface.head

  1  # Copyright (c) 2013-2017, Rethink Robotics Inc. 
  2  # 
  3  # Licensed under the Apache License, Version 2.0 (the "License"); 
  4  # you may not use this file except in compliance with the License. 
  5  # You may obtain a copy of the License at 
  6  # 
  7  #     http://www.apache.org/licenses/LICENSE-2.0 
  8  # 
  9  # Unless required by applicable law or agreed to in writing, software 
 10  # distributed under the License is distributed on an "AS IS" BASIS, 
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 12  # See the License for the specific language governing permissions and 
 13  # limitations under the License. 
 14   
 15  from copy import deepcopy 
 16  from math import fabs, pi 
 17   
 18  import rospy 
 19  import tf 
 20   
 21  from std_msgs.msg import ( 
 22      Bool 
 23  ) 
 24   
 25  import intera_dataflow 
 26   
 27  from intera_core_msgs.msg import ( 
 28     HeadPanCommand, 
 29     HeadState, 
 30  ) 
 31  from intera_interface import settings 
 32   
 33   
34 -class Head(object):
35 """ 36 Interface class for the head on an Intera Robot. 37 38 Used to control the head pan angle. 39 """
40 - def __init__(self):
41 """ 42 Constructor. 43 """ 44 self._state = dict() 45 46 self._pub_pan = rospy.Publisher( 47 '/robot/head/command_head_pan', 48 HeadPanCommand, 49 queue_size=10) 50 51 state_topic = '/robot/head/head_state' 52 self._sub_state = rospy.Subscriber( 53 state_topic, 54 HeadState, 55 self._on_head_state) 56 57 self._tf_listener = tf.TransformListener() 58 59 intera_dataflow.wait_for( 60 lambda: len(self._state) != 0, 61 timeout=5.0, 62 timeout_msg=("Failed to get current head state from %s" % 63 (state_topic,)), 64 )
65
66 - def _on_head_state(self, msg):
67 self._state['pan'] = msg.pan 68 self._state['panning'] = msg.isTurning 69 self._state['blocked'] = msg.isBlocked 70 self._state['pan_mode'] = msg.panMode
71
72 - def blocked(self):
73 """ 74 Check if the head is currently blocked from movement. 75 Get the current pan angle of the head. This can only 76 be true if 'pan_mode' is ACTIVE_CANCELLATION_MODE. 77 78 @rtype: bool 79 @return: True if the head is currently blocked, False otherwise. 80 """ 81 return self._state['blocked']
82
83 - def pan_mode(self):
84 """ 85 Get the mode the head is currently acting in. 86 87 @rtype: string 88 @return: current mode - 89 'PASSIVE_MODE'(0) : Compliant to user-induced external movement 90 'ACTIVE_MODE' (1) : Actively responds to absolute commanded 91 position 92 Command limits are actual joint limits. 93 'ACTIVE_CANCELLATION_MODE' (2) : Actively responds to commanded 94 head position relative to the 95 current position of robot base frame 96 """ 97 pan_mode_dict = {0:'PASSIVE_MODE', 1:'ACTIVE_MODE', 98 2:'ACTIVE_CANCELLATION_MODE'} 99 return pan_mode_dict[self._state['pan_mode']]
100
101 - def pan(self):
102 """ 103 Get the current pan angle of the head. 104 105 @rtype: float 106 @return: current angle in radians 107 """ 108 return self._state['pan']
109
110 - def panning(self):
111 """ 112 Check if the head is currently panning. 113 114 @rtype: bool 115 @return: True if the head is currently panning, False otherwise. 116 """ 117 return self._state['panning']
118
119 - def set_pan(self, angle, speed=1.0, timeout=10.0, 120 active_cancellation=False):
121 """ 122 Pan at the given speed to the desired angle. 123 124 @type angle: float 125 @param angle: Desired pan angle in radians. 126 @type speed: int 127 @param speed: Desired speed to pan at, range is 0-1.0 [1.0] 128 @type timeout: float 129 @param timeout: Seconds to wait for the head to pan to the 130 specified angle. If 0, just command once and 131 return. [10] 132 @param active_cancellation: Specifies if the head should aim at 133 a location in the base frame. If this is set to True, 134 the "angle" param is measured with respect to 135 the "/base" frame, rather than the actual head joint 136 value. Valid range is [-pi, pi) radians. 137 @type active_cancellation: bool 138 """ 139 if speed > HeadPanCommand.MAX_SPEED_RATIO: 140 rospy.logwarn(("Commanded Speed, ({0}), faster than Max speed of" 141 " {1}. Clamping to Max.]").format(speed, 142 HeadPanCommand.MAX_SPEED_RATIO)) 143 speed = HeadPanCommand.MAX_SPEED_RATIO 144 elif speed < HeadPanCommand.MIN_SPEED_RATIO: 145 rospy.logwarn(("Commanded Speed, ({0}), slower than Min speed of" 146 " {1}. Clamping to Min.]").format(speed, 147 HeadPanCommand.MIN_SPEED_RATIO)) 148 speed = HeadPanCommand.MIN_SPEED_RATIO 149 if active_cancellation: 150 def get_current_euler(axis, source_frame="base", 151 target_frame="head"): 152 rate = rospy.Rate(10) # Hz 153 counter = 1 154 quat = (0,0,0,1) 155 while not rospy.is_shutdown(): 156 try: 157 pos, quat = self._tf_listener.lookupTransform( 158 source_frame, target_frame, 159 self._tf_listener.getLatestCommonTime(source_frame, 160 target_frame)) 161 except tf.Exception: 162 if not counter % 10: # essentially throttle 1.0 sec 163 rospy.logwarn(("Active Cancellation: Trying again " 164 "to lookup transform from {0} to " 165 "{1}...").format(source_frame, 166 target_frame)) 167 else: 168 break 169 counter += 1 170 rate.sleep() 171 euler = tf.transformations.euler_from_quaternion(quat) 172 return euler[axis]
173 tf_angle = -pi + (angle + pi) % (2*pi) 174 mode = HeadPanCommand.SET_ACTIVE_CANCELLATION_MODE 175 stop_condition = lambda: (abs(get_current_euler(2) - tf_angle) <= 176 settings.HEAD_PAN_ANGLE_TOLERANCE) 177 else: 178 mode = HeadPanCommand.SET_ACTIVE_MODE 179 stop_condition = lambda: (abs(self.pan() - angle) <= 180 settings.HEAD_PAN_ANGLE_TOLERANCE) 181 msg = HeadPanCommand(angle, speed, mode) 182 self._pub_pan.publish(msg) 183 if not timeout == 0: 184 intera_dataflow.wait_for( 185 stop_condition, 186 timeout=timeout, 187 rate=100, 188 timeout_msg=("Failed to move head to pan" 189 " command {0}").format(angle), 190 body=lambda: self._pub_pan.publish(msg) 191 )
192