1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 import rospy
16 import sys
17 import json
18 import copy
19 import threading
20 import uuid
21 from threading import Lock
22 import intera_dataflow
23 from io_command import SetCommand
24
25 from intera_core_msgs.msg import IODeviceConfiguration, IODeviceStatus, \
26 IOComponentCommand
27
29 """
30 return true if the times are different
31 """
32 return (time1.secs != time2.secs) or (time1.nsecs != time2.nsecs)
33
35 """
36 Base class for IO interfaces.
37 """
38 - def __init__(self, path_root, config_msg_type, status_msg_type):
39 self._path = path_root
40 self.config_mutex = Lock()
41 self.state_mutex = Lock()
42 self.cmd_times = []
43 self.ports = dict()
44 self.signals = dict()
45 self.state = None
46 self.config = None
47 self.state_changed = intera_dataflow.Signal()
48 self.config_changed = intera_dataflow.Signal()
49
50 self._config_sub = rospy.Subscriber(self._path + "/config",
51 config_msg_type,
52 self.handle_config)
53 self._state_sub = rospy.Subscriber(self._path + "/state",
54 status_msg_type,
55 self.handle_state)
56 self._command_pub = rospy.Publisher(self._path + "/command",
57 IOComponentCommand, queue_size=10)
58
59
60 intera_dataflow.wait_for(
61 lambda: not self.state is None,
62 timeout=5.0,
63 timeout_msg=("Failed to get state.")
64 )
65
66 rospy.logdebug("Making new IOInterface on %s" % (self._path,))
67
69 """
70 mark the config topic data as invalid
71 """
72 with self.config_mutex:
73 self.config.time.secs = 0
74
76 """
77 mark the state topic data as invalid
78 """
79 with self.state_mutex:
80 self.state.time.secs = 0
81
83 """
84 return true if the config topic data is valid
85 """
86 return self.config.time.secs != 0
87
89 """
90 return true if the state topic data is valid
91 """
92 return self.state.time.secs != 0
93
99
100 - def revalidate(self, timeout, invalidate_state=True, invalidate_config=True):
101 """
102 invalidate the state and config topics, then wait up to timeout
103 seconds for them to become valid again.
104 return true if both the state and config topic data are valid
105 """
106 if invalidate_state:
107 self.invalidate_state()
108 if invalidate_config:
109 self.invalidate_config()
110 timeout_time = rospy.Time.now() + rospy.Duration(timeout)
111 while not self.is_state_valid() and not rospy.is_shutdown():
112 rospy.sleep(0.1)
113 if timeout_time < rospy.Time.now():
114 rospy.logwarn("Timed out waiting for node interface valid...")
115 return False
116 return True
117
119 """
120 config topic callback
121 """
122 if not self.config or _time_changed(self.config.time, msg.time):
123 with self.config_mutex:
124 self.config = msg
125 self.config_changed()
126
127 - def load_state(self, current_state, incoming_state):
128 for state in incoming_state:
129 if state.name not in current_state:
130 current_state[state.name] = dict()
131 formatting = json.loads(state.format)
132 current_state[state.name]["type"] = formatting["type"]
133 current_state[state.name]["role"] = formatting["role"]
134 data = json.loads(state.data)
135 current_state[state.name]["data"] = data[0] if len(data) > 0 else None
136
147
149 """
150 publish on the command topic
151 return true if the command is acknowleged within the timeout
152 """
153 cmd_time = rospy.Time.now()
154 self.cmd_times.append(cmd_time)
155 self.cmd_times = self.cmd_times[-100:]
156 cmd_msg = IOComponentCommand(
157 time=cmd_time,
158 op=op,
159 args=json.dumps(args))
160 rospy.logdebug("publish_command %s %s" % (cmd_msg.op, cmd_msg.args))
161 if timeout != None:
162 timeout_time = rospy.Time.now() + rospy.Duration(timeout)
163 while not rospy.is_shutdown():
164 self._command_pub.publish(cmd_msg)
165 if self.is_state_valid():
166 if cmd_time in self.state.commands:
167 rospy.logdebug("command %s acknowleged" % (cmd_msg.op,))
168 return True
169 rospy.sleep(0.1)
170 if timeout_time < rospy.Time.now():
171 rospy.logwarn("Timed out waiting for command acknowlegment...")
172 break
173 return False
174 return True
175
177 """
178 IO Device interface to config, status and command topics
179 """
180 - def __init__(self, node_name, dev_name):
181 super(IODeviceInterface, self).__init__(
182 'io/' + node_name + '/' + dev_name,
183 IODeviceConfiguration,
184 IODeviceStatus)
185 self.config = IODeviceConfiguration()
186 self.state = IODeviceStatus()
187 self.invalidate_config()
188 self.invalidate_state()
189 self._threads = dict()
190 self._callback_items = dict()
191 self._callback_functions = dict()
192
194 """
195 return a list of all signals
196 """
197 with self.state_mutex:
198 return copy.deepcopy(self.signals.keys())
199
201 """
202 return the status for the given signal, or none
203 """
204 with self.state_mutex:
205 if signal_name in self.signals.keys():
206 return copy.deepcopy(self.signals[signal_name]['type'])
207 return None
208
210 """
211 return the status for the given signal, or none
212 """
213 with self.state_mutex:
214 if signal_name in self.signals.keys():
215 return copy.deepcopy(self.signals[signal_name]['data'])
216 return None
217
218 - def set_signal_value(self, signal_name, signal_value, signal_type=None, timeout=5.0):
219 """
220 set the value for the given signal
221 return True if the signal value is set, False if the requested signal is invalid
222 """
223 if signal_name not in self.list_signal_names():
224 rospy.logerr("Cannot find signal '{0}' in this IO Device.".format(signal_name))
225 return
226 if signal_type == None:
227 s_type = self.get_signal_type(signal_name)
228 if s_type == None:
229 rospy.logerr("Failed to get 'type' for signal '{0}'.".format(signal_name))
230 return
231 else:
232 s_type = signal_type
233 set_command = SetCommand().set_signal(signal_name, s_type, signal_value)
234 self.publish_command(set_command.op, set_command.args, timeout=timeout)
235
236 self.revalidate(timeout, invalidate_state=False, invalidate_config=False)
237
239 """
240 return a list of all ports
241 """
242 with self.state_mutex:
243 return copy.deepcopy(self.ports.keys())
244
246 """
247 return the status for the given port, or none
248 """
249 with self.state_mutex:
250 if port_name in self.ports.keys():
251 return copy.deepcopy(self.ports[port_name]['type'])
252 return None
253
255 """
256 return the status for the given port, or none
257 """
258 with self.state_mutex:
259 if port_name in self.ports.keys():
260 return copy.deepcopy(self.ports[port_name]['data'])
261 return None
262
263 - def set_port_value(self, port_name, port_value, port_type=None, timeout=5.0):
264 """
265 set the value for the given port
266 return True if the port value is set, False if the requested port is invalid
267 """
268 if port_name not in list_port_names():
269 rospy.logerr("Cannot find port '{0}' in this IO Device.".format(port_name))
270 return
271 if port_type == None:
272 p_type = self.get_port_type(port_name)
273 if p_type == None:
274 rospy.logerr("Failed to get 'type' for port '{0}'.".format(port_name))
275 return
276 else:
277 p_type = port_type
278 set_command = SetCommand().set_port(port_name, p_type, port_value)
279 self.publish_command(set_command.op, set_command.args, timeout=timeout)
280
281 self.revalidate(timeout, invalidate_state=False, invalidate_config=False)
282
284 """
285 Registers a supplied callback to a change in state of supplied
286 signal_name's value. Spawns a thread that will call the callback with
287 the updated value.
288 @type: function
289 @param: function handle for callback function
290 @type: str
291 @param: the signal name (button or wheel) to poll for value change
292 @type: int
293 @param: the rate at which to poll for a value change (in a separate
294 thread)
295 @rtype: str
296 @return: callback_id retuned if the callback was registered, and an
297 empty string if the requested signal_name does not exist in the
298 Navigator
299 """
300 if signal_name in self.list_signal_names():
301 callback_id = uuid.uuid4()
302 self._callback_items[callback_id] = intera_dataflow.Signal()
303 def signal_spinner():
304 old_state = self.get_signal_value(signal_name)
305 r = rospy.Rate(poll_rate)
306 while not rospy.is_shutdown():
307 new_state = self.get_signal_value(signal_name)
308 if new_state != old_state:
309 self._callback_items[callback_id](new_state)
310 old_state = new_state
311 r.sleep()
312 self._callback_items[callback_id].connect(callback_function)
313 t = threading.Thread(target=signal_spinner)
314 t.daemon = True
315 t.start()
316 self._threads[callback_id] = t
317 self._callback_functions[callback_id] = callback_function
318 return callback_id
319 else:
320 return str()
321
323 """
324 Deregisters a callback based on the supplied callback_id.
325 @type: str
326 @param: the callback_id string to deregister
327 @rtype: bool
328 @return: returns bool True if the callback was successfully
329 deregistered, and False otherwise.
330 """
331 if callback_id in self._threads.keys():
332 self._callback_items[callback_id].disconnect(
333 self._callback_functions[callback_id])
334 return True
335 else:
336 return False
337