restructured vexec! sig per plans; removed unnecessary code from router; added further customization features to base-subscribed-views; moved some actions from persistence -> base-subscribed-views

This commit is contained in:
Dave Della Costa 2014-06-19 19:45:40 +09:00
parent 21f6ff29d2
commit ad8d261a4a
6 changed files with 46 additions and 50 deletions

View file

@ -2,10 +2,12 @@
(:require
[views.persistence :refer [subscribe-to-view! unsubscribe-from-view! unsubscribe-from-all-views! get-subscribed-views]]
[views.subscribed-views :refer [ISubscribedViews]]
[views.subscriptions :refer [default-ns subscribed-to]]
[views.subscriptions :refer [default-ns subscribed-to compiled-view-for]]
[views.filters :refer [view-filter]]
[views.db.load :refer [initial-view]]
[clojure.tools.logging :refer [debug info warn error]]
[clojure.core.async :refer [put! <! go thread]]))
[clojure.core.async :refer [put! <! go thread]]
[clojure.java.jdbc :as j]))
(defn send-fn*
[send-fn address msg]
@ -21,30 +23,37 @@
[namespace-fn msg]
(if namespace-fn (namespace-fn msg) default-ns))
(defn view-sig-fn*
[view-sig-fn msg]
(if view-sig-fn (view-sig-fn msg) (:body msg)))
(deftype BaseSubscribedViews [opts]
ISubscribedViews
(subscribe-views
[this {tdb :tdb :as msg}]
(let [{:keys [persistence templates send-fn subscriber-key-fn namespace-fn unsafe?]} opts
db (if tdb tdb (:db opts))
[this msg]
(let [{:keys [persistence templates db-fn send-fn view-sig-fn subscriber-key-fn namespace-fn unsafe?]} opts
db (if db-fn (db-fn msg) (:db opts))
subscriber-key (subscriber-key-fn* subscriber-key-fn msg)
namespace (namespace-fn* namespace-fn msg)
view-sigs (view-filter msg (:views msg) templates {:unsafe? unsafe?}) ; this is where security comes in. Move?
view-sigs (view-filter msg (view-sig-fn* view-sig-fn msg) templates {:unsafe? unsafe?}) ; this is where security comes in. Move?
popts {:templates templates :subscriber-key subscriber-key :namespace namespace}]
(info "Subscribing views: " view-sigs " for subscriber " subscriber-key ", in namespace " namespace)
(debug "Subscribing views: " view-sigs " for subscriber " subscriber-key ", in namespace " namespace)
(when (seq view-sigs)
(thread
(doseq [vs view-sigs]
(->> (subscribe-to-view! persistence db vs popts)
(send-fn* send-fn subscriber-key)))))))
(j/with-db-transaction [t db :isolation :serializable]
(subscribe-to-view! persistence db vs popts)
(let [view (:view (if namespace (compiled-view-for vs namespace) (compiled-view-for vs)))
iv (initial-view t vs templates view)]
(send-fn* send-fn subscriber-key iv))))))))
(unsubscribe-views
[this msg]
(let [{:keys [subscriber-key-fn namespace-fn persistence]} opts
(let [{:keys [subscriber-key-fn namespace-fn persistence view-sig-fn]} opts
subscriber-key (subscriber-key-fn* subscriber-key-fn msg)
namespace (namespace-fn* namespace-fn msg)
view-sigs (:views msg)]
(info "Unsubscribing views: " view-sigs " for subscriber " subscriber-key)
view-sigs (view-sig-fn* view-sig-fn msg)]
(debug "Unsubscribing views: " view-sigs " for subscriber " subscriber-key)
(doseq [vs view-sigs] (unsubscribe-from-view! persistence vs subscriber-key namespace))))
(disconnect [this msg]

View file

@ -179,15 +179,13 @@
a collection of view-maps.
- broadcast-deltas takes ... ."
([action-map opts]
(vexec! (:db opts) action-map opts))
([db action-map {:keys [schema base-subscribed-views templates]}]
(let [subbed-views (subscribed-views base-subscribed-views db)
transaction-fn #(do-view-transaction schema db subbed-views action-map templates)]
(if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry
(let [{:keys [new-deltas result-set]} (transaction-fn)]
(swap! deltas into new-deltas)
result-set)
(let [{:keys [new-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)]
(broadcast-deltas base-subscribed-views new-deltas)
result-set)))))
[{:keys [db schema base-subscribed-views templates] :as conf} action-map]
(let [subbed-views (subscribed-views base-subscribed-views db)
transaction-fn #(do-view-transaction schema db subbed-views action-map templates)]
(if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry
(let [{:keys [new-deltas result-set]} (transaction-fn)]
(swap! deltas into new-deltas)
result-set)
(let [{:keys [new-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)]
(broadcast-deltas base-subscribed-views new-deltas)
result-set))))

View file

@ -1,8 +1,6 @@
(ns views.persistence
(:require
[clojure.java.jdbc :as j]
[views.subscriptions :refer [add-subscription! remove-subscription! compiled-view-for compiled-views-for subscriptions-for]]
[views.db.load :refer [initial-view]]))
[views.subscriptions :refer [add-subscription! remove-subscription! compiled-view-for compiled-views-for subscriptions-for]]))
(defprotocol IPersistence
(subscribe-to-view! [this db view-sig opts])
@ -14,9 +12,7 @@
IPersistence
(subscribe-to-view!
[persistor db view-sig {:keys [templates subscriber-key namespace]}]
(j/with-db-transaction [t db :isolation :serializable]
(add-subscription! view-sig templates subscriber-key namespace)
(initial-view t view-sig templates (:view (compiled-view-for view-sig)))))
(add-subscription! view-sig templates subscriber-key namespace))
(unsubscribe-from-view!
[this view-sig subscriber-key namespace]

View file

@ -1,7 +1,6 @@
(ns views.router
(:require
[views.subscribed-views
:refer [subscribe-views unsubscribe-views disconnect get-delta-broadcast-channel send-delta]]
[views.subscribed-views :refer [subscribe-views unsubscribe-views disconnect]]
[clojure.core.async :refer [go go-loop chan pub sub unsub close! >! >!! <! <!! filter<]]
[clojure.tools.logging :refer [debug]]))
@ -11,13 +10,6 @@
(let [sub (<! subscriptions)]
(subscribe-views subscribed-views sub)))))
(defn handle-deltas!
[subscribed-views]
(let [delta-channel (get-delta-broadcast-channel subscribed-views)]
(go (while true
(let [delta (<! delta-channel)]
(send-delta subscribed-views delta))))))
(defn handle-unsubscriptions!
[subscribed-views unsubscriptions]
(go (while true
@ -31,7 +23,7 @@
(disconnect subscribed-views disc)))))
(defn init!
[subscribed-views client-chan]
[{:keys [base-subscribed-views] :as conf} client-chan]
(let [subs (chan)
unsubs (chan)
control (chan)
@ -39,7 +31,6 @@
(sub client-chan :views.subscribe subs)
(sub client-chan :views.unsubscribe unsubs)
(sub client-chan :client-channel disconnects)
(handle-subscriptions! subscribed-views subs)
(handle-deltas! subscribed-views)
(handle-unsubscriptions! subscribed-views unsubs)
(handle-disconnects! subscribed-views disconnects)))
(handle-subscriptions! base-subscribed-views subs)
(handle-unsubscriptions! base-subscribed-views unsubs)
(handle-disconnects! base-subscribed-views disconnects)))

