From 6717ad46db19c46be682366b60d1b93b6114feb1 Mon Sep 17 00:00:00 2001 From: Dave Della Costa Date: Tue, 10 Jun 2014 18:40:21 +0900 Subject: [PATCH] breaks subscription implementation out into persistor model --- src/views/base_subscribed_views.clj | 105 +++++++++------------- src/views/db/load.clj | 25 +++--- src/views/filters.clj | 36 ++++++++ src/views/persistor.clj | 16 ++++ src/views/subscribed_views.clj | 2 - src/views/subscriptions.clj | 39 ++++---- test/views/base_subscribed_views_test.clj | 23 +++-- test/views/db/load_test.clj | 4 +- test/views/subscriptions_test.clj | 2 +- 9 files changed, 141 insertions(+), 111 deletions(-) create mode 100644 src/views/filters.clj create mode 100644 src/views/persistor.clj diff --git a/src/views/base_subscribed_views.clj b/src/views/base_subscribed_views.clj index 074f2f0..b16cf63 100644 --- a/src/views/base_subscribed_views.clj +++ b/src/views/base_subscribed_views.clj @@ -1,85 +1,64 @@ (ns views.base-subscribed-views (:require - [views.db.load :refer [initial-views]] - [views.subscribed-views :refer [SubscribedViews subscriber-key-fn namespace-fn]] + [views.persistor :refer [subscribe-to-view]] + [views.subscribed-views :refer [SubscribedViews]] [views.subscriptions :as vs :refer [add-subscriptions! remove-subscription! subscriptions-for]] + [views.filters :refer [view-filter]] [clojure.tools.logging :refer [debug info warn error]] [clojure.core.async :refer [put! opts first :unsafe) - (do (warn "YOU ARE RUNNING IN UNSAFE MODE, AND NO FILTERS ARE PRESENT FOR VIEW-SIG: " %) - true) - (throw (Exception. (str "No filter set for view " %)))))) - view-sigs))) - -(defn send-message - [this address msg] - (warn "IMPLEMENT ME. Got message " msg " sent to address " address)) - -(deftype BaseSubscribedViews [db templates send-fn broadcast-fn subscribed-views-fn opts] +(deftype BaseSubscribedViews [opts] SubscribedViews (subscribe-views - [this sub-req] - (let [subscriber-key (subscriber-key-fn this sub-req) - view-sigs (view-filter sub-req (:views sub-req) templates opts)] ; this is where security comes in. - (info "Subscribing views: " view-sigs " for subscriber " subscriber-key) + [this {db :db :as msg}] + (let [{:keys [persistor templates send-fn subscriber-key-fn namespace-fn unsafe?]} opts + db (if db db (:db opts)) + subscriber-key (subscriber-key-fn* subscriber-key-fn msg) + namespace (namespace-fn* namespace-fn msg) + view-sigs (view-filter msg (:views msg) templates {:unsafe? unsafe?}) ; this is where security comes in. Move? + popts {:templates templates :subscriber-key subscriber-key :namespace namespace}] + (info "Subscribing views: " view-sigs " for subscriber " subscriber-key ", in namespace " namespace) (when (seq view-sigs) - (let [subbed-views (if-let [namespace (namespace-fn this sub-req)] - (add-subscriptions! subscriber-key view-sigs templates namespace) - (add-subscriptions! subscriber-key view-sigs templates))] - (thread - (->> (initial-views db view-sigs templates subbed-views) - ((if send-fn send-fn send-message) this subscriber-key))))))) +;; (thread + (doseq [vs view-sigs] + (send-fn* send-fn subscriber-key (subscribe-to-view persistor db vs popts)))))) (unsubscribe-views - [this unsub-req] - (let [subscriber-key (subscriber-key-fn this unsub-req) - view-sigs (:views unsub-req)] + [this msg] + (let [{:keys [subscriber-key-fn namespace-fn]} opts + subscriber-key (subscriber-key-fn* subscriber-key-fn msg) + namespace (namespace-fn* namespace-fn msg) + view-sigs (:views msg)] (info "Unsubscribing views: " view-sigs " for subscriber " subscriber-key) - (if-let [namespace (namespace-fn this unsub-req)] - (doseq [vs view-sigs] (remove-subscription! subscriber-key vs namespace)) - (doseq [vs view-sigs] (remove-subscription! subscriber-key vs))))) + (doseq [vs view-sigs] (remove-subscription! subscriber-key vs namespace)))) - (disconnect [this disconnect-req] - (let [subscriber-key (:subscriber-key disconnect-req) - namespace (namespace-fn this disconnect-req) + (disconnect [this msg] + (let [{:keys [subscriber-key-fn namespace-fn]} opts + subscriber-key (subscriber-key-fn* subscriber-key-fn msg) + namespace (namespace-fn* namespace-fn msg) view-sigs (if namespace (subscriptions-for subscriber-key namespace) (subscriptions-for subscriber-key))] - (if namespace - (doseq [vs view-sigs] (remove-subscription! subscriber-key vs namespace)) - (doseq [vs view-sigs] (remove-subscription! subscriber-key vs))))) - - (subscriber-key-fn [this msg] (:subscriber-key msg)) - - (namespace-fn [this msg] nil) + (doseq [vs view-sigs] (remove-subscription! subscriber-key vs namespace)))) ;; DB interaction - (subscribed-views [this] @vs/compiled-views) + (subscribed-views [this] ) ;; (vs/compiled-views)) (broadcast-deltas [this fdb views-with-deltas])) diff --git a/src/views/db/load.clj b/src/views/db/load.clj index eb4100a..30e5003 100644 --- a/src/views/db/load.clj +++ b/src/views/db/load.clj @@ -10,23 +10,18 @@ (j/query db (hsql/format query-map))) (defn post-process-result-set - [nv templates result-set] - (if-let [post-fn (get-in templates [(first nv) :post-fn])] + [view-sig templates result-set] + (if-let [post-fn (get-in templates [(first view-sig) :post-fn])] (mapv post-fn result-set) result-set)) -(defn initial-views +(defn initial-view "Takes a db spec, the new views sigs (new-views) we want to produce result-sets for, - the template config map, and the subscribed-views themselves (with compiled view-maps) + the template config map, and the view-map itself. and returns a result-set for the new-views with post-fn functions applied to the data." - [db new-views templates subscribed-views] - (reduce - (fn [results nv] - (->> (get subscribed-views nv) - :view - (view-query db) - (into []) - (post-process-result-set nv templates) - (assoc results nv))) - {} - new-views)) + [db new-view templates view-map] + (->> view-map + (view-query db) + (into []) + (post-process-result-set new-view templates) + (hash-map new-view))) diff --git a/src/views/filters.clj b/src/views/filters.clj new file mode 100644 index 0000000..46ea8ce --- /dev/null +++ b/src/views/filters.clj @@ -0,0 +1,36 @@ +(ns views.filters + (:require + [clojure.tools.logging :refer [debug info warn error]])) + +(defn view-filter + "Takes a subscription request msg, a collection of view-sigs and + the config templates hash-map for an app. Checks if there is + a global filter-fn in the hash-map metadata and checks against + that if it exists, as well as against any existing filter + functions for individual template config entries. Template + config hash-map entries can specify a filter-fn using the key + :filter-fn, and the global filter-fn is the same, only on + the config meta-data (i.e. (with-meta templates {:filter-fn ...})) + + By default throws an exception if no filters are present. + By passing in {:unsafe true} in opts, this can be overridden." + [msg view-sigs templates & opts] + (let [global-filter-fn (:filter-fn (meta templates))] + (filterv + #(let [filter-fn (:filter-fn (get templates (first %)))] + (cond + (and filter-fn global-filter-fn) + (and (global-filter-fn msg %) (filter-fn msg %)) + + filter-fn + (filter-fn msg %) + + global-filter-fn + (global-filter-fn msg %) + + :else + (if (-> opts first :unsafe?) + (do (warn "YOU ARE RUNNING IN UNSAFE MODE, AND NO FILTERS ARE PRESENT FOR VIEW-SIG: " %) + true) + (throw (Exception. (str "No filter set for view " %)))))) + view-sigs))) diff --git a/src/views/persistor.clj b/src/views/persistor.clj new file mode 100644 index 0000000..2b7581f --- /dev/null +++ b/src/views/persistor.clj @@ -0,0 +1,16 @@ +(ns views.persistor + (:require + [clojure.java.jdbc :as j] + [views.subscriptions :refer [add-subscription! compiled-view-for]] + [views.db.load :refer [initial-view]])) + +(defprotocol IPersistor + (subscribe-to-view [this db view-sig opts])) + +(deftype InMemoryPersistor [] + IPersistor + (subscribe-to-view + [persistor db view-sig {:keys [templates subscriber-key namespace]}] + (j/with-db-transaction [t db :isolation :serializable] + (add-subscription! subscriber-key view-sig templates namespace) + (initial-view t view-sig templates (:view (compiled-view-for view-sig)))))) diff --git a/src/views/subscribed_views.clj b/src/views/subscribed_views.clj index 8fdf294..2b88b65 100644 --- a/src/views/subscribed_views.clj +++ b/src/views/subscribed_views.clj @@ -5,8 +5,6 @@ (subscribe-views [this sub-request]) (unsubscribe-views [this unsub-request]) (disconnect [this disconnect-request]) - (subscriber-key-fn [this msg]) - (namespace-fn [this msg]) ;; DB interaction (broadcast-deltas [this db views-with-deltas]) diff --git a/src/views/subscriptions.clj b/src/views/subscriptions.clj index bf7043f..8d536e7 100644 --- a/src/views/subscriptions.clj +++ b/src/views/subscriptions.clj @@ -3,15 +3,10 @@ [views.db.core :as vdb])) ;; -;; {[:view-sig 1 "arg2"] {:keys [1 2 3 4 ... ] :view-map {:view ...}}} -;; -;; or -;; -;; {namespace {[:view-sig 1 "arg2"] {:keys [1 2 3 4 ... ] :view-map {:view ...}}}} +;; {namespace {[:view-sig 1 "arg2"] {:subscriptions [1 2 3 4 ... ] :view-data {:view ...}}}} ;; (def subscribed-views (atom {})) -(def compiled-views (atom {})) (def default-ns :default-ns) @@ -22,32 +17,38 @@ (conj view-subs subscriber-key) #{subscriber-key}))) -(defn- add-compiled-view! - [view-sig templates] - (swap! compiled-views #(assoc % view-sig (vdb/view-map (get-in templates [(first view-sig) :fn]) view-sig)))) +(defn add-subscription* + [subscriber-key view-sig templates namespace] + (fn [svs] + (-> svs + (update-in [namespace view-sig :subscriptions] (add-subscriber-key subscriber-key)) + (assoc-in [namespace view-sig :view-data] (vdb/view-map (get-in templates [(first view-sig) :fn]) view-sig))))) (defn add-subscription! ([subscriber-key view-sig templates] (add-subscription! subscriber-key view-sig templates default-ns)) ([subscriber-key view-sig templates namespace] - (swap! subscribed-views #(update-in % [namespace view-sig] (add-subscriber-key subscriber-key))) - (add-compiled-view! view-sig templates))) + (swap! subscribed-views (add-subscription* subscriber-key view-sig templates namespace)))) (defn add-subscriptions! ([subscriber-key view-sigs templates] (add-subscriptions! subscriber-key view-sigs templates default-ns)) ([subscriber-key view-sigs templates namespace] - (last (mapv #(add-subscription! subscriber-key % templates namespace) view-sigs)))) + (mapv #(add-subscription! subscriber-key % templates namespace) view-sigs))) (defn subscriptions-for ([subscriber-key] (subscriptions-for subscriber-key default-ns)) ([subscriber-key namespace] - (reduce #(if (contains? (second %2) subscriber-key) (conj %1 (first %2)) %1) [] (get @subscribed-views namespace)))) + (reduce + #(if (contains? (:subscriptions (second %2)) subscriber-key) + (conj %1 (first %2)) + %1) + [] (get @subscribed-views namespace)))) (defn subscribed-to ([view-sig] (subscribed-to view-sig default-ns)) ([view-sig namespace] - (get-in @subscribed-views [namespace view-sig]))) + (get-in @subscribed-views [namespace view-sig :subscriptions]))) (defn subscribed-to? ([subscriber-key view-sig] @@ -59,12 +60,11 @@ (defn- remove-key-or-view [subscriber-key view-sig namespace] (fn [subbed-views] - (let [path [namespace view-sig] + (let [path [namespace view-sig :subscriptions] updated (update-in subbed-views path disj subscriber-key)] (if (seq (get-in updated path)) updated - (do (swap! compiled-views dissoc view-sig) ; remove the compiled view as well - (update-in updated [namespace] dissoc view-sig)))))) + (update-in updated [namespace] dissoc view-sig))))) (defn remove-subscription! ([subscriber-key view-sig] @@ -74,5 +74,6 @@ (swap! subscribed-views (remove-key-or-view subscriber-key view-sig namespace))))) (defn compiled-view-for - [view-sig] - (get @compiled-views view-sig)) + ([view-sig] (compiled-view-for view-sig default-ns)) + ([view-sig namespace] + (get-in @subscribed-views [namespace view-sig :view-data]))) diff --git a/test/views/base_subscribed_views_test.clj b/test/views/base_subscribed_views_test.clj index 0f3c979..7b26b4e 100644 --- a/test/views/base_subscribed_views_test.clj +++ b/test/views/base_subscribed_views_test.clj @@ -1,44 +1,49 @@ (ns views.base-subscribed-views-test (:require [views.base-subscribed-views :as bsv] ; :refer [BaseSubscribedViews]] - [views.subscribed-views :refer [SubscribedViews subscriber-key-fn namespace-fn subscribe-views unsubscribe-views disconnect]] + [views.persistor];; :refer [InMemoryPersistor]] + [views.subscribed-views :refer [SubscribedViews subscribe-views unsubscribe-views disconnect]] [views.subscriptions :as vs :refer [subscribed-to?]] [views.fixtures :as vf] [clojure.test :refer [use-fixtures deftest is]] [clojure.java.jdbc :as j] [clj-logging-config.log4j :refer [set-logger! set-loggers!]]) (:import + [views.persistor InMemoryPersistor] [views.base_subscribed_views BaseSubscribedViews])) -(set-loggers! "views.base-subscribed-views" {:level :error}) +(set-loggers! + "views.base-subscribed-views" {:level :error} + "views.filters" {:level :error}) (defn- subscription-fixtures! [f] (reset! vs/subscribed-views {}) - (reset! vs/compiled-views {}) (f)) (use-fixtures :each vf/database-fixtures! subscription-fixtures!) +(def persistor (InMemoryPersistor.)) + (deftest subscribes-and-dispatches-initial-view-result-set - (let [send-fn #(is (and (= %2 1) (= %3 {[:users] []}))) - base-subbed-views (BaseSubscribedViews. vf/db vf/templates send-fn nil nil {:unsafe true})] + (let [send-fn #(is (and (= %1 1) (= %2 {[:users] []}))) + base-subbed-views (BaseSubscribedViews. {:persistor persistor :db vf/db :templates vf/templates :send-fn send-fn :unsafe? true})] (subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]}))) (deftest unsubscribes-view - (let [base-subbed-views (BaseSubscribedViews. vf/db vf/templates nil nil nil {:unsafe true})] + (let [base-subbed-views (BaseSubscribedViews. {:persistor persistor :db vf/db :templates vf/templates :unsafe? true})] (subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]}) (unsubscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]}) (is (not (subscribed-to? 1 [:users]))))) (deftest filters-subscription-requests - (let [templates (assoc-in vf/templates [:users :filter-fn] (fn [msg _] (:authorized? msg))) - base-subbed-views (BaseSubscribedViews. vf/db templates nil nil nil nil)] + (let [templates (assoc-in vf/templates [:users :filter-fn] (fn [msg _] (:authorized? msg))) + base-subbed-views (BaseSubscribedViews. {:persistor persistor :db vf/db :templates templates})] (subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]}) (is (not (subscribed-to? 1 [:users]))))) (deftest removes-all-subscriptions-on-disconnect - (let [base-subbed-views (BaseSubscribedViews. vf/db vf/templates nil nil nil {:unsafe true})] + (let [base-subbed-views (BaseSubscribedViews. {:persistor persistor :db vf/db :templates vf/templates :unsafe? true})] (subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users][:user-posts 1]]}) (disconnect base-subbed-views {:subscriber-key 1}) (is (not (subscribed-to? 1 [:user-posts 1]))) diff --git a/test/views/db/load_test.clj b/test/views/db/load_test.clj index b7ede2b..ba19887 100644 --- a/test/views/db/load_test.clj +++ b/test/views/db/load_test.clj @@ -14,12 +14,12 @@ (deftest initializes-views (let [users (gen-n-users! 2)] - (is (= (vload/initial-views vf/db [[:users]] templates (subscribed-views)) + (is (= (vload/initial-view vf/db [:users] templates (get-in (subscribed-views) [[:users] :view])) {[:users] users})))) (deftest post-processes-views (let [users (gen-n-users! 1) with-postfn (assoc-in templates [:users :post-fn] #(update-in % [:name] upper-case)) - views-rs (vload/initial-views vf/db [[:users]] with-postfn (subscribed-views))] + views-rs (vload/initial-view vf/db [:users] with-postfn (get-in (subscribed-views) [[:users] :view]))] (is (= (-> (get views-rs [:users]) first :name) (-> users first :name upper-case))))) diff --git a/test/views/subscriptions_test.clj b/test/views/subscriptions_test.clj index 505e92c..ba58cc5 100644 --- a/test/views/subscriptions_test.clj +++ b/test/views/subscriptions_test.clj @@ -45,7 +45,7 @@ (let [key 1, view-sig [:user-posts 1]] (vs/add-subscription! key view-sig templates) (vs/remove-subscription! key view-sig) - (is (= {:default-ns {}} @vs/subscribed-views)))) + (is (= {vs/default-ns {}} @vs/subscribed-views)))) (deftest adds-multiple-views-at-a-time (let [key 1, view-sigs [[:user-posts 1] [:user-posts 2]]]