DEV Community

Andrés Baamonde Lozano
Andrés Baamonde Lozano

Posted on

Fast Pub-Sub python implementation: threading (II)

Continuing improving our simple pub sub. Now we will fire a thread on each subscriptor.

Implementation

class ThreadedEventChannel(EventChannel):
    def __init__(self):
        super(ThreadedEventChannel, self).__init__()

    def publish(self, event, *args, **kwargs):
        threads = []
        if event in self.subscribers.keys():
            for callback in self.subscribers[event]:
                threads.append(threading.Thread(
                  target=callback,
                  args=args,
                  kwargs=kwargs
                ))

            for th in threads:
                th.start()

            for th in threads:
                th.join()
Enter fullscreen mode Exit fullscreen mode

This class will fire all threads subscribed and wait they all to continue executing

Making publisher non-blocking

class ThreadedEventChannel(EventChannel):
    def __init__(self, blocking=True):
        self.blocking = blocking
        super(ThreadedEventChannel, self).__init__()

    def publish(self, event, *args, **kwargs):
        threads = []
        if event in self.subscribers.keys():
            for callback in self.subscribers[event]:
                threads.append(threading.Thread(
                  target=callback,
                  args=args,
                  kwargs=kwargs
                ))
            for th in threads:
                th.start()

            if self.blocking:
                for th in threads:
                    th.join()
Enter fullscreen mode Exit fullscreen mode

This class will fire all threads subscribed and no wait they to continue executing

Advantages of this method

We making calls parallel will improve our preformance for example, if an event fires 2 long callbacks we only wait the longest time of our callbacks .

import time
from event_channel.event_channel import EventChannel
from event_channel.threaded_event_channel import ThreadedEventChannel

non_thread = EventChannel()
threaded = ThreadedEventChannel()
non_blocking_threaded = ThreadedEventChannel(blocking=False)

non_thread.subscribe("myevent", time.sleep)
non_thread.subscribe("myevent", time.sleep)
start = time.time()
non_thread.publish("myevent", 3)
end = time.time()
print("non threaded function elapsed time {0}".format(end - start))
#non threaded function elapsed time 6.0080871582
threaded.subscribe("myevent", time.sleep)
threaded.subscribe("myevent", time.sleep)
start = time.time()
threaded.publish("myevent", 3)
end = time.time()
print("threaded function elapsed time {0}".format(end - start))
# threaded function elapsed time 3.00581121445

non_blocking_threaded.subscribe("myevent", time.sleep)
non_blocking_threaded.subscribe("myevent", time.sleep)
start = time.time()
non_blocking_threaded.publish("myevent", 3)
end = time.time()
print("threaded function non blocking elapsed time {0}".format(end - start))
# threaded function non blocking elapsed time 0.00333380699158
Enter fullscreen mode Exit fullscreen mode

Links

Wich things will you improve?.Thank you for reading, and write any thought below :D

Top comments (0)