diff --git a/src/views/base_subscribed_views.clj b/src/views/base_subscribed_views.clj index 4dc246b..e5be3cd 100644 --- a/src/views/base_subscribed_views.clj +++ b/src/views/base_subscribed_views.clj @@ -2,10 +2,12 @@ (:require [views.persistence :refer [subscribe-to-view! unsubscribe-from-view! unsubscribe-from-all-views! get-subscribed-views]] [views.subscribed-views :refer [ISubscribedViews]] - [views.subscriptions :refer [default-ns subscribed-to]] + [views.subscriptions :refer [default-ns subscribed-to compiled-view-for]] [views.filters :refer [view-filter]] + [views.db.load :refer [initial-view]] [clojure.tools.logging :refer [debug info warn error]] - [clojure.core.async :refer [put! > (subscribe-to-view! persistence db vs popts) - (send-fn* send-fn subscriber-key))))))) + (j/with-db-transaction [t db :isolation :serializable] + (subscribe-to-view! persistence db vs popts) + (let [view (:view (if namespace (compiled-view-for vs namespace) (compiled-view-for vs))) + iv (initial-view t vs templates view)] + (send-fn* send-fn subscriber-key iv)))))))) (unsubscribe-views [this msg] - (let [{:keys [subscriber-key-fn namespace-fn persistence]} opts + (let [{:keys [subscriber-key-fn namespace-fn persistence view-sig-fn]} opts subscriber-key (subscriber-key-fn* subscriber-key-fn msg) namespace (namespace-fn* namespace-fn msg) - view-sigs (:views msg)] - (info "Unsubscribing views: " view-sigs " for subscriber " subscriber-key) + view-sigs (view-sig-fn* view-sig-fn msg)] + (debug "Unsubscribing views: " view-sigs " for subscriber " subscriber-key) (doseq [vs view-sigs] (unsubscribe-from-view! persistence vs subscriber-key namespace)))) (disconnect [this msg] diff --git a/src/views/db/core.clj b/src/views/db/core.clj index 7f83f2b..02be963 100644 --- a/src/views/db/core.clj +++ b/src/views/db/core.clj @@ -179,15 +179,13 @@ a collection of view-maps. - broadcast-deltas takes ... ." - ([action-map opts] - (vexec! (:db opts) action-map opts)) - ([db action-map {:keys [schema base-subscribed-views templates]}] - (let [subbed-views (subscribed-views base-subscribed-views db) - transaction-fn #(do-view-transaction schema db subbed-views action-map templates)] - (if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry - (let [{:keys [new-deltas result-set]} (transaction-fn)] - (swap! deltas into new-deltas) - result-set) - (let [{:keys [new-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)] - (broadcast-deltas base-subscribed-views new-deltas) - result-set))))) + [{:keys [db schema base-subscribed-views templates] :as conf} action-map] + (let [subbed-views (subscribed-views base-subscribed-views db) + transaction-fn #(do-view-transaction schema db subbed-views action-map templates)] + (if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry + (let [{:keys [new-deltas result-set]} (transaction-fn)] + (swap! deltas into new-deltas) + result-set) + (let [{:keys [new-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)] + (broadcast-deltas base-subscribed-views new-deltas) + result-set)))) diff --git a/src/views/persistence.clj b/src/views/persistence.clj index b5d13fa..a5eee56 100644 --- a/src/views/persistence.clj +++ b/src/views/persistence.clj @@ -1,8 +1,6 @@ (ns views.persistence (:require - [clojure.java.jdbc :as j] - [views.subscriptions :refer [add-subscription! remove-subscription! compiled-view-for compiled-views-for subscriptions-for]] - [views.db.load :refer [initial-view]])) + [views.subscriptions :refer [add-subscription! remove-subscription! compiled-view-for compiled-views-for subscriptions-for]])) (defprotocol IPersistence (subscribe-to-view! [this db view-sig opts]) @@ -14,9 +12,7 @@ IPersistence (subscribe-to-view! [persistor db view-sig {:keys [templates subscriber-key namespace]}] - (j/with-db-transaction [t db :isolation :serializable] - (add-subscription! view-sig templates subscriber-key namespace) - (initial-view t view-sig templates (:view (compiled-view-for view-sig))))) + (add-subscription! view-sig templates subscriber-key namespace)) (unsubscribe-from-view! [this view-sig subscriber-key namespace] diff --git a/src/views/router.clj b/src/views/router.clj index af18fe3..109b784 100644 --- a/src/views/router.clj +++ b/src/views/router.clj @@ -1,7 +1,6 @@ (ns views.router (:require - [views.subscribed-views - :refer [subscribe-views unsubscribe-views disconnect get-delta-broadcast-channel send-delta]] + [views.subscribed-views :refer [subscribe-views unsubscribe-views disconnect]] [clojure.core.async :refer [go go-loop chan pub sub unsub close! >! >!!