From 2388472f460a2e312007b6228b97c0b91e5333df Mon Sep 17 00:00:00 2001 From: Dave Della Costa Date: Fri, 6 Jun 2014 19:58:08 +0900 Subject: [PATCH] first commit; subscription code and view init code pulled out and tested; DE-specific code stripped --- .gitignore | 10 + LICENSE | 214 +++++++++++++ README.md | 49 +++ doc/intro.md | 3 + project.clj | 20 ++ src/views/base_subscribed_views.clj | 36 +++ src/views/db.clj | 479 ++++++++++++++++++++++++++++ src/views/db/core.clj | 32 ++ src/views/deltas.clj | 39 +++ src/views/filters.clj | 34 ++ src/views/honeysql.clj | 148 +++++++++ src/views/router.clj | 45 +++ src/views/subscribed_views.clj | 14 + src/views/subscriptions.clj | 65 ++++ test/views/all_tests.clj | 10 + test/views/db/core_test.clj | 39 +++ test/views/fixtures.clj | 54 ++++ test/views/subscriptions_test.clj | 59 ++++ 18 files changed, 1350 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 doc/intro.md create mode 100644 project.clj create mode 100644 src/views/base_subscribed_views.clj create mode 100644 src/views/db.clj create mode 100644 src/views/db/core.clj create mode 100644 src/views/deltas.clj create mode 100644 src/views/filters.clj create mode 100644 src/views/honeysql.clj create mode 100644 src/views/router.clj create mode 100644 src/views/subscribed_views.clj create mode 100644 src/views/subscriptions.clj create mode 100644 test/views/all_tests.clj create mode 100644 test/views/db/core_test.clj create mode 100644 test/views/fixtures.clj create mode 100644 test/views/subscriptions_test.clj diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..46d8b9a --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +/target +/classes +/checkouts +pom.xml +pom.xml.asc +*.jar +*.class +/.lein-* +/.nrepl-port +*~ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..786edf6 --- /dev/null +++ b/LICENSE @@ -0,0 +1,214 @@ +THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE PUBLIC +LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION OF THE PROGRAM +CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT. + +1. DEFINITIONS + +"Contribution" means: + +a) in the case of the initial Contributor, the initial code and +documentation distributed under this Agreement, and + +b) in the case of each subsequent Contributor: + +i) changes to the Program, and + +ii) additions to the Program; + +where such changes and/or additions to the Program originate from and are +distributed by that particular Contributor. A Contribution 'originates' from +a Contributor if it was added to the Program by such Contributor itself or +anyone acting on such Contributor's behalf. Contributions do not include +additions to the Program which: (i) are separate modules of software +distributed in conjunction with the Program under their own license +agreement, and (ii) are not derivative works of the Program. + +"Contributor" means any person or entity that distributes the Program. + +"Licensed Patents" mean patent claims licensable by a Contributor which are +necessarily infringed by the use or sale of its Contribution alone or when +combined with the Program. + +"Program" means the Contributions distributed in accordance with this +Agreement. + +"Recipient" means anyone who receives the Program under this Agreement, +including all Contributors. + +2. GRANT OF RIGHTS + +a) Subject to the terms of this Agreement, each Contributor hereby grants +Recipient a non-exclusive, worldwide, royalty-free copyright license to +reproduce, prepare derivative works of, publicly display, publicly perform, +distribute and sublicense the Contribution of such Contributor, if any, and +such derivative works, in source code and object code form. + +b) Subject to the terms of this Agreement, each Contributor hereby grants +Recipient a non-exclusive, worldwide, royalty-free patent license under +Licensed Patents to make, use, sell, offer to sell, import and otherwise +transfer the Contribution of such Contributor, if any, in source code and +object code form. This patent license shall apply to the combination of the +Contribution and the Program if, at the time the Contribution is added by the +Contributor, such addition of the Contribution causes such combination to be +covered by the Licensed Patents. The patent license shall not apply to any +other combinations which include the Contribution. No hardware per se is +licensed hereunder. + +c) Recipient understands that although each Contributor grants the licenses +to its Contributions set forth herein, no assurances are provided by any +Contributor that the Program does not infringe the patent or other +intellectual property rights of any other entity. Each Contributor disclaims +any liability to Recipient for claims brought by any other entity based on +infringement of intellectual property rights or otherwise. As a condition to +exercising the rights and licenses granted hereunder, each Recipient hereby +assumes sole responsibility to secure any other intellectual property rights +needed, if any. For example, if a third party patent license is required to +allow Recipient to distribute the Program, it is Recipient's responsibility +to acquire that license before distributing the Program. + +d) Each Contributor represents that to its knowledge it has sufficient +copyright rights in its Contribution, if any, to grant the copyright license +set forth in this Agreement. + +3. REQUIREMENTS + +A Contributor may choose to distribute the Program in object code form under +its own license agreement, provided that: + +a) it complies with the terms and conditions of this Agreement; and + +b) its license agreement: + +i) effectively disclaims on behalf of all Contributors all warranties and +conditions, express and implied, including warranties or conditions of title +and non-infringement, and implied warranties or conditions of merchantability +and fitness for a particular purpose; + +ii) effectively excludes on behalf of all Contributors all liability for +damages, including direct, indirect, special, incidental and consequential +damages, such as lost profits; + +iii) states that any provisions which differ from this Agreement are offered +by that Contributor alone and not by any other party; and + +iv) states that source code for the Program is available from such +Contributor, and informs licensees how to obtain it in a reasonable manner on +or through a medium customarily used for software exchange. + +When the Program is made available in source code form: + +a) it must be made available under this Agreement; and + +b) a copy of this Agreement must be included with each copy of the Program. + +Contributors may not remove or alter any copyright notices contained within +the Program. + +Each Contributor must identify itself as the originator of its Contribution, +if any, in a manner that reasonably allows subsequent Recipients to identify +the originator of the Contribution. + +4. COMMERCIAL DISTRIBUTION + +Commercial distributors of software may accept certain responsibilities with +respect to end users, business partners and the like. While this license is +intended to facilitate the commercial use of the Program, the Contributor who +includes the Program in a commercial product offering should do so in a +manner which does not create potential liability for other Contributors. +Therefore, if a Contributor includes the Program in a commercial product +offering, such Contributor ("Commercial Contributor") hereby agrees to defend +and indemnify every other Contributor ("Indemnified Contributor") against any +losses, damages and costs (collectively "Losses") arising from claims, +lawsuits and other legal actions brought by a third party against the +Indemnified Contributor to the extent caused by the acts or omissions of such +Commercial Contributor in connection with its distribution of the Program in +a commercial product offering. The obligations in this section do not apply +to any claims or Losses relating to any actual or alleged intellectual +property infringement. In order to qualify, an Indemnified Contributor must: +a) promptly notify the Commercial Contributor in writing of such claim, and +b) allow the Commercial Contributor tocontrol, and cooperate with the +Commercial Contributor in, the defense and any related settlement +negotiations. The Indemnified Contributor may participate in any such claim +at its own expense. + +For example, a Contributor might include the Program in a commercial product +offering, Product X. That Contributor is then a Commercial Contributor. If +that Commercial Contributor then makes performance claims, or offers +warranties related to Product X, those performance claims and warranties are +such Commercial Contributor's responsibility alone. Under this section, the +Commercial Contributor would have to defend claims against the other +Contributors related to those performance claims and warranties, and if a +court requires any other Contributor to pay any damages as a result, the +Commercial Contributor must pay those damages. + +5. NO WARRANTY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS PROVIDED ON +AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER +EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR +CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A +PARTICULAR PURPOSE. Each Recipient is solely responsible for determining the +appropriateness of using and distributing the Program and assumes all risks +associated with its exercise of rights under this Agreement , including but +not limited to the risks and costs of program errors, compliance with +applicable laws, damage to or loss of data, programs or equipment, and +unavailability or interruption of operations. + +6. DISCLAIMER OF LIABILITY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT NOR ANY +CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION +LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE +EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY +OF SUCH DAMAGES. + +7. GENERAL + +If any provision of this Agreement is invalid or unenforceable under +applicable law, it shall not affect the validity or enforceability of the +remainder of the terms of this Agreement, and without further action by the +parties hereto, such provision shall be reformed to the minimum extent +necessary to make such provision valid and enforceable. + +If Recipient institutes patent litigation against any entity (including a +cross-claim or counterclaim in a lawsuit) alleging that the Program itself +(excluding combinations of the Program with other software or hardware) +infringes such Recipient's patent(s), then such Recipient's rights granted +under Section 2(b) shall terminate as of the date such litigation is filed. + +All Recipient's rights under this Agreement shall terminate if it fails to +comply with any of the material terms or conditions of this Agreement and +does not cure such failure in a reasonable period of time after becoming +aware of such noncompliance. If all Recipient's rights under this Agreement +terminate, Recipient agrees to cease use and distribution of the Program as +soon as reasonably practicable. However, Recipient's obligations under this +Agreement and any licenses granted by Recipient relating to the Program shall +continue and survive. + +Everyone is permitted to copy and distribute copies of this Agreement, but in +order to avoid inconsistency the Agreement is copyrighted and may only be +modified in the following manner. The Agreement Steward reserves the right to +publish new versions (including revisions) of this Agreement from time to +time. No one other than the Agreement Steward has the right to modify this +Agreement. The Eclipse Foundation is the initial Agreement Steward. The +Eclipse Foundation may assign the responsibility to serve as the Agreement +Steward to a suitable separate entity. Each new version of the Agreement will +be given a distinguishing version number. The Program (including +Contributions) may always be distributed subject to the version of the +Agreement under which it was received. In addition, after a new version of +the Agreement is published, Contributor may elect to distribute the Program +(including its Contributions) under the new version. Except as expressly +stated in Sections 2(a) and 2(b) above, Recipient receives no rights or +licenses to the intellectual property of any Contributor under this +Agreement, whether expressly, by implication, estoppel or otherwise. All +rights in the Program not expressly granted under this Agreement are +reserved. + +This Agreement is governed by the laws of the State of Washington and the +intellectual property laws of the United States of America. No party to this +Agreement will bring a legal action under this Agreement more than one year +after the cause of action arose. Each party waives its rights to a jury trial +in any resulting litigation. diff --git a/README.md b/README.md new file mode 100644 index 0000000..6a1445d --- /dev/null +++ b/README.md @@ -0,0 +1,49 @@ +# views + +A Clojure library designed to ... well, that part is up to you. + + +## Design + +Subscription + +* subscribe to a view +* unsubscribe from a view +* remove all subscriptions (disconnect) +* get set of subscribed views + +Deltas + +* send deltas +* broadcast deltas +* receive deltas ( receive deltas from any other clients and broadcast out to subscribers a.k.a. broadcast deltas ) + +DB + +* calculates pre-delta checks (heuristic optimization) +* calculates actual deltas +* different heuristics for inserts, updates, and deletes +* different properties for outer joins +* pre-checks and delta calculations are coupled: + - we don't know if we need to calculate insert deltas until we do the insert and see if the result affects a view + +How should DB code work? + +- separate namespaces for insert, update, delete? +- low-level db actions (execute/query/transaction/etc.) should be separate namespace? +- initial view computing (view map, args, etc.) should be separate namespace? + + + + + +## Usage + +FIXME + +## License + +Copyright © 2014 FIXME + +Distributed under the Eclipse Public License either version 1.0 or (at +your option) any later version. diff --git a/doc/intro.md b/doc/intro.md new file mode 100644 index 0000000..2a918b3 --- /dev/null +++ b/doc/intro.md @@ -0,0 +1,3 @@ +# Introduction to views + +TODO: write [great documentation](http://jacobian.org/writing/great-documentation/what-to-write/) diff --git a/project.clj b/project.clj new file mode 100644 index 0000000..0d9bd97 --- /dev/null +++ b/project.clj @@ -0,0 +1,20 @@ +(defproject views "0.1.0-SNAPSHOT" + :description "You underestimate the power of the SQL side" + + :url "https://github.com/diligenceengine/views" + + :license {:name "Eclipse Public License" + :url "http://www.eclipse.org/legal/epl-v10.html"} + + :dependencies [[org.clojure/clojure "1.6.0"] + [org.clojure/tools.logging "0.2.6"] + [org.clojure/core.async "0.1.303.0-886421-alpha"] + [org.clojure/java.jdbc "0.3.3"] + [honeysql "0.4.3"] + [org.postgresql/postgresql "9.2-1003-jdbc4"]] + + :profiles {:test {:dependencies [[org.clojure/tools.nrepl "0.2.3"] + [environ "0.4.0"] + [org.clojure/data.generators "0.1.2"]]}} + + :plugins [[lein-environ "0.4.0"]]) diff --git a/src/views/base_subscribed_views.clj b/src/views/base_subscribed_views.clj new file mode 100644 index 0000000..9720f65 --- /dev/null +++ b/src/views/base_subscribed_views.clj @@ -0,0 +1,36 @@ +(ns views.base-subscribed-views + (:require + [views.db.core :refer [initial-views]] + [views.subscribed-views :refer [SubscribedViews subscriber-key-fn prefix-fn send-message]] + [views.subscriptions :as vs :refer [add-subscriptions!]] + [clojure.tools.logging :refer [debug info warn error]] + [clojure.core.async :refer [put! > (initial-views db view-sigs templates @vs/compiled-views) + (send-message this subscriber-key)))))) + + (unsubscribe-views [this unsub-req]) + + (disconnect [this disconnect-req]) + + (subscribed-views [this] @vs/compiled-views) + + (broadcast-deltas [this fdb views-with-deltas]) + + (send-message [this address msg] + (warn "IMPLEMENT ME. Got message " msg " sent to address " address)) + + (subscriber-key-fn [this msg] (:subscriber-key msg)) + + (prefix-fn [this msg] nil)) diff --git a/src/views/db.clj b/src/views/db.clj new file mode 100644 index 0000000..092e57a --- /dev/null +++ b/src/views/db.clj @@ -0,0 +1,479 @@ +(ns views.db + (:import + [java.sql SQLException BatchUpdateException] + [org.postgresql.util PSQLException]) + (:require + [clojure.string :refer [trim split]] + [honeysql.core :as hsql] + [honeysql.helpers :as hh] + [honeysql.format :as fmt] + [honeysql.types :as ht] + [clojure.java.jdbc :as j] + [clojure.tools.logging :refer [debug]] + [views.honeysql :as vh] + [views.subscribed-views :refer [get-subscribed-views broadcast-deltas]])) + +(defn get-primary-key + "Get a primary key for a table." + [schema table] + (or + (keyword (get-in schema [(name table) :primary-key :column_name])) + (throw (Exception. (str "Cannot find primary key for table: " table))))) + +;; +;; Takes the HoneySQL template for a view and the arglist +;; and compiles the view with a set of dummy args in the +;; format +;; [?1, ?2, ?3 ... ?N] +;; +;; Returns a map of the compiled hash-map and the args +;; with keys :dummy-view and :dummy-args respectively. +;; +(defn- compile-dummy-view + [view-template args] + (let [dummy-args (take (count args) (range)) + dummy-args (map #(str "?" %) dummy-args)] + {:dummy-view (apply view-template dummy-args) + :dummy-args dummy-args})) + +;; +;; Terminology and data structures used throughout this code +;; +;; -template - refers to a function which receives parameters +;; and returns a HoneySQL hash-map with params interpolated. +;; +;; action - describes the HoneySQL hash-map for the action to be performed +;; --the template function has already been called and returned this +;; with the appropriate parameter arguments. +;; +;; view-map - contains a set of computed information for each view itself. +;; Refer to the view-map doc-string for more information. +;; +;; view-check - SQL for checking whether or not a view needs to receive deltas +;; upon completion of an operation. +;; + +(defn view-map + "Constructs a view map from a HoneySQL view function and its arguments. + Contains four fields: + :view - the hash-map with interpolated parameters + :view-sig - the \"signature\" for the view, i.e. [:matter 1] + :args - the arguments passed in. + :tables - the tables present in all :from, :insert-into, + :update, :delete-from, :join, :left-join :right-join clauses + + Input is a view template function and a view signature. The template + function must take the same number of paramters as the signature and + return a honeysql data structure " + [view-template view-sig] + (let [compiled-view (if (> (count view-sig) 1) + (apply view-template (rest view-sig)) + (view-template))] + (merge {:args (rest view-sig) + :view-sig view-sig + :view compiled-view + :tables (set (vh/extract-tables compiled-view))} + (compile-dummy-view view-template (rest view-sig))))) + +(defn view-sig->view-map + "Takes a map of sig keys to view template function vars (templates) + and a view signature (view-sig the key for the template map and its args) + and returns a view-map for that view-sig." + [templates view-sig] + (let [lookup (first view-sig)] + (view-map (get-in templates [lookup :fn]) view-sig))) + +(defn swap-out-dummy-for-pos + "Replaces dummy arg like \"?0\" for integer value (0) so we can sort args." + [dummy-arg] + (Integer. (subs dummy-arg 1))) + +;; Helper for determine-filter-clauses (which is a helper +;; for view-check-template). Extracts constituent parts from +;; where clause. +(defn set-filter-clauses + [dummy-args fc w] + (if (= w :and) + fc + (if (contains? (set dummy-args) (last w)) + (update-in fc [:s] assoc (swap-out-dummy-for-pos (last w)) (second w)) + (update-in fc [:w] (fnil conj []) w)))) + +;; Helper for generating the view-check HoneySQL template. +;; Builds the where and select clauses up from constituent +;; where-clauses. Placeholder identifies the parameters +;; to pull out into the select clause. +(defn determine-filter-clauses + [wc dummy-args] + (let [fc {:s {} :w nil} + fc (if (and (not= :and (first wc)) (not (coll? (first wc)))) + (set-filter-clauses dummy-args fc wc) + (reduce #(set-filter-clauses dummy-args %1 %2) fc wc))] + (-> fc + (update-in [:s] #(into [] (vals (sort-by key %)))) + (update-in [:w] #(vh/with-op :and %))))) + +(defn append-arg-map + "Removes table/alias namespacing from select fields and creates a hash-map + of field to arguments for checking this view against checked-results later on. + Note that this assumes our select-fields are in the same order as they + are present in the :args view-map field (which they should be)." + [view-map select-fields] + (let [select-fields (map #(-> % name (split #"\.") last keyword) select-fields)] + (assoc view-map :arg-compare (zipmap select-fields (into [] (:args view-map)))))) + +(defn- create-view-delta-where-clauses + [view-map action] + (let [action-table (first (vh/extract-tables action))] + (for [view-table (vh/find-table-aliases action-table (:tables view-map))] + (-> (:where action) + (vh/prefix-columns (vh/table-alias view-table)) + (vh/replace-table (vh/table-alias action-table) (vh/table-alias view-table)))))) + +(defn format-action-wc-for-view + "Takes view-map and action (HoneySQL hash-map for insert/update/delete), + extracts where clause from action, and formats it with the proper + alias (or no alias) so that it will work when applied to the view SQL." + [view-map action] + (if (:where action) + (let [preds (create-view-delta-where-clauses view-map action)] + (if (> (count preds) 1) + (into [:or] preds) + (first preds))))) + +(defn- update-where-clause + [hh-spec where] + (if-let [w (:where where)] + (assoc hh-spec :where w) + (dissoc hh-spec :where))) + +(defn view-check-template + "Receives a view-map and an action (insert/update/delete HoneySQL hash-map). + Returns a HoneySQL hash-map which will can be formatted as SQL to check if a + view needs to receive deltas for the action SQL." + [view-map action] + (let [{:keys [dummy-view dummy-args]} view-map + fc (determine-filter-clauses (:where dummy-view) dummy-args) + action-wc (format-action-wc-for-view view-map action) + view-map (append-arg-map view-map (:s fc))] ; we need this to compare *after* the check is run + (->> (-> dummy-view + (update-where-clause (vh/merge-where-clauses action-wc (:w fc))) + (merge (apply hh/select (:s fc)))) + (hash-map :view-map view-map :view-check)))) + +(defn prepare-checks-for-view-deltas + "Checks to see if an action has tables related to a view, and + if so builds the HoneySQL hash-map for the SQL needed. + Uses this hash-map as a key and conj's the view-map to the key's + value so as to avoid redundant delta-check querying." + [action confirmed-views view-map] + ;; Confirm if any of the tables in view-map are present in the action template: + (if (some (set (map first (vh/extract-tables action))) + (map first (:tables view-map))) + + ;; Then construct the check-template for this particular view. + (if-let [{:keys [view-check view-map]} (view-check-template view-map action)] + ;; We then use the view-check as an index and conj the + ;; view-map to it so as to avoid redundant checks. + (update-in confirmed-views [view-check] #(conj % view-map)) + confirmed-views) + confirmed-views)) + +(defn prepare-view-checks + "Prepares checks for a collection of views (view-maps) against a HoneySQL action + (insert/update/delete) hash-map. + + Returns a structure like so: + {{> views + (map #(check-view-args checked-results %)) + (remove nil?) + distinct)) + +(defn- do-view-pre-check + [db needs-deltas view-check] + ;; + ;; We have empty-select? if we have a view with no where predicate clauses-- + ;; so it will always require deltas if there are matching tables. + ;; + ;; empty-where comes about if we are inserting--we don't have any where predicate + ;; in the insert, of course, so we can't perform pre-checks reliably. + ;; When we do an insert we have to simply do the delta query regardless, for now. + ;; + (let [empty-select? (seq (remove nil? (:select (first view-check)))) + empty-where? (seq (remove #(or (nil? %) (= :and %)) (:where (first view-check))))] + (if (or (not empty-select?) (not empty-where?)) + (apply conj needs-deltas (last view-check)) ;; put them all in if we can't do pre-check. + (let [checked-results (do-check db (first view-check)) + ;; checks view args against checked result set + checked-views (check-all-view-args checked-results (last view-check))] + (if (seq checked-views) + (apply conj needs-deltas checked-views) + needs-deltas))))) + +(defn do-view-pre-checks + "Takes db, all views (view-maps) and the HoneySQL action (insert/update/delete) + hash-map. Returns view-maps for all the views which need to receive + delta updates after the action is performed. + + *This function should be called within a transaction before performing the + insert/update/delete action.*" + [db all-views action] + (let [view-checks (prepare-view-checks all-views action)] + (reduce #(do-view-pre-check db %1 %2) [] view-checks))) + +(defn- calculate-delete-deltas + [db view-map] + (->> (:delete-deltas-map view-map) + hsql/format + (j/query db) + (assoc view-map :delete-deltas))) + +;; ------------------------------------------------------------------------------- +;; Handle inserts +;; ------------------------------------------------------------------------------- + +(defn compute-delete-deltas-for-insert + "Computes and returns a sequence of delete deltas for a single view and insert." + [schema db view-map table record] + (if (vh/outer-join-table? (:view view-map) table) + (let [delta-q (vh/outer-join-delta-query schema (:view view-map) table record)] + (j/query db (hsql/format delta-q))) + [])) + +(defn primary-key-predicate + "Return a predicate for a where clause that constrains to the primary key of + the record." + [schema table record] + (let [pkey (get-primary-key schema table)] + [:= pkey (pkey record)])) + +(defn compute-insert-deltas-for-insert + [schema db view-map table record] + (let [pkey-pred (primary-key-predicate schema table record) + action (hsql/build :insert-into table :values [record] :where pkey-pred) + insert-delta-wc (format-action-wc-for-view view-map action) + view (:view view-map) + insert-delta-map (update-in view [:where] #(:where (vh/merge-where-clauses insert-delta-wc %)))] + (j/query db (hsql/format insert-delta-map)))) + +(defn compute-insert-delete-deltas-for-views + [schema db views table record] + (doall (map #(compute-delete-deltas-for-insert schema db % table record) views))) + +(defn compute-insert-insert-deltas-for-views + [schema db views table record] + (doall (map #(compute-insert-deltas-for-insert schema db % table record) views))) + +(defn compute-deltas-for-insert + "This takes a *single* insert and a view, applies the insert and computes + the view deltas." + [schema db views table record] + (let [deletes (compute-insert-delete-deltas-for-views schema db views table record) + record* (first (j/insert! db table record)) + inserts (compute-insert-insert-deltas-for-views schema db views table record*)] + {:views-with-deltas (doall (map #(assoc %1 :delete-deltas %2 :insert-deltas %3) views deletes inserts)) + :result record*})) + +;; Handles insert and calculation of insert (after insert) delta. +(defn- insert-and-append-deltas! + [schema db views action table pkey] + (let [table (:insert-into action)] + (reduce + #(-> %1 + (update-in [:views-with-deltas] into (:views-with-deltas %2)) + (update-in [:result-set] conj (:result %2))) + {:views-with-deltas [] :result-set []} + (map #(compute-deltas-for-insert schema db views table %) (:values action))))) + +;; ------------------------------------------------------------------------------- + +;; This is for insert deltas for non-insert updates. + +;;; Takes the HoneySQL map (at key :view) from the view-map and appends +;;; the appropriately-table-namespaced where clause which limits the +;;; view query to the previously inserted or updated records. +(defn- calculate-insert-deltas + [db action pkey-wc view-map] + (let [action (assoc action :where pkey-wc) + insert-delta-wc (format-action-wc-for-view view-map action) + view (:view view-map) + insert-delta-map (update-in view [:where] #(:where (vh/merge-where-clauses insert-delta-wc %))) + deltas (j/query db (hsql/format insert-delta-map))] + (if (seq deltas) + (update-in view-map [:insert-deltas] #(apply conj % deltas)) + view-map))) + +;; Helper to query the action's table for primary key and pull it out. +(defn- get-action-row-key + [db pkey table action] + (->> (:where action) + (hsql/build :select pkey :from table :where) + hsql/format + (j/query db) + first pkey)) + +;; Handles update and calculation of delete (before update) and insert (after update) deltas. +(defn- update-and-append-deltas! + [db views action table pkey] + (let [views-pre (doall (map #(calculate-delete-deltas db %) views)) + pkey-val (get-action-row-key db pkey table action) + update (j/execute! db (hsql/format action))] + {:views-with-deltas (doall (map #(calculate-insert-deltas db action [:= pkey pkey-val] %) views-pre)) + :result-set update})) + +;; Handles deletion and calculation of delete (before update) delta. +(defn- delete-and-append-deltas! + [db views action table pkey] + (let [views-pre (doall (map #(calculate-delete-deltas db %) views))] + {:views-with-deltas views-pre + :result-set (j/execute! db (hsql/format action))})) + +;; Identifies which action--insert, update or delete--we are performing and dispatches appropriately. +;; Returns view-map with appropriate deltas appended. +(defn- perform-action-and-return-deltas + [schema db views action table pkey] + (cond + (:insert-into action) (insert-and-append-deltas! schema db views action table pkey) + (:update action) (update-and-append-deltas! db views action table pkey) + (:delete-from action) (delete-and-append-deltas! db views action table pkey) + :else (throw (Exception. "Received malformed action: " action)))) + +(defn generate-view-delta-map + "Adds a HoneySQL hash-map for the delta-calculation specific to the view + action. + Takes a view-map and the action HoneySQL hash-map, and appends the action's + where clause to the view's where clause, and adds in new field :insert-deltas-map." + [view-map action] + (let [action-wc (format-action-wc-for-view view-map action) + view (:view view-map)] + (->> (update-in view [:where] #(:where (vh/merge-where-clauses action-wc %))) + (assoc view-map :delete-deltas-map)))) + + +(defn do-view-transaction + "Takes the following arguments: + schema - from edl.core/defschema + db - clojure.java.jdbc database connection + all-views - the current set of views (view-maps--see view-map fn docstring for + description) in memory for the database + action - the HoneySQL pre-SQL hash-map with parameters already interpolated. + + The function will then perform the following sequence of actions, all run + within a transaction (with isolation serializable): + + 1) Create pre-check SQL for each view in the list. + 2) Run the pre-check SQL (or fail out based on some simple heuristics) to + identify if we want to send delta messages to the view's subscribers + (Note: this happens after the database action for *inserts only*). + 3) Run the database action (insert/action/delete). + 4) Calculate deltas based on the method described in section 5.4, \"Rule Generation\" + of the paper \"Deriving Production Rules for Incremental Rule Maintenance\" + by Stefano Ceri and Jennifer Widom (http://ilpubs.stanford.edu:8090/8/1/1991-4.pdf) + + The function returns the views which received delta updates with the deltas + keyed to each view-map at the keys :insert-deltas and :delete-deltas." + [schema db all-views action] + ;; Every update connected with a view is done in a transaction: + (j/with-db-transaction [t db :isolation :serializable] + (let [need-deltas (do-view-pre-checks t all-views action) + need-deltas (map #(generate-view-delta-map % action) need-deltas) + table (-> action vh/extract-tables ffirst) + pkey (get-primary-key schema table)] + (perform-action-and-return-deltas schema t need-deltas action table pkey)))) + +;; +;; Need to catch this and retry: +;; java.sql.SQLException: ERROR: could not serialize access due to concurrent update +;; +(defn get-nested-exceptions* + [exceptions e] + (if-let [next-e (.getNextException e)] + (recur (conj exceptions next-e) next-e) + exceptions)) + +(defn get-nested-exceptions + [e] + (get-nested-exceptions* [e] e)) + +(defn do-transaction-fn-with-retries + [transaction-fn] + (try + (transaction-fn) + (catch SQLException e + ;; http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html + (debug "Caught exception with error code: " (.getSQLState e)) + (debug "Exception message: " (.getMessage e)) + ;; (debug "stack trace message: " (.printStackTrace e)) + (if (some #(= (.getSQLState %) "40001") (get-nested-exceptions e)) + (do-transaction-fn-with-retries transaction-fn) ;; try it again + (throw e))))) ;; otherwise rethrow + +(defmacro with-view-transaction + [subscribed-views binding & forms] + (let [tvar (first binding)] + `(if (:deltas ~(second binding)) ;; check if we are in a nested transaction + (let [~tvar ~(second binding)] ~@forms) + (do-transaction-fn-with-retries + (fn [] + (let [deltas# (atom []) + result# (j/with-db-transaction [t# ~(second binding) :isolation :serializable] + (let [~tvar (assoc t# :deltas deltas#)] + ~@forms))] + (broadcast-deltas ~subscribed-views ~(second binding) @deltas#) + result#)))))) + +(defn vaction! + "Used to perform arbitrary insert/update/delete actions on the database, + while ensuring that view deltas are appropriately checked and calculated + for the currently registered views as reported by a type implementing + the SubscribedViews protocol. + + Arguments are: + + - schema: an edl schema (\"(defschema my-schema ...)\") + + - db: a clojure.java.jdbc database + + - action-map: the HoneySQL map for the insert/update/delete action + + - subscribed-views: an implementation of SubscribedViews implementing + the follow functions: + + - get-subscribed-views takes a database connection. It should return + a collection of view-maps. + + - send-deltas takes a db connection, and the views which have had deltas + calculate for them and associated with the hash-maps (appropriately + called views-with-deltas)." + [schema db action-map subscribed-views] + (let [subbed-views (get-subscribed-views subscribed-views db) + transaction-fn #(do-view-transaction schema db subbed-views action-map)] + (if-let [deltas (:deltas db)] ;; inside a transaction we just collect deltas and do not retry + (let [{:keys [views-with-deltas result-set]} (transaction-fn)] + (swap! deltas into views-with-deltas) + result-set) + (let [{:keys [views-with-deltas result-set]} (do-transaction-fn-with-retries transaction-fn)] + (broadcast-deltas subscribed-views db views-with-deltas) + result-set)))) diff --git a/src/views/db/core.clj b/src/views/db/core.clj new file mode 100644 index 0000000..5bb635f --- /dev/null +++ b/src/views/db/core.clj @@ -0,0 +1,32 @@ +(ns views.db.core + (:require + [clojure.java.jdbc :as j] + [honeysql.core :as hsql])) + +(defn view-query + "Takes db and query-fn (compiled HoneySQL hash-map) + and runs the query, returning results." + [db query-map] + (j/query db (hsql/format query-map))) + +(defn post-process-result-set + [nv templates result-set] + (if-let [post-fn (get-in templates [(first nv) :post-fn])] + (mapv post-fn result-set) + result-set)) + +(defn initial-views + "Takes a db spec, the new views sigs (new-views) we want to produce result-sets for, + the template config map, and the subscribed-views themselves (with compiled view-maps) + and returns a result-set for the new-views with post-fn functions applied to the data." + [db new-views templates subscribed-views] + (reduce + (fn [results nv] + (->> (get subscribed-views nv) + :view-map + (view-query db) + (into []) + (post-process-result-set nv templates) + (assoc results nv))) + {} + new-views)) diff --git a/src/views/deltas.clj b/src/views/deltas.clj new file mode 100644 index 0000000..6e32c00 --- /dev/null +++ b/src/views/deltas.clj @@ -0,0 +1,39 @@ +(ns view.deltas) + +;; ;; This preprocesses a batch of view deltas. They get packaged into batches for each +;; ;; subscriber in send-delta. +;; (defn preprocess-delta +;; "Returns a pair [firm_id, deltas]." +;; [fdb templates v] +;; (let [insert-deltas (post-process-result-sets templates [[(:view-sig v) (:insert-deltas v)]]) +;; delete-deltas (post-process-result-sets templates [[(:view-sig v) (:delete-deltas v)]]) +;; deltas [(:view-sig v) {:insert-deltas (seq (last (last insert-deltas))) +;; :delete-deltas (seq (last (last delete-deltas)))}]] +;; [(:fid fdb) deltas])) + +;; (defn build-delta-batch +;; [fdb templates views-with-deltas] +;; (doall (map #(preprocess-delta fdb templates %) views-with-deltas))) + + +;; The following creates delta batches for each subscriber. + +;; If we have a batch of deltas: +;; - there are multiple views +;; - each view has a list of clients +;; - produce client -> deltas + +(defn add-message + [msg batches subscriber] + (debug "DELTAs to S: " msg " | " subscriber) + (update-in batches [subscriber] (fnil conj []) msg)) + +(defn build-messages + [delta-batch] + (loop [batches {}, deltas (seq delta-batch)] + (if-not deltas + batches + (let [[firm_id [view-sig delta]] (first deltas) + subscribed (get-in @subscribed-views [firm_id view-sig :sessions])] + (debug "\n\nSUBSCRIBED: " subscribed) + (recur (reduce #(add-message {view-sig delta} %1 %2) batches subscribed) (next deltas)))))) diff --git a/src/views/filters.clj b/src/views/filters.clj new file mode 100644 index 0000000..a63c67a --- /dev/null +++ b/src/views/filters.clj @@ -0,0 +1,34 @@ +(ns views.filters) + +(defn view-filter + "Takes a subscription request msg, a collection of view-sigs and + the config templates hash-map for an app. Checks if there is + a global filter-fn in the hash-map metadata and checks against + that if it exists, as well as against any existing filter + functions for individual template config entries. Template + config hash-map entries can specify a filter-fn using the key + :filter-fn, and the global filter-fn is the same, only on + the config meta-data (i.e. (with-meta templates {:filter-fn ...})) + + By default throws an exception if no filters are present. + By passing in {:unsafe true} in opts, this can be overridden." + [msg view-sigs templates & opts] + (let [global-filter-fn (:filter-fn (meta templates))] + (filterv + #(let [filter-fn (:filter-fn (get templates (first %)))] + (cond + (and filter-fn global-filter-fn) + (and (global-filter-fn msg %) (filter-fn msg %)) + + filter-fn + (filter-fn msg %) + + global-filter-fn + (global-filter-fn msg %) + + :else + (if (-> opts first :unsafe) + (do (warn "YOU ARE RUNNING IN UNSAFE MODE, AND NO FILTERS ARE PRESENT FOR VIEW-SIG: " %) + true) + (throw (Exception. (str "No filter set for view " %)))))) + view-sigs))) diff --git a/src/views/honeysql.clj b/src/views/honeysql.clj new file mode 100644 index 0000000..d3e9460 --- /dev/null +++ b/src/views/honeysql.clj @@ -0,0 +1,148 @@ +(ns views.honeysql + (:require + [honeysql.core :as hsql] + [honeysql.helpers :as hh] + [clojure.string :refer [split]])) + +(def table-clauses + [:from :insert-into :update :delete-from :join :left-join :right-join]) + +(def pred-ops + #{:= :< :> :<> :>= :<= :in :between :match :ltree-match :and :or :not=}) + +(defn process-complex-clause + [tables clause] + (reduce + #(if (coll? %2) + (if (some pred-ops [(first %2)]) + %1 + (conj %1 %2)) + (conj %1 [%2])) + tables + clause)) + +(defn extract-tables* + [tables clause] + (if clause + (if (coll? clause) + (process-complex-clause tables clause) + (conj tables [clause])) + tables)) + +(defn with-op + "Takes a collection of things and returns either an nary op of them, or + the item in the collection if there is only one." + [op coll] + (if (> (count coll) 1) (into [op] coll) (first coll))) + +(defn extract-tables + "Extracts a set of table vector from a HoneySQL spec hash-map. + Each vector either contains a single table keyword, or the + table keyword and an alias keyword." + ([hh-spec] (extract-tables hh-spec table-clauses)) + ([hh-spec clauses] (reduce #(extract-tables* %1 (%2 hh-spec)) #{} clauses))) + +(defn find-table-aliases + "Returns the table alias for the supplied table." + [action-table tables] + (filter #(= (first action-table) (first %)) tables)) + +(defn outer-join-table? + "Return true if table is used in an outer join in the honeysql expression." + [hh-spec table] + (let [tables (map first (extract-tables hh-spec [:left-join :right-join]))] + (boolean (some #(= table %) tables)))) + +(defn prefix-column + "Prefixes a column with an alias." + [column alias] + (keyword (str (name alias) \. (name column)))) + +(defn create-null-constraints + "Create 'is null' constraints for all the columns of a table." + [schema table table-alias] + (let [columns (map #(prefix-column % table-alias) (keys (:columns (get schema (name table)))))] + (into [:and] (for [c columns] [:= c nil])))) + +(defn table-alias + "Returns the name of a table or its alias. E.g. for [:table :t] returns :t." + [table] + (if (keyword? table) table (last table))) + +(defn table-name + "Returns the name of a table . E.g. for [:table :t] returns :table." + [table] + (if (keyword? table) table (first table))) + +(defn table-column + "Assumes that table columns are keywords of the form :table.column. Returns + the column as a keyword or nil if the supplied keyword doesn't match the pattern." + [table item] + (let [s (name item), t (str (name table) \.)] + (if (.startsWith s t) (keyword (subs s (count t)))))) + +(defn modified-outer-join-predicate + "Returns an outer join predicate with the join tables columns subistituted + with values from a record." + [table predicate record] + (if-let [column (and (keyword? predicate) (table-column table predicate))] + (or (get record column) + (throw (Exception. (str "No value for column " column " in " record)))) + (if (vector? predicate) + (apply vector (map #(modified-outer-join-predicate table % record) predicate)) + predicate))) + +(defn find-outer-joins + "Find and return all the outer joins on a given table." + [hh-spec table] + (->> (concat (:left-join hh-spec) (:right-join hh-spec)) + (partition 2 2) + (filter #(= table (table-name (first %)))))) + +(defn- create-outer-join-predicates + "Create outer join predicate from a record and joins." + [schema table record joins] + (->> joins + (map (fn [[table-spec join-pred]] + [:and + (modified-outer-join-predicate (table-alias table-spec) join-pred record) + (create-null-constraints schema table (table-alias table-spec))])) + (with-op :or))) + +(defn outer-join-delta-query + "Create an outer join delta query given a honeysql template and record" + [schema hh-spec table record] + (let [join-tables (find-outer-joins hh-spec table) + join-pred (create-outer-join-predicates schema table record join-tables)] + (assert (not (nil? join-pred))) + (update-in hh-spec [:where] #(vector :and % join-pred)))) + +(defn merge-where-clauses + "Takes two where clauses from two different HoneySQL maps and joins then with an and. + If one is nil, returns only the non-nil where clause." + [wc1 wc2] + (if (and wc1 wc2) + (hh/where wc1 wc2) + (hh/where (or wc1 wc2)))) + +(defn replace-table + "Replace all instances of table name t1 pred with t2." + [pred t1 t2] + (if-let [column (and (keyword? pred) (table-column t1 pred))] + (keyword (str (name t2) \. (name column))) + (if (coll? pred) + (map #(replace-table % t1 t2) pred) + pred))) + +(defn unprefixed-column? + [c] + (and (keyword? c) (not (pred-ops c)) (neg? (.indexOf (name c) (int \.))))) + +(defn prefix-columns + "Prefix all unprefixed columns with table." + [pred table] + (if (unprefixed-column? pred) + (keyword (str (name table) \. (name pred))) + (if (coll? pred) + (map #(prefix-columns % table) pred) + pred))) diff --git a/src/views/router.clj b/src/views/router.clj new file mode 100644 index 0000000..af18fe3 --- /dev/null +++ b/src/views/router.clj @@ -0,0 +1,45 @@ +(ns views.router + (:require + [views.subscribed-views + :refer [subscribe-views unsubscribe-views disconnect get-delta-broadcast-channel send-delta]] + [clojure.core.async :refer [go go-loop chan pub sub unsub close! >! >!! (get views-rs [:users]) first :name) + (-> users first :name upper-case))))) diff --git a/test/views/fixtures.clj b/test/views/fixtures.clj new file mode 100644 index 0000000..038028c --- /dev/null +++ b/test/views/fixtures.clj @@ -0,0 +1,54 @@ +(ns views.fixtures + (:require + [environ.core :as e] + [clojure.java.jdbc :as j] + [honeysql.core :as hsql] + [clojure.data.generators :as dg])) + +;; CREATE ROLE views_user LOGIN PASSWORD 'password'; +;; CREATE DATABASE views_test OWNER views_user; + +(defn sql-ts + ([ts] (java.sql.Timestamp. ts)) + ([] (java.sql.Timestamp. (.getTime (java.util.Date.))))) + +(def db {:classname "org.postgresql.Driver" + :subprotocol "postgresql" + :subname (get :views-test-db e/env "//localhost/views_test") + :user (get :views-test-user e/env "views_user") + :password (get :views-test-ppassword e/env "password")}) + +(defn users-table-fixture! + [] + (j/execute! db ["CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT NOT NULL, created_on DATE NOT NULL)"])) + +(defn posts-table-fixture! + [] + (j/execute! db ["CREATE TABLE posts (id SERIAL PRIMARY KEY, + title TEXT NOT NULL, + body TEXT NOT NULL, + created_on DATE NOT NULL, + user_id INTEGER NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id))"])) + +(defn drop-tables! + [tables] + (doseq [t tables] + (j/execute! db [(str "DROP TABLE " (name t))]))) + +(defn database-fixtures! + [f] + (users-table-fixture!) + (posts-table-fixture!) + (f) + (drop-tables! [:posts :users])) + +(defn user-fixture! + [name] + (j/execute! db (hsql/format (hsql/build :insert-into :users :values [{:name name :created_on (sql-ts)}])))) + +(defn gen-n-users! + [n] + (dotimes [n n] + (user-fixture! (dg/string #(rand-nth (seq "abcdefghijklmnopqrstuwvxyz"))))) + (j/query db ["SELECT * FROM users"])) diff --git a/test/views/subscriptions_test.clj b/test/views/subscriptions_test.clj new file mode 100644 index 0000000..3702b98 --- /dev/null +++ b/test/views/subscriptions_test.clj @@ -0,0 +1,59 @@ +(ns views.subscriptions-test + (:require + [clojure.test :refer [use-fixtures deftest is]] + [views.subscriptions :as vs])) + +(defn- reset-subscribed-views! + [f] + (reset! vs/subscribed-views {}) + (f)) + +(use-fixtures :each reset-subscribed-views!) + +(deftest adds-a-subscription + (let [key 1, view-sig [:view 1]] + (vs/add-subscription! key view-sig) + (is (vs/subscribed-to? key view-sig)))) + +(deftest can-use-prefix + (let [prefix1 1, prefix2 2, key 1, view-sig [:view 1]] + (vs/add-subscription! key view-sig prefix1) + (vs/add-subscription! key view-sig prefix2) + (is (vs/subscribed-to? key view-sig prefix1)) + (is (vs/subscribed-to? key view-sig prefix2)))) + +(deftest removes-a-subscription + (let [key 1, view-sig [:view 1]] + (vs/add-subscription! key view-sig) + (vs/remove-subscription! key view-sig) + (is (not (vs/subscribed-to? key view-sig))))) + +(deftest doesnt-fail-or-create-view-entry-when-empty + (vs/remove-subscription! 1 [:view 1]) + (is (= {} @vs/subscribed-views))) + +(deftest removes-a-subscription-with-prefix + (let [prefix1 1, prefix2 2, key 1, view-sig [:view 1]] + (vs/add-subscription! key view-sig prefix1) + (vs/add-subscription! key view-sig prefix2) + (vs/remove-subscription! key view-sig prefix1) + (is (not (vs/subscribed-to? key view-sig prefix1))) + (is (vs/subscribed-to? key view-sig prefix2)))) + +(deftest removes-unsubscribed-to-view-from-subscribed-views + (let [key 1, view-sig [:view 1]] + (vs/add-subscription! key view-sig) + (vs/remove-subscription! key view-sig) + (is (= {} @vs/subscribed-views)))) + +(deftest adds-multiple-views-at-a-time + (let [key 1, view-sigs [[:view 1] [:view 2]]] + (vs/add-subscriptions! key view-sigs) + (is (vs/subscribed-to? key (first view-sigs))) + (is (vs/subscribed-to? key (last view-sigs))))) + +;; (deftest subscribing-compiles-and-stores-view-maps +;; (let [key 1, view-sig [:view 1]] +;; (vs/add-subscriptions! key view-sigs) +;; (is (vs/subscribed-to? key (first view-sigs))) +;; (is (vs/subscribed-to? key (last view-sigs)))))