Deltas are now sent in batches.
This commit is contained in:
parent
50100d17f6
commit
d0d67d68aa
|
@ -10,7 +10,7 @@
|
|||
[clojure.core.async :refer [put! <! go thread]]
|
||||
[clojure.java.jdbc :as j]))
|
||||
|
||||
(declare post-process-deltas)
|
||||
(declare send-deltas)
|
||||
|
||||
(defn send-fn*
|
||||
[send-fn address subject msg]
|
||||
|
@ -67,7 +67,7 @@
|
|||
(unsubscribe-from-all-views! persistence subscriber-key namespace)))
|
||||
|
||||
;;
|
||||
;; The two below functions get called by vexec!
|
||||
;; The two below functions get called by vexec!/with-view-transaction
|
||||
;;
|
||||
|
||||
(subscribed-views [this namespace]
|
||||
|
@ -77,17 +77,30 @@
|
|||
(let [{:keys [templates]} config
|
||||
namespace (if namespace namespace default-ns)
|
||||
subs (get-subscriptions (:persistence config) namespace)]
|
||||
(doseq [vs (keys deltas)]
|
||||
(debug "Subscribers subscribed to " vs " are " (get subs vs))
|
||||
(doseq [sk (get subs vs)]
|
||||
(doseq [ds (map #(post-process-deltas templates %) (get deltas vs))]
|
||||
(debug "Sending delta " {vs (dissoc ds :view-sig)} " to addr " sk)
|
||||
(send-fn* (:send-fn config) sk :views.deltas {vs (dissoc ds :view-sig)})))))))
|
||||
(send-deltas deltas subs namespace config))))
|
||||
|
||||
(defn post-process-deltas
|
||||
(defn post-process-deltas*
|
||||
[templates delta-map]
|
||||
(let [vs (:view-sig delta-map)
|
||||
dm (dissoc delta-map :view-sig)]
|
||||
(if-let [post-fn (get-in templates [(first vs) :post-fn])]
|
||||
(reduce #(assoc %1 %2 (map post-fn (get dm %2))) {} (keys dm))
|
||||
dm)))
|
||||
|
||||
(defn post-process-deltas
|
||||
[delta-set templates]
|
||||
(reduce
|
||||
#(assoc %1 (first %2) (mapv (fn [ds] (post-process-deltas* templates ds)) (second %2)))
|
||||
{} delta-set))
|
||||
|
||||
(defn subscriber-keys
|
||||
[subs skeys delta-set]
|
||||
(into skeys (reduce #(into %1 (get subs %2)) #{} (keys delta-set))))
|
||||
|
||||
(defn send-deltas
|
||||
[deltas subs namespace {:keys [send-fn templates] :as config}]
|
||||
(let [deltas (mapv #(post-process-deltas % templates) deltas)
|
||||
sks (reduce #(subscriber-keys subs %1 %2) #{} deltas)]
|
||||
(doseq [sk sks]
|
||||
(debug "Sending deltas " deltas " to subscriber " sk)
|
||||
(send-fn* send-fn sk :views.deltas deltas))))
|
||||
|
|
|
@ -75,8 +75,8 @@
|
|||
transaction-fn #(vd/do-view-transaction schema db subbed-views action-map templates)]
|
||||
(if deltas ;; inside a transaction we just collect deltas and do not retry
|
||||
(let [{:keys [new-deltas result-set]} (transaction-fn)]
|
||||
(swap! deltas into new-deltas)
|
||||
(swap! deltas #(conj % new-deltas))
|
||||
result-set)
|
||||
(let [{:keys [new-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)]
|
||||
(broadcast-deltas base-subscribed-views new-deltas namespace)
|
||||
(broadcast-deltas base-subscribed-views [new-deltas] namespace)
|
||||
result-set))))
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
(:require
|
||||
[clojure.string :refer [split]]
|
||||
[clojure.java.jdbc :as j]
|
||||
[clojure.tools.logging :refer [debug]]
|
||||
[honeysql.core :as hsql]
|
||||
[honeysql.helpers :as hh]
|
||||
[views.db.load :as vdbl]
|
||||
|
|
|
@ -58,23 +58,37 @@
|
|||
(is (not (subscribed-to? 1 [:user-posts 1])))
|
||||
(is (not (subscribed-to? 1 [:users])))))
|
||||
|
||||
(deftest sends-deltas
|
||||
(let [deltas {[:users] [{:view-sig [:users] :insert-deltas [{:foo "bar"}]}]}
|
||||
sent-delta {[:users] {:insert-deltas [{:foo "bar"}]}}
|
||||
send-fn #(do (is (#{1 2} %1))
|
||||
(is (= %2 :views.deltas))
|
||||
(is (= %3 sent-delta)))
|
||||
base-subbed-views (BaseSubscribedViews. (assoc view-config-fixture :send-fn send-fn))]
|
||||
(add-subscription! [:users] vf/templates 1 default-ns)
|
||||
(add-subscription! [:users] vf/templates 2 default-ns)
|
||||
(broadcast-deltas base-subbed-views deltas nil)))
|
||||
;; (deftest sends-deltas
|
||||
;; (let [deltas {[:users] [{:view-sig [:users] :insert-deltas [{:foo "bar"}]}]}
|
||||
;; sent-delta {[:users] {:insert-deltas [{:foo "bar"}]}}
|
||||
;; send-fn #(do (is (#{1 2} %1))
|
||||
;; (is (= %2 :views.deltas))
|
||||
;; (is (= %3 sent-delta)))
|
||||
;; base-subbed-views (BaseSubscribedViews. (assoc view-config-fixture :send-fn send-fn))]
|
||||
;; (add-subscription! [:users] vf/templates 1 default-ns)
|
||||
;; (add-subscription! [:users] vf/templates 2 default-ns)
|
||||
;; (broadcast-deltas base-subbed-views deltas nil)))
|
||||
|
||||
(deftest sends-deltas-in-batch
|
||||
(let [deltas {[:users] [{:view-sig [:users] :insert-deltas [{:id 1 :name "Bob"} {:id 2 :name "Alice"}]}]}
|
||||
sent-delta {[:users] {:insert-deltas [{:id 1 :name "Bob"} {:id 2 :name "Alice"}]}}
|
||||
(let [deltas [{[:users] [{:view-sig [:users] :insert-deltas [{:id 1 :name "Bob"} {:id 2 :name "Alice"}]}]}
|
||||
{[:users] [{:view-sig [:users] :insert-deltas [{:id 3 :name "Jack"} {:id 4 :name "Jill"}]}]}]
|
||||
;; This is just more obvious than writing some convulated fn to dig out the view-sigs.
|
||||
sent-deltas [{[:users] [{:insert-deltas [{:id 1 :name "Bob"} {:id 2 :name "Alice"}]}]}
|
||||
{[:users] [{:insert-deltas [{:id 3 :name "Jack"} {:id 4 :name "Jill"}]}]}]
|
||||
send-fn #(do (is (#{1 2} %1))
|
||||
(is (= %2 :views.deltas))
|
||||
(is (= %3 sent-delta)))
|
||||
(is (= :views.deltas %2))
|
||||
(is (= sent-deltas %3)))
|
||||
base-subbed-views (BaseSubscribedViews. (assoc view-config-fixture :send-fn send-fn))]
|
||||
(add-subscription! [:users] vf/templates 1 default-ns)
|
||||
(broadcast-deltas base-subbed-views deltas nil)))
|
||||
|
||||
(deftest deltas-are-post-processed
|
||||
(let [templates (assoc-in vf/templates [:users :post-fn] (fn [d] (update-in d [:id] #(Integer. %))))
|
||||
deltas [{[:users] [{:view-sig [:users] :insert-deltas [{:id "1" :name "Bob"}]}]}]
|
||||
sent-deltas [{[:users] [{:insert-deltas [{:id "1" :name "Bob"}]}]}]
|
||||
send-fn (fn [_ _ deltas-out]
|
||||
(is (= (:id (first (:insert-deltas (first (get (first deltas-out) [:users])))))
|
||||
1)))
|
||||
base-subbed-views (BaseSubscribedViews. (assoc view-config-fixture :send-fn send-fn :templates templates))]
|
||||
(add-subscription! [:users] templates 1 default-ns)
|
||||
(broadcast-deltas base-subbed-views deltas nil)))
|
||||
|
|
|
@ -33,10 +33,10 @@
|
|||
(let [view-sig [:user-posts (:id @vf/user-fixture)]
|
||||
sub-to-it (vs/add-subscription! view-sig vf/templates (:id @vf/user-fixture))
|
||||
posted (first (vdb/vexec! test-config (vf/insert-post-tmpl (:id @vf/user-fixture) "title" "body")))
|
||||
delta-vs (ffirst @received-deltas)
|
||||
insert-delta (-> @received-deltas first second first :insert-deltas first)]
|
||||
delta-vs (ffirst (first @received-deltas))
|
||||
insert-delta (-> @received-deltas ffirst second first :insert-deltas first)]
|
||||
|
||||
(is (= (ffirst @received-deltas) view-sig))
|
||||
(is (= (ffirst (first @received-deltas)) view-sig))
|
||||
(is (= (:name insert-delta) (:name @vf/user-fixture)))
|
||||
(is (= (:body insert-delta) (:body posted)))
|
||||
(is (= (:title insert-delta) (:title posted)))))
|
||||
|
@ -47,10 +47,10 @@
|
|||
posted (first (vdb/with-view-transaction
|
||||
[tc test-config]
|
||||
(vdb/vexec! tc (vf/insert-post-tmpl (:id @vf/user-fixture) "title" "body"))))
|
||||
delta-vs (ffirst @received-deltas)
|
||||
insert-delta (-> @received-deltas first second first :insert-deltas first)]
|
||||
delta-vs (ffirst (first @received-deltas))
|
||||
insert-delta (-> @received-deltas ffirst second first :insert-deltas first)]
|
||||
|
||||
(is (= (ffirst @received-deltas) view-sig))
|
||||
(is (= (ffirst (first @received-deltas)) view-sig))
|
||||
(is (= (:name insert-delta) (:name @vf/user-fixture)))
|
||||
(is (= (:body insert-delta) (:body posted)))
|
||||
(is (= (:title insert-delta) (:title posted)))))
|
||||
|
|
Loading…
Reference in a new issue