move statistics out of it's own atom into view-system

This commit is contained in:
Gered 2016-05-27 23:17:57 -04:00
parent 24646b1077
commit f857edd0d6
4 changed files with 19 additions and 24 deletions

View file

@ -28,20 +28,19 @@
(defonce view-system (atom {})) (defonce view-system (atom {}))
(defonce statistics (atom {}))
(defn reset-stats! (defn reset-stats!
[] []
(swap! statistics assoc (swap! view-system update-in [:statistics] assoc
:refreshes 0 :refreshes 0
:dropped 0 :dropped 0
:deduplicated 0)) :deduplicated 0))
(defn collect-stats? (defn collect-stats?
[] []
(boolean (:logger @statistics))) (boolean (get-in @view-system [:statistics :logger])))
(defn ->view-sig (defn ->view-sig
([namespace view-id parameters] ([namespace view-id parameters]
@ -199,10 +198,10 @@
(if (relevant? v namespace parameters hints) (if (relevant? v namespace parameters hints)
(if-not (.contains refresh-queue view-sig) (if-not (.contains refresh-queue view-sig)
(when-not (.offer refresh-queue view-sig) (when-not (.offer refresh-queue view-sig)
(when (collect-stats?) (swap! statistics update-in [:dropped] inc)) (when (collect-stats?) (swap! view-system update-in [:statistics :dropped] inc))
(error "refresh-queue full, dropping refresh request for" view-sig)) (error "refresh-queue full, dropping refresh request for" view-sig))
(do (do
(when (collect-stats?) (swap! statistics update-in [:deduplicated] inc)) (when (collect-stats?) (swap! view-system update-in [:statistics :deduplicated] inc))
(trace "already queued for refresh" view-sig)))) (trace "already queued for refresh" view-sig))))
(catch Exception e (catch Exception e
(error e "error determining if view is relevant" view-sig)))))) (error e "error determining if view is relevant" view-sig))))))
@ -245,7 +244,7 @@
(defn do-view-refresh! (defn do-view-refresh!
[{:keys [namespace view-id parameters] :as view-sig}] [{:keys [namespace view-id parameters] :as view-sig}]
(if (collect-stats?) (swap! statistics update-in [:refreshes] inc)) (if (collect-stats?) (swap! view-system update-in [:statistics :refreshes] inc))
(try (try
(let [view (get-in @view-system [:views view-id]) (let [view (get-in @view-system [:views view-id])
vdata (data view namespace parameters) vdata (data view namespace parameters)
@ -342,14 +341,14 @@
(fn [] (fn []
(try (try
(Thread/sleep msecs) (Thread/sleep msecs)
(let [stats @statistics] (let [stats (:statistics @view-system)]
(reset-stats!) (reset-stats!)
(info "subscribed views:" (active-view-count) (info "subscribed views:" (active-view-count)
(format "refreshes/sec: %.1f" (double (/ (:refreshes stats) secs))) (format "refreshes/sec: %.1f" (double (/ (:refreshes stats) secs)))
(format "dropped/sec: %.1f" (double (/ (:dropped stats) secs))) (format "dropped/sec: %.1f" (double (/ (:dropped stats) secs)))
(format "deduped/sec: %.1f" (double (/ (:deduplicated stats) secs))))) (format "deduped/sec: %.1f" (double (/ (:deduplicated stats) secs)))))
(catch InterruptedException e)) (catch InterruptedException e))
(if-not (:stop? @statistics) (if-not (get-in @view-system [:statistics :stop?])
(recur))))) (recur)))))
(defn start-logger! (defn start-logger!
@ -357,10 +356,10 @@
which the logger will periodically write out to the log." which the logger will periodically write out to the log."
[log-interval] [log-interval]
(trace "starting logger. logging at" log-interval "secs intervals") (trace "starting logger. logging at" log-interval "secs intervals")
(if (:logger @statistics) (if (get-in @view-system [:statistics :logger])
(error "cannot start new logger thread until existing thread is stopped") (error "cannot start new logger thread until existing thread is stopped")
(let [logger (Thread. ^Runnable (logger-thread log-interval))] (let [logger (Thread. ^Runnable (logger-thread log-interval))]
(swap! statistics assoc (swap! view-system update-in [:statistics] assoc
:logger logger :logger logger
:stop? false) :stop? false)
(reset-stats!) (reset-stats!)
@ -370,11 +369,11 @@
"Stops the logger thread." "Stops the logger thread."
[& [wait-for-thread?]] [& [wait-for-thread?]]
(trace "stopping logger") (trace "stopping logger")
(let [^Thread logger-thread (:logger @statistics)] (let [^Thread logger-thread (get-in @view-system [:statistics :logger])]
(swap! statistics assoc :stop? true) (swap! view-system assoc-in [:statistics :stop?] true)
(if logger-thread (.interrupt logger-thread)) (if logger-thread (.interrupt logger-thread))
(if wait-for-thread? (.join logger-thread)) (if wait-for-thread? (.join logger-thread))
(swap! statistics assoc :logger nil))) (swap! view-system assoc-in [:statistics :logger] nil)))
(defn hint (defn hint
"Create a hint." "Create a hint."

View file

@ -9,7 +9,6 @@
(defn reset-state-fixture! [f] (defn reset-state-fixture! [f]
(reset! view-system {}) (reset! view-system {})
(reset! statistics {})
(f)) (f))
(use-fixtures :each reset-state-fixture!) (use-fixtures :each reset-state-fixture!)
@ -43,7 +42,6 @@
; 2. shutdown views (and wait for all threads to also finish) ; 2. shutdown views (and wait for all threads to also finish)
(shutdown! true) (shutdown! true)
(is (empty? @view-system)) (is (empty? @view-system))
(is (empty? @statistics))
(is (not (.isAlive ^Thread refresh-watcher))) (is (not (.isAlive ^Thread refresh-watcher)))
(doseq [^Thread t workers] (doseq [^Thread t workers]
(is (not (.isAlive t))))))) (is (not (.isAlive t)))))))
@ -53,14 +51,14 @@
(assoc :stats-log-interval 10000))] (assoc :stats-log-interval 10000))]
; 1. init views ; 1. init views
(init! views dummy-send-fn options) (init! views dummy-send-fn options)
(is (seq @statistics)) (is (seq (:statistics @view-system)))
(is (:logging? @view-system)) (is (:logging? @view-system))
(is (collect-stats?)) (is (collect-stats?))
(let [logger-thread (:logger @statistics)] (let [logger-thread (get-in @view-system [:statistics :logger])]
(is (.isAlive ^Thread logger-thread)) (is (.isAlive ^Thread logger-thread))
; 2. shutdown views ; 2. shutdown views
(shutdown! true) (shutdown! true)
(is (nil? (:logger @statistics))) (is (nil? (get-in @view-system [:statistics :logger])))
(is (not (.isAlive ^Thread logger-thread)))))) (is (not (.isAlive ^Thread logger-thread))))))
(deftest can-add-new-views-after-init (deftest can-add-new-views-after-init

View file

@ -20,7 +20,6 @@
(defn reset-system-fixture [f] (defn reset-system-fixture [f]
(reset! view-system {}) (reset! view-system {})
(reset! statistics {})
(f) (f)
(shutdown! true)) (shutdown! true))
@ -232,13 +231,13 @@
(let [subscribe-result (subscribe! view-sig subscriber-key nil)] (let [subscribe-result (subscribe! view-sig subscriber-key nil)]
; 4. block until subscription finishes ; 4. block until subscription finishes
(while (not (realized? subscribe-result))) (while (not (realized? subscribe-result)))
(is (= 0 (:deduplicated @statistics))) (is (= 0 (get-in @view-system [:statistics :deduplicated])))
; 5. add duplicate hints by changing the same set of data twice ; 5. add duplicate hints by changing the same set of data twice
; (hints will stay in the queue forever because we stopped the worker threads) ; (hints will stay in the queue forever because we stopped the worker threads)
(memory-db-assoc-in! :a [:foo] 6) (memory-db-assoc-in! :a [:foo] 6)
(memory-db-assoc-in! :a [:foo] 7) (memory-db-assoc-in! :a [:foo] 7)
(wait-for-refresh-views) (wait-for-refresh-views)
(is (= 1 (:deduplicated @statistics))) (is (= 1 (get-in @view-system [:statistics :deduplicated])))
(is (= [view-sig] (is (= [view-sig]
(vec (:refresh-queue @view-system))))))) (vec (:refresh-queue @view-system)))))))
@ -265,12 +264,12 @@
; 4. block until subscription finishes ; 4. block until subscription finishes
(while (or (not (realized? subscribe-a)) (while (or (not (realized? subscribe-a))
(not (realized? subscribe-b)))) (not (realized? subscribe-b))))
(is (= 0 (:dropped @statistics))) (is (= 0 (get-in @view-system [:statistics :dropped])))
; 5. change some data affecting the subscribed view, resulting in more then 1 hint ; 5. change some data affecting the subscribed view, resulting in more then 1 hint
; being added to the refresh queue ; being added to the refresh queue
(memory-db-assoc-in! :a [:foo] 101010) (memory-db-assoc-in! :a [:foo] 101010)
(memory-db-assoc-in! :b [:foo] 010101) (memory-db-assoc-in! :b [:foo] 010101)
(wait-for-refresh-views) (wait-for-refresh-views)
(is (= 1 (:dropped @statistics))) (is (= 1 (get-in @view-system [:statistics :dropped])))
(is (= [view-sig-a] (is (= [view-sig-a]
(vec (:refresh-queue @view-system)))))))) (vec (:refresh-queue @view-system))))))))

View file

@ -20,7 +20,6 @@
(defn reset-system-fixture [f] (defn reset-system-fixture [f]
(reset! view-system {}) (reset! view-system {})
(reset! statistics {})
(f) (f)
(shutdown! true)) (shutdown! true))