Extra error handling, hash fix, and new concurrency support.

This commit is contained in:
Alexander K. Hudek 2015-01-20 02:21:32 -05:00
parent afb4b91103
commit 10db6d78d9
2 changed files with 61 additions and 17 deletions

19
NOTES.md Normal file
View file

@ -0,0 +1,19 @@
# Notes
## Note 1
There is one hash stored per view signature. If a view is refreshed, a hash of
its data is checked against the previous data and if they match it is not sent
out. Otherwise it is sent out and the hash is replaced. On subscription, we
also store a hash of the view data, but only if it doesn't exist. This is important
because if we always stored it, the following situation would be a problem.
u1 - subscribes v1, hash is stored
p1 - updates data requiring v1 to be updated
u2 - subscribes v1, hash is stored
t1 - update thread runs and decides v1 needs to be updated because of the hint
supplied by p1, however, the hash is now the same because of u2 and no
refresh is sent out to u1.

View file

@ -1,8 +1,10 @@
(ns views.core (ns views.core
(:import
[java.util.concurrent ArrayBlockingQueue TimeUnit])
(:require (:require
[views.protocols :refer [IView id data relevant?]] [views.protocols :refer [IView id data relevant?]]
[plumbing.core :refer [swap-pair!]] [plumbing.core :refer [swap-pair!]]
[clojure.tools.logging :refer [debug]])) [clojure.tools.logging :refer [debug error]]))
;; The view-system data structure has this shape: ;; The view-system data structure has this shape:
;; ;;
@ -18,19 +20,22 @@
;; ;;
;; Each hint has the form {:namespace x :hint y} ;; Each hint has the form {:namespace x :hint y}
(def refresh-queue (ArrayBlockingQueue. 500))
(defn subscribe-view! (defn subscribe-view!
[view-system view-sig subscriber-key data-hash] [view-system view-sig subscriber-key data-hash]
(-> view-system (-> view-system
(update-in [:subscribed subscriber-key] (fnil conj #{}) view-sig) (update-in [:subscribed subscriber-key] (fnil conj #{}) view-sig)
(update-in [:subscribers view-sig] (fnil conj #{}) subscriber-key) (update-in [:subscribers view-sig] (fnil conj #{}) subscriber-key)
(assoc-in [:hashes view-sig] data-hash))) (update-in [:hashes view-sig] #(or % data-hash)))) ;; see note #1
(defn subscribe! (defn subscribe!
[view-system namespace view-id parameters subscriber-key] [view-system namespace view-id parameters subscriber-key]
(if-let [view (get-in @view-system [:views view-id])] (if-let [view (get-in @view-system [:views view-id])]
(future
(let [vdata (data view namespace parameters)] (let [vdata (data view namespace parameters)]
(swap! view-system subscribe-view! [namespace view-id parameters] subscriber-key (hash vdata)) (swap! view-system subscribe-view! [namespace view-id parameters] subscriber-key (hash vdata))
((get @view-system :send-fn) subscriber-key [[view-id parameters] vdata])))) ((get @view-system :send-fn) subscriber-key [[view-id parameters] vdata])))))
(defn remove-from-subscribers (defn remove-from-subscribers
[view-system view-sig subscriber-key] [view-system view-sig subscriber-key]
@ -58,12 +63,10 @@
[view-system hints [namespace view-id parameters :as view-sig]] [view-system hints [namespace view-id parameters :as view-sig]]
(let [v (get-in @view-system [:views view-id])] (let [v (get-in @view-system [:views view-id])]
(if (relevant? v namespace parameters hints) (if (relevant? v namespace parameters hints)
(let [vdata (data v namespace parameters) (if-not (.contains ^ArrayBlockingQueue refresh-queue view-sig)
hdata (hash vdata)] (when-not (.offer ^ArrayBlockingQueue refresh-queue view-sig)
(when-not (= hdata (get-in @view-system [:hashes view-sig])) (error "refresh-queue full, dropping refresh request for" view-sig))
(doseq [s (get-in @view-system [:subscribers view-sig])] (debug "already queued for refresh" view-sig)))))
((:send-fn @view-system) s [[view-id parameters] vdata]))
(swap! view-system assoc-in [:hashes view-sig] hdata))))))
(defn subscribed-views (defn subscribed-views
[view-system] [view-system]
@ -91,15 +94,37 @@
[last-update min-refresh-interval] [last-update min-refresh-interval]
(Thread/sleep (max 0 (- min-refresh-interval (- (System/currentTimeMillis) last-update))))) (Thread/sleep (max 0 (- min-refresh-interval (- (System/currentTimeMillis) last-update)))))
(defn worker-thread
"Handles refresh requests."
[view-system]
(fn []
(when-let [[namespace view-id parameters :as view-sig] (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)]
(try
(let [view (get-in @view-system [:views view-id])
vdata (data view namespace parameters)
hdata (hash vdata)]
(when-not (= hdata (get-in @view-system [:hashes view-sig]))
(doseq [s (get-in @view-system [:subscribers view-sig])]
((:send-fn @view-system) s [[view-id parameters] vdata]))
(swap! view-system assoc-in [:hashes view-sig] hdata)))
(catch Exception e
(error "error refreshing:" namespace view-id parameters
"e:" e "msg:" (.getMessage e)))))
(recur)))
(defn update-watcher! (defn update-watcher!
"A single threaded view update mechanism." "A single threaded view update mechanism."
[view-system min-refresh-interval] [view-system min-refresh-interval threads]
(swap! view-system assoc :last-update 0) (swap! view-system assoc :last-update 0)
(.start (Thread. (fn [] (let [last-update (:last-update @view-system)] (.start (Thread. (fn [] (let [last-update (:last-update @view-system)]
(try
(if (can-refresh? last-update min-refresh-interval) (if (can-refresh? last-update min-refresh-interval)
(refresh-views! view-system) (refresh-views! view-system)
(wait last-update min-refresh-interval)) (wait last-update min-refresh-interval))
(recur)))))) (catch Exception e (error "exception in views e:" e "msg:"(.getMessage e))))
(recur)))))
(dotimes [i threads] (.start (Thread. ^Runnable (worker-thread view-system)))))
(defn hint (defn hint
"Create a hint." "Create a hint."