Package intera_dataflow :: Module signals
[hide private]
[frames] | no frames]

Source Code for Module intera_dataflow.signals

 1  # Copyright (c) 2013-2017, Rethink Robotics Inc. 
 2  # 
 3  # Licensed under the Apache License, Version 2.0 (the "License"); 
 4  # you may not use this file except in compliance with the License. 
 5  # You may obtain a copy of the License at 
 6  # 
 7  #     http://www.apache.org/licenses/LICENSE-2.0 
 8  # 
 9  # Unless required by applicable law or agreed to in writing, software 
10  # distributed under the License is distributed on an "AS IS" BASIS, 
11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
12  # See the License for the specific language governing permissions and 
13  # limitations under the License. 
14   
15  import inspect 
16  from weakref import WeakKeyDictionary 
17   
18  try: 
19      from weakref import WeakSet 
20  except ImportError: 
21      from weakrefset import WeakSet 
22   
23   
class Signal(object):
    """Minimal signal/slot dispatcher that holds its slots weakly.

    Plain callables are kept in a ``WeakSet``.  Bound methods are split into
    their receiver (``__self__``) and underlying function (``__func__``); the
    receiver is used as a key of a ``WeakKeyDictionary`` whose value is the
    set of functions to invoke on it.  Because only weak references are kept,
    a slot stops being called once the function (or the receiver object) is
    garbage collected -- callers must keep their own strong reference to
    anything they connect (in particular to lambdas and nested functions).
    """

    def __init__(self):
        # Weakly-held plain callables.
        self._functions = WeakSet()
        # receiver object (weak key) -> set of unbound functions to call on it.
        self._methods = WeakKeyDictionary()

    def __call__(self, *args, **kargs):
        """Emit the signal, forwarding all arguments to every live slot.

        Plain functions are invoked first, then methods.  The slot
        collections are snapshotted before dispatch so that a slot may
        connect or disconnect slots (including itself) while the signal is
        being emitted; such changes take effect from the next emission.
        """
        for function in list(self._functions):
            function(*args, **kargs)

        for receiver, functions in list(self._methods.items()):
            for function in list(functions):
                function(receiver, *args, **kargs)

    def connect(self, slot):
        """Register ``slot`` to be called on every emission.

        ``slot`` may be a bound method or any other weak-referenceable
        callable.  Connecting the same slot more than once has no
        additional effect.
        """
        if inspect.ismethod(slot):
            functions = self._methods.setdefault(slot.__self__, set())
            functions.add(slot.__func__)
        else:
            self._functions.add(slot)

    def disconnect(self, slot):
        """Unregister ``slot``; a slot that is not connected is ignored."""
        if inspect.ismethod(slot):
            functions = self._methods.get(slot.__self__)
            if functions is None:
                return
            # discard() rather than remove(): this receiver may have other
            # methods connected without this particular one being connected.
            functions.discard(slot.__func__)
            if not functions:
                # Do not keep an empty entry around for the receiver.
                del self._methods[slot.__self__]
        else:
            if slot in self._functions:
                self._functions.remove(slot)
52