Package intera_interface :: Module digital_io
[hide private]
[frames] | no frames]

Source Code for Module intera_interface.digital_io

  1  # Copyright (c) 2013-2016, Rethink Robotics 
  2  # All rights reserved. 
  3  # 
  4  # Redistribution and use in source and binary forms, with or without 
  5  # modification, are permitted provided that the following conditions are met: 
  6  # 
  7  # 1. Redistributions of source code must retain the above copyright notice, 
  8  #    this list of conditions and the following disclaimer. 
  9  # 2. Redistributions in binary form must reproduce the above copyright 
 10  #    notice, this list of conditions and the following disclaimer in the 
 11  #    documentation and/or other materials provided with the distribution. 
 12  # 3. Neither the name of the Rethink Robotics nor the names of its 
 13  #    contributors may be used to endorse or promote products derived from 
 14  #    this software without specific prior written permission. 
 15  # 
 16  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 17  # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 18  # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 19  # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 20  # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 21  # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 22  # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 23  # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 24  # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 25  # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 26  # POSSIBILITY OF SUCH DAMAGE. 
 27   
 28  import errno 
 29   
 30  import rospy 
 31   
 32  import intera_dataflow 
 33   
 34  from intera_core_msgs.msg import ( 
 35      DigitalIOState, 
 36      DigitalOutputCommand, 
 37  ) 
class DigitalIO(object):
    """
    DEPRECATION WARNING: This interface will likely be removed in
    the future. Transition to using the IO Framework and the wrapper
    classes: gripper.py, cuff.py, camera.py

    Interface class for a simple Digital Input and/or Output on the
    Intera robots.

    Input
      - read input state
    Output
      - turn output On/Off
      - read current output state

    The C{state_changed} signal is called with the new boolean state
    whenever a received state message differs from the previously
    stored state.
    """

    def __init__(self, component_id):
        """
        Constructor.

        Subscribes to '/robot/digital_io/<component_id>/state' and waits
        (with a 2.0 second timeout) until the first state message has been
        received. A command publisher on '/robot/digital_io/command' is
        only created when that first message reports the component as
        output-capable.

        @type component_id: str
        @param component_id: unique id of the digital component
        """
        self._id = component_id
        self._component_type = 'digital_io'
        # Both fields are filled in by _on_io_state once the first state
        # message arrives.
        self._is_output = False
        self._state = None

        self.state_changed = intera_dataflow.Signal()

        type_ns = '/robot/' + self._component_type
        topic_base = type_ns + '/' + self._id

        self._sub_state = rospy.Subscriber(
            topic_base + '/state',
            DigitalIOState,
            self._on_io_state)

        # NOTE(review): presumably wait_for raises with timeout_msg when the
        # predicate never becomes true -- confirm against intera_dataflow.
        intera_dataflow.wait_for(
            lambda: self._state is not None,
            timeout=2.0,
            timeout_msg="Failed to get current digital_io state from %s"
            % (topic_base,),
        )

        # check if output-capable before creating publisher
        if self._is_output:
            self._pub_output = rospy.Publisher(
                type_ns + '/command',
                DigitalOutputCommand,
                queue_size=10)

    def _on_io_state(self, msg):
        """
        Updates the internally stored state of the Digital Input/Output.

        Subscriber callback for DigitalIOState messages.

        @param msg: message carrying C{state} and C{isInputOnly} fields
        """
        new_state = (msg.state == DigitalIOState.PRESSED)
        if self._state is None:
            # Output capability is latched from the first message only.
            self._is_output = not msg.isInputOnly
        old_state = self._state
        self._state = new_state

        # trigger signal if changed (never for the very first message)
        if old_state is not None and old_state != new_state:
            self.state_changed(new_state)

    @property
    def is_output(self):
        """
        Accessor to check if IO is capable of output.
        """
        return self._is_output

    @property
    def state(self):
        """
        Current state of the Digital Input/Output.

        None until the first state message has been received.
        """
        return self._state

    @state.setter
    def state(self, value):
        """
        Control the state of the Digital Output. (is_output must be True)

        Delegates to set_output() with its default timeout.

        @type value: bool
        @param value: new state to output {True, False}
        """
        self.set_output(value)

    def set_output(self, value, timeout=2.0):
        """
        Control the state of the Digital Output.

        Use this function for finer control over the wait_for timeout.

        @type value: bool
        @param value: new state {True, False} of the Output.
        @type timeout: float
        @param timeout: Seconds to wait for the io to reflect command.
                        If 0, just command once and return. [2.0]

        @raise IOError: (errno.EACCES) if the component is not an output.
        """
        if not self._is_output:
            raise IOError(errno.EACCES, "Component is not an output [%s: %s]" %
                          (self._component_type, self._id))
        cmd = DigitalOutputCommand()
        cmd.name = self._id
        cmd.value = value
        self._pub_output.publish(cmd)

        if timeout != 0:
            # The publish call is handed to wait_for as its body callback;
            # presumably it is re-invoked while polling until the reported
            # state matches -- confirm against intera_dataflow.wait_for.
            intera_dataflow.wait_for(
                test=lambda: self.state == value,
                timeout=timeout,
                rate=100,
                timeout_msg=("Failed to command digital io to: %r" % (value,)),
                body=lambda: self._pub_output.publish(cmd)
            )
156