1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 import rospy
29 import sys
30 import json
31 import copy
32 import threading
33 import uuid
34 from threading import Lock
35 import intera_dataflow
36 from io_command import SetCommand
37
38 from intera_core_msgs.msg import IODeviceConfiguration, IODeviceStatus, \
39 IOComponentCommand
40
42 """
43 return true if the times are different
44 """
45 return (time1.secs != time2.secs) or (time1.nsecs != time2.nsecs)
46
48 """
49 Base class for IO interfaces.
50 """
51 - def __init__(self, path_root, config_msg_type, status_msg_type):
52 self._path = path_root
53 self.config_mutex = Lock()
54 self.state_mutex = Lock()
55 self.cmd_times = []
56 self.ports = dict()
57 self.signals = dict()
58 self.state = None
59 self.config = None
60 self.state_changed = intera_dataflow.Signal()
61 self.config_changed = intera_dataflow.Signal()
62
63 self._config_sub = rospy.Subscriber(self._path + "/config",
64 config_msg_type,
65 self.handle_config)
66 self._state_sub = rospy.Subscriber(self._path + "/state",
67 status_msg_type,
68 self.handle_state)
69 self._command_pub = rospy.Publisher(self._path + "/command",
70 IOComponentCommand, queue_size=10)
71
72
73 intera_dataflow.wait_for(
74 lambda: not self.state is None,
75 timeout=5.0,
76 timeout_msg=("Failed to get state.")
77 )
78
79 rospy.logdebug("Making new IOInterface on %s" % (self._path,))
80
82 """
83 mark the config topic data as invalid
84 """
85 with self.config_mutex:
86 self.config.time.secs = 0
87
89 """
90 mark the state topic data as invalid
91 """
92 with self.state_mutex:
93 self.state.time.secs = 0
94
96 """
97 return true if the config topic data is valid
98 """
99 return self.config.time.secs != 0
100
102 """
103 return true if the state topic data is valid
104 """
105 return self.state.time.secs != 0
106
112
113 - def revalidate(self, timeout, invalidate_state=True, invalidate_config=True):
114 """
115 invalidate the state and config topics, then wait up to timeout
116 seconds for them to become valid again.
117 return true if both the state and config topic data are valid
118 """
119 if invalidate_state:
120 self.invalidate_state()
121 if invalidate_config:
122 self.invalidate_config()
123 timeout_time = rospy.Time.now() + rospy.Duration(timeout)
124 while not self.is_state_valid() and not rospy.is_shutdown():
125 rospy.sleep(0.1)
126 if timeout_time < rospy.Time.now():
127 rospy.logwarn("Timed out waiting for node interface valid...")
128 return False
129 return True
130
132 """
133 config topic callback
134 """
135 if not self.config or _time_changed(self.config.time, msg.time):
136 with self.config_mutex:
137 self.config = msg
138 self.config_changed()
139
140 - def load_state(self, current_state, incoming_state):
141 for state in incoming_state:
142 if state.name not in current_state:
143 current_state[state.name] = dict()
144 formatting = json.loads(state.format)
145 current_state[state.name]["type"] = formatting["type"]
146 current_state[state.name]["role"] = formatting["role"]
147 data = json.loads(state.data)
148 current_state[state.name]["data"] = data[0] if len(data) > 0 else None
149
160
162 """
163 publish on the command topic
164 return true if the command is acknowleged within the timeout
165 """
166 cmd_time = rospy.Time.now()
167 self.cmd_times.append(cmd_time)
168 self.cmd_times = self.cmd_times[-100:]
169 cmd_msg = IOComponentCommand(
170 time=cmd_time,
171 op=op,
172 args=json.dumps(args))
173 rospy.logdebug("publish_command %s %s" % (cmd_msg.op, cmd_msg.args))
174 if timeout != None:
175 timeout_time = rospy.Time.now() + rospy.Duration(timeout)
176 while not rospy.is_shutdown():
177 self._command_pub.publish(cmd_msg)
178 if self.is_state_valid():
179 if cmd_time in self.state.commands:
180 rospy.logdebug("command %s acknowleged" % (cmd_msg.op,))
181 return True
182 rospy.sleep(0.1)
183 if timeout_time < rospy.Time.now():
184 rospy.logwarn("Timed out waiting for command acknowlegment...")
185 break
186 return False
187 return True
188
190 """
191 IO Device interface to config, status and command topics
192 """
193 - def __init__(self, node_name, dev_name):
194 super(IODeviceInterface, self).__init__(
195 'io/' + node_name + '/' + dev_name,
196 IODeviceConfiguration,
197 IODeviceStatus)
198 self.config = IODeviceConfiguration()
199 self.state = IODeviceStatus()
200 self.invalidate_config()
201 self.invalidate_state()
202 self._threads = dict()
203 self._callback_items = dict()
204 self._callback_functions = dict()
205
207 """
208 return a list of all signals
209 """
210 with self.state_mutex:
211 return copy.deepcopy(self.signals.keys())
212
214 """
215 return the status for the given signal, or none
216 """
217 with self.state_mutex:
218 if signal_name in self.signals.keys():
219 return copy.deepcopy(self.signals[signal_name]['type'])
220 return None
221
223 """
224 return the status for the given signal, or none
225 """
226 with self.state_mutex:
227 if signal_name in self.signals.keys():
228 return copy.deepcopy(self.signals[signal_name]['data'])
229 return None
230
231 - def set_signal_value(self, signal_name, signal_value, signal_type=None, timeout=5.0):
232 """
233 set the value for the given signal
234 return True if the signal value is set, False if the requested signal is invalid
235 """
236 if signal_name not in self.list_signal_names():
237 rospy.logerr("Cannot find signal '{0}' in this IO Device.".format(signal_name))
238 return
239 if signal_type == None:
240 s_type = self.get_signal_type(signal_name)
241 if s_type == None:
242 rospy.logerr("Failed to get 'type' for signal '{0}'.".format(signal_name))
243 return
244 else:
245 s_type = signal_type
246 set_command = SetCommand().set_signal(signal_name, s_type, signal_value)
247 self.publish_command(set_command.op, set_command.args, timeout=timeout)
248
249 self.revalidate(timeout, invalidate_state=False, invalidate_config=False)
250
252 """
253 return a list of all ports
254 """
255 with self.state_mutex:
256 return copy.deepcopy(self.ports.keys())
257
259 """
260 return the status for the given port, or none
261 """
262 with self.state_mutex:
263 if port_name in self.ports.keys():
264 return copy.deepcopy(self.ports[port_name]['type'])
265 return None
266
268 """
269 return the status for the given port, or none
270 """
271 with self.state_mutex:
272 if port_name in self.ports.keys():
273 return copy.deepcopy(self.ports[port_name]['data'])
274 return None
275
276 - def set_port_value(self, port_name, port_value, port_type=None, timeout=5.0):
277 """
278 set the value for the given port
279 return True if the port value is set, False if the requested port is invalid
280 """
281 if port_name not in list_port_names():
282 rospy.logerr("Cannot find port '{0}' in this IO Device.".format(port_name))
283 return
284 if port_type == None:
285 p_type = self.get_port_type(port_name)
286 if p_type == None:
287 rospy.logerr("Failed to get 'type' for port '{0}'.".format(port_name))
288 return
289 else:
290 p_type = port_type
291 set_command = SetCommand().set_port(port_name, p_type, port_value)
292 self.publish_command(set_command.op, set_command.args, timeout=timeout)
293
294 self.revalidate(timeout, invalidate_state=False, invalidate_config=False)
295
297 """
298 Registers a supplied callback to a change in state of supplied
299 signal_name's value. Spawns a thread that will call the callback with
300 the updated value.
301 @type: function
302 @param: function handle for callback function
303 @type: str
304 @param: the signal name (button or wheel) to poll for value change
305 @type: int
306 @param: the rate at which to poll for a value change (in a separate
307 thread)
308 @rtype: str
309 @return: callback_id retuned if the callback was registered, and an
310 empty string if the requested signal_name does not exist in the
311 Navigator
312 """
313 if signal_name in self.list_signal_names():
314 callback_id = uuid.uuid4()
315 self._callback_items[callback_id] = intera_dataflow.Signal()
316 def signal_spinner():
317 old_state = self.get_signal_value(signal_name)
318 r = rospy.Rate(poll_rate)
319 while not rospy.is_shutdown():
320 new_state = self.get_signal_value(signal_name)
321 if new_state != old_state:
322 self._callback_items[callback_id](new_state)
323 old_state = new_state
324 r.sleep()
325 self._callback_items[callback_id].connect(callback_function)
326 t = threading.Thread(target=signal_spinner)
327 t.daemon = True
328 t.start()
329 self._threads[callback_id] = t
330 self._callback_functions[callback_id] = callback_function
331 return callback_id
332 else:
333 return str()
334
336 """
337 Deregisters a callback based on the supplied callback_id.
338 @type: str
339 @param: the callback_id string to deregister
340 @rtype: bool
341 @return: returns bool True if the callback was successfully
342 deregistered, and False otherwise.
343 """
344 if callback_id in self._threads.keys():
345 self._callback_items[callback_id].disconnect(
346 self._callback_functions[callback_id])
347 return True
348 else:
349 return False
350