move refresh queue array object into view-system atom

also means the size is now configured via init! and not an environment
variable
This commit is contained in:
Gered 2016-05-21 19:12:34 -04:00
parent 36f3bdfc64
commit 943a99717f

View file

@ -4,21 +4,23 @@
(:require
[views.protocols :refer [IView id data relevant?]]
[plumbing.core :refer [swap-pair!]]
[clojure.tools.logging :refer [info debug error trace]]
[environ.core :refer [env]]))
[clojure.tools.logging :refer [info debug error trace]]))
;; The view-system data structure has this shape:
;;
;; {:views {:id1 view1, id2 view2, ...}
;; :send-fn (fn [subscriber-key data] ...)
;; :put-hints-fn (fn [hints] ... )
;; :auth-fn (fn [view-sig subscriber-key context] ...)
;; :namespace-fn (fn [view-sig subscriber-key context] ...)
;; {
;;
;; :hashes {view-sig hash, ...}
;; :subscribed {subscriber-key #{view-sig, ...}}
;; :subscribers {view-sig #{subscriber-key, ...}}
;; :hints #{hint1 hint2 ...}
;; :refresh-queue (ArrayBlockingQueue.)
;; :views {:id1 view1, id2 view2, ...}
;; :send-fn (fn [subscriber-key data] ...)
;; :put-hints-fn (fn [hints] ... )
;; :auth-fn (fn [view-sig subscriber-key context] ...)
;; :namespace-fn (fn [view-sig subscriber-key context] ...)
;;
;; :hashes {view-sig hash, ...}
;; :subscribed {subscriber-key #{view-sig, ...}}
;; :subscribers {view-sig #{subscriber-key, ...}}
;; :hints #{hint1 hint2 ...}
;;
;; }
;;
@ -28,10 +30,6 @@
(defonce statistics (atom {}))
(def refresh-queue-size
(if-let [n (:views-refresh-queue-size env)]
(Long/parseLong n)
1000))
(defn reset-stats!
@ -45,8 +43,6 @@
[]
(boolean (:logger @statistics)))
(def refresh-queue (ArrayBlockingQueue. refresh-queue-size))
(defn ->view-sig
[namespace view-id parameters]
{:namespace namespace
@ -178,17 +174,18 @@
only if the provided collection of hints is relevant to that view."
[hints {:keys [namespace view-id parameters] :as view-sig}]
(let [v (get-in @view-system [:views view-id])]
(try
(if (relevant? v namespace parameters hints)
(if-not (.contains ^ArrayBlockingQueue refresh-queue view-sig)
(when-not (.offer ^ArrayBlockingQueue refresh-queue view-sig)
(when (collect-stats?) (swap! statistics update-in [:dropped] inc))
(error "refresh-queue full, dropping refresh request for" view-sig))
(do
(when (collect-stats?) (swap! statistics update-in [:deduplicated] inc))
(trace "already queued for refresh" view-sig))))
(catch Exception e (error "error determining if view is relevant, view-id:"
view-id "e:" e)))))
(if-let [^ArrayBlockingQueue refresh-queue (:refresh-queue @view-system)]
(try
(if (relevant? v namespace parameters hints)
(if-not (.contains refresh-queue view-sig)
(when-not (.offer refresh-queue view-sig)
(when (collect-stats?) (swap! statistics update-in [:dropped] inc))
(error "refresh-queue full, dropping refresh request for" view-sig))
(do
(when (collect-stats?) (swap! statistics update-in [:deduplicated] inc))
(trace "already queued for refresh" view-sig))))
(catch Exception e (error "error determining if view is relevant, view-id:"
view-id "e:" e))))))
(defn subscribed-views
"Returns a list of all views in the system that have subscribers."
@ -231,26 +228,27 @@
refresh requests and when there is one, handles it by running the view, getting the view
data and then sending it out to all the view's subscribers. "
[]
(fn []
(try
(when-let [{:keys [namespace view-id parameters] :as view-sig} (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)]
(trace "worker running refresh for" view-sig)
(if (collect-stats?) (swap! statistics update-in [:refreshes] inc))
(try
(let [view (get-in @view-system [:views view-id])
vdata (data view namespace parameters)
hdata (hash vdata)]
(when-not (= hdata (get-in @view-system [:hashes view-sig]))
(doseq [subscriber-key (get-in @view-system [:subscribers view-sig])]
(send-view-data! subscriber-key view-sig vdata))
(swap! view-system assoc-in [:hashes view-sig] hdata)))
(catch Exception e
(error "error refreshing:" namespace view-id parameters
"e:" e "msg:" (.getMessage e)))))
(catch InterruptedException e))
(if-not (:stop-workers? @view-system)
(recur)
(trace "exiting worker thread"))))
(let [^ArrayBlockingQueue refresh-queue (:refresh-queue @view-system)]
(fn []
(try
(when-let [{:keys [namespace view-id parameters] :as view-sig} (.poll refresh-queue 60 TimeUnit/SECONDS)]
(trace "worker running refresh for" view-sig)
(if (collect-stats?) (swap! statistics update-in [:refreshes] inc))
(try
(let [view (get-in @view-system [:views view-id])
vdata (data view namespace parameters)
hdata (hash vdata)]
(when-not (= hdata (get-in @view-system [:hashes view-sig]))
(doseq [subscriber-key (get-in @view-system [:subscribers view-sig])]
(send-view-data! subscriber-key view-sig vdata))
(swap! view-system assoc-in [:hashes view-sig] hdata)))
(catch Exception e
(error "error refreshing:" namespace view-id parameters
"e:" e "msg:" (.getMessage e)))))
(catch InterruptedException e))
(if-not (:stop-workers? @view-system)
(recur)
(trace "exiting worker thread")))))
(defn refresh-watcher-thread
"Returns a refresh watcher thread function. A 'refresh watcher' continually attempts
@ -381,6 +379,11 @@
(def default-options
"Default options used to initialize the views system via init!"
{
; the size of the queue used to hold view refresh requests for
; the worker threads. for very heavy systems, this can be set
; higher if you start to get warnings about dropped refresh requests
:refresh-queue-size 1000
; interval in milliseconds at which the refresh watcher thread will
; check for any queued up hints and dispatch relevant view refresh
; updates to the worker threads.
@ -426,11 +429,12 @@
(let [options (merge default-options options)]
(trace "initializing views system using options:" options)
(reset! view-system
{:views (into {} (get-views-map views))
:send-fn send-fn
:put-hints-fn (:put-hints-fn options)
:auth-fn (:auth-fn options)
:namespace-fn (:namespace-fn options)})
{:refresh-queue (ArrayBlockingQueue. (:refresh-queue-size options))
:views (into {} (get-views-map views))
:send-fn send-fn
:put-hints-fn (:put-hints-fn options)
:auth-fn (:auth-fn options)
:namespace-fn (:namespace-fn options)})
(start-update-watcher! (:refresh-interval options)
(:worker-threads options))
(when-let [stats-log-interval (:stats-log-interval options)]