From 86546e0bf6a453073f0fa78542f5241c22067d60 Mon Sep 17 00:00:00 2001 From: "Alexander K. Hudek" Date: Fri, 26 Dec 2014 17:42:49 -0500 Subject: [PATCH] Added hashing to prevent duplicate full refresh views from being sent out. Computes full refresh view updates in parallel. --- project.clj | 2 +- src/views/base_subscribed_views.clj | 4 +- src/views/db/core.clj | 4 +- src/views/db/deltas.clj | 80 ++++++++++++++++++++++------- src/views/db/load.clj | 15 ++---- src/views/persistence/core.clj | 6 +++ src/views/persistence/memory.clj | 8 +++ src/views/subscribed_views.clj | 1 + test/views/db/core_test.clj | 2 + test/views/db/deltas_test.clj | 2 +- test/views/fixtures.clj | 5 +- 11 files changed, 95 insertions(+), 34 deletions(-) diff --git a/project.clj b/project.clj index 85fe34e..474d938 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject views "0.4.9" +(defproject views "0.5.0" :description "You underestimate the power of the SQL side" :url "https://github.com/diligenceengine/views" diff --git a/src/views/base_subscribed_views.clj b/src/views/base_subscribed_views.clj index 491658e..7a608a5 100644 --- a/src/views/base_subscribed_views.clj +++ b/src/views/base_subscribed_views.clj @@ -85,9 +85,11 @@ (persist/unsubscribe-all! persistence namespace subscriber-key))) ;; - ;; The two below functions get called by vexec!/with-view-transaction + ;; The below functions get called by vexec!/with-view-transaction ;; + (persistence [this] (:persistence config)) + ;; Deprecate this? It's just a call to persistence. (subscribed-views [this namespace] ;; Table name optimization not yet worked through the library. 
(persist/view-data (:persistence config) namespace "fix-me")) diff --git a/src/views/db/core.clj b/src/views/db/core.clj index 1b13dbb..8c831b1 100644 --- a/src/views/db/core.clj +++ b/src/views/db/core.clj @@ -4,7 +4,7 @@ [clojure.tools.logging :refer [debug]] [views.db.deltas :as vd] [views.db.util :refer [with-retry retry-on-transaction-failure]] - [views.subscribed-views :refer [subscribed-views broadcast-deltas]])) + [views.subscribed-views :refer [subscribed-views broadcast-deltas persistence]])) (defmacro with-view-transaction "Like with-db-transaction, but operates with views. If you want to use a @@ -46,7 +46,7 @@ - broadcast-deltas takes ... ." [{:keys [db schema base-subscribed-views templates namespace deltas] :as conf} action-map] (let [subbed-views (subscribed-views base-subscribed-views namespace) - transaction-fn #(vd/do-view-transaction schema db subbed-views action-map templates)] + transaction-fn #(vd/do-view-transaction (persistence base-subscribed-views) namespace schema db subbed-views action-map templates)] (if deltas ;; inside a transaction we just collect deltas and do not retry (let [{:keys [new-deltas result-set]} (transaction-fn)] (swap! deltas #(conj % new-deltas)) diff --git a/src/views/db/deltas.clj b/src/views/db/deltas.clj index ec5aa6d..34be129 100644 --- a/src/views/db/deltas.clj +++ b/src/views/db/deltas.clj @@ -4,12 +4,14 @@ [clojure.string :refer [split]] [clojure.java.jdbc :as j] [clojure.tools.logging :refer [debug error]] + [clojure.core.reducers :as r] [honeysql.core :as hsql] [honeysql.helpers :as hh] [views.db.load :as vdbl] [views.db.checks :as vc] [views.db.honeysql :as vh] - [views.db.util :refer [log-exception serialization-error?]])) + [views.db.util :refer [log-exception serialization-error?]] + [views.persistence.core :refer [view-hash update-hash!]])) ;; ;; Terminology and data structures used throughout this code @@ -213,24 +215,66 @@ [refresh-set] (fn [view-deltas] (if (coll? 
view-deltas) - (map #(assoc % :refresh-set refresh-set) view-deltas) - [{:refresh-set refresh-set}]))) + (mapv #(merge % refresh-set) view-deltas) + [refresh-set]))) + +(defn calculate-refresh-sets* + "Compute all the refresh views in parallel." + [db templates refresh-only-views] + (let [reducefn (fn ([] []) + ([a b] + (try + (conj a {:view-sig (:view-sig b) + :refresh-set (get (vdbl/initial-view db (:view-sig b) templates (:view b)) (:view-sig b))}) + (catch SQLException e + (conj a {:view-sig (:view-sig b) :sql-exception e})) + (catch Exception e + (conj a {:view-sig (:view-sig b) :exception e})))))] + (r/fold 1 concat reducefn refresh-only-views))) + +(defn filter-by-hash + "Remove any views with hashes that haven't changed, else update the hash and keep the view." + [persistence namespace refresh-sets] + (filterv + (fn [rs] + (let [hash-value (hash (:refresh-set rs))] + (if (not= (view-hash persistence namespace (:view-sig rs)) hash-value) + (do (update-hash! persistence namespace (:view-sig rs) hash-value) true)))) + refresh-sets)) + +(defn first-serialization-error + [refresh-sets] + (some #(if-let [e (:sql-exception %)] (and (serialization-error? e) e)) refresh-sets)) + +(defn some-exception + [rs] + (or (:sql-exception rs) (:exception rs))) + +;; Exceedingly ugly code. +(defn look-for-exceptions + "Because exceptions in threads get buried, we have to look through the results to check for + serialization errors or other exceptions. We must bubble any serialization exception up." 
+ [refresh-sets] + (if (some #(some-exception %) refresh-sets) + (if-let [e (first-serialization-error refresh-sets)] + (throw e) + (do + (doseq [rs refresh-sets :when (some-exception rs)] + (error "error computing refresh-set for" (:view-sig rs)) + (when-let [e (some-exception rs)] (log-exception e))) + (throw (some some-exception refresh-sets)))) + refresh-sets)) (defn calculate-refresh-sets "For refresh-only views, calculates the refresh-set and adds it to the view's delta update collection." - [deltas db templates refresh-only-views] - (reduce - (fn [d {:keys [view-sig view] :as rov}] - (try - (let [refresh-set (get (vdbl/initial-view db view-sig templates view) view-sig)] - (update-in d [view-sig] (update-deltas-with-refresh-set refresh-set))) - ;; report bad view-sig on error - (catch Exception e - (error "error refreshing view" view-sig) - (log-exception e) - (throw e)))) - deltas - refresh-only-views)) + [persistence namespace deltas db templates refresh-only-views] + (let [refresh-sets (->> (calculate-refresh-sets* db templates refresh-only-views) + look-for-exceptions + (filter-by-hash persistence namespace))] + (reduce + (fn [d rs] (update-in d [(:view-sig rs)] (update-deltas-with-refresh-set rs))) + deltas + refresh-sets))) (defn format-deltas "Removes extraneous data from view delta response collections. @@ -264,7 +308,7 @@ The function returns a hash-map with :result-set and :new-deltas collection values. :new-deltas contains :insert-deltas, :delete-deltas, and :refresh-set values, as well as the original :view-sig the deltas apply to." - [schema db all-views action templates] + [persistence namespace schema db all-views action templates] (j/with-db-transaction [t db :isolation :serializable] (let [filtered-views (filterv #(vc/have-overlapping-tables? action (:view %)) all-views) {full-refresh-views true normal-views nil} (group-by :refresh-only? 
filtered-views) @@ -272,5 +316,5 @@ table (-> action vh/extract-tables ffirst) pkey (get-primary-key schema table) {:keys [views-with-deltas result-set]} (perform-action-and-return-deltas schema t need-deltas action table pkey) - deltas (calculate-refresh-sets (format-deltas views-with-deltas) t templates full-refresh-views)] + deltas (calculate-refresh-sets persistence namespace (format-deltas views-with-deltas) t templates full-refresh-views)] {:new-deltas deltas :result-set result-set}))) diff --git a/src/views/db/load.clj b/src/views/db/load.clj index 06229d1..00ce4fe 100644 --- a/src/views/db/load.clj +++ b/src/views/db/load.clj @@ -24,13 +24,8 @@ the template config map, and the view-map itself. and returns a result-set for the new-views with post-fn functions applied to the data." [db new-view templates view-map] - (try - (->> view-map - (view-query db) - (into []) - (post-process-result-set new-view templates) - (hash-map new-view)) - (catch Exception e - (error "when computing initial-view for" new-view) - (log-exception e) - (throw e)))) + (->> view-map + (view-query db) + (into []) + (post-process-result-set new-view templates) + (hash-map new-view))) diff --git a/src/views/persistence/core.clj b/src/views/persistence/core.clj index f140ccb..956edef 100644 --- a/src/views/persistence/core.clj +++ b/src/views/persistence/core.clj @@ -17,6 +17,12 @@ "Unsubscribes the subscriber with key 'subscriber-key' from ALL views in namespace 'namespace'.") + (update-hash! [this namespace view-sig hash-value] + "Updates the data hash for the view.") + + (view-hash [this namespace view-sig] + "Returns the latest data hash for the view.") + (view-data [this namespace table-name] "Return all the view data that references a table name in a namespace.") diff --git a/src/views/persistence/memory.clj b/src/views/persistence/memory.clj index 073373c..876f36c 100644 --- a/src/views/persistence/memory.clj +++ b/src/views/persistence/memory.clj @@ -49,6 +49,14 @@ (swap! 
subbed-views (fn [sv] (update-in sv [namespace] ns-unsubscribe-all! subscriber-key)))) + (update-hash! + [this namespace view-sig hash-value] + (swap! subbed-views assoc-in [namespace view-sig :hash] hash-value)) + + (view-hash + [this namespace view-sig] + (get-in @subbed-views [namespace view-sig :hash])) + (view-data [this namespace table] ;; We don't yet use table name as an optimization here. (map :view-data (vals (get @subbed-views namespace)))) diff --git a/src/views/subscribed_views.clj b/src/views/subscribed_views.clj index 43a104f..d8c5126 100644 --- a/src/views/subscribed_views.clj +++ b/src/views/subscribed_views.clj @@ -7,5 +7,6 @@ (disconnect [this disconnect-request]) ;; DB interaction + (persistence [this]) (subscribed-views [this namespace]) (broadcast-deltas [this deltas namespace])) diff --git a/test/views/db/core_test.clj b/test/views/db/core_test.clj index e3911ec..c88cf0c 100644 --- a/test/views/db/core_test.clj +++ b/test/views/db/core_test.clj @@ -17,6 +17,8 @@ (subscribed-views [this namespace] (persist/view-data @memory default-ns nil)) + (persistence [this] @memory) + (broadcast-deltas [this new-deltas namespace] (reset! 
received-deltas new-deltas))) diff --git a/test/views/db/deltas_test.clj b/test/views/db/deltas_test.clj index 637cbd3..0c2ed1f 100644 --- a/test/views/db/deltas_test.clj +++ b/test/views/db/deltas_test.clj @@ -10,7 +10,7 @@ (defn dvt-helper ([all-views action] (dvt-helper all-views action vf/templates)) ([all-views action templates] - (vd/do-view-transaction vschema vf/db all-views action templates))) + (vd/do-view-transaction vf/persistence :default-ns vf/vschema vf/db all-views action templates))) (use-fixtures :each (vf/database-fixtures!)) diff --git a/test/views/fixtures.clj b/test/views/fixtures.clj index a2a9112..98a1963 100644 --- a/test/views/fixtures.clj +++ b/test/views/fixtures.clj @@ -4,7 +4,8 @@ [clojure.java.jdbc :as j] [honeysql.core :as hsql] [edl.core :refer [defschema]] - [clojure.data.generators :as dg])) + [clojure.data.generators :as dg] + [views.persistence.memory :refer [new-memory-persistence]])) (defn sql-ts ([ts] (java.sql.Timestamp. ts)) @@ -18,6 +19,8 @@ (defschema vschema db "public") +(def persistence (new-memory-persistence)) + (defn clean-tables! [tables] (doseq [t (map name tables)]