(ns observer
(:require [clojure.spec.alpha :as s]))
;; Specs for the arguments of the Observable operations; they are referenced
;; by the argument checks in the EventBus methods.
;; A topic is identified by a keyword.
(s/def ::topic keyword?)
;; A message may be any value, including nil.
(s/def ::message any?)
;; A callback must satisfy `fn?`.
(s/def ::callback fn?)
(defprotocol Observable
  "Publish/subscribe contract: callbacks are registered per topic and are
  invoked with each message published on that topic."
  (subscribe [this topic callback]
    "Registers `callback` to receive the messages published on `topic`.")
  (unsubscribe [this topic callback]
    "Cancels the registration of `callback` on `topic`.")
  (notify [this topic message]
    "Delivers `message` to every callback registered on `topic`."))
;; Immutable event bus. `subscribers` maps a topic keyword to the set of
;; callbacks registered for that topic. `subscribe` and `unsubscribe` return an
;; updated copy of the bus; the receiver itself is never modified.
;;
;; Arguments are validated with explicit `assert` calls. A `{:pre [...]}` map is
;; only honoured by `fn`/`defn`; inside a defrecord method body it is an
;; ordinary map literal whose value is thrown away, so it checks nothing.
;; Like `:pre`, `assert` throws AssertionError and is disabled when `*assert*`
;; is false.
(defrecord EventBus [subscribers]
  Observable
  ;; Returns a bus in which `callback` is registered on `topic`. Registering
  ;; the same callback twice has no further effect (callbacks live in a set).
  (subscribe [this topic callback]
    (assert (s/valid? ::topic topic)
            (str "Invalid topic: " (pr-str topic)))
    (assert (s/valid? ::callback callback)
            (str "Invalid callback: " (pr-str callback)))
    (update-in this [:subscribers topic] (fnil conj #{}) callback))
  ;; Returns a bus in which `callback` is no longer registered on `topic`.
  (unsubscribe [this topic callback]
    (assert (s/valid? ::topic topic)
            (str "Invalid topic: " (pr-str topic)))
    (assert (s/valid? ::callback callback)
            (str "Invalid callback: " (pr-str callback)))
    ;; Leave the bus untouched for an unknown topic: `(disj nil callback)`
    ;; would otherwise store a `topic -> nil` entry in the subscriber map.
    (if (contains? subscribers topic)
      (update-in this [:subscribers topic] disj callback)
      this))
  ;; Calls every callback registered on `topic` with `message`, in no
  ;; particular order (the callbacks are held in a set), and returns the bus.
  (notify [this topic message]
    (assert (s/valid? ::topic topic)
            (str "Invalid topic: " (pr-str topic)))
    (assert (s/valid? ::message message)
            (str "Invalid message: " (pr-str message)))
    (doseq [callback (get subscribers topic)]
      (callback message))
    this))
(defn create-event-bus
  "Returns an EventBus that has no subscribers on any topic."
  []
  (map->EventBus {:subscribers {}}))
(defn create-stateful-subscriber
  "Builds a one-argument callback that carries private state across calls.

  The state starts as `initial-state`. Each call replaces it with
  `(update-fn current-state message)` and returns that new state."
  [initial-state update-fn]
  (let [current (atom initial-state)]
    #(swap! current update-fn %)))
(defn create-logging-subscriber
  "Builds a one-argument callback that prints each message it receives,
  prefixed with the current local date-time and `topic-name`."
  [topic-name]
  (fn [message]
    (let [timestamp (java.time.LocalDateTime/now)
          line (format "[%s][%s] Received: %s" timestamp topic-name message)]
      (println line))))
;; REPL walkthrough. Nothing inside `comment` is evaluated when the file loads;
;; evaluate the forms one at a time.
(comment
;; Start from a bus with no subscribers.
(def event-bus (create-event-bus))
;; A callback that prints every message, tagged with "Orders".
(def order-logger (create-logging-subscriber "Orders"))
;; `subscribe` returns a new bus value, so the result has to be kept.
(def bus-with-logger
(subscribe event-bus :orders order-logger))
;; A callback that counts the messages it has received, starting from 0.
(def order-counter
(create-stateful-subscriber 0 (fn [state _] (inc state))))
(def bus-with-counter
(subscribe bus-with-logger :orders order-counter))
;; Each notify prints one log line and increments the counter.
(notify bus-with-counter :orders {:id 1 :total 100.0})
(notify bus-with-counter :orders {:id 2 :total 200.0})
;; Removes only the logger; the counter stays subscribed on final-bus.
(def final-bus
(unsubscribe bus-with-counter :orders order-logger))
)
;; Tests for the event bus. Kept inside `comment` so that clojure.test is only
;; loaded when these forms are evaluated by hand at the REPL.
(comment
  (require '[clojure.test :refer [deftest testing is]])

  (deftest observer-pattern-test
    (testing "Basic subscription and notification"
      (let [received (atom nil)
            callback #(reset! received %)
            bus (-> (create-event-bus)
                    (subscribe :test callback))]
        (notify bus :test "hello")
        (is (= @received "hello"))))

    (testing "Multiple subscribers"
      (let [results (atom [])
            callback-1 #(swap! results conj [:cb1 %])
            callback-2 #(swap! results conj [:cb2 %])
            bus (-> (create-event-bus)
                    (subscribe :test callback-1)
                    (subscribe :test callback-2))]
        (notify bus :test "hello")
        ;; Subscribers are stored in a set, so delivery order is unspecified:
        ;; check that both ran exactly once without assuming an order.
        (is (= 2 (count @results)))
        (is (= (set @results) #{[:cb1 "hello"] [:cb2 "hello"]}))))

    (testing "Unsubscribe"
      (let [received (atom nil)
            callback #(reset! received %)
            bus (-> (create-event-bus)
                    (subscribe :test callback)
                    (unsubscribe :test callback))]
        (notify bus :test "hello")
        (is (nil? @received))))

    (testing "Stateful subscriber"
      ;; The subscriber's atom is a closed-over local, not a var, so it cannot
      ;; be reached as #'observer/state. The subscriber returns its new state
      ;; on every call, so capture that return value in a wrapping callback.
      (let [counter (create-stateful-subscriber 0 (fn [state _] (inc state)))
            latest (atom nil)
            callback #(reset! latest (counter %))
            bus (subscribe (create-event-bus) :test callback)]
        (notify bus :test "event1")
        (notify bus :test "event2")
        (is (= @latest 2))))))
![Cover image for Clojure Is Awesome!!! [PART 6]](https://media2.dev.to/dynamic/image/width=1000,height=420,fit=cover,gravity=auto,format=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgj4x858iplwc8twxba3x.jpg)
How to Diagram Your Cloud Architecture
Cloud architecture diagrams provide critical visibility into the resources in your environment and how they’re connected. In our latest eBook, AWS Solution Architects Jason Mimick and James Wenzel walk through best practices on how to build effective and professional diagrams.
For further actions, you may consider blocking this person and/or reporting abuse
Top comments (1)
That is indeed awesome!
In the last test, I am getting the error "Unable to resolve var: observer/state in this context".