Package intera_io :: Module io_interface
[hide private]
[frames] | no frames]

Source Code for Module intera_io.io_interface

  1  # Copyright (c) 2013-2017, Rethink Robotics Inc. 
  2  # 
  3  # Licensed under the Apache License, Version 2.0 (the "License"); 
  4  # you may not use this file except in compliance with the License. 
  5  # You may obtain a copy of the License at 
  6  # 
  7  #     http://www.apache.org/licenses/LICENSE-2.0 
  8  # 
  9  # Unless required by applicable law or agreed to in writing, software 
 10  # distributed under the License is distributed on an "AS IS" BASIS, 
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 12  # See the License for the specific language governing permissions and 
 13  # limitations under the License. 
 14   
 15  import rospy 
 16  import sys 
 17  import json 
 18  import copy 
 19  import threading 
 20  import uuid 
 21  from threading import Lock 
 22  import intera_dataflow 
 23  from io_command import SetCommand 
 24   
 25  from intera_core_msgs.msg import IODeviceConfiguration, IODeviceStatus, \ 
 26                                   IOComponentCommand 
 27   
28 -def _time_changed(time1, time2):
29 """ 30 return true if the times are different 31 """ 32 return (time1.secs != time2.secs) or (time1.nsecs != time2.nsecs)
33
34 -class IOInterface(object):
35 """ 36 Base class for IO interfaces. 37 """
38 - def __init__(self, path_root, config_msg_type, status_msg_type):
39 self._path = path_root 40 self.config_mutex = Lock() 41 self.state_mutex = Lock() 42 self.cmd_times = [] 43 self.ports = dict() 44 self.signals = dict() 45 self.state = None 46 self.config = None 47 self.state_changed = intera_dataflow.Signal() 48 self.config_changed = intera_dataflow.Signal() 49 50 self._config_sub = rospy.Subscriber(self._path + "/config", 51 config_msg_type, 52 self.handle_config) 53 self._state_sub = rospy.Subscriber(self._path + "/state", 54 status_msg_type, 55 self.handle_state) 56 self._command_pub = rospy.Publisher(self._path + "/command", 57 IOComponentCommand, queue_size=10) 58 59 # Wait for the state to be populated 60 intera_dataflow.wait_for( 61 lambda: not self.state is None, 62 timeout=5.0, 63 timeout_msg=("Failed to get state.") 64 ) 65 66 rospy.logdebug("Making new IOInterface on %s" % (self._path,))
67
68 - def invalidate_config(self):
69 """ 70 mark the config topic data as invalid 71 """ 72 with self.config_mutex: 73 self.config.time.secs = 0
74
75 - def invalidate_state(self):
76 """ 77 mark the state topic data as invalid 78 """ 79 with self.state_mutex: 80 self.state.time.secs = 0
81
82 - def is_config_valid(self):
83 """ 84 return true if the config topic data is valid 85 """ 86 return self.config.time.secs != 0
87
88 - def is_state_valid(self):
89 """ 90 return true if the state topic data is valid 91 """ 92 return self.state.time.secs != 0
93
94 - def is_valid(self):
95 """ 96 return true if both the state and config topic data are valid 97 """ 98 return self.is_config_valid() and self.is_state_valid()
99
100 - def revalidate(self, timeout, invalidate_state=True, invalidate_config=True):
101 """ 102 invalidate the state and config topics, then wait up to timeout 103 seconds for them to become valid again. 104 return true if both the state and config topic data are valid 105 """ 106 if invalidate_state: 107 self.invalidate_state() 108 if invalidate_config: 109 self.invalidate_config() 110 timeout_time = rospy.Time.now() + rospy.Duration(timeout) 111 while not self.is_state_valid() and not rospy.is_shutdown(): 112 rospy.sleep(0.1) 113 if timeout_time < rospy.Time.now(): 114 rospy.logwarn("Timed out waiting for node interface valid...") 115 return False 116 return True
117
118 - def handle_config(self, msg):
119 """ 120 config topic callback 121 """ 122 if not self.config or _time_changed(self.config.time, msg.time): 123 with self.config_mutex: 124 self.config = msg 125 self.config_changed()
126
127 - def load_state(self, current_state, incoming_state):
128 for state in incoming_state: 129 if state.name not in current_state: 130 current_state[state.name] = dict() 131 formatting = json.loads(state.format) 132 current_state[state.name]["type"] = formatting["type"] 133 current_state[state.name]["role"] = formatting["role"] 134 data = json.loads(state.data) 135 current_state[state.name]["data"] = data[0] if len(data) > 0 else None
136
137 - def handle_state(self, msg):
138 """ 139 state topic callback 140 """ 141 if not self.state or _time_changed(self.state.time, msg.time): 142 with self.state_mutex: 143 self.state = msg 144 self.state_changed() 145 self.load_state(self.ports, self.state.ports) 146 self.load_state(self.signals, self.state.signals)
147
148 - def publish_command(self, op, args, timeout=2.0):
149 """ 150 publish on the command topic 151 return true if the command is acknowleged within the timeout 152 """ 153 cmd_time = rospy.Time.now() 154 self.cmd_times.append(cmd_time) 155 self.cmd_times = self.cmd_times[-100:] 156 cmd_msg = IOComponentCommand( 157 time=cmd_time, 158 op=op, 159 args=json.dumps(args)) 160 rospy.logdebug("publish_command %s %s" % (cmd_msg.op, cmd_msg.args)) 161 if timeout != None: 162 timeout_time = rospy.Time.now() + rospy.Duration(timeout) 163 while not rospy.is_shutdown(): 164 self._command_pub.publish(cmd_msg) 165 if self.is_state_valid(): 166 if cmd_time in self.state.commands: 167 rospy.logdebug("command %s acknowleged" % (cmd_msg.op,)) 168 return True 169 rospy.sleep(0.1) 170 if timeout_time < rospy.Time.now(): 171 rospy.logwarn("Timed out waiting for command acknowlegment...") 172 break 173 return False 174 return True
175
176 -class IODeviceInterface(IOInterface):
177 """ 178 IO Device interface to config, status and command topics 179 """
180 - def __init__(self, node_name, dev_name):
181 super(IODeviceInterface, self).__init__( 182 'io/' + node_name + '/' + dev_name, 183 IODeviceConfiguration, 184 IODeviceStatus) 185 self.config = IODeviceConfiguration() 186 self.state = IODeviceStatus() 187 self.invalidate_config() 188 self.invalidate_state() 189 self._threads = dict() 190 self._callback_items = dict() 191 self._callback_functions = dict()
192
193 - def list_signal_names(self):
194 """ 195 return a list of all signals 196 """ 197 with self.state_mutex: 198 return copy.deepcopy(self.signals.keys())
199
200 - def get_signal_type(self, signal_name):
201 """ 202 return the status for the given signal, or none 203 """ 204 with self.state_mutex: 205 if signal_name in self.signals.keys(): 206 return copy.deepcopy(self.signals[signal_name]['type']) 207 return None
208
209 - def get_signal_value(self, signal_name):
210 """ 211 return the status for the given signal, or none 212 """ 213 with self.state_mutex: 214 if signal_name in self.signals.keys(): 215 return copy.deepcopy(self.signals[signal_name]['data']) 216 return None
217
218 - def set_signal_value(self, signal_name, signal_value, signal_type=None, timeout=5.0):
219 """ 220 set the value for the given signal 221 return True if the signal value is set, False if the requested signal is invalid 222 """ 223 if signal_name not in self.list_signal_names(): 224 rospy.logerr("Cannot find signal '{0}' in this IO Device.".format(signal_name)) 225 return 226 if signal_type == None: 227 s_type = self.get_signal_type(signal_name) 228 if s_type == None: 229 rospy.logerr("Failed to get 'type' for signal '{0}'.".format(signal_name)) 230 return 231 else: 232 s_type = signal_type 233 set_command = SetCommand().set_signal(signal_name, s_type, signal_value) 234 self.publish_command(set_command.op, set_command.args, timeout=timeout) 235 # make sure both state and config are valid: 236 self.revalidate(timeout, invalidate_state=False, invalidate_config=False)
237
238 - def list_port_names(self):
239 """ 240 return a list of all ports 241 """ 242 with self.state_mutex: 243 return copy.deepcopy(self.ports.keys())
244
245 - def get_port_type(self, port_name):
246 """ 247 return the status for the given port, or none 248 """ 249 with self.state_mutex: 250 if port_name in self.ports.keys(): 251 return copy.deepcopy(self.ports[port_name]['type']) 252 return None
253
254 - def get_port_value(self, port_name):
255 """ 256 return the status for the given port, or none 257 """ 258 with self.state_mutex: 259 if port_name in self.ports.keys(): 260 return copy.deepcopy(self.ports[port_name]['data']) 261 return None
262
263 - def set_port_value(self, port_name, port_value, port_type=None, timeout=5.0):
264 """ 265 set the value for the given port 266 return True if the port value is set, False if the requested port is invalid 267 """ 268 if port_name not in list_port_names(): 269 rospy.logerr("Cannot find port '{0}' in this IO Device.".format(port_name)) 270 return 271 if port_type == None: 272 p_type = self.get_port_type(port_name) 273 if p_type == None: 274 rospy.logerr("Failed to get 'type' for port '{0}'.".format(port_name)) 275 return 276 else: 277 p_type = port_type 278 set_command = SetCommand().set_port(port_name, p_type, port_value) 279 self.publish_command(set_command.op, set_command.args, timeout=timeout) 280 # make sure both state and config are valid: 281 self.revalidate(timeout, invalidate_state=False, invalidate_config=False)
282
283 - def register_callback(self, callback_function, signal_name, poll_rate=10):
284 """ 285 Registers a supplied callback to a change in state of supplied 286 signal_name's value. Spawns a thread that will call the callback with 287 the updated value. 288 @type: function 289 @param: function handle for callback function 290 @type: str 291 @param: the signal name (button or wheel) to poll for value change 292 @type: int 293 @param: the rate at which to poll for a value change (in a separate 294 thread) 295 @rtype: str 296 @return: callback_id retuned if the callback was registered, and an 297 empty string if the requested signal_name does not exist in the 298 Navigator 299 """ 300 if signal_name in self.list_signal_names(): 301 callback_id = uuid.uuid4() 302 self._callback_items[callback_id] = intera_dataflow.Signal() 303 def signal_spinner(): 304 old_state = self.get_signal_value(signal_name) 305 r = rospy.Rate(poll_rate) 306 while not rospy.is_shutdown(): 307 new_state = self.get_signal_value(signal_name) 308 if new_state != old_state: 309 self._callback_items[callback_id](new_state) 310 old_state = new_state 311 r.sleep()
312 self._callback_items[callback_id].connect(callback_function) 313 t = threading.Thread(target=signal_spinner) 314 t.daemon = True 315 t.start() 316 self._threads[callback_id] = t 317 self._callback_functions[callback_id] = callback_function 318 return callback_id 319 else: 320 return str()
321
322 - def deregister_callback(self, callback_id):
323 """ 324 Deregisters a callback based on the supplied callback_id. 325 @type: str 326 @param: the callback_id string to deregister 327 @rtype: bool 328 @return: returns bool True if the callback was successfully 329 deregistered, and False otherwise. 330 """ 331 if callback_id in self._threads.keys(): 332 self._callback_items[callback_id].disconnect( 333 self._callback_functions[callback_id]) 334 return True 335 else: 336 return False
337