diff --git a/src/views/core.clj b/src/views/core.clj index 491e10a..8d4d114 100644 --- a/src/views/core.clj +++ b/src/views/core.clj @@ -136,37 +136,77 @@ [last-update min-refresh-interval] (Thread/sleep (max 0 (- min-refresh-interval (- (System/currentTimeMillis) last-update))))) -(defn worker-thread +(defn refresh-worker-thread "Handles refresh requests." [view-system] (fn [] - (when-let [[namespace view-id parameters :as view-sig] (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)] - (when (collect-stats?) (swap! statistics update-in [:refreshes] inc)) - (try - (let [view (get-in @view-system [:views view-id]) - vdata (data view namespace parameters) - hdata (hash vdata)] - (when-not (= hdata (get-in @view-system [:hashes view-sig])) - (doseq [s (get-in @view-system [:subscribers view-sig])] - ((:send-fn @view-system) s [[view-id parameters] vdata])) - (swap! view-system assoc-in [:hashes view-sig] hdata))) - (catch Exception e - (error "error refreshing:" namespace view-id parameters - "e:" e "msg:" (.getMessage e))))) - (recur))) + (try + (when-let [[namespace view-id parameters :as view-sig] (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)] + (when (collect-stats?) (swap! statistics update-in [:refreshes] inc)) + (try + (let [view (get-in @view-system [:views view-id]) + vdata (data view namespace parameters) + hdata (hash vdata)] + (when-not (= hdata (get-in @view-system [:hashes view-sig])) + (doseq [s (get-in @view-system [:subscribers view-sig])] + ((:send-fn @view-system) s [[view-id parameters] vdata])) + (swap! view-system assoc-in [:hashes view-sig] hdata))) + (catch Exception e + (error "error refreshing:" namespace view-id parameters + "e:" e "msg:" (.getMessage e))))) + (catch InterruptedException e)) + (if-not (:stop-workers? @view-system) + (recur) + (debug "exiting worker thread")))) -(defn update-watcher! - "A single threaded view update mechanism." +(defn refresh-watcher-thread + [view-system min-refresh-interval] + (fn [] + (let [last-update (:last-update @view-system)] + (try + (if (can-refresh? last-update min-refresh-interval) + (refresh-views! view-system) + (wait last-update min-refresh-interval)) + (catch InterruptedException e) + (catch Exception e + (error "exception in views e:" e "msg:" (.getMessage e)))) + (if-not (:stop-refresh-watcher? @view-system) + (recur) + (debug "exiting refresh watcher thread"))))) + +(defn start-update-watcher! + "Starts threads for the views refresh watcher and worker threads that handle + view refresh requests." [view-system min-refresh-interval threads] - (swap! view-system assoc :last-update 0) - (.start (Thread. (fn [] (let [last-update (:last-update @view-system)] - (try - (if (can-refresh? last-update min-refresh-interval) - (refresh-views! view-system) - (wait last-update min-refresh-interval)) - (catch Exception e (error "exception in views e:" e "msg:"(.getMessage e)))) - (recur))))) - (dotimes [i threads] (.start (Thread. ^Runnable (worker-thread view-system))))) + (if (and (:refresh-watcher @view-system) + (:workers @view-system)) + (error "cannot start new watcher and worker threads until existing threads are stopped") + (let [refresh-watcher (Thread. ^Runnable (refresh-watcher-thread view-system min-refresh-interval)) + worker-threads (mapv (fn [_] (Thread. ^Runnable (refresh-worker-thread view-system))) + (range threads))] + (swap! view-system assoc + :last-update 0 + :refresh-watcher refresh-watcher + :stop-refresh-watcher? false + :workers worker-threads + :stop-workers? false) + (.start refresh-watcher) + (doseq [^Thread t worker-threads] + (.start t))))) + +(defn stop-update-watcher! + "Stops threads for the views refresh watcher and worker threads." + [view-system] + (swap! view-system assoc + :stop-refresh-watcher? true + :stop-workers? true) + (if-let [^Thread refresh-watcher (:refresh-watcher @view-system)] + (.interrupt refresh-watcher)) + (doseq [^Thread worker-thread (:workers @view-system)] + (.interrupt worker-thread)) + (swap! view-system assoc + :refresh-watcher nil + :workers nil)) (defn log-statistics! "Run a thread that logs statistics every msecs."