1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 import errno
29
30 import rospy
31
32 import intera_dataflow
33
34 from intera_core_msgs.msg import (
35 DigitalIOState,
36 DigitalOutputCommand,
37 )
41 """
42 DEPRECATION WARNING: This interface will likely be removed in
43 the future. Transition to using the IO Framework and the wrapper
44 classes: gripper.py, cuff.py, camera.py
45
46 Interface class for a simple Digital Input and/or Output on the
47 Intera robots.
48
49 Input
50 - read input state
51 Output
52 - turn output On/Off
53 - read current output state
54 """
56 """
57 Constructor.
58
59 @param component_id: unique id of the digital component
60 """
61 self._id = component_id
62 self._component_type = 'digital_io'
63 self._is_output = False
64 self._state = None
65
66 self.state_changed = intera_dataflow.Signal()
67
68 type_ns = '/robot/' + self._component_type
69 topic_base = type_ns + '/' + self._id
70
71 self._sub_state = rospy.Subscriber(
72 topic_base + '/state',
73 DigitalIOState,
74 self._on_io_state)
75
76 intera_dataflow.wait_for(
77 lambda: self._state != None,
78 timeout=2.0,
79 timeout_msg="Failed to get current digital_io state from %s" \
80 % (topic_base,),
81 )
82
83
84 if self._is_output:
85 self._pub_output = rospy.Publisher(
86 type_ns + '/command',
87 DigitalOutputCommand,
88 queue_size=10)
89
91 """
92 Updates the internally stored state of the Digital Input/Output.
93 """
94 new_state = (msg.state == DigitalIOState.PRESSED)
95 if self._state is None:
96 self._is_output = not msg.isInputOnly
97 old_state = self._state
98 self._state = new_state
99
100
101 if old_state is not None and old_state != new_state:
102 self.state_changed(new_state)
103
104 @property
106 """
107 Accessor to check if IO is capable of output.
108 """
109 return self._is_output
110
111 @property
113 """
114 Current state of the Digital Input/Output.
115 """
116 return self._state
117
118 @state.setter
120 """
121 Control the state of the Digital Output. (is_output must be True)
122
123 @type value: bool
124 @param value: new state to output {True, False}
125 """
126 self.set_output(value)
127
129 """
130 Control the state of the Digital Output.
131
132 Use this function for finer control over the wait_for timeout.
133
134 @type value: bool
135 @param value: new state {True, False} of the Output.
136 @type timeout: float
137 @param timeout: Seconds to wait for the io to reflect command.
138 If 0, just command once and return. [0]
139 """
140 if not self._is_output:
141 raise IOError(errno.EACCES, "Component is not an output [%s: %s]" %
142 (self._component_type, self._id))
143 cmd = DigitalOutputCommand()
144 cmd.name = self._id
145 cmd.value = value
146 self._pub_output.publish(cmd)
147
148 if not timeout == 0:
149 intera_dataflow.wait_for(
150 test=lambda: self.state == value,
151 timeout=timeout,
152 rate=100,
153 timeout_msg=("Failed to command digital io to: %r" % (value,)),
154 body=lambda: self._pub_output.publish(cmd)
155 )
156