New architecture under way.

This commit is contained in:
Alexander K. Hudek 2015-01-16 23:47:08 -05:00
parent ebaec499fd
commit 6d7ae24a3e
27 changed files with 245 additions and 1765 deletions

View file

@ -1,4 +1,4 @@
(defproject views "0.5.2"
(defproject views "1.0.0"
:description "You underestimate the power of the SQL side"
:url "https://github.com/diligenceengine/views"
@ -15,6 +15,7 @@
[org.postgresql/postgresql "9.2-1003-jdbc4"]
[clj-logging-config "1.9.10"]
[zip-visit "1.0.2"]
[prismatic/plumbing "0.3.5"]
[pjstadig/humane-test-output "0.6.0"]]
:profiles {:test {:dependencies [[org.clojure/tools.nrepl "0.2.3"]

View file

@ -1,152 +0,0 @@
(ns views.base-subscribed-views
(:require
[views.persistence.core :as persist]
[views.subscribed-views :refer [ISubscribedViews]]
[views.filters :refer [view-filter]]
[views.db.load :refer [initial-view]]
[views.db.util :refer [with-retry log-exception]]
[clojure.tools.logging :refer [debug info warn error]]
[clojure.core.async :refer [thread]]
[clojure.java.jdbc :as j]))
(def default-ns :default-ns)
(declare send-deltas)
(defn send-fn*
[send-fn address subject msg]
(if send-fn
(send-fn address subject msg)
(warn "IMPLEMENT ME. Got message " msg " with subject " subject " sent to address " address)))
(defn subscriber-key-fn*
[subscriber-key-fn msg]
(if subscriber-key-fn (subscriber-key-fn msg) (:subscriber-key msg)))
(defn namespace-fn*
[namespace-fn msg]
(if namespace-fn (namespace-fn msg) default-ns))
(defn view-sig-fn*
[view-sig-fn msg]
(if view-sig-fn (view-sig-fn msg) (:body msg)))
(defn subscribe-and-compute
"Subscribe a view and return the initial values."
[db persistence templates vs namespace subscriber-key]
(let [view-data (persist/subscribe! persistence templates namespace vs subscriber-key)]
(with-retry
(j/with-db-transaction [t db :isolation :serializable]
(initial-view t vs templates (:view view-data))))))
;; Deltas look like:
;; [{view-sig1 delta, view-sig2 delta, ...} {view-sig3 delta, ...}]
(defn delta-signatures
"Return all the signatures mentioned by a map of deltas."
[deltas]
(mapcat keys deltas))
(deftype BaseSubscribedViews [config]
ISubscribedViews
(subscribe-views
[this msg]
(let [{:keys [persistence templates db-fn send-fn view-sig-fn subscriber-key-fn namespace-fn unsafe?]} config
db (if db-fn (db-fn msg) (:db config))
subscriber-key (subscriber-key-fn* subscriber-key-fn msg)
namespace (namespace-fn* namespace-fn msg)
view-sigs (view-filter msg (view-sig-fn* view-sig-fn msg) templates {:unsafe? unsafe?})] ; this is where security comes in.
(debug "Subscribing views: " view-sigs " for subscriber " subscriber-key ", in namespace " namespace)
(when (seq view-sigs)
(doseq [vs view-sigs]
(thread
(try
(let [iv (subscribe-and-compute db persistence templates vs namespace subscriber-key)]
(send-fn* send-fn subscriber-key :views.init iv))
(catch Exception e
(error "when subscribing to" vs)
(log-exception e))))))))
(unsubscribe-views
[this msg]
(let [{:keys [subscriber-key-fn namespace-fn persistence view-sig-fn]} config
subscriber-key (subscriber-key-fn* subscriber-key-fn msg)
namespace (namespace-fn* namespace-fn msg)
view-sigs (view-sig-fn* view-sig-fn msg)]
(debug "Unsubscribing views: " view-sigs " for subscriber " subscriber-key)
(doseq [vs view-sigs]
(persist/unsubscribe! persistence namespace vs subscriber-key))))
(disconnect [this msg]
(let [{:keys [subscriber-key-fn namespace-fn persistence]} config
subscriber-key (subscriber-key-fn* subscriber-key-fn msg)
namespace (namespace-fn* namespace-fn msg)]
(debug "Disconnecting subscriber " subscriber-key " in namespace " namespace)
(persist/unsubscribe-all! persistence namespace subscriber-key)))
;;
;; The below functions get called by vexec!/with-view-transaction
;;
(persistence [this] (:persistence config))
;; Deprecate this? It's just a call to persistence.
(subscribed-views [this namespace]
;; Table name optimization not yet worked through the library.
(persist/view-data (:persistence config) namespace "fix-me"))
(broadcast-deltas [this deltas namespace]
(let [{:keys [templates]} config
namespace (if namespace namespace default-ns)
subs (persist/subscriptions (:persistence config) namespace (delta-signatures deltas))]
(send-deltas deltas subs namespace config))))
(defn post-process-delta-map
[post-fn delta-map]
(if-let [rset (:refresh-set delta-map)]
delta-map
(reduce #(assoc %1 %2 (map post-fn (get delta-map %2))) {} (keys delta-map))))
(defn post-process-deltas
"Run post-processing functions on each delta. NOTE: this puts things in maps
to maintain compatability with the frontend code."
[delta templates]
(let [vs (first delta)]
(if-let [post-fn (get-in templates [(first vs) :post-fn])]
{(first delta) (mapv #(post-process-delta-map post-fn %) (second delta))}
{(first delta) (second delta)})))
;; We flatten the above into a sequence:
;; [[view-sig1 delta-data], [view-sig2 delta-data]....]
;; where the signatures from each pack are listed in order.
(defn flatten-deltas
"We flatten the above into a sequence:
[[view-sig1 delta-data], [view-sig2 delta-data]....]
where the signatures from each pack are listed in order."
[deltas]
(reduce #(into %1 (seq %2)) [] deltas))
(defn update-subscriber-pack
"Given a delta [view-sig delta-data] we find the subscribers that need it
and add to the subscriber pack vector {view-sig [delta...]}."
[subs spacks delta]
(let [subscribers (get subs (ffirst delta))]
(reduce #(update-in %1 [%2] (fnil conj []) delta) spacks subscribers)))
(defn subscriber-deltas
"Group deltas into subscriber packs."
[subs deltas]
(reduce #(update-subscriber-pack subs %1 %2) {} deltas))
;; Deltas looks like:
;; [delta-pack1 delta-pack2 ...]
;; where each delta pack is a map:
;; {view-sig1 delta-data, view-sig2 delta-data, ...}
(defn send-deltas
"Send deltas out to subscribers."
[deltas subs namespace {:keys [send-fn templates] :as config}]
(let [deltas (mapv #(post-process-deltas % templates) (flatten-deltas deltas))]
(doseq [[sk deltas*] (subscriber-deltas subs deltas)]
(debug "Sending deltas " deltas* " to subscriber " sk)
(send-fn* send-fn sk :views.deltas deltas*))))

View file

@ -1,18 +1,162 @@
(ns views.core
(:require
[views.base-subscribed-views :as bsv]
[views.core :as vp]
[edl.schema :refer [denormalized-schema get-schema]]
[views.persistence.memory :refer [new-memory-persistence]])
(:import
[views.base_subscribed_views BaseSubscribedViews]))
[views.protocols :refer [IView id data relevant?]]
[plumbing.core :refer [swap-pair!]]))
(defn config
[{:keys [db templates persistence vexec-ns-fn] :as conf}]
(let [schema (denormalized-schema (get-schema db (get conf :schema-name "public")))
conf (if persistence conf (assoc conf :persistence (new-memory-persistence)))]
{:db db
:schema schema
:templates templates
:vexec-ns-fn vexec-ns-fn
:base-subscribed-views (BaseSubscribedViews. conf)}))
;; The view-system data structure has this shape:
;;
;; {:views {:id1 view1, id2 view2, ...}
;; :send-fn (fn [subscriber-key data] ...)
;;
;; :hashes {view-sig hash, ...}
;; :subscribed {subscriber-key #{view-sig, ...}}
;; :subscribers {view-sig #{subscriber-key, ...}}
;; :hints #{hint1 hint2 ...}
;;
;; }
;;
;; Each hint has the form {:namespace x :hint y}
(defn subscribe-view!
[view-system view-sig subscriber-key data-hash]
(-> view-system
(update-in [:subscribed subscriber-key] (fnil conj #{}) view-sig)
(update-in [:subscribers view-sig] (fnil conj #{}) subscriber-key)
(assoc-in [:hashes view-sig] data-hash)))
(defn subscribe!
[view-system namespace view-id parameters subscriber-key]
(if-let [view (get-in @view-system [:views view-id])]
(let [vdata (data view namespace parameters)]
(swap! view-system subscribe-view! [namespace view-id parameters] subscriber-key (hash vdata))
((get @view-system :send-fn) subscriber-key vdata))))
(defn remove-from-subscribers
[view-system view-sig subscriber-key]
(update-in view-system [:subscribers view-sig] disj subscriber-key))
(defn unsubscribe!
[view-system namespace view-id parameters subscriber-key]
(swap! view-system
(fn [vs]
(-> vs
(update-in [:subscribed subscriber-key] disj [namespace view-id parameters])
(remove-from-subscribers [namespace view-id parameters] subscriber-key)))))
(defn unsubscribe-all!
"Remove all subscriptions by a given subscriber."
[view-system subscriber-key]
(swap! view-system
(fn [vs]
(let [view-sigs (get-in vs [:subscribed subscriber-key])
vs* (update-in vs [:subscribed] dissoc subscriber-key)]
(reduce #(remove-from-subscribers %1 %2 subscriber-key) vs* view-sigs)))))
(defn refresh-view!
"We refresh a view if it is relevant and its data hash has changed."
[view-system hints [namespace view-id parameters :as view-sig]]
(let [v (get-in @view-system [:views view-id])]
(if (relevant? v namespace parameters hints)
(let [vdata (data v namespace parameters)
hdata (hash vdata)]
(when-not (= hdata (get-in @view-system [:hashes view-sig]))
(doseq [s (get-in @view-system [:subscribers view-sig])]
((:send-fn @view-system) s vdata))
(swap! view-system assoc-in [:hashes view-sig] hdata))))))
(defn subscribed-views
[view-system]
(reduce into #{} (vals (:subscribed view-system))))
(defn pop-hints!
"Return hints and clear hint set atomicly."
[view-system]
(let [p (swap-pair! view-system assoc :hints #{})]
(or (:hints (first p)) #{})))
(defn refresh-views!
"Given a collection of hints, find all dirty views."
[view-system]
(let [hints (pop-hints! view-system)]
(mapv #(refresh-view! view-system hints %) (subscribed-views @view-system))
(swap! view-system assoc :last-update (System/currentTimeMillis))))
(defn can-refresh?
[last-update min-refresh-interval]
(> (- (System/currentTimeMillis) last-update) min-refresh-interval))
(defn wait
[last-update min-refresh-interval]
(Thread/sleep (max 0 (- min-refresh-interval (- (System/currentTimeMillis) last-update)))))
(defn start-update-watcher!
"A single threaded view update mechanism."
[view-system min-refresh-interval]
(swap! view-system assoc :last-update 0)
(.start (Thread. (fn [] (let [last-update (:last-update @view-system)]
(if (can-refresh? last-update min-refresh-interval)
(do (refresh-views! view-system) (recur))
(do (wait last-update min-refresh-interval) (recur))))))))
(defn add-hint!
"Add a hint to the system."
[view-system namespace hint]
(swap! view-system update-in [:hints] (fnil conj #{}) {:namespace namespace :hint hint}))
(comment
(defrecord SQLView [id query-fn]
IView
(id [_] id)
(data [_ namespace parameters]
(j/query (db/firm-connection namespace) (hsql/format (apply query-fn parameters))))
(relevant? [_ namespace parameters hints]
(let [tables (query-tables (apply query-fn parameters))]
(boolean (some #(not-empty (intersection % talbes)) hints)))))
(def memory-system (atom {}))
(reset! memory-system {:a {:foo 1 :bar 200 :baz [1 2 3]}
:b {:foo 2 :bar 300 :baz [2 3 4]}})
(defrecord MemoryView [id ks]
IView
(id [_] id)
(data [_ namespace parameters]
(get-in @memory-system (-> [namespace] (into ks) (into parameters))))
(relevant? [_ namespace parameters hints]
(some #(and (= namespace (:namespace %)) (= ks (:hint %))) hints)))
(def view-system
(atom
{:views {:foo (MemoryView. :foo [:foo])
:bar (MemoryView. :bar [:bar])
:baz (MemoryView. :baz [:baz])}
:send-fn (fn [subscriber-key data] (println "sending to:" subscriber-key "data:" data))}))
(subscribe! view-system :a :foo [] 1)
(subscribe! view-system :b :foo [] 2)
(subscribe! view-system :b :baz [] 2)
(subscribed-views @view-system)
(doto view-system
(add-hint! [:foo])
(add-hint! [:baz]))
(refresh-views! view-system)
;; Example of function that updates and hints the view system.
(defn massoc-in!
[memory-system namespace ks v]
(let [ms (swap! memory-system assoc-in (into [namespace] ks) v)]
(add-hint! view-system ks)
ms))
(massoc-in! memory-system :a [:foo] 1)
(massoc-in! memory-system :b [:baz] [2 4 3])
(start-update-watcher! view-system 1000)
)

View file

@ -1,45 +0,0 @@
(ns views.db.checks
(:require
[views.db.honeysql :as vh]
[clojure.set :refer [intersection]]
[clojure.zip :as z]
[zip.visit :as zv]
[honeysql.core :as hsql]))
(defn replace-param-pred
[]
(zv/visitor
:pre [n s]
(if (and (coll? n) (string? (last n)) (= (subs (last n) 0 1) "?"))
{:node true
:state (conj s n)})))
(defn swap-wc-preds
[wc]
(let [root (z/vector-zip wc)]
(zv/visit root nil [(replace-param-pred)])))
(defn swap-preds
[vm]
(let [{:keys [node state]} (swap-wc-preds (:where vm))]
{:q (assoc vm :where node) :p state}))
(defn view-sig->dummy-args
[view-sig]
(map #(str "?" %) (range 0 (count (rest view-sig)))))
(defn view-check
[action dummy-vm]
(let [{:keys [p q]} (swap-preds dummy-vm)]
(-> q
(update-in [:where] #(merge % (:where action)))
(assoc :select (mapv second p)))))
(defn have-overlapping-tables?
"Takes two Honeysql hash-maps, one for action, one for view, and returns
boolean value representing whether or not their set of tables intersect."
[action view refresh?]
(let [a (set (map first (vh/extract-tables action)))]
(if refresh?
(boolean (seq (intersection a (vh/query-tables view))))
(boolean (seq (intersection a (set (map first (vh/extract-tables view)))))))))

View file

@ -1,56 +0,0 @@
(ns views.db.core
(:require
[clojure.java.jdbc :as j]
[clojure.tools.logging :refer [debug]]
[views.db.deltas :as vd]
[views.db.util :refer [with-retry retry-on-transaction-failure]]
[views.subscribed-views :refer [subscribed-views broadcast-deltas persistence]]))
(defmacro with-view-transaction
"Like with-db-transaction, but operates with views. If you want to use a
standard jdbc function, the transcation database map is accessible with
(:db vt) where vt is the bound view transaction."
[binding & forms]
(let [tvar (first binding), vc (second binding)]
`(if (:deltas ~vc) ;; check if we are in a nested transaction
(let [~tvar ~vc] ~@forms)
(let [base-subscribed-views# (:base-subscribed-views ~vc)
deltas# (atom [])
result# (with-retry
(j/with-db-transaction [t# (:db ~vc) :isolation :serializable]
(let [~tvar (assoc ~vc :deltas deltas# :db t#)]
~@forms)))]
(broadcast-deltas base-subscribed-views# @deltas# (:namespace ~vc))
result#))))
(defn vexec!
"Used to perform arbitrary insert/update/delete actions on the database,
while ensuring that view deltas are appropriately checked and calculated
for the currently registered views as reported by a type implementing
the ISubscribedViews protocol.
Arguments are:
- schema: an edl schema (\"(defschema my-schema ...)\")
- db: a clojure.java.jdbc database
- action-map: the HoneySQL map for the insert/update/delete action
- subscribed-views: an implementation of ISubscribedViews implementing
the follow functions:
- subscribed-views takes a ... . It should return
a collection of view-maps.
- broadcast-deltas takes ... ."
[{:keys [db schema base-subscribed-views templates namespace deltas] :as conf} action-map]
(let [subbed-views (subscribed-views base-subscribed-views namespace)
transaction-fn #(vd/do-view-transaction (persistence base-subscribed-views) namespace schema db subbed-views action-map templates)]
(if deltas ;; inside a transaction we just collect deltas and do not retry
(let [{:keys [new-deltas result-set]} (transaction-fn)]
(swap! deltas #(conj % new-deltas))
result-set)
(let [{:keys [new-deltas result-set]} (retry-on-transaction-failure transaction-fn)]
(broadcast-deltas base-subscribed-views [new-deltas] namespace)
result-set))))

View file

@ -1,320 +0,0 @@
(ns views.db.deltas
(:import (java.sql SQLException))
(:require
[clojure.string :refer [split]]
[clojure.java.jdbc :as j]
[clojure.tools.logging :refer [debug error]]
[clojure.core.reducers :as r]
[honeysql.core :as hsql]
[honeysql.helpers :as hh]
[views.db.load :as vdbl]
[views.db.checks :as vc]
[views.db.honeysql :as vh]
[views.db.util :refer [log-exception serialization-error?]]
[views.persistence.core :refer [view-hash update-hash!]]))
;;
;; Terminology and data structures used throughout this code
;;
;; <name>-template - refers to a function which receives parameters
;; and returns a HoneySQL hash-map with params interpolated.
;;
;; action - describes the HoneySQL hash-map for the action to be performed
;; --the template function has already been called and returned this
;; with the appropriate parameter arguments.
;;
;; view-map - contains a set of computed information for each view itself.
;; Refer to the view-map doc-string for more information.
;;
(defn view-map
"Constructs a view map from a HoneySQL view function and its arguments.
Contains four fields:
:view - the hash-map with interpolated parameters
:view-sig - the \"signature\" for the view, i.e. [:matter 1]
:tables - the tables present in all :from, :insert-into,
:update, :delete-from, :join, :left-join :right-join clauses
Input is a view template function and a view signature. The template
function must take the same number of paramters as the signature and
return a honeysql data structure "
[view-template view-sig]
(let [compiled-view (if (> (count view-sig) 1)
(apply view-template (rest view-sig))
(view-template))]
{:view-sig view-sig
:view compiled-view
:refresh-only? (:refresh-only (meta view-template))}))
(defn view-sig->view-map
"Takes a map of sig keys to view template function vars (templates)
and a view signature (view-sig the key for the template map and its args)
and returns a view-map for that view-sig."
[templates view-sig]
(let [lookup (first view-sig)]
(view-map (get-in templates [lookup :fn]) view-sig)))
;; Helpers
(defn get-primary-key
"Get a primary key for a table."
[schema table]
(or
(keyword (get-in schema [(name table) :primary-key :column_name]))
(throw (Exception. (str "Cannot find primary key for table: " table)))))
(defn- create-view-delta-where-clauses
[view-map action]
(let [action-table (first (vh/extract-tables action))
view-tables (vh/extract-tables (:view view-map))]
(for [view-table (vh/find-table-aliases action-table view-tables)]
(-> (:where action)
(vh/prefix-columns (vh/table-alias view-table))
(vh/replace-table (vh/table-alias action-table) (vh/table-alias view-table))))))
(defn format-action-wc-for-view
"Takes view-map and action (HoneySQL hash-map for insert/update/delete),
extracts where clause from action, and formats it with the proper
alias (or no alias) so that it will work when applied to the view SQL."
[view-map action]
(if (:where action)
(let [preds (create-view-delta-where-clauses view-map action)]
(if (> (count preds) 1)
(into [:or] preds)
(first preds)))))
;; DELTA CALCULATIONS
(defn- calculate-delete-deltas
[db view-map]
(try
(->> (:delete-deltas-map view-map)
hsql/format
(j/query db)
(assoc view-map :delete-deltas))
(catch Exception e
(error "computing delete deltas for" view-map)
(log-exception e)
(throw e))))
(defn compute-delete-deltas-for-insert
"Computes and returns a sequence of delete deltas for a single view and insert."
[schema db view-map table record]
(if (vh/outer-join-table? (:view view-map) table)
(let [delta-q (vh/outer-join-delta-query schema (:view view-map) table record)]
(j/query db (hsql/format delta-q)))
[]))
(defn primary-key-predicate
"Return a predicate for a where clause that constrains to the primary key of
the record."
[schema table record]
(let [pkey (get-primary-key schema table)]
[:= pkey (pkey record)]))
(defn compute-insert-deltas-for-insert
[schema db view-map table record]
(let [pkey-pred (primary-key-predicate schema table record)
action (hsql/build :insert-into table :values [record] :where pkey-pred)
insert-delta-wc (format-action-wc-for-view view-map action)
view (:view view-map)
insert-delta-map (update-in view [:where] #(:where (vh/merge-where-clauses insert-delta-wc %)))]
(j/query db (hsql/format insert-delta-map))))
(defn compute-insert-delete-deltas-for-views
[schema db views table record]
(mapv #(compute-delete-deltas-for-insert schema db % table record) views))
(defn compute-insert-insert-deltas-for-views
[schema db views table record]
(mapv #(compute-insert-deltas-for-insert schema db % table record) views))
(defn compute-deltas-for-insert
"This takes a *single* insert and a view, applies the insert and computes
the view deltas."
[schema db views table record]
(let [deletes (compute-insert-delete-deltas-for-views schema db views table record)
record* (first (j/insert! db table record))
inserts (compute-insert-insert-deltas-for-views schema db views table record*)]
{:views-with-deltas (doall (map #(assoc %1 :delete-deltas %2 :insert-deltas %3) views deletes inserts))
:result record*}))
(defn- insert-and-append-deltas!
"Handles insert and calculation of insert (after insert) delta."
[schema db views action table pkey]
(let [table (:insert-into action)]
(reduce
#(-> %1
(update-in [:views-with-deltas] into (:views-with-deltas %2))
(update-in [:result-set] conj (:result %2)))
{:views-with-deltas [] :result-set []}
(map #(compute-deltas-for-insert schema db views table %) (:values action)))))
(defn- calculate-insert-deltas
"This is for insert deltas for non-insert updates.
Takes the HoneySQL map (at key :view) from the view-map and appends
the appropriately-table-namespaced where clause which limits the
view query to the previously inserted or updated records."
[db action pkey-wc view-map]
(let [action (assoc action :where pkey-wc)
insert-delta-wc (format-action-wc-for-view view-map action)
view (:view view-map)
insert-delta-map (update-in view [:where] #(:where (vh/merge-where-clauses insert-delta-wc %)))
deltas (j/query db (hsql/format insert-delta-map))]
(if (seq deltas)
(update-in view-map [:insert-deltas] #(apply conj % deltas))
view-map)))
(defn- get-action-row-key
"Helper to query the action's table for primary key and pull it out."
[db pkey table action]
(->> (:where action)
(hsql/build :select pkey :from table :where)
hsql/format
(j/query db)
first pkey))
(defn- update-and-append-deltas!
"Handles update and calculation of delete (before update) and insert (after update) deltas."
[db views action table pkey]
(let [views-pre (mapv #(calculate-delete-deltas db %) views)
pkey-val (get-action-row-key db pkey table action)
update (j/execute! db (hsql/format action))]
{:views-with-deltas (mapv #(calculate-insert-deltas db action [:= pkey pkey-val] %) views-pre)
:result-set update}))
(defn- delete-and-append-deltas!
"Handles deletion and calculation of delete (before update) delta."
[db views action table pkey]
(let [views-pre (mapv #(calculate-delete-deltas db %) views)]
{:views-with-deltas views-pre
:result-set (j/execute! db (hsql/format action))}))
(defn perform-action-and-return-deltas
"Identifies which action--insert, update or delete--we are performing and dispatches appropriately.
Returns view-map with appropriate deltas appended."
[schema db views action table pkey]
(cond
(:insert-into action) (insert-and-append-deltas! schema db views action table pkey)
(:update action) (update-and-append-deltas! db views action table pkey)
(:delete-from action) (delete-and-append-deltas! db views action table pkey)
:else (throw (Exception. "Received malformed action: " action))))
(defn generate-view-delta-map
"Adds a HoneySQL hash-map for the delta-calculation specific to the view + action.
Takes a view-map and the action HoneySQL hash-map, and appends the action's
where clause to the view's where clause, and adds in new field :insert-deltas-map."
[view-map action]
(let [action-wc (format-action-wc-for-view view-map action)
view (:view view-map)]
(->> (update-in view [:where] #(:where (vh/merge-where-clauses action-wc %)))
(assoc view-map :delete-deltas-map))))
(defn update-deltas-with-refresh-set
[refresh-set]
(fn [view-deltas]
(if (coll? view-deltas)
(mapv #(merge % refresh-set) view-deltas)
[refresh-set])))
(defn calculate-refresh-sets*
"Compute all the refresh views in parallel."
[db templates refresh-only-views]
(let [reducefn (fn ([] [])
([a b]
(try
(conj a {:view-sig (:view-sig b)
:refresh-set (get (vdbl/initial-view db (:view-sig b) templates (:view b)) (:view-sig b))})
(catch SQLException e
(conj a {:view-sig (:view-sig b) :sql-exception e}))
(catch Exception e
(conj a {:view-sig (:view-sig b) :exception e})))))]
(r/fold 1 concat reducefn refresh-only-views)))
(defn filter-by-hash
"Remove any views with hashes that haven't changed, else update the hash and keep the view."
[persistence namespace refresh-sets]
(filterv
(fn [rs]
(let [hash-value (hash (:refresh-set rs))]
(if (not= (view-hash persistence namespace (:view-sig rs)) hash-value)
(do (update-hash! persistence namespace (:view-sig rs) hash-value) true))))
refresh-sets))
(defn first-serialization-error
[refresh-sets]
(some #(if-let [e (:sql-exception %)] (and (serialization-error? e) e)) refresh-sets))
(defn some-exception
[rs]
(or (:sql-exception rs) (:exception rs)))
;; Exceedingly ugly code.
(defn look-for-exceptions
"Because exceptions in threads get buried, we have to look through the results to check for
serialization errors or other exceptions. We must bubble any serialization exception up."
[refresh-sets]
(if (some #(some-exception %) refresh-sets)
(if-let [e (first-serialization-error refresh-sets)]
(throw e)
(do
(doseq [rs refresh-sets :when (some-exception rs)]
(error "error computing refresh-set for" (:view-sig rs))
(when-let [e (some-exception rs)] (log-exception e)))
(throw (some some-exception refresh-sets))))
refresh-sets))
(defn calculate-refresh-sets
"For refresh-only views, calculates the refresh-set and adds it to the view's delta update collection."
[persistence namespace deltas db templates refresh-only-views]
(let [refresh-sets (->> (calculate-refresh-sets* db templates refresh-only-views)
look-for-exceptions
(filter-by-hash persistence namespace))]
(reduce
(fn [d rs] (update-in d [(:view-sig rs)] (update-deltas-with-refresh-set rs)))
deltas
refresh-sets)))
(defn format-deltas
"Removes extraneous data from view delta response collections.
TODO: Is there only one delta pack per view-sig here?"
[views-with-deltas]
(reduce #(update-in %1 [(:view-sig %2)] (fnil conj []) (select-keys %2 [:delete-deltas :insert-deltas :refresh-set]))
{} views-with-deltas))
(defn do-view-transaction
"Takes the following arguments:
schema - from edl.core/defschema
db - clojure.java.jdbc database connection
all-views - the current set of views (view-maps--see view-map fn docstring for
description) in memory for the database
action - the HoneySQL pre-SQL hash-map with parameters already interpolated.
templates - the mapping of view names (keywords) to SQL templates
(a.k.a. HoneySQL hash-map producing functions)
The function will then perform the following sequence of actions, all run
within a transaction (with isolation serializable):
1) Create pre-check SQL for each view in the list.
2) Run the pre-check SQL (or fail out based on some simple heuristics) to
identify if we want to send delta messages to the view's subscribers
(Note: this happens after the database action for *inserts only*).
3) Run the database action (insert/action/delete).
4) Calculate deltas based on the method described in section 5.4, \"Rule Generation\"
of the paper \"Deriving Production Rules for Incremental Rule Maintenance\"
by Stefano Ceri and Jennifer Widom (http://ilpubs.stanford.edu:8090/8/1/1991-4.pdf)
The function returns a hash-map with :result-set and :new-deltas collection values.
:new-deltas contains :insert-deltas, :delete-deltas, and :refresh-set values, as well
as the original :view-sig the deltas apply to."
[persistence namespace schema db all-views action templates]
(j/with-db-transaction [t db :isolation :serializable]
(let [filtered-views (filterv #(vc/have-overlapping-tables? action (:view %) (:refresh-only? %)) all-views)
{full-refresh-views true normal-views nil} (group-by :refresh-only? filtered-views)
need-deltas (map #(generate-view-delta-map % action) normal-views)
table (-> action vh/extract-tables ffirst)
pkey (get-primary-key schema table)
{:keys [views-with-deltas result-set]} (perform-action-and-return-deltas schema t need-deltas action table pkey)
deltas (calculate-refresh-sets persistence namespace (format-deltas views-with-deltas) t templates full-refresh-views)]
{:new-deltas deltas :result-set result-set})))

View file

@ -1,215 +0,0 @@
(ns views.db.honeysql
(:require
[honeysql.core :as hsql]
[honeysql.helpers :as hh]
[clojure.string :refer [split]]))
(def table-clauses
[:from :insert-into :update :delete-from :join :left-join :right-join])
;; This list doesn't support custom operators. Is there something else we can do?
(def pred-ops
#{:= :< :> :<> :>= :<= :in :between :match :ltree-match :and :or :not= :like :xor :regexp :not-in :not-like
:!= :is :is-not})
(defn process-complex-clause
[tables clause]
(reduce
#(if (coll? %2)
(if (some pred-ops [(first %2)])
%1
(conj %1 %2))
(conj %1 [%2]))
tables
clause))
(defn extract-tables*
[tables clause]
(if clause
(if (coll? clause)
(process-complex-clause tables clause)
(conj tables [clause]))
tables))
(defn extract-tables
"Extracts a set of table vector from a HoneySQL spec hash-map.
Each vector either contains a single table keyword, or the
table keyword and an alias keyword."
([hh-spec] (extract-tables hh-spec table-clauses))
([hh-spec clauses] (reduce #(extract-tables* %1 (%2 hh-spec)) #{} clauses)))
;; The following is used for full refresh views where we can have CTEs and
;; subselects in play.
(declare query-tables)
(defn cte-tables
[query]
(mapcat #(query-tables (second %)) (:with query)))
(defn isolate-tables
"Isolates tables from table definitions in from and join clauses."
[c]
(if (keyword? c) [c] (let [v (first c)] (if (map? v) (query-tables v) [v]))))
(defn from-tables
[query]
(mapcat isolate-tables (:from query)))
(defn every-second
[coll]
(map first (partition 2 coll)))
(defn join-tables
[query k]
(mapcat isolate-tables (every-second (k query))))
(defn collect-maps
[wc]
(cond
(coll? wc) (let [maps (filterv map? wc)
colls (filter #(and (coll? %) (not (map? %))) wc)]
(into maps (mapcat collect-maps colls)))
(map? wc) [wc]
:else []))
(defn where-tables
"This search for subqueries in the where clause."
[query]
(mapcat query-tables (collect-maps (:where query))))
(defn insert-tables
[query]
(if-let [v (:insert-into query)] [v] []))
(defn update-tables
[query]
(if-let [v (:update query)] [v] []))
(defn delete-tables
[query]
(if-let [v (:delete-from query)] [v] []))
(defn query-tables
[query]
(set (concat
(cte-tables query)
(from-tables query)
(join-tables query :join)
(join-tables query :left-join)
(join-tables query :right-join)
(where-tables query)
(insert-tables query)
(update-tables query)
(delete-tables query))))
(defn with-op
"Takes a collection of things and returns either an nary op of them, or
the item in the collection if there is only one."
[op coll]
(if (> (count coll) 1) (into [op] coll) (first coll)))
(defn find-table-aliases
"Returns the table alias for the supplied table."
[action-table tables]
(filter #(= (first action-table) (first %)) tables))
(defn outer-join-table?
"Return true if table is used in an outer join in the honeysql expression."
[hh-spec table]
(let [tables (map first (extract-tables hh-spec [:left-join :right-join]))]
(boolean (some #(= table %) tables))))
(defn prefix-column
"Prefixes a column with an alias."
[column alias]
(keyword (str (name alias) \. (name column))))
(defn create-null-constraints
"Create 'is null' constraints for all the columns of a table."
[schema table table-alias]
(let [columns (map #(prefix-column % table-alias) (keys (:columns (get schema (name table)))))]
(into [:and] (for [c columns] [:= c nil]))))
(defn table-alias
"Returns the name of a table or its alias. E.g. for [:table :t] returns :t."
[table]
(if (keyword? table) table (last table)))
(defn table-name
"Returns the name of a table . E.g. for [:table :t] returns :table."
[table]
(if (keyword? table) table (first table)))
(defn table-column
"Assumes that table columns are keywords of the form :table.column. Returns
the column as a keyword or nil if the supplied keyword doesn't match the pattern."
[table item]
(let [s (name item), t (str (name table) \.)]
(if (.startsWith s t) (keyword (subs s (count t))))))
(defn modified-outer-join-predicate
"Returns an outer join predicate with the join tables columns subistituted
with values from a record."
[table predicate record]
(if-let [column (and (keyword? predicate) (table-column table predicate))]
(or (get record column)
(throw (Exception. (str "No value for column " column " in " record))))
(if (vector? predicate)
(apply vector (map #(modified-outer-join-predicate table % record) predicate))
predicate)))
(defn find-outer-joins
"Find and return all the outer joins on a given table."
[hh-spec table]
(->> (concat (:left-join hh-spec) (:right-join hh-spec))
(partition 2 2)
(filter #(= table (table-name (first %))))))
(defn- create-outer-join-predicates
"Create outer join predicate from a record and joins."
[schema table record joins]
(->> joins
(map (fn [[table-spec join-pred]]
[:and
(modified-outer-join-predicate (table-alias table-spec) join-pred record)
(create-null-constraints schema table (table-alias table-spec))]))
(with-op :or)))
(defn outer-join-delta-query
"Create an outer join delta query given a honeysql template and record"
[schema hh-spec table record]
(let [join-tables (find-outer-joins hh-spec table)
join-pred (create-outer-join-predicates schema table record join-tables)]
(assert (not (nil? join-pred)))
(update-in hh-spec [:where] #(vector :and % join-pred))))
(defn merge-where-clauses
"Takes two where clauses from two different HoneySQL maps and joins then with an and.
If one is nil, returns only the non-nil where clause."
[wc1 wc2]
(if (and wc1 wc2)
(hh/where wc1 wc2)
(hh/where (or wc1 wc2))))
(defn replace-table
"Replace all instances of table name t1 pred with t2."
[pred t1 t2]
(if-let [column (and (keyword? pred) (table-column t1 pred))]
(keyword (str (name t2) \. (name column)))
(if (coll? pred)
(map #(replace-table % t1 t2) pred)
pred)))
(defn unprefixed-column?
[c]
(and (keyword? c) (not (pred-ops c)) (neg? (.indexOf (name c) (int \.)))))
(defn prefix-columns
"Prefix all unprefixed columns with table."
[pred table]
(if (unprefixed-column? pred)
(keyword (str (name table) \. (name pred)))
(if (coll? pred)
(map #(prefix-columns % table) pred)
pred)))

View file

@ -1,31 +0,0 @@
(ns views.db.load
(:import
[java.sql SQLException])
(:require
[clojure.tools.logging :refer [debug info warn error]]
[clojure.java.jdbc :as j]
[honeysql.core :as hsql]
[views.db.util :refer [log-exception serialization-error?]]))
(defn view-query
"Takes db and query-fn (compiled HoneySQL hash-map)
and runs the query, returning results."
[db query-map]
(j/query db (hsql/format query-map)))
(defn post-process-result-set
[view-sig templates result-set]
(if-let [post-fn (get-in templates [(first view-sig) :post-fn])]
(mapv post-fn result-set)
result-set))
(defn initial-view
"Takes a db spec, the new views sigs (new-views) we want to produce result-sets for,
the template config map, and the view-map itself.
and returns a result-set for the new-views with post-fn functions applied to the data."
[db new-view templates view-map]
(->> view-map
(view-query db)
(into [])
(post-process-result-set new-view templates)
(hash-map new-view)))

View file

@ -1,55 +0,0 @@
(ns views.db.util
(:import
[java.sql SQLException])
(:require
[clojure.stacktrace :refer [print-stack-trace]]
[clojure.tools.logging :refer [debug error]]))
;; Need to catch this and retry:
;; java.sql.SQLException: ERROR: could not serialize access due to concurrent update
;;
(defn get-nested-exceptions*
[exceptions ^SQLException e]
(if-let [next-e (.getNextException e)]
(recur (conj exceptions next-e) next-e)
exceptions))
(defn get-nested-exceptions
"Return the current exception and all nested exceptions as a vector."
[e]
(get-nested-exceptions* [e] e))
(defn serialization-error?
"True if e is a serialization error."
[^SQLException e]
(boolean (some #(= (.getSQLState ^SQLException %) "40001") (get-nested-exceptions e))))
;; TODO: update to avoid stack overflow.
(defn retry-on-transaction-failure
"Retry a function whenever we receive a transaction failure."
[transaction-fn]
(try
(transaction-fn)
(catch SQLException e
;; http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
(debug "Caught exception with error code: " (.getSQLState e))
(debug "Exception message: " (.getMessage e))
;; (debug "stack trace message: " (.printStackTrace e))
(if (serialization-error? e)
(retry-on-transaction-failure transaction-fn) ;; try it again
(throw e))))) ;; otherwise rethrow
(defmacro with-retry
"Retry a transaction forever."
[ & body]
`(let [tfn# (fn [] ~@body)]
(retry-on-transaction-failure tfn#)))
(defn log-exception
[^Exception e]
(error "views internal"
(str
"e: " e
" msg: " (.getMessage e)
" trace: " (with-out-str (print-stack-trace e)))))

View file

@ -1,36 +0,0 @@
(ns views.filters
(:require
[clojure.tools.logging :refer [debug info warn error]]))
(defn view-filter
"Takes a subscription request msg, a collection of view-sigs and
the config templates hash-map for an app. Checks if there is
a global filter-fn in the hash-map metadata and checks against
that if it exists, as well as against any existing filter
functions for individual template config entries. Template
config hash-map entries can specify a filter-fn using the key
:filter-fn, and the global filter-fn is the same, only on
the config meta-data (i.e. (with-meta templates {:filter-fn ...}))
By default throws an exception if no filters are present.
By passing in {:unsafe true} in opts, this can be overridden."
[msg view-sigs templates & opts]
(let [global-filter-fn (:filter-fn (meta templates))]
(filterv
#(let [filter-fn (:filter-fn (get templates (first %)))]
(cond
(and filter-fn global-filter-fn)
(and (global-filter-fn msg %) (filter-fn msg %))
filter-fn
(filter-fn msg %)
global-filter-fn
(global-filter-fn msg %)
:else
(if (-> opts first :unsafe?)
(do (warn "YOU ARE RUNNING IN UNSAFE MODE, AND NO FILTERS ARE PRESENT FOR VIEW-SIG: " %)
true)
(throw (Exception. (str "No filter set for view " %))))))
view-sigs)))

View file

@ -0,0 +1,70 @@
(ns views.honey-sql.util
(:require
[honeysql.core :as hsql]
[honeysql.helpers :as hh]
[clojure.string :refer [split]]))
;; The following is used for full refresh views where we can have CTEs and
;; subselects in play.
(declare query-tables)
(defn cte-tables
[query]
(mapcat #(query-tables (second %)) (:with query)))
(defn isolate-tables
"Isolates tables from table definitions in from and join clauses."
[c]
(if (keyword? c) [c] (let [v (first c)] (if (map? v) (query-tables v) [v]))))
(defn from-tables
[query]
(mapcat isolate-tables (:from query)))
(defn every-second
[coll]
(map first (partition 2 coll)))
(defn join-tables
[query k]
(mapcat isolate-tables (every-second (k query))))
(defn collect-maps
[wc]
(cond
(coll? wc) (let [maps (filterv map? wc)
colls (filter #(and (coll? %) (not (map? %))) wc)]
(into maps (mapcat collect-maps colls)))
(map? wc) [wc]
:else []))
(defn where-tables
"This search for subqueries in the where clause."
[query]
(mapcat query-tables (collect-maps (:where query))))
(defn insert-tables
[query]
(if-let [v (:insert-into query)] [v] []))
(defn update-tables
[query]
(if-let [v (:update query)] [v] []))
(defn delete-tables
[query]
(if-let [v (:delete-from query)] [v] []))
(defn query-tables
"Return all the tables in an sql statement."
[query]
(set (concat
(cte-tables query)
(from-tables query)
(join-tables query :join)
(join-tables query :left-join)
(join-tables query :right-join)
(where-tables query)
(insert-tables query)
(update-tables query)
(delete-tables query))))

View file

@ -1,31 +0,0 @@
(ns views.persistence.core)
(defprotocol IPersistence
(subscribe! [this templates namespace view-sig subscriber-key]
"Subscribes a subscriber with subscriber-key to a view with signature
view-sig. Templates is a map of all defined view templates and db
is a jdbc transcation handle for the database from which initial
view data will be retrieved.
This function must return the view-data for the subscribed view.")
(unsubscribe! [this namespace view-sig subscriber-key]
"Unsubscribes a subscriber with key 'subscriber-key' from the view
with signature 'view-sig' in namespace 'namespace'.")
(unsubscribe-all! [this namespace subscriber-key]
"Unsubscribes the subscriber with key 'subscriber-key' from ALL views
in namespace 'namespace'.")
(update-hash! [this namespace view-sig hash-value]
"Updates the data has for the view.")
(view-hash [this namespace view-sig]
"Returns the latest data has for the view.")
(view-data [this namespace table-name]
"Return all the view data that references a table name in a namespace.")
(subscriptions [this namespace signatures]
"Return all subscribers for all signatures in the list 'signatures' in
a namespace."))

View file

@ -1,70 +0,0 @@
(ns views.persistence.memory
(:require
[views.persistence.core :refer :all]
[views.db.deltas :as vd]))
(defn ns-subscribe!
"Subscribe to a view inside a namespace."
[namespace-views view-sig templates subscriber-key]
(-> namespace-views
(update-in [view-sig :subscriptions] (fnil conj #{}) subscriber-key)
(assoc-in [view-sig :view-data] (vd/view-map (get-in templates [(first view-sig) :fn]) view-sig))))
(defn ns-unsubscribe!
"Unsubscribe from a view inside a namespace. If there are no more subscribers,
we remove the view itself as well."
[namespace-views view-sig subscriber-key]
(let [path [view-sig :subscriptions]
updated (update-in namespace-views path disj subscriber-key)]
(if (seq (get-in updated path))
updated
(dissoc updated view-sig))))
(defn ns-unsubscribe-all!
"Unsubscribe a subscriber from all views in a namespace."
[namespace-views subscriber-key]
(reduce #(ns-unsubscribe! %1 %2 subscriber-key) namespace-views (keys namespace-views)))
(defn ns-subscriptions
"Find subscribers for a signature and add to a map."
[namespace-views result-map sig]
(if-let [subscribers (get-in namespace-views [sig :subscriptions])]
(assoc result-map sig subscribers)
result-map))
(deftype ViewsMemoryPersistence [subbed-views]
IPersistence
(subscribe!
[this templates namespace view-sig subscriber-key]
(let [sv (swap! subbed-views (fn [sv] (update-in sv [namespace] ns-subscribe! view-sig templates subscriber-key)))]
(get-in sv [namespace view-sig :view-data])))
(unsubscribe!
[this namespace view-sig subscriber-key]
(swap! subbed-views
(fn [sv] (update-in sv [namespace] ns-unsubscribe! view-sig subscriber-key))))
(unsubscribe-all!
[this namespace subscriber-key ]
(swap! subbed-views
(fn [sv] (update-in sv [namespace] ns-unsubscribe-all! subscriber-key))))
(update-hash!
[this namespace view-sig hash-value]
(swap! subbed-views assoc-in [namespace view-sig :hash] hash-value))
(view-hash
[this namespace view-sig]
(get-in @subbed-views [namespace view-sig :hash]))
(view-data [this namespace table]
;; We don't yet use table name as an optimization here.
(map :view-data (vals (get @subbed-views namespace))))
(subscriptions [this namespace signatures]
(let [namespace-views (get @subbed-views namespace)]
(reduce #(ns-subscriptions namespace-views %1 %2) {} signatures))))
(defn new-memory-persistence
[]
(->ViewsMemoryPersistence (atom {})))

14
src/views/protocols.clj Normal file
View file

@ -0,0 +1,14 @@
(ns views.protocols)
(defprotocol IView
(data [this namespace parameters]
"Returns view data.")
(relevant? [this namespace parameters hints]
"Given hints of the form {:namespace x :hint y}, the view must
return true if the hint indicates that an instance of this view
with supplied namespace and parameters might require updating.
It is always safe to return true, but false sure be returned only
if you are sure this view does not need updating.")
(id [this]
"A unique identifer for a view."))

View file

@ -1,51 +0,0 @@
(ns views.router
(:require
[views.subscribed-views :refer [subscribe-views unsubscribe-views disconnect]]
[clojure.core.async :refer [go go-loop chan pub sub unsub close! >! >!! <! <!! filter<]]
[clojure.stacktrace :refer [print-stack-trace]]
[clojure.tools.logging :refer [error debug]]))
(defn log-exception
"Takes a string and exception and logs it to error."
[s e]
(error s e (.getMessage e) (print-stack-trace e)))
(defn handle-subscriptions!
[subscribed-views subscriptions]
(go (while true
(try
(let [sub (<! subscriptions)]
(debug "Subscribing (in router): " sub)
(subscribe-views subscribed-views sub))
(catch Exception e (log-exception "when subscribing" e))))))
(defn handle-unsubscriptions!
[subscribed-views unsubscriptions]
(go (while true
(try
(let [unsub (<! unsubscriptions)]
(debug "Unsubscribing (in router): " unsub)
(unsubscribe-views subscribed-views unsub))
(catch Exception e (log-exception "when unsubscribing" e))))))
(defn handle-disconnects!
[subscribed-views disconnects]
(go (while true
(try
(let [disc (<! disconnects)]
(debug "Disconnecting (in router): " disc)
(disconnect subscribed-views disc))
(catch Exception e (log-exception "disconnect" e))))))
(defn init!
[{:keys [base-subscribed-views] :as conf} client-chan]
(let [subs (chan 200)
unsubs (chan 200)
control (chan 200)
disconnects (filter< #(= :disconnect (:body %)) control)]
(sub client-chan :views.subscribe subs)
(sub client-chan :views.unsubscribe unsubs)
(sub client-chan :client-channel disconnects)
(handle-subscriptions! base-subscribed-views subs)
(handle-unsubscriptions! base-subscribed-views unsubs)
(handle-disconnects! base-subscribed-views disconnects)))

View file

@ -1,12 +0,0 @@
(ns views.subscribed-views)
(defprotocol ISubscribedViews
;; Subscription and Delta routing
(subscribe-views [this sub-request])
(unsubscribe-views [this unsub-request])
(disconnect [this disconnect-request])
;; DB interaction
(persistence [this])
(subscribed-views [this namespace])
(broadcast-deltas [this deltas namespace]))

View file

@ -1,146 +0,0 @@
(ns views.base-subscribed-views-test
(:require
[views.base-subscribed-views :as bsv]
[views.persistence.core :refer :all]
[views.persistence.memory :refer [new-memory-persistence]]
[views.subscribed-views :refer [subscribe-views unsubscribe-views disconnect broadcast-deltas]]
[views.fixtures :as vf]
[clojure.test :refer [use-fixtures deftest is]]
[clojure.java.jdbc :as j]
[clj-logging-config.log4j :refer [set-logger! set-loggers!]])
(:import
[views.base_subscribed_views BaseSubscribedViews]))
(set-loggers!
"views.base-subscribed-views" {:level :error}
"views.filters" {:level :error})
(defn view-config []
{:persistence (new-memory-persistence)
:db vf/db
:templates vf/templates
:view-sig-fn :views
:unsafe? true})
(deftest subscribes-and-dispatches-initial-view-result-set
(let [config (view-config)
sent (atom #{})
send-fn #(do (is (and (= %1 1) (= %2 :views.init) (= %3 {[:users] []})))
(swap! sent conj [%1 %2 %3]))
base-subbed-views (BaseSubscribedViews. (assoc config :send-fn send-fn))]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})
(Thread/sleep 10)
(is (= (subscriptions (:persistence config) bsv/default-ns [[:users]])
{[:users] #{1}}))
;; Verify sends occured.
(is (= #{[1 :views.init {[:users] []}]} @sent))))
;; This test illustrates a slight timing issue. Because view subscriptions
;; use threads, an unsubscription that follows a subscription too closely
;; can fail.
(deftest unsubscribes-view
(let [config (view-config)
base-subbed-views (BaseSubscribedViews. config)]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})
(Thread/sleep 10)
(unsubscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})
(is (= (subscriptions (:persistence config) bsv/default-ns [[:users]])
{}))))
(deftest filters-subscription-requests
(let [config (view-config)
templates (assoc-in vf/templates [:users :filter-fn]
(fn [msg _] (:authorized? msg)))
view-config (-> config (assoc :templates templates) (dissoc :unsafe?))
base-subbed-views (BaseSubscribedViews. view-config)]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]})
(Thread/sleep 10)
(is (= (subscriptions (:persistence config) bsv/default-ns [[:users]])
{}))))
(deftest removes-all-subscriptions-on-disconnect
(let [config (view-config)
base-subbed-views (BaseSubscribedViews. config)]
(subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users] [:user-posts 1]]})
(Thread/sleep 10)
(is (= (subscriptions (:persistence config) bsv/default-ns [[:users] [:user-posts 1]])
{[:users] #{1}, [:user-posts 1] #{1}}))
(disconnect base-subbed-views {:subscriber-key 1})
(is (= (subscriptions (:persistence config) bsv/default-ns [[:users] [:user-posts 1]])
{}))))
;; (deftest sends-deltas
;; (let [deltas {[:users] [{:view-sig [:users] :insert-deltas [{:foo "bar"}]}]}
;; sent-delta {[:users] {:insert-deltas [{:foo "bar"}]}}
;; send-fn #(do (is (#{1 2} %1))
;; (is (= %2 :views.deltas))
;; (is (= %3 sent-delta)))
;; base-subbed-views (BaseSubscribedViews. (assoc view-config-fixture :send-fn send-fn))]
;; (add-subscription! [:users] vf/templates 1 default-ns)
;; (add-subscription! [:users] vf/templates 2 default-ns)
;; (broadcast-deltas base-subbed-views deltas nil)))
(deftest sends-deltas-in-batch
(let [config (view-config)
deltas [{[:users] [{:insert-deltas [{:id 1 :name "Bob"} {:id 2 :name "Alice"}]}]}
{[:users] [{:insert-deltas [{:id 3 :name "Jack"} {:id 4 :name "Jill"}]}]}]
;; This is just more obvious than writing some convulated fn to dig out the view-sigs.
sent-deltas [{[:users] [{:insert-deltas [{:id 1 :name "Bob"} {:id 2 :name "Alice"}]}]}
{[:users] [{:insert-deltas [{:id 3 :name "Jack"} {:id 4 :name "Jill"}]}]}]
sent (atom #{})
send-fn #(do (is (#{1 2} %1))
(is (= :views.deltas %2))
(is (= sent-deltas %3))
(swap! sent conj [%1 %2 %3]))
base-subbed-views (BaseSubscribedViews. (assoc config :send-fn send-fn))]
(subscribe! (:persistence config) vf/templates bsv/default-ns [:users] 1)
(broadcast-deltas base-subbed-views deltas nil)
(is (= 1 (count @sent)))
(is (= 1 (ffirst @sent)))
(is (= :views.deltas (second (first @sent))))
(is (= sent-deltas (nth (first @sent) 2)))))
(deftest deltas-are-post-processed
(let [config (view-config)
templates (assoc-in vf/templates [:users :post-fn] (fn [d] (update-in d [:id] #(Integer. %))))
deltas [{[:users] [{:insert-deltas [{:id "1" :name "Bob"}]}]}]
sent-deltas [{[:users] [{:insert-deltas [{:id "1" :name "Bob"}]}]}]
sent (atom #{})
send-fn (fn [a b deltas-out]
(is (= (:id (first (:insert-deltas (first (get (first deltas-out) [:users])))))
1))
(swap! sent conj [a b deltas-out]))
base-subbed-views (BaseSubscribedViews. (assoc config :send-fn send-fn :templates templates))]
(subscribe! (:persistence config) vf/templates bsv/default-ns [:users] 1)
(Thread/sleep 10)
(broadcast-deltas base-subbed-views deltas nil)
(is (= 1 (count @sent)))
(is (= 1 (ffirst @sent)))
(is (= :views.deltas (second (first @sent))))
(is (not= sent-deltas (nth (first @sent) 2)))
(is (= [{[:users] [{:insert-deltas [{:name "Bob", :id 1}]}]}] (nth (first @sent) 2)))))
;; These tests are now broken because we post-process full refresh queries right
;; when they come out of the database. Need an actual database to test this now.
;; TODO: fix test.
(deftest full-refresh-deltas-are-post-processed
(let [config (view-config)
templates (assoc-in vf/templates [:users :post-fn] (fn [d] (update-in d [:id] #(Integer. %))))
deltas [{[:users] [{:refresh-set [{:id "1" :name "Bob"}]}]}]
sent-deltas [{[:users] [{:refresh-set [{:id "1" :name "Bob"}]}]}]
sent (atom #{})
send-fn (fn [a b deltas-out]
#_(is (= (:id (first (:refresh-set (first (get (first deltas-out) [:users])))))
1))
(swap! sent conj [a b deltas-out]))
base-subbed-views (BaseSubscribedViews. (assoc config :send-fn send-fn :templates templates))]
(subscribe! (:persistence config) vf/templates bsv/default-ns [:users] 1)
(Thread/sleep 10)
(broadcast-deltas base-subbed-views deltas nil)
(is (= 1 (count @sent)))
(is (= 1 (ffirst @sent)))
(is (= :views.deltas (second (first @sent))))
#_(is (not= sent-deltas (nth (first @sent) 2)))
#_(is (= [{[:users] [{:refresh-set [{:name "Bob", :id 1}]}]}] (nth (first @sent) 2)))))

View file

@ -1,15 +0,0 @@
(ns views.core-test
(:require
[clojure.test :refer [use-fixtures deftest is]]
[edl.core :refer [defschema]]
[views.fixtures :as vf]
[views.subscribed-views :as vs]
[views.core :refer [config]]))
(defschema schema vf/db "public")
#_(deftest configures-views
(let [conf (config {:db vf/db :schema schema :templates vf/templates :unsafe? true})]
;; wtf is this false?! AKH: there is some sort of recursive referencing going on
;; in the thing being compared to.
(is (satisfies? views.subscribed-views/ISubscribedViews (:subscribed-views conf)))))

View file

@ -1,23 +0,0 @@
(ns views.db.checks-test
(:require
[clojure.test :refer [deftest is run-tests]]
[honeysql.core :as hsql]
[honeysql.helpers :as hh]
[views.fixtures :as vf]
[views.db.checks :as vc]))
(defn view [a b] (hsql/build :select [:c :d :f] :from {:foo :f} :where [:and [:and [:= :a a] [:= :b b]]]))
(deftest swaps-predicates-and-extracts-clauses
(let [{:keys [p q]} (vc/swap-preds (view "?1" "?2"))
swapped {:where [:and [:and true true]], :from {:foo :f}, :select [:c :d :f]}]
(is (= (set p) #{[:= :a "?1"] [:= :b "?2"]}))
(is (= (:where q) (:where swapped)))))
(deftest constructs-view-check
(let [dummy-vm (apply view (vc/view-sig->dummy-args [:view 1 2]))
update (hsql/build :update :foo :set {:d "d"} :where [:= :c "c"])
check (hsql/build :select [:a :b] :from :foo :where [:and [:and true true] [:= :c "c"]])
calcc (vc/view-check update dummy-vm)] ;;view )]
(is (= (into #{} (:select check)) (into #{} (:select calcc))))
(is (= (:where check) (:where calcc)))))

View file

@ -1,68 +0,0 @@
(ns views.db.core-test
(:require
[clojure.test :refer [use-fixtures deftest is]]
[views.persistence.core :as persist]
[views.persistence.memory :refer [new-memory-persistence]]
[views.base-subscribed-views :refer [default-ns]]
[views.subscribed-views :refer [ISubscribedViews]]
[views.fixtures :as vf :refer [vschema sql-ts]]
[views.db.core :as vdb]))
(def received-deltas (atom nil))
(def memory (atom (new-memory-persistence)))
;; Very barebones subscribed-views instance which merely satisfies what vexec! needs:
(deftype TestSubscribedViews []
ISubscribedViews
(subscribed-views [this namespace]
(persist/view-data @memory default-ns nil))
(persistence [this] @memory)
(broadcast-deltas [this new-deltas namespace]
(reset! received-deltas new-deltas)))
(def test-subscribed-views (TestSubscribedViews.))
(def test-config {:db vf/db :schema vschema :templates vf/templates :base-subscribed-views test-subscribed-views})
(defn reset-fixtures!
[f]
(reset! memory (new-memory-persistence))
(reset! received-deltas {})
(f))
(use-fixtures :each vf/with-user-fixture! (vf/database-fixtures! [:posts :comments]) reset-fixtures!)
(use-fixtures :once (vf/database-fixtures! [:users]))
(deftest vexec-sends-deltas
(let [view-sig [:user-posts (:id @vf/user-fixture)]
sub-to-it (persist/subscribe! @memory vf/templates default-ns view-sig (:id @vf/user-fixture))
posted (first (vdb/vexec! test-config (vf/insert-post-tmpl (:id @vf/user-fixture) "title" "body")))
delta-vs (ffirst (first @received-deltas))
insert-delta (-> @received-deltas ffirst second first :insert-deltas first)]
(is (= (ffirst (first @received-deltas)) view-sig))
(is (= (:name insert-delta) (:name @vf/user-fixture)))
(is (= (:body insert-delta) (:body posted)))
(is (= (:title insert-delta) (:title posted)))))
(deftest with-view-transaction-sends-deltas
(let [view-sig [:user-posts (:id @vf/user-fixture)]
sub-to-it (persist/subscribe! @memory vf/templates default-ns view-sig (:id @vf/user-fixture))
posted (first (vdb/with-view-transaction
[tc test-config]
(vdb/vexec! tc (vf/insert-post-tmpl (:id @vf/user-fixture) "title" "body"))))
delta-vs (ffirst (first @received-deltas))
insert-delta (-> @received-deltas ffirst second first :insert-deltas first)]
(is (= (ffirst (first @received-deltas)) view-sig))
(is (= (:name insert-delta) (:name @vf/user-fixture)))
(is (= (:body insert-delta) (:body posted)))
(is (= (:title insert-delta) (:title posted)))))
;; (deftest removes-nil-deltas
;; (let [deltas {[:foo 1] {:insert-deltas '() :delete-deltas []}
;; [:bar 2] {:insert-deltas '() :delete-deltas [] :refresh-set []}
;; [:baz 2] {:insert-deltas '() :delete-deltas [{:baz 1}]}}]
;; (is (= #{[:bar 2] [:baz 2]} (into #{} (keys (vdb/remove-nil-deltas deltas)))))
;; ))

View file

@ -1,77 +0,0 @@
(ns views.db.deltas-test
(:require
[clojure.test :refer [use-fixtures deftest is]]
[honeysql.core :as hsql]
[honeysql.helpers :as hh]
[views.fixtures :as vf :refer [vschema sql-ts]]
[views.db.core :as vdb]
[views.db.deltas :as vd]))
(defn dvt-helper
([all-views action] (dvt-helper all-views action vf/templates))
([all-views action templates]
(vd/do-view-transaction vf/persistence :default-ns vf/vschema vf/db all-views action templates)))
(use-fixtures :each (vf/database-fixtures!))
(deftest builds-view-map
(let [{:keys [view-sig view refresh-only?]} (vd/view-map vf/users-tmpl [:users])]
(is (= view-sig [:users]))
(is (= view {:from [:users], :select [:id :name :created_on]}))
(is (nil? refresh-only?))))
(defn non-nil-values-for-keys?
[hm keys]
(every? #(% hm) keys))
(deftest calculates-insert-deltas
(let [views [(vd/view-map vf/users-tmpl [:users])]
user-args {:name "Test user" :created_on (sql-ts)}
insert (hsql/build :insert-into :users :values [user-args])
{:keys [new-deltas result-set]} (dvt-helper views insert)
insert-delta (first (:insert-deltas (first (get new-deltas [:users]))))]
;; Result set
(is (not (nil? (:id (first result-set)))))
(is (= user-args (dissoc (first result-set) :id)))
;; Deltas
(is (= (:name user-args) (:name insert-delta)))
(is (= (:created_on user-args) (:created_on insert-delta)))
(is (non-nil-values-for-keys? insert-delta (-> views first :view :select)))))
(deftest calculates-delete-deltas
(let [views [(vd/view-map vf/users-tmpl [:users])]
user-args {:name "Test user" :created_on (sql-ts)}
user (vf/view-action! (hsql/build :insert-into :users :values [user-args]))
delete (hsql/build :delete-from :users :where [:= :name (:name user-args)])
{:keys [new-deltas result-set]} (dvt-helper views delete)
delete-delta (first (:delete-deltas (first (get new-deltas [:users]))))]
;; Deltas
(is (= (:name user-args) (:name delete-delta)))
(is (= (:created_on user-args) (:created_on delete-delta)))
(is (non-nil-values-for-keys? delete-delta (-> views first :view :select)))))
(deftest calculates-update-deltas
(let [views [(vd/view-map vf/users-tmpl [:users])]
user-args {:name "Test user" :created_on (sql-ts)}
user (vf/view-action! (hsql/build :insert-into :users :values [user-args]))
new-name "new name!"
update (hsql/build :update :users :set {:name new-name} :where [:= :name (:name user-args)])
{:keys [new-deltas result-set]} (dvt-helper views update)
{:keys [insert-deltas delete-deltas]} (first (get new-deltas [:users]))]
;; Deltas
(is (= (:name user-args) (:name (first delete-deltas))))
(is (= new-name (:name (first insert-deltas))))))
(deftest does-not-calculate-deltas-for-unrelated-views
(let [views [(vd/view-map vf/users-tmpl [:users])
(vd/view-map vf/all-comments-tmpl [:all-comments])]
user-args {:name "Test user" :created_on (sql-ts)}
insert (hsql/build :insert-into :users :values [user-args])
{:keys [new-deltas result-set]} (dvt-helper views insert)]
;; (is (= (count (insert-deltas new-deltas) 1))
(is (nil? (get new-deltas [:all-comments])))))

View file

@ -1,100 +0,0 @@
(ns views.db.honeysql-test
(:require
[clojure.test :refer [deftest is run-tests]]
[views.db.honeysql :as vh]
[honeysql.core :as hsql]
[honeysql.helpers :as hh]))
(def simple-test
(-> (hh/select :a)
(hh/from :foo)))
(def insert-test
(-> (hh/insert-into :foo)
(hh/values [{:foo "foo"}])))
(def join-test
(-> (hh/select :a)
(hh/from :foo)
(hh/join :bar [:= :bar.id :foo.bar_id])))
(def join-with-alias-test
(-> (hh/select :a)
(hh/from :foo)
(hh/join [:bar :b] [:= :b.id :foo.bar_id])))
(def join-and-from-with-alias-test
(-> (hh/select :a)
(hh/from [:foo :f])
(hh/join [:bar :b] [:= :b.id :foo.bar_id])))
(deftest extracts-tables-from-specs
(is (= (vh/extract-tables simple-test) #{[:foo]}))
(is (= (vh/extract-tables insert-test) #{[:foo]}))
(is (= (vh/extract-tables join-test) #{[:foo] [:bar]}))
(is (= (vh/extract-tables join-with-alias-test) #{[:foo] [:bar :b]}))
(is (= (vh/extract-tables join-and-from-with-alias-test) #{[:foo :f] [:bar :b]})))
(def cte-test
{:with [[:a {:select [:*] :from [:bar]}]]
:select [:*] :from [:foo]})
(def from-subselect-test
{:select [:*] :from [[{:select [:*] :from [:foo]} :a]]})
(def where-subselect-test
{:select [:*] :from [:foo] :where [:in :a {:select [:*] :from [:bar]}]})
(def nested-where-subselect-test
{:select [:*] :from [:foo] :where [:and [:in :a {:select [:*] :from [:bar]}] [:in :a {:select [:*] :from [:baz]}]]})
(def sql-raw-test
{:select [:*] :from [:foo] :where (hsql/raw "bar=1")})
(deftest extracts-tables-from-full-refresh-specs
(is (= (vh/query-tables simple-test) #{:foo}))
(is (= (vh/query-tables insert-test) #{:foo}))
(is (= (vh/query-tables join-test) #{:foo :bar}))
(is (= (vh/query-tables join-with-alias-test) #{:foo :bar}))
(is (= (vh/query-tables join-and-from-with-alias-test) #{:foo :bar}))
(is (= (vh/query-tables cte-test) #{:foo :bar}))
(is (= (vh/query-tables from-subselect-test) #{:foo}))
(is (= (vh/query-tables where-subselect-test) #{:foo :bar}))
(is (= (vh/query-tables nested-where-subselect-test) #{:foo :bar :baz}))
(is (= (vh/query-tables sql-raw-test) #{:foo})))
;; Do we really need to test the new version?
(deftest merges-where-clauses
(is (= (vh/merge-where-clauses [:= :foo 1] [:= :bar 2])
{:where [:and [:= :foo 1] [:= :bar 2]]}))
#_(is (= (vh/merge-where-clauses [[:= :foo 1]] [:= :bar 2])
{:where [:and [:= :foo 1] [:= :bar 2]]}))
#_(is (= (vh/merge-where-clauses [[:= :foo 1]] [:and [:= :bar 2] [:not= :baz 3]])
{:where [:and [:= :foo 1] [:= :bar 2] [:not= :baz 3]]}))
#_(is (= (vh/merge-where-clauses [[:= :foo 1]] [nil])
{:where [:= :foo 1]}))
#_(is (= (vh/merge-where-clauses [nil] [:= :bar 2])
{:where [:= :bar 2]})))
(deftest table-alias-tests
(is (= (vh/table-alias [:bar]) :bar))
(is (= (vh/table-alias [:bar :a]) :a))
(is (= (vh/table-alias :bar) :bar)))
(deftest table-name-tests
(is (= (vh/table-name [:bar]) :bar))
(is (= (vh/table-name [:bar :a]) :bar))
(is (= (vh/table-name :bar) :bar)))
(deftest prefix-columns-tests
(is (= (vh/prefix-columns [:= :id 1] :bar) [:= :bar.id 1]))
(is (= (vh/prefix-columns [:and [:= :id 1] [:= :val "foo"]] :b)
[:and [:= :b.id 1] [:= :b.val "foo"]]))
(is (= (vh/prefix-columns [:and [:= :id 1] [:or [:> :x 3] [:= :val "foo"]]] :b)
[:and [:= :b.id 1] [:or [:> :b.x 3] [:= :b.val "foo"]]])))
(deftest replace-table-tests
(is (= (vh/replace-table [:= :bar.id 1] :bar :b) [:= :b.id 1]))
(is (= (vh/replace-table [:= :bar.id 1] :baz :b) [:= :bar.id 1]))
(is (= (vh/replace-table [:and [:= :bar.id 1] [:= :bar.val "foo"]] :bar :b)
[:and [:= :b.id 1] [:= :b.val "foo"]])))

View file

@ -1,25 +0,0 @@
(ns views.db.load-test
(:require
[clojure.test :refer [use-fixtures deftest is]]
[honeysql.core :as hsql]
[views.fixtures :as vf :refer [gen-n-users! database-fixtures! templates]]
[views.db.load :as vload]
[clojure.string :refer [upper-case]]))
(use-fixtures :each (database-fixtures!))
(defn subscribed-views
[]
{[:users] {:view ((get-in templates [:users :fn]))}})
(deftest initializes-views
(let [users (gen-n-users! 2)]
(is (= (vload/initial-view vf/db [:users] templates (get-in (subscribed-views) [[:users] :view]))
{[:users] users}))))
(deftest post-processes-views
(let [users (gen-n-users! 1)
with-postfn (assoc-in templates [:users :post-fn] #(update-in % [:name] upper-case))
views-rs (vload/initial-view vf/db [:users] with-postfn (get-in (subscribed-views) [[:users] :view]))]
(is (= (-> (get views-rs [:users]) first :name)
(-> users first :name upper-case)))))

View file

@ -1,105 +0,0 @@
(ns views.fixtures
(:require
[environ.core :as e]
[clojure.java.jdbc :as j]
[honeysql.core :as hsql]
[edl.core :refer [defschema]]
[clojure.data.generators :as dg]
[views.persistence.memory :refer [new-memory-persistence]]))
(defn sql-ts
([ts] (java.sql.Timestamp. ts))
([] (java.sql.Timestamp. (.getTime (java.util.Date.)))))
(def db {:classname "org.postgresql.Driver"
:subprotocol "postgresql"
:subname (get :views-test-db e/env "//localhost/views_test")
:user (get :views-test-user e/env "views_user")
:password (get :views-test-ppassword e/env "password")})
(defschema vschema db "public")
(def persistence (new-memory-persistence))
(defn clean-tables!
[tables]
(doseq [t (map name tables)]
(j/execute! db [(str "DELETE FROM " t)])))
(defn database-fixtures!
([] (database-fixtures! [:posts :users :comments]))
([tables]
(fn [f]
(clean-tables! tables)
(f)
(clean-tables! tables)))) ; do it after as well in case a test breaks
(defn rand-str
[l]
(dg/string #(rand-nth (seq "abcdefghijklmnopqrstuwvxyz ")) l))
(defn view-query
[view]
(j/query db (hsql/format view)))
(defn view-action!
[action]
(j/execute! db (hsql/format action)))
(defn user-fixture!
[name]
(view-action! (hsql/build :insert-into :users :values [{:name name :created_on (sql-ts)}])))
(def user-fixture (atom nil))
(defn with-user-fixture!
([f] (with-user-fixture! "test user" f))
([name f]
(user-fixture! name)
(let [user (first (j/query db ["SELECT * FROM users WHERE name = ?" name]))]
(reset! user-fixture user)
(f)
(reset! user-fixture nil))))
(defn gen-n-users!
[n]
(dotimes [n n] (user-fixture! (rand-str 10)))
(j/query db ["SELECT * FROM users"]))
(defn insert-post-tmpl
[uid title body]
(hsql/build :insert-into :posts :values [{:user_id uid :title title :body body :created_on (sql-ts)}]))
(defn post-fixture!
[uid title body]
(view-action! (insert-post-tmpl uid title body)))
(defn gen-n-posts-for-user!
[n uid]
(dotimes [n n] (post-fixture! uid (rand-str 20) (rand-str 100))))
(defn users-tmpl
[]
(hsql/build :select [:id :name :created_on] :from :users))
(defn user-posts-tmpl
[user_id]
(hsql/build :select [:u.id :u.name :p.title :p.body :p.created_on]
:from {:posts :p}
:join [[:users :u][:= :u.id :p.user_id]]
:where [:= :p.user_id user_id]))
(defn users-posts-tmpl
[]
(hsql/build :select [[:u.id :user_id] :u.name :p.id :p.title :p.body :p.created_on]
:from {:users :u}
:left-join [[:posts :p][:= :u.id :p.user_id]]))
(defn all-comments-tmpl
[]
(hsql/build :select [:id :body :created_on] :from {:comments :c}))
(def templates
{:users {:fn #'users-tmpl}
:user-posts {:fn #'user-posts-tmpl}
:all-comments {:fn #'all-comments-tmpl}})

View file

@ -1,61 +0,0 @@
(ns views.persistence.memory-test
(:require
[views.persistence.core :refer :all]
[views.persistence.memory :refer [new-memory-persistence]]
[views.fixtures :as vf]
[clojure.test :refer [use-fixtures deftest is run-all-tests]]))
(deftest memory-persistence
(let [p (new-memory-persistence)
vd (subscribe! p vf/templates :ns [:users] 1)]
;; This sort of test isn't great as it depends on the internal
;; structure unrlated to memory persistence.
(is (= vd
{:view-sig [:users], :view {:from [:users], :select [:id :name :created_on]}, :refresh-only? nil}))
;; Ensure that we are subscribed.
(is (= (subscriptions p :ns [[:users]])
{[:users] #{1}}))
;; Subsequent calls return same vd.
(is (= (subscribe! p vf/templates :ns [:users] 3)
vd))
;; And subscription is correct.
(is (= (subscriptions p :ns [[:users]])
{[:users] #{1 3}}))
;; Missing subscription returns nothing.
(is (= (subscriptions p :ns [[:missing]])
{}))
;; Duplicate subscription is ignored.
(subscribe! p vf/templates :ns [:users] 3)
(is (= (subscriptions p :ns [[:users]])
{[:users] #{1 3}}))
;; We can subscribe to multiple views.
(subscribe! p vf/templates :ns [:user-posts 1] 5)
(is (= (subscriptions p :ns [[:users] [:user-posts 1]])
{[:users] #{1 3}
[:user-posts 1] #{5}}))
;; Can we unsubscribe a view.
(unsubscribe! p :ns [:users] 3)
(is (= (subscriptions p :ns [[:users]])
{[:users] #{1}}))
;; Remove last item in a view makes it go away.
(unsubscribe! p :ns [:users] 1)
(is (= (subscriptions p :ns [[:users]])
{}))
(is (= (map :view-sig (view-data p :ns :users))
[[:user-posts 1]]))
;; Unsubscribe all works.
(subscribe! p vf/templates :ns [:users] 7)
(subscribe! p vf/templates :ns [:users] 5)
(unsubscribe-all! p :ns 5)
(is (= (subscriptions p :ns [[:users] [:user-posts 1]])
{[:users] #{7}}))))

View file

@ -1,37 +0,0 @@
(ns views.repl
(:require
[honeysql.core :as hsql]
[edl.core :refer [defschema]]
[views.core :as vc]
[views.subscribed-views :as sv]
[views.fixtures :as vf]
[clojure.data.generators :as dg]
[views.db.core :as vdb]
[clj-logging-config.log4j :refer [set-logger! set-loggers!]]))
(defn rand-str
([] (rand-str 10))
([n] (dg/string #(rand-nth (clojure.string/split "abcdefghijklmnopqrstuvwxyz" #"\B")) n)))
(defschema test-schema vf/db "public")
(def user-insert (hsql/build :insert-into :users :values [{:name (rand-str) :created_on (vf/sql-ts)}]))
(defn make-config
([] (make-config vf/templates))
([templates] (vc/config {:db vf/db :schema test-schema :templates templates :unsafe? true})))
(defn test-subscribe
([sk views] (test-subscribe sk views (make-config)))
([sk views opts]
(sv/subscribe-views (:base-subscribed-views opts) {:subscriber-key sk :views [[:users]]})))
(comment
(require '[clj-logging-config.log4j :as lc] '[views.repl :as vr] '[views.db.core :as vdb] :reload)
(lc/set-loggers! "views.base-subscribed-views" {:level :info})
(def conf (vr/make-config))
(vr/test-subscribe 1 [[:users]])
(vdb/vexec! conf vr/user-insert)
(vr/test-subscribe 2 [[:users]])
(vdb/vexec! conf vr/user-insert)
)

View file

@ -1,18 +0,0 @@
CREATE ROLE views_user LOGIN PASSWORD 'password';
CREATE DATABASE views_test OWNER views_user;
\c postgresql://localhost/views_test;
CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT NOT NULL, created_on TIMESTAMP NOT NULL);
CREATE TABLE posts (id SERIAL PRIMARY KEY,
title TEXT NOT NULL,
body TEXT NOT NULL,
created_on TIMESTAMP NOT NULL,
user_id INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id));
CREATE TABLE comments (id SERIAL PRIMARY KEY,
body TEXT NOT NULL,
created_on TIMESTAMP NOT NULL,
post_id INTEGER NOT NULL,
FOREIGN KEY (post_id) REFERENCES posts(id));
ALTER TABLE users OWNER TO views_user;
ALTER TABLE posts OWNER TO views_user;
ALTER TABLE comments OWNER TO views_user;