first commit; subscription code and view init code pulled out and tested; DE-specific code stripped
This commit is contained in:
commit
2388472f46
10
.gitignore
vendored
Normal file
10
.gitignore
vendored
Normal file
|
@ -0,0 +1,10 @@
|
|||
/target
|
||||
/classes
|
||||
/checkouts
|
||||
pom.xml
|
||||
pom.xml.asc
|
||||
*.jar
|
||||
*.class
|
||||
/.lein-*
|
||||
/.nrepl-port
|
||||
*~
|
214
LICENSE
Normal file
214
LICENSE
Normal file
|
@ -0,0 +1,214 @@
|
|||
THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE PUBLIC
|
||||
LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION OF THE PROGRAM
|
||||
CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
|
||||
|
||||
1. DEFINITIONS
|
||||
|
||||
"Contribution" means:
|
||||
|
||||
a) in the case of the initial Contributor, the initial code and
|
||||
documentation distributed under this Agreement, and
|
||||
|
||||
b) in the case of each subsequent Contributor:
|
||||
|
||||
i) changes to the Program, and
|
||||
|
||||
ii) additions to the Program;
|
||||
|
||||
where such changes and/or additions to the Program originate from and are
|
||||
distributed by that particular Contributor. A Contribution 'originates' from
|
||||
a Contributor if it was added to the Program by such Contributor itself or
|
||||
anyone acting on such Contributor's behalf. Contributions do not include
|
||||
additions to the Program which: (i) are separate modules of software
|
||||
distributed in conjunction with the Program under their own license
|
||||
agreement, and (ii) are not derivative works of the Program.
|
||||
|
||||
"Contributor" means any person or entity that distributes the Program.
|
||||
|
||||
"Licensed Patents" mean patent claims licensable by a Contributor which are
|
||||
necessarily infringed by the use or sale of its Contribution alone or when
|
||||
combined with the Program.
|
||||
|
||||
"Program" means the Contributions distributed in accordance with this
|
||||
Agreement.
|
||||
|
||||
"Recipient" means anyone who receives the Program under this Agreement,
|
||||
including all Contributors.
|
||||
|
||||
2. GRANT OF RIGHTS
|
||||
|
||||
a) Subject to the terms of this Agreement, each Contributor hereby grants
|
||||
Recipient a non-exclusive, worldwide, royalty-free copyright license to
|
||||
reproduce, prepare derivative works of, publicly display, publicly perform,
|
||||
distribute and sublicense the Contribution of such Contributor, if any, and
|
||||
such derivative works, in source code and object code form.
|
||||
|
||||
b) Subject to the terms of this Agreement, each Contributor hereby grants
|
||||
Recipient a non-exclusive, worldwide, royalty-free patent license under
|
||||
Licensed Patents to make, use, sell, offer to sell, import and otherwise
|
||||
transfer the Contribution of such Contributor, if any, in source code and
|
||||
object code form. This patent license shall apply to the combination of the
|
||||
Contribution and the Program if, at the time the Contribution is added by the
|
||||
Contributor, such addition of the Contribution causes such combination to be
|
||||
covered by the Licensed Patents. The patent license shall not apply to any
|
||||
other combinations which include the Contribution. No hardware per se is
|
||||
licensed hereunder.
|
||||
|
||||
c) Recipient understands that although each Contributor grants the licenses
|
||||
to its Contributions set forth herein, no assurances are provided by any
|
||||
Contributor that the Program does not infringe the patent or other
|
||||
intellectual property rights of any other entity. Each Contributor disclaims
|
||||
any liability to Recipient for claims brought by any other entity based on
|
||||
infringement of intellectual property rights or otherwise. As a condition to
|
||||
exercising the rights and licenses granted hereunder, each Recipient hereby
|
||||
assumes sole responsibility to secure any other intellectual property rights
|
||||
needed, if any. For example, if a third party patent license is required to
|
||||
allow Recipient to distribute the Program, it is Recipient's responsibility
|
||||
to acquire that license before distributing the Program.
|
||||
|
||||
d) Each Contributor represents that to its knowledge it has sufficient
|
||||
copyright rights in its Contribution, if any, to grant the copyright license
|
||||
set forth in this Agreement.
|
||||
|
||||
3. REQUIREMENTS
|
||||
|
||||
A Contributor may choose to distribute the Program in object code form under
|
||||
its own license agreement, provided that:
|
||||
|
||||
a) it complies with the terms and conditions of this Agreement; and
|
||||
|
||||
b) its license agreement:
|
||||
|
||||
i) effectively disclaims on behalf of all Contributors all warranties and
|
||||
conditions, express and implied, including warranties or conditions of title
|
||||
and non-infringement, and implied warranties or conditions of merchantability
|
||||
and fitness for a particular purpose;
|
||||
|
||||
ii) effectively excludes on behalf of all Contributors all liability for
|
||||
damages, including direct, indirect, special, incidental and consequential
|
||||
damages, such as lost profits;
|
||||
|
||||
iii) states that any provisions which differ from this Agreement are offered
|
||||
by that Contributor alone and not by any other party; and
|
||||
|
||||
iv) states that source code for the Program is available from such
|
||||
Contributor, and informs licensees how to obtain it in a reasonable manner on
|
||||
or through a medium customarily used for software exchange.
|
||||
|
||||
When the Program is made available in source code form:
|
||||
|
||||
a) it must be made available under this Agreement; and
|
||||
|
||||
b) a copy of this Agreement must be included with each copy of the Program.
|
||||
|
||||
Contributors may not remove or alter any copyright notices contained within
|
||||
the Program.
|
||||
|
||||
Each Contributor must identify itself as the originator of its Contribution,
|
||||
if any, in a manner that reasonably allows subsequent Recipients to identify
|
||||
the originator of the Contribution.
|
||||
|
||||
4. COMMERCIAL DISTRIBUTION
|
||||
|
||||
Commercial distributors of software may accept certain responsibilities with
|
||||
respect to end users, business partners and the like. While this license is
|
||||
intended to facilitate the commercial use of the Program, the Contributor who
|
||||
includes the Program in a commercial product offering should do so in a
|
||||
manner which does not create potential liability for other Contributors.
|
||||
Therefore, if a Contributor includes the Program in a commercial product
|
||||
offering, such Contributor ("Commercial Contributor") hereby agrees to defend
|
||||
and indemnify every other Contributor ("Indemnified Contributor") against any
|
||||
losses, damages and costs (collectively "Losses") arising from claims,
|
||||
lawsuits and other legal actions brought by a third party against the
|
||||
Indemnified Contributor to the extent caused by the acts or omissions of such
|
||||
Commercial Contributor in connection with its distribution of the Program in
|
||||
a commercial product offering. The obligations in this section do not apply
|
||||
to any claims or Losses relating to any actual or alleged intellectual
|
||||
property infringement. In order to qualify, an Indemnified Contributor must:
|
||||
a) promptly notify the Commercial Contributor in writing of such claim, and
|
||||
b) allow the Commercial Contributor tocontrol, and cooperate with the
|
||||
Commercial Contributor in, the defense and any related settlement
|
||||
negotiations. The Indemnified Contributor may participate in any such claim
|
||||
at its own expense.
|
||||
|
||||
For example, a Contributor might include the Program in a commercial product
|
||||
offering, Product X. That Contributor is then a Commercial Contributor. If
|
||||
that Commercial Contributor then makes performance claims, or offers
|
||||
warranties related to Product X, those performance claims and warranties are
|
||||
such Commercial Contributor's responsibility alone. Under this section, the
|
||||
Commercial Contributor would have to defend claims against the other
|
||||
Contributors related to those performance claims and warranties, and if a
|
||||
court requires any other Contributor to pay any damages as a result, the
|
||||
Commercial Contributor must pay those damages.
|
||||
|
||||
5. NO WARRANTY
|
||||
|
||||
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS PROVIDED ON
|
||||
AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
|
||||
EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR
|
||||
CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A
|
||||
PARTICULAR PURPOSE. Each Recipient is solely responsible for determining the
|
||||
appropriateness of using and distributing the Program and assumes all risks
|
||||
associated with its exercise of rights under this Agreement , including but
|
||||
not limited to the risks and costs of program errors, compliance with
|
||||
applicable laws, damage to or loss of data, programs or equipment, and
|
||||
unavailability or interruption of operations.
|
||||
|
||||
6. DISCLAIMER OF LIABILITY
|
||||
|
||||
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT NOR ANY
|
||||
CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION
|
||||
LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE
|
||||
EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY
|
||||
OF SUCH DAMAGES.
|
||||
|
||||
7. GENERAL
|
||||
|
||||
If any provision of this Agreement is invalid or unenforceable under
|
||||
applicable law, it shall not affect the validity or enforceability of the
|
||||
remainder of the terms of this Agreement, and without further action by the
|
||||
parties hereto, such provision shall be reformed to the minimum extent
|
||||
necessary to make such provision valid and enforceable.
|
||||
|
||||
If Recipient institutes patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Program itself
|
||||
(excluding combinations of the Program with other software or hardware)
|
||||
infringes such Recipient's patent(s), then such Recipient's rights granted
|
||||
under Section 2(b) shall terminate as of the date such litigation is filed.
|
||||
|
||||
All Recipient's rights under this Agreement shall terminate if it fails to
|
||||
comply with any of the material terms or conditions of this Agreement and
|
||||
does not cure such failure in a reasonable period of time after becoming
|
||||
aware of such noncompliance. If all Recipient's rights under this Agreement
|
||||
terminate, Recipient agrees to cease use and distribution of the Program as
|
||||
soon as reasonably practicable. However, Recipient's obligations under this
|
||||
Agreement and any licenses granted by Recipient relating to the Program shall
|
||||
continue and survive.
|
||||
|
||||
Everyone is permitted to copy and distribute copies of this Agreement, but in
|
||||
order to avoid inconsistency the Agreement is copyrighted and may only be
|
||||
modified in the following manner. The Agreement Steward reserves the right to
|
||||
publish new versions (including revisions) of this Agreement from time to
|
||||
time. No one other than the Agreement Steward has the right to modify this
|
||||
Agreement. The Eclipse Foundation is the initial Agreement Steward. The
|
||||
Eclipse Foundation may assign the responsibility to serve as the Agreement
|
||||
Steward to a suitable separate entity. Each new version of the Agreement will
|
||||
be given a distinguishing version number. The Program (including
|
||||
Contributions) may always be distributed subject to the version of the
|
||||
Agreement under which it was received. In addition, after a new version of
|
||||
the Agreement is published, Contributor may elect to distribute the Program
|
||||
(including its Contributions) under the new version. Except as expressly
|
||||
stated in Sections 2(a) and 2(b) above, Recipient receives no rights or
|
||||
licenses to the intellectual property of any Contributor under this
|
||||
Agreement, whether expressly, by implication, estoppel or otherwise. All
|
||||
rights in the Program not expressly granted under this Agreement are
|
||||
reserved.
|
||||
|
||||
This Agreement is governed by the laws of the State of Washington and the
|
||||
intellectual property laws of the United States of America. No party to this
|
||||
Agreement will bring a legal action under this Agreement more than one year
|
||||
after the cause of action arose. Each party waives its rights to a jury trial
|
||||
in any resulting litigation.
|
49
README.md
Normal file
49
README.md
Normal file
|
@ -0,0 +1,49 @@
|
|||
# views
|
||||
|
||||
A Clojure library designed to ... well, that part is up to you.
|
||||
|
||||
|
||||
## Design
|
||||
|
||||
Subscription
|
||||
|
||||
* subscribe to a view
|
||||
* unsubscribe from a view
|
||||
* remove all subscriptions (disconnect)
|
||||
* get set of subscribed views
|
||||
|
||||
Deltas
|
||||
|
||||
* send deltas
|
||||
* broadcast deltas
|
||||
* receive deltas ( receive deltas from any other clients and broadcast out to subscribers a.k.a. broadcast deltas )
|
||||
|
||||
DB
|
||||
|
||||
* calculates pre-delta checks (heuristic optimization)
|
||||
* calculates actual deltas
|
||||
* different heuristics for inserts, updates, and deletes
|
||||
* different properties for outer joins
|
||||
* pre-checks and delta calculations are coupled:
|
||||
- we don't know if we need to calculate insert deltas until we do the insert and see if the result affects a view
|
||||
|
||||
How should DB code work?
|
||||
|
||||
- separate namespaces for insert, update, delete?
|
||||
- low-level db actions (execute/query/transaction/etc.) should be separate namespace?
|
||||
- initial view computing (view map, args, etc.) should be separate namespace?
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
## Usage
|
||||
|
||||
FIXME
|
||||
|
||||
## License
|
||||
|
||||
Copyright © 2014 FIXME
|
||||
|
||||
Distributed under the Eclipse Public License either version 1.0 or (at
|
||||
your option) any later version.
|
3
doc/intro.md
Normal file
3
doc/intro.md
Normal file
|
@ -0,0 +1,3 @@
|
|||
# Introduction to views
|
||||
|
||||
TODO: write [great documentation](http://jacobian.org/writing/great-documentation/what-to-write/)
|
20
project.clj
Normal file
20
project.clj
Normal file
|
@ -0,0 +1,20 @@
|
|||
(defproject views "0.1.0-SNAPSHOT"
|
||||
:description "You underestimate the power of the SQL side"
|
||||
|
||||
:url "https://github.com/diligenceengine/views"
|
||||
|
||||
:license {:name "Eclipse Public License"
|
||||
:url "http://www.eclipse.org/legal/epl-v10.html"}
|
||||
|
||||
:dependencies [[org.clojure/clojure "1.6.0"]
|
||||
[org.clojure/tools.logging "0.2.6"]
|
||||
[org.clojure/core.async "0.1.303.0-886421-alpha"]
|
||||
[org.clojure/java.jdbc "0.3.3"]
|
||||
[honeysql "0.4.3"]
|
||||
[org.postgresql/postgresql "9.2-1003-jdbc4"]]
|
||||
|
||||
:profiles {:test {:dependencies [[org.clojure/tools.nrepl "0.2.3"]
|
||||
[environ "0.4.0"]
|
||||
[org.clojure/data.generators "0.1.2"]]}}
|
||||
|
||||
:plugins [[lein-environ "0.4.0"]])
|
36
src/views/base_subscribed_views.clj
Normal file
36
src/views/base_subscribed_views.clj
Normal file
|
@ -0,0 +1,36 @@
|
|||
(ns views.base-subscribed-views
|
||||
(:require
|
||||
[views.db.core :refer [initial-views]]
|
||||
[views.subscribed-views :refer [SubscribedViews subscriber-key-fn prefix-fn send-message]]
|
||||
[views.subscriptions :as vs :refer [add-subscriptions!]]
|
||||
[clojure.tools.logging :refer [debug info warn error]]
|
||||
[clojure.core.async :refer [put! <! go thread]]))
|
||||
|
||||
(defrecord BaseSubscribedViews [db templates delta-broadcast-chan]
|
||||
SubscribedViews
|
||||
(subscribe-views
|
||||
[this sub-req]
|
||||
;; (let [view-sigs (view-filter sub-req (:body sub-req) templates)] ; this is where security comes in.
|
||||
(let [subscriber-key (subscriber-key-fn this sub-req)
|
||||
view-sigs (:view-sigs sub-req)]
|
||||
(info "Subscribing views: " view-sigs)
|
||||
(when (seq view-sigs)
|
||||
(add-subscriptions! subscriber-key view-sigs (prefix-fn this sub-req))
|
||||
(thread
|
||||
(->> (initial-views db view-sigs templates @vs/compiled-views)
|
||||
(send-message this subscriber-key))))))
|
||||
|
||||
(unsubscribe-views [this unsub-req])
|
||||
|
||||
(disconnect [this disconnect-req])
|
||||
|
||||
(subscribed-views [this] @vs/compiled-views)
|
||||
|
||||
(broadcast-deltas [this fdb views-with-deltas])
|
||||
|
||||
(send-message [this address msg]
|
||||
(warn "IMPLEMENT ME. Got message " msg " sent to address " address))
|
||||
|
||||
(subscriber-key-fn [this msg] (:subscriber-key msg))
|
||||
|
||||
(prefix-fn [this msg] nil))
|
479
src/views/db.clj
Normal file
479
src/views/db.clj
Normal file
|
@ -0,0 +1,479 @@
|
|||
(ns views.db
|
||||
(:import
|
||||
[java.sql SQLException BatchUpdateException]
|
||||
[org.postgresql.util PSQLException])
|
||||
(:require
|
||||
[clojure.string :refer [trim split]]
|
||||
[honeysql.core :as hsql]
|
||||
[honeysql.helpers :as hh]
|
||||
[honeysql.format :as fmt]
|
||||
[honeysql.types :as ht]
|
||||
[clojure.java.jdbc :as j]
|
||||
[clojure.tools.logging :refer [debug]]
|
||||
[views.honeysql :as vh]
|
||||
[views.subscribed-views :refer [get-subscribed-views broadcast-deltas]]))
|
||||
|
||||
(defn get-primary-key
|
||||
"Get a primary key for a table."
|
||||
[schema table]
|
||||
(or
|
||||
(keyword (get-in schema [(name table) :primary-key :column_name]))
|
||||
(throw (Exception. (str "Cannot find primary key for table: " table)))))
|
||||
|
||||
;;
|
||||
;; Takes the HoneySQL template for a view and the arglist
|
||||
;; and compiles the view with a set of dummy args in the
|
||||
;; format
|
||||
;; [?1, ?2, ?3 ... ?N]
|
||||
;;
|
||||
;; Returns a map of the compiled hash-map and the args
|
||||
;; with keys :dummy-view and :dummy-args respectively.
|
||||
;;
|
||||
(defn- compile-dummy-view
|
||||
[view-template args]
|
||||
(let [dummy-args (take (count args) (range))
|
||||
dummy-args (map #(str "?" %) dummy-args)]
|
||||
{:dummy-view (apply view-template dummy-args)
|
||||
:dummy-args dummy-args}))
|
||||
|
||||
;;
|
||||
;; Terminology and data structures used throughout this code
|
||||
;;
|
||||
;; <name>-template - refers to a function which receives parameters
|
||||
;; and returns a HoneySQL hash-map with params interpolated.
|
||||
;;
|
||||
;; action - describes the HoneySQL hash-map for the action to be performed
|
||||
;; --the template function has already been called and returned this
|
||||
;; with the appropriate parameter arguments.
|
||||
;;
|
||||
;; view-map - contains a set of computed information for each view itself.
|
||||
;; Refer to the view-map doc-string for more information.
|
||||
;;
|
||||
;; view-check - SQL for checking whether or not a view needs to receive deltas
|
||||
;; upon completion of an operation.
|
||||
;;
|
||||
|
||||
(defn view-map
|
||||
"Constructs a view map from a HoneySQL view function and its arguments.
|
||||
Contains four fields:
|
||||
:view - the hash-map with interpolated parameters
|
||||
:view-sig - the \"signature\" for the view, i.e. [:matter 1]
|
||||
:args - the arguments passed in.
|
||||
:tables - the tables present in all :from, :insert-into,
|
||||
:update, :delete-from, :join, :left-join :right-join clauses
|
||||
|
||||
Input is a view template function and a view signature. The template
|
||||
function must take the same number of paramters as the signature and
|
||||
return a honeysql data structure "
|
||||
[view-template view-sig]
|
||||
(let [compiled-view (if (> (count view-sig) 1)
|
||||
(apply view-template (rest view-sig))
|
||||
(view-template))]
|
||||
(merge {:args (rest view-sig)
|
||||
:view-sig view-sig
|
||||
:view compiled-view
|
||||
:tables (set (vh/extract-tables compiled-view))}
|
||||
(compile-dummy-view view-template (rest view-sig)))))
|
||||
|
||||
(defn view-sig->view-map
|
||||
"Takes a map of sig keys to view template function vars (templates)
|
||||
and a view signature (view-sig the key for the template map and its args)
|
||||
and returns a view-map for that view-sig."
|
||||
[templates view-sig]
|
||||
(let [lookup (first view-sig)]
|
||||
(view-map (get-in templates [lookup :fn]) view-sig)))
|
||||
|
||||
(defn swap-out-dummy-for-pos
|
||||
"Replaces dummy arg like \"?0\" for integer value (0) so we can sort args."
|
||||
[dummy-arg]
|
||||
(Integer. (subs dummy-arg 1)))
|
||||
|
||||
;; Helper for determine-filter-clauses (which is a helper
|
||||
;; for view-check-template). Extracts constituent parts from
|
||||
;; where clause.
|
||||
(defn set-filter-clauses
|
||||
[dummy-args fc w]
|
||||
(if (= w :and)
|
||||
fc
|
||||
(if (contains? (set dummy-args) (last w))
|
||||
(update-in fc [:s] assoc (swap-out-dummy-for-pos (last w)) (second w))
|
||||
(update-in fc [:w] (fnil conj []) w))))
|
||||
|
||||
;; Helper for generating the view-check HoneySQL template.
|
||||
;; Builds the where and select clauses up from constituent
|
||||
;; where-clauses. Placeholder identifies the parameters
|
||||
;; to pull out into the select clause.
|
||||
(defn determine-filter-clauses
|
||||
[wc dummy-args]
|
||||
(let [fc {:s {} :w nil}
|
||||
fc (if (and (not= :and (first wc)) (not (coll? (first wc))))
|
||||
(set-filter-clauses dummy-args fc wc)
|
||||
(reduce #(set-filter-clauses dummy-args %1 %2) fc wc))]
|
||||
(-> fc
|
||||
(update-in [:s] #(into [] (vals (sort-by key %))))
|
||||
(update-in [:w] #(vh/with-op :and %)))))
|
||||
|
||||
(defn append-arg-map
|
||||
"Removes table/alias namespacing from select fields and creates a hash-map
|
||||
of field to arguments for checking this view against checked-results later on.
|
||||
Note that this assumes our select-fields are in the same order as they
|
||||
are present in the :args view-map field (which they should be)."
|
||||
[view-map select-fields]
|
||||
(let [select-fields (map #(-> % name (split #"\.") last keyword) select-fields)]
|
||||
(assoc view-map :arg-compare (zipmap select-fields (into [] (:args view-map))))))
|
||||
|
||||
(defn- create-view-delta-where-clauses
|
||||
[view-map action]
|
||||
(let [action-table (first (vh/extract-tables action))]
|
||||
(for [view-table (vh/find-table-aliases action-table (:tables view-map))]
|
||||
(-> (:where action)
|
||||
(vh/prefix-columns (vh/table-alias view-table))
|
||||
(vh/replace-table (vh/table-alias action-table) (vh/table-alias view-table))))))
|
||||
|
||||
(defn format-action-wc-for-view
|
||||
"Takes view-map and action (HoneySQL hash-map for insert/update/delete),
|
||||
extracts where clause from action, and formats it with the proper
|
||||
alias (or no alias) so that it will work when applied to the view SQL."
|
||||
[view-map action]
|
||||
(if (:where action)
|
||||
(let [preds (create-view-delta-where-clauses view-map action)]
|
||||
(if (> (count preds) 1)
|
||||
(into [:or] preds)
|
||||
(first preds)))))
|
||||
|
||||
(defn- update-where-clause
|
||||
[hh-spec where]
|
||||
(if-let [w (:where where)]
|
||||
(assoc hh-spec :where w)
|
||||
(dissoc hh-spec :where)))
|
||||
|
||||
(defn view-check-template
|
||||
"Receives a view-map and an action (insert/update/delete HoneySQL hash-map).
|
||||
Returns a HoneySQL hash-map which will can be formatted as SQL to check if a
|
||||
view needs to receive deltas for the action SQL."
|
||||
[view-map action]
|
||||
(let [{:keys [dummy-view dummy-args]} view-map
|
||||
fc (determine-filter-clauses (:where dummy-view) dummy-args)
|
||||
action-wc (format-action-wc-for-view view-map action)
|
||||
view-map (append-arg-map view-map (:s fc))] ; we need this to compare *after* the check is run
|
||||
(->> (-> dummy-view
|
||||
(update-where-clause (vh/merge-where-clauses action-wc (:w fc)))
|
||||
(merge (apply hh/select (:s fc))))
|
||||
(hash-map :view-map view-map :view-check))))
|
||||
|
||||
(defn prepare-checks-for-view-deltas
|
||||
"Checks to see if an action has tables related to a view, and
|
||||
if so builds the HoneySQL hash-map for the SQL needed.
|
||||
Uses this hash-map as a key and conj's the view-map to the key's
|
||||
value so as to avoid redundant delta-check querying."
|
||||
[action confirmed-views view-map]
|
||||
;; Confirm if any of the tables in view-map are present in the action template:
|
||||
(if (some (set (map first (vh/extract-tables action)))
|
||||
(map first (:tables view-map)))
|
||||
|
||||
;; Then construct the check-template for this particular view.
|
||||
(if-let [{:keys [view-check view-map]} (view-check-template view-map action)]
|
||||
;; We then use the view-check as an index and conj the
|
||||
;; view-map to it so as to avoid redundant checks.
|
||||
(update-in confirmed-views [view-check] #(conj % view-map))
|
||||
confirmed-views)
|
||||
confirmed-views))
|
||||
|
||||
(defn prepare-view-checks
|
||||
"Prepares checks for a collection of views (view-maps) against a HoneySQL action
|
||||
(insert/update/delete) hash-map.
|
||||
|
||||
Returns a structure like so:
|
||||
{{<computed HoneySQL hash-map for the check SQL}
|
||||
[<collection of all views this check hash-map key applies to]}
|
||||
"
|
||||
[view-maps action]
|
||||
(reduce #(prepare-checks-for-view-deltas action %1 %2) {} view-maps))
|
||||
|
||||
(defn- do-check
|
||||
[db check-template]
|
||||
(j/query db (hsql/format check-template)))
|
||||
|
||||
(defn- check-view-args
|
||||
[checked-results view-map]
|
||||
(let [view-args (:arg-compare view-map)]
|
||||
(reduce
|
||||
(fn [hit cr]
|
||||
(if (seq (filter #(= (% cr) (% view-args)) (keys view-args)))
|
||||
(reduced view-map) ; don't care which args, just whether or not the view-map hit
|
||||
hit))
|
||||
nil
|
||||
checked-results)))
|
||||
|
||||
(defn- check-all-view-args
|
||||
[checked-results views]
|
||||
(->> views
|
||||
(map #(check-view-args checked-results %))
|
||||
(remove nil?)
|
||||
distinct))
|
||||
|
||||
(defn- do-view-pre-check
|
||||
[db needs-deltas view-check]
|
||||
;;
|
||||
;; We have empty-select? if we have a view with no where predicate clauses--
|
||||
;; so it will always require deltas if there are matching tables.
|
||||
;;
|
||||
;; empty-where comes about if we are inserting--we don't have any where predicate
|
||||
;; in the insert, of course, so we can't perform pre-checks reliably.
|
||||
;; When we do an insert we have to simply do the delta query regardless, for now.
|
||||
;;
|
||||
(let [empty-select? (seq (remove nil? (:select (first view-check))))
|
||||
empty-where? (seq (remove #(or (nil? %) (= :and %)) (:where (first view-check))))]
|
||||
(if (or (not empty-select?) (not empty-where?))
|
||||
(apply conj needs-deltas (last view-check)) ;; put them all in if we can't do pre-check.
|
||||
(let [checked-results (do-check db (first view-check))
|
||||
;; checks view args against checked result set
|
||||
checked-views (check-all-view-args checked-results (last view-check))]
|
||||
(if (seq checked-views)
|
||||
(apply conj needs-deltas checked-views)
|
||||
needs-deltas)))))
|
||||
|
||||
(defn do-view-pre-checks
|
||||
"Takes db, all views (view-maps) and the HoneySQL action (insert/update/delete)
|
||||
hash-map. Returns view-maps for all the views which need to receive
|
||||
delta updates after the action is performed.
|
||||
|
||||
*This function should be called within a transaction before performing the
|
||||
insert/update/delete action.*"
|
||||
[db all-views action]
|
||||
(let [view-checks (prepare-view-checks all-views action)]
|
||||
(reduce #(do-view-pre-check db %1 %2) [] view-checks)))
|
||||
|
||||
(defn- calculate-delete-deltas
|
||||
[db view-map]
|
||||
(->> (:delete-deltas-map view-map)
|
||||
hsql/format
|
||||
(j/query db)
|
||||
(assoc view-map :delete-deltas)))
|
||||
|
||||
;; -------------------------------------------------------------------------------
|
||||
;; Handle inserts
|
||||
;; -------------------------------------------------------------------------------
|
||||
|
||||
(defn compute-delete-deltas-for-insert
|
||||
"Computes and returns a sequence of delete deltas for a single view and insert."
|
||||
[schema db view-map table record]
|
||||
(if (vh/outer-join-table? (:view view-map) table)
|
||||
(let [delta-q (vh/outer-join-delta-query schema (:view view-map) table record)]
|
||||
(j/query db (hsql/format delta-q)))
|
||||
[]))
|
||||
|
||||
(defn primary-key-predicate
|
||||
"Return a predicate for a where clause that constrains to the primary key of
|
||||
the record."
|
||||
[schema table record]
|
||||
(let [pkey (get-primary-key schema table)]
|
||||
[:= pkey (pkey record)]))
|
||||
|
||||
(defn compute-insert-deltas-for-insert
|
||||
[schema db view-map table record]
|
||||
(let [pkey-pred (primary-key-predicate schema table record)
|
||||
action (hsql/build :insert-into table :values [record] :where pkey-pred)
|
||||
insert-delta-wc (format-action-wc-for-view view-map action)
|
||||
view (:view view-map)
|
||||
insert-delta-map (update-in view [:where] #(:where (vh/merge-where-clauses insert-delta-wc %)))]
|
||||
(j/query db (hsql/format insert-delta-map))))
|
||||
|
||||
(defn compute-insert-delete-deltas-for-views
|
||||
[schema db views table record]
|
||||
(doall (map #(compute-delete-deltas-for-insert schema db % table record) views)))
|
||||
|
||||
(defn compute-insert-insert-deltas-for-views
|
||||
[schema db views table record]
|
||||
(doall (map #(compute-insert-deltas-for-insert schema db % table record) views)))
|
||||
|
||||
(defn compute-deltas-for-insert
|
||||
"This takes a *single* insert and a view, applies the insert and computes
|
||||
the view deltas."
|
||||
[schema db views table record]
|
||||
(let [deletes (compute-insert-delete-deltas-for-views schema db views table record)
|
||||
record* (first (j/insert! db table record))
|
||||
inserts (compute-insert-insert-deltas-for-views schema db views table record*)]
|
||||
{:views-with-deltas (doall (map #(assoc %1 :delete-deltas %2 :insert-deltas %3) views deletes inserts))
|
||||
:result record*}))
|
||||
|
||||
;; Handles insert and calculation of insert (after insert) delta.
|
||||
(defn- insert-and-append-deltas!
|
||||
[schema db views action table pkey]
|
||||
(let [table (:insert-into action)]
|
||||
(reduce
|
||||
#(-> %1
|
||||
(update-in [:views-with-deltas] into (:views-with-deltas %2))
|
||||
(update-in [:result-set] conj (:result %2)))
|
||||
{:views-with-deltas [] :result-set []}
|
||||
(map #(compute-deltas-for-insert schema db views table %) (:values action)))))
|
||||
|
||||
;; -------------------------------------------------------------------------------
|
||||
|
||||
;; This is for insert deltas for non-insert updates.
|
||||
|
||||
;;; Takes the HoneySQL map (at key :view) from the view-map and appends
|
||||
;;; the appropriately-table-namespaced where clause which limits the
|
||||
;;; view query to the previously inserted or updated records.
|
||||
(defn- calculate-insert-deltas
|
||||
[db action pkey-wc view-map]
|
||||
(let [action (assoc action :where pkey-wc)
|
||||
insert-delta-wc (format-action-wc-for-view view-map action)
|
||||
view (:view view-map)
|
||||
insert-delta-map (update-in view [:where] #(:where (vh/merge-where-clauses insert-delta-wc %)))
|
||||
deltas (j/query db (hsql/format insert-delta-map))]
|
||||
(if (seq deltas)
|
||||
(update-in view-map [:insert-deltas] #(apply conj % deltas))
|
||||
view-map)))
|
||||
|
||||
;; Helper to query the action's table for primary key and pull it out.
|
||||
(defn- get-action-row-key
|
||||
[db pkey table action]
|
||||
(->> (:where action)
|
||||
(hsql/build :select pkey :from table :where)
|
||||
hsql/format
|
||||
(j/query db)
|
||||
first pkey))
|
||||
|
||||
;; Handles update and calculation of delete (before update) and insert (after update) deltas.
|
||||
(defn- update-and-append-deltas!
|
||||
[db views action table pkey]
|
||||
(let [views-pre (doall (map #(calculate-delete-deltas db %) views))
|
||||
pkey-val (get-action-row-key db pkey table action)
|
||||
update (j/execute! db (hsql/format action))]
|
||||
{:views-with-deltas (doall (map #(calculate-insert-deltas db action [:= pkey pkey-val] %) views-pre))
|
||||
:result-set update}))
|
||||
|
||||
;; Handles deletion and calculation of delete (before update) delta.
|
||||
(defn- delete-and-append-deltas!
|
||||
[db views action table pkey]
|
||||
(let [views-pre (doall (map #(calculate-delete-deltas db %) views))]
|
||||
{:views-with-deltas views-pre
|
||||
:result-set (j/execute! db (hsql/format action))}))
|
||||
|
||||
;; Identifies which action--insert, update or delete--we are performing and dispatches appropriately.
|
||||
;; Returns view-map with appropriate deltas appended.
|
||||
(defn- perform-action-and-return-deltas
|
||||
[schema db views action table pkey]
|
||||
(cond
|
||||
(:insert-into action) (insert-and-append-deltas! schema db views action table pkey)
|
||||
(:update action) (update-and-append-deltas! db views action table pkey)
|
||||
(:delete-from action) (delete-and-append-deltas! db views action table pkey)
|
||||
:else (throw (Exception. "Received malformed action: " action))))
|
||||
|
||||
(defn generate-view-delta-map
|
||||
"Adds a HoneySQL hash-map for the delta-calculation specific to the view + action.
|
||||
Takes a view-map and the action HoneySQL hash-map, and appends the action's
|
||||
where clause to the view's where clause, and adds in new field :insert-deltas-map."
|
||||
[view-map action]
|
||||
(let [action-wc (format-action-wc-for-view view-map action)
|
||||
view (:view view-map)]
|
||||
(->> (update-in view [:where] #(:where (vh/merge-where-clauses action-wc %)))
|
||||
(assoc view-map :delete-deltas-map))))
|
||||
|
||||
|
||||
(defn do-view-transaction
|
||||
"Takes the following arguments:
|
||||
schema - from edl.core/defschema
|
||||
db - clojure.java.jdbc database connection
|
||||
all-views - the current set of views (view-maps--see view-map fn docstring for
|
||||
description) in memory for the database
|
||||
action - the HoneySQL pre-SQL hash-map with parameters already interpolated.
|
||||
|
||||
The function will then perform the following sequence of actions, all run
|
||||
within a transaction (with isolation serializable):
|
||||
|
||||
1) Create pre-check SQL for each view in the list.
|
||||
2) Run the pre-check SQL (or fail out based on some simple heuristics) to
|
||||
identify if we want to send delta messages to the view's subscribers
|
||||
(Note: this happens after the database action for *inserts only*).
|
||||
3) Run the database action (insert/action/delete).
|
||||
4) Calculate deltas based on the method described in section 5.4, \"Rule Generation\"
|
||||
of the paper \"Deriving Production Rules for Incremental Rule Maintenance\"
|
||||
by Stefano Ceri and Jennifer Widom (http://ilpubs.stanford.edu:8090/8/1/1991-4.pdf)
|
||||
|
||||
The function returns the views which received delta updates with the deltas
|
||||
keyed to each view-map at the keys :insert-deltas and :delete-deltas."
|
||||
[schema db all-views action]
|
||||
;; Every update connected with a view is done in a transaction:
|
||||
(j/with-db-transaction [t db :isolation :serializable]
|
||||
(let [need-deltas (do-view-pre-checks t all-views action)
|
||||
need-deltas (map #(generate-view-delta-map % action) need-deltas)
|
||||
table (-> action vh/extract-tables ffirst)
|
||||
pkey (get-primary-key schema table)]
|
||||
(perform-action-and-return-deltas schema t need-deltas action table pkey))))
|
||||
|
||||
;;
|
||||
;; Need to catch this and retry:
|
||||
;; java.sql.SQLException: ERROR: could not serialize access due to concurrent update
|
||||
;;
|
||||
(defn get-nested-exceptions*
|
||||
[exceptions e]
|
||||
(if-let [next-e (.getNextException e)]
|
||||
(recur (conj exceptions next-e) next-e)
|
||||
exceptions))
|
||||
|
||||
(defn get-nested-exceptions
|
||||
[e]
|
||||
(get-nested-exceptions* [e] e))
|
||||
|
||||
(defn do-transaction-fn-with-retries
|
||||
[transaction-fn]
|
||||
(try
|
||||
(transaction-fn)
|
||||
(catch SQLException e
|
||||
;; http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
|
||||
(debug "Caught exception with error code: " (.getSQLState e))
|
||||
(debug "Exception message: " (.getMessage e))
|
||||
;; (debug "stack trace message: " (.printStackTrace e))
|
||||
(if (some #(= (.getSQLState %) "40001") (get-nested-exceptions e))
|
||||
(do-transaction-fn-with-retries transaction-fn) ;; try it again
|
||||
(throw e))))) ;; otherwise rethrow
|
||||
|
||||
(defmacro with-view-transaction
|
||||
[subscribed-views binding & forms]
|
||||
(let [tvar (first binding)]
|
||||
`(if (:deltas ~(second binding)) ;; check if we are in a nested transaction
|
||||
(let [~tvar ~(second binding)] ~@forms)
|
||||
(do-transaction-fn-with-retries
|
||||
(fn []
|
||||
(let [deltas# (atom [])
|
||||
result# (j/with-db-transaction [t# ~(second binding) :isolation :serializable]
|
||||
(let [~tvar (assoc t# :deltas deltas#)]
|
||||
~@forms))]
|
||||
(broadcast-deltas ~subscribed-views ~(second binding) @deltas#)
|
||||
result#))))))
|
||||
|
||||
(defn vaction!
|
||||
"Used to perform arbitrary insert/update/delete actions on the database,
|
||||
while ensuring that view deltas are appropriately checked and calculated
|
||||
for the currently registered views as reported by a type implementing
|
||||
the SubscribedViews protocol.
|
||||
|
||||
Arguments are:
|
||||
|
||||
- schema: an edl schema (\"(defschema my-schema ...)\")
|
||||
|
||||
- db: a clojure.java.jdbc database
|
||||
|
||||
- action-map: the HoneySQL map for the insert/update/delete action
|
||||
|
||||
- subscribed-views: an implementation of SubscribedViews implementing
|
||||
the follow functions:
|
||||
|
||||
- get-subscribed-views takes a database connection. It should return
|
||||
a collection of view-maps.
|
||||
|
||||
- send-deltas takes a db connection, and the views which have had deltas
|
||||
calculate for them and associated with the hash-maps (appropriately
|
||||
called views-with-deltas)."
|
||||
[schema db action-map subscribed-views]
|
||||
(let [subbed-views (get-subscribed-views subscribed-views db)
|
||||
transaction-fn #(do-view-transaction schema db subbed-views action-map)]
|
||||
(if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry
|
||||
(let [{:keys [views-with-deltas result-set]} (transaction-fn)]
|
||||
(swap! deltas into views-with-deltas)
|
||||
result-set)
|
||||
(let [{:keys [views-with-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)]
|
||||
(broadcast-deltas subscribed-views db views-with-deltas)
|
||||
result-set))))
|
32
src/views/db/core.clj
Normal file
32
src/views/db/core.clj
Normal file
|
@ -0,0 +1,32 @@
|
|||
(ns views.db.core
|
||||
(:require
|
||||
[clojure.java.jdbc :as j]
|
||||
[honeysql.core :as hsql]))
|
||||
|
||||
(defn view-query
|
||||
"Takes db and query-fn (compiled HoneySQL hash-map)
|
||||
and runs the query, returning results."
|
||||
[db query-map]
|
||||
(j/query db (hsql/format query-map)))
|
||||
|
||||
(defn post-process-result-set
|
||||
[nv templates result-set]
|
||||
(if-let [post-fn (get-in templates [(first nv) :post-fn])]
|
||||
(mapv post-fn result-set)
|
||||
result-set))
|
||||
|
||||
(defn initial-views
|
||||
"Takes a db spec, the new views sigs (new-views) we want to produce result-sets for,
|
||||
the template config map, and the subscribed-views themselves (with compiled view-maps)
|
||||
and returns a result-set for the new-views with post-fn functions applied to the data."
|
||||
[db new-views templates subscribed-views]
|
||||
(reduce
|
||||
(fn [results nv]
|
||||
(->> (get subscribed-views nv)
|
||||
:view-map
|
||||
(view-query db)
|
||||
(into [])
|
||||
(post-process-result-set nv templates)
|
||||
(assoc results nv)))
|
||||
{}
|
||||
new-views))
|
39
src/views/deltas.clj
Normal file
39
src/views/deltas.clj
Normal file
|
@ -0,0 +1,39 @@
|
|||
(ns view.deltas)
|
||||
|
||||
;; ;; This preprocesses a batch of view deltas. They get packaged into batches for each
|
||||
;; ;; subscriber in send-delta.
|
||||
;; (defn preprocess-delta
|
||||
;; "Returns a pair [firm_id, deltas]."
|
||||
;; [fdb templates v]
|
||||
;; (let [insert-deltas (post-process-result-sets templates [[(:view-sig v) (:insert-deltas v)]])
|
||||
;; delete-deltas (post-process-result-sets templates [[(:view-sig v) (:delete-deltas v)]])
|
||||
;; deltas [(:view-sig v) {:insert-deltas (seq (last (last insert-deltas)))
|
||||
;; :delete-deltas (seq (last (last delete-deltas)))}]]
|
||||
;; [(:fid fdb) deltas]))
|
||||
|
||||
;; (defn build-delta-batch
|
||||
;; [fdb templates views-with-deltas]
|
||||
;; (doall (map #(preprocess-delta fdb templates %) views-with-deltas)))
|
||||
|
||||
|
||||
;; The following creates delta batches for each subscriber.
|
||||
|
||||
;; If we have a batch of deltas:
|
||||
;; - there are multiple views
|
||||
;; - each view has a list of clients
|
||||
;; - produce client -> deltas
|
||||
|
||||
(defn add-message
|
||||
[msg batches subscriber]
|
||||
(debug "DELTAs to S: " msg " | " subscriber)
|
||||
(update-in batches [subscriber] (fnil conj []) msg))
|
||||
|
||||
(defn build-messages
|
||||
[delta-batch]
|
||||
(loop [batches {}, deltas (seq delta-batch)]
|
||||
(if-not deltas
|
||||
batches
|
||||
(let [[firm_id [view-sig delta]] (first deltas)
|
||||
subscribed (get-in @subscribed-views [firm_id view-sig :sessions])]
|
||||
(debug "\n\nSUBSCRIBED: " subscribed)
|
||||
(recur (reduce #(add-message {view-sig delta} %1 %2) batches subscribed) (next deltas))))))
|
34
src/views/filters.clj
Normal file
34
src/views/filters.clj
Normal file
|
@ -0,0 +1,34 @@
|
|||
(ns views.filters)
|
||||
|
||||
(defn view-filter
|
||||
"Takes a subscription request msg, a collection of view-sigs and
|
||||
the config templates hash-map for an app. Checks if there is
|
||||
a global filter-fn in the hash-map metadata and checks against
|
||||
that if it exists, as well as against any existing filter
|
||||
functions for individual template config entries. Template
|
||||
config hash-map entries can specify a filter-fn using the key
|
||||
:filter-fn, and the global filter-fn is the same, only on
|
||||
the config meta-data (i.e. (with-meta templates {:filter-fn ...}))
|
||||
|
||||
By default throws an exception if no filters are present.
|
||||
By passing in {:unsafe true} in opts, this can be overridden."
|
||||
[msg view-sigs templates & opts]
|
||||
(let [global-filter-fn (:filter-fn (meta templates))]
|
||||
(filterv
|
||||
#(let [filter-fn (:filter-fn (get templates (first %)))]
|
||||
(cond
|
||||
(and filter-fn global-filter-fn)
|
||||
(and (global-filter-fn msg %) (filter-fn msg %))
|
||||
|
||||
filter-fn
|
||||
(filter-fn msg %)
|
||||
|
||||
global-filter-fn
|
||||
(global-filter-fn msg %)
|
||||
|
||||
:else
|
||||
(if (-> opts first :unsafe)
|
||||
(do (warn "YOU ARE RUNNING IN UNSAFE MODE, AND NO FILTERS ARE PRESENT FOR VIEW-SIG: " %)
|
||||
true)
|
||||
(throw (Exception. (str "No filter set for view " %))))))
|
||||
view-sigs)))
|
148
src/views/honeysql.clj
Normal file
148
src/views/honeysql.clj
Normal file
|
@ -0,0 +1,148 @@
|
|||
(ns views.honeysql
|
||||
(:require
|
||||
[honeysql.core :as hsql]
|
||||
[honeysql.helpers :as hh]
|
||||
[clojure.string :refer [split]]))
|
||||
|
||||
(def table-clauses
|
||||
[:from :insert-into :update :delete-from :join :left-join :right-join])
|
||||
|
||||
(def pred-ops
|
||||
#{:= :< :> :<> :>= :<= :in :between :match :ltree-match :and :or :not=})
|
||||
|
||||
(defn process-complex-clause
|
||||
[tables clause]
|
||||
(reduce
|
||||
#(if (coll? %2)
|
||||
(if (some pred-ops [(first %2)])
|
||||
%1
|
||||
(conj %1 %2))
|
||||
(conj %1 [%2]))
|
||||
tables
|
||||
clause))
|
||||
|
||||
(defn extract-tables*
|
||||
[tables clause]
|
||||
(if clause
|
||||
(if (coll? clause)
|
||||
(process-complex-clause tables clause)
|
||||
(conj tables [clause]))
|
||||
tables))
|
||||
|
||||
(defn with-op
|
||||
"Takes a collection of things and returns either an nary op of them, or
|
||||
the item in the collection if there is only one."
|
||||
[op coll]
|
||||
(if (> (count coll) 1) (into [op] coll) (first coll)))
|
||||
|
||||
(defn extract-tables
|
||||
"Extracts a set of table vector from a HoneySQL spec hash-map.
|
||||
Each vector either contains a single table keyword, or the
|
||||
table keyword and an alias keyword."
|
||||
([hh-spec] (extract-tables hh-spec table-clauses))
|
||||
([hh-spec clauses] (reduce #(extract-tables* %1 (%2 hh-spec)) #{} clauses)))
|
||||
|
||||
(defn find-table-aliases
|
||||
"Returns the table alias for the supplied table."
|
||||
[action-table tables]
|
||||
(filter #(= (first action-table) (first %)) tables))
|
||||
|
||||
(defn outer-join-table?
|
||||
"Return true if table is used in an outer join in the honeysql expression."
|
||||
[hh-spec table]
|
||||
(let [tables (map first (extract-tables hh-spec [:left-join :right-join]))]
|
||||
(boolean (some #(= table %) tables))))
|
||||
|
||||
(defn prefix-column
|
||||
"Prefixes a column with an alias."
|
||||
[column alias]
|
||||
(keyword (str (name alias) \. (name column))))
|
||||
|
||||
(defn create-null-constraints
|
||||
"Create 'is null' constraints for all the columns of a table."
|
||||
[schema table table-alias]
|
||||
(let [columns (map #(prefix-column % table-alias) (keys (:columns (get schema (name table)))))]
|
||||
(into [:and] (for [c columns] [:= c nil]))))
|
||||
|
||||
(defn table-alias
|
||||
"Returns the name of a table or its alias. E.g. for [:table :t] returns :t."
|
||||
[table]
|
||||
(if (keyword? table) table (last table)))
|
||||
|
||||
(defn table-name
|
||||
"Returns the name of a table . E.g. for [:table :t] returns :table."
|
||||
[table]
|
||||
(if (keyword? table) table (first table)))
|
||||
|
||||
(defn table-column
|
||||
"Assumes that table columns are keywords of the form :table.column. Returns
|
||||
the column as a keyword or nil if the supplied keyword doesn't match the pattern."
|
||||
[table item]
|
||||
(let [s (name item), t (str (name table) \.)]
|
||||
(if (.startsWith s t) (keyword (subs s (count t))))))
|
||||
|
||||
(defn modified-outer-join-predicate
|
||||
"Returns an outer join predicate with the join tables columns subistituted
|
||||
with values from a record."
|
||||
[table predicate record]
|
||||
(if-let [column (and (keyword? predicate) (table-column table predicate))]
|
||||
(or (get record column)
|
||||
(throw (Exception. (str "No value for column " column " in " record))))
|
||||
(if (vector? predicate)
|
||||
(apply vector (map #(modified-outer-join-predicate table % record) predicate))
|
||||
predicate)))
|
||||
|
||||
(defn find-outer-joins
|
||||
"Find and return all the outer joins on a given table."
|
||||
[hh-spec table]
|
||||
(->> (concat (:left-join hh-spec) (:right-join hh-spec))
|
||||
(partition 2 2)
|
||||
(filter #(= table (table-name (first %))))))
|
||||
|
||||
(defn- create-outer-join-predicates
|
||||
"Create outer join predicate from a record and joins."
|
||||
[schema table record joins]
|
||||
(->> joins
|
||||
(map (fn [[table-spec join-pred]]
|
||||
[:and
|
||||
(modified-outer-join-predicate (table-alias table-spec) join-pred record)
|
||||
(create-null-constraints schema table (table-alias table-spec))]))
|
||||
(with-op :or)))
|
||||
|
||||
(defn outer-join-delta-query
|
||||
"Create an outer join delta query given a honeysql template and record"
|
||||
[schema hh-spec table record]
|
||||
(let [join-tables (find-outer-joins hh-spec table)
|
||||
join-pred (create-outer-join-predicates schema table record join-tables)]
|
||||
(assert (not (nil? join-pred)))
|
||||
(update-in hh-spec [:where] #(vector :and % join-pred))))
|
||||
|
||||
(defn merge-where-clauses
|
||||
"Takes two where clauses from two different HoneySQL maps and joins then with an and.
|
||||
If one is nil, returns only the non-nil where clause."
|
||||
[wc1 wc2]
|
||||
(if (and wc1 wc2)
|
||||
(hh/where wc1 wc2)
|
||||
(hh/where (or wc1 wc2))))
|
||||
|
||||
(defn replace-table
|
||||
"Replace all instances of table name t1 pred with t2."
|
||||
[pred t1 t2]
|
||||
(if-let [column (and (keyword? pred) (table-column t1 pred))]
|
||||
(keyword (str (name t2) \. (name column)))
|
||||
(if (coll? pred)
|
||||
(map #(replace-table % t1 t2) pred)
|
||||
pred)))
|
||||
|
||||
(defn unprefixed-column?
|
||||
[c]
|
||||
(and (keyword? c) (not (pred-ops c)) (neg? (.indexOf (name c) (int \.)))))
|
||||
|
||||
(defn prefix-columns
|
||||
"Prefix all unprefixed columns with table."
|
||||
[pred table]
|
||||
(if (unprefixed-column? pred)
|
||||
(keyword (str (name table) \. (name pred)))
|
||||
(if (coll? pred)
|
||||
(map #(prefix-columns % table) pred)
|
||||
pred)))
|
45
src/views/router.clj
Normal file
45
src/views/router.clj
Normal file
|
@ -0,0 +1,45 @@
|
|||
(ns views.router
|
||||
(:require
|
||||
[views.subscribed-views
|
||||
:refer [subscribe-views unsubscribe-views disconnect get-delta-broadcast-channel send-delta]]
|
||||
[clojure.core.async :refer [go go-loop chan pub sub unsub close! >! >!! <! <!! filter<]]
|
||||
[clojure.tools.logging :refer [debug]]))
|
||||
|
||||
(defn handle-subscriptions!
|
||||
[subscribed-views subscriptions]
|
||||
(go (while true
|
||||
(let [sub (<! subscriptions)]
|
||||
(subscribe-views subscribed-views sub)))))
|
||||
|
||||
(defn handle-deltas!
|
||||
[subscribed-views]
|
||||
(let [delta-channel (get-delta-broadcast-channel subscribed-views)]
|
||||
(go (while true
|
||||
(let [delta (<! delta-channel)]
|
||||
(send-delta subscribed-views delta))))))
|
||||
|
||||
(defn handle-unsubscriptions!
|
||||
[subscribed-views unsubscriptions]
|
||||
(go (while true
|
||||
(let [unsub (<! unsubscriptions)]
|
||||
(unsubscribe-views subscribed-views unsub)))))
|
||||
|
||||
(defn handle-disconnects!
|
||||
[subscribed-views disconnects]
|
||||
(go (while true
|
||||
(let [disc (<! disconnects)]
|
||||
(disconnect subscribed-views disc)))))
|
||||
|
||||
(defn init!
|
||||
[subscribed-views client-chan]
|
||||
(let [subs (chan)
|
||||
unsubs (chan)
|
||||
control (chan)
|
||||
disconnects (filter< #(= :disconnect (:body %)) control)]
|
||||
(sub client-chan :views.subscribe subs)
|
||||
(sub client-chan :views.unsubscribe unsubs)
|
||||
(sub client-chan :client-channel disconnects)
|
||||
(handle-subscriptions! subscribed-views subs)
|
||||
(handle-deltas! subscribed-views)
|
||||
(handle-unsubscriptions! subscribed-views unsubs)
|
||||
(handle-disconnects! subscribed-views disconnects)))
|
14
src/views/subscribed_views.clj
Normal file
14
src/views/subscribed_views.clj
Normal file
|
@ -0,0 +1,14 @@
|
|||
(ns views.subscribed-views)
|
||||
|
||||
(defprotocol SubscribedViews
|
||||
;; Subscription and Delta routing
|
||||
(subscribe-views [this sub-request])
|
||||
(unsubscribe-views [this unsub-request])
|
||||
(disconnect [this disconnect-request])
|
||||
(send-message [this address msg])
|
||||
(subscriber-key-fn [this msg])
|
||||
(prefix-fn [this msg])
|
||||
|
||||
;; DB interaction
|
||||
(broadcast-deltas [this db views-with-deltas])
|
||||
(subscribed-views [this]))
|
65
src/views/subscriptions.clj
Normal file
65
src/views/subscriptions.clj
Normal file
|
@ -0,0 +1,65 @@
|
|||
(ns views.subscriptions)
|
||||
|
||||
;;
|
||||
;; {[:view-sig 1 "arg2"] {:keys [1 2 3 4 ... ] :view-map {:view ...}}}
|
||||
;;
|
||||
;; or
|
||||
;;
|
||||
;; {prefix {[:view-sig 1 "arg2"] {:keys [1 2 3 4 ... ] :view-map {:view ...}}}}
|
||||
;;
|
||||
|
||||
(def subscribed-views (atom {}))
|
||||
(def compiled-views (atom {}))
|
||||
|
||||
(defn- add-subscriber-key
|
||||
[subscriber-key]
|
||||
(fn [view-subs]
|
||||
(if (seq view-subs)
|
||||
(conj view-subs subscriber-key)
|
||||
#{subscriber-key})))
|
||||
|
||||
(defn add-subscription!
|
||||
([subscriber-key view-sig]
|
||||
(swap! subscribed-views #(update-in % [view-sig] (add-subscriber-key subscriber-key))))
|
||||
([subscriber-key view-sig prefix]
|
||||
(swap! subscribed-views #(update-in % [prefix view-sig] (add-subscriber-key subscriber-key)))))
|
||||
|
||||
(defn add-subscriptions!
|
||||
([subscriber-key view-sigs]
|
||||
(add-subscriptions! subscriber-key view-sigs nil))
|
||||
([subscriber-key view-sigs prefix]
|
||||
(doseq [vs view-sigs]
|
||||
(if prefix
|
||||
(add-subscription! subscriber-key vs prefix)
|
||||
(add-subscription! subscriber-key vs)))))
|
||||
|
||||
(defn subscribed-to
|
||||
([view-sig]
|
||||
(get @subscribed-views view-sig))
|
||||
([view-sig prefix]
|
||||
(get-in @subscribed-views [prefix view-sig])))
|
||||
|
||||
(defn subscribed-to?
|
||||
([subscriber-key view-sig]
|
||||
(subscribed-to? subscriber-key view-sig nil))
|
||||
([subscriber-key view-sig prefix]
|
||||
(if-let [view-subs (if prefix (subscribed-to view-sig prefix) (subscribed-to view-sig))]
|
||||
(view-subs subscriber-key))))
|
||||
|
||||
(defn- remove-key-or-view
|
||||
[subscriber-key view-sig prefix]
|
||||
(fn [subbed-views]
|
||||
(let [path (if prefix [prefix view-sig] [view-sig])
|
||||
updated (update-in subbed-views path disj subscriber-key)]
|
||||
(if (seq (get-in updated path))
|
||||
updated
|
||||
(if prefix
|
||||
(update-in updated [prefix] dissoc view-sig)
|
||||
(dissoc updated view-sig))))))
|
||||
|
||||
(defn remove-subscription!
|
||||
([subscriber-key view-sig]
|
||||
(remove-subscription! subscriber-key view-sig nil))
|
||||
([subscriber-key view-sig prefix]
|
||||
(when (subscribed-to? subscriber-key view-sig (if prefix prefix))
|
||||
(swap! subscribed-views (remove-key-or-view subscriber-key view-sig prefix)))))
|
10
test/views/all_tests.clj
Normal file
10
test/views/all_tests.clj
Normal file
|
@ -0,0 +1,10 @@
|
|||
(ns views.all-tests
|
||||
(:require
|
||||
[clojure.test :refer [run-tests]]
|
||||
[views.subscriptions-test]
|
||||
[views.db.core-test]))
|
||||
|
||||
(defn run-all-tests
|
||||
[]
|
||||
(run-tests 'views.subscriptions-test
|
||||
'views.db.core-test))
|
39
test/views/db/core_test.clj
Normal file
39
test/views/db/core_test.clj
Normal file
|
@ -0,0 +1,39 @@
|
|||
(ns views.db.core-test
|
||||
(:require
|
||||
[clojure.test :refer [use-fixtures deftest is]]
|
||||
[honeysql.core :as hsql]
|
||||
[views.fixtures :as vf :refer [gen-n-users! database-fixtures!]]
|
||||
[views.db.core :as vdb]
|
||||
[clojure.string :refer [upper-case]]))
|
||||
|
||||
(use-fixtures :each database-fixtures!)
|
||||
|
||||
(defn users-tmpl
|
||||
[]
|
||||
(hsql/build :select [:id :name :created_on] :from :users))
|
||||
|
||||
(defn user-posts-tmpl
|
||||
[user_id]
|
||||
(hsql/build :select [:u.user_id :u.name :p.title :p.body :p.created_on]
|
||||
:from {:posts :p}
|
||||
:join [[:users :u][:= :user_id user_id]]))
|
||||
|
||||
(def templates
|
||||
{:users {:fn #'users-tmpl}
|
||||
:user-posts {:fn #'user-posts-tmpl}})
|
||||
|
||||
(defn subscribed-views
|
||||
[]
|
||||
{[:users] {:view-map ((get-in templates [:users :fn]))}})
|
||||
|
||||
(deftest initializes-views
|
||||
(let [users (gen-n-users! 2)]
|
||||
(is (= (vdb/initial-views vf/db [[:users]] templates (subscribed-views))
|
||||
{[:users] users}))))
|
||||
|
||||
(deftest post-processes-views
|
||||
(let [users (gen-n-users! 1)
|
||||
with-postfn (assoc-in templates [:users :post-fn] #(update-in % [:name] upper-case))
|
||||
views-rs (vdb/initial-views vf/db [[:users]] with-postfn (subscribed-views))]
|
||||
(is (= (-> (get views-rs [:users]) first :name)
|
||||
(-> users first :name upper-case)))))
|
54
test/views/fixtures.clj
Normal file
54
test/views/fixtures.clj
Normal file
|
@ -0,0 +1,54 @@
|
|||
(ns views.fixtures
|
||||
(:require
|
||||
[environ.core :as e]
|
||||
[clojure.java.jdbc :as j]
|
||||
[honeysql.core :as hsql]
|
||||
[clojure.data.generators :as dg]))
|
||||
|
||||
;; CREATE ROLE views_user LOGIN PASSWORD 'password';
|
||||
;; CREATE DATABASE views_test OWNER views_user;
|
||||
|
||||
(defn sql-ts
|
||||
([ts] (java.sql.Timestamp. ts))
|
||||
([] (java.sql.Timestamp. (.getTime (java.util.Date.)))))
|
||||
|
||||
(def db {:classname "org.postgresql.Driver"
|
||||
:subprotocol "postgresql"
|
||||
:subname (get :views-test-db e/env "//localhost/views_test")
|
||||
:user (get :views-test-user e/env "views_user")
|
||||
:password (get :views-test-ppassword e/env "password")})
|
||||
|
||||
(defn users-table-fixture!
|
||||
[]
|
||||
(j/execute! db ["CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT NOT NULL, created_on DATE NOT NULL)"]))
|
||||
|
||||
(defn posts-table-fixture!
|
||||
[]
|
||||
(j/execute! db ["CREATE TABLE posts (id SERIAL PRIMARY KEY,
|
||||
title TEXT NOT NULL,
|
||||
body TEXT NOT NULL,
|
||||
created_on DATE NOT NULL,
|
||||
user_id INTEGER NOT NULL,
|
||||
FOREIGN KEY (user_id) REFERENCES users(id))"]))
|
||||
|
||||
(defn drop-tables!
|
||||
[tables]
|
||||
(doseq [t tables]
|
||||
(j/execute! db [(str "DROP TABLE " (name t))])))
|
||||
|
||||
(defn database-fixtures!
|
||||
[f]
|
||||
(users-table-fixture!)
|
||||
(posts-table-fixture!)
|
||||
(f)
|
||||
(drop-tables! [:posts :users]))
|
||||
|
||||
(defn user-fixture!
|
||||
[name]
|
||||
(j/execute! db (hsql/format (hsql/build :insert-into :users :values [{:name name :created_on (sql-ts)}]))))
|
||||
|
||||
(defn gen-n-users!
|
||||
[n]
|
||||
(dotimes [n n]
|
||||
(user-fixture! (dg/string #(rand-nth (seq "abcdefghijklmnopqrstuwvxyz")))))
|
||||
(j/query db ["SELECT * FROM users"]))
|
59
test/views/subscriptions_test.clj
Normal file
59
test/views/subscriptions_test.clj
Normal file
|
@ -0,0 +1,59 @@
|
|||
(ns views.subscriptions-test
|
||||
(:require
|
||||
[clojure.test :refer [use-fixtures deftest is]]
|
||||
[views.subscriptions :as vs]))
|
||||
|
||||
(defn- reset-subscribed-views!
|
||||
[f]
|
||||
(reset! vs/subscribed-views {})
|
||||
(f))
|
||||
|
||||
(use-fixtures :each reset-subscribed-views!)
|
||||
|
||||
(deftest adds-a-subscription
|
||||
(let [key 1, view-sig [:view 1]]
|
||||
(vs/add-subscription! key view-sig)
|
||||
(is (vs/subscribed-to? key view-sig))))
|
||||
|
||||
(deftest can-use-prefix
|
||||
(let [prefix1 1, prefix2 2, key 1, view-sig [:view 1]]
|
||||
(vs/add-subscription! key view-sig prefix1)
|
||||
(vs/add-subscription! key view-sig prefix2)
|
||||
(is (vs/subscribed-to? key view-sig prefix1))
|
||||
(is (vs/subscribed-to? key view-sig prefix2))))
|
||||
|
||||
(deftest removes-a-subscription
|
||||
(let [key 1, view-sig [:view 1]]
|
||||
(vs/add-subscription! key view-sig)
|
||||
(vs/remove-subscription! key view-sig)
|
||||
(is (not (vs/subscribed-to? key view-sig)))))
|
||||
|
||||
(deftest doesnt-fail-or-create-view-entry-when-empty
|
||||
(vs/remove-subscription! 1 [:view 1])
|
||||
(is (= {} @vs/subscribed-views)))
|
||||
|
||||
(deftest removes-a-subscription-with-prefix
|
||||
(let [prefix1 1, prefix2 2, key 1, view-sig [:view 1]]
|
||||
(vs/add-subscription! key view-sig prefix1)
|
||||
(vs/add-subscription! key view-sig prefix2)
|
||||
(vs/remove-subscription! key view-sig prefix1)
|
||||
(is (not (vs/subscribed-to? key view-sig prefix1)))
|
||||
(is (vs/subscribed-to? key view-sig prefix2))))
|
||||
|
||||
(deftest removes-unsubscribed-to-view-from-subscribed-views
|
||||
(let [key 1, view-sig [:view 1]]
|
||||
(vs/add-subscription! key view-sig)
|
||||
(vs/remove-subscription! key view-sig)
|
||||
(is (= {} @vs/subscribed-views))))
|
||||
|
||||
(deftest adds-multiple-views-at-a-time
|
||||
(let [key 1, view-sigs [[:view 1] [:view 2]]]
|
||||
(vs/add-subscriptions! key view-sigs)
|
||||
(is (vs/subscribed-to? key (first view-sigs)))
|
||||
(is (vs/subscribed-to? key (last view-sigs)))))
|
||||
|
||||
;; (deftest subscribing-compiles-and-stores-view-maps
|
||||
;; (let [key 1, view-sig [:view 1]]
|
||||
;; (vs/add-subscriptions! key view-sigs)
|
||||
;; (is (vs/subscribed-to? key (first view-sigs)))
|
||||
;; (is (vs/subscribed-to? key (last view-sigs)))))
|
Loading…
Reference in a new issue