Compare commits
No commits in common. "master" and "v2" have entirely different histories.
16
.gitignore
vendored
16
.gitignore
vendored
|
@ -1,18 +1,12 @@
|
|||
.DS_Store
|
||||
/target
|
||||
/classes
|
||||
/checkouts
|
||||
/out
|
||||
pom.xml
|
||||
pom.xml.asc
|
||||
*.jar
|
||||
*.class
|
||||
.lein-*
|
||||
.nrepl-port
|
||||
/*.project
|
||||
/*.classpath
|
||||
/.settings/
|
||||
*.iml
|
||||
*.ipr
|
||||
*.iws
|
||||
.idea
|
||||
/.lein-*
|
||||
/.nrepl-port
|
||||
*~
|
||||
*.bk
|
||||
.idea
|
||||
|
|
227
LICENSE
227
LICENSE
|
@ -1,21 +1,214 @@
|
|||
The MIT License (MIT)
|
||||
THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE PUBLIC
|
||||
LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION OF THE PROGRAM
|
||||
CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
|
||||
|
||||
Copyright (c) 2015 Kira Inc.
|
||||
1. DEFINITIONS
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
"Contribution" means:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
a) in the case of the initial Contributor, the initial code and
|
||||
documentation distributed under this Agreement, and
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
b) in the case of each subsequent Contributor:
|
||||
|
||||
i) changes to the Program, and
|
||||
|
||||
ii) additions to the Program;
|
||||
|
||||
where such changes and/or additions to the Program originate from and are
|
||||
distributed by that particular Contributor. A Contribution 'originates' from
|
||||
a Contributor if it was added to the Program by such Contributor itself or
|
||||
anyone acting on such Contributor's behalf. Contributions do not include
|
||||
additions to the Program which: (i) are separate modules of software
|
||||
distributed in conjunction with the Program under their own license
|
||||
agreement, and (ii) are not derivative works of the Program.
|
||||
|
||||
"Contributor" means any person or entity that distributes the Program.
|
||||
|
||||
"Licensed Patents" mean patent claims licensable by a Contributor which are
|
||||
necessarily infringed by the use or sale of its Contribution alone or when
|
||||
combined with the Program.
|
||||
|
||||
"Program" means the Contributions distributed in accordance with this
|
||||
Agreement.
|
||||
|
||||
"Recipient" means anyone who receives the Program under this Agreement,
|
||||
including all Contributors.
|
||||
|
||||
2. GRANT OF RIGHTS
|
||||
|
||||
a) Subject to the terms of this Agreement, each Contributor hereby grants
|
||||
Recipient a non-exclusive, worldwide, royalty-free copyright license to
|
||||
reproduce, prepare derivative works of, publicly display, publicly perform,
|
||||
distribute and sublicense the Contribution of such Contributor, if any, and
|
||||
such derivative works, in source code and object code form.
|
||||
|
||||
b) Subject to the terms of this Agreement, each Contributor hereby grants
|
||||
Recipient a non-exclusive, worldwide, royalty-free patent license under
|
||||
Licensed Patents to make, use, sell, offer to sell, import and otherwise
|
||||
transfer the Contribution of such Contributor, if any, in source code and
|
||||
object code form. This patent license shall apply to the combination of the
|
||||
Contribution and the Program if, at the time the Contribution is added by the
|
||||
Contributor, such addition of the Contribution causes such combination to be
|
||||
covered by the Licensed Patents. The patent license shall not apply to any
|
||||
other combinations which include the Contribution. No hardware per se is
|
||||
licensed hereunder.
|
||||
|
||||
c) Recipient understands that although each Contributor grants the licenses
|
||||
to its Contributions set forth herein, no assurances are provided by any
|
||||
Contributor that the Program does not infringe the patent or other
|
||||
intellectual property rights of any other entity. Each Contributor disclaims
|
||||
any liability to Recipient for claims brought by any other entity based on
|
||||
infringement of intellectual property rights or otherwise. As a condition to
|
||||
exercising the rights and licenses granted hereunder, each Recipient hereby
|
||||
assumes sole responsibility to secure any other intellectual property rights
|
||||
needed, if any. For example, if a third party patent license is required to
|
||||
allow Recipient to distribute the Program, it is Recipient's responsibility
|
||||
to acquire that license before distributing the Program.
|
||||
|
||||
d) Each Contributor represents that to its knowledge it has sufficient
|
||||
copyright rights in its Contribution, if any, to grant the copyright license
|
||||
set forth in this Agreement.
|
||||
|
||||
3. REQUIREMENTS
|
||||
|
||||
A Contributor may choose to distribute the Program in object code form under
|
||||
its own license agreement, provided that:
|
||||
|
||||
a) it complies with the terms and conditions of this Agreement; and
|
||||
|
||||
b) its license agreement:
|
||||
|
||||
i) effectively disclaims on behalf of all Contributors all warranties and
|
||||
conditions, express and implied, including warranties or conditions of title
|
||||
and non-infringement, and implied warranties or conditions of merchantability
|
||||
and fitness for a particular purpose;
|
||||
|
||||
ii) effectively excludes on behalf of all Contributors all liability for
|
||||
damages, including direct, indirect, special, incidental and consequential
|
||||
damages, such as lost profits;
|
||||
|
||||
iii) states that any provisions which differ from this Agreement are offered
|
||||
by that Contributor alone and not by any other party; and
|
||||
|
||||
iv) states that source code for the Program is available from such
|
||||
Contributor, and informs licensees how to obtain it in a reasonable manner on
|
||||
or through a medium customarily used for software exchange.
|
||||
|
||||
When the Program is made available in source code form:
|
||||
|
||||
a) it must be made available under this Agreement; and
|
||||
|
||||
b) a copy of this Agreement must be included with each copy of the Program.
|
||||
|
||||
Contributors may not remove or alter any copyright notices contained within
|
||||
the Program.
|
||||
|
||||
Each Contributor must identify itself as the originator of its Contribution,
|
||||
if any, in a manner that reasonably allows subsequent Recipients to identify
|
||||
the originator of the Contribution.
|
||||
|
||||
4. COMMERCIAL DISTRIBUTION
|
||||
|
||||
Commercial distributors of software may accept certain responsibilities with
|
||||
respect to end users, business partners and the like. While this license is
|
||||
intended to facilitate the commercial use of the Program, the Contributor who
|
||||
includes the Program in a commercial product offering should do so in a
|
||||
manner which does not create potential liability for other Contributors.
|
||||
Therefore, if a Contributor includes the Program in a commercial product
|
||||
offering, such Contributor ("Commercial Contributor") hereby agrees to defend
|
||||
and indemnify every other Contributor ("Indemnified Contributor") against any
|
||||
losses, damages and costs (collectively "Losses") arising from claims,
|
||||
lawsuits and other legal actions brought by a third party against the
|
||||
Indemnified Contributor to the extent caused by the acts or omissions of such
|
||||
Commercial Contributor in connection with its distribution of the Program in
|
||||
a commercial product offering. The obligations in this section do not apply
|
||||
to any claims or Losses relating to any actual or alleged intellectual
|
||||
property infringement. In order to qualify, an Indemnified Contributor must:
|
||||
a) promptly notify the Commercial Contributor in writing of such claim, and
|
||||
b) allow the Commercial Contributor tocontrol, and cooperate with the
|
||||
Commercial Contributor in, the defense and any related settlement
|
||||
negotiations. The Indemnified Contributor may participate in any such claim
|
||||
at its own expense.
|
||||
|
||||
For example, a Contributor might include the Program in a commercial product
|
||||
offering, Product X. That Contributor is then a Commercial Contributor. If
|
||||
that Commercial Contributor then makes performance claims, or offers
|
||||
warranties related to Product X, those performance claims and warranties are
|
||||
such Commercial Contributor's responsibility alone. Under this section, the
|
||||
Commercial Contributor would have to defend claims against the other
|
||||
Contributors related to those performance claims and warranties, and if a
|
||||
court requires any other Contributor to pay any damages as a result, the
|
||||
Commercial Contributor must pay those damages.
|
||||
|
||||
5. NO WARRANTY
|
||||
|
||||
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS PROVIDED ON
|
||||
AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
|
||||
EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR
|
||||
CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A
|
||||
PARTICULAR PURPOSE. Each Recipient is solely responsible for determining the
|
||||
appropriateness of using and distributing the Program and assumes all risks
|
||||
associated with its exercise of rights under this Agreement , including but
|
||||
not limited to the risks and costs of program errors, compliance with
|
||||
applicable laws, damage to or loss of data, programs or equipment, and
|
||||
unavailability or interruption of operations.
|
||||
|
||||
6. DISCLAIMER OF LIABILITY
|
||||
|
||||
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT NOR ANY
|
||||
CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION
|
||||
LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE
|
||||
EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY
|
||||
OF SUCH DAMAGES.
|
||||
|
||||
7. GENERAL
|
||||
|
||||
If any provision of this Agreement is invalid or unenforceable under
|
||||
applicable law, it shall not affect the validity or enforceability of the
|
||||
remainder of the terms of this Agreement, and without further action by the
|
||||
parties hereto, such provision shall be reformed to the minimum extent
|
||||
necessary to make such provision valid and enforceable.
|
||||
|
||||
If Recipient institutes patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Program itself
|
||||
(excluding combinations of the Program with other software or hardware)
|
||||
infringes such Recipient's patent(s), then such Recipient's rights granted
|
||||
under Section 2(b) shall terminate as of the date such litigation is filed.
|
||||
|
||||
All Recipient's rights under this Agreement shall terminate if it fails to
|
||||
comply with any of the material terms or conditions of this Agreement and
|
||||
does not cure such failure in a reasonable period of time after becoming
|
||||
aware of such noncompliance. If all Recipient's rights under this Agreement
|
||||
terminate, Recipient agrees to cease use and distribution of the Program as
|
||||
soon as reasonably practicable. However, Recipient's obligations under this
|
||||
Agreement and any licenses granted by Recipient relating to the Program shall
|
||||
continue and survive.
|
||||
|
||||
Everyone is permitted to copy and distribute copies of this Agreement, but in
|
||||
order to avoid inconsistency the Agreement is copyrighted and may only be
|
||||
modified in the following manner. The Agreement Steward reserves the right to
|
||||
publish new versions (including revisions) of this Agreement from time to
|
||||
time. No one other than the Agreement Steward has the right to modify this
|
||||
Agreement. The Eclipse Foundation is the initial Agreement Steward. The
|
||||
Eclipse Foundation may assign the responsibility to serve as the Agreement
|
||||
Steward to a suitable separate entity. Each new version of the Agreement will
|
||||
be given a distinguishing version number. The Program (including
|
||||
Contributions) may always be distributed subject to the version of the
|
||||
Agreement under which it was received. In addition, after a new version of
|
||||
the Agreement is published, Contributor may elect to distribute the Program
|
||||
(including its Contributions) under the new version. Except as expressly
|
||||
stated in Sections 2(a) and 2(b) above, Recipient receives no rights or
|
||||
licenses to the intellectual property of any Contributor under this
|
||||
Agreement, whether expressly, by implication, estoppel or otherwise. All
|
||||
rights in the Program not expressly granted under this Agreement are
|
||||
reserved.
|
||||
|
||||
This Agreement is governed by the laws of the State of Washington and the
|
||||
intellectual property laws of the United States of America. No party to this
|
||||
Agreement will bring a legal action under this Agreement more than one year
|
||||
after the cause of action arose. Each party waives its rights to a jury trial
|
||||
in any resulting litigation.
|
||||
|
|
734
README.md
734
README.md
|
@ -2,738 +2,26 @@
|
|||
|
||||
Eventually consistent external materialized views.
|
||||
|
||||
Also see these plugin libraries which allow you to use a views system
|
||||
in a number of really great ways in your applications:
|
||||
*Notice: despite being 1.x.x, this is incomplete. Version 2.0.0 will be
|
||||
the first release version.*
|
||||
|
||||
* [views.sql](https://github.com/gered/views.sql)
|
||||
* [views.honeysql](https://github.com/gered/views.honeysql)
|
||||
* [views.reagent](https://github.com/gered/views.reagent)
|
||||
|
||||
## Leiningen
|
||||
|
||||
[![](https://clojars.org/net.gered/views/latest-version.svg)](https://clojars.org/net.gered/views)
|
||||
|
||||
## This is a fork!
|
||||
I'm keeping this as a separate fork for now because I've made some
|
||||
breaking or otherwise significant changes from the original, some of
|
||||
which are based simply on personal preferences. I simply haven't felt
|
||||
it's quite right to submit a pull request because of some of these
|
||||
types of changes. Perhaps in the future.
|
||||
|
||||
I definitely cannot take credit for the original idea behind this
|
||||
library or the core implementation or design of it. Definitely keep an
|
||||
eye on the [original repository][1] which is maintained by Kira Inc.
|
||||
|
||||
[1]: https://github.com/kirasystems/views
|
||||
|
||||
|
||||
## Basic Concepts
|
||||
|
||||
The views library allows you to manage a **view system** which is a
|
||||
collection of **views** and a list of **subscribers** to those views.
|
||||
Subscribers will get sent **view refreshes** in realtime when the data
|
||||
represented by the views they are subscribed to changes. Relevant
|
||||
changes are found through the use of **hints** which are added to the
|
||||
view system by anything that is actually changing the data at the
|
||||
instant it is changed.
|
||||
|
||||
A **view** is similar in concept to a [materialized view][2], though in
|
||||
practice it may not actually keep a copy of the underlying data
|
||||
represented by the view and instead just keep a copy of a query or the
|
||||
location of where the data can be retrieved from when it is needed
|
||||
(e.g. when view refrehses need to be sent out).
|
||||
|
||||
[2]: https://en.wikipedia.org/wiki/Materialized_view
|
||||
|
||||
A view is represented by the protocol `views.protocols/IView`:
|
||||
|
||||
```clj
|
||||
(defprotocol IView
|
||||
(data [this namespace parameters])
|
||||
(relevant? [this namespace parameters hints])
|
||||
(id [this]))
|
||||
```
|
||||
|
||||
`id` simply returns a unique identifier for this view. `data` returns a
|
||||
copy of the underlying data represented by this view. `relevant?`
|
||||
determines if a collection of hints are relevant to the view and is
|
||||
called by the view system whenever new hints are received to determine
|
||||
if view refreshes need to be sent out for this view.
|
||||
|
||||
A **hint** is a map of the form:
|
||||
|
||||
```clj
|
||||
{:namespace ...
|
||||
:type ...
|
||||
:hint ...}
|
||||
```
|
||||
|
||||
`:type` represents the type of view (e.g. `:sql-table-name`) and is
|
||||
defined by the view implementation that this hint is intended for.
|
||||
`:hint` is the actual hint information itself and it's contents will
|
||||
differ depending on the type of view it is intended for. As an example,
|
||||
for a SQL view it may be a list of database table names.
|
||||
|
||||
**Namespaces** can be used to isolate multiple sets of the same type of
|
||||
data being represented by the views within the view system. As an
|
||||
example, for SQL views a namespace could be used to represent the
|
||||
database to connect to if your system is comprised of multiple similar
|
||||
databases. A view is not specifically tied to a namespace, however the
|
||||
hints processed by the view system are only relevant for the namespace
|
||||
specified in the hint.
|
||||
|
||||
When a view's `relevant?` check is determining if any given hint is
|
||||
relevant or not, it will compare all the properties of a hint,
|
||||
including the namespace and type to ensure that view refreshes aren't
|
||||
issued incorrectly or too frequently.
|
||||
|
||||
**Subscribers** can be registered within the view system. A
|
||||
subscription can be created within the view system by specifying the
|
||||
view to subscribe to, identified by it's view ID, and also a namespace
|
||||
and any parameters that the view might take. These 3 properties go
|
||||
together to form a **view signature** or **view sig**. A view sig is
|
||||
represented by a map:
|
||||
|
||||
```clj
|
||||
{:namespace ...
|
||||
:view-id ...
|
||||
:parameters ...}
|
||||
```
|
||||
|
||||
Subscriptions are considered unique for a subscriber based on all 3 of
|
||||
these properties combined. As such a subscriber can have multiple
|
||||
concurrent subscriptions to the same view if the namespace and/or
|
||||
parameters are different for all of them.
|
||||
|
||||
A subscriber is uniquely identified by it's **subscriber key**. Common
|
||||
subscriber key values include user ID's, Session ID's, or other
|
||||
identifiers like client ID's used in libraries like Sente for websocket
|
||||
connections.
|
||||
|
||||
When hints are processed by the view system and found to be relevant
|
||||
for any of the views (through the use of the `relevant?` check
|
||||
mentioned earlier), **view refreshes** are sent out to all of the
|
||||
subscribers of the view. Up-to-date data for the view is retrieved via
|
||||
the view's `data` function and then sent out.
|
||||
|
||||
Whenever data is refreshed a hash is kept and is compared to on each
|
||||
refresh to make sure that we don't send out another refresh if the data
|
||||
is unchanged from the last refresh sent.
|
||||
## Design
|
||||
|
||||
TODO
|
||||
|
||||
## Usage
|
||||
|
||||
To explain basic usage of the views library, we'll walk through an
|
||||
example building up a simple system so you can see how it works
|
||||
interactively.
|
||||
|
||||
To begin, we'll need to use functions from the `views.core` namespace.
|
||||
|
||||
```clj
|
||||
(require '[views.core :as views])
|
||||
```
|
||||
|
||||
### View System Initialization
|
||||
|
||||
We first need to create the view system. This will be kept in an atom
|
||||
and will be passed around to the different views library functions also
|
||||
*as an atom* as the views system needs to maintain it's own internal
|
||||
state.
|
||||
|
||||
```clj
|
||||
(def view-system (atom {}))
|
||||
```
|
||||
|
||||
For a fully working view system, we need to also provide a function
|
||||
that will be used to send view refreshes to subscribers. For now we'll
|
||||
just print view refreshes out in the REPL, but in a real system you'd
|
||||
probably want this to send them to a connected Websocket client, or out
|
||||
over some kind of distributed messaging service, etc.
|
||||
|
||||
```clj
|
||||
(defn send-fn
|
||||
[subscriber-key [view-sig view-data]]
|
||||
(println "view refresh" subscriber-key view-sig view-data))
|
||||
```
|
||||
|
||||
Now we're ready to actually create the view syem. To do this we call
|
||||
`init!` which takes a set of options. We provide our send function
|
||||
above using the `:send-fn` option. For a description of all the options
|
||||
available, see `views.core/default-options`.
|
||||
|
||||
```clj
|
||||
(views/init! view-system {:send-fn send-fn})
|
||||
```
|
||||
|
||||
At this point, the view system is ready.
|
||||
|
||||
Right now there are some background threads running, one of which is
|
||||
the *refresh watcher* which handles incoming hints and checks them for
|
||||
relevancy. When relevant hints are found, view refresh requests are
|
||||
dispatched to one or more *refresh worker* threads which actually
|
||||
perform the work of retrieving updated view data and sending it off to
|
||||
subscribers.
|
||||
|
||||
But now we need to talk about setting up some views, as we have none in
|
||||
our view system.
|
||||
|
||||
### Adding Views
|
||||
|
||||
For demonstration purposes, we'll set up views for an in-memory
|
||||
datastore:
|
||||
|
||||
```clj
|
||||
(def memory-datastore
|
||||
(atom {:a {:foo 1
|
||||
:bar 200
|
||||
:baz [1 2 3]}
|
||||
:b {:foo 2
|
||||
:bar 300
|
||||
:baz [2 3 4]}}))
|
||||
```
|
||||
|
||||
To retrieve or modify data within this memory datastore, we'd likely
|
||||
want to use a path made up of keywords, e.g. `[:a :bar]` would
|
||||
correspond with the value `200`, and `[:b :baz 2]` with the value `4`
|
||||
using the initial data defined above.
|
||||
|
||||
So, let's create a `MemoryView`:
|
||||
|
||||
```clj
|
||||
(require '[views.protocols :refer [IView]])
|
||||
|
||||
(def memory-view-hint-type :ks-path)
|
||||
|
||||
(defrecord MemoryView [id ks]
|
||||
IView
|
||||
(id [_] id)
|
||||
(data [_ namespace parameters]
|
||||
(get-in @memory-datastore
|
||||
(-> [namespace]
|
||||
(into ks)
|
||||
(into parameters))))
|
||||
(relevant? [_ namespace parameters hints]
|
||||
(some #(and (= namespace (:namespace %))
|
||||
(= ks (:hint %))
|
||||
(= memory-view-hint-type (:type %)))
|
||||
hints)))
|
||||
```
|
||||
|
||||
Nothing particularly special here, `data` simply returns a value from
|
||||
`memory-datastore` using a path made by combining `namespace` with a
|
||||
sequence of keywords `ks` and then finally adding `parameters` (which
|
||||
is a collection of parameters) to the end of the path.
|
||||
|
||||
Note that with this method of referencing data within
|
||||
`memory-datastore`, the keys `:a` and `:b` are being used as
|
||||
namespaces.
|
||||
|
||||
`relevant?` simply compares all 3 values of each of the hints passed in
|
||||
to make sure they all match. `memory-view-hint-type` is, as it's name
|
||||
implies, a value that is used to identify hints as being those intended
|
||||
for memory views and not for, e.g. SQL views (if we had a view system
|
||||
with multiple different types of views in it). The function returns
|
||||
true if at least one of the passed in hints was found to be relevant.
|
||||
|
||||
Now we can add some views to our view system:
|
||||
|
||||
```clj
|
||||
(views/add-views!
|
||||
view-system
|
||||
[(MemoryView. :foo [:foo])
|
||||
(MemoryView. :bar [:bar])
|
||||
(MemoryView. :baz [:baz])])
|
||||
```
|
||||
|
||||
We now have 3 views, `:foo`, `:bar` and `:baz` which each refer to data
|
||||
under that same path. Note that these views do not define a namespace.
|
||||
That is for subscribers to specify when they register a subscription.
|
||||
As well, code that updates `memory-datastore` will create hints for the
|
||||
view system as we'll soon see, and at that time it will include a
|
||||
namespace in any created hints.
|
||||
|
||||
> Most applications will probably want to just pass in a list of views
|
||||
> via `views.core/init!` through the `:views` option. However, there is
|
||||
> nothing wrong with using `add-views!` like this if you prefer or if
|
||||
> you simply need to change views on the fly.
|
||||
>
|
||||
> Keep in mind though that adding views via `add-views!` will replace
|
||||
> existing views in the view system with the same ID. Take care when
|
||||
> doing this if there is the possibility that there are existing
|
||||
> subscribers to views that are being replaced!
|
||||
|
||||
### Subscribing to Views
|
||||
|
||||
As mentioned previously, view subscriptions are keyed by a **view
|
||||
signature** or view sig, which we can create using a helper function if
|
||||
we wish:
|
||||
|
||||
```clj
|
||||
(views/->view-sig :a :foo [])
|
||||
=> {:namespace :a, :view-id :foo, :parameters []}
|
||||
```
|
||||
|
||||
We create a subscription by calling `views.core/subscribe!`. For this
|
||||
demonstration, we'll simply make up a subscriber key. The last argument
|
||||
is where we could pass in some application/user context data that would
|
||||
be helpful to use when doing subscription authorization (which we'll
|
||||
discuss later and just ignore for now). For now, we'll just pass in
|
||||
`nil` context.
|
||||
|
||||
```clj
|
||||
(views/subscribe!
|
||||
view-system ; view system atom
|
||||
(views/->view-sig :a :foo []) ; view sig of the view to subscribe to
|
||||
123 ; subscriber key
|
||||
nil) ; context
|
||||
```
|
||||
|
||||
`subscribe!` returns a `future` which will be realized when the
|
||||
subscription finishes. Whenever a new subscription is added, the
|
||||
subscriber is sent an initial set of data for the view. This view
|
||||
refresh is done in a separate thread via a `future`.
|
||||
|
||||
We can see that a view refresh was sent out as a result of this
|
||||
subscription as our `send-fn` function from before was called and the
|
||||
following output should have appeared
|
||||
|
||||
```
|
||||
view refresh 123 {:view-id :foo, :parameters []} 1
|
||||
```
|
||||
|
||||
right away after the call to `subscribe!`. The `1` at the end
|
||||
corresponds to the data in `memory-datastore` under the path
|
||||
`[:a :foo]`.
|
||||
|
||||
> Note that an initial view refresh is **always** sent out to the
|
||||
> subscriber when a subscription is first created. This happens even
|
||||
> if the view data has not changed since the last refresh for this view
|
||||
> occurred, as obviously the new subscriber was not part of that
|
||||
> refresh.
|
||||
|
||||
### Hints and View Refreshes
|
||||
|
||||
Adding hints to the view system triggers refreshes of views for which
|
||||
they are relevant towards. Our application code that changes data which
|
||||
these views are based on needs to have a way of adding views to the
|
||||
view system.
|
||||
|
||||
#### Hints
|
||||
|
||||
As mentioned previously, a hint is simply a map that contains a
|
||||
namespace, a type and some data that will differ based on the types of
|
||||
views in the view system. There is a helper function to create this
|
||||
map:
|
||||
|
||||
```clj
|
||||
(views/hint :a [:foo] memory-view-hint-type)
|
||||
=> {:namespace :a, :hint [:foo], :type :ks-path}
|
||||
```
|
||||
|
||||
Generally speaking the `:type` value will be the same for all hints
|
||||
which are intended for the same types of views. For example, all of our
|
||||
`MemoryView` views expect the type to be `:ks-path`, because the
|
||||
`:hint` values they expect to compare against are all keyword paths.
|
||||
|
||||
#### Adding Hints to the View System
|
||||
|
||||
There are two main ways to do this:
|
||||
|
||||
1. Queue hints which will be picked up by the refresh watcher thread on
|
||||
a regular interval (set by the option `:refresh-interval`).
|
||||
2. Immediately trigger a refresh for a list of hints.
|
||||
|
||||
Using option 2 all the time generally does result in much more
|
||||
responsive feeling system from the user's perspective. But you should
|
||||
also consider just how frequently your code could end up triggering
|
||||
refreshes.
|
||||
|
||||
Queueing hints as in option 1 will help to guard against duplicate
|
||||
hints triggering excessive view refreshes as duplicate hints added to
|
||||
the queue are dropped. But queued hints are not processed until the
|
||||
refresh watcher thread runs at the next `:refresh-interval`, so you
|
||||
lose some responsiveness by going this route.
|
||||
|
||||
There are more factors to consider in addition to all of this though.
|
||||
As hints are processed, they are internally turned into view refresh
|
||||
requests and dispatched to the refresh worker threads by adding them to
|
||||
an internal queue. This refresh queue also drops duplicate requests,
|
||||
but only if there is a backlog of refresh requests waiting in the queue
|
||||
(which would happen if some views are taking too long to refresh, e.g.
|
||||
slow SQL queries, overloaded server/network, not enough worker threads,
|
||||
etc). If the worker threads are able to process refresh requests very
|
||||
quickly, then the internal queue will usually be empty or near-empty
|
||||
and some or all duplicate refresh requests might make it through.
|
||||
|
||||
Also keep in mind that hashes of view data are computed and then
|
||||
compared each time a view refresh is about to be sent out, and while
|
||||
the underlying view data must still be retrieved to compute this hash
|
||||
each time a refresh request is processed, a view refresh will not
|
||||
actually be sent out to the subscribers if the data is found to be
|
||||
unchanged since the last refresh.
|
||||
|
||||
Ultimately there isn't really a right or wrong answer as to which
|
||||
method you choose. Generally speaking it will usually make the most
|
||||
sense to default to option 2 for most actions that need to add hints to
|
||||
the view system. This will generally result in a more responsive
|
||||
system. But you'll want to continually evaluate whether some actions
|
||||
should possibly be switched over to queue up hints instead.
|
||||
|
||||
#### Option 1: Queueing Hints
|
||||
|
||||
Use `queue-hints!` and pass in a collection of hints. They will be
|
||||
added to the queue and the refresh watcher thread will process them on
|
||||
the next refresh interval.
|
||||
|
||||
```clj
|
||||
(views/queue-hints!
|
||||
view-system
|
||||
[(views/hint :a [:foo] memory-view-hint-type)])
|
||||
```
|
||||
|
||||
#### Option 2: Immediately Trigger Refreshes From Hints
|
||||
|
||||
Use `refresh-views!` and pass in a collection of hints. They will be
|
||||
processed immediately and refresh requests will be dispatched for all
|
||||
views for which there were relevant hints (and subscribers) for.
|
||||
|
||||
```clj
|
||||
(views/refresh-views!
|
||||
view-system
|
||||
[(views/hint :a [:foo] memory-view-hint-type)])
|
||||
```
|
||||
|
||||
#### But Wait -- View Data Must Be Changed First!
|
||||
|
||||
If you were following along and tried the above examples out, you would
|
||||
have noticed that our `send-fn` function was never called. As mentioned
|
||||
previously, each time a view refresh is processed a hash is taken of
|
||||
the data and compared against the previous refresh's hash. Only if the
|
||||
data is found to have been changed is a refresh sent out.
|
||||
|
||||
We haven't changed any of the data in `memory-datastore` yet, so none
|
||||
of the hints we add to the system will trigger a view refresh to be
|
||||
sent. This is a good thing!
|
||||
|
||||
Normally in your application you'll want to add hints to the view
|
||||
system at the same place you do some operation that changes data. So,
|
||||
we can add a function to allow us to change the data in
|
||||
`memory-datastore` and add an appropriate hint about what was changed
|
||||
to the view system at the same time:
|
||||
|
||||
```clj
|
||||
(defn memdb-assoc-in!
|
||||
[vs namespace ks v]
|
||||
(let [path (into [namespace] ks)
|
||||
hints [(views/hint namespace ks memory-view-hint-type)]]
|
||||
(swap! memory-datastore assoc-in path v)
|
||||
(views/refresh-views! vs hints)))
|
||||
```
|
||||
|
||||
And then we can use it to change data relevant to the view we're
|
||||
subscribed to (`:foo`):
|
||||
|
||||
```clj
|
||||
(memdb-assoc-in! view-system :a [:foo] 42)
|
||||
```
|
||||
|
||||
As soon as you run this you should see that `send-fn` was called to
|
||||
send out a view refresh:
|
||||
|
||||
```
|
||||
view refresh 123 {:view-id :foo, :parameters []} 42
|
||||
```
|
||||
|
||||
And of course, `memory-datastore` was updated correctly at the same
|
||||
time:
|
||||
|
||||
```clj
|
||||
@memory-datastore
|
||||
=> {:a {:foo 42, :bar 200, :baz [1 2 3]}, :b {:foo 2, :bar 300, :baz [2 3 4]}}
|
||||
```
|
||||
|
||||
As we would expect given the current subscriptions in our view system,
|
||||
view refreshes will only be sent out if we change the data under
|
||||
`[:a :foo]` as refreshes are only processed if there are subscribers
|
||||
for a view.
|
||||
|
||||
### Unsubscribing
|
||||
|
||||
Unsubscribing a subscriber is done through `views.core/unsubscribe!`
|
||||
and the arguments are the same:
|
||||
|
||||
```clj
|
||||
(views/unsubscribe!
|
||||
view-system ; view system atom
|
||||
(views/->view-sig :a :foo []) ; view sig of the view to unsubscribe from
|
||||
123 ; subscriber key
|
||||
nil) ; context
|
||||
```
|
||||
|
||||
Remember that subscriptions are keyed by view sig, so to unsubscribe
|
||||
from a view, you must use the exact same namespace and parameters that
|
||||
was used to subscribe to it in the first place.
|
||||
|
||||
If you need to unsubscribe from all of a subscriber's current
|
||||
subscriptions, you can use `views.core/unsubscribe-all!` which
|
||||
essentially completely removes a subscriber from the views system.
|
||||
|
||||
```clj
|
||||
(views/unsubscribe-all! view-system 123) ; where '123' is the subscriber key
|
||||
```
|
||||
|
||||
### Shutting Down the Views System
|
||||
|
||||
You can stop the views system by simply calling `views.core/shutdown!`
|
||||
|
||||
```clj
|
||||
(views/shutdown! view-system)
|
||||
```
|
||||
|
||||
This function will by default block until the refresh watcher and all
|
||||
refresh worker threads have finished (they are sent interrupt signals
|
||||
when `shutdown!` is called). If for some reason you do not wish to
|
||||
block, you can pass an additional argument to `shutdown!`:
|
||||
|
||||
```clj
|
||||
(views/shutdown! view-system true) ; don't block waiting for threads to terminate
|
||||
```
|
||||
|
||||
## Subscription Authorization
|
||||
|
||||
By default, no subscriptions require authorization. If you wish for
|
||||
some or all views to require some kind of authorization, you should
|
||||
provide an `:auth-fn` option to `views.core/init!`.
|
||||
|
||||
This is a function of the form:
|
||||
|
||||
```clj
|
||||
(fn [view-sig subscriber-key context]
|
||||
; ...
|
||||
)
|
||||
```
|
||||
|
||||
It should return true if the subscription is authorized. `context` is
|
||||
the exact value that was passed in as the context argument to
|
||||
`subscribe!`. You might wish to pass in a Ring request map or a user
|
||||
profile for example.
|
||||
|
||||
If subscription authorization fails, `subscribe!` returns `nil`.
|
||||
|
||||
You can also provide the `:on-unauth-fn` option to `views.core/init!`
|
||||
and set it to a function that will be called in the event that
|
||||
subscription authorization failed. This function takes the same
|
||||
arguments as `:auth-fn`. The return value is not used.
|
||||
|
||||
Your application may or may not need this depending on how you have
|
||||
things set up (the fact that `subscribe!` returns `nil` if unauthorized
|
||||
may be enough for you). It is just provided as an extra convenience.
|
||||
|
||||
## Namespaces
|
||||
|
||||
As has been mentioned already, namespaces can be used to isolate
|
||||
subscriptions to views and view refreshes. Typical use of namespaces
|
||||
within a views system would be to set them to something that specifies
|
||||
which database to retrieve view data from when you have multiple
|
||||
databases all with an identical structure.
|
||||
|
||||
Namespace information is not included in the actual view refresh data
|
||||
that gets sent to subscribers. It is just considered to be a
|
||||
server-side concern.
|
||||
|
||||
Depending on your application, you may be perfectly ok with just
|
||||
passing in the specific namespace needed when creating view
|
||||
subscriptions. However, you can also specify a `:namespace-fn` option
|
||||
in your call to `views.core/init!` and provide a function that will
|
||||
return the namespace to use for all calls to `subscribe!` and
|
||||
`unsubscribe!` that get passed a view sig which **does not** include a
|
||||
namespace in it.
|
||||
|
||||
The `:namespace-fn` function should be of the form:
|
||||
|
||||
```clj
|
||||
(fn [view-sig subscriber-key context]
|
||||
; ...
|
||||
)
|
||||
```
|
||||
|
||||
`context` will be whatever was passed in as the context argument to
|
||||
`subscribe!`/`unsubscribe!`.
|
||||
|
||||
It bears repeating that `:namespace-fn` will **not** be called even if
|
||||
it was set if you use a view sig that includes a `:namespace` key.
|
||||
|
||||
For this reason the helper function `->view-sig` includes an extra
|
||||
overload that does not set a namespace.
|
||||
|
||||
```clj
|
||||
; a view sig that will result in namespace-fn being called (if one is set)
|
||||
(views/->view-sig :foo [])
|
||||
=> {:view-id :foo, :parameters []}
|
||||
|
||||
; a view sig that will always use :a as the namespace, even if a namespace-fn is set
|
||||
(views/->view-sig :a :foo [])
|
||||
=> {:namespace :a, :view-id :foo, :parameters []}
|
||||
```
|
||||
|
||||
## View System Initialization Options
|
||||
|
||||
There are a number of options that can be provided to
|
||||
`views.core/init!`. The only one that absolutely must be provided for a
|
||||
working system is `:send-fn` while all the other default options will
|
||||
generally suffice for a non-distributed relatively low-load
|
||||
application.
|
||||
|
||||
The default options are defined in `views.core/default-options`.
|
||||
|
||||
#### `:send-fn`
|
||||
|
||||
A function that is used to send view refresh data to subscribers.
|
||||
|
||||
```clj
|
||||
(fn [subscriber-key [view-sig view-data]]
|
||||
; ...
|
||||
)
|
||||
```
|
||||
|
||||
#### `:views`
|
||||
|
||||
A list of `IView` instances. These are the views that can be
|
||||
subscribed to. Views can also be added/replaced in the system after
|
||||
initialization by calling `views.core/add-views!`.
|
||||
|
||||
#### `:put-hints-fn`
|
||||
|
||||
A function that typically will be used by the different views plugin
|
||||
libraries providing view implementations (such as
|
||||
[views.sql](https://github.com/gered/views.sql) or
|
||||
[views.honeysql](https://github.com/gered/views.honeysql)) to add
|
||||
hints to the view system.
|
||||
|
||||
This function is used as a common configurable way for these different
|
||||
plugin libraries to add hints because the application can provide an
|
||||
alternate implementation to e.g. send hints out over a
|
||||
distributed messaging service and it will affect all views in the
|
||||
system (which would not be possible if all or just some were hard-coded
|
||||
to use `queue-hints!` or `refresh-views!`).
|
||||
|
||||
```clj
|
||||
(fn [^Atom view-system hints]
|
||||
; ...
|
||||
)
|
||||
```
|
||||
|
||||
The default implementation is:
|
||||
|
||||
```clj
|
||||
(fn [^Atom view-system hints]
|
||||
(refresh-views! view-system hints))
|
||||
```
|
||||
|
||||
#### `:refresh-queue-size`
|
||||
|
||||
The size of the internal refresh request queue used to hold refresh
|
||||
requests for the refresh worker threads. If you notice some refresh
|
||||
requests being dropped, you may wish to increase this (after of course
|
||||
seeing if you have some slow views that could be improved).
|
||||
|
||||
Default is `1000`.
|
||||
|
||||
#### `:refresh-interval`
|
||||
|
||||
An interval in milliseconds at which the refresh watcher thread will
|
||||
check for queued up hints and dispatch relevant view refresh requests
|
||||
to the refresh worker threads.
|
||||
|
||||
Default is `1000`.
|
||||
|
||||
#### `:worker-threads`
|
||||
|
||||
The number of refresh worker threads that continually poll for refresh
|
||||
requests and handle sending view refreshes to subscribers.
|
||||
|
||||
Default is `8`.
|
||||
|
||||
#### `:auth-fn`
|
||||
|
||||
A function that authorizes view subscriptions. It should return true
|
||||
if the subscription is authorized. If this function is not set, no view
|
||||
subscriptions will require authorization.
|
||||
|
||||
```clj
|
||||
(fn [view-sig subscriber-key context]
|
||||
; ...
|
||||
)
|
||||
```
|
||||
|
||||
#### `:on-unauth-fn`
|
||||
|
||||
A function that is called when subscription authorization fails. The
|
||||
return value of this function is not used.
|
||||
|
||||
```clj
|
||||
(fn [view-sig subscriber-key context]
|
||||
; ...
|
||||
)
|
||||
```
|
||||
|
||||
#### `:namespace-fn`
|
||||
|
||||
A function that is used during subscription and unsubscription **only**
|
||||
if no namespace is specified in the view sig passed in. This function
|
||||
should return the namespace to be used for the
|
||||
subscription/unsubscription.
|
||||
|
||||
```clj
|
||||
(fn [view-sig subscriber-key context]
|
||||
; ...
|
||||
)
|
||||
```
|
||||
|
||||
#### `:stats-log-interval`
|
||||
|
||||
Interval in milliseconds at which a logger will output an INFO log
|
||||
entry with some view system statistics (refreshes/sec,
|
||||
dropped-refreshes/sec, duplicate-refreshes/sec). If not set, no
|
||||
logging is done.
|
||||
|
||||
|
||||
## Considerations for Distributed Systems
|
||||
|
||||
If you're looking to use a views system with an application that will
|
||||
be running on multiple servers, all you really need to do to get the
|
||||
views system working consistently across all the nodes is to make sure
|
||||
that when new hints are to be added to the views system, they are sent
|
||||
to all application nodes.
|
||||
|
||||
For example, you can set up a messaging service (such as RabbitMQ, etc)
|
||||
and when you need to add hints to the views system, instead of calling
|
||||
`queue-hints!` or `refresh-views!` with the new hints, you simply send
|
||||
them to the messaging service.
|
||||
|
||||
Most of the views plugin libraries providing view implementations
|
||||
(such as views.sql) will call `views.core/put-hints!` to add hints to
|
||||
the system. `put-hints!` uses whatever the `:put-hints-fn` function was
|
||||
set to in the options passed to `views.core/init!`. The default
|
||||
`:put-hints-fn` implementation simply calls `refresh-views!`, but you
|
||||
can easily provide an alternative function that sends the hints to a
|
||||
messaging service.
|
||||
|
||||
Then your application nodes need to listen for hints being received
|
||||
from the messaging service. You should then call `queue-hints!` or
|
||||
`refresh-views!` with the hints received this way.
|
||||
TODO
|
||||
|
||||
## Testing
|
||||
|
||||
TODO
|
||||
|
||||
## License
|
||||
|
||||
Copyright © 2015-2016 Kira Inc.
|
||||
Copyright © 2015 DiligenceEngine
|
||||
|
||||
Original authors:
|
||||
* Dave Della Costa (https://github.com/ddellacosta)
|
||||
* Alexander Hudek (https://github.com/akhudek)
|
||||
Authors Dave Della Costa (https://github.com/ddellacosta) and Alexander Hudek (https://github.com/akhudek)
|
||||
|
||||
Various updates and other changes in this fork by
|
||||
Gered King (https://github.com/gered)
|
||||
|
||||
Distributed under the MIT License.
|
||||
Distributed under the Eclipse Public License either version 1.0 or (at
|
||||
your option) any later version.
|
||||
|
|
42
project.clj
42
project.clj
|
@ -1,29 +1,25 @@
|
|||
(defproject net.gered/views "1.7.0-SNAPSHOT"
|
||||
:description "A view to the past helps navigate the future."
|
||||
:url "https://github.com/gered/views"
|
||||
:license {:name "MIT License"
|
||||
:url "http://opensource.org/licenses/MIT"}
|
||||
(defproject views "1.0.0"
|
||||
:description "A view to the past helps navigate the future."
|
||||
|
||||
:dependencies [[org.clojure/tools.logging "1.2.4"]]
|
||||
:url "https://github.com/diligenceengine/views"
|
||||
|
||||
:profiles {:provided
|
||||
{:dependencies [[org.clojure/clojure "1.10.3"]]}
|
||||
:license {:name "Eclipse Public License"
|
||||
:url "http://www.eclipse.org/legal/epl-v10.html"}
|
||||
|
||||
:test
|
||||
{:dependencies [[pjstadig/humane-test-output "0.11.0"]]
|
||||
:injections [(require 'pjstadig.humane-test-output)
|
||||
(pjstadig.humane-test-output/activate!)]}}
|
||||
:dependencies [[org.clojure/clojure "1.6.0"]
|
||||
[org.clojure/tools.logging "0.2.6"]
|
||||
[org.clojure/core.async "0.1.303.0-886421-alpha"]
|
||||
[honeysql "0.4.3"]
|
||||
[clj-logging-config "1.9.10"]
|
||||
[zip-visit "1.0.2"]
|
||||
[prismatic/plumbing "0.3.5"]
|
||||
[pjstadig/humane-test-output "0.6.0"]]
|
||||
|
||||
:deploy-repositories [["releases" :clojars]
|
||||
["snapshots" :clojars]]
|
||||
:profiles {:test {:dependencies [[org.clojure/tools.nrepl "0.2.3"]
|
||||
[environ "0.4.0"]
|
||||
[org.clojure/data.generators "0.1.2"]]
|
||||
|
||||
:release-tasks [["vcs" "assert-committed"]
|
||||
["change" "version" "leiningen.release/bump-version" "release"]
|
||||
["vcs" "commit"]
|
||||
["vcs" "tag" "v" "--no-sign"]
|
||||
["deploy"]
|
||||
["change" "version" "leiningen.release/bump-version"]
|
||||
["vcs" "commit" "bump to next snapshot version for future development"]
|
||||
["vcs" "push"]]
|
||||
:injections [(require 'pjstadig.humane-test-output)
|
||||
(pjstadig.humane-test-output/activate!)]}}
|
||||
|
||||
)
|
||||
:plugins [[lein-environ "0.4.0"]])
|
||||
|
|
|
@ -1,509 +1,200 @@
|
|||
(ns views.core
|
||||
(:import
|
||||
(java.util.concurrent ArrayBlockingQueue TimeUnit)
|
||||
(clojure.lang Atom))
|
||||
[java.util.concurrent ArrayBlockingQueue TimeUnit])
|
||||
(:require
|
||||
[views.protocols :refer [IView id data relevant?]]
|
||||
[clojure.tools.logging :refer [info debug error trace]]))
|
||||
[plumbing.core :refer [swap-pair!]]
|
||||
[clojure.tools.logging :refer [debug error]]))
|
||||
|
||||
;; The view-system data structure has this shape:
|
||||
;;
|
||||
;; {
|
||||
;; {:views {:id1 view1, id2 view2, ...}
|
||||
;; :send-fn (fn [subscriber-key data] ...)
|
||||
;;
|
||||
;; :refresh-queue (ArrayBlockingQueue.)
|
||||
;; :views {:id1 view1, id2 view2, ...}
|
||||
;; :send-fn (fn [subscriber-key data] ...)
|
||||
;; :put-hints-fn (fn [hints] ... )
|
||||
;; :auth-fn (fn [view-sig subscriber-key context] ...)
|
||||
;; :namespace-fn (fn [view-sig subscriber-key context] ...)
|
||||
;;
|
||||
;; :hashes {view-sig hash, ...}
|
||||
;; :subscribed {subscriber-key #{view-sig, ...}}
|
||||
;; :subscribers {view-sig #{subscriber-key, ...}}
|
||||
;; :hints #{hint1 hint2 ...}
|
||||
;; :hashes {view-sig hash, ...}
|
||||
;; :subscribed {subscriber-key #{view-sig, ...}}
|
||||
;; :subscribers {view-sig #{subscriber-key, ...}}
|
||||
;; :hints #{hint1 hint2 ...}
|
||||
;;
|
||||
;; }
|
||||
;;
|
||||
;; Each hint has the form {:namespace x :hint y}
|
||||
|
||||
(def refresh-queue (ArrayBlockingQueue. 500))
|
||||
|
||||
|
||||
(defn reset-stats!
|
||||
"Resets statistics collected back to zero."
|
||||
[^Atom view-system]
|
||||
(swap! view-system update-in [:statistics] assoc
|
||||
:refreshes 0
|
||||
:dropped 0
|
||||
:deduplicated 0)
|
||||
view-system)
|
||||
|
||||
(defn collecting-stats?
|
||||
"Whether view statem statistics collection and logging is enabled or not."
|
||||
[^Atom view-system]
|
||||
(boolean (get-in @view-system [:statistics :logger])))
|
||||
|
||||
(defn ->view-sig
|
||||
([namespace view-id parameters]
|
||||
{:namespace namespace
|
||||
:view-id view-id
|
||||
:parameters parameters})
|
||||
([view-id parameters]
|
||||
{:view-id view-id
|
||||
:parameters parameters}))
|
||||
|
||||
(defn- send-view-data!
|
||||
[view-system subscriber-key {:keys [namespace view-id parameters] :as view-sig} data]
|
||||
(if-let [send-fn (:send-fn view-system)]
|
||||
(send-fn subscriber-key [(dissoc view-sig :namespace) data])
|
||||
(throw (new Exception "no send-fn function set in view-system"))))
|
||||
|
||||
(defn- authorized-subscription?
|
||||
[view-system view-sig subscriber-key context]
|
||||
(if-let [auth-fn (:auth-fn view-system)]
|
||||
(auth-fn view-sig subscriber-key context)
|
||||
; assume that if no auth-fn is specified, that we are not doing auth checks at all
|
||||
; so do not disallow access to any subscription
|
||||
true))
|
||||
|
||||
(defn- on-unauthorized-subscription
|
||||
[view-system view-sig subscriber-key context]
|
||||
(if-let [on-unauth-fn (:on-unauth-fn view-system)]
|
||||
(on-unauth-fn view-sig subscriber-key context)))
|
||||
|
||||
(defn- get-namespace
|
||||
[view-system view-sig subscriber-key context]
|
||||
(if-let [namespace-fn (:namespace-fn view-system)]
|
||||
(namespace-fn view-sig subscriber-key context)
|
||||
(:namespace view-sig)))
|
||||
|
||||
(defn- subscribe-view!
|
||||
[view-system view-sig subscriber-key]
|
||||
(trace "subscribing to view" view-sig subscriber-key)
|
||||
(defn subscribe-view!
|
||||
[view-system view-sig subscriber-key data-hash]
|
||||
(-> view-system
|
||||
(update-in [:subscribed subscriber-key] (fnil conj #{}) view-sig)
|
||||
(update-in [:subscribers view-sig] (fnil conj #{}) subscriber-key)))
|
||||
|
||||
(defn- update-hash!
|
||||
[view-system view-sig data-hash]
|
||||
(update-in view-system [:hashes view-sig] #(or % data-hash))) ;; see note #1 in NOTES.md
|
||||
(update-in [:subscribers view-sig] (fnil conj #{}) subscriber-key)
|
||||
(update-in [:hashes view-sig] #(or % data-hash)))) ;; see note #1
|
||||
|
||||
(defn subscribe!
|
||||
"Creates a subscription to a view identified by view-sig for a subscriber
|
||||
identified by subscriber-key. If the subscription is not authorized,
|
||||
returns nil. Additional context info can be passed in, which will be
|
||||
passed to the view-system's namespace-fn and auth-fn (if provided). If
|
||||
the subscription is successful, the subscriber will be sent the initial
|
||||
data for the view."
|
||||
[^Atom view-system {:keys [namespace view-id parameters] :as view-sig} subscriber-key context]
|
||||
[view-system namespace view-id parameters subscriber-key]
|
||||
(if-let [view (get-in @view-system [:views view-id])]
|
||||
(let [namespace (if (contains? view-sig :namespace)
|
||||
namespace
|
||||
(get-namespace @view-system view-sig subscriber-key context))
|
||||
view-sig (->view-sig namespace view-id parameters)]
|
||||
(if (authorized-subscription? @view-system view-sig subscriber-key context)
|
||||
(do
|
||||
(swap! view-system subscribe-view! view-sig subscriber-key)
|
||||
(future
|
||||
(try
|
||||
(let [vdata (data view namespace parameters)
|
||||
data-hash (hash vdata)]
|
||||
;; Check to make sure that we are still subscribed. It's possible that
|
||||
;; an unsubscription event came in while computing the view.
|
||||
(when (contains? (get-in @view-system [:subscribed subscriber-key]) view-sig)
|
||||
(swap! view-system update-hash! view-sig data-hash)
|
||||
(send-view-data! @view-system subscriber-key view-sig vdata)))
|
||||
(catch Exception e
|
||||
(error e "error subscribing to view" view-sig)))))
|
||||
(do
|
||||
(trace "subscription not authorized" view-sig subscriber-key context)
|
||||
(on-unauthorized-subscription @view-system view-sig subscriber-key context)
|
||||
nil)))
|
||||
(throw (new Exception (str "Subscription for non-existant view: " view-id)))))
|
||||
(future
|
||||
(let [vdata (data view namespace parameters)]
|
||||
(swap! view-system subscribe-view! [namespace view-id parameters] subscriber-key (hash vdata))
|
||||
((get @view-system :send-fn) subscriber-key [[view-id parameters] vdata])))))
|
||||
|
||||
(defn- remove-from-subscribers
|
||||
(defn remove-from-subscribers
|
||||
[view-system view-sig subscriber-key]
|
||||
(-> view-system
|
||||
(update-in [:subscribers view-sig] disj subscriber-key)
|
||||
; remove view-sig entry if no subscribers. helps prevent the subscribers
|
||||
; map from e.g. endlessly filling up with all sorts of different
|
||||
; view-sigs with crazy amounts of only-slightly-varying parameters
|
||||
(update-in [:subscribers]
|
||||
(fn [subscribers]
|
||||
(if (empty? (get subscribers view-sig))
|
||||
(dissoc subscribers view-sig)
|
||||
subscribers)))))
|
||||
|
||||
(defn- remove-from-subscribed
|
||||
[view-system view-sig subscriber-key]
|
||||
(-> view-system
|
||||
(update-in [:subscribed subscriber-key] disj view-sig)
|
||||
; remove subscriber-key entry if no current subscriptions. this helps prevent
|
||||
; the subscribed map from (for example) endlessly filling up with massive
|
||||
; amounts of entries with no subscriptions. this could easily happen over time
|
||||
; naturally for applications with long uptimes.
|
||||
(update-in [:subscribed]
|
||||
(fn [subscribed]
|
||||
(if (empty? (get subscribed subscriber-key))
|
||||
(dissoc subscribed subscriber-key)
|
||||
subscribed)))))
|
||||
|
||||
(defn- clean-up-unneeded-hashes
|
||||
[view-system view-sig]
|
||||
; hashes for view-sigs which do not have any unsubscribers are no longer necessary
|
||||
; to keep around (again, at risk of endlessly filling up with tons of hashes over time)
|
||||
(if-not (get (:subscribers view-system) view-sig)
|
||||
(update-in view-system [:hashes] dissoc view-sig)
|
||||
view-system))
|
||||
(update-in view-system [:subscribers view-sig] disj subscriber-key))
|
||||
|
||||
(defn unsubscribe!
|
||||
"Removes a subscription to a view identified by view-sig for a subscriber
|
||||
identified by subscriber-key. Additional context info can be passed in,
|
||||
which will be passed to the view-system's namespace-fn (if provided)."
|
||||
[^Atom view-system {:keys [namespace view-id parameters] :as view-sig} subscriber-key context]
|
||||
(trace "unsubscribing from view" view-sig subscriber-key)
|
||||
[view-system namespace view-id parameters subscriber-key]
|
||||
(swap! view-system
|
||||
(fn [view-system]
|
||||
(let [namespace (if (contains? view-sig :namespace)
|
||||
namespace
|
||||
(get-namespace view-system view-sig subscriber-key context))
|
||||
view-sig (->view-sig namespace view-id parameters)]
|
||||
(-> view-system
|
||||
(remove-from-subscribed view-sig subscriber-key)
|
||||
(remove-from-subscribers view-sig subscriber-key)
|
||||
(clean-up-unneeded-hashes view-sig)))))
|
||||
view-system)
|
||||
(fn [vs]
|
||||
(-> vs
|
||||
(update-in [:subscribed subscriber-key] disj [namespace view-id parameters])
|
||||
(remove-from-subscribers [namespace view-id parameters] subscriber-key)))))
|
||||
|
||||
(defn unsubscribe-all!
|
||||
"Removes all of a subscriber's (identified by subscriber-key) current
|
||||
view subscriptions."
|
||||
[^Atom view-system subscriber-key]
|
||||
(trace "unsubscribing from all views" subscriber-key)
|
||||
"Remove all subscriptions by a given subscriber."
|
||||
[view-system subscriber-key]
|
||||
(swap! view-system
|
||||
(fn [view-system]
|
||||
(let [view-sigs (get-in view-system [:subscribed subscriber-key])
|
||||
view-system* (update-in view-system [:subscribed] dissoc subscriber-key)]
|
||||
(reduce
|
||||
#(-> %1
|
||||
(remove-from-subscribers %2 subscriber-key)
|
||||
(clean-up-unneeded-hashes %2))
|
||||
view-system*
|
||||
view-sigs))))
|
||||
view-system)
|
||||
(fn [vs]
|
||||
(let [view-sigs (get-in vs [:subscribed subscriber-key])
|
||||
vs* (update-in vs [:subscribed] dissoc subscriber-key)]
|
||||
(reduce #(remove-from-subscribers %1 %2 subscriber-key) vs* view-sigs)))))
|
||||
|
||||
(defn refresh-view!
|
||||
"Schedules a view (identified by view-sig) to be refreshed by one of the worker threads
|
||||
only if the provided collection of hints is relevant to that view."
|
||||
[^Atom view-system hints {:keys [namespace view-id parameters] :as view-sig}]
|
||||
"We refresh a view if it is relevant and its data hash has changed."
|
||||
[view-system hints [namespace view-id parameters :as view-sig]]
|
||||
(let [v (get-in @view-system [:views view-id])]
|
||||
(if-let [^ArrayBlockingQueue refresh-queue (:refresh-queue @view-system)]
|
||||
(try
|
||||
(if (relevant? v namespace parameters hints)
|
||||
(if-not (.contains refresh-queue view-sig)
|
||||
(when-not (.offer refresh-queue view-sig)
|
||||
(if (collecting-stats? view-system) (swap! view-system update-in [:statistics :dropped] inc))
|
||||
(error "refresh-queue full, dropping refresh request for" view-sig))
|
||||
(do
|
||||
(if (collecting-stats? view-system) (swap! view-system update-in [:statistics :deduplicated] inc))
|
||||
(trace "already queued for refresh" view-sig))))
|
||||
(catch Exception e
|
||||
(error e "error determining if view is relevant" view-sig))))
|
||||
view-system))
|
||||
(if (relevant? v namespace parameters hints)
|
||||
(if-not (.contains ^ArrayBlockingQueue refresh-queue view-sig)
|
||||
(when-not (.offer ^ArrayBlockingQueue refresh-queue view-sig)
|
||||
(error "refresh-queue full, dropping refresh request for" view-sig))
|
||||
(debug "already queued for refresh" view-sig)))))
|
||||
|
||||
(defn subscribed-views
|
||||
"Returns a list of all views in the system that have subscribers."
|
||||
[^Atom view-system]
|
||||
(reduce into #{} (vals (:subscribed @view-system))))
|
||||
[view-system]
|
||||
(reduce into #{} (vals (:subscribed view-system))))
|
||||
|
||||
(defn active-view-count
|
||||
"Returns a count of views with at least one subscriber."
|
||||
[^Atom view-system]
|
||||
(count (remove #(empty? (val %)) (:subscribers @view-system))))
|
||||
|
||||
(defn- pop-hints!
|
||||
[^Atom view-system]
|
||||
(let [p (swap-vals! view-system assoc :hints #{})]
|
||||
(defn pop-hints!
|
||||
"Return hints and clear hint set atomicly."
|
||||
[view-system]
|
||||
(let [p (swap-pair! view-system assoc :hints #{})]
|
||||
(or (:hints (first p)) #{})))
|
||||
|
||||
(defn refresh-views!
|
||||
"Given a collection of hints, check all views in the system to find any that need refreshing
|
||||
and schedule refreshes for them. If no hints are provided, will use any that have been
|
||||
queued up in the view-system."
|
||||
([^Atom view-system hints]
|
||||
(when (seq hints)
|
||||
(trace "refresh hints:" hints)
|
||||
(doseq [view-sig (subscribed-views view-system)]
|
||||
(refresh-view! view-system hints view-sig)))
|
||||
(swap! view-system assoc :last-update (System/currentTimeMillis))
|
||||
view-system)
|
||||
([^Atom view-system]
|
||||
(refresh-views! view-system (pop-hints! view-system))))
|
||||
"Given a collection of hints, find all dirty views."
|
||||
[view-system]
|
||||
(let [hints (pop-hints! view-system)]
|
||||
(debug "refresh hints:" hints)
|
||||
(mapv #(refresh-view! view-system hints %) (subscribed-views @view-system))
|
||||
(swap! view-system assoc :last-update (System/currentTimeMillis))))
|
||||
|
||||
(defn- can-refresh?
|
||||
(defn can-refresh?
|
||||
[last-update min-refresh-interval]
|
||||
(> (- (System/currentTimeMillis) last-update) min-refresh-interval))
|
||||
|
||||
(defn- wait
|
||||
(defn wait
|
||||
[last-update min-refresh-interval]
|
||||
(Thread/sleep (max 0 (- min-refresh-interval (- (System/currentTimeMillis) last-update)))))
|
||||
|
||||
(defn do-view-refresh!
|
||||
[^Atom view-system {:keys [namespace view-id parameters] :as view-sig}]
|
||||
(if (collecting-stats? view-system) (swap! view-system update-in [:statistics :refreshes] inc))
|
||||
(try
|
||||
(let [view (get-in @view-system [:views view-id])
|
||||
vdata (data view namespace parameters)
|
||||
hdata (hash vdata)]
|
||||
(when-not (= hdata (get-in @view-system [:hashes view-sig]))
|
||||
(doseq [subscriber-key (get-in @view-system [:subscribers view-sig])]
|
||||
(send-view-data! @view-system subscriber-key view-sig vdata))
|
||||
(swap! view-system assoc-in [:hashes view-sig] hdata)))
|
||||
(catch Exception e
|
||||
(error e "error refreshing:" namespace view-id parameters))))
|
||||
|
||||
(defn- refresh-worker-thread
|
||||
[^Atom view-system]
|
||||
(let [^ArrayBlockingQueue refresh-queue (:refresh-queue @view-system)]
|
||||
(fn []
|
||||
(try
|
||||
(when-let [view-sig (.poll refresh-queue 60 TimeUnit/SECONDS)]
|
||||
(trace "worker running refresh for" view-sig)
|
||||
(do-view-refresh! view-system view-sig))
|
||||
(catch InterruptedException e))
|
||||
(if-not (:stop-workers? @view-system)
|
||||
(recur)
|
||||
(trace "exiting worker thread")))))
|
||||
|
||||
(defn- refresh-watcher-thread
|
||||
[^Atom view-system min-refresh-interval]
|
||||
(defn worker-thread
|
||||
"Handles refresh requests."
|
||||
[view-system]
|
||||
(fn []
|
||||
(let [last-update (:last-update @view-system)]
|
||||
(when-let [[namespace view-id parameters :as view-sig] (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)]
|
||||
(try
|
||||
(if (can-refresh? last-update min-refresh-interval)
|
||||
(refresh-views! view-system)
|
||||
(wait last-update min-refresh-interval))
|
||||
(catch InterruptedException e)
|
||||
(let [view (get-in @view-system [:views view-id])
|
||||
vdata (data view namespace parameters)
|
||||
hdata (hash vdata)]
|
||||
(when-not (= hdata (get-in @view-system [:hashes view-sig]))
|
||||
(doseq [s (get-in @view-system [:subscribers view-sig])]
|
||||
((:send-fn @view-system) s [[view-id parameters] vdata]))
|
||||
(swap! view-system assoc-in [:hashes view-sig] hdata)))
|
||||
(catch Exception e
|
||||
(error e "exception in views")))
|
||||
(if-not (:stop-refresh-watcher? @view-system)
|
||||
(recur)
|
||||
(trace "exiting refresh watcher thread")))))
|
||||
(error "error refreshing:" namespace view-id parameters
|
||||
"e:" e "msg:" (.getMessage e)))))
|
||||
(recur)))
|
||||
|
||||
(defn start-update-watcher!
|
||||
"Starts threads for the views refresh watcher and worker threads that handle queued
|
||||
hints and view refresh requests."
|
||||
[^Atom view-system min-refresh-interval threads]
|
||||
(trace "starting refresh watcher at" min-refresh-interval "ms interval and" threads "workers")
|
||||
(if (and (:refresh-watcher @view-system)
|
||||
(:workers @view-system))
|
||||
(error "cannot start new watcher and worker threads until existing threads are stopped")
|
||||
(let [refresh-watcher (Thread. ^Runnable (refresh-watcher-thread view-system min-refresh-interval))
|
||||
worker-threads (mapv (fn [_] (Thread. ^Runnable (refresh-worker-thread view-system)))
|
||||
(range threads))]
|
||||
(swap! view-system assoc
|
||||
:last-update 0
|
||||
:refresh-watcher refresh-watcher
|
||||
:stop-refresh-watcher? false
|
||||
:workers worker-threads
|
||||
:stop-workers? false)
|
||||
(.start refresh-watcher)
|
||||
(doseq [^Thread t worker-threads]
|
||||
(.start t))
|
||||
view-system)))
|
||||
(defn update-watcher!
|
||||
"A single threaded view update mechanism."
|
||||
[view-system min-refresh-interval threads]
|
||||
(swap! view-system assoc :last-update 0)
|
||||
(.start (Thread. (fn [] (let [last-update (:last-update @view-system)]
|
||||
(try
|
||||
(if (can-refresh? last-update min-refresh-interval)
|
||||
(refresh-views! view-system)
|
||||
(wait last-update min-refresh-interval))
|
||||
(catch Exception e (error "exception in views e:" e "msg:"(.getMessage e))))
|
||||
(recur)))))
|
||||
(dotimes [i threads] (.start (Thread. ^Runnable (worker-thread view-system)))))
|
||||
|
||||
(defn stop-update-watcher!
|
||||
"Stops threads for the views refresh watcher and worker threads."
|
||||
[^Atom view-system & [dont-wait-for-threads?]]
|
||||
(trace "stopping refresh watcher and workers")
|
||||
(let [worker-threads (:workers @view-system)
|
||||
watcher-thread (:refresh-watcher @view-system)
|
||||
threads (->> worker-threads
|
||||
(cons watcher-thread)
|
||||
(remove nil?))]
|
||||
(swap! view-system assoc
|
||||
:stop-refresh-watcher? true
|
||||
:stop-workers? true)
|
||||
(doseq [^Thread t threads]
|
||||
(.interrupt t))
|
||||
(if-not dont-wait-for-threads?
|
||||
(doseq [^Thread t threads]
|
||||
(.join t)))
|
||||
(swap! view-system assoc
|
||||
:refresh-watcher nil
|
||||
:workers nil))
|
||||
view-system)
|
||||
|
||||
(defn- logger-thread
|
||||
[^Atom view-system msecs]
|
||||
(let [secs (/ msecs 1000)]
|
||||
(fn []
|
||||
(try
|
||||
(Thread/sleep msecs)
|
||||
(let [stats (:statistics @view-system)]
|
||||
(reset-stats! view-system)
|
||||
(info "subscribed views:" (active-view-count view-system)
|
||||
(format "refreshes/sec: %.1f" (double (/ (:refreshes stats) secs)))
|
||||
(format "dropped/sec: %.1f" (double (/ (:dropped stats) secs)))
|
||||
(format "deduped/sec: %.1f" (double (/ (:deduplicated stats) secs)))))
|
||||
(catch InterruptedException e))
|
||||
(if-not (get-in @view-system [:statistics :stop?])
|
||||
(recur)))))
|
||||
|
||||
(defn start-logger!
|
||||
"Starts a logger thread that will enable collection of view statistics
|
||||
which the logger will periodically write out to the log."
|
||||
[^Atom view-system log-interval]
|
||||
(trace "starting logger. logging at" log-interval "secs intervals")
|
||||
(if (get-in @view-system [:statistics :logger])
|
||||
(error "cannot start new logger thread until existing thread is stopped")
|
||||
(let [logger (Thread. ^Runnable (logger-thread view-system log-interval))]
|
||||
(swap! view-system update-in [:statistics] assoc
|
||||
:logger logger
|
||||
:stop? false)
|
||||
(reset-stats! view-system)
|
||||
(.start logger)))
|
||||
view-system)
|
||||
|
||||
(defn stop-logger!
|
||||
"Stops the logger thread."
|
||||
[^Atom view-system & [dont-wait-for-thread?]]
|
||||
(trace "stopping logger")
|
||||
(let [^Thread logger-thread (get-in @view-system [:statistics :logger])]
|
||||
(swap! view-system assoc-in [:statistics :stop?] true)
|
||||
(if logger-thread (.interrupt logger-thread))
|
||||
(if-not dont-wait-for-thread? (.join logger-thread))
|
||||
(swap! view-system assoc-in [:statistics :logger] nil))
|
||||
view-system)
|
||||
|
||||
(defn hint
|
||||
"Create a hint."
|
||||
[namespace hint type]
|
||||
{:namespace namespace :hint hint :type type})
|
||||
[namespace hint]
|
||||
{:namespace namespace :hint hint})
|
||||
|
||||
(defn queue-hints!
|
||||
"Queues up hints in the view system so that they will be picked up by the refresh
|
||||
watcher and dispatched to the workers resulting in view updates being sent out
|
||||
for the relevant views/subscribers."
|
||||
[^Atom view-system hints]
|
||||
(trace "queueing hints" hints)
|
||||
(swap! view-system update-in [:hints] (fnil into #{}) hints)
|
||||
view-system)
|
||||
|
||||
(defn put-hints!
|
||||
"Adds a collection of hints to the view system by using the view system
|
||||
configuration's :put-hints-fn."
|
||||
[^Atom view-system hints]
|
||||
((:put-hints-fn @view-system) view-system hints)
|
||||
view-system)
|
||||
|
||||
(defn- ->views-map
|
||||
[views]
|
||||
(map vector (map id views) views))
|
||||
(defn add-hint!
|
||||
"Add a hint to the system."
|
||||
[view-system hint]
|
||||
(swap! view-system update-in [:hints] (fnil conj #{}) hint))
|
||||
|
||||
(defn add-views!
|
||||
"Add a collection of views to the system."
|
||||
[^Atom view-system views]
|
||||
(swap! view-system update-in [:views] (fnil into {}) (->views-map views))
|
||||
view-system)
|
||||
[view-system views]
|
||||
(swap! view-system update-in [:views] (fnil into {}) (map vector (map id views) views)))
|
||||
|
||||
(def default-options
|
||||
"Default options used to initialize the views system via init!"
|
||||
{
|
||||
; *REQUIRED*
|
||||
; a function that is used to send view refresh data to subscribers.
|
||||
; this function must be set for normal operation of the views system.
|
||||
; (fn [subscriber-key [view-sig view-data]] ...)
|
||||
:send-fn nil
|
||||
(comment
|
||||
(defrecord SQLView [id query-fn]
|
||||
IView
|
||||
(id [_] id)
|
||||
(data [_ namespace parameters]
|
||||
(j/query (db/firm-connection namespace) (hsql/format (apply query-fn parameters))))
|
||||
(relevant? [_ namespace parameters hints]
|
||||
(let [tables (query-tables (apply query-fn parameters))]
|
||||
(boolean (some #(not-empty (intersection % talbes)) hints)))))
|
||||
|
||||
; *REQUIRED*
|
||||
; a function that adds hints to the view system. this function will be used
|
||||
; by other libraries that implement IView. this function must be set for
|
||||
; normal operation of the views system. the default function provided
|
||||
; will trigger relevant view refreshes immediately.
|
||||
; (fn [^Atom view-system hints] ... )
|
||||
:put-hints-fn (fn [^Atom view-system hints] (refresh-views! view-system hints))
|
||||
(def memory-system (atom {}))
|
||||
|
||||
; *REQUIRED*
|
||||
; the size of the queue used to hold view refresh requests for
|
||||
; the worker threads. for very heavy systems, this can be set
|
||||
; higher if you start to get warnings about dropped refresh requests
|
||||
:refresh-queue-size 1000
|
||||
(reset! memory-system {:a {:foo 1 :bar 200 :baz [1 2 3]}
|
||||
:b {:foo 2 :bar 300 :baz [2 3 4]}})
|
||||
|
||||
; *REQUIRED*
|
||||
; interval in milliseconds at which the refresh watcher thread will
|
||||
; check for any queued up hints and dispatch relevant view refresh
|
||||
; updates to the worker threads.
|
||||
:refresh-interval 1000
|
||||
(defrecord MemoryView [id ks]
|
||||
IView
|
||||
(id [_] id)
|
||||
(data [_ namespace parameters]
|
||||
(get-in @memory-system (-> [namespace] (into ks) (into parameters))))
|
||||
(relevant? [_ namespace parameters hints]
|
||||
(some #(and (= namespace (:namespace %)) (= ks (:hint %))) hints)))
|
||||
|
||||
; *REQUIRED*
|
||||
; the number of refresh worker threads that poll for view refresh
|
||||
; requests and dispatch updated view data to subscribers.
|
||||
:worker-threads 8
|
||||
(def view-system
|
||||
(atom
|
||||
{:views {:foo (MemoryView. :foo [:foo])
|
||||
:bar (MemoryView. :bar [:bar])
|
||||
:baz (MemoryView. :baz [:baz])}
|
||||
:send-fn (fn [subscriber-key data] (println "sending to:" subscriber-key "data:" data))}))
|
||||
|
||||
; a list of IView instances. these are the views that can be subscribed
|
||||
; to. views can also be added/replaced after system initialization through
|
||||
; the use of add-views!
|
||||
:views nil
|
||||
(subscribe! view-system :a :foo [] 1)
|
||||
(subscribe! view-system :b :foo [] 2)
|
||||
(subscribe! view-system :b :baz [] 2)
|
||||
|
||||
; a function that authorizes view subscriptions. should return true if the
|
||||
; subscription is authorized. if not set, no view subscriptions will require
|
||||
; any authorization.
|
||||
; (fn [view-sig subscriber-key context] ... )
|
||||
:auth-fn nil
|
||||
(subscribed-views @view-system)
|
||||
|
||||
; a function that is called when subscription authorization fails.
|
||||
; (fn [view-sig subscriber-key context] ... )
|
||||
:on-unauth-fn nil
|
||||
(doto view-system
|
||||
(add-hint! [:foo])
|
||||
(add-hint! [:baz]))
|
||||
|
||||
; a function that returns a namespace to use for view subscriptions
|
||||
; (fn [view-sig subscriber-key context] ... )
|
||||
:namespace-fn nil
|
||||
|
||||
; interval in milliseconds at which a logger will write view system
|
||||
; statistics to the log. if not set, the logger will be disabled.
|
||||
:stats-log-interval nil
|
||||
})
|
||||
(refresh-views! view-system)
|
||||
|
||||
(defn init!
|
||||
"Initializes the view system for use with the list of views provided.
|
||||
;; Example of function that updates and hints the view system.
|
||||
(defn massoc-in!
|
||||
[memory-system namespace ks v]
|
||||
(let [ms (swap! memory-system assoc-in (into [namespace] ks) v)]
|
||||
(add-hint! view-system ks)
|
||||
ms))
|
||||
|
||||
An existing atom that will be used to store the state of the views
|
||||
system can be provided, otherwise one will be created. Either way,
|
||||
the atom with the initialized view system is returned.
|
||||
(massoc-in! memory-system :a [:foo] 1)
|
||||
(massoc-in! memory-system :b [:baz] [2 4 3])
|
||||
|
||||
options is a map of options to configure the view system with. See
|
||||
views.core/default-options for a description of the available options
|
||||
and the defaults that will be used for any options not provided in
|
||||
the call to init!."
|
||||
([^Atom view-system options]
|
||||
(let [options (merge default-options options)]
|
||||
(trace "initializing views system using options:" options)
|
||||
(reset! view-system
|
||||
{:refresh-queue (ArrayBlockingQueue. (:refresh-queue-size options))
|
||||
:views (into {} (->views-map (:views options)))
|
||||
:send-fn (:send-fn options)
|
||||
:put-hints-fn (:put-hints-fn options)
|
||||
:auth-fn (:auth-fn options)
|
||||
:on-unauth-fn (:on-unauth-fn options)
|
||||
:namespace-fn (:namespace-fn options)
|
||||
; keeping a copy of the options used during init allows other libraries
|
||||
; that plugin/extend views functionality (e.g. IView implementations)
|
||||
; to make use of any options themselves
|
||||
:options options})
|
||||
(start-update-watcher! view-system (:refresh-interval options) (:worker-threads options))
|
||||
(when-let [stats-log-interval (:stats-log-interval options)]
|
||||
(swap! view-system assoc :logging? true)
|
||||
(start-logger! view-system stats-log-interval))
|
||||
view-system))
|
||||
([options]
|
||||
(init! (atom {}) options)))
|
||||
|
||||
(defn shutdown!
|
||||
"Shuts the view system down, terminating all worker threads and clearing
|
||||
all view subscriptions and data."
|
||||
[^Atom view-system & [dont-wait-for-threads?]]
|
||||
(trace "shutting down views sytem")
|
||||
(stop-update-watcher! view-system dont-wait-for-threads?)
|
||||
(if (:logging? @view-system)
|
||||
(stop-logger! view-system dont-wait-for-threads?))
|
||||
(reset! view-system {})
|
||||
view-system)
|
||||
(start-update-watcher! view-system 1000)
|
||||
|
||||
)
|
70
src/views/honeysql/util.clj
Normal file
70
src/views/honeysql/util.clj
Normal file
|
@ -0,0 +1,70 @@
|
|||
(ns views.honeysql.util
|
||||
(:require
|
||||
[honeysql.core :as hsql]
|
||||
[honeysql.helpers :as hh]
|
||||
[clojure.string :refer [split]]))
|
||||
;; The following is used for full refresh views where we can have CTEs and
|
||||
;; subselects in play.
|
||||
|
||||
(declare query-tables)
|
||||
|
||||
(defn cte-tables
|
||||
[query]
|
||||
(mapcat #(query-tables (second %)) (:with query)))
|
||||
|
||||
(defn isolate-tables
|
||||
"Isolates tables from table definitions in from and join clauses."
|
||||
[c]
|
||||
(if (keyword? c) [c] (let [v (first c)] (if (map? v) (query-tables v) [v]))))
|
||||
|
||||
(defn from-tables
|
||||
[query]
|
||||
(mapcat isolate-tables (:from query)))
|
||||
|
||||
(defn every-second
|
||||
[coll]
|
||||
(map first (partition 2 coll)))
|
||||
|
||||
(defn join-tables
|
||||
[query k]
|
||||
(mapcat isolate-tables (every-second (k query))))
|
||||
|
||||
(defn collect-maps
|
||||
[wc]
|
||||
(cond
|
||||
(coll? wc) (let [maps (filterv map? wc)
|
||||
colls (filter #(and (coll? %) (not (map? %))) wc)]
|
||||
(into maps (mapcat collect-maps colls)))
|
||||
(map? wc) [wc]
|
||||
:else []))
|
||||
|
||||
(defn where-tables
|
||||
"This search for subqueries in the where clause."
|
||||
[query]
|
||||
(mapcat query-tables (collect-maps (:where query))))
|
||||
|
||||
(defn insert-tables
|
||||
[query]
|
||||
(if-let [v (:insert-into query)] [v] []))
|
||||
|
||||
(defn update-tables
|
||||
[query]
|
||||
(if-let [v (:update query)] [v] []))
|
||||
|
||||
(defn delete-tables
|
||||
[query]
|
||||
(if-let [v (:delete-from query)] [v] []))
|
||||
|
||||
(defn query-tables
|
||||
"Return all the tables in an sql statement."
|
||||
[query]
|
||||
(set (concat
|
||||
(cte-tables query)
|
||||
(from-tables query)
|
||||
(join-tables query :join)
|
||||
(join-tables query :left-join)
|
||||
(join-tables query :right-join)
|
||||
(where-tables query)
|
||||
(insert-tables query)
|
||||
(update-tables query)
|
||||
(delete-tables query))))
|
|
@ -4,11 +4,11 @@
|
|||
(data [this namespace parameters]
|
||||
"Returns view data.")
|
||||
(relevant? [this namespace parameters hints]
|
||||
"Given hints of the form {:namespace x :hint y :type z}, the view must
|
||||
return true if the hint indicates that an instance of this view
|
||||
with supplied namespace and parameters might require updating.
|
||||
It is always safe to return true, but false should be returned only
|
||||
if you are sure this view does not need updating.")
|
||||
"Given hints of the form {:namespace x :hint y}, the view must
|
||||
return true if the hint indicates that an instance of this view
|
||||
with supplied namespace and parameters might require updating.
|
||||
It is always safe to return true, but false sure be returned only
|
||||
if you are sure this view does not need updating.")
|
||||
(id [this]
|
||||
"A unique identifer for a view."))
|
||||
|
||||
|
|
|
@ -1,113 +0,0 @@
|
|||
(ns views.basic-system-init-tests
|
||||
(:use
|
||||
clojure.test
|
||||
views.test-helpers
|
||||
views.protocols
|
||||
views.core
|
||||
views.test-view-system)
|
||||
(:import
|
||||
(views.test_view_system MemoryView)
|
||||
(clojure.lang Atom)))
|
||||
|
||||
(use-fixtures :each reset-test-views-system)
|
||||
|
||||
|
||||
(defn dummy-send-fn [subscriber-key [view-sig view-data]])
|
||||
|
||||
(def test-options (merge default-options
|
||||
{:views views
|
||||
:send-fn dummy-send-fn}))
|
||||
|
||||
|
||||
;; tests
|
||||
|
||||
(deftest inits-with-correct-config-and-shutsdown-correctly
|
||||
(let [options test-options
|
||||
; 1. init views
|
||||
init-returned-atom (init! test-views-system test-options)]
|
||||
(is (instance? Atom init-returned-atom))
|
||||
(is (= init-returned-atom test-views-system))
|
||||
(is (seq @test-views-system))
|
||||
(is (= dummy-send-fn (:send-fn @test-views-system)))
|
||||
(is (and (contains-view? test-views-system :foo)
|
||||
(contains-view? test-views-system :bar)
|
||||
(contains-view? test-views-system :baz)))
|
||||
(is (not (:logging? @test-views-system)))
|
||||
(is (not (collecting-stats? test-views-system)))
|
||||
(is (empty? (subscribed-views test-views-system)))
|
||||
(let [refresh-watcher (:refresh-watcher @test-views-system)
|
||||
workers (:workers @test-views-system)]
|
||||
(is (.isAlive ^Thread refresh-watcher))
|
||||
(is (= (:worker-threads options) (count workers)))
|
||||
(doseq [^Thread t workers]
|
||||
(is (.isAlive t)))
|
||||
; 2. shutdown views (and wait for all threads to also finish)
|
||||
(shutdown! test-views-system)
|
||||
(is (empty? @test-views-system))
|
||||
(is (not (.isAlive ^Thread refresh-watcher)))
|
||||
(doseq [^Thread t workers]
|
||||
(is (not (.isAlive t)))))))
|
||||
|
||||
(deftest init-without-existing-view-system-atom
|
||||
(let [options test-options]
|
||||
(let [init-created-atom (init! options)]
|
||||
(is (instance? Atom init-created-atom))
|
||||
(is (seq @init-created-atom))
|
||||
(is (= dummy-send-fn (:send-fn @init-created-atom)))
|
||||
(is (and (contains-view? init-created-atom :foo)
|
||||
(contains-view? init-created-atom :bar)
|
||||
(contains-view? init-created-atom :baz)))
|
||||
(shutdown! init-created-atom)
|
||||
(is (empty? @init-created-atom)))))
|
||||
|
||||
(deftest init-can-also-start-logger
|
||||
(let [options (-> test-options
|
||||
(assoc :stats-log-interval 10000))]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
(is (seq (:statistics @test-views-system)))
|
||||
(is (:logging? @test-views-system))
|
||||
(is (collecting-stats? test-views-system))
|
||||
(let [logger-thread (get-in @test-views-system [:statistics :logger])]
|
||||
(is (.isAlive ^Thread logger-thread))
|
||||
; 2. shutdown views
|
||||
(shutdown! test-views-system)
|
||||
(is (nil? (get-in @test-views-system [:statistics :logger])))
|
||||
(is (not (.isAlive ^Thread logger-thread))))))
|
||||
|
||||
(deftest can-add-new-views-after-init
|
||||
(let [options test-options]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
(is (and (contains-view? test-views-system :foo)
|
||||
(contains-view? test-views-system :bar)
|
||||
(contains-view? test-views-system :baz)))
|
||||
; 2. add new views
|
||||
(add-views! test-views-system
|
||||
[(MemoryView. :one [:one])
|
||||
(MemoryView. :two [:two])])
|
||||
(is (and (contains-view? test-views-system :foo)
|
||||
(contains-view? test-views-system :bar)
|
||||
(contains-view? test-views-system :baz)
|
||||
(contains-view? test-views-system :one)
|
||||
(contains-view? test-views-system :two)))
|
||||
; 3. shutdown views
|
||||
(shutdown! test-views-system)))
|
||||
|
||||
(deftest can-replace-views-after-init
|
||||
(let [options test-options
|
||||
replacement-view (MemoryView. :foo [:new-foo])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
(is (and (contains-view? test-views-system :foo)
|
||||
(contains-view? test-views-system :bar)
|
||||
(contains-view? test-views-system :baz)))
|
||||
(is (not= replacement-view (get-in @test-views-system [:views :foo])))
|
||||
; 2. add view. has same id so should replace existing one
|
||||
(add-views! test-views-system [replacement-view])
|
||||
(is (and (contains-view? test-views-system :foo)
|
||||
(contains-view? test-views-system :bar)
|
||||
(contains-view? test-views-system :baz)))
|
||||
(is (= replacement-view (get-in @test-views-system [:views :foo])))
|
||||
; 3. shutdown views
|
||||
(shutdown! test-views-system)))
|
|
@ -1,275 +0,0 @@
|
|||
(ns views.hint-tests
|
||||
(:use
|
||||
clojure.test
|
||||
views.test-helpers
|
||||
views.protocols
|
||||
views.core
|
||||
views.test-view-system)
|
||||
(:import (clojure.lang Atom)))
|
||||
|
||||
|
||||
(def test-sent-data
|
||||
(atom []))
|
||||
|
||||
(defn test-send-fn [subscriber-key [view-sig view-data]]
|
||||
(swap! test-sent-data conj {:subscriber-key subscriber-key
|
||||
:view-sig view-sig
|
||||
:view-data view-data}))
|
||||
|
||||
(def test-options (merge default-options
|
||||
{:views views
|
||||
:send-fn test-send-fn}))
|
||||
|
||||
(defn clear-sent-data-fixture [f]
|
||||
(reset! test-sent-data [])
|
||||
(f))
|
||||
|
||||
(use-fixtures :each clear-sent-data-fixture reset-test-views-system reset-memory-db-fixture)
|
||||
|
||||
|
||||
|
||||
;; tests
|
||||
|
||||
(deftest refresh-views!-instantly-attempts-view-refresh-with-given-hints
|
||||
(let [options test-options
|
||||
hints-refreshed (atom [])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; with a view subscription (any subscription will do)
|
||||
(with-redefs [subscribed-views (fn [_] #{(->view-sig :namespace :fake-view [])})
|
||||
refresh-view! (fn [_ hints _] (swap! hints-refreshed into hints))]
|
||||
; 2. trigger refresh by calling refresh-views! with hints
|
||||
(refresh-views! test-views-system [(hint :namespace [:foo] :fake-type)])
|
||||
(is (contains-only? @hints-refreshed
|
||||
[(hint :namespace [:foo] :fake-type)]))
|
||||
(reset! hints-refreshed [])
|
||||
; 3. same thing again, but passing in multiple hints
|
||||
(refresh-views! test-views-system
|
||||
[(hint :namespace [:foo] :fake-type)
|
||||
(hint :namespace [:bar] :fake-type)])
|
||||
(is (contains-only? @hints-refreshed
|
||||
[(hint :namespace [:foo] :fake-type)
|
||||
(hint :namespace [:bar] :fake-type)]))
|
||||
(reset! hints-refreshed []))
|
||||
; now, without any view subscriptions
|
||||
(with-redefs [subscribed-views (fn [_] #{})
|
||||
refresh-view! (fn [_ hints _] (swap! hints-refreshed into hints))]
|
||||
; 4. again trigger refresh by calling refresh-views! with hints
|
||||
(refresh-views! test-views-system [(hint :namespace [:foo] :fake-type)])
|
||||
(is (empty? @hints-refreshed))
|
||||
(reset! hints-refreshed []))))
|
||||
|
||||
(deftest refresh-watcher-runs-at-specified-interval-and-picks-up-queued-hints
|
||||
(let [options test-options
|
||||
hints-refreshed (atom [])]
|
||||
(with-redefs [subscribed-views (fn [_] #{(->view-sig :namespace :fake-view [])})
|
||||
refresh-view! (fn [_ hints _] (swap! hints-refreshed into hints))]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. queue a hint and wait until the next refresh interval
|
||||
(queue-hints! test-views-system [(hint :namespace [:foo] :fake-type)])
|
||||
(wait-for-refresh-interval options)
|
||||
(is (contains-only? @hints-refreshed
|
||||
[(hint :namespace [:foo] :fake-type)]))
|
||||
(reset! hints-refreshed [])
|
||||
; 3. queue multiple hints and wait again
|
||||
(queue-hints! test-views-system
|
||||
[(hint :namespace [:foo] :fake-type)
|
||||
(hint :namespace [:bar] :fake-type)])
|
||||
(wait-for-refresh-interval options)
|
||||
(is (contains-only? @hints-refreshed
|
||||
[(hint :namespace [:foo] :fake-type)
|
||||
(hint :namespace [:bar] :fake-type)]))
|
||||
(reset! hints-refreshed [])
|
||||
; 4. queue up no hints and wait
|
||||
(wait-for-refresh-interval options)
|
||||
(reset! hints-refreshed []))))
|
||||
|
||||
(deftest refresh-worker-thread-processes-relevant-hints
|
||||
(let [options test-options
|
||||
views-refreshed (atom [])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
(with-redefs [subscribed-views (fn [_] #{(->view-sig :a :foo [])})
|
||||
do-view-refresh! (fn [_ view-sig] (swap! views-refreshed into [view-sig]))]
|
||||
; 2. trigger refresh by calling refresh-views! with relevant hint
|
||||
(refresh-views! test-views-system [(hint :a [:foo] memory-view-hint-type)])
|
||||
(wait-for-refresh-views)
|
||||
(is (contains-only? @views-refreshed [(->view-sig :a :foo [])]))
|
||||
(reset! views-refreshed [])
|
||||
; 3. same thing again, but passing in multiple hints (1 relevant, 1 not)
|
||||
(refresh-views! test-views-system [(hint :a [:foo] memory-view-hint-type)
|
||||
(hint :a [:bar] memory-view-hint-type)])
|
||||
(wait-for-refresh-views)
|
||||
(is (contains-only? @views-refreshed [(->view-sig :a :foo [])]))
|
||||
(reset! views-refreshed [])
|
||||
; 4. and lastly, passing in only irrelevant hints
|
||||
(refresh-views! test-views-system
|
||||
[(hint :b [:foo] memory-view-hint-type)
|
||||
(hint :a [:foo] :some-other-type)])
|
||||
(wait-for-refresh-views)
|
||||
(is (empty? @views-refreshed))
|
||||
(reset! views-refreshed []))))
|
||||
|
||||
; this test is really just testing that our helper function memory-db-assoc-in! works as we expect it to
|
||||
; (otherwise, it is entirely redundant given the above tests)
|
||||
(deftest test-memory-db-operation-triggers-proper-refresh-hints
|
||||
(let [options test-options
|
||||
hints-refreshed (atom [])
|
||||
views-refreshed (atom [])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; first tests verifying that correct hints are being sent out (don't care if relevant or not yet)
|
||||
(with-redefs [subscribed-views (fn [_] #{(->view-sig :a :foo [])})
|
||||
refresh-view! (fn [_ hints _] (swap! hints-refreshed into hints))]
|
||||
(memory-db-assoc-in! test-views-system :a [:foo] 42)
|
||||
(memory-db-assoc-in! test-views-system :a [:bar] 3.14)
|
||||
(memory-db-assoc-in! test-views-system :b [:baz] [10 20 30])
|
||||
(wait-for-refresh-views)
|
||||
(is (contains-only? @hints-refreshed
|
||||
[(hint :a [:foo] memory-view-hint-type)
|
||||
(hint :a [:bar] memory-view-hint-type)
|
||||
(hint :b [:baz] memory-view-hint-type)]))
|
||||
(reset! views-refreshed []))
|
||||
; now we test that relevant views were recognized as relevant and forwarded on to be used to
|
||||
; trigger actual refreshes of view data
|
||||
(with-redefs [subscribed-views (fn [_] #{(->view-sig :a :foo [])})
|
||||
do-view-refresh! (fn [_ view-sig] (swap! views-refreshed into [view-sig]))]
|
||||
; 2. update memory database (in a location covered by the subscribed view)
|
||||
(memory-db-assoc-in! test-views-system :a [:foo] 1337)
|
||||
(wait-for-refresh-interval options)
|
||||
(is (contains-only? @views-refreshed [(->view-sig :a :foo [])]))
|
||||
(reset! views-refreshed [])
|
||||
; 3. same thing again, but update a different location not covered by any subscription
|
||||
(memory-db-assoc-in! test-views-system :a [:bar] 1234.5678)
|
||||
(wait-for-refresh-interval options)
|
||||
(is (empty? @views-refreshed)))))
|
||||
|
||||
(deftest relevant-hints-cause-refreshed-data-to-be-sent-to-subscriber
|
||||
(let [options test-options
|
||||
subscriber-key 123
|
||||
view-sig (->view-sig :a :foo [])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [original-view-data (get-view-data test-views-system view-sig)
|
||||
updated-view-data 21
|
||||
subscribe-result (subscribe! test-views-system view-sig subscriber-key nil)]
|
||||
; 3. block until subscription finishes. we don't care about the initial view data refresh
|
||||
(while (not (realized? subscribe-result)))
|
||||
(reset! test-sent-data [])
|
||||
(is (= (hash original-view-data) (get-in @test-views-system [:hashes view-sig])))
|
||||
; 4. change some test data that is covered by the view subscription
|
||||
(memory-db-assoc-in! test-views-system :a [:foo] updated-view-data)
|
||||
(wait-for-refresh-views)
|
||||
(is (= (hash updated-view-data) (get-in @test-views-system [:hashes view-sig])))
|
||||
(is (contains-only? @test-sent-data
|
||||
[{:subscriber-key subscriber-key
|
||||
:view-sig (dissoc view-sig :namespace)
|
||||
:view-data updated-view-data}])))))
|
||||
|
||||
(deftest irrelevant-hints-dont-trigger-refreshes
|
||||
(let [options test-options
|
||||
subscriber-key 123
|
||||
view-sig (->view-sig :a :foo [])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [subscribe-result (subscribe! test-views-system view-sig subscriber-key nil)]
|
||||
; 3. block until subscription finishes. we don't care about the initial view data refresh
|
||||
(while (not (realized? subscribe-result)))
|
||||
(reset! test-sent-data [])
|
||||
; 4. change some test data that is NOT covered by the view subscription
|
||||
(memory-db-assoc-in! test-views-system :b [:foo] 6)
|
||||
(memory-db-assoc-in! test-views-system :a [:bar] 7)
|
||||
(wait-for-refresh-views)
|
||||
(is (empty? @test-sent-data)))))
|
||||
|
||||
(deftest refreshes-not-sent-if-view-data-is-unchanged-since-last-refresh
|
||||
(let [options test-options
|
||||
subscriber-key 123
|
||||
view-sig (->view-sig :a :foo [])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [updated-view-data 1111
|
||||
subscribe-result (subscribe! test-views-system view-sig subscriber-key nil)]
|
||||
; 3. block until subscription finishes. we don't care about the initial view data refresh
|
||||
(while (not (realized? subscribe-result)))
|
||||
(reset! test-sent-data [])
|
||||
; 4. change some test data, will cause a refresh to be sent out
|
||||
(memory-db-assoc-in! test-views-system :a [:foo] updated-view-data)
|
||||
(wait-for-refresh-views)
|
||||
(is (= (hash updated-view-data) (get-in @test-views-system [:hashes view-sig])))
|
||||
(is (contains-only? @test-sent-data
|
||||
[{:subscriber-key subscriber-key
|
||||
:view-sig (dissoc view-sig :namespace)
|
||||
:view-data updated-view-data}]))
|
||||
(reset! test-sent-data [])
|
||||
; 5. manually trigger another refresh for the view
|
||||
(refresh-views! test-views-system [(hint :a [:foo] memory-view-hint-type)])
|
||||
(wait-for-refresh-views)
|
||||
(is (empty? @test-sent-data))
|
||||
; 6. also try "updating" the db with the same values
|
||||
(memory-db-assoc-in! test-views-system :a [:foo] updated-view-data)
|
||||
(wait-for-refresh-views)
|
||||
(is (empty? @test-sent-data)))))
|
||||
|
||||
(deftest refresh-queue-drops-duplicate-hints
|
||||
(let [options (-> test-options
|
||||
; enable statistics collection
|
||||
(assoc :stats-log-interval 10000))
|
||||
subscriber-key 123
|
||||
view-sig (->view-sig :a :foo [])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. prematurely stop refresh worker threads so that we can more easily inspect the
|
||||
; internal refresh queue's entries. the refresh worker threads are what remove
|
||||
; hints from the refresh queue as they are added to it.
|
||||
(stop-refresh-worker-threads test-views-system)
|
||||
; 3. subscribe to a view
|
||||
(let [subscribe-result (subscribe! test-views-system view-sig subscriber-key nil)]
|
||||
; 4. block until subscription finishes
|
||||
(while (not (realized? subscribe-result)))
|
||||
(is (= 0 (get-in @test-views-system [:statistics :deduplicated])))
|
||||
; 5. add duplicate hints by changing the same set of data twice
|
||||
; (hints will stay in the queue forever because we stopped the worker threads)
|
||||
(memory-db-assoc-in! test-views-system :a [:foo] 6)
|
||||
(memory-db-assoc-in! test-views-system :a [:foo] 7)
|
||||
(wait-for-refresh-views)
|
||||
(is (= 1 (get-in @test-views-system [:statistics :deduplicated])))
|
||||
(is (= [view-sig]
|
||||
(vec (:refresh-queue @test-views-system)))))))
|
||||
|
||||
(deftest refresh-queue-drops-hints-when-full
|
||||
(let [options (-> test-options
|
||||
; enable statistics collection
|
||||
(assoc :stats-log-interval 10000
|
||||
:refresh-queue-size 1))
|
||||
subscriber-key 123
|
||||
view-sig-a (->view-sig :a :foo [])
|
||||
view-sig-b (->view-sig :b :foo [])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. prematurely stop refresh worker threads so that we can more easily inspect the
|
||||
; internal refresh queue's entries. the refresh worker threads are what remove
|
||||
; hints from the refresh queue as they are added to it.
|
||||
(stop-refresh-worker-threads test-views-system)
|
||||
; 3. subscribe to a view
|
||||
; note: log* redef is to suppress error log output which will normally happen whenever
|
||||
; another item is added to the refresh queue when it's already full
|
||||
(with-redefs [clojure.tools.logging/log* (fn [& _])]
|
||||
(let [subscribe-a (subscribe! test-views-system view-sig-a subscriber-key nil)
|
||||
subscribe-b (subscribe! test-views-system view-sig-b subscriber-key nil)]
|
||||
; 4. block until subscription finishes
|
||||
(while (or (not (realized? subscribe-a))
|
||||
(not (realized? subscribe-b))))
|
||||
(is (= 0 (get-in @test-views-system [:statistics :dropped])))
|
||||
; 5. change some data affecting the subscribed view, resulting in more then 1 hint
|
||||
; being added to the refresh queue
|
||||
(memory-db-assoc-in! test-views-system :a [:foo] 101010)
|
||||
(memory-db-assoc-in! test-views-system :b [:foo] 010101)
|
||||
(wait-for-refresh-views)
|
||||
(is (= 1 (get-in @test-views-system [:statistics :dropped])))
|
||||
(is (= [view-sig-a]
|
||||
(vec (:refresh-queue @test-views-system))))))))
|
|
@ -1,407 +0,0 @@
|
|||
(ns views.subscription-tests
|
||||
(:use
|
||||
clojure.test
|
||||
views.test-helpers
|
||||
views.protocols
|
||||
views.core
|
||||
views.test-view-system))
|
||||
|
||||
|
||||
(def test-sent-data
|
||||
(atom []))
|
||||
|
||||
(defn test-send-fn [subscriber-key [view-sig view-data]]
|
||||
(swap! test-sent-data conj {:subscriber-key subscriber-key
|
||||
:view-sig view-sig
|
||||
:view-data view-data}))
|
||||
|
||||
(def test-options (merge default-options
|
||||
{:views views
|
||||
:send-fn test-send-fn}))
|
||||
|
||||
(defn clear-sent-data-fixture [f]
|
||||
(reset! test-sent-data [])
|
||||
(f))
|
||||
|
||||
(use-fixtures :each clear-sent-data-fixture reset-test-views-system reset-memory-db-fixture)
|
||||
|
||||
|
||||
|
||||
;; tests
|
||||
|
||||
(deftest can-subscribe-to-a-view
|
||||
(let [options test-options
|
||||
subscriber-key 123
|
||||
view-sig (->view-sig :namespace :foo [])
|
||||
context {:my-data "arbitrary application context data"}]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
|
||||
(is (future? subscribe-result))
|
||||
(is (= [subscriber-key] (keys (:subscribed @test-views-system))))
|
||||
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key])))
|
||||
(is (= #{subscriber-key} (get-in @test-views-system [:subscribers view-sig])))
|
||||
; 3. block until subscription finishes (data retrieval + initial view refresh)
|
||||
; (in this particular unit test, there is really no point in waiting)
|
||||
(while (not (realized? subscribe-result)))
|
||||
(is (= #{view-sig} (subscribed-views test-views-system))))))
|
||||
|
||||
(deftest subscribing-results-in-initial-view-data-being-sent
|
||||
(let [options test-options
|
||||
subscriber-key 123
|
||||
view-sig (->view-sig :a :foo [])
|
||||
context {:my-data "arbitrary application context data"}]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [view-data (get-view-data test-views-system view-sig)
|
||||
subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
|
||||
; 3. block until subscription finishes (data retrieval + initial view refresh)
|
||||
(while (not (realized? subscribe-result)))
|
||||
(is (= #{view-sig} (subscribed-views test-views-system)))
|
||||
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
|
||||
(is (contains-only? @test-sent-data
|
||||
[{:subscriber-key subscriber-key
|
||||
:view-sig (dissoc view-sig :namespace)
|
||||
:view-data view-data}])))))
|
||||
|
||||
(deftest can-unsubscribe-from-a-view
|
||||
(let [options test-options
|
||||
subscriber-key 123
|
||||
view-sig (->view-sig :a :foo [])
|
||||
context {:my-data "arbitrary application context data"}]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [view-data (get-view-data test-views-system view-sig)
|
||||
subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
|
||||
(is (= [subscriber-key] (keys (:subscribed @test-views-system))))
|
||||
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key])))
|
||||
(is (= #{subscriber-key} (get-in @test-views-system [:subscribers view-sig])))
|
||||
(is (= #{view-sig} (subscribed-views test-views-system)))
|
||||
; 3. block until subscription finishes
|
||||
(while (not (realized? subscribe-result)))
|
||||
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
|
||||
; 4. unsubscribe
|
||||
(unsubscribe! test-views-system view-sig subscriber-key context)
|
||||
(is (empty? (keys (:subscribed @test-views-system))))
|
||||
(is (empty? (keys (:subscribers @test-views-system))))
|
||||
(is (empty? (subscribed-views test-views-system)))
|
||||
(is (empty? (:hashes @test-views-system))))))
|
||||
|
||||
(deftest multiple-subscription-and-unsubscriptions
|
||||
(let [options test-options
|
||||
subscriber-key-a 123
|
||||
subscriber-key-b 456
|
||||
view-sig (->view-sig :a :foo [])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [view-data (get-view-data test-views-system view-sig)
|
||||
subscribe-a (subscribe! test-views-system view-sig subscriber-key-a nil)
|
||||
subscribe-b (subscribe! test-views-system view-sig subscriber-key-b nil)]
|
||||
; 3. block until both subscriptions finish
|
||||
(while (or (not (realized? subscribe-a))
|
||||
(not (realized? subscribe-b))))
|
||||
(is (= #{view-sig} (subscribed-views test-views-system)))
|
||||
(is (= [subscriber-key-a subscriber-key-b] (keys (:subscribed @test-views-system))))
|
||||
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key-a])))
|
||||
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key-b])))
|
||||
(is (= #{subscriber-key-a subscriber-key-b} (get-in @test-views-system [:subscribers view-sig])))
|
||||
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
|
||||
(is (contains-only? @test-sent-data
|
||||
[{:subscriber-key subscriber-key-a
|
||||
:view-sig (dissoc view-sig :namespace)
|
||||
:view-data view-data}
|
||||
{:subscriber-key subscriber-key-b
|
||||
:view-sig (dissoc view-sig :namespace)
|
||||
:view-data view-data}]))
|
||||
; 4. have one of the subscribers unsubscribe
|
||||
(unsubscribe! test-views-system view-sig subscriber-key-a nil)
|
||||
(is (= #{view-sig} (subscribed-views test-views-system)))
|
||||
(is (= [subscriber-key-b] (keys (:subscribed @test-views-system))))
|
||||
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key-b])))
|
||||
(is (= #{subscriber-key-b} (get-in @test-views-system [:subscribers view-sig])))
|
||||
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
|
||||
; 5. have the last subscriber also unsubscribe
|
||||
(unsubscribe! test-views-system view-sig subscriber-key-b nil)
|
||||
(is (empty? (keys (:subscribed @test-views-system))))
|
||||
(is (empty? (keys (:subscribers @test-views-system))))
|
||||
(is (empty? (subscribed-views test-views-system)))
|
||||
(is (empty? (:hashes @test-views-system))))))
|
||||
|
||||
(deftest subscriptions-to-different-views
|
||||
(let [options test-options
|
||||
subscriber-key-a 123
|
||||
subscriber-key-b 456
|
||||
view-sig-a (->view-sig :a :foo [])
|
||||
view-sig-b (->view-sig :a :bar [])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [view-data-a (get-view-data test-views-system view-sig-a)
|
||||
view-data-b (get-view-data test-views-system view-sig-b)
|
||||
subscribe-a (subscribe! test-views-system view-sig-a subscriber-key-a nil)
|
||||
subscribe-b (subscribe! test-views-system view-sig-b subscriber-key-b nil)]
|
||||
; 3. block until both subscriptions finish
|
||||
(while (or (not (realized? subscribe-a))
|
||||
(not (realized? subscribe-b))))
|
||||
(is (= #{view-sig-a view-sig-b} (subscribed-views test-views-system)))
|
||||
(is (= [subscriber-key-a subscriber-key-b] (keys (:subscribed @test-views-system))))
|
||||
(is (= #{view-sig-a} (get-in @test-views-system [:subscribed subscriber-key-a])))
|
||||
(is (= #{view-sig-b} (get-in @test-views-system [:subscribed subscriber-key-b])))
|
||||
(is (= #{subscriber-key-a} (get-in @test-views-system [:subscribers view-sig-a])))
|
||||
(is (= #{subscriber-key-b} (get-in @test-views-system [:subscribers view-sig-b])))
|
||||
(is (= (hash view-data-a) (get-in @test-views-system [:hashes view-sig-a])))
|
||||
(is (= (hash view-data-b) (get-in @test-views-system [:hashes view-sig-b])))
|
||||
(is (contains-only? @test-sent-data
|
||||
[{:subscriber-key subscriber-key-a
|
||||
:view-sig (dissoc view-sig-a :namespace)
|
||||
:view-data view-data-a}
|
||||
{:subscriber-key subscriber-key-b
|
||||
:view-sig (dissoc view-sig-b :namespace)
|
||||
:view-data view-data-b}]))
|
||||
; 4. have one of the subscribers unsubscribe
|
||||
(unsubscribe! test-views-system view-sig-a subscriber-key-a nil)
|
||||
(is (= #{view-sig-b} (subscribed-views test-views-system)))
|
||||
(is (= [subscriber-key-b] (keys (:subscribed @test-views-system))))
|
||||
(is (empty? (get-in @test-views-system [:subscribed subscriber-key-a])))
|
||||
(is (= #{view-sig-b} (get-in @test-views-system [:subscribed subscriber-key-b])))
|
||||
(is (= #{subscriber-key-b} (get-in @test-views-system [:subscribers view-sig-b])))
|
||||
(is (empty? (get-in @test-views-system [:subscribers view-sig-a])))
|
||||
(is (empty? (get-in @test-views-system [:hashes view-sig-a])))
|
||||
(is (= (hash view-data-b) (get-in @test-views-system [:hashes view-sig-b])))
|
||||
; 5. have the last subscriber also unsubscribe
|
||||
(unsubscribe! test-views-system view-sig-b subscriber-key-b nil)
|
||||
(is (empty? (keys (:subscribed @test-views-system))))
|
||||
(is (empty? (keys (:subscribers @test-views-system))))
|
||||
(is (empty? (subscribed-views test-views-system)))
|
||||
(is (empty? (:hashes @test-views-system))))))
|
||||
|
||||
(deftest duplicate-subscriptions-do-not-cause-problems
|
||||
(let [options test-options
|
||||
subscriber-key 123
|
||||
view-sig (->view-sig :a :foo [])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [view-data (get-view-data test-views-system view-sig)
|
||||
first-subscribe (subscribe! test-views-system view-sig subscriber-key nil)
|
||||
second-subscribe (subscribe! test-views-system view-sig subscriber-key nil)]
|
||||
; 3. block until both subscriptions finish
|
||||
(while (or (not (realized? first-subscribe))
|
||||
(not (realized? second-subscribe))))
|
||||
(is (= #{view-sig} (subscribed-views test-views-system)))
|
||||
(is (= [subscriber-key] (keys (:subscribed @test-views-system))))
|
||||
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key])))
|
||||
(is (= #{subscriber-key} (get-in @test-views-system [:subscribers view-sig])))
|
||||
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
|
||||
(is (contains-only? @test-sent-data
|
||||
[{:subscriber-key subscriber-key
|
||||
:view-sig (dissoc view-sig :namespace)
|
||||
:view-data view-data}
|
||||
{:subscriber-key subscriber-key
|
||||
:view-sig (dissoc view-sig :namespace)
|
||||
:view-data view-data}]))
|
||||
; 4. unsubscribe. only need to do this once, since only one subscription
|
||||
; should exist in the view system
|
||||
(unsubscribe! test-views-system view-sig subscriber-key nil)
|
||||
(is (empty? (keys (:subscribed @test-views-system))))
|
||||
(is (empty? (keys (:subscribers @test-views-system))))
|
||||
(is (empty? (subscribed-views test-views-system)))
|
||||
(is (empty? (:hashes @test-views-system))))))
|
||||
|
||||
(deftest subscribing-to-non-existant-view-raises-exception
|
||||
(let [options test-options
|
||||
subscriber-key 123
|
||||
view-sig (->view-sig :namespace :non-existant-view [])]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(is (thrown? Exception (subscribe! test-views-system view-sig subscriber-key nil)))))
|
||||
|
||||
(deftest subscribe-and-unsubscribe-use-namespace-fn-if-set-and-no-namespace-in-view-sig
|
||||
(let [subscriber-key 123
|
||||
view-sig (->view-sig :foo [])
|
||||
context "some arbitrary context data"
|
||||
namespace-fn (fn [view-sig* subscriber-key* context*]
|
||||
(is (= view-sig view-sig*))
|
||||
(is (= subscriber-key subscriber-key*))
|
||||
(is (= context context*))
|
||||
:b)
|
||||
options (-> test-options
|
||||
(assoc :namespace-fn namespace-fn))]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [; with the above namespace-fn, subscribe will internally use this view sig
|
||||
; when setting up subscription info within view-system. application code
|
||||
; shouldn't need to worry about this, but we will in this unit test
|
||||
view-sig-with-ns (->view-sig :b :foo [])
|
||||
; such as right here, we need to use the actual namespace that was set in
|
||||
; view-system to pass in the same parameters that subscribe! will use for
|
||||
; the view during it's initial view data refresh
|
||||
view-data (get-view-data test-views-system view-sig-with-ns)
|
||||
; passing in view-sig *without* namespace
|
||||
subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
|
||||
; 3. block until subscription finishes
|
||||
(while (not (realized? subscribe-result)))
|
||||
(is (= #{view-sig-with-ns} (subscribed-views test-views-system)))
|
||||
(is (= [subscriber-key] (keys (:subscribed @test-views-system))))
|
||||
(is (= #{view-sig-with-ns} (get-in @test-views-system [:subscribed subscriber-key])))
|
||||
(is (= #{subscriber-key} (get-in @test-views-system [:subscribers view-sig-with-ns])))
|
||||
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig-with-ns])))
|
||||
(is (contains-only? @test-sent-data
|
||||
[{:subscriber-key subscriber-key
|
||||
:view-sig (dissoc view-sig :namespace)
|
||||
:view-data view-data}]))
|
||||
; 4. unsubscribe.
|
||||
; NOTE: we are passing in view-sig, not view-sig-with-ns. this is because
|
||||
; proper namespace-fn's should be consistent with what namespace they
|
||||
; return given the same inputs. ideal namespace-fn implementations will
|
||||
; also keep this in mind even if context could vary between subscribe!
|
||||
; and unsubscribe! calls.
|
||||
(unsubscribe! test-views-system view-sig subscriber-key context)
|
||||
(is (empty? (keys (:subscribed @test-views-system))))
|
||||
(is (empty? (keys (:subscribers @test-views-system))))
|
||||
(is (empty? (subscribed-views test-views-system)))
|
||||
(is (empty? (:hashes @test-views-system))))))
|
||||
|
||||
(deftest subscribe-and-unsubscribe-do-not-use-namespace-fn-if-namespace-included-in-view-sig
|
||||
(let [subscriber-key 123
|
||||
view-sig (->view-sig :a :foo [])
|
||||
context "some arbitrary context data"
|
||||
namespace-fn (fn [view-sig* subscriber-key* context*]
|
||||
; if this function is used, it will mess up several assertions in this unit test
|
||||
:b)
|
||||
options (-> test-options
|
||||
(assoc :namespace-fn namespace-fn))]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [view-data (get-view-data test-views-system view-sig)
|
||||
subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
|
||||
; 3. block until subscription finishes
|
||||
(while (not (realized? subscribe-result)))
|
||||
(is (= #{view-sig} (subscribed-views test-views-system)))
|
||||
(is (= [subscriber-key] (keys (:subscribed @test-views-system))))
|
||||
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key])))
|
||||
(is (= #{subscriber-key} (get-in @test-views-system [:subscribers view-sig])))
|
||||
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
|
||||
(is (contains-only? @test-sent-data
|
||||
[{:subscriber-key subscriber-key
|
||||
:view-sig (dissoc view-sig :namespace)
|
||||
:view-data view-data}]))
|
||||
; 4. unsubscribe.
|
||||
(unsubscribe! test-views-system view-sig subscriber-key context)
|
||||
(is (empty? (keys (:subscribed @test-views-system))))
|
||||
(is (empty? (keys (:subscribers @test-views-system))))
|
||||
(is (empty? (subscribed-views test-views-system)))
|
||||
(is (empty? (:hashes @test-views-system))))))
|
||||
|
||||
(deftest unauthorized-subscription-using-auth-fn
|
||||
(let [subscriber-key 123
|
||||
view-sig (->view-sig :a :foo [])
|
||||
context "some arbitrary context data"
|
||||
auth-fn (fn [view-sig* subscriber-key* context*]
|
||||
(is (= view-sig view-sig*))
|
||||
(is (= subscriber-key subscriber-key*))
|
||||
(is (= context context*))
|
||||
; false = unauthorized
|
||||
false)
|
||||
options (-> test-options
|
||||
(assoc :auth-fn auth-fn))]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
|
||||
(is (nil? subscribe-result))
|
||||
(is (empty? (keys (:subscribed @test-views-system))))
|
||||
(is (empty? (keys (:subscribers @test-views-system))))
|
||||
(is (empty? (subscribed-views test-views-system)))
|
||||
(is (empty? (:hashes @test-views-system))))))
|
||||
|
||||
(deftest unauthorized-subscription-using-auth-fn-calls-on-unauth-fn-when-set
|
||||
(let [subscriber-key 123
|
||||
view-sig (->view-sig :a :foo [])
|
||||
context "some arbitrary context data"
|
||||
auth-fn (fn [view-sig* subscriber-key* context*]
|
||||
(is (= view-sig view-sig*))
|
||||
(is (= subscriber-key subscriber-key*))
|
||||
(is (= context context*))
|
||||
; false = unauthorized
|
||||
false)
|
||||
on-unauth-called? (atom false)
|
||||
on-unauth-fn (fn [view-sig* subscriber-key* context*]
|
||||
(is (= view-sig view-sig*))
|
||||
(is (= subscriber-key subscriber-key*))
|
||||
(is (= context context*))
|
||||
(reset! on-unauth-called? true))
|
||||
options (-> test-options
|
||||
(assoc :auth-fn auth-fn
|
||||
:on-unauth-fn on-unauth-fn))]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
|
||||
(is (nil? subscribe-result))
|
||||
(is @on-unauth-called?)
|
||||
(is (empty? (keys (:subscribed @test-views-system))))
|
||||
(is (empty? (keys (:subscribers @test-views-system))))
|
||||
(is (empty? (subscribed-views test-views-system)))
|
||||
(is (empty? (:hashes @test-views-system))))))
|
||||
|
||||
(deftest authorized-subscription-using-auth-fn
|
||||
(let [subscriber-key 123
|
||||
view-sig (->view-sig :a :foo [])
|
||||
context "some arbitrary context data"
|
||||
auth-fn (fn [view-sig* subscriber-key* context*]
|
||||
(is (= view-sig view-sig*))
|
||||
(is (= subscriber-key subscriber-key*))
|
||||
(is (= context context*))
|
||||
true)
|
||||
options (-> test-options
|
||||
(assoc :auth-fn auth-fn))]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [view-data (get-view-data test-views-system view-sig)
|
||||
subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
|
||||
(while (not (realized? subscribe-result)))
|
||||
(is (= #{view-sig} (subscribed-views test-views-system)))
|
||||
(is (= [subscriber-key] (keys (:subscribed @test-views-system))))
|
||||
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key])))
|
||||
(is (= #{subscriber-key} (get-in @test-views-system [:subscribers view-sig])))
|
||||
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
|
||||
(is (contains-only? @test-sent-data
|
||||
[{:subscriber-key subscriber-key
|
||||
:view-sig (dissoc view-sig :namespace)
|
||||
:view-data view-data}])))))
|
||||
|
||||
(deftest unsubscribe-before-subscription-finishes-does-not-result-in-stuck-view
|
||||
(let [subscriber-key 123
|
||||
view-sig (->view-sig :a :foo [])
|
||||
options (-> test-options
|
||||
(assoc :views slow-views))]
|
||||
; 1. init views
|
||||
(init! test-views-system options)
|
||||
; 2. subscribe to a view
|
||||
(let [subscribe-result (subscribe! test-views-system view-sig subscriber-key nil)]
|
||||
(is (= #{view-sig} (subscribed-views test-views-system)))
|
||||
(is (not (realized? subscribe-result)))
|
||||
; 3. unsubscribe before subscription finishes (still waiting on initial data
|
||||
; retrieval to finish)
|
||||
(unsubscribe! test-views-system view-sig subscriber-key nil)
|
||||
(is (empty? (keys (:subscribed @test-views-system))))
|
||||
(is (empty? (keys (:subscribers @test-views-system))))
|
||||
(is (empty? (subscribed-views test-views-system)))
|
||||
(is (empty? (:hashes @test-views-system)))
|
||||
(is (empty? @test-sent-data))
|
||||
; 4. wait for subscription to finish finally
|
||||
(while (not (realized? subscribe-result)))
|
||||
(is (empty? (keys (:subscribed @test-views-system))))
|
||||
(is (empty? (keys (:subscribers @test-views-system))))
|
||||
(is (empty? (subscribed-views test-views-system)))
|
||||
(is (empty? (:hashes @test-views-system)))
|
||||
(is (empty? @test-sent-data)))))
|
|
@ -1,55 +0,0 @@
|
|||
(ns views.test-helpers
|
||||
(:use
|
||||
clojure.test
|
||||
views.protocols
|
||||
views.core)
|
||||
(:import (clojure.lang Atom)))
|
||||
|
||||
(defn contains-view?
|
||||
[^Atom view-system view-id]
|
||||
(let [view (get (:views @view-system) view-id)]
|
||||
(and view
|
||||
(satisfies? IView view))))
|
||||
|
||||
; the purpose of this function is to compare collections when the order of the elements
|
||||
; is not important, but there could be duplicates (so a set is not being used). some of
|
||||
; the operations we test are asynchronous with multiple threads performing the same
|
||||
; operation simultaneously, so at times our tests that record these operations being done
|
||||
; could end up with collections of items that are out of order between test runs.
|
||||
; this is not a problem or bug in views, but just a consequence of the multithreaded
|
||||
; operation of the library.
|
||||
(defn contains-only?
|
||||
[coll elements]
|
||||
(and (= (count coll)
|
||||
(count elements))
|
||||
(every?
|
||||
#(boolean (some #{%} elements))
|
||||
coll)
|
||||
(every?
|
||||
#(boolean (some #{%} coll))
|
||||
elements)))
|
||||
|
||||
(defn get-view-data
|
||||
[^Atom view-system view-sig]
|
||||
(data (get-in @view-system [:views (:view-id view-sig)])
|
||||
(:namespace view-sig)
|
||||
(:parameters view-sig)))
|
||||
|
||||
; the 200 being used is just a number i pulled out of thin air that "felt good"
|
||||
|
||||
(defn wait-for-refresh-views []
|
||||
(Thread/sleep 200))
|
||||
|
||||
(defn wait-for-refresh-interval [options]
|
||||
(Thread/sleep (+ 200 (:refresh-interval options))))
|
||||
|
||||
; this is kind of a hack, but necessary for some tests where we want to inspect
|
||||
; the items being sent to the refresh queue without worker threads picking out
|
||||
; the added items almost instantly.
|
||||
(defn stop-refresh-worker-threads
|
||||
[^Atom view-system]
|
||||
(swap! view-system assoc :stop-workers? true)
|
||||
(doseq [^Thread t (:workers @view-system)]
|
||||
(.interrupt t)
|
||||
(.join t))
|
||||
(swap! view-system assoc :workers nil))
|
|
@ -1,71 +0,0 @@
|
|||
(ns views.test-view-system
|
||||
(:use
|
||||
views.protocols
|
||||
views.core)
|
||||
(:import (clojure.lang Atom)))
|
||||
|
||||
(def base-memory-db-contents
|
||||
{:a {:foo 1 :bar 200 :baz [1 2 3]}
|
||||
:b {:foo 2 :bar 300 :baz [2 3 4]}})
|
||||
|
||||
(def memory-database
|
||||
(atom base-memory-db-contents))
|
||||
|
||||
(def test-views-system
|
||||
(atom {}))
|
||||
|
||||
(defn reset-memory-db-fixture [f]
|
||||
(reset! memory-database base-memory-db-contents)
|
||||
(f))
|
||||
|
||||
(defn reset-test-views-system [f]
|
||||
(reset! test-views-system {})
|
||||
(f)
|
||||
(if (seq @test-views-system)
|
||||
(shutdown! test-views-system)))
|
||||
|
||||
(def memory-view-hint-type :memory-db)
|
||||
|
||||
(defrecord MemoryView [id ks]
|
||||
IView
|
||||
(id [_] id)
|
||||
(data [_ namespace parameters]
|
||||
(get-in @memory-database (-> [namespace]
|
||||
(into ks)
|
||||
(into parameters))))
|
||||
(relevant? [_ namespace parameters hints]
|
||||
(some #(and (= namespace (:namespace %))
|
||||
(= ks (:hint %))
|
||||
(= memory-view-hint-type (:type %)))
|
||||
hints)))
|
||||
|
||||
(defrecord SlowMemoryView [id ks]
|
||||
IView
|
||||
(id [_] id)
|
||||
(data [_ namespace parameters]
|
||||
; simulate a slow database query
|
||||
(Thread/sleep 1000)
|
||||
(get-in @memory-database (-> [namespace]
|
||||
(into ks)
|
||||
(into parameters))))
|
||||
(relevant? [_ namespace parameters hints]
|
||||
(some #(and (= namespace (:namespace %))
|
||||
(= ks (:hint %))
|
||||
(= memory-view-hint-type (:type %)))
|
||||
hints)))
|
||||
|
||||
(def views
|
||||
[(MemoryView. :foo [:foo])
|
||||
(MemoryView. :bar [:bar])
|
||||
(MemoryView. :baz [:baz])])
|
||||
|
||||
(def slow-views
|
||||
[(SlowMemoryView. :foo [:foo])
|
||||
(SlowMemoryView. :bar [:bar])
|
||||
(SlowMemoryView. :baz [:baz])])
|
||||
|
||||
(defn memory-db-assoc-in!
|
||||
[^Atom view-system namespace ks v]
|
||||
(let [ms (swap! memory-database assoc-in (into [namespace] ks) v)]
|
||||
(put-hints! view-system [(hint namespace ks memory-view-hint-type)])
|
||||
ms))
|
Loading…
Reference in a new issue