From 10db6d78d9626a19d4b58f4e190048652b542caa Mon Sep 17 00:00:00 2001 From: "Alexander K. Hudek" Date: Tue, 20 Jan 2015 02:21:32 -0500 Subject: [PATCH] Extra error handling, hash fix, and new concurrency support. --- NOTES.md | 19 +++++++++++++++ src/views/core.clj | 59 +++++++++++++++++++++++++++++++++------------- 2 files changed, 61 insertions(+), 17 deletions(-) create mode 100644 NOTES.md diff --git a/NOTES.md b/NOTES.md new file mode 100644 index 0000000..23c4078 --- /dev/null +++ b/NOTES.md @@ -0,0 +1,19 @@ +# Notes + +## Note 1 + +There is one hash stored per view signature. If a view is refreshed, a hash of +its data is checked against the previous data and if they match it is not sent +out. Otherwise it is sent out and the hash is replaced. On subscription, we +also store a hash of the view data, but only if it doesn't exist. This is important +because if we always stored it, the following situation would be a problem. + + + u1 - subscribes v1, hash is stored + p1 - updates data requiring v1 to be updated + u2 - subscribes v1, hash is stored + t1 - update thread runs and decides v1 needs to be updated because of the hint + supplied by p1, however, the hash is now the same because of u2 and no + refresh is sent out to u1. + + \ No newline at end of file diff --git a/src/views/core.clj b/src/views/core.clj index 0b3578a..e012b05 100644 --- a/src/views/core.clj +++ b/src/views/core.clj @@ -1,8 +1,10 @@ (ns views.core + (:import + [java.util.concurrent ArrayBlockingQueue TimeUnit]) (:require [views.protocols :refer [IView id data relevant?]] [plumbing.core :refer [swap-pair!]] - [clojure.tools.logging :refer [debug]])) + [clojure.tools.logging :refer [debug error]])) ;; The view-system data structure has this shape: ;; @@ -18,19 +20,22 @@ ;; ;; Each hint has the form {:namespace x :hint y} +(def refresh-queue (ArrayBlockingQueue. 500)) + (defn subscribe-view! [view-system view-sig subscriber-key data-hash] (-> view-system (update-in [:subscribed subscriber-key] (fnil conj #{}) view-sig) (update-in [:subscribers view-sig] (fnil conj #{}) subscriber-key) - (assoc-in [:hashes view-sig] data-hash))) + (update-in [:hashes view-sig] #(or % data-hash)))) ;; see note #1 (defn subscribe! [view-system namespace view-id parameters subscriber-key] - (if-let [view (get-in @view-system [:views view-id])] - (let [vdata (data view namespace parameters)] - (swap! view-system subscribe-view! [namespace view-id parameters] subscriber-key (hash vdata)) - ((get @view-system :send-fn) subscriber-key [[view-id parameters] vdata])))) + (if-let [view (get-in @view-system [:views view-id])] + (future + (let [vdata (data view namespace parameters)] + (swap! view-system subscribe-view! [namespace view-id parameters] subscriber-key (hash vdata)) + ((get @view-system :send-fn) subscriber-key [[view-id parameters] vdata]))))) (defn remove-from-subscribers [view-system view-sig subscriber-key] @@ -58,12 +63,10 @@ [view-system hints [namespace view-id parameters :as view-sig]] (let [v (get-in @view-system [:views view-id])] (if (relevant? v namespace parameters hints) - (let [vdata (data v namespace parameters) - hdata (hash vdata)] - (when-not (= hdata (get-in @view-system [:hashes view-sig])) - (doseq [s (get-in @view-system [:subscribers view-sig])] - ((:send-fn @view-system) s [[view-id parameters] vdata])) - (swap! view-system assoc-in [:hashes view-sig] hdata)))))) + (if-not (.contains ^ArrayBlockingQueue refresh-queue view-sig) + (when-not (.offer ^ArrayBlockingQueue refresh-queue view-sig) + (error "refresh-queue full, dropping refresh request for" view-sig)) + (debug "already queued for refresh" view-sig))))) (defn subscribed-views [view-system] @@ -91,15 +94,37 @@ [last-update min-refresh-interval] (Thread/sleep (max 0 (- min-refresh-interval (- (System/currentTimeMillis) last-update))))) +(defn worker-thread + "Handles refresh requests." + [view-system] + (fn [] + (when-let [[namespace view-id parameters :as view-sig] (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)] + (try + (let [view (get-in @view-system [:views view-id]) + vdata (data view namespace parameters) + hdata (hash vdata)] + (when-not (= hdata (get-in @view-system [:hashes view-sig])) + (doseq [s (get-in @view-system [:subscribers view-sig])] + ((:send-fn @view-system) s [[view-id parameters] vdata])) + (swap! view-system assoc-in [:hashes view-sig] hdata))) + (catch Exception e + (error "error refreshing:" namespace view-id parameters + "e:" e "msg:" (.getMessage e))))) + (recur))) + (defn update-watcher! "A single threaded view update mechanism." - [view-system min-refresh-interval] + [view-system min-refresh-interval threads] (swap! view-system assoc :last-update 0) (.start (Thread. (fn [] (let [last-update (:last-update @view-system)] - (if (can-refresh? last-update min-refresh-interval) - (refresh-views! view-system) - (wait last-update min-refresh-interval)) - (recur)))))) + (try + (if (can-refresh? last-update min-refresh-interval) + (refresh-views! view-system) + (wait last-update min-refresh-interval)) + (catch Exception e (error "exception in views e:" e "msg:"(.getMessage e)))) + (recur))))) + (dotimes [i threads] (.start (Thread. ^Runnable (worker-thread view-system))))) + (defn hint "Create a hint."