Added hashing to prevent duplicate full refresh views from being sent out. Computes full refresh view updates in parallel.
This commit is contained in:
parent
5ff1fed578
commit
86546e0bf6
|
@ -1,4 +1,4 @@
|
|||
(defproject views "0.4.9"
|
||||
(defproject views "0.5.0"
|
||||
:description "You underestimate the power of the SQL side"
|
||||
|
||||
:url "https://github.com/diligenceengine/views"
|
||||
|
|
|
@ -85,9 +85,11 @@
|
|||
(persist/unsubscribe-all! persistence namespace subscriber-key)))
|
||||
|
||||
;;
|
||||
;; The two below functions get called by vexec!/with-view-transaction
|
||||
;; The below functions get called by vexec!/with-view-transaction
|
||||
;;
|
||||
(persistence [this] (:persistence config))
|
||||
|
||||
;; Deprecate this? It's just a call to persistence.
|
||||
(subscribed-views [this namespace]
|
||||
;; Table name optimization not yet worked through the library.
|
||||
(persist/view-data (:persistence config) namespace "fix-me"))
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
[clojure.tools.logging :refer [debug]]
|
||||
[views.db.deltas :as vd]
|
||||
[views.db.util :refer [with-retry retry-on-transaction-failure]]
|
||||
[views.subscribed-views :refer [subscribed-views broadcast-deltas]]))
|
||||
[views.subscribed-views :refer [subscribed-views broadcast-deltas persistence]]))
|
||||
|
||||
(defmacro with-view-transaction
|
||||
"Like with-db-transaction, but operates with views. If you want to use a
|
||||
|
@ -46,7 +46,7 @@
|
|||
- broadcast-deltas takes ... ."
|
||||
[{:keys [db schema base-subscribed-views templates namespace deltas] :as conf} action-map]
|
||||
(let [subbed-views (subscribed-views base-subscribed-views namespace)
|
||||
transaction-fn #(vd/do-view-transaction schema db subbed-views action-map templates)]
|
||||
transaction-fn #(vd/do-view-transaction (persistence base-subscribed-views) namespace schema db subbed-views action-map templates)]
|
||||
(if deltas ;; inside a transaction we just collect deltas and do not retry
|
||||
(let [{:keys [new-deltas result-set]} (transaction-fn)]
|
||||
(swap! deltas #(conj % new-deltas))
|
||||
|
|
|
@ -4,12 +4,14 @@
|
|||
[clojure.string :refer [split]]
|
||||
[clojure.java.jdbc :as j]
|
||||
[clojure.tools.logging :refer [debug error]]
|
||||
[clojure.core.reducers :as r]
|
||||
[honeysql.core :as hsql]
|
||||
[honeysql.helpers :as hh]
|
||||
[views.db.load :as vdbl]
|
||||
[views.db.checks :as vc]
|
||||
[views.db.honeysql :as vh]
|
||||
[views.db.util :refer [log-exception serialization-error?]]))
|
||||
[views.db.util :refer [log-exception serialization-error?]]
|
||||
[views.persistence.core :refer [view-hash update-hash!]]))
|
||||
|
||||
;;
|
||||
;; Terminology and data structures used throughout this code
|
||||
|
@ -213,24 +215,66 @@
|
|||
[refresh-set]
|
||||
(fn [view-deltas]
|
||||
(if (coll? view-deltas)
|
||||
(map #(assoc % :refresh-set refresh-set) view-deltas)
|
||||
[{:refresh-set refresh-set}])))
|
||||
(mapv #(merge % refresh-set) view-deltas)
|
||||
[refresh-set])))
|
||||
|
||||
(defn calculate-refresh-sets*
|
||||
"Compute all the refresh views in parallel."
|
||||
[db templates refresh-only-views]
|
||||
(let [reducefn (fn ([] [])
|
||||
([a b]
|
||||
(try
|
||||
(conj a {:view-sig (:view-sig b)
|
||||
:refresh-set (get (vdbl/initial-view db (:view-sig b) templates (:view b)) (:view-sig b))})
|
||||
(catch SQLException e
|
||||
(conj a {:view-sig (:view-sig b) :sql-exception e}))
|
||||
(catch Exception e
|
||||
(conj a {:view-sig (:view-sig b) :exception e})))))]
|
||||
(r/fold 1 concat reducefn refresh-only-views)))
|
||||
|
||||
(defn filter-by-hash
|
||||
"Remove any views with hashes that haven't changed, else update the hash and keep the view."
|
||||
[persistence namespace refresh-sets]
|
||||
(filterv
|
||||
(fn [rs]
|
||||
(let [hash-value (hash (:refresh-set rs))]
|
||||
(if (not= (view-hash persistence namespace (:view-sig rs)) hash-value)
|
||||
(do (update-hash! persistence namespace (:view-sig rs) hash-value) true))))
|
||||
refresh-sets))
|
||||
|
||||
(defn first-serialization-error
|
||||
[refresh-sets]
|
||||
(some #(if-let [e (:sql-exception %)] (and (serialization-error? e) e)) refresh-sets))
|
||||
|
||||
(defn some-exception
|
||||
[rs]
|
||||
(or (:sql-exception rs) (:exception rs)))
|
||||
|
||||
;; Exceedingly ugly code.
|
||||
(defn look-for-exceptions
|
||||
"Because exceptions in threads get buried, we have to look through the results to check for
|
||||
serialization errors or other exceptions. We must bubble any serialization exception up."
|
||||
[refresh-sets]
|
||||
(if (some #(some-exception %) refresh-sets)
|
||||
(if-let [e (first-serialization-error refresh-sets)]
|
||||
(throw e)
|
||||
(do
|
||||
(doseq [rs refresh-sets :when (some-exception rs)]
|
||||
(error "error computing refresh-set for" (:view-sig rs))
|
||||
(when-let [e (some-exception rs)] (log-exception e)))
|
||||
(throw (some some-exception refresh-sets))))
|
||||
refresh-sets))
|
||||
|
||||
(defn calculate-refresh-sets
|
||||
"For refresh-only views, calculates the refresh-set and adds it to the view's delta update collection."
|
||||
[deltas db templates refresh-only-views]
|
||||
(reduce
|
||||
(fn [d {:keys [view-sig view] :as rov}]
|
||||
(try
|
||||
(let [refresh-set (get (vdbl/initial-view db view-sig templates view) view-sig)]
|
||||
(update-in d [view-sig] (update-deltas-with-refresh-set refresh-set)))
|
||||
;; report bad view-sig on error
|
||||
(catch Exception e
|
||||
(error "error refreshing view" view-sig)
|
||||
(log-exception e)
|
||||
(throw e))))
|
||||
deltas
|
||||
refresh-only-views))
|
||||
[persistence namespace deltas db templates refresh-only-views]
|
||||
(let [refresh-sets (->> (calculate-refresh-sets* db templates refresh-only-views)
|
||||
look-for-exceptions
|
||||
(filter-by-hash persistence namespace))]
|
||||
(reduce
|
||||
(fn [d rs] (update-in d [(:view-sig rs)] (update-deltas-with-refresh-set rs)))
|
||||
deltas
|
||||
refresh-sets)))
|
||||
|
||||
(defn format-deltas
|
||||
"Removes extraneous data from view delta response collections.
|
||||
|
@ -264,7 +308,7 @@
|
|||
The function returns a hash-map with :result-set and :new-deltas collection values.
|
||||
:new-deltas contains :insert-deltas, :delete-deltas, and :refresh-set values, as well
|
||||
as the original :view-sig the deltas apply to."
|
||||
[schema db all-views action templates]
|
||||
[persistence namespace schema db all-views action templates]
|
||||
(j/with-db-transaction [t db :isolation :serializable]
|
||||
(let [filtered-views (filterv #(vc/have-overlapping-tables? action (:view %)) all-views)
|
||||
{full-refresh-views true normal-views nil} (group-by :refresh-only? filtered-views)
|
||||
|
@ -272,5 +316,5 @@
|
|||
table (-> action vh/extract-tables ffirst)
|
||||
pkey (get-primary-key schema table)
|
||||
{:keys [views-with-deltas result-set]} (perform-action-and-return-deltas schema t need-deltas action table pkey)
|
||||
deltas (calculate-refresh-sets (format-deltas views-with-deltas) t templates full-refresh-views)]
|
||||
deltas (calculate-refresh-sets persistence namespace (format-deltas views-with-deltas) t templates full-refresh-views)]
|
||||
{:new-deltas deltas :result-set result-set})))
|
||||
|
|
|
@ -24,13 +24,8 @@
|
|||
the template config map, and the view-map itself.
|
||||
and returns a result-set for the new-views with post-fn functions applied to the data."
|
||||
[db new-view templates view-map]
|
||||
(try
|
||||
(->> view-map
|
||||
(view-query db)
|
||||
(into [])
|
||||
(post-process-result-set new-view templates)
|
||||
(hash-map new-view))
|
||||
(catch Exception e
|
||||
(error "when computing initial-view for" new-view)
|
||||
(log-exception e)
|
||||
(throw e))))
|
||||
(->> view-map
|
||||
(view-query db)
|
||||
(into [])
|
||||
(post-process-result-set new-view templates)
|
||||
(hash-map new-view)))
|
||||
|
|
|
@ -17,6 +17,12 @@
|
|||
"Unsubscribes the subscriber with key 'subscriber-key' from ALL views
|
||||
in namespace 'namespace'.")
|
||||
|
||||
(update-hash! [this namespace view-sig hash-value]
|
||||
"Updates the data has for the view.")
|
||||
|
||||
(view-hash [this namespace view-sig]
|
||||
"Returns the latest data has for the view.")
|
||||
|
||||
(view-data [this namespace table-name]
|
||||
"Return all the view data that references a table name in a namespace.")
|
||||
|
||||
|
|
|
@ -49,6 +49,14 @@
|
|||
(swap! subbed-views
|
||||
(fn [sv] (update-in sv [namespace] ns-unsubscribe-all! subscriber-key))))
|
||||
|
||||
(update-hash!
|
||||
[this namespace view-sig hash-value]
|
||||
(swap! subbed-views assoc-in [namespace view-sig :hash] hash-value))
|
||||
|
||||
(view-hash
|
||||
[this namespace view-sig]
|
||||
(get-in @subbed-views [namespace view-sig :hash]))
|
||||
|
||||
(view-data [this namespace table]
|
||||
;; We don't yet use table name as an optimization here.
|
||||
(map :view-data (vals (get @subbed-views namespace))))
|
||||
|
|
|
@ -7,5 +7,6 @@
|
|||
(disconnect [this disconnect-request])
|
||||
|
||||
;; DB interaction
|
||||
(persistence [this])
|
||||
(subscribed-views [this namespace])
|
||||
(broadcast-deltas [this deltas namespace]))
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
(subscribed-views [this namespace]
|
||||
(persist/view-data @memory default-ns nil))
|
||||
|
||||
(persistence [this] @memory)
|
||||
|
||||
(broadcast-deltas [this new-deltas namespace]
|
||||
(reset! received-deltas new-deltas)))
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
(defn dvt-helper
|
||||
([all-views action] (dvt-helper all-views action vf/templates))
|
||||
([all-views action templates]
|
||||
(vd/do-view-transaction vschema vf/db all-views action templates)))
|
||||
(vd/do-view-transaction vf/persistence :default-ns vf/vschema vf/db all-views action templates)))
|
||||
|
||||
(use-fixtures :each (vf/database-fixtures!))
|
||||
|
||||
|
|
|
@ -4,7 +4,8 @@
|
|||
[clojure.java.jdbc :as j]
|
||||
[honeysql.core :as hsql]
|
||||
[edl.core :refer [defschema]]
|
||||
[clojure.data.generators :as dg]))
|
||||
[clojure.data.generators :as dg]
|
||||
[views.persistence.memory :refer [new-memory-persistence]]))
|
||||
|
||||
(defn sql-ts
|
||||
([ts] (java.sql.Timestamp. ts))
|
||||
|
@ -18,6 +19,8 @@
|
|||
|
||||
(defschema vschema db "public")
|
||||
|
||||
(def persistence (new-memory-persistence))
|
||||
|
||||
(defn clean-tables!
|
||||
[tables]
|
||||
(doseq [t (map name tables)]
|
||||
|
|
Loading…
Reference in a new issue