1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 import rospy
30 from sensor_msgs.msg import Image
31 from intera_io import IODeviceInterface
32 from robot_params import RobotParams
33
35 """
36 Base class for the interface to the robot's Cameras.
37 """
38
40 """
41 Constructor.
42 """
43 camera_param_dict = RobotParams().get_camera_details()
44 camera_list = camera_param_dict.keys()
45
46 if not camera_list:
47 rospy.logerr(' '.join(["camera list is empty: ", ' , '.join(camera_list)]))
48 return
49 camera_color_dict = {"mono":['cognex'], "color":['ienso_ethernet']}
50 self.cameras_io = dict()
51 for camera in camera_list:
52 if camera_param_dict[camera]['cameraType'] in camera_color_dict[''
53 'color']:
54 is_color = True
55 else:
56 is_color = False
57 self.cameras_io[camera] = {'interface': IODeviceInterface("internal"
58 "_camera", camera), 'is_color': is_color}
59
61 """
62 Private function to check if the camera is currently in streaming mode.
63
64 @type camera_name: str
65 @param camera_name: camera name
66
67 @rtype: bool
68 @return: True if the camera is streaming, False otherwise
69 """
70 return self.cameras_io[camera_name]['interface'].get_signal_value(
71 "camera_streaming")
72
74 """
75 Return the list of all camera names on current robot.
76
77 @rtype: [str]
78 @return: ordered list of camera names
79 """
80 return self.cameras_io.keys()
81
83 """
84 Verify if the given camera name is in the list of camera names or not.
85
86 @type camera_name: str
87 @param camera_name: camera name
88
89 @rtype: bool
90 @return: True if the name exists in camera name list, False otherwise.
91 """
92 if camera_name not in self.list_cameras():
93 rospy.logerr(' '.join([camera_name, "not in the list of cameras"
94 " detected on this robot:", ' , '.join(self.list_cameras())]))
95 return False
96 return True
97
99 """
100 Check the given camera name is streaming or not.
101
102 @type camera_name: str
103 @param camera_name: camera name
104
105 @rtype: bool
106 @return: True if the camera is streaming, False camera is not
107 streaming, False with log error means camera name not exists
108 in camera name list
109 """
110 if self.verify_camera_exists(camera_name):
111 return self._camera_streaming_status(camera_name)
112 return False
113
114 - def set_callback(self, camera_name, callback, callback_args=None,
115 queue_size=10, rectify_image=True):
116 """
117 Setup the callback function to show image.
118
119 @type camera_name: str
120 @param camera_name: camera name
121 @type callback: fn(msg, cb_args)
122 @param callback: function to call when data is received
123 @type callback_args: any
124 @param callback_args: additional arguments to pass to the callback
125 @type queue_size: int
126 @param queue_size: maximum number of messages to receive at a time
127 @type rectify_image: bool
128 @param rectify_image: specify whether subscribe to the rectified or
129 raw (unrectified) image topic
130 """
131 if self.verify_camera_exists(camera_name):
132 if rectify_image == True:
133 if self.cameras_io[camera_name]['is_color']:
134 image_string = "image_rect_color"
135 else:
136 image_string = "image_rect"
137 else:
138 image_string = "image_raw"
139 rospy.Subscriber('/'.join(["/io/internal_camera", camera_name,
140 image_string]), Image, callback, callback_args=callback_args)
141
143 """
144 Start camera streaming for the given camera name, This only allows
145 one camera open at one time and forces closed any other open cameras
146 before open the wanted one.
147
148 @type camera_name: str
149 @param camera_name: camera name
150
151 @rtype: bool
152 @return: False if camera not exists in camera_name_list or the
153 interface is not able to stop streaming other camera.
154 Additionally, returns False if the interface is not able
155 to start streaming the desired camera. Returns True if the
156 camera already streaming or the camera successfully start
157 streaming.
158 """
159 if not self.verify_camera_exists(camera_name):
160 return False
161 elif not self._camera_streaming_status(camera_name):
162 other_cameras_list = list(set(self.list_cameras())-set([
163 camera_name]))
164 for other_camera in other_cameras_list:
165 if self._camera_streaming_status(other_camera):
166 self.cameras_io[other_camera]['interface'].set_signal_value(
167 "camera_streaming", False)
168 if self._camera_streaming_status(other_camera):
169 rospy.logerr(' '.join(["Failed to stop",
170 other_camera, "from streaming on this robot. "
171 "Unable to start streaming from", camera_name]))
172 return False
173 self.cameras_io[camera_name]['interface'].set_signal_value(
174 "camera_streaming", True)
175 if not self._camera_streaming_status(camera_name):
176 rospy.logerr(' '.join(["Failed to start", camera_name, "Unable"
177 " to start streaming from", camera_name]))
178 return False
179 else:
180 return True
181 else:
182 return True
183
185 """
186 Stop camera streaming by given the camera name.
187
188 @type camera_name: str
189 @param camera_name: camera name
190
191 @rtype: bool
192 @return: False if camera not exists in camera name list or not able
193 to stop streaming camera. True if the camera not is streaming
194 mode or the camera successfully stop streaming.
195 """
196 if not self.verify_camera_exists(camera_name):
197 return False
198 elif self._camera_streaming_status(camera_name):
199 self.cameras_io[camera_name]['interface'].set_signal_value(
200 "camera_streaming", False)
201 if self._camera_streaming_status(camera_name):
202 rospy.logerr(' '.join(["Failed to stop", camera_name,
203 " from streaming on this robot."]))
204 return False
205 else:
206 return True
207 else:
208 return True
209