1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 import errno
16
17 import rospy
18
19 import intera_dataflow
20
21 from intera_core_msgs.msg import (
22 DigitalIOState,
23 DigitalOutputCommand,
24 )
28 """
29 DEPRECATION WARNING: This interface will likely be removed in
30 the future. Transition to using the IO Framework and the wrapper
31 classes: gripper.py, cuff.py, camera.py
32
33 Interface class for a simple Digital Input and/or Output on the
34 Intera robots.
35
36 Input
37 - read input state
38 Output
39 - turn output On/Off
40 - read current output state
41 """
43 """
44 Constructor.
45
46 @param component_id: unique id of the digital component
47 """
48 self._id = component_id
49 self._component_type = 'digital_io'
50 self._is_output = False
51 self._state = None
52
53 self.state_changed = intera_dataflow.Signal()
54
55 type_ns = '/robot/' + self._component_type
56 topic_base = type_ns + '/' + self._id
57
58 self._sub_state = rospy.Subscriber(
59 topic_base + '/state',
60 DigitalIOState,
61 self._on_io_state)
62
63 intera_dataflow.wait_for(
64 lambda: self._state != None,
65 timeout=2.0,
66 timeout_msg="Failed to get current digital_io state from %s" \
67 % (topic_base,),
68 )
69
70
71 if self._is_output:
72 self._pub_output = rospy.Publisher(
73 type_ns + '/command',
74 DigitalOutputCommand,
75 queue_size=10)
76
78 """
79 Updates the internally stored state of the Digital Input/Output.
80 """
81 new_state = (msg.state == DigitalIOState.PRESSED)
82 if self._state is None:
83 self._is_output = not msg.isInputOnly
84 old_state = self._state
85 self._state = new_state
86
87
88 if old_state is not None and old_state != new_state:
89 self.state_changed(new_state)
90
91 @property
93 """
94 Accessor to check if IO is capable of output.
95 """
96 return self._is_output
97
98 @property
100 """
101 Current state of the Digital Input/Output.
102 """
103 return self._state
104
105 @state.setter
107 """
108 Control the state of the Digital Output. (is_output must be True)
109
110 @type value: bool
111 @param value: new state to output {True, False}
112 """
113 self.set_output(value)
114
116 """
117 Control the state of the Digital Output.
118
119 Use this function for finer control over the wait_for timeout.
120
121 @type value: bool
122 @param value: new state {True, False} of the Output.
123 @type timeout: float
124 @param timeout: Seconds to wait for the io to reflect command.
125 If 0, just command once and return. [0]
126 """
127 if not self._is_output:
128 raise IOError(errno.EACCES, "Component is not an output [%s: %s]" %
129 (self._component_type, self._id))
130 cmd = DigitalOutputCommand()
131 cmd.name = self._id
132 cmd.value = value
133 self._pub_output.publish(cmd)
134
135 if not timeout == 0:
136 intera_dataflow.wait_for(
137 test=lambda: self.state == value,
138 timeout=timeout,
139 rate=100,
140 timeout_msg=("Failed to command digital io to: %r" % (value,)),
141 body=lambda: self._pub_output.publish(cmd)
142 )
143