View file

@ -5,6 +5,7 @@
[views.base-subscribed-views-test]
;; [views.db.core-test]
[views.db.deltas-test]
[views.db.checks-test] ; STILL SPECULATIVE
[views.db.honeysql-test]
[views.db.load-test]))
@ -14,5 +15,6 @@
'views.base-subscribed-views-test
;; 'views.db.core-test
'views.db.deltas-test
'views.db.checks-test
'views.db.honeysql-test
'views.db.load-test))

View file

@ -17,21 +17,21 @@
(def user-insert (hsql/build :insert-into :users :values [{:name (rand-str) :created_on (vf/sql-ts)}]))
(defn make-opts
([] (make-opts vf/templates))
(defn make-config
([] (make-config vf/templates))
([templates] (vc/config {:db vf/db :schema test-schema :templates templates :unsafe? true})))
(defn test-subscribe
([sk views] (test-subscribe sk views (make-opts)))
([sk views] (test-subscribe sk views (make-config)))
([sk views opts]
(sv/subscribe-views (:base-subscribed-views opts) {:subscriber-key sk :views [[:users]]})))
(comment
(require '[clj-logging-config.log4j :as lc] '[views.repl :as vr] '[views.db.core :as vdb] :reload)
(lc/set-loggers! "views.base-subscribed-views" {:level :info})
(def opts (vr/make-opts))
(def conf (vr/make-config))
(vr/test-subscribe 1 [[:users]])
(vdb/vexec! vr/user-insert opts)
(vdb/vexec! conf vr/user-insert)
(vr/test-subscribe 2 [[:users]])
(vdb/vexec! vr/user-insert opts)
(vdb/vexec! conf vr/user-insert)
)