make refresh watcher/worker threads stoppable (and restartable)

This commit is contained in:
Gered 2016-05-18 17:40:07 -04:00
parent 5fdc6334bb
commit f9c15d6cd6

View file

@ -136,37 +136,77 @@
[last-update min-refresh-interval] [last-update min-refresh-interval]
(Thread/sleep (max 0 (- min-refresh-interval (- (System/currentTimeMillis) last-update))))) (Thread/sleep (max 0 (- min-refresh-interval (- (System/currentTimeMillis) last-update)))))
(defn worker-thread (defn refresh-worker-thread
"Handles refresh requests." "Handles refresh requests."
[view-system] [view-system]
(fn [] (fn []
(when-let [[namespace view-id parameters :as view-sig] (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)] (try
(when (collect-stats?) (swap! statistics update-in [:refreshes] inc)) (when-let [[namespace view-id parameters :as view-sig] (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)]
(try (when (collect-stats?) (swap! statistics update-in [:refreshes] inc))
(let [view (get-in @view-system [:views view-id]) (try
vdata (data view namespace parameters) (let [view (get-in @view-system [:views view-id])
hdata (hash vdata)] vdata (data view namespace parameters)
(when-not (= hdata (get-in @view-system [:hashes view-sig])) hdata (hash vdata)]
(doseq [s (get-in @view-system [:subscribers view-sig])] (when-not (= hdata (get-in @view-system [:hashes view-sig]))
((:send-fn @view-system) s [[view-id parameters] vdata])) (doseq [s (get-in @view-system [:subscribers view-sig])]
(swap! view-system assoc-in [:hashes view-sig] hdata))) ((:send-fn @view-system) s [[view-id parameters] vdata]))
(catch Exception e (swap! view-system assoc-in [:hashes view-sig] hdata)))
(error "error refreshing:" namespace view-id parameters (catch Exception e
"e:" e "msg:" (.getMessage e))))) (error "error refreshing:" namespace view-id parameters
(recur))) "e:" e "msg:" (.getMessage e)))))
(catch InterruptedException e))
(if-not (:stop-workers? @view-system)
(recur)
(debug "exiting worker thread"))))
(defn update-watcher! (defn refresh-watcher-thread
"A single threaded view update mechanism." [view-system min-refresh-interval]
(fn []
(let [last-update (:last-update @view-system)]
(try
(if (can-refresh? last-update min-refresh-interval)
(refresh-views! view-system)
(wait last-update min-refresh-interval))
(catch InterruptedException e)
(catch Exception e
(error "exception in views e:" e "msg:" (.getMessage e))))
(if-not (:stop-refresh-watcher? @view-system)
(recur)
(debug "exiting refresh watcher thread")))))
(defn start-update-watcher!
"Starts threads for the views refresh watcher and worker threads that handle
view refresh requests."
[view-system min-refresh-interval threads] [view-system min-refresh-interval threads]
(swap! view-system assoc :last-update 0) (if (and (:refresh-watcher @view-system)
(.start (Thread. (fn [] (let [last-update (:last-update @view-system)] (:workers @view-system))
(try (error "cannot start new watcher and worker threads until existing threads are stopped")
(if (can-refresh? last-update min-refresh-interval) (let [refresh-watcher (Thread. ^Runnable (refresh-watcher-thread view-system min-refresh-interval))
(refresh-views! view-system) worker-threads (mapv (fn [_] (Thread. ^Runnable (refresh-worker-thread view-system)))
(wait last-update min-refresh-interval)) (range threads))]
(catch Exception e (error "exception in views e:" e "msg:"(.getMessage e)))) (swap! view-system assoc
(recur))))) :last-update 0
(dotimes [i threads] (.start (Thread. ^Runnable (worker-thread view-system))))) :refresh-watcher refresh-watcher
:stop-refresh-watcher? false
:workers worker-threads
:stop-workers? false)
(.start refresh-watcher)
(doseq [^Thread t worker-threads]
(.start t)))))
(defn stop-update-watcher!
"Stops threads for the views refresh watcher and worker threads."
[view-system]
(swap! view-system assoc
:stop-refresh-watcher? true
:stop-workers? true)
(if-let [^Thread refresh-watcher (:refresh-watcher @view-system)]
(.interrupt refresh-watcher))
(doseq [^Thread worker-thread (:workers @view-system)]
(.interrupt worker-thread))
(swap! view-system assoc
:refresh-watcher nil
:workers nil))
(defn log-statistics! (defn log-statistics!
"Run a thread that logs statistics every msecs." "Run a thread that logs statistics every msecs."