diff --git a/src/views/core.clj b/src/views/core.clj index 7d357d6..a062bfe 100644 --- a/src/views/core.clj +++ b/src/views/core.clj @@ -7,6 +7,6 @@ [views.base_subscribed_views BaseSubscribedViews])) (defn config - [{:keys [db schema persistence] :as opts}] + [{:keys [db schema templates persistence] :as opts}] (let [opts (if persistence opts (assoc opts :persistence (InMemoryPersistence.)))] - {:db db :schema schema :base-subscribed-views (BaseSubscribedViews. opts)})) + {:db db :schema schema :templates templates :base-subscribed-views (BaseSubscribedViews. opts)})) diff --git a/src/views/db/core.clj b/src/views/db/core.clj index 3f5e578..7f83f2b 100644 --- a/src/views/db/core.clj +++ b/src/views/db/core.clj @@ -4,6 +4,7 @@ (:require [clojure.java.jdbc :as j] [clojure.tools.logging :refer [debug]] + [views.db.load :as vdbl] [views.db.honeysql :as vh] [views.db.deltas :as vd] [views.subscribed-views :refer [subscribed-views broadcast-deltas]])) @@ -72,6 +73,16 @@ (let [lookup (first view-sig)] (view-map (get-in templates [lookup :fn]) view-sig))) +(defn calculate-bulk-updates + [deltas db templates bulk-update-views] + (reduce #(assoc %1 (:view-sig %2) {:bulk-update (vdbl/initial-view db (:view-sig %2) templates (:view %2))}) deltas bulk-update-views)) + +(defn format-deltas + [views-with-deltas] + (->> views-with-deltas + (map #(select-keys % [:view-sig :delete-deltas :insert-deltas :bulk-load])) + (group-by :view-sig))) + (defn do-view-transaction "Takes the following arguments: schema - from edl.core/defschema @@ -94,14 +105,17 @@ The function returns the views which received delta updates with the deltas keyed to each view-map at the keys :insert-deltas and :delete-deltas." - [schema db all-views action] + [schema db all-views action templates] ;; Every update connected with a view is done in a transaction: (j/with-db-transaction [t db :isolation :serializable] - (let [need-deltas (vd/do-view-pre-checks t all-views action) + (let [{bulk-update-views true reg-views nil} (group-by :bulk-update? all-views) + need-deltas (vd/do-view-pre-checks t reg-views action) need-deltas (map #(vd/generate-view-delta-map % action) need-deltas) table (-> action vh/extract-tables ffirst) - pkey (vd/get-primary-key schema table)] - (vd/perform-action-and-return-deltas schema t need-deltas action table pkey)))) + pkey (vd/get-primary-key schema table) + {:keys [views-with-deltas result-set]} (vd/perform-action-and-return-deltas schema t need-deltas action table pkey) + deltas (calculate-bulk-updates (format-deltas views-with-deltas) t templates bulk-update-views)] + {:new-deltas deltas :result-set result-set}))) ;; ;; Need to catch this and retry: @@ -144,16 +158,6 @@ (broadcast-deltas ~subscribed-views @deltas#) result#)))))) -(defn format-deltas - [views-with-deltas] - (->> views-with-deltas - (map #(select-keys % [:view-sig :delete-deltas :insert-deltas])) - (group-by :view-sig))) - -(defn vexec* - [] - ) - (defn vexec! "Used to perform arbitrary insert/update/delete actions on the database, while ensuring that view deltas are appropriately checked and calculated @@ -174,18 +178,16 @@ - subscribed-views takes a ... . It should return a collection of view-maps. - - broadcast-deltas takes a db connection, and the views which have had deltas - calculate for them and associated with the hash-maps (appropriately - called views-with-deltas)." + - broadcast-deltas takes ... ." ([action-map opts] (vexec! (:db opts) action-map opts)) - ([db action-map {:keys [schema base-subscribed-views]}] + ([db action-map {:keys [schema base-subscribed-views templates]}] (let [subbed-views (subscribed-views base-subscribed-views db) - transaction-fn #(do-view-transaction schema db subbed-views action-map)] + transaction-fn #(do-view-transaction schema db subbed-views action-map templates)] (if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry - (let [{:keys [views-with-deltas result-set]} (transaction-fn)] - (swap! deltas into (format-deltas views-with-deltas)) + (let [{:keys [new-deltas result-set]} (transaction-fn)] + (swap! deltas into new-deltas) result-set) - (let [{:keys [views-with-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)] - (broadcast-deltas base-subscribed-views (format-deltas views-with-deltas)) + (let [{:keys [new-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)] + (broadcast-deltas base-subscribed-views new-deltas) result-set))))) diff --git a/test/views/repl.clj b/test/views/repl.clj index 46daca1..00af766 100644 --- a/test/views/repl.clj +++ b/test/views/repl.clj @@ -17,7 +17,9 @@ (def user-insert (hsql/build :insert-into :users :values [{:name (rand-str) :created_on (vf/sql-ts)}])) -(defn make-opts [] (vc/config {:db vf/db :schema test-schema :templates vf/templates :unsafe? true})) +(defn make-opts + ([] (make-opts vf/templates)) + ([templates] (vc/config {:db vf/db :schema test-schema :templates templates :unsafe? true}))) (defn test-subscribe ([sk views] (test-subscribe sk views (make-opts)))