got vexec! partially working

This commit is contained in:
Dave Della Costa 2014-06-10 20:34:36 +09:00
parent d34e37e4ca
commit 5a25838a69
8 changed files with 48 additions and 35 deletions

View file

@ -1,7 +1,7 @@
(ns views.base-subscribed-views (ns views.base-subscribed-views
(:require (:require
[views.persistor :refer [subscribe-to-view! unsubscribe-from-view! unsubscribe-from-all-views!]] [views.persistor :refer [subscribe-to-view! unsubscribe-from-view! unsubscribe-from-all-views! get-subscribed-views]]
[views.subscribed-views :refer [SubscribedViews]] [views.subscribed-views :refer [ISubscribedViews]]
[views.subscriptions :refer [default-ns]] [views.subscriptions :refer [default-ns]]
[views.filters :refer [view-filter]] [views.filters :refer [view-filter]]
[clojure.tools.logging :refer [debug info warn error]] [clojure.tools.logging :refer [debug info warn error]]
@ -22,7 +22,7 @@
(if namespace-fn (namespace-fn msg) default-ns)) (if namespace-fn (namespace-fn msg) default-ns))
(deftype BaseSubscribedViews [opts] (deftype BaseSubscribedViews [opts]
SubscribedViews ISubscribedViews
(subscribe-views (subscribe-views
[this {db :db :as msg}] [this {db :db :as msg}]
(let [{:keys [persistor templates send-fn subscriber-key-fn namespace-fn unsafe?]} opts (let [{:keys [persistor templates send-fn subscriber-key-fn namespace-fn unsafe?]} opts
@ -34,6 +34,7 @@
(info "Subscribing views: " view-sigs " for subscriber " subscriber-key ", in namespace " namespace) (info "Subscribing views: " view-sigs " for subscriber " subscriber-key ", in namespace " namespace)
(when (seq view-sigs) (when (seq view-sigs)
(thread (thread
(println "WTF")
(doseq [vs view-sigs] (doseq [vs view-sigs]
(send-fn* send-fn subscriber-key (subscribe-to-view! persistor db vs popts))))))) (send-fn* send-fn subscriber-key (subscribe-to-view! persistor db vs popts)))))))
@ -52,7 +53,8 @@
namespace (namespace-fn* namespace-fn msg)] namespace (namespace-fn* namespace-fn msg)]
(unsubscribe-from-all-views! persistor subscriber-key namespace))) (unsubscribe-from-all-views! persistor subscriber-key namespace)))
;; DB interaction (subscribed-views [this args]
(subscribed-views [this]) (get-subscribed-views (:persistor opts) (namespace-fn* (:namespace-fn opts) args)))
(broadcast-deltas [this fdb views-with-deltas])) (broadcast-deltas [this fdb views-with-deltas]
(println "broadcasting 1 2 3")))

View file

@ -1,9 +1,12 @@
(ns views.core (ns views.core
(:require (:require
[views.base-subscribed-views]) [views.base-subscribed-views :as bsv]; :refer [BaseSubscribedViews]]
(:import [views.persistor :as vp])); :refer [InMemoryPersistor]]))
[views.base_subscribed_views BaseSubscribedViews])) ;; (:import
;; [views.persistor InMemoryPersistor]
;; [views.base_subscribed_views BaseSubscribedViews]))
(defmacro config (defmacro config
[{:keys [db schema] :as opts}] [{:keys [db schema persistor] :as opts}]
{:db db :schema schema :subscribed-views (BaseSubscribedViews. opts)}) (let [opts (if persistor opts (assoc opts :persistor (vp/->InMemoryPersistor)))]
{:db db :schema schema :base-subscribed-views (bsv/->BaseSubscribedViews opts)}))

View file

