contentious change (probably): convert to single global view-system atom

why do this?

well, i think the biggest gain from the perspective of a developer
looking to use this library is that this simplifies the use of it.
it becomes easier to change things in this library (and also in others
that plug into it, such as views-honeysql and other libraries providing
alternate IView implementations) to have a much better out-of-the-box
working configuration for the common use-cases.

additionally, i could not think of a scenario where i would want to
have more then one view-system hanging around given that you can
plug in multiple different IView implementations into the same
view-system that potentially each work with a different backing
database.

the big complaint i could see against this change is that it goes
somewhat against a "functional approach" with the global state that is
used automatically by the functions in the library.

personally, i (right now) see it as a more _practical_ approach. and
for me that wins out in the end.

who knows, perhaps i will deeply regret this change down the road.
This commit is contained in:
Gered 2016-05-18 18:57:01 -04:00
parent f9c15d6cd6
commit 5ec96d6db2

View file

@ -21,12 +21,16 @@
;;
;; Each hint has the form {:namespace x :hint y}
(defonce view-system (atom {}))
(def refresh-queue-size
(if-let [n (:views-refresh-queue-size env)]
(Long/parseLong n)
1000))
(def statistics (atom {}))
(defonce statistics (atom {}))
(defn reset-stats!
[]
@ -39,18 +43,18 @@
(def refresh-queue (ArrayBlockingQueue. refresh-queue-size))
(defn subscribe-view!
(defn- subscribe-view!
[view-system view-sig subscriber-key]
(-> view-system
(update-in [:subscribed subscriber-key] (fnil conj #{}) view-sig)
(update-in [:subscribers view-sig] (fnil conj #{}) subscriber-key)))
(defn update-hash!
(defn- update-hash!
[view-system view-sig data-hash]
(update-in view-system [:hashes view-sig] #(or % data-hash))) ;; see note #1 in NOTES.md
(defn subscribe!
[view-system namespace view-id parameters subscriber-key]
[namespace view-id parameters subscriber-key]
(when-let [view (get-in @view-system [:views view-id])]
(let [view-sig [namespace view-id parameters]]
(swap! view-system subscribe-view! view-sig subscriber-key)
@ -67,12 +71,12 @@
(error "error subscribing:" namespace view-id parameters
"e:" e "msg:" (.getMessage e))))))))
(defn remove-from-subscribers
(defn- remove-from-subscribers
[view-system view-sig subscriber-key]
(update-in view-system [:subscribers view-sig] disj subscriber-key))
(defn unsubscribe!
[view-system namespace view-id parameters subscriber-key]
[namespace view-id parameters subscriber-key]
(swap! view-system
(fn [vs]
(-> vs
@ -81,7 +85,7 @@
(defn unsubscribe-all!
"Remove all subscriptions by a given subscriber."
[view-system subscriber-key]
[subscriber-key]
(swap! view-system
(fn [vs]
(let [view-sigs (get-in vs [:subscribed subscriber-key])
@ -90,7 +94,7 @@
(defn refresh-view!
"We refresh a view if it is relevant and its data hash has changed."
[view-system hints [namespace view-id parameters :as view-sig]]
[hints [namespace view-id parameters :as view-sig]]
(let [v (get-in @view-system [:views view-id])]
(try
(if (relevant? v namespace parameters hints)
@ -105,28 +109,28 @@
view-id "e:" e)))))
(defn subscribed-views
[view-system]
(reduce into #{} (vals (:subscribed view-system))))
[]
(reduce into #{} (vals (:subscribed @view-system))))
(defn active-view-count
"Returns a count of views with at least one subscriber."
[view-system]
(count (remove #(empty? (val %)) (:subscribers view-system))))
[]
(count (remove #(empty? (val %)) (:subscribers @view-system))))
(defn pop-hints!
"Return hints and clear hint set atomicly."
[view-system]
[]
(let [p (swap-pair! view-system assoc :hints #{})]
(or (:hints (first p)) #{})))
(defn refresh-views!
"Given a collection of hints, or a single hint, find all dirty views and schedule them for a refresh."
([view-system hints]
([hints]
(debug "refresh hints:" hints)
(mapv #(refresh-view! view-system hints %) (subscribed-views @view-system))
(mapv #(refresh-view! hints %) (subscribed-views))
(swap! view-system assoc :last-update (System/currentTimeMillis)))
([view-system]
(refresh-views! view-system (pop-hints! view-system))))
([]
(refresh-views! (pop-hints!))))
(defn can-refresh?
[last-update min-refresh-interval]
@ -138,7 +142,7 @@
(defn refresh-worker-thread
"Handles refresh requests."
[view-system]
[]
(fn []
(try
(when-let [[namespace view-id parameters :as view-sig] (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)]
@ -160,12 +164,12 @@
(debug "exiting worker thread"))))
(defn refresh-watcher-thread
[view-system min-refresh-interval]
[min-refresh-interval]
(fn []
(let [last-update (:last-update @view-system)]
(try
(if (can-refresh? last-update min-refresh-interval)
(refresh-views! view-system)
(refresh-views!)
(wait last-update min-refresh-interval))
(catch InterruptedException e)
(catch Exception e
@ -177,12 +181,12 @@
(defn start-update-watcher!
"Starts threads for the views refresh watcher and worker threads that handle
view refresh requests."
[view-system min-refresh-interval threads]
[min-refresh-interval threads]
(if (and (:refresh-watcher @view-system)
(:workers @view-system))
(error "cannot start new watcher and worker threads until existing threads are stopped")
(let [refresh-watcher (Thread. ^Runnable (refresh-watcher-thread view-system min-refresh-interval))
worker-threads (mapv (fn [_] (Thread. ^Runnable (refresh-worker-thread view-system)))
(let [refresh-watcher (Thread. ^Runnable (refresh-watcher-thread min-refresh-interval))
worker-threads (mapv (fn [_] (Thread. ^Runnable (refresh-worker-thread)))
(range threads))]
(swap! view-system assoc
:last-update 0
@ -196,7 +200,7 @@
(defn stop-update-watcher!
"Stops threads for the views refresh watcher and worker threads."
[view-system]
[]
(swap! view-system assoc
:stop-refresh-watcher? true
:stop-workers? true)
@ -210,14 +214,14 @@
(defn log-statistics!
"Run a thread that logs statistics every msecs."
[view-system msecs]
[msecs]
(swap! statistics assoc-in [:enabled] true)
(let [secs (/ msecs 1000)]
(.start (Thread. (fn []
(Thread/sleep msecs)
(let [stats @statistics]
(reset-stats!)
(info "subscribed views:" (active-view-count @view-system)
(info "subscribed views:" (active-view-count)
(format "refreshes/sec: %.1f" (double (/ (:refreshes stats) secs)))
(format "dropped/sec: %.1f" (double (/ (:dropped stats) secs)))
(format "deduped/sec: %.1f" (double (/ (:deduplicated stats) secs))))
@ -230,12 +234,12 @@
(defn add-hint!
"Add a hint to the system."
[view-system hint]
[hint]
(swap! view-system update-in [:hints] (fnil conj #{}) hint))
(defn add-views!
"Add a collection of views to the system."
[view-system views]
[views]
(swap! view-system update-in [:views] (fnil into {}) (map vector (map id views) views)))
(comment
@ -248,50 +252,45 @@
(let [tables (query-tables (apply query-fn parameters))]
(boolean (some #(not-empty (intersection % talbes)) hints)))))
(def memory-system (atom {}))
(reset! memory-system {:a {:foo 1 :bar 200 :baz [1 2 3]}
:b {:foo 2 :bar 300 :baz [2 3 4]}})
(reset! in-memory-data {:a {:foo 1 :bar 200 :baz [1 2 3]}
:b {:foo 2 :bar 300 :baz [2 3 4]}})
(defrecord MemoryView [id ks]
IView
(id [_] id)
(data [_ namespace parameters]
(get-in @memory-system (-> [namespace] (into ks) (into parameters))))
(get-in @in-memory-data (-> [namespace] (into ks) (into parameters))))
(relevant? [_ namespace parameters hints]
(some #(and (= namespace (:namespace %)) (= ks (:hint %))) hints)))
(def view-system
(atom
{:views {:foo (MemoryView. :foo [:foo])
:bar (MemoryView. :bar [:bar])
:baz (MemoryView. :baz [:baz])}
:send-fn (fn [subscriber-key data] (println "sending to:" subscriber-key "data:" data))}))
(reset! view-system
{:views {:foo (MemoryView. :foo [:foo])
:bar (MemoryView. :bar [:bar])
:baz (MemoryView. :baz [:baz])}
:send-fn (fn [subscriber-key data] (println "sending to:" subscriber-key "data:" data))})
(subscribe! view-system :a :foo [] 1)
(subscribe! view-system :b :foo [] 2)
(subscribe! view-system :b :baz [] 2)
(subscribe! :a :foo [] 1)
(subscribe! :b :foo [] 2)
(subscribe! :b :baz [] 2)
(subscribed-views @view-system)
(subscribed-views)
(doto view-system
(add-hint! [:foo])
(add-hint! [:baz]))
(add-hint! [:foo])
(add-hint! [:baz])
(refresh-views! view-system)
(refresh-views!)
;; Example of function that updates and hints the view system.
(defn massoc-in!
[memory-system namespace ks v]
(let [ms (swap! memory-system assoc-in (into [namespace] ks) v)]
(add-hint! view-system ks)
[memory-db namespace ks v]
(let [ms (swap! memory-db assoc-in (into [namespace] ks) v)]
(add-hint! ks)
ms))
(massoc-in! memory-system :a [:foo] 1)
(massoc-in! memory-system :b [:baz] [2 4 3])
(massoc-in! in-memory-data :a [:foo] 1)
(massoc-in! in-memory-data :b [:baz] [2 4 3])
(start-update-watcher! view-system 1000)
(start-update-watcher! 1000 1)
)