Added statistics collection.
This commit is contained in:
parent
87b3837346
commit
d1b3ce776b
|
@ -1,4 +1,4 @@
|
||||||
(defproject views "1.2.1"
|
(defproject views "1.3.0-SNAPSHOT"
|
||||||
:description "A view to the past helps navigate the future."
|
:description "A view to the past helps navigate the future."
|
||||||
|
|
||||||
:url "https://github.com/diligenceengine/views"
|
:url "https://github.com/diligenceengine/views"
|
||||||
|
|
|
@ -4,7 +4,7 @@
|
||||||
(:require
|
(:require
|
||||||
[views.protocols :refer [IView id data relevant?]]
|
[views.protocols :refer [IView id data relevant?]]
|
||||||
[plumbing.core :refer [swap-pair!]]
|
[plumbing.core :refer [swap-pair!]]
|
||||||
[clojure.tools.logging :refer [debug error]]
|
[clojure.tools.logging :refer [info debug error]]
|
||||||
[environ.core :refer [env]]))
|
[environ.core :refer [env]]))
|
||||||
|
|
||||||
;; The view-system data structure has this shape:
|
;; The view-system data structure has this shape:
|
||||||
|
@ -26,6 +26,17 @@
|
||||||
(Long/parseLong n)
|
(Long/parseLong n)
|
||||||
1000))
|
1000))
|
||||||
|
|
||||||
|
(def statistics (atom {}))
|
||||||
|
|
||||||
|
(defn reset-stats!
|
||||||
|
[]
|
||||||
|
(swap! statistics (fn [s] {:enabled (boolean (:enabled s)), :refreshes 0, :dropped 0, :deduplicated 0})))
|
||||||
|
|
||||||
|
(defn collect-stats? [] (:enabled @statistics))
|
||||||
|
|
||||||
|
(reset-stats!)
|
||||||
|
|
||||||
|
|
||||||
(def refresh-queue (ArrayBlockingQueue. refresh-queue-size))
|
(def refresh-queue (ArrayBlockingQueue. refresh-queue-size))
|
||||||
|
|
||||||
(defn subscribe-view!
|
(defn subscribe-view!
|
||||||
|
@ -71,13 +82,21 @@
|
||||||
(if (relevant? v namespace parameters hints)
|
(if (relevant? v namespace parameters hints)
|
||||||
(if-not (.contains ^ArrayBlockingQueue refresh-queue view-sig)
|
(if-not (.contains ^ArrayBlockingQueue refresh-queue view-sig)
|
||||||
(when-not (.offer ^ArrayBlockingQueue refresh-queue view-sig)
|
(when-not (.offer ^ArrayBlockingQueue refresh-queue view-sig)
|
||||||
|
(when (collect-stats?) (swap! statistics update-in [:dropped] inc))
|
||||||
(error "refresh-queue full, dropping refresh request for" view-sig))
|
(error "refresh-queue full, dropping refresh request for" view-sig))
|
||||||
(debug "already queued for refresh" view-sig)))))
|
(do
|
||||||
|
(when (collect-stats?) (swap! statistics update-in [:deduplicated] inc))
|
||||||
|
(debug "already queued for refresh" view-sig))))))
|
||||||
|
|
||||||
(defn subscribed-views
|
(defn subscribed-views
|
||||||
[view-system]
|
[view-system]
|
||||||
(reduce into #{} (vals (:subscribed view-system))))
|
(reduce into #{} (vals (:subscribed view-system))))
|
||||||
|
|
||||||
|
(defn active-view-count
|
||||||
|
"Returns a count of views with at least one subscriber."
|
||||||
|
[view-system]
|
||||||
|
(count (remove #(empty? (val %)) (:subscribers view-system))))
|
||||||
|
|
||||||
(defn pop-hints!
|
(defn pop-hints!
|
||||||
"Return hints and clear hint set atomicly."
|
"Return hints and clear hint set atomicly."
|
||||||
[view-system]
|
[view-system]
|
||||||
|
@ -106,6 +125,7 @@
|
||||||
[view-system]
|
[view-system]
|
||||||
(fn []
|
(fn []
|
||||||
(when-let [[namespace view-id parameters :as view-sig] (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)]
|
(when-let [[namespace view-id parameters :as view-sig] (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)]
|
||||||
|
(when (collect-stats?) (swap! statistics update-in [:refreshes] inc))
|
||||||
(try
|
(try
|
||||||
(let [view (get-in @view-system [:views view-id])
|
(let [view (get-in @view-system [:views view-id])
|
||||||
vdata (data view namespace parameters)
|
vdata (data view namespace parameters)
|
||||||
|
@ -132,6 +152,20 @@
|
||||||
(recur)))))
|
(recur)))))
|
||||||
(dotimes [i threads] (.start (Thread. ^Runnable (worker-thread view-system)))))
|
(dotimes [i threads] (.start (Thread. ^Runnable (worker-thread view-system)))))
|
||||||
|
|
||||||
|
(defn log-statistics!
|
||||||
|
"Run a thread that logs statistics every msecs."
|
||||||
|
[view-system msecs]
|
||||||
|
(swap! statistics assoc-in [:enabled] true)
|
||||||
|
(let [secs (/ msecs 1000)]
|
||||||
|
(.start (Thread. (fn []
|
||||||
|
(Thread/sleep msecs)
|
||||||
|
(let [stats @statistics]
|
||||||
|
(reset-stats!)
|
||||||
|
(info "subscribed views:" (active-view-count @view-system)
|
||||||
|
(format "refreshes/sec: %.1f" (double (/ (:refreshes stats) secs)))
|
||||||
|
(format "dropped/sec: %.1f" (double (/ (:dropped stats) secs)))
|
||||||
|
(format "deduped/sec: %.1f" (double (/ (:deduplicated stats) secs))))
|
||||||
|
(recur)))))))
|
||||||
|
|
||||||
(defn hint
|
(defn hint
|
||||||
"Create a hint."
|
"Create a hint."
|
||||||
|
|
Loading…
Reference in a new issue