Continuing to improve our simple pub/sub: now we will run each subscriber's callback in its own thread.
Implementation
class ThreadedEventChannel(EventChannel):
    """Event channel that runs every subscribed callback in its own thread.

    ``publish`` returns only after all callbacks started for the event have
    finished.
    """

    def __init__(self):
        super(ThreadedEventChannel, self).__init__()

    def publish(self, event, *args, **kwargs):
        """Run each callback subscribed to ``event`` in a separate thread.

        Args:
            event: key under which the callbacks were registered in
                ``self.subscribers``.
            *args: positional arguments forwarded to every callback.
            **kwargs: keyword arguments forwarded to every callback.

        Publishing an event that is not in ``self.subscribers`` does nothing.
        """
        # Test membership on the mapping itself rather than on ``.keys()``.
        if event not in self.subscribers:
            return

        workers = [
            threading.Thread(target=handler, args=args, kwargs=kwargs)
            for handler in self.subscribers[event]
        ]

        # Start every thread before joining any, so the callbacks overlap.
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
This class starts one thread per subscribed callback and waits for all of them to finish before continuing.
Making publisher non-blocking
class ThreadedEventChannel(EventChannel):
    """Event channel that runs every subscribed callback in its own thread.

    Args:
        blocking: when true (the default), ``publish`` joins the threads it
            started before returning; when false, it returns right after
            starting them.
    """

    def __init__(self, blocking=True):
        self.blocking = blocking
        super(ThreadedEventChannel, self).__init__()

    def publish(self, event, *args, **kwargs):
        """Run each callback subscribed to ``event`` in a separate thread.

        Args:
            event: key under which the callbacks were registered in
                ``self.subscribers``.
            *args: positional arguments forwarded to every callback.
            **kwargs: keyword arguments forwarded to every callback.

        Publishing an event that is not in ``self.subscribers`` does nothing.
        """
        # Test membership on the mapping itself rather than on ``.keys()``.
        if event not in self.subscribers:
            return

        workers = [
            threading.Thread(target=handler, args=args, kwargs=kwargs)
            for handler in self.subscribers[event]
        ]

        # Start every thread before (optionally) joining, so callbacks overlap.
        for worker in workers:
            worker.start()

        if not self.blocking:
            # Non-blocking mode: leave the threads running and return at once.
            return
        for worker in workers:
            worker.join()
With `blocking=False`, this class starts one thread per subscribed callback and continues without waiting for them to finish.
Advantages of this method
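Running the callbacks in parallel improves performance: for example, if an event fires two long-running callbacks, we only wait as long as the slowest one instead of the sum of both. (This applies to callbacks that spend their time waiting, such as sleeping or doing I/O; CPU-bound callbacks are still limited by the GIL in standard CPython.)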
import time
from event_channel.event_channel import EventChannel
from event_channel.threaded_event_channel import ThreadedEventChannel
def _time_publish(channel, message):
    """Subscribe two ``time.sleep`` callbacks, publish once, print elapsed time.

    ``message`` is a format string whose ``{0}`` receives the elapsed seconds.
    """
    for _ in range(2):
        channel.subscribe("myevent", time.sleep)
    began = time.time()
    # Each subscribed callback is called as time.sleep(3).
    channel.publish("myevent", 3)
    finished = time.time()
    print(message.format(finished - began))


non_thread = EventChannel()
threaded = ThreadedEventChannel()
non_blocking_threaded = ThreadedEventChannel(blocking=False)

_time_publish(non_thread, "non threaded function elapsed time {0}")
# non threaded function elapsed time 6.0080871582

_time_publish(threaded, "threaded function elapsed time {0}")
# threaded function elapsed time 3.00581121445

_time_publish(
    non_blocking_threaded,
    "threaded function non blocking elapsed time {0}",
)
# threaded function non blocking elapsed time 0.00333380699158
Links
Which things would you improve? Thank you for reading, and share any thoughts below :D
Top comments (0)