Package intera_interface :: Module camera
[hide private]
[frames] | no frames]

Source Code for Module intera_interface.camera

  1  # Copyright (c) 2013-2017, Rethink Robotics Inc. 
  2  # 
  3  # Licensed under the Apache License, Version 2.0 (the "License"); 
  4  # you may not use this file except in compliance with the License. 
  5  # You may obtain a copy of the License at 
  6  # 
  7  #     http://www.apache.org/licenses/LICENSE-2.0 
  8  # 
  9  # Unless required by applicable law or agreed to in writing, software 
 10  # distributed under the License is distributed on an "AS IS" BASIS, 
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 12  # See the License for the specific language governing permissions and 
 13  # limitations under the License. 
 14   
 15  import rospy 
 16  from sensor_msgs.msg import Image 
 17  from intera_io import IODeviceInterface 
 18  from robot_params import RobotParams 
 19   
20 -class Cameras(object):
21 """ 22 Base class for the interface to the robot's Cameras. 23 """ 24
25 - def __init__(self):
26 """ 27 Constructor. 28 """ 29 camera_param_dict = RobotParams().get_camera_details() 30 camera_list = camera_param_dict.keys() 31 # check to make sure cameras is not an empty list 32 if not camera_list: 33 rospy.logerr(' '.join(["camera list is empty: ", ' , '.join(camera_list)])) 34 return 35 camera_color_dict = {"mono":['cognex'], "color":['ienso_ethernet']} 36 self.cameras_io = dict() 37 for camera in camera_list: 38 if camera_param_dict[camera]['cameraType'] in camera_color_dict['' 39 'color']: 40 is_color = True 41 else: 42 is_color = False 43 self.cameras_io[camera] = {'interface': IODeviceInterface("internal" 44 "_camera", camera), 'is_color': is_color}
45
46 - def _camera_streaming_status(self, camera_name):
47 """ 48 Private function to check if the camera is currently in streaming mode. 49 50 @type camera_name: str 51 @param camera_name: camera name 52 53 @rtype: bool 54 @return: True if the camera is streaming, False otherwise 55 """ 56 return self.cameras_io[camera_name]['interface'].get_signal_value( 57 "camera_streaming")
58
59 - def list_cameras(self):
60 """ 61 Return the list of all camera names on current robot. 62 63 @rtype: [str] 64 @return: ordered list of camera names 65 """ 66 return self.cameras_io.keys()
67
68 - def verify_camera_exists(self, camera_name):
69 """ 70 Verify if the given camera name is in the list of camera names or not. 71 72 @type camera_name: str 73 @param camera_name: camera name 74 75 @rtype: bool 76 @return: True if the name exists in camera name list, False otherwise. 77 """ 78 if camera_name not in self.list_cameras(): 79 rospy.logerr(' '.join([camera_name, "not in the list of cameras" 80 " detected on this robot:", ' , '.join(self.list_cameras())])) 81 return False 82 return True
83
84 - def is_camera_streaming(self, camera_name):
85 """ 86 Check the given camera name is streaming or not. 87 88 @type camera_name: str 89 @param camera_name: camera name 90 91 @rtype: bool 92 @return: True if the camera is streaming, False camera is not 93 streaming, False with log error means camera name not exists 94 in camera name list 95 """ 96 if self.verify_camera_exists(camera_name): 97 return self._camera_streaming_status(camera_name) 98 return False
99
100 - def set_callback(self, camera_name, callback, callback_args=None, 101 queue_size=10, rectify_image=True):
102 """ 103 Setup the callback function to show image. 104 105 @type camera_name: str 106 @param camera_name: camera name 107 @type callback: fn(msg, cb_args) 108 @param callback: function to call when data is received 109 @type callback_args: any 110 @param callback_args: additional arguments to pass to the callback 111 @type queue_size: int 112 @param queue_size: maximum number of messages to receive at a time 113 @type rectify_image: bool 114 @param rectify_image: specify whether subscribe to the rectified or 115 raw (unrectified) image topic 116 """ 117 if self.verify_camera_exists(camera_name): 118 if rectify_image == True: 119 if self.cameras_io[camera_name]['is_color']: 120 image_string = "image_rect_color" 121 else: 122 image_string = "image_rect" 123 else: 124 image_string = "image_raw" 125 rospy.Subscriber('/'.join(["/io/internal_camera", camera_name, 126 image_string]), Image, callback, callback_args=callback_args)
127
128 - def start_streaming(self, camera_name):
129 """ 130 Start camera streaming for the given camera name, This only allows 131 one camera open at one time and forces closed any other open cameras 132 before open the wanted one. 133 134 @type camera_name: str 135 @param camera_name: camera name 136 137 @rtype: bool 138 @return: False if camera not exists in camera_name_list or the 139 interface is not able to stop streaming other camera. 140 Additionally, returns False if the interface is not able 141 to start streaming the desired camera. Returns True if the 142 camera already streaming or the camera successfully start 143 streaming. 144 """ 145 if not self.verify_camera_exists(camera_name): 146 return False 147 elif not self._camera_streaming_status(camera_name): 148 other_cameras_list = list(set(self.list_cameras())-set([ 149 camera_name])) 150 for other_camera in other_cameras_list: 151 if self._camera_streaming_status(other_camera): 152 self.cameras_io[other_camera]['interface'].set_signal_value( 153 "camera_streaming", False) 154 if self._camera_streaming_status(other_camera): 155 rospy.logerr(' '.join(["Failed to stop", 156 other_camera, "from streaming on this robot. " 157 "Unable to start streaming from", camera_name])) 158 return False 159 self.cameras_io[camera_name]['interface'].set_signal_value( 160 "camera_streaming", True) 161 if not self._camera_streaming_status(camera_name): 162 rospy.logerr(' '.join(["Failed to start", camera_name, "Unable" 163 " to start streaming from", camera_name])) 164 return False 165 else: 166 return True 167 else: # Camera is already streaming 168 return True
169
170 - def stop_streaming(self, camera_name):
171 """ 172 Stop camera streaming by given the camera name. 173 174 @type camera_name: str 175 @param camera_name: camera name 176 177 @rtype: bool 178 @return: False if camera not exists in camera name list or not able 179 to stop streaming camera. True if the camera not is streaming 180 mode or the camera successfully stop streaming. 181 """ 182 if not self.verify_camera_exists(camera_name): 183 return False 184 elif self._camera_streaming_status(camera_name): 185 self.cameras_io[camera_name]['interface'].set_signal_value( 186 "camera_streaming", False) 187 if self._camera_streaming_status(camera_name): 188 rospy.logerr(' '.join(["Failed to stop", camera_name, 189 " from streaming on this robot."])) 190 return False 191 else: 192 return True 193 else: # Camera not in streaming mode 194 return True
195