(ns observer
  (:require [clojure.spec.alpha :as s]))
(s/def ::topic keyword?)
(s/def ::message any?)
(s/def ::callback fn?)
(defprotocol Observable
  "Protocol defining Observable behaviors"
  (subscribe [this topic callback] "Subscribes a callback function to a specific topic")
  (unsubscribe [this topic callback] "Removes a callback subscription from a topic")
  (notify [this topic message] "Notifies all subscribers of a topic with a message"))
(defrecord EventBus [subscribers]
  Observable
  (subscribe [this topic callback]
    {:pre [(s/valid? ::topic topic)
           (s/valid? ::callback callback)]}
    (update-in this [:subscribers topic] (fnil conj #{}) callback))
  (unsubscribe [this topic callback]
    {:pre [(s/valid? ::topic topic)
           (s/valid? ::callback callback)]}
    (update-in this [:subscribers topic] disj callback))
  (notify [this topic message]
    {:pre [(s/valid? ::topic topic)
           (s/valid? ::message message)]}
    (doseq [callback (get-in this [:subscribers topic])]
      (callback message))
    this))
(defn create-event-bus
  "Creates a new event bus instance"
  []
  (->EventBus {}))
(defn create-stateful-subscriber
  "Creates a subscriber that maintains state between notifications"
  [initial-state update-fn]
  (let [state (atom initial-state)]
    (fn [message]
      (swap! state update-fn message))))
(defn create-logging-subscriber
  "Creates a subscriber that logs messages with timestamps"
  [topic-name]
  (fn [message]
    (println (format "[%s][%s] Received: %s"
                     (java.time.LocalDateTime/now)
                     topic-name
                     message))))
(comment
  (def event-bus (create-event-bus))
  (def order-logger (create-logging-subscriber "Orders"))
  (def bus-with-logger 
    (subscribe event-bus :orders order-logger))
  (def order-counter
    (create-stateful-subscriber 0 (fn [state _] (inc state))))
  (def bus-with-counter
    (subscribe bus-with-logger :orders order-counter))
  (notify bus-with-counter :orders {:id 1 :total 100.0})
  (notify bus-with-counter :orders {:id 2 :total 200.0})
  (def final-bus
    (unsubscribe bus-with-counter :orders order-logger))
)
(comment
  (require '[clojure.test :refer [deftest testing is]])
  (deftest observer-pattern-test
    (testing "Basic subscription and notification"
      (let [received (atom nil)
            callback #(reset! received %)
            bus (-> (create-event-bus)
                   (subscribe :test callback))]
        (notify bus :test "hello")
        (is (= @received "hello"))))
    (testing "Multiple subscribers"
      (let [results (atom [])
            callback-1 #(swap! results conj [:cb1 %])
            callback-2 #(swap! results conj [:cb2 %])
            bus (-> (create-event-bus)
                   (subscribe :test callback-1)
                   (subscribe :test callback-2))]
        (notify bus :test "hello")
        (is (= @results [[:cb1 "hello"] [:cb2 "hello"]]))))
    (testing "Unsubscribe"
      (let [received (atom nil)
            callback #(reset! received %)
            bus (-> (create-event-bus)
                   (subscribe :test callback)
                   (unsubscribe :test callback))]
        (notify bus :test "hello")
        (is (nil? @received))))
    (testing "Stateful subscriber"
      (let [counter (create-stateful-subscriber 0 (fn [state _] (inc state)))
            bus (subscribe (create-event-bus) :test counter)]
        (notify bus :test "event1")
        (notify bus :test "event2")
        (is (= @(#'observer/state counter) 2))))))
![Cover image for Clojure Is Awesome!!! [PART 6]](https://media2.dev.to/dynamic/image/width=1000,height=420,fit=cover,gravity=auto,format=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgj4x858iplwc8twxba3x.jpg) 
              
            
          For further actions, you may consider blocking this person and/or reporting abuse
 

 
    
Top comments (1)
That is indeed awesome!
In the last test, I am getting the error "Unable to resolve var: observer/state in this context".