From 943a99717fcc5aa0c983b379c332c84da2341ec2 Mon Sep 17 00:00:00 2001 From: gered Date: Sat, 21 May 2016 19:12:34 -0400 Subject: [PATCH] move refresh queue array object into view-system atom also means the size is now configured via init! and not an environment variable --- src/views/core.clj | 110 +++++++++++++++++++++++---------------------- 1 file changed, 57 insertions(+), 53 deletions(-) diff --git a/src/views/core.clj b/src/views/core.clj index 07983f0..dc8947b 100644 --- a/src/views/core.clj +++ b/src/views/core.clj @@ -4,21 +4,23 @@ (:require [views.protocols :refer [IView id data relevant?]] [plumbing.core :refer [swap-pair!]] - [clojure.tools.logging :refer [info debug error trace]] - [environ.core :refer [env]])) + [clojure.tools.logging :refer [info debug error trace]])) ;; The view-system data structure has this shape: ;; -;; {:views {:id1 view1, id2 view2, ...} -;; :send-fn (fn [subscriber-key data] ...) -;; :put-hints-fn (fn [hints] ... ) -;; :auth-fn (fn [view-sig subscriber-key context] ...) -;; :namespace-fn (fn [view-sig subscriber-key context] ...) +;; { ;; -;; :hashes {view-sig hash, ...} -;; :subscribed {subscriber-key #{view-sig, ...}} -;; :subscribers {view-sig #{subscriber-key, ...}} -;; :hints #{hint1 hint2 ...} +;; :refresh-queue (ArrayBlockingQueue.) +;; :views {:id1 view1, id2 view2, ...} +;; :send-fn (fn [subscriber-key data] ...) +;; :put-hints-fn (fn [hints] ... ) +;; :auth-fn (fn [view-sig subscriber-key context] ...) +;; :namespace-fn (fn [view-sig subscriber-key context] ...) +;; +;; :hashes {view-sig hash, ...} +;; :subscribed {subscriber-key #{view-sig, ...}} +;; :subscribers {view-sig #{subscriber-key, ...}} +;; :hints #{hint1 hint2 ...} ;; ;; } ;; @@ -28,10 +30,6 @@ (defonce statistics (atom {})) -(def refresh-queue-size - (if-let [n (:views-refresh-queue-size env)] - (Long/parseLong n) - 1000)) (defn reset-stats! @@ -45,8 +43,6 @@ [] (boolean (:logger @statistics))) -(def refresh-queue (ArrayBlockingQueue. refresh-queue-size)) - (defn ->view-sig [namespace view-id parameters] {:namespace namespace @@ -178,17 +174,18 @@ only if the provided collection of hints is relevant to that view." [hints {:keys [namespace view-id parameters] :as view-sig}] (let [v (get-in @view-system [:views view-id])] - (try - (if (relevant? v namespace parameters hints) - (if-not (.contains ^ArrayBlockingQueue refresh-queue view-sig) - (when-not (.offer ^ArrayBlockingQueue refresh-queue view-sig) - (when (collect-stats?) (swap! statistics update-in [:dropped] inc)) - (error "refresh-queue full, dropping refresh request for" view-sig)) - (do - (when (collect-stats?) (swap! statistics update-in [:deduplicated] inc)) - (trace "already queued for refresh" view-sig)))) - (catch Exception e (error "error determining if view is relevant, view-id:" - view-id "e:" e))))) + (if-let [^ArrayBlockingQueue refresh-queue (:refresh-queue @view-system)] + (try + (if (relevant? v namespace parameters hints) + (if-not (.contains refresh-queue view-sig) + (when-not (.offer refresh-queue view-sig) + (when (collect-stats?) (swap! statistics update-in [:dropped] inc)) + (error "refresh-queue full, dropping refresh request for" view-sig)) + (do + (when (collect-stats?) (swap! statistics update-in [:deduplicated] inc)) + (trace "already queued for refresh" view-sig)))) + (catch Exception e (error "error determining if view is relevant, view-id:" + view-id "e:" e)))))) (defn subscribed-views "Returns a list of all views in the system that have subscribers." @@ -231,26 +228,27 @@ refresh requests and when there is one, handles it by running the view, getting the view data and then sending it out to all the view's subscribers. " [] - (fn [] - (try - (when-let [{:keys [namespace view-id parameters] :as view-sig} (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)] - (trace "worker running refresh for" view-sig) - (if (collect-stats?) (swap! statistics update-in [:refreshes] inc)) - (try - (let [view (get-in @view-system [:views view-id]) - vdata (data view namespace parameters) - hdata (hash vdata)] - (when-not (= hdata (get-in @view-system [:hashes view-sig])) - (doseq [subscriber-key (get-in @view-system [:subscribers view-sig])] - (send-view-data! subscriber-key view-sig vdata)) - (swap! view-system assoc-in [:hashes view-sig] hdata))) - (catch Exception e - (error "error refreshing:" namespace view-id parameters - "e:" e "msg:" (.getMessage e))))) - (catch InterruptedException e)) - (if-not (:stop-workers? @view-system) - (recur) - (trace "exiting worker thread")))) + (let [^ArrayBlockingQueue refresh-queue (:refresh-queue @view-system)] + (fn [] + (try + (when-let [{:keys [namespace view-id parameters] :as view-sig} (.poll refresh-queue 60 TimeUnit/SECONDS)] + (trace "worker running refresh for" view-sig) + (if (collect-stats?) (swap! statistics update-in [:refreshes] inc)) + (try + (let [view (get-in @view-system [:views view-id]) + vdata (data view namespace parameters) + hdata (hash vdata)] + (when-not (= hdata (get-in @view-system [:hashes view-sig])) + (doseq [subscriber-key (get-in @view-system [:subscribers view-sig])] + (send-view-data! subscriber-key view-sig vdata)) + (swap! view-system assoc-in [:hashes view-sig] hdata))) + (catch Exception e + (error "error refreshing:" namespace view-id parameters + "e:" e "msg:" (.getMessage e))))) + (catch InterruptedException e)) + (if-not (:stop-workers? @view-system) + (recur) + (trace "exiting worker thread"))))) (defn refresh-watcher-thread "Returns a refresh watcher thread function. A 'refresh watcher' continually attempts @@ -381,6 +379,11 @@ (def default-options "Default options used to initialize the views system via init!" { + ; the size of the queue used to hold view refresh requests for + ; the worker threads. for very heavy systems, this can be set + ; higher if you start to get warnings about dropped refresh requests + :refresh-queue-size 1000 + ; interval in milliseconds at which the refresh watcher thread will ; check for any queued up hints and dispatch relevant view refresh ; updates to the worker threads. @@ -426,11 +429,12 @@ (let [options (merge default-options options)] (trace "initializing views system using options:" options) (reset! view-system - {:views (into {} (get-views-map views)) - :send-fn send-fn - :put-hints-fn (:put-hints-fn options) - :auth-fn (:auth-fn options) - :namespace-fn (:namespace-fn options)}) + {:refresh-queue (ArrayBlockingQueue. (:refresh-queue-size options)) + :views (into {} (get-views-map views)) + :send-fn send-fn + :put-hints-fn (:put-hints-fn options) + :auth-fn (:auth-fn options) + :namespace-fn (:namespace-fn options)}) (start-update-watcher! (:refresh-interval options) (:worker-threads options)) (when-let [stats-log-interval (:stats-log-interval options)]