A brief explanation
First, before you start coding, I recommend reading this good article, which explains the difference between the publish-subscribe pattern and the observer pattern. If you don't want to read the article, I think this graphic will be enough.
As we can see in the graphic, a subscriber (a callback function) is attached to an event. Once that step is complete, a publisher adds a new event to the event channel, which fires all the subscribers attached to that event.
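The two steps can also be sketched with nothing more than a dictionary. This is only an illustration of the flow, not the class we are about to write: the names `channel`, `received`, `on_greeting`, and the event name `"greeting"` are made up for this example.

```python
# Hypothetical event channel: maps an event name to its subscriber callbacks.
channel = {}

# Collects whatever the subscriber receives, so we can inspect it afterwards.
received = []


def on_greeting(message):
    # Subscriber: a plain callback that records the message it was sent.
    received.append(message)


# Step 1: attach the subscriber to an event.
channel.setdefault("greeting", []).append(on_greeting)

# Step 2: a publisher adds an event to the channel, which fires every
# subscriber attached to that event.
for callback in channel.get("greeting", []):
    callback("Hello, world!")

print(received)  # ['Hello, world!']
```

Note that the publisher never calls `on_greeting` directly. It only knows the event name, which is the main difference from the observer pattern.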
Our class
class EventChannel(object):
    def __init__(self):
        # Maps each event name to the list of callbacks subscribed to it.
        self.subscribers = {}

    def unsubscribe(self, event, callback):
        # Unknown events are ignored, so unsubscribing never raises KeyError.
        if event in self.subscribers:
            # Compare with != rather than `is not`: bound methods are
            # recreated on every attribute access, so an identity check
            # would never match them.
            self.subscribers[event] = list(
                filter(
                    lambda x: x != callback,
                    self.subscribers[event]
                )
            )

    def subscribe(self, event, callback):
        if not callable(callback):
            raise ValueError("callback must be callable")
        if event is None or event == "":
            raise ValueError("Event can't be empty")
        if event not in self.subscribers:
            self.subscribers[event] = [callback]
        else:
            self.subscribers[event].append(callback)

    def publish(self, event, args):
        # Publishing an event that has no subscribers does nothing.
        if event in self.subscribers:
            for callback in self.subscribers[event]:
                callback(args)
Testing
event_channel = EventChannel()
callback = lambda x: print(x)
event_channel.subscribe("myevent", callback)
event_channel.publish("myevent", "Hello, world!")
# out: Hello, world!
event_channel.unsubscribe("myevent", callback)
event_channel.publish("myevent", "Hello, world!")
# No output
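The test above uses a single subscriber. As a further sketch, here is what happens with two callbacks on the same event. To keep the snippet runnable on its own, it uses a condensed stand-in for the class (built on `defaultdict`, which is my shortcut and not the version shown earlier). The names `first`, `second`, and `log` are hypothetical.

```python
from collections import defaultdict


class EventChannel:
    # Condensed stand-in for the article's class so this snippet is
    # self-contained; input validation is left out for brevity.
    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, event, callback):
        self.subscribers[event].append(callback)

    def unsubscribe(self, event, callback):
        self.subscribers[event] = [
            cb for cb in self.subscribers[event] if cb != callback
        ]

    def publish(self, event, args):
        # .get() avoids creating an empty entry for unknown events.
        for callback in list(self.subscribers.get(event, [])):
            callback(args)


log = []


def first(message):
    log.append(("first", message))


def second(message):
    log.append(("second", message))


channel = EventChannel()
channel.subscribe("myevent", first)
channel.subscribe("myevent", second)

channel.publish("myevent", "one")      # both callbacks fire, in subscription order
channel.unsubscribe("myevent", first)
channel.publish("myevent", "two")      # only `second` is still attached

print(log)
# [('first', 'one'), ('second', 'one'), ('second', 'two')]
```

Callbacks fire in the order they subscribed, and removing one leaves the others untouched.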
Links
Which things would you improve? Thank you for reading, and share any thoughts below :D