Refactored persistence layer to be more moduler. Fixed some problems with delta dispatch.

This commit is contained in:
Alexander K. Hudek 2014-08-27 00:38:22 -04:00
parent 1fab3ff587
commit e830580da0
16 changed files with 392 additions and 335 deletions

1
.gitignore vendored
View file

@ -9,3 +9,4 @@ pom.xml.asc
/.nrepl-port
*~
*.bk
.idea

View file

@ -1,4 +1,4 @@
(defproject views "0.2.0"
(defproject views "0.3.0-SNAPSHOT"
:description "You underestimate the power of the SQL side"
:url "https://github.com/diligenceengine/views"

View file

@ -1,15 +1,16 @@
(ns views.base-subscribed-views
(:require
[views.persistence :refer [subscribe-to-view! unsubscribe-from-view! unsubscribe-from-all-views!
get-subscribed-views get-subscriptions]]
[views.persistence.core :as persist]
[views.subscribed-views :refer [ISubscribedViews]]
[views.subscriptions :refer [default-ns subscribed-to compiled-view-for]]
[views.filters :refer [view-filter]]
[views.db.load :refer [initial-view]]
[views.db.util :refer [with-retry]]
[clojure.tools.logging :refer [debug info warn error]]
[clojure.core.async :refer [put! <! go thread]]
[clojure.java.jdbc :as j]))
(def default-ns :default-ns)
(declare send-deltas)
(defn send-fn*
@ -30,6 +31,22 @@
[view-sig-fn msg]
(if view-sig-fn (view-sig-fn msg) (:body msg)))
(defn subscribe-and-compute
"Subscribe a view and return the initial values."
[db persistence templates vs namespace subscriber-key]
(with-retry
(j/with-db-transaction [t db :isolation :serializable]
(let [view-data (persist/subscribe! persistence t templates namespace vs subscriber-key)]
(initial-view t vs templates (:view view-data))))))
;; Deltas look like:
;; [{view-sig1 delta, view-sig2 delta, ...} {view-sig3 delta, ...}]
(defn delta-signatures
"Return all the signatures mentioned by a map of deltas."
[deltas]
(mapcat keys deltas))
(deftype BaseSubscribedViews [config]
ISubscribedViews
(subscribe-views
@ -38,17 +55,13 @@
db (if db-fn (db-fn msg) (:db config))
subscriber-key (subscriber-key-fn* subscriber-key-fn msg)
namespace (namespace-fn* namespace-fn msg)
view-sigs (view-filter msg (view-sig-fn* view-sig-fn msg) templates {:unsafe? unsafe?}) ; this is where security comes in.
pconfig {:templates templates :subscriber-key subscriber-key :namespace namespace}]
view-sigs (view-filter msg (view-sig-fn* view-sig-fn msg) templates {:unsafe? unsafe?})] ; this is where security comes in.
(debug "Subscribing views: " view-sigs " for subscriber " subscriber-key ", in namespace " namespace)
(when (seq view-sigs)
(thread
(doseq [vs view-sigs]
(j/with-db-transaction [t db :isolation :serializable]
(subscribe-to-view! persistence db vs pconfig)
(let [view (:view (if namespace (compiled-view-for vs namespace) (compiled-view-for vs)))
iv (initial-view t vs templates view)]
(send-fn* send-fn subscriber-key :views.init iv))))))))
(let [iv (subscribe-and-compute db persistence templates vs namespace subscriber-key)]
(send-fn* send-fn subscriber-key :views.init iv)))))))
(unsubscribe-views
[this msg]
@ -57,50 +70,77 @@
namespace (namespace-fn* namespace-fn msg)
view-sigs (view-sig-fn* view-sig-fn msg)]
(debug "Unsubscribing views: " view-sigs " for subscriber " subscriber-key)
(doseq [vs view-sigs] (unsubscribe-from-view! persistence vs subscriber-key namespace))))
(doseq [vs view-sigs]
(persist/unsubscribe! persistence namespace vs subscriber-key))))
(disconnect [this msg]
(let [{:keys [subscriber-key-fn namespace-fn persistence]} config
subscriber-key (subscriber-key-fn* subscriber-key-fn msg)
namespace (namespace-fn* namespace-fn msg)]
(debug "Disconnecting subscriber " subscriber-key " in namespace " namespace)
(unsubscribe-from-all-views! persistence subscriber-key namespace)))
(persist/unsubscribe-all! persistence namespace subscriber-key)))
;;
;; The two below functions get called by vexec!/with-view-transaction
;;
(subscribed-views [this namespace]
(map :view-data (vals (get-subscribed-views (:persistence config) namespace))))
;; Table name optimization not yet worked through the library.
(persist/view-data (:persistence config) namespace "fix-me"))
(broadcast-deltas [this deltas namespace]
(let [{:keys [templates]} config
namespace (if namespace namespace default-ns)
subs (get-subscriptions (:persistence config) namespace)]
subs (persist/subscriptions (:persistence config) namespace (delta-signatures deltas))]
(send-deltas deltas subs namespace config))))
(defn post-process-deltas*
[templates delta-map]
(let [vs (:view-sig delta-map)
dm (dissoc delta-map :view-sig)]
(if-let [post-fn (get-in templates [(first vs) :post-fn])]
(reduce #(assoc %1 %2 (map post-fn (get dm %2))) {} (keys dm))
dm)))
(defn post-process-delta-map
[post-fn delta-map]
(if-let [rset (:refresh-set delta-map)]
{:refresh-set (mapv post-fn rset)}
(reduce #(assoc %1 %2 (map post-fn (get delta-map %2))) {} (keys delta-map))))
(defn post-process-deltas
[delta-set templates]
(reduce
#(assoc %1 (first %2) (mapv (fn [ds] (post-process-deltas* templates ds)) (second %2)))
{} delta-set))
"Run post-processing functions on each delta. NOTE: this puts things in maps
to maintain compatability with the frontend code."
[delta templates]
(let [vs (first delta)]
(if-let [post-fn (get-in templates [(first vs) :post-fn])]
{(first delta) (mapv #(post-process-delta-map post-fn %) (second delta))}
{(first delta) (second delta)})))
(defn subscriber-keys
[subs skeys delta-set]
(into skeys (reduce #(into %1 (get subs %2)) #{} (keys delta-set))))
;; We flatten the above into a sequence:
;; [[view-sig1 delta-data], [view-sig2 delta-data]....]
;; where the signatures from each pack are listed in order.
(defn flatten-deltas
"We flatten the above into a sequence:
[[view-sig1 delta-data], [view-sig2 delta-data]....]
where the signatures from each pack are listed in order."
[deltas]
(reduce #(into %1 (seq %2)) [] deltas))
(defn update-subscriber-pack
"Given a delta [view-sig delta-data] we find the subscribers that need it
and add to the subscriber pack vector {view-sig [delta...]}."
[subs spacks delta]
(let [subscribers (get subs (ffirst delta))]
(reduce #(update-in %1 [%2] (fnil conj []) delta) spacks subscribers)))
(defn subscriber-deltas
"Group deltas into subscriber packs."
[subs deltas]
(reduce #(update-subscriber-pack subs %1 %2) {} deltas))
;; Deltas looks like:
;; [delta-pack1 delta-pack2 ...]
;; where each delta pack is a map:
;; {view-sig1 delta-data, view-sig2 delta-data, ...}
(defn send-deltas
"Send deltas out to subscribers."
[deltas subs namespace {:keys [send-fn templates] :as config}]
(let [deltas (mapv #(post-process-deltas % templates) deltas)
sks (reduce #(subscriber-keys subs %1 %2) #{} deltas)]
(doseq [sk sks]
(debug "Sending deltas " deltas " to subscriber " sk)
(send-fn* send-fn sk :views.deltas deltas))))
(let [deltas (mapv #(post-process-deltas % templates) (flatten-deltas deltas))]
(doseq [[sk deltas*] (subscriber-deltas subs deltas)]
(debug "Sending deltas " deltas* " to subscriber " sk)
(send-fn* send-fn sk :views.deltas deltas*))))

View file

@ -1,14 +1,18 @@
(ns views.core
(:require
[views.base-subscribed-views :as bsv]
[views.persistence :as vp]
[edl.schema :refer [denormalized-schema get-schema]])
[views.core :as vp]
[edl.schema :refer [denormalized-schema get-schema]]
[views.persistence.memory :refer [new-memory-persistence]])
(:import
[views.persistence InMemoryPersistence]
[views.base_subscribed_views BaseSubscribedViews]))
(defn config
[{:keys [db templates persistence vexec-ns-fn] :as conf}]
(let [schema (denormalized-schema (get-schema db (get conf :schema-name "public")))
conf (if persistence conf (assoc conf :persistence (InMemoryPersistence.)))]
{:db db :schema schema :templates templates :vexec-ns-fn vexec-ns-fn :base-subscribed-views (BaseSubscribedViews. conf)}))
conf (if persistence conf (assoc conf :persistence (new-memory-persistence)))]
{:db db
:schema schema
:templates templates
:vexec-ns-fn vexec-ns-fn
:base-subscribed-views (BaseSubscribedViews. conf)}))

View file

@ -1,53 +1,27 @@
(ns views.db.core
(:import
[java.sql SQLException])
(:require
[clojure.java.jdbc :as j]
[clojure.tools.logging :refer [debug]]
[views.db.deltas :as vd]
[views.db.util :refer [with-retry retry-on-transaction-failure]]
[views.subscribed-views :refer [subscribed-views broadcast-deltas]]))
;;
;; Need to catch this and retry:
;; java.sql.SQLException: ERROR: could not serialize access due to concurrent update
;;
(defn get-nested-exceptions*
[exceptions e]
(if-let [next-e (.getNextException e)]
(recur (conj exceptions next-e) next-e)
exceptions))
(defn get-nested-exceptions
[e]
(get-nested-exceptions* [e] e))
(defn do-transaction-fn-with-retries
[transaction-fn]
(try
(transaction-fn)
(catch SQLException e
;; http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
(debug "Caught exception with error code: " (.getSQLState e))
(debug "Exception message: " (.getMessage e))
;; (debug "stack trace message: " (.printStackTrace e))
(if (some #(= (.getSQLState %) "40001") (get-nested-exceptions e))
(do-transaction-fn-with-retries transaction-fn) ;; try it again
(throw e))))) ;; otherwise rethrow
(defmacro with-view-transaction
"Like with-db-transaction, but operates with views. If you want to use a
standard jdbc function, the transcation database map is accessible with
(:db vt) where vt is the bound view transaction."
[binding & forms]
(let [tvar (first binding), vc (second binding)]
`(if (:deltas ~vc) ;; check if we are in a nested transaction
(let [~tvar ~vc] ~@forms)
(do-transaction-fn-with-retries
(fn []
(let [base-subscribed-views# (:base-subscribed-views ~vc)
deltas# (atom [])
result# (j/with-db-transaction [t# (:db ~vc) :isolation :serializable]
(let [~tvar (assoc ~vc :deltas deltas# :db t#)]
~@forms))]
(broadcast-deltas base-subscribed-views# @deltas# (:namespace ~vc))
result#))))))
(let [base-subscribed-views# (:base-subscribed-views ~vc)
deltas# (atom [])
result# (with-retry
(j/with-db-transaction [t# (:db ~vc) :isolation :serializable]
(let [~tvar (assoc ~vc :deltas deltas# :db t#)]
~@forms)))]
(broadcast-deltas base-subscribed-views# @deltas# (:namespace ~vc))
result#))))
(defn vexec!
"Used to perform arbitrary insert/update/delete actions on the database,
@ -77,6 +51,6 @@
(let [{:keys [new-deltas result-set]} (transaction-fn)]
(swap! deltas #(conj % new-deltas))
result-set)
(let [{:keys [new-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)]
(let [{:keys [new-deltas result-set]} (retry-on-transaction-failure transaction-fn)]
(broadcast-deltas base-subscribed-views [new-deltas] namespace)
result-set))))

View file

@ -220,11 +220,11 @@
refresh-only-views))
(defn format-deltas
"Removes extraneous data from view delta response collections."
"Removes extraneous data from view delta response collections.
TODO: Is there only one delta pack per view-sig here?"
[views-with-deltas]
(->> views-with-deltas
(map #(select-keys % [:view-sig :delete-deltas :insert-deltas :refresh-set]))
(group-by :view-sig)))
(reduce #(update-in %1 [(:view-sig %2)] (fnil conj []) (select-keys %2 [:delete-deltas :insert-deltas :refresh-set]))
{} views-with-deltas))
(defn do-view-transaction
"Takes the following arguments:

41
src/views/db/util.clj Normal file
View file

@ -0,0 +1,41 @@
(ns views.db.util
(:import
[java.sql SQLException])
(:require
[clojure.tools.logging :refer [debug]]))
;; Need to catch this and retry:
;; java.sql.SQLException: ERROR: could not serialize access due to concurrent update
;;
(defn get-nested-exceptions*
[exceptions e]
(if-let [next-e (.getNextException e)]
(recur (conj exceptions next-e) next-e)
exceptions))
(defn get-nested-exceptions
"Return the current exception and all nested exceptions as a vector."
[e]
(get-nested-exceptions* [e] e))
;; TODO: update to avoid stack overflow.
(defn retry-on-transaction-failure
"Retry a function whenever we receive a transaction failure."
[transaction-fn]
(try
(transaction-fn)
(catch SQLException e
;; http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
(debug "Caught exception with error code: " (.getSQLState e))
(debug "Exception message: " (.getMessage e))
;; (debug "stack trace message: " (.printStackTrace e))
(if (some #(= (.getSQLState %) "40001") (get-nested-exceptions e))
(retry-on-transaction-failure transaction-fn) ;; try it again
(throw e))))) ;; otherwise rethrow
(defmacro with-retry
"Retry a transaction forever."
[ & body]
`(let [tfn# (fn [] ~@body)]
(retry-on-transaction-failure tfn#)))

View file

@ -1,34 +0,0 @@
(ns views.persistence
(:require
[views.subscriptions :refer [add-subscription! remove-subscription! compiled-view-for
compiled-views-for subscriptions-for all-subscriptions
default-ns subscribed-views]]))
(defprotocol IPersistence
(subscribe-to-view! [this db view-sig opts])
(unsubscribe-from-view! [this view-sig subscriber-key namespace])
(unsubscribe-from-all-views! [this subscriber-key namespace])
(get-subscribed-views [this namespace])
(get-subscriptions [this namespace]))
(deftype InMemoryPersistence []
IPersistence
(subscribe-to-view!
[persistor db view-sig {:keys [templates subscriber-key namespace]}]
(add-subscription! view-sig templates subscriber-key namespace))
(unsubscribe-from-view!
[this view-sig subscriber-key namespace]
(remove-subscription! view-sig subscriber-key namespace))
(unsubscribe-from-all-views!
[this subscriber-key namespace]
(doseq [vs (subscriptions-for subscriber-key namespace)]
(remove-subscription! vs subscriber-key namespace)))
(get-subscribed-views [this namespace]
;; Don't like this
(if namespace (compiled-views-for namespace) (compiled-views-for)))
(get-subscriptions [this namespace]
(all-subscriptions namespace)))

View file

@ -0,0 +1,25 @@
(ns views.persistence.core)
(defprotocol IPersistence
(subscribe! [this db templates namespace view-sig subscriber-key]
"Subscribes a subscriber with subscriber-key to a view with signature
view-sig. Templates is a map of all defined view templates and db
is a jdbc transcation handle for the database from which initial
view data will be retrieved.
This function must return the view-data for the subscribed view.")
(unsubscribe! [this namespace view-sig subscriber-key]
"Unsubscribes a subscriber with key 'subscriber-key' from the view
with signature 'view-sig' in namespace 'namespace'.")
(unsubscribe-all! [this namespace subscriber-key]
"Unsubscribes the subscriber with key 'subscriber-key' from ALL views
in namespace 'namespace'.")
(view-data [this namespace table-name]
"Return all the view data that references a table name in a namespace.")
(subscriptions [this namespace signatures]
"Return all subscribers for all signatures in the list 'signatures' in
a namespace."))

View file

@ -0,0 +1,62 @@
(ns views.persistence.memory
(:require
[views.persistence.core :refer :all]
[views.db.deltas :as vd]))
(defn ns-subscribe!
"Subscribe to a view inside a namespace."
[namespace-views view-sig templates subscriber-key]
(-> namespace-views
(update-in [view-sig :subscriptions] (fnil conj #{}) subscriber-key)
(assoc-in [view-sig :view-data] (vd/view-map (get-in templates [(first view-sig) :fn]) view-sig))))
(defn ns-unsubscribe!
"Unsubscribe from a view inside a namespace. If there are no more subscribers,
we remove the view itself as well."
[namespace-views view-sig subscriber-key]
(let [path [view-sig :subscriptions]
updated (update-in namespace-views path disj subscriber-key)]
(if (seq (get-in updated path))
updated
(dissoc updated view-sig))))
(defn ns-unsubscribe-all!
"Unsubscribe a subscriber from all views in a namespace."
[namespace-views subscriber-key]
(reduce #(ns-unsubscribe! %1 %2 subscriber-key) namespace-views (keys namespace-views)))
(defn ns-subscriptions
"Find subscribers for a signature and add to a map."
[namespace-views result-map sig]
(if-let [subscribers (get-in namespace-views [sig :subscriptions])]
(assoc result-map sig subscribers)
result-map))
(deftype ViewsMemoryPersistence [subbed-views]
IPersistence
(subscribe!
[this db templates namespace view-sig subscriber-key]
(let [sv (swap! subbed-views (fn [sv] (update-in sv [namespace] ns-subscribe! view-sig templates subscriber-key)))]
(get-in sv [namespace view-sig :view-data])))
(unsubscribe!
[this namespace view-sig subscriber-key]
(swap! subbed-views
(fn [sv] (update-in sv [namespace] ns-unsubscribe! view-sig subscriber-key))))
(unsubscribe-all!
[this namespace subscriber-key ]
(swap! subbed-views
(fn [sv] (update-in sv [namespace] ns-unsubscribe-all! subscriber-key))))
(view-data [this namespace table]
;; We don't yet use table name as an optimization here.
(map :view-data (vals (get @subbed-views namespace))))
(subscriptions [this namespace signatures]
(let [namespace-views (get @subbed-views namespace)]
(reduce #(ns-subscriptions namespace-views %1 %2) {} signatures))))
(defn new-memory-persistence
[]
(->ViewsMemoryPersistence (atom {})))

View file

@ -1,93 +0,0 @@
(ns views.subscriptions
(:require
[views.db.deltas :as vd]))
;;
;; {namespace {[:view-sig 1 "arg2"] {:subscriptions [1 2 3 4 ... ] :view-data {:view ...}}}}
;;
(def subscribed-views (atom {}))
(def default-ns :default-ns)
(defn- add-subscriber-key
[subscriber-key]
(fn [view-subs]
(if (seq view-subs)
(conj view-subs subscriber-key)
#{subscriber-key})))
(defn add-subscription*
[view-sig templates subscriber-key namespace]
(fn [svs]
(-> svs
(update-in [namespace view-sig :subscriptions] (add-subscriber-key subscriber-key))
(assoc-in [namespace view-sig :view-data] (vd/view-map (get-in templates [(first view-sig) :fn]) view-sig)))))
(defn add-subscription!
([view-sig templates subscriber-key]
(add-subscription! view-sig templates subscriber-key default-ns))
([view-sig templates subscriber-key namespace]
(swap! subscribed-views (add-subscription* view-sig templates subscriber-key namespace))))
(defn add-subscriptions!
([view-sigs templates subscriber-key]
(add-subscriptions! view-sigs templates subscriber-key default-ns))
([view-sigs templates subscriber-key namespace]
(mapv #(add-subscription! % templates subscriber-key namespace) view-sigs)))
(defn subscriptions-for
([subscriber-key] (subscriptions-for subscriber-key default-ns))
([subscriber-key namespace]
(reduce
#(if (contains? (:subscriptions (second %2)) subscriber-key)
(conj %1 (first %2))
%1)
[] (get @subscribed-views namespace))))
(defn all-subscriptions
([] (all-subscriptions default-ns @subscribed-views))
([namespace] (all-subscriptions namespace @subscribed-views))
([namespace subscribed-views']
(->> (get subscribed-views' namespace)
(reduce #(assoc %1 (first %2) (:subscriptions (second %2))) {}))))
(defn subscribed-to
([view-sig]
(subscribed-to view-sig default-ns @subscribed-views))
([view-sig namespace]
(subscribed-to view-sig namespace @subscribed-views))
([view-sig namespace subscribed-views']
(get-in subscribed-views' [namespace view-sig :subscriptions])))
(defn subscribed-to?
([view-sig subscriber-key]
(subscribed-to? view-sig subscriber-key default-ns))
([view-sig subscriber-key namespace]
(if-let [view-subs (subscribed-to view-sig namespace)]
(view-subs subscriber-key))))
(defn- remove-key-or-view
[view-sig subscriber-key namespace]
(fn [subbed-views]
(let [path [namespace view-sig :subscriptions]
updated (update-in subbed-views path disj subscriber-key)]
(if (seq (get-in updated path))
updated
(update-in updated [namespace] dissoc view-sig)))))
(defn remove-subscription!
([view-sig subscriber-key]
(remove-subscription! view-sig subscriber-key default-ns))
([view-sig subscriber-key namespace]
(when (subscribed-to? view-sig subscriber-key namespace)
(swap! subscribed-views (remove-key-or-view view-sig subscriber-key namespace)))))
(defn compiled-view-for
([view-sig] (compiled-view-for view-sig default-ns))
([view-sig namespace]
(get-in @subscribed-views [namespace view-sig :view-data])))
(defn compiled-views-for
([] (compiled-views-for default-ns))
([namespace] (get @subscribed-views namespace)))

View file

@ -1,8 +1,8 @@
(ns views.all-tests
(:require
[clojure.test :refer [run-tests]]
[views.subscriptions-test]
[views.base-subscribed-views-test]
[views.persistence.memory-test]
[views.db.core-test]
[views.db.deltas-test]
[views.db.checks-test] ; STILL SPECULATIVE
@ -11,8 +11,8 @@
(defn run-all-tests
[]
(run-tests 'views.subscriptions-test
'views.base-subscribed-views-test
(run-tests 'views.base-subscribed-views-test
'views.persistence.memory-test
'views.db.core-test
'views.db.deltas-test
'views.db.checks-test

View file

@ -1,62 +1,73 @@
(ns views.base-subscribed-views-test
(:require
[views.base-subscribed-views :as bsv]
[views.persistence]
[views.persistence.core :refer :all]
[views.persistence.memory :refer [new-memory-persistence]]
[views.subscribed-views :refer [subscribe-views unsubscribe-views disconnect broadcast-deltas]]
[views.subscriptions :as vs :refer [add-subscription! default-ns subscribed-to?]]
[views.fixtures :as vf]
[clojure.test :refer [use-fixtures deftest is]]
[clojure.java.jdbc :as j]
[clj-logging-config.log4j :refer [set-logger! set-loggers!]])
(:import
[views.persistence InMemoryPersistence]
[views.base_subscribed_views BaseSubscribedViews]))
(set-loggers!
"views.base-subscribed-views" {:level :error}
"views.filters" {:level :error})
(defn- subscription-fixtures!
[f]
(reset! vs/subscribed-views {})
(f))
(use-fixtures :each (vf/database-fixtures!) subscription-fixtures!)
(def persistence (InMemoryPersistence.))
(def view-config-fixture
{:persistence persistence
(defn view-config []
{:persistence (new-memory-persistence)
:db vf/db
:templates vf/templates
:view-sig-fn :views
:unsafe? true})
(deftest subscribes-and-dispatches-initial-view-result-set
(let [send-fn #(is (and (= %1 1) (= %2 :views.init) (= %3 {[:users] []})))
base-subbed-views (BaseSubscribedViews. (assoc view-config-fixture :send-fn send-fn))]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})))
(deftest unsubscribes-view
(let [base-subbed-views (BaseSubscribedViews. view-config-fixture)]
(let [config (view-config)
sent (atom #{})
send-fn #(do (is (and (= %1 1) (= %2 :views.init) (= %3 {[:users] []})))
(swap! sent conj [%1 %2 %3]))
base-subbed-views (BaseSubscribedViews. (assoc config :send-fn send-fn))]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})
(Thread/sleep 10)
(is (= (subscriptions (:persistence config) bsv/default-ns [[:users]])
{[:users] #{1}}))
;; Verify sends occured.
(is (= @sent #{[1 :views.init {[:users] []}]}))))
;; This test illustrates a slight timing issue. Because view subscriptions
;; use threads, an unsubscription that follows a subscription too closely
;; can fail.
(deftest unsubscribes-view
(let [config (view-config)
base-subbed-views (BaseSubscribedViews. config)]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})
(Thread/sleep 10)
(unsubscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})
(is (not (subscribed-to? 1 [:users])))))
(is (= (subscriptions (:persistence config) bsv/default-ns [[:users]])
{}))))
(deftest filters-subscription-requests
(let [templates (assoc-in vf/templates [:users :filter-fn]
(let [config (view-config)
templates (assoc-in vf/templates [:users :filter-fn]
(fn [msg _] (:authorized? msg)))
view-config (-> view-config-fixture (assoc :templates templates) (dissoc :unsafe?))
view-config (-> config (assoc :templates templates) (dissoc :unsafe?))
base-subbed-views (BaseSubscribedViews. view-config)]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})
(is (not (subscribed-to? 1 [:users])))))
(Thread/sleep 10)
(is (= (subscriptions (:persistence config) bsv/default-ns [[:users]])
{}))))
(deftest removes-all-subscriptions-on-disconnect
(let [base-subbed-views (BaseSubscribedViews. view-config-fixture)]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users][:user-posts 1]]})
(let [config (view-config)
base-subbed-views (BaseSubscribedViews. config)]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users] [:user-posts 1]]})
(Thread/sleep 10)
(is (= (subscriptions (:persistence config) bsv/default-ns [[:users] [:user-posts 1]])
{[:users] #{1}}))
(disconnect base-subbed-views {:subscriber-key 1})
(is (not (subscribed-to? 1 [:user-posts 1])))
(is (not (subscribed-to? 1 [:users])))))
(is (= (subscriptions (:persistence config) bsv/default-ns [[:users] [:user-posts 1]])
{}))))
;; (deftest sends-deltas
;; (let [deltas {[:users] [{:view-sig [:users] :insert-deltas [{:foo "bar"}]}]}
@ -70,25 +81,63 @@
;; (broadcast-deltas base-subbed-views deltas nil)))
(deftest sends-deltas-in-batch
(let [deltas [{[:users] [{:view-sig [:users] :insert-deltas [{:id 1 :name "Bob"} {:id 2 :name "Alice"}]}]}
{[:users] [{:view-sig [:users] :insert-deltas [{:id 3 :name "Jack"} {:id 4 :name "Jill"}]}]}]
(let [config (view-config)
deltas [{[:users] [{:insert-deltas [{:id 1 :name "Bob"} {:id 2 :name "Alice"}]}]}
{[:users] [{:insert-deltas [{:id 3 :name "Jack"} {:id 4 :name "Jill"}]}]}]
;; This is just more obvious than writing some convulated fn to dig out the view-sigs.
sent-deltas [{[:users] [{:insert-deltas [{:id 1 :name "Bob"} {:id 2 :name "Alice"}]}]}
{[:users] [{:insert-deltas [{:id 3 :name "Jack"} {:id 4 :name "Jill"}]}]}]
sent (atom #{})
send-fn #(do (is (#{1 2} %1))
(is (= :views.deltas %2))
(is (= sent-deltas %3)))
base-subbed-views (BaseSubscribedViews. (assoc view-config-fixture :send-fn send-fn))]
(add-subscription! [:users] vf/templates 1 default-ns)
(broadcast-deltas base-subbed-views deltas nil)))
(is (= sent-deltas %3))
(swap! sent conj [%1 %2 %3]))
base-subbed-views (BaseSubscribedViews. (assoc config :send-fn send-fn))]
(subscribe! (:persistence config) vf/db vf/templates bsv/default-ns [:users] 1)
(broadcast-deltas base-subbed-views deltas nil)
(is (= 1 (count @sent)))
(is (= 1 (ffirst @sent)))
(is (= :views.deltas (second (first @sent))))
(is (= sent-deltas (nth (first @sent) 2)))))
(deftest deltas-are-post-processed
(let [templates (assoc-in vf/templates [:users :post-fn] (fn [d] (update-in d [:id] #(Integer. %))))
deltas [{[:users] [{:view-sig [:users] :insert-deltas [{:id "1" :name "Bob"}]}]}]
(let [config (view-config)
templates (assoc-in vf/templates [:users :post-fn] (fn [d] (update-in d [:id] #(Integer. %))))
deltas [{[:users] [{:insert-deltas [{:id "1" :name "Bob"}]}]}]
sent-deltas [{[:users] [{:insert-deltas [{:id "1" :name "Bob"}]}]}]
send-fn (fn [_ _ deltas-out]
(is (= (:id (first (:insert-deltas (first (get (first deltas-out) [:users])))))
1)))
base-subbed-views (BaseSubscribedViews. (assoc view-config-fixture :send-fn send-fn :templates templates))]
(add-subscription! [:users] templates 1 default-ns)
(broadcast-deltas base-subbed-views deltas nil)))
sent (atom #{})
send-fn (fn [a b deltas-out]
(is (= (:id (first (:insert-deltas (first (get (first deltas-out) [:users])))))
1))
(swap! sent conj [a b deltas-out]))
base-subbed-views (BaseSubscribedViews. (assoc config :send-fn send-fn :templates templates))]
(subscribe! (:persistence config) vf/db vf/templates bsv/default-ns [:users] 1)
(Thread/sleep 10)
(broadcast-deltas base-subbed-views deltas nil)
(is (= 1 (count @sent)))
(is (= 1 (ffirst @sent)))
(is (= :views.deltas (second (first @sent))))
(is (not= sent-deltas (nth (first @sent) 2)))
(is (= [{[:users] [{:insert-deltas [{:name "Bob", :id 1}]}]}] (nth (first @sent) 2)))))
(deftest full-refresh-deltas-are-post-processed
(let [config (view-config)
templates (assoc-in vf/templates [:users :post-fn] (fn [d] (update-in d [:id] #(Integer. %))))
deltas [{[:users] [{:refresh-set [{:id "1" :name "Bob"}]}]}]
sent-deltas [{[:users] [{:refresh-set [{:id "1" :name "Bob"}]}]}]
sent (atom #{})
send-fn (fn [a b deltas-out]
(is (= (:id (first (:refresh-set (first (get (first deltas-out) [:users])))))
1))
(swap! sent conj [a b deltas-out]))
base-subbed-views (BaseSubscribedViews. (assoc config :send-fn send-fn :templates templates))]
(subscribe! (:persistence config) vf/db vf/templates bsv/default-ns [:users] 1)
(Thread/sleep 10)
(broadcast-deltas base-subbed-views deltas nil)
(is (= 1 (count @sent)))
(is (= 1 (ffirst @sent)))
(is (= :views.deltas (second (first @sent))))
(is (not= sent-deltas (nth (first @sent) 2)))
(is (= [{[:users] [{:refresh-set [{:name "Bob", :id 1}]}]}] (nth (first @sent) 2)))))

View file

@ -1,18 +1,21 @@
(ns views.db.core-test
(:require
[clojure.test :refer [use-fixtures deftest is]]
[views.subscriptions :as vs]
[views.persistence.core :as persist]
[views.persistence.memory :refer [new-memory-persistence]]
[views.base-subscribed-views :refer [default-ns]]
[views.subscribed-views :refer [ISubscribedViews]]
[views.fixtures :as vf :refer [vschema sql-ts]]
[views.db.core :as vdb]))
(def received-deltas (atom nil))
(def memory (atom (new-memory-persistence)))
;; Very barebones subscribed-views instance which merely satisfies what vexec! needs:
(deftype TestSubscribedViews []
ISubscribedViews
(subscribed-views [this namespace]
(map :view-data (vals (vs/compiled-views-for))))
(persist/view-data @memory default-ns nil))
(broadcast-deltas [this new-deltas namespace]
(reset! received-deltas new-deltas)))
@ -22,7 +25,7 @@
(defn reset-fixtures!
[f]
(reset! vs/subscribed-views {})
(reset! memory (new-memory-persistence))
(reset! received-deltas {})
(f))
@ -31,7 +34,7 @@
(deftest vexec-sends-deltas
(let [view-sig [:user-posts (:id @vf/user-fixture)]
sub-to-it (vs/add-subscription! view-sig vf/templates (:id @vf/user-fixture))
sub-to-it (persist/subscribe! @memory vf/db vf/templates default-ns view-sig (:id @vf/user-fixture))
posted (first (vdb/vexec! test-config (vf/insert-post-tmpl (:id @vf/user-fixture) "title" "body")))
delta-vs (ffirst (first @received-deltas))
insert-delta (-> @received-deltas ffirst second first :insert-deltas first)]
@ -43,7 +46,7 @@
(deftest with-view-transaction-sends-deltas
(let [view-sig [:user-posts (:id @vf/user-fixture)]
sub-to-it (vs/add-subscription! view-sig vf/templates (:id @vf/user-fixture))
sub-to-it (persist/subscribe! @memory vf/db vf/templates default-ns view-sig (:id @vf/user-fixture))
posted (first (vdb/with-view-transaction
[tc test-config]
(vdb/vexec! tc (vf/insert-post-tmpl (:id @vf/user-fixture) "title" "body"))))

View file

@ -0,0 +1,61 @@
(ns views.persistence.memory-test
(:require
[views.persistence.core :refer :all]
[views.persistence.memory :refer [new-memory-persistence]]
[views.fixtures :as vf]
[clojure.test :refer [use-fixtures deftest is run-all-tests]]))
(deftest memory-persistence
(let [p (new-memory-persistence)
vd (subscribe! p vf/db vf/templates :ns [:users] 1)]
;; This sort of test isn't great as it depends on the internal
;; structure unrlated to memory persistence.
(is (= vd
{:view-sig [:users], :view {:from [:users], :select [:id :name :created_on]}, :refresh-only? nil}))
;; Ensure that we are subscribed.
(is (= (subscriptions p :ns [[:users]])
{[:users] #{1}}))
;; Subsequent calls return same vd.
(is (= (subscribe! p vf/db vf/templates :ns [:users] 3)
vd))
;; And subscription is correct.
(is (= (subscriptions p :ns [[:users]])
{[:users] #{1 3}}))
;; Missing subscription returns nothing.
(is (= (subscriptions p :ns [[:missing]])
{}))
;; Duplicate subscription is ignored.
(subscribe! p vf/db vf/templates :ns [:users] 3)
(is (= (subscriptions p :ns [[:users]])
{[:users] #{1 3}}))
;; We can subscribe to multiple views.
(subscribe! p vf/db vf/templates :ns [:user-posts 1] 5)
(is (= (subscriptions p :ns [[:users] [:user-posts 1]])
{[:users] #{1 3}
[:user-posts 1] #{5}}))
;; Can we unsubscribe a view.
(unsubscribe! p :ns [:users] 3)
(is (= (subscriptions p :ns [[:users]])
{[:users] #{1}}))
;; Remove last item in a view makes it go away.
(unsubscribe! p :ns [:users] 1)
(is (= (subscriptions p :ns [[:users]])
{}))
(is (= (map :view-sig (view-data p :ns :users))
[[:user-posts 1]]))
;; Unsubscribe all works.
(subscribe! p vf/db vf/templates :ns [:users] 7)
(subscribe! p vf/db vf/templates :ns [:users] 5)
(unsubscribe-all! p :ns 5)
(is (= (subscriptions p :ns [[:users] [:user-posts 1]])
{[:users] #{7}}))))

View file

@ -1,76 +0,0 @@
(ns views.subscriptions-test
(:require
[clojure.test :refer [use-fixtures deftest is]]
[views.fixtures :refer [templates user-posts-tmpl]]
[views.subscriptions :as vs]))
(defn- reset-subscribed-views!
[f]
(reset! vs/subscribed-views {})
(f))
(use-fixtures :each reset-subscribed-views!)
(deftest adds-a-subscription
(let [key 1, view-sig [:user-posts 1]]
(vs/add-subscription! view-sig templates key)
(is (vs/subscribed-to? view-sig key))))
(deftest can-use-namespace
(let [namespace1 1, namespace2 2, key 1, view-sig [:user-posts 1]]
(vs/add-subscription! view-sig templates key namespace1)
(vs/add-subscription! view-sig templates key namespace2)
(is (vs/subscribed-to? view-sig key namespace1))
(is (vs/subscribed-to? view-sig key namespace2))))
(deftest removes-a-subscription
(let [key 1, view-sig [:user-posts 1]]
(vs/add-subscription! view-sig templates key)
(vs/remove-subscription! view-sig key)
(is (not (vs/subscribed-to? view-sig key)))))
(deftest doesnt-fail-or-create-view-entry-when-empty
(vs/remove-subscription! 1 [:user-posts 1])
(is (= {} @vs/subscribed-views)))
(deftest removes-a-subscription-with-namespace
(let [namespace1 1, namespace2 2, key 1, view-sig [:user-posts 1]]
(vs/add-subscription! view-sig templates key namespace1)
(vs/add-subscription! view-sig templates key namespace2)
(vs/remove-subscription! view-sig key namespace1)
(is (not (vs/subscribed-to? view-sig key namespace1)))
(is (vs/subscribed-to? view-sig key namespace2))))
(deftest removes-unsubscribed-to-view-from-subscribed-views
(let [key 1, view-sig [:user-posts 1]]
(vs/add-subscription! view-sig templates key)
(vs/remove-subscription! view-sig key)
(is (= {vs/default-ns {}} @vs/subscribed-views))))
(deftest adds-multiple-views-at-a-time
(let [key 1, view-sigs [[:user-posts 1] [:user-posts 2]]]
(vs/add-subscriptions! view-sigs templates key)
(is (vs/subscribed-to? (first view-sigs) key))
(is (vs/subscribed-to? (last view-sigs) key))))
(deftest subscribing-compiles-and-stores-view-maps
(let [key 1, view-sig [:user-posts 1]]
(vs/add-subscription! view-sig templates key)
(is (= (:view (vs/compiled-view-for [:user-posts 1]))
(user-posts-tmpl 1)))))
(deftest removing-last-view-sub-removes-compiled-view
(let [key 1, view-sig [:user-posts 1]]
(vs/add-subscription! view-sig templates key)
(vs/remove-subscription! view-sig key)
(is (nil? (vs/compiled-view-for [:user-posts 1])))))
(deftest retrieves-subscriptions-for-subscriber
(let [key 1, view-sigs [[:users][:user-posts 1]]]
(vs/add-subscriptions! view-sigs templates key)
(is (= (set (vs/subscriptions-for 1)) (set view-sigs)))))
(deftest retrieves-subscriptions-for-subscriber-with-namespace
(let [key 1, view-sigs [[:users][:user-posts 1]] namespace 1]
(vs/add-subscriptions! view-sigs templates key namespace)
(is (= (set (vs/subscriptions-for 1 namespace)) (set view-sigs)))))