Compare commits

..

No commits in common. "master" and "v2" have entirely different histories.
master ... v2

12 changed files with 451 additions and 2140 deletions

16
.gitignore vendored
View file

@ -1,18 +1,12 @@
.DS_Store
/target
/classes
/checkouts
/out
pom.xml
pom.xml.asc
*.jar
*.class
.lein-*
.nrepl-port
/*.project
/*.classpath
/.settings/
*.iml
*.ipr
*.iws
.idea
/.lein-*
/.nrepl-port
*~
*.bk
.idea

227
LICENSE
View file

@ -1,21 +1,214 @@
The MIT License (MIT)
THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE PUBLIC
LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION OF THE PROGRAM
CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
Copyright (c) 2015 Kira Inc.
1. DEFINITIONS
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
"Contribution" means:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
a) in the case of the initial Contributor, the initial code and
documentation distributed under this Agreement, and
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
b) in the case of each subsequent Contributor:
i) changes to the Program, and
ii) additions to the Program;
where such changes and/or additions to the Program originate from and are
distributed by that particular Contributor. A Contribution 'originates' from
a Contributor if it was added to the Program by such Contributor itself or
anyone acting on such Contributor's behalf. Contributions do not include
additions to the Program which: (i) are separate modules of software
distributed in conjunction with the Program under their own license
agreement, and (ii) are not derivative works of the Program.
"Contributor" means any person or entity that distributes the Program.
"Licensed Patents" mean patent claims licensable by a Contributor which are
necessarily infringed by the use or sale of its Contribution alone or when
combined with the Program.
"Program" means the Contributions distributed in accordance with this
Agreement.
"Recipient" means anyone who receives the Program under this Agreement,
including all Contributors.
2. GRANT OF RIGHTS
a) Subject to the terms of this Agreement, each Contributor hereby grants
Recipient a non-exclusive, worldwide, royalty-free copyright license to
reproduce, prepare derivative works of, publicly display, publicly perform,
distribute and sublicense the Contribution of such Contributor, if any, and
such derivative works, in source code and object code form.
b) Subject to the terms of this Agreement, each Contributor hereby grants
Recipient a non-exclusive, worldwide, royalty-free patent license under
Licensed Patents to make, use, sell, offer to sell, import and otherwise
transfer the Contribution of such Contributor, if any, in source code and
object code form. This patent license shall apply to the combination of the
Contribution and the Program if, at the time the Contribution is added by the
Contributor, such addition of the Contribution causes such combination to be
covered by the Licensed Patents. The patent license shall not apply to any
other combinations which include the Contribution. No hardware per se is
licensed hereunder.
c) Recipient understands that although each Contributor grants the licenses
to its Contributions set forth herein, no assurances are provided by any
Contributor that the Program does not infringe the patent or other
intellectual property rights of any other entity. Each Contributor disclaims
any liability to Recipient for claims brought by any other entity based on
infringement of intellectual property rights or otherwise. As a condition to
exercising the rights and licenses granted hereunder, each Recipient hereby
assumes sole responsibility to secure any other intellectual property rights
needed, if any. For example, if a third party patent license is required to
allow Recipient to distribute the Program, it is Recipient's responsibility
to acquire that license before distributing the Program.
d) Each Contributor represents that to its knowledge it has sufficient
copyright rights in its Contribution, if any, to grant the copyright license
set forth in this Agreement.
3. REQUIREMENTS
A Contributor may choose to distribute the Program in object code form under
its own license agreement, provided that:
a) it complies with the terms and conditions of this Agreement; and
b) its license agreement:
i) effectively disclaims on behalf of all Contributors all warranties and
conditions, express and implied, including warranties or conditions of title
and non-infringement, and implied warranties or conditions of merchantability
and fitness for a particular purpose;
ii) effectively excludes on behalf of all Contributors all liability for
damages, including direct, indirect, special, incidental and consequential
damages, such as lost profits;
iii) states that any provisions which differ from this Agreement are offered
by that Contributor alone and not by any other party; and
iv) states that source code for the Program is available from such
Contributor, and informs licensees how to obtain it in a reasonable manner on
or through a medium customarily used for software exchange.
When the Program is made available in source code form:
a) it must be made available under this Agreement; and
b) a copy of this Agreement must be included with each copy of the Program.
Contributors may not remove or alter any copyright notices contained within
the Program.
Each Contributor must identify itself as the originator of its Contribution,
if any, in a manner that reasonably allows subsequent Recipients to identify
the originator of the Contribution.
4. COMMERCIAL DISTRIBUTION
Commercial distributors of software may accept certain responsibilities with
respect to end users, business partners and the like. While this license is
intended to facilitate the commercial use of the Program, the Contributor who
includes the Program in a commercial product offering should do so in a
manner which does not create potential liability for other Contributors.
Therefore, if a Contributor includes the Program in a commercial product
offering, such Contributor ("Commercial Contributor") hereby agrees to defend
and indemnify every other Contributor ("Indemnified Contributor") against any
losses, damages and costs (collectively "Losses") arising from claims,
lawsuits and other legal actions brought by a third party against the
Indemnified Contributor to the extent caused by the acts or omissions of such
Commercial Contributor in connection with its distribution of the Program in
a commercial product offering. The obligations in this section do not apply
to any claims or Losses relating to any actual or alleged intellectual
property infringement. In order to qualify, an Indemnified Contributor must:
a) promptly notify the Commercial Contributor in writing of such claim, and
b) allow the Commercial Contributor tocontrol, and cooperate with the
Commercial Contributor in, the defense and any related settlement
negotiations. The Indemnified Contributor may participate in any such claim
at its own expense.
For example, a Contributor might include the Program in a commercial product
offering, Product X. That Contributor is then a Commercial Contributor. If
that Commercial Contributor then makes performance claims, or offers
warranties related to Product X, those performance claims and warranties are
such Commercial Contributor's responsibility alone. Under this section, the
Commercial Contributor would have to defend claims against the other
Contributors related to those performance claims and warranties, and if a
court requires any other Contributor to pay any damages as a result, the
Commercial Contributor must pay those damages.
5. NO WARRANTY
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS PROVIDED ON
AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR
CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A
PARTICULAR PURPOSE. Each Recipient is solely responsible for determining the
appropriateness of using and distributing the Program and assumes all risks
associated with its exercise of rights under this Agreement , including but
not limited to the risks and costs of program errors, compliance with
applicable laws, damage to or loss of data, programs or equipment, and
unavailability or interruption of operations.
6. DISCLAIMER OF LIABILITY
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT NOR ANY
CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION
LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE
EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY
OF SUCH DAMAGES.
7. GENERAL
If any provision of this Agreement is invalid or unenforceable under
applicable law, it shall not affect the validity or enforceability of the
remainder of the terms of this Agreement, and without further action by the
parties hereto, such provision shall be reformed to the minimum extent
necessary to make such provision valid and enforceable.
If Recipient institutes patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Program itself
(excluding combinations of the Program with other software or hardware)
infringes such Recipient's patent(s), then such Recipient's rights granted
under Section 2(b) shall terminate as of the date such litigation is filed.
All Recipient's rights under this Agreement shall terminate if it fails to
comply with any of the material terms or conditions of this Agreement and
does not cure such failure in a reasonable period of time after becoming
aware of such noncompliance. If all Recipient's rights under this Agreement
terminate, Recipient agrees to cease use and distribution of the Program as
soon as reasonably practicable. However, Recipient's obligations under this
Agreement and any licenses granted by Recipient relating to the Program shall
continue and survive.
Everyone is permitted to copy and distribute copies of this Agreement, but in
order to avoid inconsistency the Agreement is copyrighted and may only be
modified in the following manner. The Agreement Steward reserves the right to
publish new versions (including revisions) of this Agreement from time to
time. No one other than the Agreement Steward has the right to modify this
Agreement. The Eclipse Foundation is the initial Agreement Steward. The
Eclipse Foundation may assign the responsibility to serve as the Agreement
Steward to a suitable separate entity. Each new version of the Agreement will
be given a distinguishing version number. The Program (including
Contributions) may always be distributed subject to the version of the
Agreement under which it was received. In addition, after a new version of
the Agreement is published, Contributor may elect to distribute the Program
(including its Contributions) under the new version. Except as expressly
stated in Sections 2(a) and 2(b) above, Recipient receives no rights or
licenses to the intellectual property of any Contributor under this
Agreement, whether expressly, by implication, estoppel or otherwise. All
rights in the Program not expressly granted under this Agreement are
reserved.
This Agreement is governed by the laws of the State of Washington and the
intellectual property laws of the United States of America. No party to this
Agreement will bring a legal action under this Agreement more than one year
after the cause of action arose. Each party waives its rights to a jury trial
in any resulting litigation.

734
README.md
View file

@ -2,738 +2,26 @@
Eventually consistent external materialized views.
Also see these plugin libraries which allow you to use a views system
in a number of really great ways in your applications:
*Notice: despite being 1.x.x, this is incomplete. Version 2.0.0 will be
the first release version.*
* [views.sql](https://github.com/gered/views.sql)
* [views.honeysql](https://github.com/gered/views.honeysql)
* [views.reagent](https://github.com/gered/views.reagent)
## Leiningen
[![](https://clojars.org/net.gered/views/latest-version.svg)](https://clojars.org/net.gered/views)
## This is a fork!
I'm keeping this as a separate fork for now because I've made some
breaking or otherwise significant changes from the original, some of
which are based simply on personal preferences. I simply haven't felt
it's quite right to submit a pull request because of some of these
types of changes. Perhaps in the future.
I definitely cannot take credit for the original idea behind this
library or the core implementation or design of it. Definitely keep an
eye on the [original repository][1] which is maintained by Kira Inc.
[1]: https://github.com/kirasystems/views
## Basic Concepts
The views library allows you to manage a **view system** which is a
collection of **views** and a list of **subscribers** to those views.
Subscribers will get sent **view refreshes** in realtime when the data
represented by the views they are subscribed to changes. Relevant
changes are found through the use of **hints** which are added to the
view system by anything that is actually changing the data at the
instant it is changed.
A **view** is similar in concept to a [materialized view][2], though in
practice it may not actually keep a copy of the underlying data
represented by the view and instead just keep a copy of a query or the
location of where the data can be retrieved from when it is needed
(e.g. when view refrehses need to be sent out).
[2]: https://en.wikipedia.org/wiki/Materialized_view
A view is represented by the protocol `views.protocols/IView`:
```clj
(defprotocol IView
(data [this namespace parameters])
(relevant? [this namespace parameters hints])
(id [this]))
```
`id` simply returns a unique identifier for this view. `data` returns a
copy of the underlying data represented by this view. `relevant?`
determines if a collection of hints are relevant to the view and is
called by the view system whenever new hints are received to determine
if view refreshes need to be sent out for this view.
A **hint** is a map of the form:
```clj
{:namespace ...
:type ...
:hint ...}
```
`:type` represents the type of view (e.g. `:sql-table-name`) and is
defined by the view implementation that this hint is intended for.
`:hint` is the actual hint information itself and it's contents will
differ depending on the type of view it is intended for. As an example,
for a SQL view it may be a list of database table names.
**Namespaces** can be used to isolate multiple sets of the same type of
data being represented by the views within the view system. As an
example, for SQL views a namespace could be used to represent the
database to connect to if your system is comprised of multiple similar
databases. A view is not specifically tied to a namespace, however the
hints processed by the view system are only relevant for the namespace
specified in the hint.
When a view's `relevant?` check is determining if any given hint is
relevant or not, it will compare all the properties of a hint,
including the namespace and type to ensure that view refreshes aren't
issued incorrectly or too frequently.
**Subscribers** can be registered within the view system. A
subscription can be created within the view system by specifying the
view to subscribe to, identified by it's view ID, and also a namespace
and any parameters that the view might take. These 3 properties go
together to form a **view signature** or **view sig**. A view sig is
represented by a map:
```clj
{:namespace ...
:view-id ...
:parameters ...}
```
Subscriptions are considered unique for a subscriber based on all 3 of
these properties combined. As such a subscriber can have multiple
concurrent subscriptions to the same view if the namespace and/or
parameters are different for all of them.
A subscriber is uniquely identified by it's **subscriber key**. Common
subscriber key values include user ID's, Session ID's, or other
identifiers like client ID's used in libraries like Sente for websocket
connections.
When hints are processed by the view system and found to be relevant
for any of the views (through the use of the `relevant?` check
mentioned earlier), **view refreshes** are sent out to all of the
subscribers of the view. Up-to-date data for the view is retrieved via
the view's `data` function and then sent out.
Whenever data is refreshed a hash is kept and is compared to on each
refresh to make sure that we don't send out another refresh if the data
is unchanged from the last refresh sent.
## Design
TODO
## Usage
To explain basic usage of the views library, we'll walk through an
example building up a simple system so you can see how it works
interactively.
To begin, we'll need to use functions from the `views.core` namespace.
```clj
(require '[views.core :as views])
```
### View System Initialization
We first need to create the view system. This will be kept in an atom
and will be passed around to the different views library functions also
*as an atom* as the views system needs to maintain it's own internal
state.
```clj
(def view-system (atom {}))
```
For a fully working view system, we need to also provide a function
that will be used to send view refreshes to subscribers. For now we'll
just print view refreshes out in the REPL, but in a real system you'd
probably want this to send them to a connected Websocket client, or out
over some kind of distributed messaging service, etc.
```clj
(defn send-fn
[subscriber-key [view-sig view-data]]
(println "view refresh" subscriber-key view-sig view-data))
```
Now we're ready to actually create the view syem. To do this we call
`init!` which takes a set of options. We provide our send function
above using the `:send-fn` option. For a description of all the options
available, see `views.core/default-options`.
```clj
(views/init! view-system {:send-fn send-fn})
```
At this point, the view system is ready.
Right now there are some background threads running, one of which is
the *refresh watcher* which handles incoming hints and checks them for
relevancy. When relevant hints are found, view refresh requests are
dispatched to one or more *refresh worker* threads which actually
perform the work of retrieving updated view data and sending it off to
subscribers.
But now we need to talk about setting up some views, as we have none in
our view system.
### Adding Views
For demonstration purposes, we'll set up views for an in-memory
datastore:
```clj
(def memory-datastore
(atom {:a {:foo 1
:bar 200
:baz [1 2 3]}
:b {:foo 2
:bar 300
:baz [2 3 4]}}))
```
To retrieve or modify data within this memory datastore, we'd likely
want to use a path made up of keywords, e.g. `[:a :bar]` would
correspond with the value `200`, and `[:b :baz 2]` with the value `4`
using the initial data defined above.
So, let's create a `MemoryView`:
```clj
(require '[views.protocols :refer [IView]])
(def memory-view-hint-type :ks-path)
(defrecord MemoryView [id ks]
IView
(id [_] id)
(data [_ namespace parameters]
(get-in @memory-datastore
(-> [namespace]
(into ks)
(into parameters))))
(relevant? [_ namespace parameters hints]
(some #(and (= namespace (:namespace %))
(= ks (:hint %))
(= memory-view-hint-type (:type %)))
hints)))
```
Nothing particularly special here, `data` simply returns a value from
`memory-datastore` using a path made by combining `namespace` with a
sequence of keywords `ks` and then finally adding `parameters` (which
is a collection of parameters) to the end of the path.
Note that with this method of referencing data within
`memory-datastore`, the keys `:a` and `:b` are being used as
namespaces.
`relevant?` simply compares all 3 values of each of the hints passed in
to make sure they all match. `memory-view-hint-type` is, as it's name
implies, a value that is used to identify hints as being those intended
for memory views and not for, e.g. SQL views (if we had a view system
with multiple different types of views in it). The function returns
true if at least one of the passed in hints was found to be relevant.
Now we can add some views to our view system:
```clj
(views/add-views!
view-system
[(MemoryView. :foo [:foo])
(MemoryView. :bar [:bar])
(MemoryView. :baz [:baz])])
```
We now have 3 views, `:foo`, `:bar` and `:baz` which each refer to data
under that same path. Note that these views do not define a namespace.
That is for subscribers to specify when they register a subscription.
As well, code that updates `memory-datastore` will create hints for the
view system as we'll soon see, and at that time it will include a
namespace in any created hints.
> Most applications will probably want to just pass in a list of views
> via `views.core/init!` through the `:views` option. However, there is
> nothing wrong with using `add-views!` like this if you prefer or if
> you simply need to change views on the fly.
>
> Keep in mind though that adding views via `add-views!` will replace
> existing views in the view system with the same ID. Take care when
> doing this if there is the possibility that there are existing
> subscribers to views that are being replaced!
### Subscribing to Views
As mentioned previously, view subscriptions are keyed by a **view
signature** or view sig, which we can create using a helper function if
we wish:
```clj
(views/->view-sig :a :foo [])
=> {:namespace :a, :view-id :foo, :parameters []}
```
We create a subscription by calling `views.core/subscribe!`. For this
demonstration, we'll simply make up a subscriber key. The last argument
is where we could pass in some application/user context data that would
be helpful to use when doing subscription authorization (which we'll
discuss later and just ignore for now). For now, we'll just pass in
`nil` context.
```clj
(views/subscribe!
view-system ; view system atom
(views/->view-sig :a :foo []) ; view sig of the view to subscribe to
123 ; subscriber key
nil) ; context
```
`subscribe!` returns a `future` which will be realized when the
subscription finishes. Whenever a new subscription is added, the
subscriber is sent an initial set of data for the view. This view
refresh is done in a separate thread via a `future`.
We can see that a view refresh was sent out as a result of this
subscription as our `send-fn` function from before was called and the
following output should have appeared
```
view refresh 123 {:view-id :foo, :parameters []} 1
```
right away after the call to `subscribe!`. The `1` at the end
corresponds to the data in `memory-datastore` under the path
`[:a :foo]`.
> Note that an initial view refresh is **always** sent out to the
> subscriber when a subscription is first created. This happens even
> if the view data has not changed since the last refresh for this view
> occurred, as obviously the new subscriber was not part of that
> refresh.
### Hints and View Refreshes
Adding hints to the view system triggers refreshes of views for which
they are relevant towards. Our application code that changes data which
these views are based on needs to have a way of adding views to the
view system.
#### Hints
As mentioned previously, a hint is simply a map that contains a
namespace, a type and some data that will differ based on the types of
views in the view system. There is a helper function to create this
map:
```clj
(views/hint :a [:foo] memory-view-hint-type)
=> {:namespace :a, :hint [:foo], :type :ks-path}
```
Generally speaking the `:type` value will be the same for all hints
which are intended for the same types of views. For example, all of our
`MemoryView` views expect the type to be `:ks-path`, because the
`:hint` values they expect to compare against are all keyword paths.
#### Adding Hints to the View System
There are two main ways to do this:
1. Queue hints which will be picked up by the refresh watcher thread on
a regular interval (set by the option `:refresh-interval`).
2. Immediately trigger a refresh for a list of hints.
Using option 2 all the time generally does result in much more
responsive feeling system from the user's perspective. But you should
also consider just how frequently your code could end up triggering
refreshes.
Queueing hints as in option 1 will help to guard against duplicate
hints triggering excessive view refreshes as duplicate hints added to
the queue are dropped. But queued hints are not processed until the
refresh watcher thread runs at the next `:refresh-interval`, so you
lose some responsiveness by going this route.
There are more factors to consider in addition to all of this though.
As hints are processed, they are internally turned into view refresh
requests and dispatched to the refresh worker threads by adding them to
an internal queue. This refresh queue also drops duplicate requests,
but only if there is a backlog of refresh requests waiting in the queue
(which would happen if some views are taking too long to refresh, e.g.
slow SQL queries, overloaded server/network, not enough worker threads,
etc). If the worker threads are able to process refresh requests very
quickly, then the internal queue will usually be empty or near-empty
and some or all duplicate refresh requests might make it through.
Also keep in mind that hashes of view data are computed and then
compared each time a view refresh is about to be sent out, and while
the underlying view data must still be retrieved to compute this hash
each time a refresh request is processed, a view refresh will not
actually be sent out to the subscribers if the data is found to be
unchanged since the last refresh.
Ultimately there isn't really a right or wrong answer as to which
method you choose. Generally speaking it will usually make the most
sense to default to option 2 for most actions that need to add hints to
the view system. This will generally result in a more responsive
system. But you'll want to continually evaluate whether some actions
should possibly be switched over to queue up hints instead.
#### Option 1: Queueing Hints
Use `queue-hints!` and pass in a collection of hints. They will be
added to the queue and the refresh watcher thread will process them on
the next refresh interval.
```clj
(views/queue-hints!
view-system
[(views/hint :a [:foo] memory-view-hint-type)])
```
#### Option 2: Immediately Trigger Refreshes From Hints
Use `refresh-views!` and pass in a collection of hints. They will be
processed immediately and refresh requests will be dispatched for all
views for which there were relevant hints (and subscribers) for.
```clj
(views/refresh-views!
view-system
[(views/hint :a [:foo] memory-view-hint-type)])
```
#### But Wait -- View Data Must Be Changed First!
If you were following along and tried the above examples out, you would
have noticed that our `send-fn` function was never called. As mentioned
previously, each time a view refresh is processed a hash is taken of
the data and compared against the previous refresh's hash. Only if the
data is found to have been changed is a refresh sent out.
We haven't changed any of the data in `memory-datastore` yet, so none
of the hints we add to the system will trigger a view refresh to be
sent. This is a good thing!
Normally in your application you'll want to add hints to the view
system at the same place you do some operation that changes data. So,
we can add a function to allow us to change the data in
`memory-datastore` and add an appropriate hint about what was changed
to the view system at the same time:
```clj
(defn memdb-assoc-in!
[vs namespace ks v]
(let [path (into [namespace] ks)
hints [(views/hint namespace ks memory-view-hint-type)]]
(swap! memory-datastore assoc-in path v)
(views/refresh-views! vs hints)))
```
And then we can use it to change data relevant to the view we're
subscribed to (`:foo`):
```clj
(memdb-assoc-in! view-system :a [:foo] 42)
```
As soon as you run this you should see that `send-fn` was called to
send out a view refresh:
```
view refresh 123 {:view-id :foo, :parameters []} 42
```
And of course, `memory-datastore` was updated correctly at the same
time:
```clj
@memory-datastore
=> {:a {:foo 42, :bar 200, :baz [1 2 3]}, :b {:foo 2, :bar 300, :baz [2 3 4]}}
```
As we would expect given the current subscriptions in our view system,
view refreshes will only be sent out if we change the data under
`[:a :foo]` as refreshes are only processed if there are subscribers
for a view.
### Unsubscribing
Unsubscribing a subscriber is done through `views.core/unsubscribe!`
and the arguments are the same:
```clj
(views/unsubscribe!
view-system ; view system atom
(views/->view-sig :a :foo []) ; view sig of the view to unsubscribe from
123 ; subscriber key
nil) ; context
```
Remember that subscriptions are keyed by view sig, so to unsubscribe
from a view, you must use the exact same namespace and parameters that
was used to subscribe to it in the first place.
If you need to unsubscribe from all of a subscriber's current
subscriptions, you can use `views.core/unsubscribe-all!` which
essentially completely removes a subscriber from the views system.
```clj
(views/unsubscribe-all! view-system 123) ; where '123' is the subscriber key
```
### Shutting Down the Views System
You can stop the views system by simply calling `views.core/shutdown!`
```clj
(views/shutdown! view-system)
```
This function will by default block until the refresh watcher and all
refresh worker threads have finished (they are sent interrupt signals
when `shutdown!` is called). If for some reason you do not wish to
block, you can pass an additional argument to `shutdown!`:
```clj
(views/shutdown! view-system true) ; don't block waiting for threads to terminate
```
## Subscription Authorization
By default, no subscriptions require authorization. If you wish for
some or all views to require some kind of authorization, you should
provide an `:auth-fn` option to `views.core/init!`.
This is a function of the form:
```clj
(fn [view-sig subscriber-key context]
; ...
)
```
It should return true if the subscription is authorized. `context` is
the exact value that was passed in as the context argument to
`subscribe!`. You might wish to pass in a Ring request map or a user
profile for example.
If subscription authorization fails, `subscribe!` returns `nil`.
You can also provide the `:on-unauth-fn` option to `views.core/init!`
and set it to a function that will be called in the event that
subscription authorization failed. This function takes the same
arguments as `:auth-fn`. The return value is not used.
Your application may or may not need this depending on how you have
things set up (the fact that `subscribe!` returns `nil` if unauthorized
may be enough for you). It is just provided as an extra convenience.
## Namespaces
As has been mentioned already, namespaces can be used to isolate
subscriptions to views and view refreshes. Typical use of namespaces
within a views system would be to set them to something that specifies
which database to retrieve view data from when you have multiple
databases all with an identical structure.
Namespace information is not included in the actual view refresh data
that gets sent to subscribers. It is just considered to be a
server-side concern.
Depending on your application, you may be perfectly ok with just
passing in the specific namespace needed when creating view
subscriptions. However, you can also specify a `:namespace-fn` option
in your call to `views.core/init!` and provide a function that will
return the namespace to use for all calls to `subscribe!` and
`unsubscribe!` that get passed a view sig which **does not** include a
namespace in it.
The `:namespace-fn` function should be of the form:
```clj
(fn [view-sig subscriber-key context]
; ...
)
```
`context` will be whatever was passed in as the context argument to
`subscribe!`/`unsubscribe!`.
It bears repeating that `:namespace-fn` will **not** be called even if
it was set if you use a view sig that includes a `:namespace` key.
For this reason the helper function `->view-sig` includes an extra
overload that does not set a namespace.
```clj
; a view sig that will result in namespace-fn being called (if one is set)
(views/->view-sig :foo [])
=> {:view-id :foo, :parameters []}
; a view sig that will always use :a as the namespace, even if a namespace-fn is set
(views/->view-sig :a :foo [])
=> {:namespace :a, :view-id :foo, :parameters []}
```
## View System Initialization Options
There are a number of options that can be provided to
`views.core/init!`. The only one that absolutely must be provided for a
working system is `:send-fn` while all the other default options will
generally suffice for a non-distributed relatively low-load
application.
The default options are defined in `views.core/default-options`.
#### `:send-fn`
A function that is used to send view refresh data to subscribers.
```clj
(fn [subscriber-key [view-sig view-data]]
; ...
)
```
#### `:views`
A list of `IView` instances. These are the views that can be
subscribed to. Views can also be added/replaced in the system after
initialization by calling `views.core/add-views!`.
#### `:put-hints-fn`
A function that typically will be used by the different views plugin
libraries providing view implementations (such as
[views.sql](https://github.com/gered/views.sql) or
[views.honeysql](https://github.com/gered/views.honeysql)) to add
hints to the view system.
This function is used as a common configurable way for these different
plugin libraries to add hints because the application can provide an
alternate implementation to e.g. send hints out over a
distributed messaging service and it will affect all views in the
system (which would not be possible if all or just some were hard-coded
to use `queue-hints!` or `refresh-views!`).
```clj
(fn [^Atom view-system hints]
; ...
)
```
The default implementation is:
```clj
(fn [^Atom view-system hints]
(refresh-views! view-system hints))
```
#### `:refresh-queue-size`
The size of the internal refresh request queue used to hold refresh
requests for the refresh worker threads. If you notice some refresh
requests being dropped, you may wish to increase this (after of course
seeing if you have some slow views that could be improved).
Default is `1000`.
#### `:refresh-interval`
An interval in milliseconds at which the refresh watcher thread will
check for queued up hints and dispatch relevant view refresh requests
to the refresh worker threads.
Default is `1000`.
#### `:worker-threads`
The number of refresh worker threads that continually poll for refresh
requests and handle sending view refreshes to subscribers.
Default is `8`.
#### `:auth-fn`
A function that authorizes view subscriptions. It should return true
if the subscription is authorized. If this function is not set, no view
subscriptions will require authorization.
```clj
(fn [view-sig subscriber-key context]
; ...
)
```
#### `:on-unauth-fn`
A function that is called when subscription authorization fails. The
return value of this function is not used.
```clj
(fn [view-sig subscriber-key context]
; ...
)
```
#### `:namespace-fn`
A function that is used during subscription and unsubscription **only**
if no namespace is specified in the view sig passed in. This function
should return the namespace to be used for the
subscription/unsubscription.
```clj
(fn [view-sig subscriber-key context]
; ...
)
```
#### `:stats-log-interval`
Interval in milliseconds at which a logger will output an INFO log
entry with some view system statistics (refreshes/sec,
dropped-refreshes/sec, duplicate-refreshes/sec). If not set, no
logging is done.
## Considerations for Distributed Systems
If you're looking to use a views system with an application that will
be running on multiple servers, all you really need to do to get the
views system working consistently across all the nodes is to make sure
that when new hints are to be added to the views system, they are sent
to all application nodes.
For example, you can set up a messaging service (such as RabbitMQ, etc)
and when you need to add hints to the views system, instead of calling
`queue-hints!` or `refresh-views!` with the new hints, you simply send
them to the messaging service.
Most of the views plugin libraries providing view implementations
(such as views.sql) will call `views.core/put-hints!` to add hints to
the system. `put-hints!` uses whatever the `:put-hints-fn` function was
set to in the options passed to `views.core/init!`. The default
`:put-hints-fn` implementation simply calls `refresh-views!`, but you
can easily provide an alternative function that sends the hints to a
messaging service.
Then your application nodes need to listen for hints being received
from the messaging service. You should then call `queue-hints!` or
`refresh-views!` with the hints received this way.
TODO
## Testing
TODO
## License
Copyright © 2015-2016 Kira Inc.
Copyright © 2015 DiligenceEngine
Original authors:
* Dave Della Costa (https://github.com/ddellacosta)
* Alexander Hudek (https://github.com/akhudek)
Authors Dave Della Costa (https://github.com/ddellacosta) and Alexander Hudek (https://github.com/akhudek)
Various updates and other changes in this fork by
Gered King (https://github.com/gered)
Distributed under the MIT License.
Distributed under the Eclipse Public License either version 1.0 or (at
your option) any later version.

View file

@ -1,29 +1,25 @@
(defproject net.gered/views "1.7.0-SNAPSHOT"
:description "A view to the past helps navigate the future."
:url "https://github.com/gered/views"
:license {:name "MIT License"
:url "http://opensource.org/licenses/MIT"}
(defproject views "1.0.0"
:description "A view to the past helps navigate the future."
:dependencies [[org.clojure/tools.logging "1.2.4"]]
:url "https://github.com/diligenceengine/views"
:profiles {:provided
{:dependencies [[org.clojure/clojure "1.10.3"]]}
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:test
{:dependencies [[pjstadig/humane-test-output "0.11.0"]]
:injections [(require 'pjstadig.humane-test-output)
(pjstadig.humane-test-output/activate!)]}}
:dependencies [[org.clojure/clojure "1.6.0"]
[org.clojure/tools.logging "0.2.6"]
[org.clojure/core.async "0.1.303.0-886421-alpha"]
[honeysql "0.4.3"]
[clj-logging-config "1.9.10"]
[zip-visit "1.0.2"]
[prismatic/plumbing "0.3.5"]
[pjstadig/humane-test-output "0.6.0"]]
:deploy-repositories [["releases" :clojars]
["snapshots" :clojars]]
:profiles {:test {:dependencies [[org.clojure/tools.nrepl "0.2.3"]
[environ "0.4.0"]
[org.clojure/data.generators "0.1.2"]]
:release-tasks [["vcs" "assert-committed"]
["change" "version" "leiningen.release/bump-version" "release"]
["vcs" "commit"]
["vcs" "tag" "v" "--no-sign"]
["deploy"]
["change" "version" "leiningen.release/bump-version"]
["vcs" "commit" "bump to next snapshot version for future development"]
["vcs" "push"]]
:injections [(require 'pjstadig.humane-test-output)
(pjstadig.humane-test-output/activate!)]}}
)
:plugins [[lein-environ "0.4.0"]])

View file

@ -1,509 +1,200 @@
(ns views.core
(:import
(java.util.concurrent ArrayBlockingQueue TimeUnit)
(clojure.lang Atom))
[java.util.concurrent ArrayBlockingQueue TimeUnit])
(:require
[views.protocols :refer [IView id data relevant?]]
[clojure.tools.logging :refer [info debug error trace]]))
[plumbing.core :refer [swap-pair!]]
[clojure.tools.logging :refer [debug error]]))
;; The view-system data structure has this shape:
;;
;; {
;; {:views {:id1 view1, id2 view2, ...}
;; :send-fn (fn [subscriber-key data] ...)
;;
;; :refresh-queue (ArrayBlockingQueue.)
;; :views {:id1 view1, id2 view2, ...}
;; :send-fn (fn [subscriber-key data] ...)
;; :put-hints-fn (fn [hints] ... )
;; :auth-fn (fn [view-sig subscriber-key context] ...)
;; :namespace-fn (fn [view-sig subscriber-key context] ...)
;;
;; :hashes {view-sig hash, ...}
;; :subscribed {subscriber-key #{view-sig, ...}}
;; :subscribers {view-sig #{subscriber-key, ...}}
;; :hints #{hint1 hint2 ...}
;; :hashes {view-sig hash, ...}
;; :subscribed {subscriber-key #{view-sig, ...}}
;; :subscribers {view-sig #{subscriber-key, ...}}
;; :hints #{hint1 hint2 ...}
;;
;; }
;;
;; Each hint has the form {:namespace x :hint y}
(def refresh-queue (ArrayBlockingQueue. 500))
(defn reset-stats!
"Resets statistics collected back to zero."
[^Atom view-system]
(swap! view-system update-in [:statistics] assoc
:refreshes 0
:dropped 0
:deduplicated 0)
view-system)
(defn collecting-stats?
"Whether view statem statistics collection and logging is enabled or not."
[^Atom view-system]
(boolean (get-in @view-system [:statistics :logger])))
(defn ->view-sig
([namespace view-id parameters]
{:namespace namespace
:view-id view-id
:parameters parameters})
([view-id parameters]
{:view-id view-id
:parameters parameters}))
(defn- send-view-data!
[view-system subscriber-key {:keys [namespace view-id parameters] :as view-sig} data]
(if-let [send-fn (:send-fn view-system)]
(send-fn subscriber-key [(dissoc view-sig :namespace) data])
(throw (new Exception "no send-fn function set in view-system"))))
(defn- authorized-subscription?
[view-system view-sig subscriber-key context]
(if-let [auth-fn (:auth-fn view-system)]
(auth-fn view-sig subscriber-key context)
; assume that if no auth-fn is specified, that we are not doing auth checks at all
; so do not disallow access to any subscription
true))
(defn- on-unauthorized-subscription
[view-system view-sig subscriber-key context]
(if-let [on-unauth-fn (:on-unauth-fn view-system)]
(on-unauth-fn view-sig subscriber-key context)))
(defn- get-namespace
[view-system view-sig subscriber-key context]
(if-let [namespace-fn (:namespace-fn view-system)]
(namespace-fn view-sig subscriber-key context)
(:namespace view-sig)))
(defn- subscribe-view!
[view-system view-sig subscriber-key]
(trace "subscribing to view" view-sig subscriber-key)
(defn subscribe-view!
[view-system view-sig subscriber-key data-hash]
(-> view-system
(update-in [:subscribed subscriber-key] (fnil conj #{}) view-sig)
(update-in [:subscribers view-sig] (fnil conj #{}) subscriber-key)))
(defn- update-hash!
[view-system view-sig data-hash]
(update-in view-system [:hashes view-sig] #(or % data-hash))) ;; see note #1 in NOTES.md
(update-in [:subscribers view-sig] (fnil conj #{}) subscriber-key)
(update-in [:hashes view-sig] #(or % data-hash)))) ;; see note #1
(defn subscribe!
"Creates a subscription to a view identified by view-sig for a subscriber
identified by subscriber-key. If the subscription is not authorized,
returns nil. Additional context info can be passed in, which will be
passed to the view-system's namespace-fn and auth-fn (if provided). If
the subscription is successful, the subscriber will be sent the initial
data for the view."
[^Atom view-system {:keys [namespace view-id parameters] :as view-sig} subscriber-key context]
[view-system namespace view-id parameters subscriber-key]
(if-let [view (get-in @view-system [:views view-id])]
(let [namespace (if (contains? view-sig :namespace)
namespace
(get-namespace @view-system view-sig subscriber-key context))
view-sig (->view-sig namespace view-id parameters)]
(if (authorized-subscription? @view-system view-sig subscriber-key context)
(do
(swap! view-system subscribe-view! view-sig subscriber-key)
(future
(try
(let [vdata (data view namespace parameters)
data-hash (hash vdata)]
;; Check to make sure that we are still subscribed. It's possible that
;; an unsubscription event came in while computing the view.
(when (contains? (get-in @view-system [:subscribed subscriber-key]) view-sig)
(swap! view-system update-hash! view-sig data-hash)
(send-view-data! @view-system subscriber-key view-sig vdata)))
(catch Exception e
(error e "error subscribing to view" view-sig)))))
(do
(trace "subscription not authorized" view-sig subscriber-key context)
(on-unauthorized-subscription @view-system view-sig subscriber-key context)
nil)))
(throw (new Exception (str "Subscription for non-existant view: " view-id)))))
(future
(let [vdata (data view namespace parameters)]
(swap! view-system subscribe-view! [namespace view-id parameters] subscriber-key (hash vdata))
((get @view-system :send-fn) subscriber-key [[view-id parameters] vdata])))))
(defn- remove-from-subscribers
(defn remove-from-subscribers
[view-system view-sig subscriber-key]
(-> view-system
(update-in [:subscribers view-sig] disj subscriber-key)
; remove view-sig entry if no subscribers. helps prevent the subscribers
; map from e.g. endlessly filling up with all sorts of different
; view-sigs with crazy amounts of only-slightly-varying parameters
(update-in [:subscribers]
(fn [subscribers]
(if (empty? (get subscribers view-sig))
(dissoc subscribers view-sig)
subscribers)))))
(defn- remove-from-subscribed
[view-system view-sig subscriber-key]
(-> view-system
(update-in [:subscribed subscriber-key] disj view-sig)
; remove subscriber-key entry if no current subscriptions. this helps prevent
; the subscribed map from (for example) endlessly filling up with massive
; amounts of entries with no subscriptions. this could easily happen over time
; naturally for applications with long uptimes.
(update-in [:subscribed]
(fn [subscribed]
(if (empty? (get subscribed subscriber-key))
(dissoc subscribed subscriber-key)
subscribed)))))
(defn- clean-up-unneeded-hashes
[view-system view-sig]
; hashes for view-sigs which do not have any unsubscribers are no longer necessary
; to keep around (again, at risk of endlessly filling up with tons of hashes over time)
(if-not (get (:subscribers view-system) view-sig)
(update-in view-system [:hashes] dissoc view-sig)
view-system))
(update-in view-system [:subscribers view-sig] disj subscriber-key))
(defn unsubscribe!
"Removes a subscription to a view identified by view-sig for a subscriber
identified by subscriber-key. Additional context info can be passed in,
which will be passed to the view-system's namespace-fn (if provided)."
[^Atom view-system {:keys [namespace view-id parameters] :as view-sig} subscriber-key context]
(trace "unsubscribing from view" view-sig subscriber-key)
[view-system namespace view-id parameters subscriber-key]
(swap! view-system
(fn [view-system]
(let [namespace (if (contains? view-sig :namespace)
namespace
(get-namespace view-system view-sig subscriber-key context))
view-sig (->view-sig namespace view-id parameters)]
(-> view-system
(remove-from-subscribed view-sig subscriber-key)
(remove-from-subscribers view-sig subscriber-key)
(clean-up-unneeded-hashes view-sig)))))
view-system)
(fn [vs]
(-> vs
(update-in [:subscribed subscriber-key] disj [namespace view-id parameters])
(remove-from-subscribers [namespace view-id parameters] subscriber-key)))))
(defn unsubscribe-all!
"Removes all of a subscriber's (identified by subscriber-key) current
view subscriptions."
[^Atom view-system subscriber-key]
(trace "unsubscribing from all views" subscriber-key)
"Remove all subscriptions by a given subscriber."
[view-system subscriber-key]
(swap! view-system
(fn [view-system]
(let [view-sigs (get-in view-system [:subscribed subscriber-key])
view-system* (update-in view-system [:subscribed] dissoc subscriber-key)]
(reduce
#(-> %1
(remove-from-subscribers %2 subscriber-key)
(clean-up-unneeded-hashes %2))
view-system*
view-sigs))))
view-system)
(fn [vs]
(let [view-sigs (get-in vs [:subscribed subscriber-key])
vs* (update-in vs [:subscribed] dissoc subscriber-key)]
(reduce #(remove-from-subscribers %1 %2 subscriber-key) vs* view-sigs)))))
(defn refresh-view!
"Schedules a view (identified by view-sig) to be refreshed by one of the worker threads
only if the provided collection of hints is relevant to that view."
[^Atom view-system hints {:keys [namespace view-id parameters] :as view-sig}]
"We refresh a view if it is relevant and its data hash has changed."
[view-system hints [namespace view-id parameters :as view-sig]]
(let [v (get-in @view-system [:views view-id])]
(if-let [^ArrayBlockingQueue refresh-queue (:refresh-queue @view-system)]
(try
(if (relevant? v namespace parameters hints)
(if-not (.contains refresh-queue view-sig)
(when-not (.offer refresh-queue view-sig)
(if (collecting-stats? view-system) (swap! view-system update-in [:statistics :dropped] inc))
(error "refresh-queue full, dropping refresh request for" view-sig))
(do
(if (collecting-stats? view-system) (swap! view-system update-in [:statistics :deduplicated] inc))
(trace "already queued for refresh" view-sig))))
(catch Exception e
(error e "error determining if view is relevant" view-sig))))
view-system))
(if (relevant? v namespace parameters hints)
(if-not (.contains ^ArrayBlockingQueue refresh-queue view-sig)
(when-not (.offer ^ArrayBlockingQueue refresh-queue view-sig)
(error "refresh-queue full, dropping refresh request for" view-sig))
(debug "already queued for refresh" view-sig)))))
(defn subscribed-views
"Returns a list of all views in the system that have subscribers."
[^Atom view-system]
(reduce into #{} (vals (:subscribed @view-system))))
[view-system]
(reduce into #{} (vals (:subscribed view-system))))
(defn active-view-count
"Returns a count of views with at least one subscriber."
[^Atom view-system]
(count (remove #(empty? (val %)) (:subscribers @view-system))))
(defn- pop-hints!
[^Atom view-system]
(let [p (swap-vals! view-system assoc :hints #{})]
(defn pop-hints!
"Return hints and clear hint set atomicly."
[view-system]
(let [p (swap-pair! view-system assoc :hints #{})]
(or (:hints (first p)) #{})))
(defn refresh-views!
"Given a collection of hints, check all views in the system to find any that need refreshing
and schedule refreshes for them. If no hints are provided, will use any that have been
queued up in the view-system."
([^Atom view-system hints]
(when (seq hints)
(trace "refresh hints:" hints)
(doseq [view-sig (subscribed-views view-system)]
(refresh-view! view-system hints view-sig)))
(swap! view-system assoc :last-update (System/currentTimeMillis))
view-system)
([^Atom view-system]
(refresh-views! view-system (pop-hints! view-system))))
"Given a collection of hints, find all dirty views."
[view-system]
(let [hints (pop-hints! view-system)]
(debug "refresh hints:" hints)
(mapv #(refresh-view! view-system hints %) (subscribed-views @view-system))
(swap! view-system assoc :last-update (System/currentTimeMillis))))
(defn- can-refresh?
(defn can-refresh?
[last-update min-refresh-interval]
(> (- (System/currentTimeMillis) last-update) min-refresh-interval))
(defn- wait
(defn wait
[last-update min-refresh-interval]
(Thread/sleep (max 0 (- min-refresh-interval (- (System/currentTimeMillis) last-update)))))
(defn do-view-refresh!
[^Atom view-system {:keys [namespace view-id parameters] :as view-sig}]
(if (collecting-stats? view-system) (swap! view-system update-in [:statistics :refreshes] inc))
(try
(let [view (get-in @view-system [:views view-id])
vdata (data view namespace parameters)
hdata (hash vdata)]
(when-not (= hdata (get-in @view-system [:hashes view-sig]))
(doseq [subscriber-key (get-in @view-system [:subscribers view-sig])]
(send-view-data! @view-system subscriber-key view-sig vdata))
(swap! view-system assoc-in [:hashes view-sig] hdata)))
(catch Exception e
(error e "error refreshing:" namespace view-id parameters))))
(defn- refresh-worker-thread
[^Atom view-system]
(let [^ArrayBlockingQueue refresh-queue (:refresh-queue @view-system)]
(fn []
(try
(when-let [view-sig (.poll refresh-queue 60 TimeUnit/SECONDS)]
(trace "worker running refresh for" view-sig)
(do-view-refresh! view-system view-sig))
(catch InterruptedException e))
(if-not (:stop-workers? @view-system)
(recur)
(trace "exiting worker thread")))))
(defn- refresh-watcher-thread
[^Atom view-system min-refresh-interval]
(defn worker-thread
"Handles refresh requests."
[view-system]
(fn []
(let [last-update (:last-update @view-system)]
(when-let [[namespace view-id parameters :as view-sig] (.poll ^ArrayBlockingQueue refresh-queue 60 TimeUnit/SECONDS)]
(try
(if (can-refresh? last-update min-refresh-interval)
(refresh-views! view-system)
(wait last-update min-refresh-interval))
(catch InterruptedException e)
(let [view (get-in @view-system [:views view-id])
vdata (data view namespace parameters)
hdata (hash vdata)]
(when-not (= hdata (get-in @view-system [:hashes view-sig]))
(doseq [s (get-in @view-system [:subscribers view-sig])]
((:send-fn @view-system) s [[view-id parameters] vdata]))
(swap! view-system assoc-in [:hashes view-sig] hdata)))
(catch Exception e
(error e "exception in views")))
(if-not (:stop-refresh-watcher? @view-system)
(recur)
(trace "exiting refresh watcher thread")))))
(error "error refreshing:" namespace view-id parameters
"e:" e "msg:" (.getMessage e)))))
(recur)))
(defn start-update-watcher!
"Starts threads for the views refresh watcher and worker threads that handle queued
hints and view refresh requests."
[^Atom view-system min-refresh-interval threads]
(trace "starting refresh watcher at" min-refresh-interval "ms interval and" threads "workers")
(if (and (:refresh-watcher @view-system)
(:workers @view-system))
(error "cannot start new watcher and worker threads until existing threads are stopped")
(let [refresh-watcher (Thread. ^Runnable (refresh-watcher-thread view-system min-refresh-interval))
worker-threads (mapv (fn [_] (Thread. ^Runnable (refresh-worker-thread view-system)))
(range threads))]
(swap! view-system assoc
:last-update 0
:refresh-watcher refresh-watcher
:stop-refresh-watcher? false
:workers worker-threads
:stop-workers? false)
(.start refresh-watcher)
(doseq [^Thread t worker-threads]
(.start t))
view-system)))
(defn update-watcher!
"A single threaded view update mechanism."
[view-system min-refresh-interval threads]
(swap! view-system assoc :last-update 0)
(.start (Thread. (fn [] (let [last-update (:last-update @view-system)]
(try
(if (can-refresh? last-update min-refresh-interval)
(refresh-views! view-system)
(wait last-update min-refresh-interval))
(catch Exception e (error "exception in views e:" e "msg:"(.getMessage e))))
(recur)))))
(dotimes [i threads] (.start (Thread. ^Runnable (worker-thread view-system)))))
(defn stop-update-watcher!
"Stops threads for the views refresh watcher and worker threads."
[^Atom view-system & [dont-wait-for-threads?]]
(trace "stopping refresh watcher and workers")
(let [worker-threads (:workers @view-system)
watcher-thread (:refresh-watcher @view-system)
threads (->> worker-threads
(cons watcher-thread)
(remove nil?))]
(swap! view-system assoc
:stop-refresh-watcher? true
:stop-workers? true)
(doseq [^Thread t threads]
(.interrupt t))
(if-not dont-wait-for-threads?
(doseq [^Thread t threads]
(.join t)))
(swap! view-system assoc
:refresh-watcher nil
:workers nil))
view-system)
(defn- logger-thread
[^Atom view-system msecs]
(let [secs (/ msecs 1000)]
(fn []
(try
(Thread/sleep msecs)
(let [stats (:statistics @view-system)]
(reset-stats! view-system)
(info "subscribed views:" (active-view-count view-system)
(format "refreshes/sec: %.1f" (double (/ (:refreshes stats) secs)))
(format "dropped/sec: %.1f" (double (/ (:dropped stats) secs)))
(format "deduped/sec: %.1f" (double (/ (:deduplicated stats) secs)))))
(catch InterruptedException e))
(if-not (get-in @view-system [:statistics :stop?])
(recur)))))
(defn start-logger!
"Starts a logger thread that will enable collection of view statistics
which the logger will periodically write out to the log."
[^Atom view-system log-interval]
(trace "starting logger. logging at" log-interval "secs intervals")
(if (get-in @view-system [:statistics :logger])
(error "cannot start new logger thread until existing thread is stopped")
(let [logger (Thread. ^Runnable (logger-thread view-system log-interval))]
(swap! view-system update-in [:statistics] assoc
:logger logger
:stop? false)
(reset-stats! view-system)
(.start logger)))
view-system)
(defn stop-logger!
"Stops the logger thread."
[^Atom view-system & [dont-wait-for-thread?]]
(trace "stopping logger")
(let [^Thread logger-thread (get-in @view-system [:statistics :logger])]
(swap! view-system assoc-in [:statistics :stop?] true)
(if logger-thread (.interrupt logger-thread))
(if-not dont-wait-for-thread? (.join logger-thread))
(swap! view-system assoc-in [:statistics :logger] nil))
view-system)
(defn hint
"Create a hint."
[namespace hint type]
{:namespace namespace :hint hint :type type})
[namespace hint]
{:namespace namespace :hint hint})
(defn queue-hints!
"Queues up hints in the view system so that they will be picked up by the refresh
watcher and dispatched to the workers resulting in view updates being sent out
for the relevant views/subscribers."
[^Atom view-system hints]
(trace "queueing hints" hints)
(swap! view-system update-in [:hints] (fnil into #{}) hints)
view-system)
(defn put-hints!
"Adds a collection of hints to the view system by using the view system
configuration's :put-hints-fn."
[^Atom view-system hints]
((:put-hints-fn @view-system) view-system hints)
view-system)
(defn- ->views-map
[views]
(map vector (map id views) views))
(defn add-hint!
"Add a hint to the system."
[view-system hint]
(swap! view-system update-in [:hints] (fnil conj #{}) hint))
(defn add-views!
"Add a collection of views to the system."
[^Atom view-system views]
(swap! view-system update-in [:views] (fnil into {}) (->views-map views))
view-system)
[view-system views]
(swap! view-system update-in [:views] (fnil into {}) (map vector (map id views) views)))
(def default-options
"Default options used to initialize the views system via init!"
{
; *REQUIRED*
; a function that is used to send view refresh data to subscribers.
; this function must be set for normal operation of the views system.
; (fn [subscriber-key [view-sig view-data]] ...)
:send-fn nil
(comment
(defrecord SQLView [id query-fn]
IView
(id [_] id)
(data [_ namespace parameters]
(j/query (db/firm-connection namespace) (hsql/format (apply query-fn parameters))))
(relevant? [_ namespace parameters hints]
(let [tables (query-tables (apply query-fn parameters))]
(boolean (some #(not-empty (intersection % talbes)) hints)))))
; *REQUIRED*
; a function that adds hints to the view system. this function will be used
; by other libraries that implement IView. this function must be set for
; normal operation of the views system. the default function provided
; will trigger relevant view refreshes immediately.
; (fn [^Atom view-system hints] ... )
:put-hints-fn (fn [^Atom view-system hints] (refresh-views! view-system hints))
(def memory-system (atom {}))
; *REQUIRED*
; the size of the queue used to hold view refresh requests for
; the worker threads. for very heavy systems, this can be set
; higher if you start to get warnings about dropped refresh requests
:refresh-queue-size 1000
(reset! memory-system {:a {:foo 1 :bar 200 :baz [1 2 3]}
:b {:foo 2 :bar 300 :baz [2 3 4]}})
; *REQUIRED*
; interval in milliseconds at which the refresh watcher thread will
; check for any queued up hints and dispatch relevant view refresh
; updates to the worker threads.
:refresh-interval 1000
(defrecord MemoryView [id ks]
IView
(id [_] id)
(data [_ namespace parameters]
(get-in @memory-system (-> [namespace] (into ks) (into parameters))))
(relevant? [_ namespace parameters hints]
(some #(and (= namespace (:namespace %)) (= ks (:hint %))) hints)))
; *REQUIRED*
; the number of refresh worker threads that poll for view refresh
; requests and dispatch updated view data to subscribers.
:worker-threads 8
(def view-system
(atom
{:views {:foo (MemoryView. :foo [:foo])
:bar (MemoryView. :bar [:bar])
:baz (MemoryView. :baz [:baz])}
:send-fn (fn [subscriber-key data] (println "sending to:" subscriber-key "data:" data))}))
; a list of IView instances. these are the views that can be subscribed
; to. views can also be added/replaced after system initialization through
; the use of add-views!
:views nil
(subscribe! view-system :a :foo [] 1)
(subscribe! view-system :b :foo [] 2)
(subscribe! view-system :b :baz [] 2)
; a function that authorizes view subscriptions. should return true if the
; subscription is authorized. if not set, no view subscriptions will require
; any authorization.
; (fn [view-sig subscriber-key context] ... )
:auth-fn nil
(subscribed-views @view-system)
; a function that is called when subscription authorization fails.
; (fn [view-sig subscriber-key context] ... )
:on-unauth-fn nil
(doto view-system
(add-hint! [:foo])
(add-hint! [:baz]))
; a function that returns a namespace to use for view subscriptions
; (fn [view-sig subscriber-key context] ... )
:namespace-fn nil
; interval in milliseconds at which a logger will write view system
; statistics to the log. if not set, the logger will be disabled.
:stats-log-interval nil
})
(refresh-views! view-system)
(defn init!
"Initializes the view system for use with the list of views provided.
;; Example of function that updates and hints the view system.
(defn massoc-in!
[memory-system namespace ks v]
(let [ms (swap! memory-system assoc-in (into [namespace] ks) v)]
(add-hint! view-system ks)
ms))
An existing atom that will be used to store the state of the views
system can be provided, otherwise one will be created. Either way,
the atom with the initialized view system is returned.
(massoc-in! memory-system :a [:foo] 1)
(massoc-in! memory-system :b [:baz] [2 4 3])
options is a map of options to configure the view system with. See
views.core/default-options for a description of the available options
and the defaults that will be used for any options not provided in
the call to init!."
([^Atom view-system options]
(let [options (merge default-options options)]
(trace "initializing views system using options:" options)
(reset! view-system
{:refresh-queue (ArrayBlockingQueue. (:refresh-queue-size options))
:views (into {} (->views-map (:views options)))
:send-fn (:send-fn options)
:put-hints-fn (:put-hints-fn options)
:auth-fn (:auth-fn options)
:on-unauth-fn (:on-unauth-fn options)
:namespace-fn (:namespace-fn options)
; keeping a copy of the options used during init allows other libraries
; that plugin/extend views functionality (e.g. IView implementations)
; to make use of any options themselves
:options options})
(start-update-watcher! view-system (:refresh-interval options) (:worker-threads options))
(when-let [stats-log-interval (:stats-log-interval options)]
(swap! view-system assoc :logging? true)
(start-logger! view-system stats-log-interval))
view-system))
([options]
(init! (atom {}) options)))
(defn shutdown!
"Shuts the view system down, terminating all worker threads and clearing
all view subscriptions and data."
[^Atom view-system & [dont-wait-for-threads?]]
(trace "shutting down views sytem")
(stop-update-watcher! view-system dont-wait-for-threads?)
(if (:logging? @view-system)
(stop-logger! view-system dont-wait-for-threads?))
(reset! view-system {})
view-system)
(start-update-watcher! view-system 1000)
)

View file

@ -0,0 +1,70 @@
(ns views.honeysql.util
(:require
[honeysql.core :as hsql]
[honeysql.helpers :as hh]
[clojure.string :refer [split]]))
;; The following is used for full refresh views where we can have CTEs and
;; subselects in play.
(declare query-tables)
(defn cte-tables
[query]
(mapcat #(query-tables (second %)) (:with query)))
(defn isolate-tables
"Isolates tables from table definitions in from and join clauses."
[c]
(if (keyword? c) [c] (let [v (first c)] (if (map? v) (query-tables v) [v]))))
(defn from-tables
[query]
(mapcat isolate-tables (:from query)))
(defn every-second
[coll]
(map first (partition 2 coll)))
(defn join-tables
[query k]
(mapcat isolate-tables (every-second (k query))))
(defn collect-maps
[wc]
(cond
(coll? wc) (let [maps (filterv map? wc)
colls (filter #(and (coll? %) (not (map? %))) wc)]
(into maps (mapcat collect-maps colls)))
(map? wc) [wc]
:else []))
(defn where-tables
"This search for subqueries in the where clause."
[query]
(mapcat query-tables (collect-maps (:where query))))
(defn insert-tables
[query]
(if-let [v (:insert-into query)] [v] []))
(defn update-tables
[query]
(if-let [v (:update query)] [v] []))
(defn delete-tables
[query]
(if-let [v (:delete-from query)] [v] []))
(defn query-tables
"Return all the tables in an sql statement."
[query]
(set (concat
(cte-tables query)
(from-tables query)
(join-tables query :join)
(join-tables query :left-join)
(join-tables query :right-join)
(where-tables query)
(insert-tables query)
(update-tables query)
(delete-tables query))))

View file

@ -4,11 +4,11 @@
(data [this namespace parameters]
"Returns view data.")
(relevant? [this namespace parameters hints]
"Given hints of the form {:namespace x :hint y :type z}, the view must
return true if the hint indicates that an instance of this view
with supplied namespace and parameters might require updating.
It is always safe to return true, but false should be returned only
if you are sure this view does not need updating.")
"Given hints of the form {:namespace x :hint y}, the view must
return true if the hint indicates that an instance of this view
with supplied namespace and parameters might require updating.
It is always safe to return true, but false sure be returned only
if you are sure this view does not need updating.")
(id [this]
"A unique identifer for a view."))

View file

@ -1,113 +0,0 @@
(ns views.basic-system-init-tests
(:use
clojure.test
views.test-helpers
views.protocols
views.core
views.test-view-system)
(:import
(views.test_view_system MemoryView)
(clojure.lang Atom)))
(use-fixtures :each reset-test-views-system)
(defn dummy-send-fn [subscriber-key [view-sig view-data]])
(def test-options (merge default-options
{:views views
:send-fn dummy-send-fn}))
;; tests
(deftest inits-with-correct-config-and-shutsdown-correctly
(let [options test-options
; 1. init views
init-returned-atom (init! test-views-system test-options)]
(is (instance? Atom init-returned-atom))
(is (= init-returned-atom test-views-system))
(is (seq @test-views-system))
(is (= dummy-send-fn (:send-fn @test-views-system)))
(is (and (contains-view? test-views-system :foo)
(contains-view? test-views-system :bar)
(contains-view? test-views-system :baz)))
(is (not (:logging? @test-views-system)))
(is (not (collecting-stats? test-views-system)))
(is (empty? (subscribed-views test-views-system)))
(let [refresh-watcher (:refresh-watcher @test-views-system)
workers (:workers @test-views-system)]
(is (.isAlive ^Thread refresh-watcher))
(is (= (:worker-threads options) (count workers)))
(doseq [^Thread t workers]
(is (.isAlive t)))
; 2. shutdown views (and wait for all threads to also finish)
(shutdown! test-views-system)
(is (empty? @test-views-system))
(is (not (.isAlive ^Thread refresh-watcher)))
(doseq [^Thread t workers]
(is (not (.isAlive t)))))))
(deftest init-without-existing-view-system-atom
(let [options test-options]
(let [init-created-atom (init! options)]
(is (instance? Atom init-created-atom))
(is (seq @init-created-atom))
(is (= dummy-send-fn (:send-fn @init-created-atom)))
(is (and (contains-view? init-created-atom :foo)
(contains-view? init-created-atom :bar)
(contains-view? init-created-atom :baz)))
(shutdown! init-created-atom)
(is (empty? @init-created-atom)))))
(deftest init-can-also-start-logger
(let [options (-> test-options
(assoc :stats-log-interval 10000))]
; 1. init views
(init! test-views-system options)
(is (seq (:statistics @test-views-system)))
(is (:logging? @test-views-system))
(is (collecting-stats? test-views-system))
(let [logger-thread (get-in @test-views-system [:statistics :logger])]
(is (.isAlive ^Thread logger-thread))
; 2. shutdown views
(shutdown! test-views-system)
(is (nil? (get-in @test-views-system [:statistics :logger])))
(is (not (.isAlive ^Thread logger-thread))))))
(deftest can-add-new-views-after-init
(let [options test-options]
; 1. init views
(init! test-views-system options)
(is (and (contains-view? test-views-system :foo)
(contains-view? test-views-system :bar)
(contains-view? test-views-system :baz)))
; 2. add new views
(add-views! test-views-system
[(MemoryView. :one [:one])
(MemoryView. :two [:two])])
(is (and (contains-view? test-views-system :foo)
(contains-view? test-views-system :bar)
(contains-view? test-views-system :baz)
(contains-view? test-views-system :one)
(contains-view? test-views-system :two)))
; 3. shutdown views
(shutdown! test-views-system)))
(deftest can-replace-views-after-init
(let [options test-options
replacement-view (MemoryView. :foo [:new-foo])]
; 1. init views
(init! test-views-system options)
(is (and (contains-view? test-views-system :foo)
(contains-view? test-views-system :bar)
(contains-view? test-views-system :baz)))
(is (not= replacement-view (get-in @test-views-system [:views :foo])))
; 2. add view. has same id so should replace existing one
(add-views! test-views-system [replacement-view])
(is (and (contains-view? test-views-system :foo)
(contains-view? test-views-system :bar)
(contains-view? test-views-system :baz)))
(is (= replacement-view (get-in @test-views-system [:views :foo])))
; 3. shutdown views
(shutdown! test-views-system)))

View file

@ -1,275 +0,0 @@
(ns views.hint-tests
(:use
clojure.test
views.test-helpers
views.protocols
views.core
views.test-view-system)
(:import (clojure.lang Atom)))
(def test-sent-data
(atom []))
(defn test-send-fn [subscriber-key [view-sig view-data]]
(swap! test-sent-data conj {:subscriber-key subscriber-key
:view-sig view-sig
:view-data view-data}))
(def test-options (merge default-options
{:views views
:send-fn test-send-fn}))
(defn clear-sent-data-fixture [f]
(reset! test-sent-data [])
(f))
(use-fixtures :each clear-sent-data-fixture reset-test-views-system reset-memory-db-fixture)
;; tests
(deftest refresh-views!-instantly-attempts-view-refresh-with-given-hints
(let [options test-options
hints-refreshed (atom [])]
; 1. init views
(init! test-views-system options)
; with a view subscription (any subscription will do)
(with-redefs [subscribed-views (fn [_] #{(->view-sig :namespace :fake-view [])})
refresh-view! (fn [_ hints _] (swap! hints-refreshed into hints))]
; 2. trigger refresh by calling refresh-views! with hints
(refresh-views! test-views-system [(hint :namespace [:foo] :fake-type)])
(is (contains-only? @hints-refreshed
[(hint :namespace [:foo] :fake-type)]))
(reset! hints-refreshed [])
; 3. same thing again, but passing in multiple hints
(refresh-views! test-views-system
[(hint :namespace [:foo] :fake-type)
(hint :namespace [:bar] :fake-type)])
(is (contains-only? @hints-refreshed
[(hint :namespace [:foo] :fake-type)
(hint :namespace [:bar] :fake-type)]))
(reset! hints-refreshed []))
; now, without any view subscriptions
(with-redefs [subscribed-views (fn [_] #{})
refresh-view! (fn [_ hints _] (swap! hints-refreshed into hints))]
; 4. again trigger refresh by calling refresh-views! with hints
(refresh-views! test-views-system [(hint :namespace [:foo] :fake-type)])
(is (empty? @hints-refreshed))
(reset! hints-refreshed []))))
(deftest refresh-watcher-runs-at-specified-interval-and-picks-up-queued-hints
(let [options test-options
hints-refreshed (atom [])]
(with-redefs [subscribed-views (fn [_] #{(->view-sig :namespace :fake-view [])})
refresh-view! (fn [_ hints _] (swap! hints-refreshed into hints))]
; 1. init views
(init! test-views-system options)
; 2. queue a hint and wait until the next refresh interval
(queue-hints! test-views-system [(hint :namespace [:foo] :fake-type)])
(wait-for-refresh-interval options)
(is (contains-only? @hints-refreshed
[(hint :namespace [:foo] :fake-type)]))
(reset! hints-refreshed [])
; 3. queue multiple hints and wait again
(queue-hints! test-views-system
[(hint :namespace [:foo] :fake-type)
(hint :namespace [:bar] :fake-type)])
(wait-for-refresh-interval options)
(is (contains-only? @hints-refreshed
[(hint :namespace [:foo] :fake-type)
(hint :namespace [:bar] :fake-type)]))
(reset! hints-refreshed [])
; 4. queue up no hints and wait
(wait-for-refresh-interval options)
(reset! hints-refreshed []))))
(deftest refresh-worker-thread-processes-relevant-hints
(let [options test-options
views-refreshed (atom [])]
; 1. init views
(init! test-views-system options)
(with-redefs [subscribed-views (fn [_] #{(->view-sig :a :foo [])})
do-view-refresh! (fn [_ view-sig] (swap! views-refreshed into [view-sig]))]
; 2. trigger refresh by calling refresh-views! with relevant hint
(refresh-views! test-views-system [(hint :a [:foo] memory-view-hint-type)])
(wait-for-refresh-views)
(is (contains-only? @views-refreshed [(->view-sig :a :foo [])]))
(reset! views-refreshed [])
; 3. same thing again, but passing in multiple hints (1 relevant, 1 not)
(refresh-views! test-views-system [(hint :a [:foo] memory-view-hint-type)
(hint :a [:bar] memory-view-hint-type)])
(wait-for-refresh-views)
(is (contains-only? @views-refreshed [(->view-sig :a :foo [])]))
(reset! views-refreshed [])
; 4. and lastly, passing in only irrelevant hints
(refresh-views! test-views-system
[(hint :b [:foo] memory-view-hint-type)
(hint :a [:foo] :some-other-type)])
(wait-for-refresh-views)
(is (empty? @views-refreshed))
(reset! views-refreshed []))))
; this test is really just testing that our helper function memory-db-assoc-in! works as we expect it to
; (otherwise, it is entirely redundant given the above tests)
(deftest test-memory-db-operation-triggers-proper-refresh-hints
(let [options test-options
hints-refreshed (atom [])
views-refreshed (atom [])]
; 1. init views
(init! test-views-system options)
; first tests verifying that correct hints are being sent out (don't care if relevant or not yet)
(with-redefs [subscribed-views (fn [_] #{(->view-sig :a :foo [])})
refresh-view! (fn [_ hints _] (swap! hints-refreshed into hints))]
(memory-db-assoc-in! test-views-system :a [:foo] 42)
(memory-db-assoc-in! test-views-system :a [:bar] 3.14)
(memory-db-assoc-in! test-views-system :b [:baz] [10 20 30])
(wait-for-refresh-views)
(is (contains-only? @hints-refreshed
[(hint :a [:foo] memory-view-hint-type)
(hint :a [:bar] memory-view-hint-type)
(hint :b [:baz] memory-view-hint-type)]))
(reset! views-refreshed []))
; now we test that relevant views were recognized as relevant and forwarded on to be used to
; trigger actual refreshes of view data
(with-redefs [subscribed-views (fn [_] #{(->view-sig :a :foo [])})
do-view-refresh! (fn [_ view-sig] (swap! views-refreshed into [view-sig]))]
; 2. update memory database (in a location covered by the subscribed view)
(memory-db-assoc-in! test-views-system :a [:foo] 1337)
(wait-for-refresh-interval options)
(is (contains-only? @views-refreshed [(->view-sig :a :foo [])]))
(reset! views-refreshed [])
; 3. same thing again, but update a different location not covered by any subscription
(memory-db-assoc-in! test-views-system :a [:bar] 1234.5678)
(wait-for-refresh-interval options)
(is (empty? @views-refreshed)))))
(deftest relevant-hints-cause-refreshed-data-to-be-sent-to-subscriber
(let [options test-options
subscriber-key 123
view-sig (->view-sig :a :foo [])]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [original-view-data (get-view-data test-views-system view-sig)
updated-view-data 21
subscribe-result (subscribe! test-views-system view-sig subscriber-key nil)]
; 3. block until subscription finishes. we don't care about the initial view data refresh
(while (not (realized? subscribe-result)))
(reset! test-sent-data [])
(is (= (hash original-view-data) (get-in @test-views-system [:hashes view-sig])))
; 4. change some test data that is covered by the view subscription
(memory-db-assoc-in! test-views-system :a [:foo] updated-view-data)
(wait-for-refresh-views)
(is (= (hash updated-view-data) (get-in @test-views-system [:hashes view-sig])))
(is (contains-only? @test-sent-data
[{:subscriber-key subscriber-key
:view-sig (dissoc view-sig :namespace)
:view-data updated-view-data}])))))
(deftest irrelevant-hints-dont-trigger-refreshes
(let [options test-options
subscriber-key 123
view-sig (->view-sig :a :foo [])]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [subscribe-result (subscribe! test-views-system view-sig subscriber-key nil)]
; 3. block until subscription finishes. we don't care about the initial view data refresh
(while (not (realized? subscribe-result)))
(reset! test-sent-data [])
; 4. change some test data that is NOT covered by the view subscription
(memory-db-assoc-in! test-views-system :b [:foo] 6)
(memory-db-assoc-in! test-views-system :a [:bar] 7)
(wait-for-refresh-views)
(is (empty? @test-sent-data)))))
(deftest refreshes-not-sent-if-view-data-is-unchanged-since-last-refresh
(let [options test-options
subscriber-key 123
view-sig (->view-sig :a :foo [])]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [updated-view-data 1111
subscribe-result (subscribe! test-views-system view-sig subscriber-key nil)]
; 3. block until subscription finishes. we don't care about the initial view data refresh
(while (not (realized? subscribe-result)))
(reset! test-sent-data [])
; 4. change some test data, will cause a refresh to be sent out
(memory-db-assoc-in! test-views-system :a [:foo] updated-view-data)
(wait-for-refresh-views)
(is (= (hash updated-view-data) (get-in @test-views-system [:hashes view-sig])))
(is (contains-only? @test-sent-data
[{:subscriber-key subscriber-key
:view-sig (dissoc view-sig :namespace)
:view-data updated-view-data}]))
(reset! test-sent-data [])
; 5. manually trigger another refresh for the view
(refresh-views! test-views-system [(hint :a [:foo] memory-view-hint-type)])
(wait-for-refresh-views)
(is (empty? @test-sent-data))
; 6. also try "updating" the db with the same values
(memory-db-assoc-in! test-views-system :a [:foo] updated-view-data)
(wait-for-refresh-views)
(is (empty? @test-sent-data)))))
(deftest refresh-queue-drops-duplicate-hints
(let [options (-> test-options
; enable statistics collection
(assoc :stats-log-interval 10000))
subscriber-key 123
view-sig (->view-sig :a :foo [])]
; 1. init views
(init! test-views-system options)
; 2. prematurely stop refresh worker threads so that we can more easily inspect the
; internal refresh queue's entries. the refresh worker threads are what remove
; hints from the refresh queue as they are added to it.
(stop-refresh-worker-threads test-views-system)
; 3. subscribe to a view
(let [subscribe-result (subscribe! test-views-system view-sig subscriber-key nil)]
; 4. block until subscription finishes
(while (not (realized? subscribe-result)))
(is (= 0 (get-in @test-views-system [:statistics :deduplicated])))
; 5. add duplicate hints by changing the same set of data twice
; (hints will stay in the queue forever because we stopped the worker threads)
(memory-db-assoc-in! test-views-system :a [:foo] 6)
(memory-db-assoc-in! test-views-system :a [:foo] 7)
(wait-for-refresh-views)
(is (= 1 (get-in @test-views-system [:statistics :deduplicated])))
(is (= [view-sig]
(vec (:refresh-queue @test-views-system)))))))
(deftest refresh-queue-drops-hints-when-full
(let [options (-> test-options
; enable statistics collection
(assoc :stats-log-interval 10000
:refresh-queue-size 1))
subscriber-key 123
view-sig-a (->view-sig :a :foo [])
view-sig-b (->view-sig :b :foo [])]
; 1. init views
(init! test-views-system options)
; 2. prematurely stop refresh worker threads so that we can more easily inspect the
; internal refresh queue's entries. the refresh worker threads are what remove
; hints from the refresh queue as they are added to it.
(stop-refresh-worker-threads test-views-system)
; 3. subscribe to a view
; note: log* redef is to suppress error log output which will normally happen whenever
; another item is added to the refresh queue when it's already full
(with-redefs [clojure.tools.logging/log* (fn [& _])]
(let [subscribe-a (subscribe! test-views-system view-sig-a subscriber-key nil)
subscribe-b (subscribe! test-views-system view-sig-b subscriber-key nil)]
; 4. block until subscription finishes
(while (or (not (realized? subscribe-a))
(not (realized? subscribe-b))))
(is (= 0 (get-in @test-views-system [:statistics :dropped])))
; 5. change some data affecting the subscribed view, resulting in more then 1 hint
; being added to the refresh queue
(memory-db-assoc-in! test-views-system :a [:foo] 101010)
(memory-db-assoc-in! test-views-system :b [:foo] 010101)
(wait-for-refresh-views)
(is (= 1 (get-in @test-views-system [:statistics :dropped])))
(is (= [view-sig-a]
(vec (:refresh-queue @test-views-system))))))))

View file

@ -1,407 +0,0 @@
(ns views.subscription-tests
(:use
clojure.test
views.test-helpers
views.protocols
views.core
views.test-view-system))
(def test-sent-data
(atom []))
(defn test-send-fn [subscriber-key [view-sig view-data]]
(swap! test-sent-data conj {:subscriber-key subscriber-key
:view-sig view-sig
:view-data view-data}))
(def test-options (merge default-options
{:views views
:send-fn test-send-fn}))
(defn clear-sent-data-fixture [f]
(reset! test-sent-data [])
(f))
(use-fixtures :each clear-sent-data-fixture reset-test-views-system reset-memory-db-fixture)
;; tests
(deftest can-subscribe-to-a-view
(let [options test-options
subscriber-key 123
view-sig (->view-sig :namespace :foo [])
context {:my-data "arbitrary application context data"}]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
(is (future? subscribe-result))
(is (= [subscriber-key] (keys (:subscribed @test-views-system))))
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key])))
(is (= #{subscriber-key} (get-in @test-views-system [:subscribers view-sig])))
; 3. block until subscription finishes (data retrieval + initial view refresh)
; (in this particular unit test, there is really no point in waiting)
(while (not (realized? subscribe-result)))
(is (= #{view-sig} (subscribed-views test-views-system))))))
(deftest subscribing-results-in-initial-view-data-being-sent
(let [options test-options
subscriber-key 123
view-sig (->view-sig :a :foo [])
context {:my-data "arbitrary application context data"}]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [view-data (get-view-data test-views-system view-sig)
subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
; 3. block until subscription finishes (data retrieval + initial view refresh)
(while (not (realized? subscribe-result)))
(is (= #{view-sig} (subscribed-views test-views-system)))
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
(is (contains-only? @test-sent-data
[{:subscriber-key subscriber-key
:view-sig (dissoc view-sig :namespace)
:view-data view-data}])))))
(deftest can-unsubscribe-from-a-view
(let [options test-options
subscriber-key 123
view-sig (->view-sig :a :foo [])
context {:my-data "arbitrary application context data"}]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [view-data (get-view-data test-views-system view-sig)
subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
(is (= [subscriber-key] (keys (:subscribed @test-views-system))))
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key])))
(is (= #{subscriber-key} (get-in @test-views-system [:subscribers view-sig])))
(is (= #{view-sig} (subscribed-views test-views-system)))
; 3. block until subscription finishes
(while (not (realized? subscribe-result)))
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
; 4. unsubscribe
(unsubscribe! test-views-system view-sig subscriber-key context)
(is (empty? (keys (:subscribed @test-views-system))))
(is (empty? (keys (:subscribers @test-views-system))))
(is (empty? (subscribed-views test-views-system)))
(is (empty? (:hashes @test-views-system))))))
(deftest multiple-subscription-and-unsubscriptions
(let [options test-options
subscriber-key-a 123
subscriber-key-b 456
view-sig (->view-sig :a :foo [])]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [view-data (get-view-data test-views-system view-sig)
subscribe-a (subscribe! test-views-system view-sig subscriber-key-a nil)
subscribe-b (subscribe! test-views-system view-sig subscriber-key-b nil)]
; 3. block until both subscriptions finish
(while (or (not (realized? subscribe-a))
(not (realized? subscribe-b))))
(is (= #{view-sig} (subscribed-views test-views-system)))
(is (= [subscriber-key-a subscriber-key-b] (keys (:subscribed @test-views-system))))
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key-a])))
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key-b])))
(is (= #{subscriber-key-a subscriber-key-b} (get-in @test-views-system [:subscribers view-sig])))
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
(is (contains-only? @test-sent-data
[{:subscriber-key subscriber-key-a
:view-sig (dissoc view-sig :namespace)
:view-data view-data}
{:subscriber-key subscriber-key-b
:view-sig (dissoc view-sig :namespace)
:view-data view-data}]))
; 4. have one of the subscribers unsubscribe
(unsubscribe! test-views-system view-sig subscriber-key-a nil)
(is (= #{view-sig} (subscribed-views test-views-system)))
(is (= [subscriber-key-b] (keys (:subscribed @test-views-system))))
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key-b])))
(is (= #{subscriber-key-b} (get-in @test-views-system [:subscribers view-sig])))
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
; 5. have the last subscriber also unsubscribe
(unsubscribe! test-views-system view-sig subscriber-key-b nil)
(is (empty? (keys (:subscribed @test-views-system))))
(is (empty? (keys (:subscribers @test-views-system))))
(is (empty? (subscribed-views test-views-system)))
(is (empty? (:hashes @test-views-system))))))
(deftest subscriptions-to-different-views
(let [options test-options
subscriber-key-a 123
subscriber-key-b 456
view-sig-a (->view-sig :a :foo [])
view-sig-b (->view-sig :a :bar [])]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [view-data-a (get-view-data test-views-system view-sig-a)
view-data-b (get-view-data test-views-system view-sig-b)
subscribe-a (subscribe! test-views-system view-sig-a subscriber-key-a nil)
subscribe-b (subscribe! test-views-system view-sig-b subscriber-key-b nil)]
; 3. block until both subscriptions finish
(while (or (not (realized? subscribe-a))
(not (realized? subscribe-b))))
(is (= #{view-sig-a view-sig-b} (subscribed-views test-views-system)))
(is (= [subscriber-key-a subscriber-key-b] (keys (:subscribed @test-views-system))))
(is (= #{view-sig-a} (get-in @test-views-system [:subscribed subscriber-key-a])))
(is (= #{view-sig-b} (get-in @test-views-system [:subscribed subscriber-key-b])))
(is (= #{subscriber-key-a} (get-in @test-views-system [:subscribers view-sig-a])))
(is (= #{subscriber-key-b} (get-in @test-views-system [:subscribers view-sig-b])))
(is (= (hash view-data-a) (get-in @test-views-system [:hashes view-sig-a])))
(is (= (hash view-data-b) (get-in @test-views-system [:hashes view-sig-b])))
(is (contains-only? @test-sent-data
[{:subscriber-key subscriber-key-a
:view-sig (dissoc view-sig-a :namespace)
:view-data view-data-a}
{:subscriber-key subscriber-key-b
:view-sig (dissoc view-sig-b :namespace)
:view-data view-data-b}]))
; 4. have one of the subscribers unsubscribe
(unsubscribe! test-views-system view-sig-a subscriber-key-a nil)
(is (= #{view-sig-b} (subscribed-views test-views-system)))
(is (= [subscriber-key-b] (keys (:subscribed @test-views-system))))
(is (empty? (get-in @test-views-system [:subscribed subscriber-key-a])))
(is (= #{view-sig-b} (get-in @test-views-system [:subscribed subscriber-key-b])))
(is (= #{subscriber-key-b} (get-in @test-views-system [:subscribers view-sig-b])))
(is (empty? (get-in @test-views-system [:subscribers view-sig-a])))
(is (empty? (get-in @test-views-system [:hashes view-sig-a])))
(is (= (hash view-data-b) (get-in @test-views-system [:hashes view-sig-b])))
; 5. have the last subscriber also unsubscribe
(unsubscribe! test-views-system view-sig-b subscriber-key-b nil)
(is (empty? (keys (:subscribed @test-views-system))))
(is (empty? (keys (:subscribers @test-views-system))))
(is (empty? (subscribed-views test-views-system)))
(is (empty? (:hashes @test-views-system))))))
(deftest duplicate-subscriptions-do-not-cause-problems
(let [options test-options
subscriber-key 123
view-sig (->view-sig :a :foo [])]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [view-data (get-view-data test-views-system view-sig)
first-subscribe (subscribe! test-views-system view-sig subscriber-key nil)
second-subscribe (subscribe! test-views-system view-sig subscriber-key nil)]
; 3. block until both subscriptions finish
(while (or (not (realized? first-subscribe))
(not (realized? second-subscribe))))
(is (= #{view-sig} (subscribed-views test-views-system)))
(is (= [subscriber-key] (keys (:subscribed @test-views-system))))
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key])))
(is (= #{subscriber-key} (get-in @test-views-system [:subscribers view-sig])))
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
(is (contains-only? @test-sent-data
[{:subscriber-key subscriber-key
:view-sig (dissoc view-sig :namespace)
:view-data view-data}
{:subscriber-key subscriber-key
:view-sig (dissoc view-sig :namespace)
:view-data view-data}]))
; 4. unsubscribe. only need to do this once, since only one subscription
; should exist in the view system
(unsubscribe! test-views-system view-sig subscriber-key nil)
(is (empty? (keys (:subscribed @test-views-system))))
(is (empty? (keys (:subscribers @test-views-system))))
(is (empty? (subscribed-views test-views-system)))
(is (empty? (:hashes @test-views-system))))))
(deftest subscribing-to-non-existant-view-raises-exception
(let [options test-options
subscriber-key 123
view-sig (->view-sig :namespace :non-existant-view [])]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(is (thrown? Exception (subscribe! test-views-system view-sig subscriber-key nil)))))
(deftest subscribe-and-unsubscribe-use-namespace-fn-if-set-and-no-namespace-in-view-sig
(let [subscriber-key 123
view-sig (->view-sig :foo [])
context "some arbitrary context data"
namespace-fn (fn [view-sig* subscriber-key* context*]
(is (= view-sig view-sig*))
(is (= subscriber-key subscriber-key*))
(is (= context context*))
:b)
options (-> test-options
(assoc :namespace-fn namespace-fn))]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [; with the above namespace-fn, subscribe will internally use this view sig
; when setting up subscription info within view-system. application code
; shouldn't need to worry about this, but we will in this unit test
view-sig-with-ns (->view-sig :b :foo [])
; such as right here, we need to use the actual namespace that was set in
; view-system to pass in the same parameters that subscribe! will use for
; the view during it's initial view data refresh
view-data (get-view-data test-views-system view-sig-with-ns)
; passing in view-sig *without* namespace
subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
; 3. block until subscription finishes
(while (not (realized? subscribe-result)))
(is (= #{view-sig-with-ns} (subscribed-views test-views-system)))
(is (= [subscriber-key] (keys (:subscribed @test-views-system))))
(is (= #{view-sig-with-ns} (get-in @test-views-system [:subscribed subscriber-key])))
(is (= #{subscriber-key} (get-in @test-views-system [:subscribers view-sig-with-ns])))
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig-with-ns])))
(is (contains-only? @test-sent-data
[{:subscriber-key subscriber-key
:view-sig (dissoc view-sig :namespace)
:view-data view-data}]))
; 4. unsubscribe.
; NOTE: we are passing in view-sig, not view-sig-with-ns. this is because
; proper namespace-fn's should be consistent with what namespace they
; return given the same inputs. ideal namespace-fn implementations will
; also keep this in mind even if context could vary between subscribe!
; and unsubscribe! calls.
(unsubscribe! test-views-system view-sig subscriber-key context)
(is (empty? (keys (:subscribed @test-views-system))))
(is (empty? (keys (:subscribers @test-views-system))))
(is (empty? (subscribed-views test-views-system)))
(is (empty? (:hashes @test-views-system))))))
(deftest subscribe-and-unsubscribe-do-not-use-namespace-fn-if-namespace-included-in-view-sig
(let [subscriber-key 123
view-sig (->view-sig :a :foo [])
context "some arbitrary context data"
namespace-fn (fn [view-sig* subscriber-key* context*]
; if this function is used, it will mess up several assertions in this unit test
:b)
options (-> test-options
(assoc :namespace-fn namespace-fn))]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [view-data (get-view-data test-views-system view-sig)
subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
; 3. block until subscription finishes
(while (not (realized? subscribe-result)))
(is (= #{view-sig} (subscribed-views test-views-system)))
(is (= [subscriber-key] (keys (:subscribed @test-views-system))))
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key])))
(is (= #{subscriber-key} (get-in @test-views-system [:subscribers view-sig])))
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
(is (contains-only? @test-sent-data
[{:subscriber-key subscriber-key
:view-sig (dissoc view-sig :namespace)
:view-data view-data}]))
; 4. unsubscribe.
(unsubscribe! test-views-system view-sig subscriber-key context)
(is (empty? (keys (:subscribed @test-views-system))))
(is (empty? (keys (:subscribers @test-views-system))))
(is (empty? (subscribed-views test-views-system)))
(is (empty? (:hashes @test-views-system))))))
(deftest unauthorized-subscription-using-auth-fn
(let [subscriber-key 123
view-sig (->view-sig :a :foo [])
context "some arbitrary context data"
auth-fn (fn [view-sig* subscriber-key* context*]
(is (= view-sig view-sig*))
(is (= subscriber-key subscriber-key*))
(is (= context context*))
; false = unauthorized
false)
options (-> test-options
(assoc :auth-fn auth-fn))]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
(is (nil? subscribe-result))
(is (empty? (keys (:subscribed @test-views-system))))
(is (empty? (keys (:subscribers @test-views-system))))
(is (empty? (subscribed-views test-views-system)))
(is (empty? (:hashes @test-views-system))))))
(deftest unauthorized-subscription-using-auth-fn-calls-on-unauth-fn-when-set
(let [subscriber-key 123
view-sig (->view-sig :a :foo [])
context "some arbitrary context data"
auth-fn (fn [view-sig* subscriber-key* context*]
(is (= view-sig view-sig*))
(is (= subscriber-key subscriber-key*))
(is (= context context*))
; false = unauthorized
false)
on-unauth-called? (atom false)
on-unauth-fn (fn [view-sig* subscriber-key* context*]
(is (= view-sig view-sig*))
(is (= subscriber-key subscriber-key*))
(is (= context context*))
(reset! on-unauth-called? true))
options (-> test-options
(assoc :auth-fn auth-fn
:on-unauth-fn on-unauth-fn))]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
(is (nil? subscribe-result))
(is @on-unauth-called?)
(is (empty? (keys (:subscribed @test-views-system))))
(is (empty? (keys (:subscribers @test-views-system))))
(is (empty? (subscribed-views test-views-system)))
(is (empty? (:hashes @test-views-system))))))
(deftest authorized-subscription-using-auth-fn
(let [subscriber-key 123
view-sig (->view-sig :a :foo [])
context "some arbitrary context data"
auth-fn (fn [view-sig* subscriber-key* context*]
(is (= view-sig view-sig*))
(is (= subscriber-key subscriber-key*))
(is (= context context*))
true)
options (-> test-options
(assoc :auth-fn auth-fn))]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [view-data (get-view-data test-views-system view-sig)
subscribe-result (subscribe! test-views-system view-sig subscriber-key context)]
(while (not (realized? subscribe-result)))
(is (= #{view-sig} (subscribed-views test-views-system)))
(is (= [subscriber-key] (keys (:subscribed @test-views-system))))
(is (= #{view-sig} (get-in @test-views-system [:subscribed subscriber-key])))
(is (= #{subscriber-key} (get-in @test-views-system [:subscribers view-sig])))
(is (= (hash view-data) (get-in @test-views-system [:hashes view-sig])))
(is (contains-only? @test-sent-data
[{:subscriber-key subscriber-key
:view-sig (dissoc view-sig :namespace)
:view-data view-data}])))))
(deftest unsubscribe-before-subscription-finishes-does-not-result-in-stuck-view
(let [subscriber-key 123
view-sig (->view-sig :a :foo [])
options (-> test-options
(assoc :views slow-views))]
; 1. init views
(init! test-views-system options)
; 2. subscribe to a view
(let [subscribe-result (subscribe! test-views-system view-sig subscriber-key nil)]
(is (= #{view-sig} (subscribed-views test-views-system)))
(is (not (realized? subscribe-result)))
; 3. unsubscribe before subscription finishes (still waiting on initial data
; retrieval to finish)
(unsubscribe! test-views-system view-sig subscriber-key nil)
(is (empty? (keys (:subscribed @test-views-system))))
(is (empty? (keys (:subscribers @test-views-system))))
(is (empty? (subscribed-views test-views-system)))
(is (empty? (:hashes @test-views-system)))
(is (empty? @test-sent-data))
; 4. wait for subscription to finish finally
(while (not (realized? subscribe-result)))
(is (empty? (keys (:subscribed @test-views-system))))
(is (empty? (keys (:subscribers @test-views-system))))
(is (empty? (subscribed-views test-views-system)))
(is (empty? (:hashes @test-views-system)))
(is (empty? @test-sent-data)))))

View file

@ -1,55 +0,0 @@
(ns views.test-helpers
(:use
clojure.test
views.protocols
views.core)
(:import (clojure.lang Atom)))
(defn contains-view?
[^Atom view-system view-id]
(let [view (get (:views @view-system) view-id)]
(and view
(satisfies? IView view))))
; the purpose of this function is to compare collections when the order of the elements
; is not important, but there could be duplicates (so a set is not being used). some of
; the operations we test are asynchronous with multiple threads performing the same
; operation simultaneously, so at times our tests that record these operations being done
; could end up with collections of items that are out of order between test runs.
; this is not a problem or bug in views, but just a consequence of the multithreaded
; operation of the library.
(defn contains-only?
[coll elements]
(and (= (count coll)
(count elements))
(every?
#(boolean (some #{%} elements))
coll)
(every?
#(boolean (some #{%} coll))
elements)))
(defn get-view-data
[^Atom view-system view-sig]
(data (get-in @view-system [:views (:view-id view-sig)])
(:namespace view-sig)
(:parameters view-sig)))
; the 200 being used is just a number i pulled out of thin air that "felt good"
(defn wait-for-refresh-views []
(Thread/sleep 200))
(defn wait-for-refresh-interval [options]
(Thread/sleep (+ 200 (:refresh-interval options))))
; this is kind of a hack, but necessary for some tests where we want to inspect
; the items being sent to the refresh queue without worker threads picking out
; the added items almost instantly.
(defn stop-refresh-worker-threads
[^Atom view-system]
(swap! view-system assoc :stop-workers? true)
(doseq [^Thread t (:workers @view-system)]
(.interrupt t)
(.join t))
(swap! view-system assoc :workers nil))

View file

@ -1,71 +0,0 @@
(ns views.test-view-system
(:use
views.protocols
views.core)
(:import (clojure.lang Atom)))
(def base-memory-db-contents
{:a {:foo 1 :bar 200 :baz [1 2 3]}
:b {:foo 2 :bar 300 :baz [2 3 4]}})
(def memory-database
(atom base-memory-db-contents))
(def test-views-system
(atom {}))
(defn reset-memory-db-fixture [f]
(reset! memory-database base-memory-db-contents)
(f))
(defn reset-test-views-system [f]
(reset! test-views-system {})
(f)
(if (seq @test-views-system)
(shutdown! test-views-system)))
(def memory-view-hint-type :memory-db)
(defrecord MemoryView [id ks]
IView
(id [_] id)
(data [_ namespace parameters]
(get-in @memory-database (-> [namespace]
(into ks)
(into parameters))))
(relevant? [_ namespace parameters hints]
(some #(and (= namespace (:namespace %))
(= ks (:hint %))
(= memory-view-hint-type (:type %)))
hints)))
(defrecord SlowMemoryView [id ks]
IView
(id [_] id)
(data [_ namespace parameters]
; simulate a slow database query
(Thread/sleep 1000)
(get-in @memory-database (-> [namespace]
(into ks)
(into parameters))))
(relevant? [_ namespace parameters hints]
(some #(and (= namespace (:namespace %))
(= ks (:hint %))
(= memory-view-hint-type (:type %)))
hints)))
(def views
[(MemoryView. :foo [:foo])
(MemoryView. :bar [:bar])
(MemoryView. :baz [:baz])])
(def slow-views
[(SlowMemoryView. :foo [:foo])
(SlowMemoryView. :bar [:bar])
(SlowMemoryView. :baz [:baz])])
(defn memory-db-assoc-in!
[^Atom view-system namespace ks v]
(let [ms (swap! memory-database assoc-in (into [namespace] ks) v)]
(put-hints! view-system [(hint namespace ks memory-view-hint-type)])
ms))