breaks subscription implementation out into persistor model

This commit is contained in:
Dave Della Costa 2014-06-10 18:40:21 +09:00
parent 99f2ba61a4
commit 6717ad46db
9 changed files with 141 additions and 111 deletions

View file

@ -1,85 +1,64 @@
(ns views.base-subscribed-views (ns views.base-subscribed-views
(:require (:require
[views.db.load :refer [initial-views]] [views.persistor :refer [subscribe-to-view]]
[views.subscribed-views :refer [SubscribedViews subscriber-key-fn namespace-fn]] [views.subscribed-views :refer [SubscribedViews]]
[views.subscriptions :as vs :refer [add-subscriptions! remove-subscription! subscriptions-for]] [views.subscriptions :as vs :refer [add-subscriptions! remove-subscription! subscriptions-for]]
[views.filters :refer [view-filter]]
[clojure.tools.logging :refer [debug info warn error]] [clojure.tools.logging :refer [debug info warn error]]
[clojure.core.async :refer [put! <! go thread]])) [clojure.core.async :refer [put! <! go thread]]))
(defn view-filter (defmacro config
"Takes a subscription request msg, a collection of view-sigs and [{:keys [db schema templates send-fn subscriber-fn namespace-fn unsafe?]}]
the config templates hash-map for an app. Checks if there is `(do (defschema schema db "public")
a global filter-fn in the hash-map metadata and checks against {:db db :schema schema :subscribed-views (BaseSubscribedViews. db templates send-fn subscriber-fn namespace-fn unsafe?)}))
that if it exists, as well as against any existing filter
functions for individual template config entries. Template
config hash-map entries can specify a filter-fn using the key
:filter-fn, and the global filter-fn is the same, only on
the config meta-data (i.e. (with-meta templates {:filter-fn ...}))
By default throws an exception if no filters are present. (defn send-fn*
By passing in {:unsafe true} in opts, this can be overridden." [send-fn address msg]
[msg view-sigs templates & opts] (if send-fn
(let [global-filter-fn (:filter-fn (meta templates))] (send-fn address msg)
(filterv (warn "IMPLEMENT ME. Got message " msg " sent to address " address)))
#(let [filter-fn (:filter-fn (get templates (first %)))]
(cond
(and filter-fn global-filter-fn)
(and (global-filter-fn msg %) (filter-fn msg %))
filter-fn (defn subscriber-key-fn*
(filter-fn msg %) [subscriber-key-fn msg]
(if subscriber-key-fn (subscriber-key-fn msg) (:subscriber-key msg)))
global-filter-fn (defn namespace-fn*
(global-filter-fn msg %) [namespace-fn msg]
(if namespace-fn (namespace-fn msg) vs/default-ns))
:else (deftype BaseSubscribedViews [opts]
(if (-> opts first :unsafe)
(do (warn "YOU ARE RUNNING IN UNSAFE MODE, AND NO FILTERS ARE PRESENT FOR VIEW-SIG: " %)
true)
(throw (Exception. (str "No filter set for view " %))))))
view-sigs)))
(defn send-message
[this address msg]
(warn "IMPLEMENT ME. Got message " msg " sent to address " address))
(deftype BaseSubscribedViews [db templates send-fn broadcast-fn subscribed-views-fn opts]
SubscribedViews SubscribedViews
(subscribe-views (subscribe-views
[this sub-req] [this {db :db :as msg}]
(let [subscriber-key (subscriber-key-fn this sub-req) (let [{:keys [persistor templates send-fn subscriber-key-fn namespace-fn unsafe?]} opts
view-sigs (view-filter sub-req (:views sub-req) templates opts)] ; this is where security comes in. db (if db db (:db opts))
(info "Subscribing views: " view-sigs " for subscriber " subscriber-key) subscriber-key (subscriber-key-fn* subscriber-key-fn msg)
namespace (namespace-fn* namespace-fn msg)
view-sigs (view-filter msg (:views msg) templates {:unsafe? unsafe?}) ; this is where security comes in. Move?
popts {:templates templates :subscriber-key subscriber-key :namespace namespace}]
(info "Subscribing views: " view-sigs " for subscriber " subscriber-key ", in namespace " namespace)
(when (seq view-sigs) (when (seq view-sigs)
(let [subbed-views (if-let [namespace (namespace-fn this sub-req)] ;; (thread
(add-subscriptions! subscriber-key view-sigs templates namespace) (doseq [vs view-sigs]
(add-subscriptions! subscriber-key view-sigs templates))] (send-fn* send-fn subscriber-key (subscribe-to-view persistor db vs popts))))))
(thread
(->> (initial-views db view-sigs templates subbed-views)
((if send-fn send-fn send-message) this subscriber-key)))))))
(unsubscribe-views (unsubscribe-views
[this unsub-req] [this msg]
(let [subscriber-key (subscriber-key-fn this unsub-req) (let [{:keys [subscriber-key-fn namespace-fn]} opts
view-sigs (:views unsub-req)] subscriber-key (subscriber-key-fn* subscriber-key-fn msg)
namespace (namespace-fn* namespace-fn msg)
view-sigs (:views msg)]
(info "Unsubscribing views: " view-sigs " for subscriber " subscriber-key) (info "Unsubscribing views: " view-sigs " for subscriber " subscriber-key)
(if-let [namespace (namespace-fn this unsub-req)] (doseq [vs view-sigs] (remove-subscription! subscriber-key vs namespace))))
(doseq [vs view-sigs] (remove-subscription! subscriber-key vs namespace))
(doseq [vs view-sigs] (remove-subscription! subscriber-key vs)))))
(disconnect [this disconnect-req] (disconnect [this msg]
(let [subscriber-key (:subscriber-key disconnect-req) (let [{:keys [subscriber-key-fn namespace-fn]} opts
namespace (namespace-fn this disconnect-req) subscriber-key (subscriber-key-fn* subscriber-key-fn msg)
namespace (namespace-fn* namespace-fn msg)
view-sigs (if namespace (subscriptions-for subscriber-key namespace) (subscriptions-for subscriber-key))] view-sigs (if namespace (subscriptions-for subscriber-key namespace) (subscriptions-for subscriber-key))]
(if namespace (doseq [vs view-sigs] (remove-subscription! subscriber-key vs namespace))))
(doseq [vs view-sigs] (remove-subscription! subscriber-key vs namespace))
(doseq [vs view-sigs] (remove-subscription! subscriber-key vs)))))
(subscriber-key-fn [this msg] (:subscriber-key msg))
(namespace-fn [this msg] nil)
;; DB interaction ;; DB interaction
(subscribed-views [this] @vs/compiled-views) (subscribed-views [this] ) ;; (vs/compiled-views))
(broadcast-deltas [this fdb views-with-deltas])) (broadcast-deltas [this fdb views-with-deltas]))

View file

@ -10,23 +10,18 @@
(j/query db (hsql/format query-map))) (j/query db (hsql/format query-map)))
(defn post-process-result-set (defn post-process-result-set
[nv templates result-set] [view-sig templates result-set]
(if-let [post-fn (get-in templates [(first nv) :post-fn])] (if-let [post-fn (get-in templates [(first view-sig) :post-fn])]
(mapv post-fn result-set) (mapv post-fn result-set)
result-set)) result-set))
(defn initial-views (defn initial-view
"Takes a db spec, the new views sigs (new-views) we want to produce result-sets for, "Takes a db spec, the new views sigs (new-views) we want to produce result-sets for,
the template config map, and the subscribed-views themselves (with compiled view-maps) the template config map, and the view-map itself.
and returns a result-set for the new-views with post-fn functions applied to the data." and returns a result-set for the new-views with post-fn functions applied to the data."
[db new-views templates subscribed-views] [db new-view templates view-map]
(reduce (->> view-map
(fn [results nv] (view-query db)
(->> (get subscribed-views nv) (into [])
:view (post-process-result-set new-view templates)
(view-query db) (hash-map new-view)))
(into [])
(post-process-result-set nv templates)
(assoc results nv)))
{}
new-views))

36
src/views/filters.clj Normal file
View file

@ -0,0 +1,36 @@
(ns views.filters
(:require
[clojure.tools.logging :refer [debug info warn error]]))
(defn view-filter
"Takes a subscription request msg, a collection of view-sigs and
the config templates hash-map for an app. Checks if there is
a global filter-fn in the hash-map metadata and checks against
that if it exists, as well as against any existing filter
functions for individual template config entries. Template
config hash-map entries can specify a filter-fn using the key
:filter-fn, and the global filter-fn is the same, only on
the config meta-data (i.e. (with-meta templates {:filter-fn ...}))
By default throws an exception if no filters are present.
By passing in {:unsafe true} in opts, this can be overridden."
[msg view-sigs templates & opts]
(let [global-filter-fn (:filter-fn (meta templates))]
(filterv
#(let [filter-fn (:filter-fn (get templates (first %)))]
(cond
(and filter-fn global-filter-fn)
(and (global-filter-fn msg %) (filter-fn msg %))
filter-fn
(filter-fn msg %)
global-filter-fn
(global-filter-fn msg %)
:else
(if (-> opts first :unsafe?)
(do (warn "YOU ARE RUNNING IN UNSAFE MODE, AND NO FILTERS ARE PRESENT FOR VIEW-SIG: " %)
true)
(throw (Exception. (str "No filter set for view " %))))))
view-sigs)))

16
src/views/persistor.clj Normal file
View file

@ -0,0 +1,16 @@
(ns views.persistor
(:require
[clojure.java.jdbc :as j]
[views.subscriptions :refer [add-subscription! compiled-view-for]]
[views.db.load :refer [initial-view]]))
(defprotocol IPersistor
(subscribe-to-view [this db view-sig opts]))
(deftype InMemoryPersistor []
IPersistor
(subscribe-to-view
[persistor db view-sig {:keys [templates subscriber-key namespace]}]
(j/with-db-transaction [t db :isolation :serializable]
(add-subscription! subscriber-key view-sig templates namespace)
(initial-view t view-sig templates (:view (compiled-view-for view-sig))))))

View file

@ -5,8 +5,6 @@
(subscribe-views [this sub-request]) (subscribe-views [this sub-request])
(unsubscribe-views [this unsub-request]) (unsubscribe-views [this unsub-request])
(disconnect [this disconnect-request]) (disconnect [this disconnect-request])
(subscriber-key-fn [this msg])
(namespace-fn [this msg])
;; DB interaction ;; DB interaction
(broadcast-deltas [this db views-with-deltas]) (broadcast-deltas [this db views-with-deltas])

View file

@ -3,15 +3,10 @@
[views.db.core :as vdb])) [views.db.core :as vdb]))
;; ;;
;; {[:view-sig 1 "arg2"] {:keys [1 2 3 4 ... ] :view-map {:view ...}}} ;; {namespace {[:view-sig 1 "arg2"] {:subscriptions [1 2 3 4 ... ] :view-data {:view ...}}}}
;;
;; or
;;
;; {namespace {[:view-sig 1 "arg2"] {:keys [1 2 3 4 ... ] :view-map {:view ...}}}}
;; ;;
(def subscribed-views (atom {})) (def subscribed-views (atom {}))
(def compiled-views (atom {}))
(def default-ns :default-ns) (def default-ns :default-ns)
@ -22,32 +17,38 @@
(conj view-subs subscriber-key) (conj view-subs subscriber-key)
#{subscriber-key}))) #{subscriber-key})))
(defn- add-compiled-view! (defn add-subscription*
[view-sig templates] [subscriber-key view-sig templates namespace]
(swap! compiled-views #(assoc % view-sig (vdb/view-map (get-in templates [(first view-sig) :fn]) view-sig)))) (fn [svs]
(-> svs
(update-in [namespace view-sig :subscriptions] (add-subscriber-key subscriber-key))
(assoc-in [namespace view-sig :view-data] (vdb/view-map (get-in templates [(first view-sig) :fn]) view-sig)))))
(defn add-subscription! (defn add-subscription!
([subscriber-key view-sig templates] ([subscriber-key view-sig templates]
(add-subscription! subscriber-key view-sig templates default-ns)) (add-subscription! subscriber-key view-sig templates default-ns))
([subscriber-key view-sig templates namespace] ([subscriber-key view-sig templates namespace]
(swap! subscribed-views #(update-in % [namespace view-sig] (add-subscriber-key subscriber-key))) (swap! subscribed-views (add-subscription* subscriber-key view-sig templates namespace))))
(add-compiled-view! view-sig templates)))
(defn add-subscriptions! (defn add-subscriptions!
([subscriber-key view-sigs templates] ([subscriber-key view-sigs templates]
(add-subscriptions! subscriber-key view-sigs templates default-ns)) (add-subscriptions! subscriber-key view-sigs templates default-ns))
([subscriber-key view-sigs templates namespace] ([subscriber-key view-sigs templates namespace]
(last (mapv #(add-subscription! subscriber-key % templates namespace) view-sigs)))) (mapv #(add-subscription! subscriber-key % templates namespace) view-sigs)))
(defn subscriptions-for (defn subscriptions-for
([subscriber-key] (subscriptions-for subscriber-key default-ns)) ([subscriber-key] (subscriptions-for subscriber-key default-ns))
([subscriber-key namespace] ([subscriber-key namespace]
(reduce #(if (contains? (second %2) subscriber-key) (conj %1 (first %2)) %1) [] (get @subscribed-views namespace)))) (reduce
#(if (contains? (:subscriptions (second %2)) subscriber-key)
(conj %1 (first %2))
%1)
[] (get @subscribed-views namespace))))
(defn subscribed-to (defn subscribed-to
([view-sig] (subscribed-to view-sig default-ns)) ([view-sig] (subscribed-to view-sig default-ns))
([view-sig namespace] ([view-sig namespace]
(get-in @subscribed-views [namespace view-sig]))) (get-in @subscribed-views [namespace view-sig :subscriptions])))
(defn subscribed-to? (defn subscribed-to?
([subscriber-key view-sig] ([subscriber-key view-sig]
@ -59,12 +60,11 @@
(defn- remove-key-or-view (defn- remove-key-or-view
[subscriber-key view-sig namespace] [subscriber-key view-sig namespace]
(fn [subbed-views] (fn [subbed-views]
(let [path [namespace view-sig] (let [path [namespace view-sig :subscriptions]
updated (update-in subbed-views path disj subscriber-key)] updated (update-in subbed-views path disj subscriber-key)]
(if (seq (get-in updated path)) (if (seq (get-in updated path))
updated updated
(do (swap! compiled-views dissoc view-sig) ; remove the compiled view as well (update-in updated [namespace] dissoc view-sig)))))
(update-in updated [namespace] dissoc view-sig))))))
(defn remove-subscription! (defn remove-subscription!
([subscriber-key view-sig] ([subscriber-key view-sig]
@ -74,5 +74,6 @@
(swap! subscribed-views (remove-key-or-view subscriber-key view-sig namespace))))) (swap! subscribed-views (remove-key-or-view subscriber-key view-sig namespace)))))
(defn compiled-view-for (defn compiled-view-for
[view-sig] ([view-sig] (compiled-view-for view-sig default-ns))
(get @compiled-views view-sig)) ([view-sig namespace]
(get-in @subscribed-views [namespace view-sig :view-data])))

View file

@ -1,44 +1,49 @@
(ns views.base-subscribed-views-test (ns views.base-subscribed-views-test
(:require (:require
[views.base-subscribed-views :as bsv] ; :refer [BaseSubscribedViews]] [views.base-subscribed-views :as bsv] ; :refer [BaseSubscribedViews]]
[views.subscribed-views :refer [SubscribedViews subscriber-key-fn namespace-fn subscribe-views unsubscribe-views disconnect]] [views.persistor];; :refer [InMemoryPersistor]]
[views.subscribed-views :refer [SubscribedViews subscribe-views unsubscribe-views disconnect]]
[views.subscriptions :as vs :refer [subscribed-to?]] [views.subscriptions :as vs :refer [subscribed-to?]]
[views.fixtures :as vf] [views.fixtures :as vf]
[clojure.test :refer [use-fixtures deftest is]] [clojure.test :refer [use-fixtures deftest is]]
[clojure.java.jdbc :as j] [clojure.java.jdbc :as j]
[clj-logging-config.log4j :refer [set-logger! set-loggers!]]) [clj-logging-config.log4j :refer [set-logger! set-loggers!]])
(:import (:import
[views.persistor InMemoryPersistor]
[views.base_subscribed_views BaseSubscribedViews])) [views.base_subscribed_views BaseSubscribedViews]))
(set-loggers! "views.base-subscribed-views" {:level :error}) (set-loggers!
"views.base-subscribed-views" {:level :error}
"views.filters" {:level :error})
(defn- subscription-fixtures! (defn- subscription-fixtures!
[f] [f]
(reset! vs/subscribed-views {}) (reset! vs/subscribed-views {})
(reset! vs/compiled-views {})
(f)) (f))
(use-fixtures :each vf/database-fixtures! subscription-fixtures!) (use-fixtures :each vf/database-fixtures! subscription-fixtures!)
(def persistor (InMemoryPersistor.))
(deftest subscribes-and-dispatches-initial-view-result-set (deftest subscribes-and-dispatches-initial-view-result-set
(let [send-fn #(is (and (= %2 1) (= %3 {[:users] []}))) (let [send-fn #(is (and (= %1 1) (= %2 {[:users] []})))
base-subbed-views (BaseSubscribedViews. vf/db vf/templates send-fn nil nil {:unsafe true})] base-subbed-views (BaseSubscribedViews. {:persistor persistor :db vf/db :templates vf/templates :send-fn send-fn :unsafe? true})]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]}))) (subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})))
(deftest unsubscribes-view (deftest unsubscribes-view
(let [base-subbed-views (BaseSubscribedViews. vf/db vf/templates nil nil nil {:unsafe true})] (let [base-subbed-views (BaseSubscribedViews. {:persistor persistor :db vf/db :templates vf/templates :unsafe? true})]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]}) (subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})
(unsubscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]}) (unsubscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})
(is (not (subscribed-to? 1 [:users]))))) (is (not (subscribed-to? 1 [:users])))))
(deftest filters-subscription-requests (deftest filters-subscription-requests
(let [templates (assoc-in vf/templates [:users :filter-fn] (fn [msg _] (:authorized? msg))) (let [templates (assoc-in vf/templates [:users :filter-fn] (fn [msg _] (:authorized? msg)))
base-subbed-views (BaseSubscribedViews. vf/db templates nil nil nil nil)] base-subbed-views (BaseSubscribedViews. {:persistor persistor :db vf/db :templates templates})]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]}) (subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})
(is (not (subscribed-to? 1 [:users]))))) (is (not (subscribed-to? 1 [:users])))))
(deftest removes-all-subscriptions-on-disconnect (deftest removes-all-subscriptions-on-disconnect
(let [base-subbed-views (BaseSubscribedViews. vf/db vf/templates nil nil nil {:unsafe true})] (let [base-subbed-views (BaseSubscribedViews. {:persistor persistor :db vf/db :templates vf/templates :unsafe? true})]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users][:user-posts 1]]}) (subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users][:user-posts 1]]})
(disconnect base-subbed-views {:subscriber-key 1}) (disconnect base-subbed-views {:subscriber-key 1})
(is (not (subscribed-to? 1 [:user-posts 1]))) (is (not (subscribed-to? 1 [:user-posts 1])))

View file

@ -14,12 +14,12 @@
(deftest initializes-views (deftest initializes-views
(let [users (gen-n-users! 2)] (let [users (gen-n-users! 2)]
(is (= (vload/initial-views vf/db [[:users]] templates (subscribed-views)) (is (= (vload/initial-view vf/db [:users] templates (get-in (subscribed-views) [[:users] :view]))
{[:users] users})))) {[:users] users}))))
(deftest post-processes-views (deftest post-processes-views
(let [users (gen-n-users! 1) (let [users (gen-n-users! 1)
with-postfn (assoc-in templates [:users :post-fn] #(update-in % [:name] upper-case)) with-postfn (assoc-in templates [:users :post-fn] #(update-in % [:name] upper-case))
views-rs (vload/initial-views vf/db [[:users]] with-postfn (subscribed-views))] views-rs (vload/initial-view vf/db [:users] with-postfn (get-in (subscribed-views) [[:users] :view]))]
(is (= (-> (get views-rs [:users]) first :name) (is (= (-> (get views-rs [:users]) first :name)
(-> users first :name upper-case))))) (-> users first :name upper-case)))))

View file

@ -45,7 +45,7 @@
(let [key 1, view-sig [:user-posts 1]] (let [key 1, view-sig [:user-posts 1]]
(vs/add-subscription! key view-sig templates) (vs/add-subscription! key view-sig templates)
(vs/remove-subscription! key view-sig) (vs/remove-subscription! key view-sig)
(is (= {:default-ns {}} @vs/subscribed-views)))) (is (= {vs/default-ns {}} @vs/subscribed-views))))
(deftest adds-multiple-views-at-a-time (deftest adds-multiple-views-at-a-time
(let [key 1, view-sigs [[:user-posts 1] [:user-posts 2]]] (let [key 1, view-sigs [[:user-posts 1] [:user-posts 2]]]