1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 import rospy
16 from sensor_msgs.msg import Image
17 from intera_io import IODeviceInterface
18 from robot_params import RobotParams
19
21 """
22 Base class for the interface to the robot's Cameras.
23 """
24
26 """
27 Constructor.
28 """
29 camera_param_dict = RobotParams().get_camera_details()
30 camera_list = camera_param_dict.keys()
31
32 if not camera_list:
33 rospy.logerr(' '.join(["camera list is empty: ", ' , '.join(camera_list)]))
34 return
35 camera_color_dict = {"mono":['cognex'], "color":['ienso_ethernet']}
36 self.cameras_io = dict()
37 for camera in camera_list:
38 if camera_param_dict[camera]['cameraType'] in camera_color_dict[''
39 'color']:
40 is_color = True
41 else:
42 is_color = False
43 self.cameras_io[camera] = {'interface': IODeviceInterface("internal"
44 "_camera", camera), 'is_color': is_color}
45
47 """
48 Private function to check if the camera is currently in streaming mode.
49
50 @type camera_name: str
51 @param camera_name: camera name
52
53 @rtype: bool
54 @return: True if the camera is streaming, False otherwise
55 """
56 return self.cameras_io[camera_name]['interface'].get_signal_value(
57 "camera_streaming")
58
60 """
61 Return the list of all camera names on current robot.
62
63 @rtype: [str]
64 @return: ordered list of camera names
65 """
66 return self.cameras_io.keys()
67
69 """
70 Verify if the given camera name is in the list of camera names or not.
71
72 @type camera_name: str
73 @param camera_name: camera name
74
75 @rtype: bool
76 @return: True if the name exists in camera name list, False otherwise.
77 """
78 if camera_name not in self.list_cameras():
79 rospy.logerr(' '.join([camera_name, "not in the list of cameras"
80 " detected on this robot:", ' , '.join(self.list_cameras())]))
81 return False
82 return True
83
85 """
86 Check the given camera name is streaming or not.
87
88 @type camera_name: str
89 @param camera_name: camera name
90
91 @rtype: bool
92 @return: True if the camera is streaming, False camera is not
93 streaming, False with log error means camera name not exists
94 in camera name list
95 """
96 if self.verify_camera_exists(camera_name):
97 return self._camera_streaming_status(camera_name)
98 return False
99
100 - def set_callback(self, camera_name, callback, callback_args=None,
101 queue_size=10, rectify_image=True):
102 """
103 Setup the callback function to show image.
104
105 @type camera_name: str
106 @param camera_name: camera name
107 @type callback: fn(msg, cb_args)
108 @param callback: function to call when data is received
109 @type callback_args: any
110 @param callback_args: additional arguments to pass to the callback
111 @type queue_size: int
112 @param queue_size: maximum number of messages to receive at a time
113 @type rectify_image: bool
114 @param rectify_image: specify whether subscribe to the rectified or
115 raw (unrectified) image topic
116 """
117 if self.verify_camera_exists(camera_name):
118 if rectify_image == True:
119 if self.cameras_io[camera_name]['is_color']:
120 image_string = "image_rect_color"
121 else:
122 image_string = "image_rect"
123 else:
124 image_string = "image_raw"
125 rospy.Subscriber('/'.join(["/io/internal_camera", camera_name,
126 image_string]), Image, callback, callback_args=callback_args)
127
129 """
130 Start camera streaming for the given camera name, This only allows
131 one camera open at one time and forces closed any other open cameras
132 before open the wanted one.
133
134 @type camera_name: str
135 @param camera_name: camera name
136
137 @rtype: bool
138 @return: False if camera not exists in camera_name_list or the
139 interface is not able to stop streaming other camera.
140 Additionally, returns False if the interface is not able
141 to start streaming the desired camera. Returns True if the
142 camera already streaming or the camera successfully start
143 streaming.
144 """
145 if not self.verify_camera_exists(camera_name):
146 return False
147 elif not self._camera_streaming_status(camera_name):
148 other_cameras_list = list(set(self.list_cameras())-set([
149 camera_name]))
150 for other_camera in other_cameras_list:
151 if self._camera_streaming_status(other_camera):
152 self.cameras_io[other_camera]['interface'].set_signal_value(
153 "camera_streaming", False)
154 if self._camera_streaming_status(other_camera):
155 rospy.logerr(' '.join(["Failed to stop",
156 other_camera, "from streaming on this robot. "
157 "Unable to start streaming from", camera_name]))
158 return False
159 self.cameras_io[camera_name]['interface'].set_signal_value(
160 "camera_streaming", True)
161 if not self._camera_streaming_status(camera_name):
162 rospy.logerr(' '.join(["Failed to start", camera_name, "Unable"
163 " to start streaming from", camera_name]))
164 return False
165 else:
166 return True
167 else:
168 return True
169
171 """
172 Stop camera streaming by given the camera name.
173
174 @type camera_name: str
175 @param camera_name: camera name
176
177 @rtype: bool
178 @return: False if camera not exists in camera name list or not able
179 to stop streaming camera. True if the camera not is streaming
180 mode or the camera successfully stop streaming.
181 """
182 if not self.verify_camera_exists(camera_name):
183 return False
184 elif self._camera_streaming_status(camera_name):
185 self.cameras_io[camera_name]['interface'].set_signal_value(
186 "camera_streaming", False)
187 if self._camera_streaming_status(camera_name):
188 rospy.logerr(' '.join(["Failed to stop", camera_name,
189 " from streaming on this robot."]))
190 return False
191 else:
192 return True
193 else:
194 return True
195