@ -449,7 +449,7 @@
"Used to perform arbitrary insert/update/delete actions on the database, "Used to perform arbitrary insert/update/delete actions on the database,
while ensuring that view deltas are appropriately checked and calculated while ensuring that view deltas are appropriately checked and calculated
for the currently registered views as reported by a type implementing for the currently registered views as reported by a type implementing
the SubscribedViews protocol. the ISubscribedViews protocol.
Arguments are: Arguments are:
@ -459,22 +459,24 @@
- action-map: the HoneySQL map for the insert/update/delete action - action-map: the HoneySQL map for the insert/update/delete action
- subscribed-views: an implementation of SubscribedViews implementing - subscribed-views: an implementation of ISubscribedViews implementing
the follow functions: the follow functions:
- get-subscribed-views takes a database connection. It should return - subscribed-views takes a ... . It should return
a collection of view-maps. a collection of view-maps.
- send-deltas takes a db connection, and the views which have had deltas - broadcast-deltas takes a db connection, and the views which have had deltas
calculate for them and associated with the hash-maps (appropriately calculate for them and associated with the hash-maps (appropriately
called views-with-deltas)." called views-with-deltas)."
[schema db action-map subscribed-views] ([action-map opts]
(let [subbed-views (subscribed-views subscribed-views db) (vexec! (:db opts) action-map opts))
([db action-map {:keys [schema base-subscribed-views]}]
(let [subbed-views (subscribed-views base-subscribed-views db)
transaction-fn #(do-view-transaction schema db subbed-views action-map)] transaction-fn #(do-view-transaction schema db subbed-views action-map)]
(if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry (if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry
(let [{:keys [views-with-deltas result-set]} (transaction-fn)] (let [{:keys [views-with-deltas result-set]} (transaction-fn)]
(swap! deltas into views-with-deltas) (swap! deltas into views-with-deltas)
result-set) result-set)
(let [{:keys [views-with-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)] (let [{:keys [views-with-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)]
(broadcast-deltas subscribed-views db views-with-deltas) (broadcast-deltas base-subscribed-views db views-with-deltas)
result-set)))) result-set)))))

View file

@ -1,13 +1,14 @@
(ns views.persistor (ns views.persistor
(:require (:require
[clojure.java.jdbc :as j] [clojure.java.jdbc :as j]
[views.subscriptions :refer [add-subscription! remove-subscription! compiled-view-for subscriptions-for]] [views.subscriptions :refer [add-subscription! remove-subscription! compiled-view-for compiled-views-for subscriptions-for]]
[views.db.load :refer [initial-view]])) [views.db.load :refer [initial-view]]))
(defprotocol IPersistor (defprotocol IPersistor
(subscribe-to-view! [this db view-sig opts]) (subscribe-to-view! [this db view-sig opts])
(unsubscribe-from-view! [this view-sig subscriber-key namespace]) (unsubscribe-from-view! [this view-sig subscriber-key namespace])
(unsubscribe-from-all-views! [this subscriber-key namespace])) (unsubscribe-from-all-views! [this subscriber-key namespace])
(get-subscribed-views [this namespace]))
(deftype InMemoryPersistor [] (deftype InMemoryPersistor []
IPersistor IPersistor
@ -24,4 +25,6 @@
(unsubscribe-from-all-views! (unsubscribe-from-all-views!
[this subscriber-key namespace] [this subscriber-key namespace]
(doseq [vs (subscriptions-for subscriber-key namespace)] (doseq [vs (subscriptions-for subscriber-key namespace)]
(remove-subscription! vs subscriber-key namespace)))) (remove-subscription! vs subscriber-key namespace)))
(get-subscribed-views [this namespace] (compiled-views-for namespace)))

View file

@ -1,6 +1,6 @@
(ns views.subscribed-views) (ns views.subscribed-views)
(defprotocol SubscribedViews (defprotocol ISubscribedViews
;; Subscription and Delta routing ;; Subscription and Delta routing
(subscribe-views [this sub-request]) (subscribe-views [this sub-request])
(unsubscribe-views [this unsub-request]) (unsubscribe-views [this unsub-request])
@ -8,4 +8,4 @@
;; DB interaction ;; DB interaction
(broadcast-deltas [this db views-with-deltas]) (broadcast-deltas [this db views-with-deltas])
(subscribed-views [this])) (subscribed-views [this args]))

View file

@ -77,3 +77,7 @@
([view-sig] (compiled-view-for view-sig default-ns)) ([view-sig] (compiled-view-for view-sig default-ns))
([view-sig namespace] ([view-sig namespace]
(get-in @subscribed-views [namespace view-sig :view-data]))) (get-in @subscribed-views [namespace view-sig :view-data])))
(defn compiled-views-for
[namespace]
(get @subscribed-views namespace))

View file

@ -2,7 +2,7 @@
(:require (:require
[views.base-subscribed-views :as bsv] ; :refer [BaseSubscribedViews]] [views.base-subscribed-views :as bsv] ; :refer [BaseSubscribedViews]]
[views.persistor];; :refer [InMemoryPersistor]] [views.persistor];; :refer [InMemoryPersistor]]
[views.subscribed-views :refer [SubscribedViews subscribe-views unsubscribe-views disconnect]] [views.subscribed-views :refer [subscribe-views unsubscribe-views disconnect]]
[views.subscriptions :as vs :refer [subscribed-to?]] [views.subscriptions :as vs :refer [subscribed-to?]]
[views.fixtures :as vf] [views.fixtures :as vf]
[clojure.test :refer [use-fixtures deftest is]] [clojure.test :refer [use-fixtures deftest is]]

View file

@ -3,13 +3,12 @@
[clojure.test :refer [use-fixtures deftest is]] [clojure.test :refer [use-fixtures deftest is]]
[edl.core :refer [defschema]] [edl.core :refer [defschema]]
[views.fixtures :as vf] [views.fixtures :as vf]
[views.subscribed-views :as vs] ;; :refer [SubscribedViews]] [views.subscribed-views :as vs]
[views.core :refer [config]])) [views.core :refer [config]]))
(defschema schema vf/db "public") (defschema schema vf/db "public")
(deftest configures-views (deftest configures-views
(let [conf (config {:db vf/db :schema schema :templates vf/templates :unsafe? true})] (let [conf (config {:db vf/db :schema schema :templates vf/templates :unsafe? true})]
(println (satisfies? views.subscribed-views/SubscribedViews (:subscribed-views conf))) ; wtf?! ;; wtf is this false?!
;; (is (satisfies? views.subscribed-views/SubscribedViews (:subscribed-views conf))))) (is (satisfies? views.subscribed-views/ISubscribedViews (:subscribed-views conf)))))
))