diff --git a/src/views/core.clj b/src/views/core.clj index 3fcfc3f..07983f0 100644 --- a/src/views/core.clj +++ b/src/views/core.clj @@ -4,7 +4,7 @@ (:require [views.protocols :refer [IView id data relevant?]] [plumbing.core :refer [swap-pair!]] - [clojure.tools.logging :refer [info debug error]] + [clojure.tools.logging :refer [info debug error trace]] [environ.core :refer [env]])) ;; The view-system data structure has this shape: @@ -75,6 +75,7 @@ (defn- subscribe-view! [view-system view-sig subscriber-key] + (trace "subscribing to view" view-sig subscriber-key) (-> view-system (update-in [:subscribed subscriber-key] (fnil conj #{}) view-sig) (update-in [:subscribers view-sig] (fnil conj #{}) subscriber-key))) @@ -109,7 +110,7 @@ (catch Exception e (error "error subscribing:" namespace view-id parameters "e:" e "msg:" (.getMessage e)))))) - (debug "subscription not authorized" view-sig subscriber-key context))))) + (trace "subscription not authorized" view-sig subscriber-key context))))) (defn- remove-from-subscribers [view-system view-sig subscriber-key] @@ -151,6 +152,7 @@ identified by subscriber-key. Additional context info can be passed in, which will be passed to the view-system's namespace-fn (if provided)." [{:keys [namespace view-id parameters] :as view-sig} subscriber-key context] + (trace "unsubscribing from view" view-sig subscriber-key) (swap! view-system (fn [vs] (let [namespace (get-namespace view-sig subscriber-key context) @@ -164,6 +166,7 @@ "Removes all of a subscriber's (identified by subscriber-key) current view subscriptions." [subscriber-key] + (trace "unsubscribing from all views" subscriber-key) (swap! view-system (fn [vs] (let [view-sigs (get-in vs [:subscribed subscriber-key]) @@ -183,7 +186,7 @@ (error "refresh-queue full, dropping refresh request for" view-sig)) (do (when (collect-stats?) (swap! statistics update-in [:deduplicated] inc)) - (debug "already queued for refresh" view-sig)))) + (trace "already queued for refresh" view-sig)))) (catch Exception e (error "error determining if view is relevant, view-id:" view-id "e:" e))))) @@ -209,7 +212,7 @@ queued up in the view-system." ([hints] (when (seq hints) - (debug "refresh hints:" hints) + (trace "refresh hints:" hints) (mapv #(refresh-view! hints %) (subscribed-views))) (swap! view-system assoc :last-update (System/currentTimeMillis))) ([] @@ -231,7 +234,8 @@ (fn [] (try (when-let [{:keys [namespace view-id parameters] :as view-sig} (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)] - (when (collect-stats?) (swap! statistics update-in [:refreshes] inc)) + (trace "worker running refresh for" view-sig) + (if (collect-stats?) (swap! statistics update-in [:refreshes] inc)) (try (let [view (get-in @view-system [:views view-id]) vdata (data view namespace parameters) @@ -246,7 +250,7 @@ (catch InterruptedException e)) (if-not (:stop-workers? @view-system) (recur) - (debug "exiting worker thread")))) + (trace "exiting worker thread")))) (defn refresh-watcher-thread "Returns a refresh watcher thread function. A 'refresh watcher' continually attempts @@ -265,12 +269,13 @@ (error "exception in views e:" e "msg:" (.getMessage e)))) (if-not (:stop-refresh-watcher? @view-system) (recur) - (debug "exiting refresh watcher thread"))))) + (trace "exiting refresh watcher thread"))))) (defn start-update-watcher! "Starts threads for the views refresh watcher and worker threads that handle view refresh requests." [min-refresh-interval threads] + (trace "starting refresh watcher at" min-refresh-interval "ms interval and" threads "workers") (if (and (:refresh-watcher @view-system) (:workers @view-system)) (error "cannot start new watcher and worker threads until existing threads are stopped") @@ -290,6 +295,7 @@ (defn stop-update-watcher! "Stops threads for the views refresh watcher and worker threads." [] + (trace "stopping refresh watcher and workers") (swap! view-system assoc :stop-refresh-watcher? true :stop-workers? true) @@ -323,6 +329,7 @@ "Starts a logger thread that will enable collection of view statistics which the logger will periodically write out to the log." [log-interval] + (trace "starting logger. logging at" log-interval "secs intervals") (if (:logger @statistics) (error "cannot start new logger thread until existing thread is stopped") (let [logger (Thread. ^Runnable (logger-thread log-interval))] @@ -335,6 +342,7 @@ (defn stop-logger! "Stops the logger thread." [] + (trace "stopping logger") (swap! statistics assoc :stop? true) (if-let [^Thread logger (:logger @statistics)] (.interrupt logger)) @@ -350,6 +358,7 @@ watcher and dispatched to the workers resulting in view updates being sent out for the relevant views/subscribers." [hints] + (trace "queueing hints" hints) (swap! view-system update-in [:hints] (fn [existing-hints] (reduce conj (or existing-hints #{}) hints)))) @@ -415,6 +424,7 @@ the call to init!." [views send-fn & [options]] (let [options (merge default-options options)] + (trace "initializing views system using options:" options) (reset! view-system {:views (into {} (get-views-map views)) :send-fn send-fn @@ -431,6 +441,7 @@ "Shuts the view system down, terminating all worker threads and clearing all view subscriptions and data." [] + (trace "shutting down views sytem") (stop-update-watcher!) (if (:logging? @view-system) (stop-logger!))