diff --git a/src/views/base_subscribed_views.clj b/src/views/base_subscribed_views.clj index 959ad5f..4dc246b 100644 --- a/src/views/base_subscribed_views.clj +++ b/src/views/base_subscribed_views.clj @@ -1,8 +1,8 @@ (ns views.base-subscribed-views (:require - [views.persistor :refer [subscribe-to-view! unsubscribe-from-view! unsubscribe-from-all-views! get-subscribed-views]] + [views.persistence :refer [subscribe-to-view! unsubscribe-from-view! unsubscribe-from-all-views! get-subscribed-views]] [views.subscribed-views :refer [ISubscribedViews]] - [views.subscriptions :refer [default-ns]] + [views.subscriptions :refer [default-ns subscribed-to]] [views.filters :refer [view-filter]] [clojure.tools.logging :refer [debug info warn error]] [clojure.core.async :refer [put! > (subscribe-to-view! persistence db vs popts) + (send-fn* send-fn subscriber-key))))))) (unsubscribe-views [this msg] - (let [{:keys [subscriber-key-fn namespace-fn persistor]} opts + (let [{:keys [subscriber-key-fn namespace-fn persistence]} opts subscriber-key (subscriber-key-fn* subscriber-key-fn msg) namespace (namespace-fn* namespace-fn msg) view-sigs (:views msg)] (info "Unsubscribing views: " view-sigs " for subscriber " subscriber-key) - (doseq [vs view-sigs] (unsubscribe-from-view! persistor vs subscriber-key namespace)))) + (doseq [vs view-sigs] (unsubscribe-from-view! persistence vs subscriber-key namespace)))) (disconnect [this msg] - (let [{:keys [subscriber-key-fn namespace-fn persistor]} opts + (let [{:keys [subscriber-key-fn namespace-fn persistence]} opts subscriber-key (subscriber-key-fn* subscriber-key-fn msg) namespace (namespace-fn* namespace-fn msg)] - (unsubscribe-from-all-views! persistor subscriber-key namespace))) + (unsubscribe-from-all-views! persistence subscriber-key namespace))) (subscribed-views [this args] - (get-subscribed-views (:persistor opts) (namespace-fn* (:namespace-fn opts) args))) + (map :view-data (vals (get-subscribed-views (:persistence opts) (namespace-fn* (:namespace-fn opts) args))))) - (broadcast-deltas [this fdb views-with-deltas] - (println "broadcasting 1 2 3"))) + (broadcast-deltas [this deltas] + (doseq [vs (keys deltas)] + (doseq [sk (subscribed-to vs)] + (send-fn* (:send-fn opts) sk (get deltas vs)))))) diff --git a/src/views/core.clj b/src/views/core.clj index 91a9357..7d357d6 100644 --- a/src/views/core.clj +++ b/src/views/core.clj @@ -1,12 +1,12 @@ (ns views.core (:require - [views.base-subscribed-views :as bsv]; :refer [BaseSubscribedViews]] - [views.persistor :as vp])); :refer [InMemoryPersistor]])) - ;; (:import - ;; [views.persistor InMemoryPersistor] - ;; [views.base_subscribed_views BaseSubscribedViews])) + [views.base-subscribed-views :as bsv] + [views.persistence :as vp]) + (:import + [views.persistence InMemoryPersistence] + [views.base_subscribed_views BaseSubscribedViews])) -(defmacro config - [{:keys [db schema persistor] :as opts}] - (let [opts (if persistor opts (assoc opts :persistor (vp/->InMemoryPersistor)))] - {:db db :schema schema :base-subscribed-views (bsv/->BaseSubscribedViews opts)})) +(defn config + [{:keys [db schema persistence] :as opts}] + (let [opts (if persistence opts (assoc opts :persistence (InMemoryPersistence.)))] + {:db db :schema schema :base-subscribed-views (BaseSubscribedViews. opts)})) diff --git a/src/views/db/core.clj b/src/views/db/core.clj index cad43e1..3f5e578 100644 --- a/src/views/db/core.clj +++ b/src/views/db/core.clj @@ -1,25 +1,13 @@ (ns views.db.core (:import - [java.sql SQLException BatchUpdateException] - [org.postgresql.util PSQLException]) + [java.sql SQLException]) (:require - [clojure.string :refer [trim split]] - [honeysql.core :as hsql] - [honeysql.helpers :as hh] - [honeysql.format :as fmt] - [honeysql.types :as ht] [clojure.java.jdbc :as j] [clojure.tools.logging :refer [debug]] [views.db.honeysql :as vh] + [views.db.deltas :as vd] [views.subscribed-views :refer [subscribed-views broadcast-deltas]])) -(defn get-primary-key - "Get a primary key for a table." - [schema table] - (or - (keyword (get-in schema [(name table) :primary-key :column_name])) - (throw (Exception. (str "Cannot find primary key for table: " table))))) - ;; ;; Takes the HoneySQL template for a view and the arglist ;; and compiles the view with a set of dummy args in the @@ -84,295 +72,6 @@ (let [lookup (first view-sig)] (view-map (get-in templates [lookup :fn]) view-sig))) -(defn swap-out-dummy-for-pos - "Replaces dummy arg like \"?0\" for integer value (0) so we can sort args." - [dummy-arg] - (Integer. (subs dummy-arg 1))) - -;; Helper for determine-filter-clauses (which is a helper -;; for view-check-template). Extracts constituent parts from -;; where clause. -(defn set-filter-clauses - [dummy-args fc w] - (if (= w :and) - fc - (if (contains? (set dummy-args) (last w)) - (update-in fc [:s] assoc (swap-out-dummy-for-pos (last w)) (second w)) - (update-in fc [:w] (fnil conj []) w)))) - -;; Helper for generating the view-check HoneySQL template. -;; Builds the where and select clauses up from constituent -;; where-clauses. Placeholder identifies the parameters -;; to pull out into the select clause. -(defn determine-filter-clauses - [wc dummy-args] - (let [fc {:s {} :w nil} - fc (if (and (not= :and (first wc)) (not (coll? (first wc)))) - (set-filter-clauses dummy-args fc wc) - (reduce #(set-filter-clauses dummy-args %1 %2) fc wc))] - (-> fc - (update-in [:s] #(into [] (vals (sort-by key %)))) - (update-in [:w] #(vh/with-op :and %))))) - -(defn append-arg-map - "Removes table/alias namespacing from select fields and creates a hash-map - of field to arguments for checking this view against checked-results later on. - Note that this assumes our select-fields are in the same order as they - are present in the :args view-map field (which they should be)." - [view-map select-fields] - (let [select-fields (map #(-> % name (split #"\.") last keyword) select-fields)] - (assoc view-map :arg-compare (zipmap select-fields (into [] (:args view-map)))))) - -(defn- create-view-delta-where-clauses - [view-map action] - (let [action-table (first (vh/extract-tables action))] - (for [view-table (vh/find-table-aliases action-table (:tables view-map))] - (-> (:where action) - (vh/prefix-columns (vh/table-alias view-table)) - (vh/replace-table (vh/table-alias action-table) (vh/table-alias view-table)))))) - -(defn format-action-wc-for-view - "Takes view-map and action (HoneySQL hash-map for insert/update/delete), - extracts where clause from action, and formats it with the proper - alias (or no alias) so that it will work when applied to the view SQL." - [view-map action] - (if (:where action) - (let [preds (create-view-delta-where-clauses view-map action)] - (if (> (count preds) 1) - (into [:or] preds) - (first preds))))) - -(defn- update-where-clause - [hh-spec where] - (if-let [w (:where where)] - (assoc hh-spec :where w) - (dissoc hh-spec :where))) - -(defn view-check-template - "Receives a view-map and an action (insert/update/delete HoneySQL hash-map). - Returns a HoneySQL hash-map which will can be formatted as SQL to check if a - view needs to receive deltas for the action SQL." - [view-map action] - (let [{:keys [dummy-view dummy-args]} view-map - fc (determine-filter-clauses (:where dummy-view) dummy-args) - action-wc (format-action-wc-for-view view-map action) - view-map (append-arg-map view-map (:s fc))] ; we need this to compare *after* the check is run - (->> (-> dummy-view - (update-where-clause (vh/merge-where-clauses action-wc (:w fc))) - (merge (apply hh/select (:s fc)))) - (hash-map :view-map view-map :view-check)))) - -(defn prepare-checks-for-view-deltas - "Checks to see if an action has tables related to a view, and - if so builds the HoneySQL hash-map for the SQL needed. - Uses this hash-map as a key and conj's the view-map to the key's - value so as to avoid redundant delta-check querying." - [action confirmed-views view-map] - ;; Confirm if any of the tables in view-map are present in the action template: - (if (some (set (map first (vh/extract-tables action))) - (map first (:tables view-map))) - - ;; Then construct the check-template for this particular view. - (if-let [{:keys [view-check view-map]} (view-check-template view-map action)] - ;; We then use the view-check as an index and conj the - ;; view-map to it so as to avoid redundant checks. - (update-in confirmed-views [view-check] #(conj % view-map)) - confirmed-views) - confirmed-views)) - -(defn prepare-view-checks - "Prepares checks for a collection of views (view-maps) against a HoneySQL action - (insert/update/delete) hash-map. - - Returns a structure like so: - {{> views - (map #(check-view-args checked-results %)) - (remove nil?) - distinct)) - -(defn- do-view-pre-check - [db needs-deltas view-check] - ;; - ;; We have empty-select? if we have a view with no where predicate clauses-- - ;; so it will always require deltas if there are matching tables. - ;; - ;; empty-where comes about if we are inserting--we don't have any where predicate - ;; in the insert, of course, so we can't perform pre-checks reliably. - ;; When we do an insert we have to simply do the delta query regardless, for now. - ;; - (let [empty-select? (seq (remove nil? (:select (first view-check)))) - empty-where? (seq (remove #(or (nil? %) (= :and %)) (:where (first view-check))))] - (if (or (not empty-select?) (not empty-where?)) - (apply conj needs-deltas (last view-check)) ;; put them all in if we can't do pre-check. - (let [checked-results (do-check db (first view-check)) - ;; checks view args against checked result set - checked-views (check-all-view-args checked-results (last view-check))] - (if (seq checked-views) - (apply conj needs-deltas checked-views) - needs-deltas))))) - -(defn do-view-pre-checks - "Takes db, all views (view-maps) and the HoneySQL action (insert/update/delete) - hash-map. Returns view-maps for all the views which need to receive - delta updates after the action is performed. - - *This function should be called within a transaction before performing the - insert/update/delete action.*" - [db all-views action] - (let [view-checks (prepare-view-checks all-views action)] - (reduce #(do-view-pre-check db %1 %2) [] view-checks))) - -(defn- calculate-delete-deltas - [db view-map] - (->> (:delete-deltas-map view-map) - hsql/format - (j/query db) - (assoc view-map :delete-deltas))) - -;; ------------------------------------------------------------------------------- -;; Handle inserts -;; ------------------------------------------------------------------------------- - -(defn compute-delete-deltas-for-insert - "Computes and returns a sequence of delete deltas for a single view and insert." - [schema db view-map table record] - (if (vh/outer-join-table? (:view view-map) table) - (let [delta-q (vh/outer-join-delta-query schema (:view view-map) table record)] - (j/query db (hsql/format delta-q))) - [])) - -(defn primary-key-predicate - "Return a predicate for a where clause that constrains to the primary key of - the record." - [schema table record] - (let [pkey (get-primary-key schema table)] - [:= pkey (pkey record)])) - -(defn compute-insert-deltas-for-insert - [schema db view-map table record] - (let [pkey-pred (primary-key-predicate schema table record) - action (hsql/build :insert-into table :values [record] :where pkey-pred) - insert-delta-wc (format-action-wc-for-view view-map action) - view (:view view-map) - insert-delta-map (update-in view [:where] #(:where (vh/merge-where-clauses insert-delta-wc %)))] - (j/query db (hsql/format insert-delta-map)))) - -(defn compute-insert-delete-deltas-for-views - [schema db views table record] - (doall (map #(compute-delete-deltas-for-insert schema db % table record) views))) - -(defn compute-insert-insert-deltas-for-views - [schema db views table record] - (doall (map #(compute-insert-deltas-for-insert schema db % table record) views))) - -(defn compute-deltas-for-insert - "This takes a *single* insert and a view, applies the insert and computes - the view deltas." - [schema db views table record] - (let [deletes (compute-insert-delete-deltas-for-views schema db views table record) - record* (first (j/insert! db table record)) - inserts (compute-insert-insert-deltas-for-views schema db views table record*)] - {:views-with-deltas (doall (map #(assoc %1 :delete-deltas %2 :insert-deltas %3) views deletes inserts)) - :result record*})) - -;; Handles insert and calculation of insert (after insert) delta. -(defn- insert-and-append-deltas! - [schema db views action table pkey] - (let [table (:insert-into action)] - (reduce - #(-> %1 - (update-in [:views-with-deltas] into (:views-with-deltas %2)) - (update-in [:result-set] conj (:result %2))) - {:views-with-deltas [] :result-set []} - (map #(compute-deltas-for-insert schema db views table %) (:values action))))) - -;; ------------------------------------------------------------------------------- - -;; This is for insert deltas for non-insert updates. - -;;; Takes the HoneySQL map (at key :view) from the view-map and appends -;;; the appropriately-table-namespaced where clause which limits the -;;; view query to the previously inserted or updated records. -(defn- calculate-insert-deltas - [db action pkey-wc view-map] - (let [action (assoc action :where pkey-wc) - insert-delta-wc (format-action-wc-for-view view-map action) - view (:view view-map) - insert-delta-map (update-in view [:where] #(:where (vh/merge-where-clauses insert-delta-wc %))) - deltas (j/query db (hsql/format insert-delta-map))] - (if (seq deltas) - (update-in view-map [:insert-deltas] #(apply conj % deltas)) - view-map))) - -;; Helper to query the action's table for primary key and pull it out. -(defn- get-action-row-key - [db pkey table action] - (->> (:where action) - (hsql/build :select pkey :from table :where) - hsql/format - (j/query db) - first pkey)) - -;; Handles update and calculation of delete (before update) and insert (after update) deltas. -(defn- update-and-append-deltas! - [db views action table pkey] - (let [views-pre (doall (map #(calculate-delete-deltas db %) views)) - pkey-val (get-action-row-key db pkey table action) - update (j/execute! db (hsql/format action))] - {:views-with-deltas (doall (map #(calculate-insert-deltas db action [:= pkey pkey-val] %) views-pre)) - :result-set update})) - -;; Handles deletion and calculation of delete (before update) delta. -(defn- delete-and-append-deltas! - [db views action table pkey] - (let [views-pre (doall (map #(calculate-delete-deltas db %) views))] - {:views-with-deltas views-pre - :result-set (j/execute! db (hsql/format action))})) - -;; Identifies which action--insert, update or delete--we are performing and dispatches appropriately. -;; Returns view-map with appropriate deltas appended. -(defn- perform-action-and-return-deltas - [schema db views action table pkey] - (cond - (:insert-into action) (insert-and-append-deltas! schema db views action table pkey) - (:update action) (update-and-append-deltas! db views action table pkey) - (:delete-from action) (delete-and-append-deltas! db views action table pkey) - :else (throw (Exception. "Received malformed action: " action)))) - -(defn generate-view-delta-map - "Adds a HoneySQL hash-map for the delta-calculation specific to the view + action. - Takes a view-map and the action HoneySQL hash-map, and appends the action's - where clause to the view's where clause, and adds in new field :insert-deltas-map." - [view-map action] - (let [action-wc (format-action-wc-for-view view-map action) - view (:view view-map)] - (->> (update-in view [:where] #(:where (vh/merge-where-clauses action-wc %))) - (assoc view-map :delete-deltas-map)))) - - (defn do-view-transaction "Takes the following arguments: schema - from edl.core/defschema @@ -398,11 +97,11 @@ [schema db all-views action] ;; Every update connected with a view is done in a transaction: (j/with-db-transaction [t db :isolation :serializable] - (let [need-deltas (do-view-pre-checks t all-views action) - need-deltas (map #(generate-view-delta-map % action) need-deltas) + (let [need-deltas (vd/do-view-pre-checks t all-views action) + need-deltas (map #(vd/generate-view-delta-map % action) need-deltas) table (-> action vh/extract-tables ffirst) - pkey (get-primary-key schema table)] - (perform-action-and-return-deltas schema t need-deltas action table pkey)))) + pkey (vd/get-primary-key schema table)] + (vd/perform-action-and-return-deltas schema t need-deltas action table pkey)))) ;; ;; Need to catch this and retry: @@ -442,9 +141,19 @@ result# (j/with-db-transaction [t# ~(second binding) :isolation :serializable] (let [~tvar (assoc t# :deltas deltas#)] ~@forms))] - (broadcast-deltas ~subscribed-views ~(second binding) @deltas#) + (broadcast-deltas ~subscribed-views @deltas#) result#)))))) +(defn format-deltas + [views-with-deltas] + (->> views-with-deltas + (map #(select-keys % [:view-sig :delete-deltas :insert-deltas])) + (group-by :view-sig))) + +(defn vexec* + [] + ) + (defn vexec! "Used to perform arbitrary insert/update/delete actions on the database, while ensuring that view deltas are appropriately checked and calculated @@ -475,8 +184,8 @@ transaction-fn #(do-view-transaction schema db subbed-views action-map)] (if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry (let [{:keys [views-with-deltas result-set]} (transaction-fn)] - (swap! deltas into views-with-deltas) + (swap! deltas into (format-deltas views-with-deltas)) result-set) (let [{:keys [views-with-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)] - (broadcast-deltas base-subscribed-views db views-with-deltas) + (broadcast-deltas base-subscribed-views (format-deltas views-with-deltas)) result-set))))) diff --git a/src/views/db/deltas.clj b/src/views/db/deltas.clj new file mode 100644 index 0000000..4650e0d --- /dev/null +++ b/src/views/db/deltas.clj @@ -0,0 +1,302 @@ +(ns views.db.deltas + (:require + [clojure.string :refer [split]] + [clojure.java.jdbc :as j] + [honeysql.core :as hsql] + [honeysql.helpers :as hh] + [views.db.honeysql :as vh])) + +(defn get-primary-key + "Get a primary key for a table." + [schema table] + (or + (keyword (get-in schema [(name table) :primary-key :column_name])) + (throw (Exception. (str "Cannot find primary key for table: " table))))) + +(defn swap-out-dummy-for-pos + "Replaces dummy arg like \"?0\" for integer value (0) so we can sort args." + [dummy-arg] + (Integer. (subs dummy-arg 1))) + +;; Helper for determine-filter-clauses (which is a helper +;; for view-check-template). Extracts constituent parts from +;; where clause. +(defn set-filter-clauses + [dummy-args fc w] + (if (= w :and) + fc + (if (contains? (set dummy-args) (last w)) + (update-in fc [:s] assoc (swap-out-dummy-for-pos (last w)) (second w)) + (update-in fc [:w] (fnil conj []) w)))) + +;; Helper for generating the view-check HoneySQL template. +;; Builds the where and select clauses up from constituent +;; where-clauses. Placeholder identifies the parameters +;; to pull out into the select clause. +(defn determine-filter-clauses + [wc dummy-args] + (let [fc {:s {} :w nil} + fc (if (and (not= :and (first wc)) (not (coll? (first wc)))) + (set-filter-clauses dummy-args fc wc) + (reduce #(set-filter-clauses dummy-args %1 %2) fc wc))] + (-> fc + (update-in [:s] #(into [] (vals (sort-by key %)))) + (update-in [:w] #(vh/with-op :and %))))) + +(defn append-arg-map + "Removes table/alias namespacing from select fields and creates a hash-map + of field to arguments for checking this view against checked-results later on. + Note that this assumes our select-fields are in the same order as they + are present in the :args view-map field (which they should be)." + [view-map select-fields] + (let [select-fields (map #(-> % name (split #"\.") last keyword) select-fields)] + (assoc view-map :arg-compare (zipmap select-fields (into [] (:args view-map)))))) + +(defn- create-view-delta-where-clauses + [view-map action] + (let [action-table (first (vh/extract-tables action))] + (for [view-table (vh/find-table-aliases action-table (:tables view-map))] + (-> (:where action) + (vh/prefix-columns (vh/table-alias view-table)) + (vh/replace-table (vh/table-alias action-table) (vh/table-alias view-table)))))) + +(defn format-action-wc-for-view + "Takes view-map and action (HoneySQL hash-map for insert/update/delete), + extracts where clause from action, and formats it with the proper + alias (or no alias) so that it will work when applied to the view SQL." + [view-map action] + (if (:where action) + (let [preds (create-view-delta-where-clauses view-map action)] + (if (> (count preds) 1) + (into [:or] preds) + (first preds))))) + +(defn- update-where-clause + [hh-spec where] + (if-let [w (:where where)] + (assoc hh-spec :where w) + (dissoc hh-spec :where))) + +(defn view-check-template + "Receives a view-map and an action (insert/update/delete HoneySQL hash-map). + Returns a HoneySQL hash-map which will can be formatted as SQL to check if a + view needs to receive deltas for the action SQL." + [view-map action] + (let [{:keys [dummy-view dummy-args]} view-map + fc (determine-filter-clauses (:where dummy-view) dummy-args) + action-wc (format-action-wc-for-view view-map action) + view-map (append-arg-map view-map (:s fc))] ; we need this to compare *after* the check is run + (->> (-> dummy-view + (update-where-clause (vh/merge-where-clauses action-wc (:w fc))) + (merge (apply hh/select (:s fc)))) + (hash-map :view-map view-map :view-check)))) + +(defn prepare-checks-for-view-deltas + "Checks to see if an action has tables related to a view, and + if so builds the HoneySQL hash-map for the SQL needed. + Uses this hash-map as a key and conj's the view-map to the key's + value so as to avoid redundant delta-check querying." + [action confirmed-views view-map] + ;; Confirm if any of the tables in view-map are present in the action template: + (if (some (set (map first (vh/extract-tables action))) + (map first (:tables view-map))) + + ;; Then construct the check-template for this particular view. + (if-let [{:keys [view-check view-map]} (view-check-template view-map action)] + ;; We then use the view-check as an index and conj the + ;; view-map to it so as to avoid redundant checks. + (update-in confirmed-views [view-check] #(conj % view-map)) + confirmed-views) + confirmed-views)) + +(defn prepare-view-checks + "Prepares checks for a collection of views (view-maps) against a HoneySQL action + (insert/update/delete) hash-map. + + Returns a structure like so: + {{> views + (map #(check-view-args checked-results %)) + (remove nil?) + distinct)) + +(defn- do-view-pre-check + [db needs-deltas view-check] + ;; + ;; We have empty-select? if we have a view with no where predicate clauses-- + ;; so it will always require deltas if there are matching tables. + ;; + ;; empty-where comes about if we are inserting--we don't have any where predicate + ;; in the insert, of course, so we can't perform pre-checks reliably. + ;; When we do an insert we have to simply do the delta query regardless, for now. + ;; + (let [empty-select? (seq (remove nil? (:select (first view-check)))) + empty-where? (seq (remove #(or (nil? %) (= :and %)) (:where (first view-check))))] + (if (or (not empty-select?) (not empty-where?)) + (apply conj needs-deltas (last view-check)) ;; put them all in if we can't do pre-check. + (let [checked-results (do-check db (first view-check)) + ;; checks view args against checked result set + checked-views (check-all-view-args checked-results (last view-check))] + (if (seq checked-views) + (apply conj needs-deltas checked-views) + needs-deltas))))) + +(defn do-view-pre-checks + "Takes db, all views (view-maps) and the HoneySQL action (insert/update/delete) + hash-map. Returns view-maps for all the views which need to receive + delta updates after the action is performed. + + *This function should be called within a transaction before performing the + insert/update/delete action.*" + [db all-views action] + (let [view-checks (prepare-view-checks all-views action)] + (reduce #(do-view-pre-check db %1 %2) [] view-checks))) + +(defn- calculate-delete-deltas + [db view-map] + (->> (:delete-deltas-map view-map) + hsql/format + (j/query db) + (assoc view-map :delete-deltas))) + +;; ------------------------------------------------------------------------------- +;; Handle inserts +;; ------------------------------------------------------------------------------- + +(defn compute-delete-deltas-for-insert + "Computes and returns a sequence of delete deltas for a single view and insert." + [schema db view-map table record] + (if (vh/outer-join-table? (:view view-map) table) + (let [delta-q (vh/outer-join-delta-query schema (:view view-map) table record)] + (j/query db (hsql/format delta-q))) + [])) + +(defn primary-key-predicate + "Return a predicate for a where clause that constrains to the primary key of + the record." + [schema table record] + (let [pkey (get-primary-key schema table)] + [:= pkey (pkey record)])) + +(defn compute-insert-deltas-for-insert + [schema db view-map table record] + (let [pkey-pred (primary-key-predicate schema table record) + action (hsql/build :insert-into table :values [record] :where pkey-pred) + insert-delta-wc (format-action-wc-for-view view-map action) + view (:view view-map) + insert-delta-map (update-in view [:where] #(:where (vh/merge-where-clauses insert-delta-wc %)))] + (j/query db (hsql/format insert-delta-map)))) + +(defn compute-insert-delete-deltas-for-views + [schema db views table record] + (doall (map #(compute-delete-deltas-for-insert schema db % table record) views))) + +(defn compute-insert-insert-deltas-for-views + [schema db views table record] + (doall (map #(compute-insert-deltas-for-insert schema db % table record) views))) + +(defn compute-deltas-for-insert + "This takes a *single* insert and a view, applies the insert and computes + the view deltas." + [schema db views table record] + (let [deletes (compute-insert-delete-deltas-for-views schema db views table record) + record* (first (j/insert! db table record)) + inserts (compute-insert-insert-deltas-for-views schema db views table record*)] + {:views-with-deltas (doall (map #(assoc %1 :delete-deltas %2 :insert-deltas %3) views deletes inserts)) + :result record*})) + +;; Handles insert and calculation of insert (after insert) delta. +(defn- insert-and-append-deltas! + [schema db views action table pkey] + (let [table (:insert-into action)] + (reduce + #(-> %1 + (update-in [:views-with-deltas] into (:views-with-deltas %2)) + (update-in [:result-set] conj (:result %2))) + {:views-with-deltas [] :result-set []} + (map #(compute-deltas-for-insert schema db views table %) (:values action))))) + +;; ------------------------------------------------------------------------------- + +;; This is for insert deltas for non-insert updates. + +;;; Takes the HoneySQL map (at key :view) from the view-map and appends +;;; the appropriately-table-namespaced where clause which limits the +;;; view query to the previously inserted or updated records. +(defn- calculate-insert-deltas + [db action pkey-wc view-map] + (let [action (assoc action :where pkey-wc) + insert-delta-wc (format-action-wc-for-view view-map action) + view (:view view-map) + insert-delta-map (update-in view [:where] #(:where (vh/merge-where-clauses insert-delta-wc %))) + deltas (j/query db (hsql/format insert-delta-map))] + (if (seq deltas) + (update-in view-map [:insert-deltas] #(apply conj % deltas)) + view-map))) + +;; Helper to query the action's table for primary key and pull it out. +(defn- get-action-row-key + [db pkey table action] + (->> (:where action) + (hsql/build :select pkey :from table :where) + hsql/format + (j/query db) + first pkey)) + +;; Handles update and calculation of delete (before update) and insert (after update) deltas. +(defn- update-and-append-deltas! + [db views action table pkey] + (let [views-pre (doall (map #(calculate-delete-deltas db %) views)) + pkey-val (get-action-row-key db pkey table action) + update (j/execute! db (hsql/format action))] + {:views-with-deltas (doall (map #(calculate-insert-deltas db action [:= pkey pkey-val] %) views-pre)) + :result-set update})) + +;; Handles deletion and calculation of delete (before update) delta. +(defn- delete-and-append-deltas! + [db views action table pkey] + (let [views-pre (doall (map #(calculate-delete-deltas db %) views))] + {:views-with-deltas views-pre + :result-set (j/execute! db (hsql/format action))})) + +;; Identifies which action--insert, update or delete--we are performing and dispatches appropriately. +;; Returns view-map with appropriate deltas appended. +(defn perform-action-and-return-deltas + [schema db views action table pkey] + (cond + (:insert-into action) (insert-and-append-deltas! schema db views action table pkey) + (:update action) (update-and-append-deltas! db views action table pkey) + (:delete-from action) (delete-and-append-deltas! db views action table pkey) + :else (throw (Exception. "Received malformed action: " action)))) + +(defn generate-view-delta-map + "Adds a HoneySQL hash-map for the delta-calculation specific to the view + action. + Takes a view-map and the action HoneySQL hash-map, and appends the action's + where clause to the view's where clause, and adds in new field :insert-deltas-map." + [view-map action] + (let [action-wc (format-action-wc-for-view view-map action) + view (:view view-map)] + (->> (update-in view [:where] #(:where (vh/merge-where-clauses action-wc %))) + (assoc view-map :delete-deltas-map)))) diff --git a/src/views/persistor.clj b/src/views/persistence.clj similarity index 92% rename from src/views/persistor.clj rename to src/views/persistence.clj index d768682..b5d13fa 100644 --- a/src/views/persistor.clj +++ b/src/views/persistence.clj @@ -1,17 +1,17 @@ -(ns views.persistor +(ns views.persistence (:require [clojure.java.jdbc :as j] [views.subscriptions :refer [add-subscription! remove-subscription! compiled-view-for compiled-views-for subscriptions-for]] [views.db.load :refer [initial-view]])) -(defprotocol IPersistor +(defprotocol IPersistence (subscribe-to-view! [this db view-sig opts]) (unsubscribe-from-view! [this view-sig subscriber-key namespace]) (unsubscribe-from-all-views! [this subscriber-key namespace]) (get-subscribed-views [this namespace])) -(deftype InMemoryPersistor [] - IPersistor +(deftype InMemoryPersistence [] + IPersistence (subscribe-to-view! [persistor db view-sig {:keys [templates subscriber-key namespace]}] (j/with-db-transaction [t db :isolation :serializable] diff --git a/src/views/subscribed_views.clj b/src/views/subscribed_views.clj index 46ebbe9..7f2b66c 100644 --- a/src/views/subscribed_views.clj +++ b/src/views/subscribed_views.clj @@ -7,5 +7,5 @@ (disconnect [this disconnect-request]) ;; DB interaction - (broadcast-deltas [this db views-with-deltas]) + (broadcast-deltas [this deltas]) (subscribed-views [this args])) diff --git a/test/views/all_tests.clj b/test/views/all_tests.clj index bfcb031..ebdb4ff 100644 --- a/test/views/all_tests.clj +++ b/test/views/all_tests.clj @@ -3,7 +3,8 @@ [clojure.test :refer [run-tests]] [views.subscriptions-test] [views.base-subscribed-views-test] - [views.db.core-test] + ;; [views.db.core-test] + [views.db.deltas-test] [views.db.honeysql-test] [views.db.load-test])) @@ -11,6 +12,7 @@ [] (run-tests 'views.subscriptions-test 'views.base-subscribed-views-test - 'views.db.core-test +;; 'views.db.core-test + 'views.db.deltas-test 'views.db.honeysql-test 'views.db.load-test)) diff --git a/test/views/base_subscribed_views_test.clj b/test/views/base_subscribed_views_test.clj index 98f45c6..42d6ee0 100644 --- a/test/views/base_subscribed_views_test.clj +++ b/test/views/base_subscribed_views_test.clj @@ -1,7 +1,7 @@ (ns views.base-subscribed-views-test (:require - [views.base-subscribed-views :as bsv] ; :refer [BaseSubscribedViews]] - [views.persistor];; :refer [InMemoryPersistor]] + [views.base-subscribed-views :as bsv] + [views.persistence] [views.subscribed-views :refer [subscribe-views unsubscribe-views disconnect]] [views.subscriptions :as vs :refer [subscribed-to?]] [views.fixtures :as vf] @@ -9,7 +9,7 @@ [clojure.java.jdbc :as j] [clj-logging-config.log4j :refer [set-logger! set-loggers!]]) (:import - [views.persistor InMemoryPersistor] + [views.persistence InMemoryPersistence] [views.base_subscribed_views BaseSubscribedViews])) (set-loggers! @@ -23,27 +23,27 @@ (use-fixtures :each vf/database-fixtures! subscription-fixtures!) -(def persistor (InMemoryPersistor.)) +(def persistence (InMemoryPersistence.)) (deftest subscribes-and-dispatches-initial-view-result-set (let [send-fn #(is (and (= %1 1) (= %2 {[:users] []}))) - base-subbed-views (BaseSubscribedViews. {:persistor persistor :db vf/db :templates vf/templates :send-fn send-fn :unsafe? true})] + base-subbed-views (BaseSubscribedViews. {:persistence persistence :db vf/db :templates vf/templates :send-fn send-fn :unsafe? true})] (subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]}))) (deftest unsubscribes-view - (let [base-subbed-views (BaseSubscribedViews. {:persistor persistor :db vf/db :templates vf/templates :unsafe? true})] + (let [base-subbed-views (BaseSubscribedViews. {:persistence persistence :db vf/db :templates vf/templates :unsafe? true})] (subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]}) (unsubscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]}) (is (not (subscribed-to? 1 [:users]))))) (deftest filters-subscription-requests (let [templates (assoc-in vf/templates [:users :filter-fn] (fn [msg _] (:authorized? msg))) - base-subbed-views (BaseSubscribedViews. {:persistor persistor :db vf/db :templates templates})] + base-subbed-views (BaseSubscribedViews. {:persistence persistence :db vf/db :templates templates})] (subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users]]}) (is (not (subscribed-to? 1 [:users]))))) (deftest removes-all-subscriptions-on-disconnect - (let [base-subbed-views (BaseSubscribedViews. {:persistor persistor :db vf/db :templates vf/templates :unsafe? true})] + (let [base-subbed-views (BaseSubscribedViews. {:persistence persistence :db vf/db :templates vf/templates :unsafe? true})] (subscribe-views base-subbed-views {:subscriber-key 1 :views [[:users][:user-posts 1]]}) (disconnect base-subbed-views {:subscriber-key 1}) (is (not (subscribed-to? 1 [:user-posts 1]))) diff --git a/test/views/db/core_test.clj b/test/views/db/deltas_test.clj similarity index 87% rename from test/views/db/core_test.clj rename to test/views/db/deltas_test.clj index 3f5fb88..f0456e8 100644 --- a/test/views/db/core_test.clj +++ b/test/views/db/deltas_test.clj @@ -1,10 +1,11 @@ -(ns views.db.core-test +(ns views.db.deltas-test (:require [clojure.test :refer [deftest is run-tests]] [honeysql.core :as hsql] [honeysql.helpers :as hh] [views.fixtures :as vf] [views.db.core :as vdb] + [views.db.deltas :as vdbd] [views.base-subscribed-views :as bsv]) (:import [views.base_subscribed_views BaseSubscribedViews])) @@ -46,14 +47,14 @@ (deftest constructs-view-check-template (let [update-bar (update-bar-template "foo" [:= :id 123]) vm (vdb/view-map join-test-template [:join-test 1 "foo"]) - check-template (:view-check (vdb/view-check-template vm update-bar))] + check-template (:view-check (vdbd/view-check-template vm update-bar))] (is (= (set (:select check-template)) #{:f.id :f.val3})) (is (= (set (rest (:where check-template))) #{[:= :f.val2 "constant"] [:= :b.id 123]})))) (deftest view-check-template-generates-proper-sql (let [update-bar (update-bar-template "foo" [:= :id 123]) vm (vdb/view-map join-test-template [:join-test 1 "foo"]) - check-template (:view-check (vdb/view-check-template vm update-bar))] + check-template (:view-check (vdbd/view-check-template vm update-bar))] (is (= (hsql/format check-template) ["SELECT f.id, f.val3 FROM foo f INNER JOIN bar b ON b.id = f.b_id LEFT JOIN baz ba ON ba.id = b.ba_id RIGHT JOIN qux q ON q.id = ba.q_id WHERE (b.id = 123 AND f.val2 = ?)" "constant"])))) @@ -65,7 +66,7 @@ (vdb/view-map join-test-template [:join-test 1 "foo"]) ; has :bar (vdb/view-map join-test-template [:join-test 2 "bar"])] ; has :bar update-bar (update-bar-template "foo" [:= :id 123]) - checked-views (vdb/prepare-view-checks views update-bar)] + checked-views (vdbd/prepare-view-checks views update-bar)] ;; It should return one check for the bar-template above, ;; and 1 for *both* the joint-test-templates. @@ -74,9 +75,9 @@ ;; What is this for? (def left-join-example (hsql/build :select [:R.a :S.C] :from :R :left-join [:S [:= :R.B :S.B]] :where [:!= :S.C 20])) -(deftest notes-view-map-as-no-delta-calc - (let [tmpl (with-meta vf/users-tmpl {:bulk-update? true})] - (is (:bulk-update? (vdb/view-map tmpl [:users]))))) +;; (deftest notes-view-map-as-no-delta-calc +;; (let [tmpl (with-meta vf/users-tmpl {:bulk-update? true})] +;; (is (:bulk-update? (vdb/view-map tmpl [:users]))))) ;; (defschema schema vf/db "public") diff --git a/test/views/repl.clj b/test/views/repl.clj new file mode 100644 index 0000000..a0b4121 --- /dev/null +++ b/test/views/repl.clj @@ -0,0 +1,46 @@ +(ns views.repl + (:require [honeysql.core :as hsql] + [edl.core :refer [defschema]] + [views.core :as vc] + [views.core-test] + [views.db.load :as vdl] + [views.persistence] + [views.subscriptions] + [views.subscriptions-test] + [views.db.load-test] + [views.subscribed-views :as sv] + [views.base-subscribed-views :as vb] + [views.base-subscribed-views-test] + [views.fixtures :as vf] + [clojure.data.generators :as dg] + [honeysql.core :as hsql] + [clojure.java.jdbc :as j] + [views.db.core :as vdb] + ;; [views.db.core-test] + [views.all-tests :as at] + [clj-logging-config.log4j :refer [set-logger! set-loggers!]])) + +(defn rand-str + ([] (rand-str 10)) + ([n] (dg/string #(rand-nth (clojure.string/split "abcdefghijklmnopqrstuvwxyz" #"\B")) n))) + +(defschema test-schema vf/db "public") + +(def user-insert (hsql/build :insert-into :users :values [{:name (rand-str) :created_on (vf/sql-ts)}])) + +(defn make-opts [] (vc/config {:db vf/db :schema test-schema :templates vf/templates :unsafe? true})) + +(defn test-subscribe + ([sk views] (test-subscribe sk views (make-opts))) + ([sk views opts] + (sv/subscribe-views (:base-subscribed-views opts) {:subscriber-key sk :views [[:users]]}))) + +(comment + (require '[clj-logging-config.log4j :as lc] '[views.repl :as vr] '[views.db.core :as vdb] :reload) + (lc/set-loggers! "views.base-subscribed-views" {:level :info}) + (def opts (vr/make-opts)) + (vr/test-subscribe 1 [[:users]]) + (vdb/vexec! vr/user-insert opts) + (vr/test-subscribe 2 [[:users]]) + (vdb/vexec! vr/user-insert opts) + )