go back to non single/global view-system state atom

after thinking about it some more, this way definitely does make
more logical sense to me when it comes to integrating it into a
"Reloaded" type of setup via mount/component. even though i will
almost certainly never use more then 1 simultaneous view-system in any
given project.
This commit is contained in:
Gered 2016-05-29 17:33:47 -04:00
parent f857edd0d6
commit 280d91b12b

View file

@ -1,6 +1,7 @@
(ns views.core
(:import
[java.util.concurrent ArrayBlockingQueue TimeUnit])
[java.util.concurrent ArrayBlockingQueue TimeUnit]
(clojure.lang Atom))
(:require
[views.protocols :refer [IView id data relevant?]]
[plumbing.core :refer [swap-pair!]]
@ -26,20 +27,17 @@
;;
;; Each hint has the form {:namespace x :hint y}
(defonce view-system (atom {}))
(defn reset-stats!
[]
[^Atom view-system]
(swap! view-system update-in [:statistics] assoc
:refreshes 0
:dropped 0
:deduplicated 0))
(defn collect-stats?
[]
[^Atom view-system]
(boolean (get-in @view-system [:statistics :logger])))
(defn ->view-sig
@ -52,27 +50,27 @@
:parameters parameters}))
(defn- send-view-data!
[subscriber-key {:keys [namespace view-id parameters] :as view-sig} data]
(if-let [send-fn (:send-fn @view-system)]
[view-system subscriber-key {:keys [namespace view-id parameters] :as view-sig} data]
(if-let [send-fn (:send-fn view-system)]
(send-fn subscriber-key [(dissoc view-sig :namespace) data])
(throw (new Exception "no send-fn function set in view-system"))))
(defn- authorized-subscription?
[view-sig subscriber-key context]
(if-let [auth-fn (:auth-fn @view-system)]
[view-system view-sig subscriber-key context]
(if-let [auth-fn (:auth-fn view-system)]
(auth-fn view-sig subscriber-key context)
; assume that if no auth-fn is specified, that we are not doing auth checks at all
; so do not disallow access to any subscription
true))
(defn- on-unauthorized-subscription
[view-sig subscriber-key context]
(if-let [on-unauth-fn (:on-unauth-fn @view-system)]
[view-system view-sig subscriber-key context]
(if-let [on-unauth-fn (:on-unauth-fn view-system)]
(on-unauth-fn view-sig subscriber-key context)))
(defn- get-namespace
[view-sig subscriber-key context]
(if-let [namespace-fn (:namespace-fn @view-system)]
[view-system view-sig subscriber-key context]
(if-let [namespace-fn (:namespace-fn view-system)]
(namespace-fn view-sig subscriber-key context)
(:namespace view-sig)))
@ -94,13 +92,13 @@
passed to the view-system's namespace-fn and auth-fn (if provided). If
the subscription is successful, the subscriber will be sent the initial
data for the view."
[{:keys [namespace view-id parameters] :as view-sig} subscriber-key context]
[^Atom view-system {:keys [namespace view-id parameters] :as view-sig} subscriber-key context]
(if-let [view (get-in @view-system [:views view-id])]
(let [namespace (if (contains? view-sig :namespace)
namespace
(get-namespace view-sig subscriber-key context))
(get-namespace @view-system view-sig subscriber-key context))
view-sig (->view-sig namespace view-id parameters)]
(if (authorized-subscription? view-sig subscriber-key context)
(if (authorized-subscription? @view-system view-sig subscriber-key context)
(do
(swap! view-system subscribe-view! view-sig subscriber-key)
(future
@ -111,12 +109,12 @@
;; an unsubscription event came in while computing the view.
(when (contains? (get-in @view-system [:subscribed subscriber-key]) view-sig)
(swap! view-system update-hash! view-sig data-hash)
(send-view-data! subscriber-key view-sig vdata)))
(send-view-data! @view-system subscriber-key view-sig vdata)))
(catch Exception e
(error e "error subscribing to view" view-sig)))))
(do
(trace "subscription not authorized" view-sig subscriber-key context)
(on-unauthorized-subscription view-sig subscriber-key context)
(on-unauthorized-subscription @view-system view-sig subscriber-key context)
nil)))
(throw (new Exception (str "Subscription for non-existant view: " view-id)))))
@ -159,15 +157,15 @@
"Removes a subscription to a view identified by view-sig for a subscriber
identified by subscriber-key. Additional context info can be passed in,
which will be passed to the view-system's namespace-fn (if provided)."
[{:keys [namespace view-id parameters] :as view-sig} subscriber-key context]
[^Atom view-system {:keys [namespace view-id parameters] :as view-sig} subscriber-key context]
(trace "unsubscribing from view" view-sig subscriber-key)
(swap! view-system
(fn [vs]
(fn [view-system]
(let [namespace (if (contains? view-sig :namespace)
namespace
(get-namespace view-sig subscriber-key context))
(get-namespace view-system view-sig subscriber-key context))
view-sig (->view-sig namespace view-id parameters)]
(-> vs
(-> view-system
(remove-from-subscribed view-sig subscriber-key)
(remove-from-subscribers view-sig subscriber-key)
(clean-up-unneeded-hashes view-sig))))))
@ -175,50 +173,49 @@
(defn unsubscribe-all!
"Removes all of a subscriber's (identified by subscriber-key) current
view subscriptions."
[subscriber-key]
[^Atom view-system subscriber-key]
(trace "unsubscribing from all views" subscriber-key)
(swap! view-system
(fn [vs]
(let [view-sigs (get-in vs [:subscribed subscriber-key])
vs* (update-in vs [:subscribed] dissoc subscriber-key)]
(fn [view-system]
(let [view-sigs (get-in view-system [:subscribed subscriber-key])
view-system* (update-in view-system [:subscribed] dissoc subscriber-key)]
(reduce
#(-> %1
(remove-from-subscribers %2 subscriber-key)
(clean-up-unneeded-hashes %2))
vs*
view-system*
view-sigs)))))
(defn refresh-view!
"Schedules a view (identified by view-sig) to be refreshed by one of the worker threads
only if the provided collection of hints is relevant to that view."
[hints {:keys [namespace view-id parameters] :as view-sig}]
[^Atom view-system hints {:keys [namespace view-id parameters] :as view-sig}]
(let [v (get-in @view-system [:views view-id])]
(if-let [^ArrayBlockingQueue refresh-queue (:refresh-queue @view-system)]
(try
(if (relevant? v namespace parameters hints)
(if-not (.contains refresh-queue view-sig)
(when-not (.offer refresh-queue view-sig)
(when (collect-stats?) (swap! view-system update-in [:statistics :dropped] inc))
(if (collect-stats? view-system) (swap! view-system update-in [:statistics :dropped] inc))
(error "refresh-queue full, dropping refresh request for" view-sig))
(do
(when (collect-stats?) (swap! view-system update-in [:statistics :deduplicated] inc))
(if (collect-stats? view-system) (swap! view-system update-in [:statistics :deduplicated] inc))
(trace "already queued for refresh" view-sig))))
(catch Exception e
(error e "error determining if view is relevant" view-sig))))))
(defn subscribed-views
"Returns a list of all views in the system that have subscribers."
[]
[^Atom view-system]
(reduce into #{} (vals (:subscribed @view-system))))
(defn active-view-count
"Returns a count of views with at least one subscriber."
[]
[^Atom view-system]
(count (remove #(empty? (val %)) (:subscribers @view-system))))
(defn pop-hints!
"Return hints and clear hint set atomicly."
[]
(defn- pop-hints!
[^Atom view-system]
(let [p (swap-pair! view-system assoc :hints #{})]
(or (:hints (first p)) #{})))
@ -226,32 +223,32 @@
"Given a collection of hints, check all views in the system to find any that need refreshing
and schedule refreshes for them. If no hints are provided, will use any that have been
queued up in the view-system."
([hints]
([^Atom view-system hints]
(when (seq hints)
(trace "refresh hints:" hints)
(mapv #(refresh-view! hints %) (subscribed-views)))
(mapv #(refresh-view! view-system hints %) (subscribed-views view-system)))
(swap! view-system assoc :last-update (System/currentTimeMillis)))
([]
(refresh-views! (pop-hints!))))
([^Atom view-system]
(refresh-views! view-system (pop-hints! view-system))))
(defn can-refresh?
(defn- can-refresh?
[last-update min-refresh-interval]
(> (- (System/currentTimeMillis) last-update) min-refresh-interval))
(defn wait
(defn- wait
[last-update min-refresh-interval]
(Thread/sleep (max 0 (- min-refresh-interval (- (System/currentTimeMillis) last-update)))))
(defn do-view-refresh!
[{:keys [namespace view-id parameters] :as view-sig}]
(if (collect-stats?) (swap! view-system update-in [:statistics :refreshes] inc))
[^Atom view-system {:keys [namespace view-id parameters] :as view-sig}]
(if (collect-stats? view-system) (swap! view-system update-in [:statistics :refreshes] inc))
(try
(let [view (get-in @view-system [:views view-id])
vdata (data view namespace parameters)
hdata (hash vdata)]
(when-not (= hdata (get-in @view-system [:hashes view-sig]))
(doseq [subscriber-key (get-in @view-system [:subscribers view-sig])]
(send-view-data! subscriber-key view-sig vdata))
(send-view-data! @view-system subscriber-key view-sig vdata))
(swap! view-system assoc-in [:hashes view-sig] hdata)))
(catch Exception e
(error e "error refreshing:" namespace view-id parameters))))
@ -260,13 +257,13 @@
"Returns a refresh worker thread function. A 'refresh worker' continually waits for
refresh requests and when there is one, handles it by running the view, getting the view
data and then sending it out to all the view's subscribers. "
[]
[^Atom view-system]
(let [^ArrayBlockingQueue refresh-queue (:refresh-queue @view-system)]
(fn []
(try
(when-let [view-sig (.poll refresh-queue 60 TimeUnit/SECONDS)]
(trace "worker running refresh for" view-sig)
(do-view-refresh! view-sig))
(do-view-refresh! view-system view-sig))
(catch InterruptedException e))
(if-not (:stop-workers? @view-system)
(recur)
@ -277,12 +274,12 @@
to schedule refreshes for any views in the system which are 'dirty' (a dirty view in
this case is one when there is a hint waiting in the view-system that is relevant
to the view)."
[min-refresh-interval]
[^Atom view-system min-refresh-interval]
(fn []
(let [last-update (:last-update @view-system)]
(try
(if (can-refresh? last-update min-refresh-interval)
(refresh-views!)
(refresh-views! view-system)
(wait last-update min-refresh-interval))
(catch InterruptedException e)
(catch Exception e
@ -294,13 +291,13 @@
(defn start-update-watcher!
"Starts threads for the views refresh watcher and worker threads that handle
view refresh requests."
[min-refresh-interval threads]
[^Atom view-system min-refresh-interval threads]
(trace "starting refresh watcher at" min-refresh-interval "ms interval and" threads "workers")
(if (and (:refresh-watcher @view-system)
(:workers @view-system))
(error "cannot start new watcher and worker threads until existing threads are stopped")
(let [refresh-watcher (Thread. ^Runnable (refresh-watcher-thread min-refresh-interval))
worker-threads (mapv (fn [_] (Thread. ^Runnable (refresh-worker-thread)))
(let [refresh-watcher (Thread. ^Runnable (refresh-watcher-thread view-system min-refresh-interval))
worker-threads (mapv (fn [_] (Thread. ^Runnable (refresh-worker-thread view-system)))
(range threads))]
(swap! view-system assoc
:last-update 0
@ -314,7 +311,7 @@
(defn stop-update-watcher!
"Stops threads for the views refresh watcher and worker threads."
[& [wait-for-threads?]]
[^Atom view-system & [wait-for-threads?]]
(trace "stopping refresh watcher and workers")
(let [worker-threads (:workers @view-system)
watcher-thread (:refresh-watcher @view-system)
@ -336,14 +333,14 @@
(defn logger-thread
"Returns a logger thread function. A logger periodically writes view system
statistics to the log that are collected only when logging is enabled."
[msecs]
[^Atom view-system msecs]
(let [secs (/ msecs 1000)]
(fn []
(try
(Thread/sleep msecs)
(let [stats (:statistics @view-system)]
(reset-stats!)
(info "subscribed views:" (active-view-count)
(reset-stats! view-system)
(info "subscribed views:" (active-view-count view-system)
(format "refreshes/sec: %.1f" (double (/ (:refreshes stats) secs)))
(format "dropped/sec: %.1f" (double (/ (:dropped stats) secs)))
(format "deduped/sec: %.1f" (double (/ (:deduplicated stats) secs)))))
@ -354,20 +351,20 @@
(defn start-logger!
"Starts a logger thread that will enable collection of view statistics
which the logger will periodically write out to the log."
[log-interval]
[^Atom view-system log-interval]
(trace "starting logger. logging at" log-interval "secs intervals")
(if (get-in @view-system [:statistics :logger])
(error "cannot start new logger thread until existing thread is stopped")
(let [logger (Thread. ^Runnable (logger-thread log-interval))]
(let [logger (Thread. ^Runnable (logger-thread view-system log-interval))]
(swap! view-system update-in [:statistics] assoc
:logger logger
:stop? false)
(reset-stats!)
(reset-stats! view-system)
(.start logger))))
(defn stop-logger!
"Stops the logger thread."
[& [wait-for-thread?]]
[^Atom view-system & [wait-for-thread?]]
(trace "stopping logger")
(let [^Thread logger-thread (get-in @view-system [:statistics :logger])]
(swap! view-system assoc-in [:statistics :stop?] true)
@ -384,15 +381,15 @@
"Queues up hints in the view system so that they will be picked up by the refresh
watcher and dispatched to the workers resulting in view updates being sent out
for the relevant views/subscribers."
[hints]
[^Atom view-system hints]
(trace "queueing hints" hints)
(swap! view-system update-in [:hints] (fnil into #{}) hints))
(defn put-hints!
"Adds a collection of hints to the view system by using the view system
configuration's :put-hints-fn."
[hints]
((:put-hints-fn @view-system) hints))
[^Atom view-system hints]
((:put-hints-fn @view-system) view-system hints))
(defn- get-views-map
[views]
@ -400,32 +397,47 @@
(defn add-views!
"Add a collection of views to the system."
[views]
[^Atom view-system views]
(swap! view-system update-in [:views] (fnil into {}) (get-views-map views)))
(def default-options
"Default options used to initialize the views system via init!"
{
; *REQUIRED*
; a function that is used to send view refresh data to subscribers.
; this function must be set for normal operation of the views system.
; (fn [subscriber-key [view-sig view-data]] ...)
:send-fn nil
; *REQUIRED*
; a function that adds hints to the view system. this function will be used
; by other libraries that implement IView. this function must be set for
; normal operation of the views system. the default function provided
; will trigger relevant view refreshes immediately.
; (fn [^Atom view-system hints] ... )
:put-hints-fn (fn [^Atom view-system hints] (refresh-views! view-system hints))
; *REQUIRED*
; the size of the queue used to hold view refresh requests for
; the worker threads. for very heavy systems, this can be set
; higher if you start to get warnings about dropped refresh requests
:refresh-queue-size 1000
; *REQUIRED*
; interval in milliseconds at which the refresh watcher thread will
; check for any queued up hints and dispatch relevant view refresh
; updates to the worker threads.
:refresh-interval 1000
; *REQUIRED*
; the number of refresh worker threads that poll for view refresh
; requests and dispatch updated view data to subscribers.
:worker-threads 8
; a function that adds hints to the view system. this function will be used
; by other libraries that implement IView. this function must be set for
; normal operation of the views system. the default function provided
; will trigger relevant view refreshes immediately.
; (fn [hints] ... )
:put-hints-fn (fn [hints] (refresh-views! hints))
; a list of IView instances. these are the views that can be subscribed
; to. views can also be added/replaced after system initialization through
; the use of add-views!
:views nil
; a function that authorizes view subscriptions. should return true if the
; subscription is authorized. if not set, no view subscriptions will require
@ -449,40 +461,44 @@
(defn init!
"Initializes the view system for use with the list of views provided.
send-fn is a function that sends view refresh data to subscribers. it is
of the form: (fn [subscriber-key [view-sig view-data]] ... )
An existing atom that will be used to store the state of the views
system can be provided, otherwise one will be created. Either way,
the atom with the initialized view system is returned.
options is a map of options to configure the view system with. See
views.core/default-options for a description of the available options
and the defaults that will be used for any options not provided in
the call to init!."
[views send-fn & [options]]
(let [options (merge default-options options)]
(trace "initializing views system using options:" options)
(reset! view-system
{:refresh-queue (ArrayBlockingQueue. (:refresh-queue-size options))
:views (into {} (get-views-map views))
:send-fn send-fn
:put-hints-fn (:put-hints-fn options)
:auth-fn (:auth-fn options)
:on-unauth-fn (:on-unauth-fn options)
:namespace-fn (:namespace-fn options)
; keeping a copy of the options used during init allows other libraries
; that plugin/extend views functionality (e.g. IView implementations)
; to make use of any options themselves
:options options})
(start-update-watcher! (:refresh-interval options)
(:worker-threads options))
(when-let [stats-log-interval (:stats-log-interval options)]
(swap! view-system assoc :logging? true)
(start-logger! stats-log-interval))))
([^Atom view-system options]
(let [options (merge default-options options)]
(trace "initializing views system using options:" options)
(reset! view-system
{:refresh-queue (ArrayBlockingQueue. (:refresh-queue-size options))
:views (into {} (get-views-map (:views options)))
:send-fn (:send-fn options)
:put-hints-fn (:put-hints-fn options)
:auth-fn (:auth-fn options)
:on-unauth-fn (:on-unauth-fn options)
:namespace-fn (:namespace-fn options)
; keeping a copy of the options used during init allows other libraries
; that plugin/extend views functionality (e.g. IView implementations)
; to make use of any options themselves
:options options})
(start-update-watcher! view-system (:refresh-interval options) (:worker-threads options))
(when-let [stats-log-interval (:stats-log-interval options)]
(swap! view-system assoc :logging? true)
(start-logger! view-system stats-log-interval))
view-system))
([options]
(init! (atom {}) options)))
(defn shutdown!
"Shuts the view system down, terminating all worker threads and clearing
all view subscriptions and data."
[& [wait-for-threads?]]
[^Atom view-system & [wait-for-threads?]]
(trace "shutting down views sytem")
(stop-update-watcher! wait-for-threads?)
(stop-update-watcher! view-system wait-for-threads?)
(if (:logging? @view-system)
(stop-logger! wait-for-threads?))
(reset! view-system {}))
(stop-logger! view-system wait-for-threads?))
(reset! view-system {})
view-system)