replaces pre-checks with simple table intersection check; moved some functions from core to deltas; adds tests for delta calculations (more to come)
This commit is contained in:
parent
e68ec0169a
commit
153b44ecc5
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -8,3 +8,4 @@ pom.xml.asc
|
|||
/.lein-*
|
||||
/.nrepl-port
|
||||
*~
|
||||
*.bk
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
(ns views.db.checks
|
||||
(:require
|
||||
[views.db.honeysql :as vh]
|
||||
[clojure.set :refer [intersection]]
|
||||
[clojure.zip :as z]
|
||||
[zip.visit :as zv]
|
||||
[honeysql.core :as hsql]))
|
||||
|
@ -32,3 +34,13 @@
|
|||
(-> q
|
||||
(update-in [:where] #(merge % (:where action)))
|
||||
(assoc :select (mapv second p)))))
|
||||
|
||||
(defn have-overlapping-tables?
|
||||
"Takes two Honeysql hash-maps, one for action, one for view, and returns
|
||||
boolean value representing whether or not their set of tables intersect."
|
||||
[action view]
|
||||
(->> [action view]
|
||||
(map (comp set #(map first %) vh/extract-tables))
|
||||
(apply intersection)
|
||||
seq
|
||||
boolean))
|
||||
|
|
|
@ -4,133 +4,9 @@
|
|||
(:require
|
||||
[clojure.java.jdbc :as j]
|
||||
[clojure.tools.logging :refer [debug]]
|
||||
[views.db.load :as vdbl]
|
||||
[views.db.honeysql :as vh]
|
||||
[views.db.deltas :as vd]
|
||||
[views.subscribed-views :refer [subscribed-views broadcast-deltas]]))
|
||||
|
||||
;;
|
||||
;; Takes the HoneySQL template for a view and the arglist
|
||||
;; and compiles the view with a set of dummy args in the
|
||||
;; format
|
||||
;; [?1, ?2, ?3 ... ?N]
|
||||
;;
|
||||
;; Returns a map of the compiled hash-map and the args
|
||||
;; with keys :dummy-view and :dummy-args respectively.
|
||||
;;
|
||||
(defn- compile-dummy-view
|
||||
[view-template args]
|
||||
(let [dummy-args (take (count args) (range))
|
||||
dummy-args (map #(str "?" %) dummy-args)]
|
||||
{:dummy-view (apply view-template dummy-args)
|
||||
:dummy-args dummy-args}))
|
||||
|
||||
;;
|
||||
;; Terminology and data structures used throughout this code
|
||||
;;
|
||||
;; <name>-template - refers to a function which receives parameters
|
||||
;; and returns a HoneySQL hash-map with params interpolated.
|
||||
;;
|
||||
;; action - describes the HoneySQL hash-map for the action to be performed
|
||||
;; --the template function has already been called and returned this
|
||||
;; with the appropriate parameter arguments.
|
||||
;;
|
||||
;; view-map - contains a set of computed information for each view itself.
|
||||
;; Refer to the view-map doc-string for more information.
|
||||
;;
|
||||
;; view-check - SQL for checking whether or not a view needs to receive deltas
|
||||
;; upon completion of an operation.
|
||||
;;
|
||||
|
||||
(defn view-map
|
||||
"Constructs a view map from a HoneySQL view function and its arguments.
|
||||
Contains four fields:
|
||||
:view - the hash-map with interpolated parameters
|
||||
:view-sig - the \"signature\" for the view, i.e. [:matter 1]
|
||||
:args - the arguments passed in.
|
||||
:tables - the tables present in all :from, :insert-into,
|
||||
:update, :delete-from, :join, :left-join :right-join clauses
|
||||
|
||||
Input is a view template function and a view signature. The template
|
||||
function must take the same number of paramters as the signature and
|
||||
return a honeysql data structure "
|
||||
[view-template view-sig]
|
||||
(let [compiled-view (if (> (count view-sig) 1)
|
||||
(apply view-template (rest view-sig))
|
||||
(view-template))]
|
||||
(merge {:args (rest view-sig)
|
||||
:view-sig view-sig
|
||||
:view compiled-view
|
||||
:refresh-only? (:refresh-only (meta view-template))
|
||||
:tables (set (vh/extract-tables compiled-view))}
|
||||
(compile-dummy-view view-template (rest view-sig)))))
|
||||
|
||||
(defn view-sig->view-map
|
||||
"Takes a map of sig keys to view template function vars (templates)
|
||||
and a view signature (view-sig the key for the template map and its args)
|
||||
and returns a view-map for that view-sig."
|
||||
[templates view-sig]
|
||||
(let [lookup (first view-sig)]
|
||||
(view-map (get-in templates [lookup :fn]) view-sig)))
|
||||
|
||||
(defn update-deltas-with-refresh-set
|
||||
[refresh-set]
|
||||
(fn [view-deltas]
|
||||
(if (coll? view-deltas)
|
||||
(map #(assoc % :refresh-set refresh-set) view-deltas)
|
||||
[{:refresh-set refresh-set}])))
|
||||
|
||||
(defn calculate-refresh-sets
|
||||
"For refresh-only views, calculates the refresh-set and adds it to the view's delta update collection."
|
||||
[deltas db templates refresh-only-views]
|
||||
(reduce
|
||||
(fn [d {:keys [view-sig view] :as rov}]
|
||||
(let [refresh-set (get (vdbl/initial-view db view-sig templates view) view-sig)]
|
||||
(update-in d [view-sig] (update-deltas-with-refresh-set refresh-set))))
|
||||
deltas
|
||||
refresh-only-views))
|
||||
|
||||
(defn format-deltas
|
||||
"Removes extraneous data from view delta response collections."
|
||||
[views-with-deltas]
|
||||
(->> views-with-deltas
|
||||
(map #(select-keys % [:view-sig :delete-deltas :insert-deltas :refresh-set]))
|
||||
(group-by :view-sig)))
|
||||
|
||||
(defn do-view-transaction
|
||||
"Takes the following arguments:
|
||||
schema - from edl.core/defschema
|
||||
db - clojure.java.jdbc database connection
|
||||
all-views - the current set of views (view-maps--see view-map fn docstring for
|
||||
description) in memory for the database
|
||||
action - the HoneySQL pre-SQL hash-map with parameters already interpolated.
|
||||
|
||||
The function will then perform the following sequence of actions, all run
|
||||
within a transaction (with isolation serializable):
|
||||
|
||||
1) Create pre-check SQL for each view in the list.
|
||||
2) Run the pre-check SQL (or fail out based on some simple heuristics) to
|
||||
identify if we want to send delta messages to the view's subscribers
|
||||
(Note: this happens after the database action for *inserts only*).
|
||||
3) Run the database action (insert/action/delete).
|
||||
4) Calculate deltas based on the method described in section 5.4, \"Rule Generation\"
|
||||
of the paper \"Deriving Production Rules for Incremental Rule Maintenance\"
|
||||
by Stefano Ceri and Jennifer Widom (http://ilpubs.stanford.edu:8090/8/1/1991-4.pdf)
|
||||
|
||||
The function returns the views which received delta updates with the deltas
|
||||
keyed to each view-map at the keys :insert-deltas and :delete-deltas."
|
||||
[schema db all-views action templates]
|
||||
;; Every update connected with a view is done in a transaction:
|
||||
(j/with-db-transaction [t db :isolation :serializable]
|
||||
(let [{full-refresh-views true normal-views nil} (group-by :refresh-only? all-views)
|
||||
need-deltas (vd/do-view-pre-checks t normal-views action)
|
||||
need-deltas (map #(vd/generate-view-delta-map % action) need-deltas)
|
||||
table (-> action vh/extract-tables ffirst)
|
||||
pkey (vd/get-primary-key schema table)
|
||||
{:keys [views-with-deltas result-set]} (vd/perform-action-and-return-deltas schema t need-deltas action table pkey)
|
||||
deltas (calculate-refresh-sets (format-deltas views-with-deltas) t templates full-refresh-views)]
|
||||
{:new-deltas deltas :result-set result-set})))
|
||||
|
||||
;;
|
||||
;; Need to catch this and retry:
|
||||
;; java.sql.SQLException: ERROR: could not serialize access due to concurrent update
|
||||
|
@ -198,7 +74,7 @@
|
|||
- broadcast-deltas takes ... ."
|
||||
[{:keys [db schema base-subscribed-views templates namespace] :as conf} action-map]
|
||||
(let [subbed-views (subscribed-views base-subscribed-views namespace)
|
||||
transaction-fn #(do-view-transaction schema db subbed-views action-map templates)]
|
||||
transaction-fn #(vd/do-view-transaction schema db subbed-views action-map templates)]
|
||||
(if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry
|
||||
(let [{:keys [new-deltas result-set]} (transaction-fn)]
|
||||
(swap! deltas into new-deltas)
|
||||
|
|
|
@ -4,8 +4,53 @@
|
|||
[clojure.java.jdbc :as j]
|
||||
[honeysql.core :as hsql]
|
||||
[honeysql.helpers :as hh]
|
||||
[views.db.load :as vdbl]
|
||||
[views.db.checks :as vc]
|
||||
[views.db.honeysql :as vh]))
|
||||
|
||||
;;
|
||||
;; Terminology and data structures used throughout this code
|
||||
;;
|
||||
;; <name>-template - refers to a function which receives parameters
|
||||
;; and returns a HoneySQL hash-map with params interpolated.
|
||||
;;
|
||||
;; action - describes the HoneySQL hash-map for the action to be performed
|
||||
;; --the template function has already been called and returned this
|
||||
;; with the appropriate parameter arguments.
|
||||
;;
|
||||
;; view-map - contains a set of computed information for each view itself.
|
||||
;; Refer to the view-map doc-string for more information.
|
||||
;;
|
||||
|
||||
(defn view-map
|
||||
"Constructs a view map from a HoneySQL view function and its arguments.
|
||||
Contains four fields:
|
||||
:view - the hash-map with interpolated parameters
|
||||
:view-sig - the \"signature\" for the view, i.e. [:matter 1]
|
||||
:tables - the tables present in all :from, :insert-into,
|
||||
:update, :delete-from, :join, :left-join :right-join clauses
|
||||
|
||||
Input is a view template function and a view signature. The template
|
||||
function must take the same number of paramters as the signature and
|
||||
return a honeysql data structure "
|
||||
[view-template view-sig]
|
||||
(let [compiled-view (if (> (count view-sig) 1)
|
||||
(apply view-template (rest view-sig))
|
||||
(view-template))]
|
||||
{:view-sig view-sig
|
||||
:view compiled-view
|
||||
:refresh-only? (:refresh-only (meta view-template))}))
|
||||
|
||||
(defn view-sig->view-map
|
||||
"Takes a map of sig keys to view template function vars (templates)
|
||||
and a view signature (view-sig the key for the template map and its args)
|
||||
and returns a view-map for that view-sig."
|
||||
[templates view-sig]
|
||||
(let [lookup (first view-sig)]
|
||||
(view-map (get-in templates [lookup :fn]) view-sig)))
|
||||
|
||||
;; Helpers
|
||||
|
||||
(defn get-primary-key
|
||||
"Get a primary key for a table."
|
||||
[schema table]
|
||||
|
@ -13,49 +58,11 @@
|
|||
(keyword (get-in schema [(name table) :primary-key :column_name]))
|
||||
(throw (Exception. (str "Cannot find primary key for table: " table)))))
|
||||
|
||||
(defn swap-out-dummy-for-pos
|
||||
"Replaces dummy arg like \"?0\" for integer value (0) so we can sort args."
|
||||
[dummy-arg]
|
||||
(Integer. (subs dummy-arg 1)))
|
||||
|
||||
;; Helper for determine-filter-clauses (which is a helper
|
||||
;; for view-check-template). Extracts constituent parts from
|
||||
;; where clause.
|
||||
(defn set-filter-clauses
|
||||
[dummy-args fc w]
|
||||
(if (= w :and)
|
||||
fc
|
||||
(if (contains? (set dummy-args) (last w))
|
||||
(update-in fc [:s] assoc (swap-out-dummy-for-pos (last w)) (second w))
|
||||
(update-in fc [:w] (fnil conj []) w))))
|
||||
|
||||
;; Helper for generating the view-check HoneySQL template.
|
||||
;; Builds the where and select clauses up from constituent
|
||||
;; where-clauses. Placeholder identifies the parameters
|
||||
;; to pull out into the select clause.
|
||||
(defn determine-filter-clauses
|
||||
[wc dummy-args]
|
||||
(let [fc {:s {} :w nil}
|
||||
fc (if (and (not= :and (first wc)) (not (coll? (first wc))))
|
||||
(set-filter-clauses dummy-args fc wc)
|
||||
(reduce #(set-filter-clauses dummy-args %1 %2) fc wc))]
|
||||
(-> fc
|
||||
(update-in [:s] #(into [] (vals (sort-by key %))))
|
||||
(update-in [:w] #(vh/with-op :and %)))))
|
||||
|
||||
(defn append-arg-map
|
||||
"Removes table/alias namespacing from select fields and creates a hash-map
|
||||
of field to arguments for checking this view against checked-results later on.
|
||||
Note that this assumes our select-fields are in the same order as they
|
||||
are present in the :args view-map field (which they should be)."
|
||||
[view-map select-fields]
|
||||
(let [select-fields (map #(-> % name (split #"\.") last keyword) select-fields)]
|
||||
(assoc view-map :arg-compare (zipmap select-fields (into [] (:args view-map))))))
|
||||
|
||||
(defn- create-view-delta-where-clauses
|
||||
[view-map action]
|
||||
(let [action-table (first (vh/extract-tables action))]
|
||||
(for [view-table (vh/find-table-aliases action-table (:tables view-map))]
|
||||
(let [action-table (first (vh/extract-tables action))
|
||||
view-tables (vh/extract-tables (:view view-map))]
|
||||
(for [view-table (vh/find-table-aliases action-table view-tables)]
|
||||
(-> (:where action)
|
||||
(vh/prefix-columns (vh/table-alias view-table))
|
||||
(vh/replace-table (vh/table-alias action-table) (vh/table-alias view-table))))))
|
||||
|
@ -71,108 +78,7 @@
|
|||
(into [:or] preds)
|
||||
(first preds)))))
|
||||
|
||||
(defn- update-where-clause
|
||||
[hh-spec where]
|
||||
(if-let [w (:where where)]
|
||||
(assoc hh-spec :where w)
|
||||
(dissoc hh-spec :where)))
|
||||
|
||||
(defn view-check-template
|
||||
"Receives a view-map and an action (insert/update/delete HoneySQL hash-map).
|
||||
Returns a HoneySQL hash-map which will can be formatted as SQL to check if a
|
||||
view needs to receive deltas for the action SQL."
|
||||
[view-map action]
|
||||
(let [{:keys [dummy-view dummy-args]} view-map
|
||||
fc (determine-filter-clauses (:where dummy-view) dummy-args)
|
||||
action-wc (format-action-wc-for-view view-map action)
|
||||
view-map (append-arg-map view-map (:s fc))] ; we need this to compare *after* the check is run
|
||||
(->> (-> dummy-view
|
||||
(update-where-clause (vh/merge-where-clauses action-wc (:w fc)))
|
||||
(merge (apply hh/select (:s fc))))
|
||||
(hash-map :view-map view-map :view-check))))
|
||||
|
||||
(defn prepare-checks-for-view-deltas
|
||||
"Checks to see if an action has tables related to a view, and
|
||||
if so builds the HoneySQL hash-map for the SQL needed.
|
||||
Uses this hash-map as a key and conj's the view-map to the key's
|
||||
value so as to avoid redundant delta-check querying."
|
||||
[action confirmed-views view-map]
|
||||
;; Confirm if any of the tables in view-map are present in the action template:
|
||||
(if (some (set (map first (vh/extract-tables action)))
|
||||
(map first (:tables view-map)))
|
||||
|
||||
;; Then construct the check-template for this particular view.
|
||||
(if-let [{:keys [view-check view-map]} (view-check-template view-map action)]
|
||||
;; We then use the view-check as an index and conj the
|
||||
;; view-map to it so as to avoid redundant checks.
|
||||
(update-in confirmed-views [view-check] #(conj % view-map))
|
||||
confirmed-views)
|
||||
confirmed-views))
|
||||
|
||||
(defn prepare-view-checks
|
||||
"Prepares checks for a collection of views (view-maps) against a HoneySQL action
|
||||
(insert/update/delete) hash-map.
|
||||
|
||||
Returns a structure like so:
|
||||
{{<computed HoneySQL hash-map for the check SQL}
|
||||
[<collection of all views this check hash-map key applies to]}
|
||||
"
|
||||
[view-maps action]
|
||||
(reduce #(prepare-checks-for-view-deltas action %1 %2) {} view-maps))
|
||||
|
||||
(defn- do-check
|
||||
[db check-template]
|
||||
(j/query db (hsql/format check-template)))
|
||||
|
||||
(defn- check-view-args
|
||||
[checked-results view-map]
|
||||
(let [view-args (:arg-compare view-map)]
|
||||
(reduce
|
||||
(fn [hit cr]
|
||||
(if (seq (filter #(= (% cr) (% view-args)) (keys view-args)))
|
||||
(reduced view-map) ; don't care which args, just whether or not the view-map hit
|
||||
hit))
|
||||
nil
|
||||
checked-results)))
|
||||
|
||||
(defn- check-all-view-args
|
||||
[checked-results views]
|
||||
(->> views
|
||||
(map #(check-view-args checked-results %))
|
||||
(remove nil?)
|
||||
distinct))
|
||||
|
||||
(defn- do-view-pre-check
|
||||
[db needs-deltas view-check]
|
||||
;;
|
||||
;; We have empty-select? if we have a view with no where predicate clauses--
|
||||
;; so it will always require deltas if there are matching tables.
|
||||
;;
|
||||
;; empty-where comes about if we are inserting--we don't have any where predicate
|
||||
;; in the insert, of course, so we can't perform pre-checks reliably.
|
||||
;; When we do an insert we have to simply do the delta query regardless, for now.
|
||||
;;
|
||||
(let [empty-select? (seq (remove nil? (:select (first view-check))))
|
||||
empty-where? (seq (remove #(or (nil? %) (= :and %)) (:where (first view-check))))]
|
||||
(if (or (not empty-select?) (not empty-where?))
|
||||
(apply conj needs-deltas (last view-check)) ;; put them all in if we can't do pre-check.
|
||||
(let [checked-results (do-check db (first view-check))
|
||||
;; checks view args against checked result set
|
||||
checked-views (check-all-view-args checked-results (last view-check))]
|
||||
(if (seq checked-views)
|
||||
(apply conj needs-deltas checked-views)
|
||||
needs-deltas)))))
|
||||
|
||||
(defn do-view-pre-checks
|
||||
"Takes db, all views (view-maps) and the HoneySQL action (insert/update/delete)
|
||||
hash-map. Returns view-maps for all the views which need to receive
|
||||
delta updates after the action is performed.
|
||||
|
||||
*This function should be called within a transaction before performing the
|
||||
insert/update/delete action.*"
|
||||
[db all-views action]
|
||||
(let [view-checks (prepare-view-checks all-views action)]
|
||||
(reduce #(do-view-pre-check db %1 %2) [] view-checks)))
|
||||
;; DELTA CALCULATIONS
|
||||
|
||||
(defn- calculate-delete-deltas
|
||||
[db view-map]
|
||||
|
@ -181,10 +87,6 @@
|
|||
(j/query db)
|
||||
(assoc view-map :delete-deltas)))
|
||||
|
||||
;; -------------------------------------------------------------------------------
|
||||
;; Handle inserts
|
||||
;; -------------------------------------------------------------------------------
|
||||
|
||||
(defn compute-delete-deltas-for-insert
|
||||
"Computes and returns a sequence of delete deltas for a single view and insert."
|
||||
[schema db view-map table record]
|
||||
|
@ -227,8 +129,8 @@
|
|||
{:views-with-deltas (doall (map #(assoc %1 :delete-deltas %2 :insert-deltas %3) views deletes inserts))
|
||||
:result record*}))
|
||||
|
||||
;; Handles insert and calculation of insert (after insert) delta.
|
||||
(defn- insert-and-append-deltas!
|
||||
"Handles insert and calculation of insert (after insert) delta."
|
||||
[schema db views action table pkey]
|
||||
(let [table (:insert-into action)]
|
||||
(reduce
|
||||
|
@ -238,14 +140,12 @@
|
|||
{:views-with-deltas [] :result-set []}
|
||||
(map #(compute-deltas-for-insert schema db views table %) (:values action)))))
|
||||
|
||||
;; -------------------------------------------------------------------------------
|
||||
|
||||
;; This is for insert deltas for non-insert updates.
|
||||
|
||||
;;; Takes the HoneySQL map (at key :view) from the view-map and appends
|
||||
;;; the appropriately-table-namespaced where clause which limits the
|
||||
;;; view query to the previously inserted or updated records.
|
||||
(defn- calculate-insert-deltas
|
||||
"This is for insert deltas for non-insert updates.
|
||||
|
||||
Takes the HoneySQL map (at key :view) from the view-map and appends
|
||||
the appropriately-table-namespaced where clause which limits the
|
||||
view query to the previously inserted or updated records."
|
||||
[db action pkey-wc view-map]
|
||||
(let [action (assoc action :where pkey-wc)
|
||||
insert-delta-wc (format-action-wc-for-view view-map action)
|
||||
|
@ -256,8 +156,8 @@
|
|||
(update-in view-map [:insert-deltas] #(apply conj % deltas))
|
||||
view-map)))
|
||||
|
||||
;; Helper to query the action's table for primary key and pull it out.
|
||||
(defn- get-action-row-key
|
||||
"Helper to query the action's table for primary key and pull it out."
|
||||
[db pkey table action]
|
||||
(->> (:where action)
|
||||
(hsql/build :select pkey :from table :where)
|
||||
|
@ -265,8 +165,8 @@
|
|||
(j/query db)
|
||||
first pkey))
|
||||
|
||||
;; Handles update and calculation of delete (before update) and insert (after update) deltas.
|
||||
(defn- update-and-append-deltas!
|
||||
"Handles update and calculation of delete (before update) and insert (after update) deltas."
|
||||
[db views action table pkey]
|
||||
(let [views-pre (doall (map #(calculate-delete-deltas db %) views))
|
||||
pkey-val (get-action-row-key db pkey table action)
|
||||
|
@ -274,16 +174,16 @@
|
|||
{:views-with-deltas (doall (map #(calculate-insert-deltas db action [:= pkey pkey-val] %) views-pre))
|
||||
:result-set update}))
|
||||
|
||||
;; Handles deletion and calculation of delete (before update) delta.
|
||||
(defn- delete-and-append-deltas!
|
||||
"Handles deletion and calculation of delete (before update) delta."
|
||||
[db views action table pkey]
|
||||
(let [views-pre (doall (map #(calculate-delete-deltas db %) views))]
|
||||
{:views-with-deltas views-pre
|
||||
:result-set (j/execute! db (hsql/format action))}))
|
||||
|
||||
;; Identifies which action--insert, update or delete--we are performing and dispatches appropriately.
|
||||
;; Returns view-map with appropriate deltas appended.
|
||||
(defn perform-action-and-return-deltas
|
||||
"Identifies which action--insert, update or delete--we are performing and dispatches appropriately.
|
||||
Returns view-map with appropriate deltas appended."
|
||||
[schema db views action table pkey]
|
||||
(cond
|
||||
(:insert-into action) (insert-and-append-deltas! schema db views action table pkey)
|
||||
|
@ -300,3 +200,63 @@
|
|||
view (:view view-map)]
|
||||
(->> (update-in view [:where] #(:where (vh/merge-where-clauses action-wc %)))
|
||||
(assoc view-map :delete-deltas-map))))
|
||||
|
||||
(defn update-deltas-with-refresh-set
|
||||
[refresh-set]
|
||||
(fn [view-deltas]
|
||||
(if (coll? view-deltas)
|
||||
(map #(assoc % :refresh-set refresh-set) view-deltas)
|
||||
[{:refresh-set refresh-set}])))
|
||||
|
||||
(defn calculate-refresh-sets
|
||||
"For refresh-only views, calculates the refresh-set and adds it to the view's delta update collection."
|
||||
[deltas db templates refresh-only-views]
|
||||
(reduce
|
||||
(fn [d {:keys [view-sig view] :as rov}]
|
||||
(let [refresh-set (get (vdbl/initial-view db view-sig templates view) view-sig)]
|
||||
(update-in d [view-sig] (update-deltas-with-refresh-set refresh-set))))
|
||||
deltas
|
||||
refresh-only-views))
|
||||
|
||||
(defn format-deltas
|
||||
"Removes extraneous data from view delta response collections."
|
||||
[views-with-deltas]
|
||||
(->> views-with-deltas
|
||||
(map #(select-keys % [:view-sig :delete-deltas :insert-deltas :refresh-set]))
|
||||
(group-by :view-sig)))
|
||||
|
||||
(defn do-view-transaction
|
||||
"Takes the following arguments:
|
||||
schema - from edl.core/defschema
|
||||
db - clojure.java.jdbc database connection
|
||||
all-views - the current set of views (view-maps--see view-map fn docstring for
|
||||
description) in memory for the database
|
||||
action - the HoneySQL pre-SQL hash-map with parameters already interpolated.
|
||||
templates - the mapping of view names (keywords) to SQL templates
|
||||
(a.k.a. HoneySQL hash-map producing functions)
|
||||
|
||||
The function will then perform the following sequence of actions, all run
|
||||
within a transaction (with isolation serializable):
|
||||
|
||||
1) Create pre-check SQL for each view in the list.
|
||||
2) Run the pre-check SQL (or fail out based on some simple heuristics) to
|
||||
identify if we want to send delta messages to the view's subscribers
|
||||
(Note: this happens after the database action for *inserts only*).
|
||||
3) Run the database action (insert/action/delete).
|
||||
4) Calculate deltas based on the method described in section 5.4, \"Rule Generation\"
|
||||
of the paper \"Deriving Production Rules for Incremental Rule Maintenance\"
|
||||
by Stefano Ceri and Jennifer Widom (http://ilpubs.stanford.edu:8090/8/1/1991-4.pdf)
|
||||
|
||||
The function returns a hash-map with :result-set and :new-deltas collection values.
|
||||
:new-deltas contains :insert-deltas, :delete-deltas, and :refresh-set values, as well
|
||||
as the original :view-sig the deltas apply to."
|
||||
[schema db all-views action templates]
|
||||
(j/with-db-transaction [t db :isolation :serializable]
|
||||
(let [filtered-views (filterv #(vc/have-overlapping-tables? action (:view %)) all-views)
|
||||
{full-refresh-views true normal-views nil} (group-by :refresh-only? filtered-views)
|
||||
need-deltas (map #(generate-view-delta-map % action) normal-views)
|
||||
table (-> action vh/extract-tables ffirst)
|
||||
pkey (get-primary-key schema table)
|
||||
{:keys [views-with-deltas result-set]} (perform-action-and-return-deltas schema t need-deltas action table pkey)
|
||||
deltas (calculate-refresh-sets (format-deltas views-with-deltas) t templates full-refresh-views)]
|
||||
{:new-deltas deltas :result-set result-set})))
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
(ns views.db.load
|
||||
(:require
|
||||
[clojure.tools.logging :refer [debug info warn error]]
|
||||
[clojure.java.jdbc :as j]
|
||||
[honeysql.core :as hsql]))
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
(ns views.subscriptions
|
||||
(:require
|
||||
[views.db.core :as vdb]))
|
||||
[views.db.deltas :as vd]))
|
||||
|
||||
;;
|
||||
;; {namespace {[:view-sig 1 "arg2"] {:subscriptions [1 2 3 4 ... ] :view-data {:view ...}}}}
|
||||
|
@ -22,7 +22,7 @@
|
|||
(fn [svs]
|
||||
(-> svs
|
||||
(update-in [namespace view-sig :subscriptions] (add-subscriber-key subscriber-key))
|
||||
(assoc-in [namespace view-sig :view-data] (vdb/view-map (get-in templates [(first view-sig) :fn]) view-sig)))))
|
||||
(assoc-in [namespace view-sig :view-data] (vd/view-map (get-in templates [(first view-sig) :fn]) view-sig)))))
|
||||
|
||||
(defn add-subscription!
|
||||
([view-sig templates subscriber-key]
|
||||
|
|
|
@ -1,88 +1,77 @@
|
|||
(ns views.db.deltas-test
|
||||
(:require
|
||||
[clojure.test :refer [deftest is run-tests]]
|
||||
[clojure.test :refer [use-fixtures deftest is]]
|
||||
[honeysql.core :as hsql]
|
||||
[honeysql.helpers :as hh]
|
||||
[views.fixtures :as vf]
|
||||
[views.fixtures :as vf :refer [vschema sql-ts]]
|
||||
[views.db.core :as vdb]
|
||||
[views.db.deltas :as vdbd]
|
||||
[views.base-subscribed-views :as bsv])
|
||||
(:import
|
||||
[views.base_subscribed_views BaseSubscribedViews]))
|
||||
[views.db.deltas :as vd]))
|
||||
|
||||
(defn join-test-template
|
||||
[id val3]
|
||||
(-> (hh/select :f.id :f.val1 :f.val2 :b.val1)
|
||||
(hh/from [:foo :f])
|
||||
(hh/join [:bar :b] [:= :b.id :f.b_id])
|
||||
(hh/left-join [:baz :ba] [:= :ba.id :b.ba_id])
|
||||
(hh/right-join [:qux :q] [:= :q.id :ba.q_id])
|
||||
(hh/where [:= :f.id id] [:= :f.val3 val3] [:= :f.val2 "constant"])))
|
||||
(defn dvt-helper
|
||||
([all-views action] (dvt-helper all-views action vf/templates))
|
||||
([all-views action templates]
|
||||
(vd/do-view-transaction vschema vf/db all-views action templates)))
|
||||
|
||||
(defn no-where-view-template
|
||||
[]
|
||||
(-> (hh/select :f.id :f.val1 :f.val2)
|
||||
(hh/from [:foo :f])))
|
||||
(use-fixtures :each vf/database-fixtures!)
|
||||
|
||||
(defn bar-template
|
||||
[id]
|
||||
(-> (hh/select :b.id :b.val1)
|
||||
(hh/from [:bar :b])
|
||||
(hh/where [:= :val2 "some constant"]
|
||||
[:= :id id])))
|
||||
(deftest builds-view-map
|
||||
(let [{:keys [view-sig view refresh-only?]} (vd/view-map vf/users-tmpl [:users])]
|
||||
(is (= view-sig [:users]))
|
||||
(is (= view {:from [:users], :select [:id :name :created_on]}))
|
||||
(is (nil? refresh-only?))))
|
||||
|
||||
(defn unrelated-template
|
||||
[id]
|
||||
(-> (hh/select :u.id :u.val1)
|
||||
(hh/from :unrelated
|
||||
(hh/where [:= :val "some constant"]
|
||||
[:= :id id]))))
|
||||
(defn non-nil-values-for-keys?
|
||||
[hm keys]
|
||||
(every? #(% hm) keys))
|
||||
|
||||
(defn update-bar-template
|
||||
[val1 wc]
|
||||
(-> (hh/update :bar)
|
||||
(hh/values {:val1 val1})
|
||||
(hh/where wc)))
|
||||
(deftest calculates-insert-deltas
|
||||
(let [views [(vd/view-map vf/users-tmpl [:users])]
|
||||
user-args {:name "Test user" :created_on (sql-ts)}
|
||||
insert (hsql/build :insert-into :users :values [user-args])
|
||||
{:keys [new-deltas result-set]} (dvt-helper views insert)
|
||||
insert-delta (first (:insert-deltas (first (get new-deltas [:users]))))]
|
||||
|
||||
(deftest constructs-view-check-template
|
||||
(let [update-bar (update-bar-template "foo" [:= :id 123])
|
||||
vm (vdb/view-map join-test-template [:join-test 1 "foo"])
|
||||
check-template (:view-check (vdbd/view-check-template vm update-bar))]
|
||||
(is (= (set (:select check-template)) #{:f.id :f.val3}))
|
||||
(is (= (set (rest (:where check-template))) #{[:= :f.val2 "constant"] [:= :b.id 123]}))))
|
||||
;; Result set
|
||||
(is (not (nil? (:id (first result-set)))))
|
||||
(is (= user-args (dissoc (first result-set) :id)))
|
||||
|
||||
(deftest view-check-template-generates-proper-sql
|
||||
(let [update-bar (update-bar-template "foo" [:= :id 123])
|
||||
vm (vdb/view-map join-test-template [:join-test 1 "foo"])
|
||||
check-template (:view-check (vdbd/view-check-template vm update-bar))]
|
||||
(is (= (hsql/format check-template)
|
||||
["SELECT f.id, f.val3 FROM foo f INNER JOIN bar b ON b.id = f.b_id LEFT JOIN baz ba ON ba.id = b.ba_id RIGHT JOIN qux q ON q.id = ba.q_id WHERE (b.id = 123 AND f.val2 = ?)" "constant"]))))
|
||||
;; Deltas
|
||||
(is (= (:name user-args) (:name insert-delta)))
|
||||
(is (= (:created_on user-args) (:created_on insert-delta)))
|
||||
(is (non-nil-values-for-keys? insert-delta (-> views first :view :select)))))
|
||||
|
||||
(deftest creates-collection-of-views-to-check
|
||||
(let [views [(vdb/view-map no-where-view-template [:no-where]) ; no :bar
|
||||
(vdb/view-map no-where-view-template [:no-where]) ; no :bar
|
||||
(vdb/view-map bar-template [:bar 1]) ; has :bar
|
||||
(vdb/view-map unrelated-template [:unrelated 2]) ; no :bar
|
||||
(vdb/view-map join-test-template [:join-test 1 "foo"]) ; has :bar
|
||||
(vdb/view-map join-test-template [:join-test 2 "bar"])] ; has :bar
|
||||
update-bar (update-bar-template "foo" [:= :id 123])
|
||||
checked-views (vdbd/prepare-view-checks views update-bar)]
|
||||
(deftest calculates-delete-deltas
|
||||
(let [views [(vd/view-map vf/users-tmpl [:users])]
|
||||
user-args {:name "Test user" :created_on (sql-ts)}
|
||||
user (vf/view-action! (hsql/build :insert-into :users :values [user-args]))
|
||||
delete (hsql/build :delete-from :users :where [:= :name (:name user-args)])
|
||||
{:keys [new-deltas result-set]} (dvt-helper views delete)
|
||||
delete-delta (first (:delete-deltas (first (get new-deltas [:users]))))]
|
||||
|
||||
;; It should return one check for the bar-template above,
|
||||
;; and 1 for *both* the joint-test-templates.
|
||||
(is (= (count checked-views) 2))))
|
||||
;; Deltas
|
||||
(is (= (:name user-args) (:name delete-delta)))
|
||||
(is (= (:created_on user-args) (:created_on delete-delta)))
|
||||
(is (non-nil-values-for-keys? delete-delta (-> views first :view :select)))))
|
||||
|
||||
;; What is this for?
|
||||
(def left-join-example (hsql/build :select [:R.a :S.C] :from :R :left-join [:S [:= :R.B :S.B]] :where [:!= :S.C 20]))
|
||||
(deftest calculates-update-deltas
|
||||
(let [views [(vd/view-map vf/users-tmpl [:users])]
|
||||
user-args {:name "Test user" :created_on (sql-ts)}
|
||||
user (vf/view-action! (hsql/build :insert-into :users :values [user-args]))
|
||||
new-name "new name!"
|
||||
update (hsql/build :update :users :set {:name new-name} :where [:= :name (:name user-args)])
|
||||
{:keys [new-deltas result-set]} (dvt-helper views update)
|
||||
{:keys [insert-deltas delete-deltas]} (first (get new-deltas [:users]))]
|
||||
|
||||
;; (deftest notes-view-map-as-no-delta-calc
|
||||
;; (let [tmpl (with-meta vf/users-tmpl {:bulk-update? true})]
|
||||
;; (is (:bulk-update? (vdb/view-map tmpl [:users])))))
|
||||
;; Deltas
|
||||
(is (= (:name user-args) (:name (first delete-deltas))))
|
||||
(is (= new-name (:name (first insert-deltas))))))
|
||||
|
||||
;; (defschema schema vf/db "public")
|
||||
|
||||
;; (deftest sends-entire-view-on-every-update-with-bulk-update
|
||||
;; (let [tmpl (with-meta vf/users-tmpl {:bulk-update? true})
|
||||
;; vm (vdb/view-map tmpl [:users])
|
||||
;; bsv (BaseSubscribedViews. vf/db
|
||||
(deftest does-not-calculate-deltas-for-unrelated-views
|
||||
(let [views [(vd/view-map vf/users-tmpl [:users])
|
||||
(vd/view-map vf/all-comments-tmpl [:all-comments])]
|
||||
user-args {:name "Test user" :created_on (sql-ts)}
|
||||
insert (hsql/build :insert-into :users :values [user-args])
|
||||
{:keys [new-deltas result-set]} (dvt-helper views insert)]
|
||||
|
||||
;; (is (= (count (insert-deltas new-deltas) 1))
|
||||
(is (nil? (get new-deltas [:all-comments])))))
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
[environ.core :as e]
|
||||
[clojure.java.jdbc :as j]
|
||||
[honeysql.core :as hsql]
|
||||
[edl.core :refer [defschema]]
|
||||
[clojure.data.generators :as dg]))
|
||||
|
||||
(defn sql-ts
|
||||
|
@ -15,6 +16,8 @@
|
|||
:user (get :views-test-user e/env "views_user")
|
||||
:password (get :views-test-ppassword e/env "password")})
|
||||
|
||||
(defschema vschema db "public")
|
||||
|
||||
(defn clean-tables!
|
||||
[tables]
|
||||
(doseq [t (map name tables)]
|
||||
|
@ -22,19 +25,42 @@
|
|||
|
||||
(defn database-fixtures!
|
||||
[f]
|
||||
(clean-tables! [:posts :users])
|
||||
(clean-tables! [:posts :users :comments])
|
||||
(f))
|
||||
|
||||
(defn rand-str
|
||||
[l]
|
||||
(dg/string #(rand-nth (seq "abcdefghijklmnopqrstuwvxyz ")) l))
|
||||
|
||||
(defn view-query
|
||||
[view]
|
||||
(j/query db (hsql/format view)))
|
||||
|
||||
(defn view-action!
|
||||
[action]
|
||||
(j/execute! db (hsql/format action)))
|
||||
|
||||
(defn user-fixture!
|
||||
[name]
|
||||
(j/execute! db (hsql/format (hsql/build :insert-into :users :values [{:name name :created_on (sql-ts)}]))))
|
||||
(view-action! (hsql/build :insert-into :users :values [{:name name :created_on (sql-ts)}])))
|
||||
|
||||
(defn gen-n-users!
|
||||
[n]
|
||||
(dotimes [n n]
|
||||
(user-fixture! (dg/string #(rand-nth (seq "abcdefghijklmnopqrstuwvxyz")))))
|
||||
(dotimes [n n] (user-fixture! (rand-str 10)))
|
||||
(j/query db ["SELECT * FROM users"]))
|
||||
|
||||
(defn insert-post-tmpl
|
||||
[uid title body]
|
||||
(hsql/build :insert-into :posts :values [{:user_id uid :title title :body body :created_on (sql-ts)}]))
|
||||
|
||||
(defn post-fixture!
|
||||
[uid title body]
|
||||
(view-action! (insert-post-tmpl uid title body)))
|
||||
|
||||
(defn gen-n-posts-for-user!
|
||||
[n uid]
|
||||
(dotimes [n n] (post-fixture! uid (rand-str 20) (rand-str 100))))
|
||||
|
||||
(defn users-tmpl
|
||||
[]
|
||||
(hsql/build :select [:id :name :created_on] :from :users))
|
||||
|
@ -46,6 +72,17 @@
|
|||
:join [[:users :u][:= :u.id :p.user_id]]
|
||||
:where [:= :p.user_id user_id]))
|
||||
|
||||
(defn users-posts-tmpl
|
||||
[]
|
||||
(hsql/build :select [[:u.id :user_id] :u.name :p.id :p.title :p.body :p.created_on]
|
||||
:from {:users :u}
|
||||
:left-join [[:posts :p][:= :u.id :p.user_id]]))
|
||||
|
||||
(defn all-comments-tmpl
|
||||
[]
|
||||
(hsql/build :select [:id :body :created_on] :from {:comments :c}))
|
||||
|
||||
(def templates
|
||||
{:users {:fn #'users-tmpl}
|
||||
:user-posts {:fn #'user-posts-tmpl}})
|
||||
{:users {:fn #'users-tmpl}
|
||||
:user-posts {:fn #'user-posts-tmpl}
|
||||
:all-comments {:fn #'all-comments-tmpl}})
|
||||
|
|
|
@ -1,12 +1,18 @@
|
|||
CREATE ROLE views_user LOGIN PASSWORD 'password';
|
||||
CREATE DATABASE views_test OWNER views_user;
|
||||
\c postgresql://localhost/views_test;
|
||||
CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT NOT NULL, created_on DATE NOT NULL);
|
||||
CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT NOT NULL, created_on TIMESTAMP NOT NULL);
|
||||
CREATE TABLE posts (id SERIAL PRIMARY KEY,
|
||||
title TEXT NOT NULL,
|
||||
body TEXT NOT NULL,
|
||||
created_on DATE NOT NULL,
|
||||
created_on TIMESTAMP NOT NULL,
|
||||
user_id INTEGER NOT NULL,
|
||||
FOREIGN KEY (user_id) REFERENCES users(id));
|
||||
CREATE TABLE comments (id SERIAL PRIMARY KEY,
|
||||
body TEXT NOT NULL,
|
||||
created_on TIMESTAMP NOT NULL,
|
||||
post_id INTEGER NOT NULL,
|
||||
FOREIGN KEY (post_id) REFERENCES posts(id));
|
||||
ALTER TABLE users OWNER TO views_user;
|
||||
ALTER TABLE posts OWNER TO views_user;
|
||||
ALTER TABLE comments OWNER TO views_user;
|
||||
|
|
Loading…
Reference in a new issue