bulk-updates basically working.
This commit is contained in:
parent
7dfa8f1df2
commit
a33ee8a3f6
|
@ -7,6 +7,6 @@
|
||||||
[views.base_subscribed_views BaseSubscribedViews]))
|
[views.base_subscribed_views BaseSubscribedViews]))
|
||||||
|
|
||||||
(defn config
|
(defn config
|
||||||
[{:keys [db schema persistence] :as opts}]
|
[{:keys [db schema templates persistence] :as opts}]
|
||||||
(let [opts (if persistence opts (assoc opts :persistence (InMemoryPersistence.)))]
|
(let [opts (if persistence opts (assoc opts :persistence (InMemoryPersistence.)))]
|
||||||
{:db db :schema schema :base-subscribed-views (BaseSubscribedViews. opts)}))
|
{:db db :schema schema :templates templates :base-subscribed-views (BaseSubscribedViews. opts)}))
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
(:require
|
(:require
|
||||||
[clojure.java.jdbc :as j]
|
[clojure.java.jdbc :as j]
|
||||||
[clojure.tools.logging :refer [debug]]
|
[clojure.tools.logging :refer [debug]]
|
||||||
|
[views.db.load :as vdbl]
|
||||||
[views.db.honeysql :as vh]
|
[views.db.honeysql :as vh]
|
||||||
[views.db.deltas :as vd]
|
[views.db.deltas :as vd]
|
||||||
[views.subscribed-views :refer [subscribed-views broadcast-deltas]]))
|
[views.subscribed-views :refer [subscribed-views broadcast-deltas]]))
|
||||||
|
@ -72,6 +73,16 @@
|
||||||
(let [lookup (first view-sig)]
|
(let [lookup (first view-sig)]
|
||||||
(view-map (get-in templates [lookup :fn]) view-sig)))
|
(view-map (get-in templates [lookup :fn]) view-sig)))
|
||||||
|
|
||||||
|
(defn calculate-bulk-updates
|
||||||
|
[deltas db templates bulk-update-views]
|
||||||
|
(reduce #(assoc %1 (:view-sig %2) {:bulk-update (vdbl/initial-view db (:view-sig %2) templates (:view %2))}) deltas bulk-update-views))
|
||||||
|
|
||||||
|
(defn format-deltas
|
||||||
|
[views-with-deltas]
|
||||||
|
(->> views-with-deltas
|
||||||
|
(map #(select-keys % [:view-sig :delete-deltas :insert-deltas :bulk-load]))
|
||||||
|
(group-by :view-sig)))
|
||||||
|
|
||||||
(defn do-view-transaction
|
(defn do-view-transaction
|
||||||
"Takes the following arguments:
|
"Takes the following arguments:
|
||||||
schema - from edl.core/defschema
|
schema - from edl.core/defschema
|
||||||
|
@ -94,14 +105,17 @@
|
||||||
|
|
||||||
The function returns the views which received delta updates with the deltas
|
The function returns the views which received delta updates with the deltas
|
||||||
keyed to each view-map at the keys :insert-deltas and :delete-deltas."
|
keyed to each view-map at the keys :insert-deltas and :delete-deltas."
|
||||||
[schema db all-views action]
|
[schema db all-views action templates]
|
||||||
;; Every update connected with a view is done in a transaction:
|
;; Every update connected with a view is done in a transaction:
|
||||||
(j/with-db-transaction [t db :isolation :serializable]
|
(j/with-db-transaction [t db :isolation :serializable]
|
||||||
(let [need-deltas (vd/do-view-pre-checks t all-views action)
|
(let [{bulk-update-views true reg-views nil} (group-by :bulk-update? all-views)
|
||||||
|
need-deltas (vd/do-view-pre-checks t reg-views action)
|
||||||
need-deltas (map #(vd/generate-view-delta-map % action) need-deltas)
|
need-deltas (map #(vd/generate-view-delta-map % action) need-deltas)
|
||||||
table (-> action vh/extract-tables ffirst)
|
table (-> action vh/extract-tables ffirst)
|
||||||
pkey (vd/get-primary-key schema table)]
|
pkey (vd/get-primary-key schema table)
|
||||||
(vd/perform-action-and-return-deltas schema t need-deltas action table pkey))))
|
{:keys [views-with-deltas result-set]} (vd/perform-action-and-return-deltas schema t need-deltas action table pkey)
|
||||||
|
deltas (calculate-bulk-updates (format-deltas views-with-deltas) t templates bulk-update-views)]
|
||||||
|
{:new-deltas deltas :result-set result-set})))
|
||||||
|
|
||||||
;;
|
;;
|
||||||
;; Need to catch this and retry:
|
;; Need to catch this and retry:
|
||||||
|
@ -144,16 +158,6 @@
|
||||||
(broadcast-deltas ~subscribed-views @deltas#)
|
(broadcast-deltas ~subscribed-views @deltas#)
|
||||||
result#))))))
|
result#))))))
|
||||||
|
|
||||||
(defn format-deltas
|
|
||||||
[views-with-deltas]
|
|
||||||
(->> views-with-deltas
|
|
||||||
(map #(select-keys % [:view-sig :delete-deltas :insert-deltas]))
|
|
||||||
(group-by :view-sig)))
|
|
||||||
|
|
||||||
(defn vexec*
|
|
||||||
[]
|
|
||||||
)
|
|
||||||
|
|
||||||
(defn vexec!
|
(defn vexec!
|
||||||
"Used to perform arbitrary insert/update/delete actions on the database,
|
"Used to perform arbitrary insert/update/delete actions on the database,
|
||||||
while ensuring that view deltas are appropriately checked and calculated
|
while ensuring that view deltas are appropriately checked and calculated
|
||||||
|
@ -174,18 +178,16 @@
|
||||||
- subscribed-views takes a ... . It should return
|
- subscribed-views takes a ... . It should return
|
||||||
a collection of view-maps.
|
a collection of view-maps.
|
||||||
|
|
||||||
- broadcast-deltas takes a db connection, and the views which have had deltas
|
- broadcast-deltas takes ... ."
|
||||||
calculate for them and associated with the hash-maps (appropriately
|
|
||||||
called views-with-deltas)."
|
|
||||||
([action-map opts]
|
([action-map opts]
|
||||||
(vexec! (:db opts) action-map opts))
|
(vexec! (:db opts) action-map opts))
|
||||||
([db action-map {:keys [schema base-subscribed-views]}]
|
([db action-map {:keys [schema base-subscribed-views templates]}]
|
||||||
(let [subbed-views (subscribed-views base-subscribed-views db)
|
(let [subbed-views (subscribed-views base-subscribed-views db)
|
||||||
transaction-fn #(do-view-transaction schema db subbed-views action-map)]
|
transaction-fn #(do-view-transaction schema db subbed-views action-map templates)]
|
||||||
(if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry
|
(if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry
|
||||||
(let [{:keys [views-with-deltas result-set]} (transaction-fn)]
|
(let [{:keys [new-deltas result-set]} (transaction-fn)]
|
||||||
(swap! deltas into (format-deltas views-with-deltas))
|
(swap! deltas into new-deltas)
|
||||||
result-set)
|
result-set)
|
||||||
(let [{:keys [views-with-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)]
|
(let [{:keys [new-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)]
|
||||||
(broadcast-deltas base-subscribed-views (format-deltas views-with-deltas))
|
(broadcast-deltas base-subscribed-views new-deltas)
|
||||||
result-set)))))
|
result-set)))))
|
||||||
|
|
|
@ -17,7 +17,9 @@
|
||||||
|
|
||||||
(def user-insert (hsql/build :insert-into :users :values [{:name (rand-str) :created_on (vf/sql-ts)}]))
|
(def user-insert (hsql/build :insert-into :users :values [{:name (rand-str) :created_on (vf/sql-ts)}]))
|
||||||
|
|
||||||
(defn make-opts [] (vc/config {:db vf/db :schema test-schema :templates vf/templates :unsafe? true}))
|
(defn make-opts
|
||||||
|
([] (make-opts vf/templates))
|
||||||
|
([templates] (vc/config {:db vf/db :schema test-schema :templates templates :unsafe? true})))
|
||||||
|
|
||||||
(defn test-subscribe
|
(defn test-subscribe
|
||||||
([sk views] (test-subscribe sk views (make-opts)))
|
([sk views] (test-subscribe sk views (make-opts)))
|
||||||
|
|
Loading…
Reference in a new issue