renaming on-expiry to on-removal
mainly because calling on-expiry for delete-session doesn't feel appropriate if we're talking about an "expiration listener." calling it a "removal listener" feels much better (especially now that we also have a third argument containing the reason for removal, allowing people to tell which is which).
This commit is contained in:
parent
8228408082
commit
52dd71f26b
|
@ -73,7 +73,7 @@
|
||||||
(all-entries [store]
|
(all-entries [store]
|
||||||
"Returns a map containing all entries currently in the session store."))
|
"Returns a map containing all entries currently in the session store."))
|
||||||
|
|
||||||
(defrecord MemoryAgingStore [session-atom thread ttl refresh-on-write refresh-on-read op-counter op-threshold on-expiry]
|
(defrecord MemoryAgingStore [session-atom thread ttl refresh-on-write refresh-on-read op-counter op-threshold on-removal]
|
||||||
AgingStore
|
AgingStore
|
||||||
(read-timestamp [_ key]
|
(read-timestamp [_ key]
|
||||||
(get-in @session-atom [key :timestamp]))
|
(get-in @session-atom [key :timestamp]))
|
||||||
|
@ -90,50 +90,50 @@
|
||||||
(-> session-map ; note: performs faster than get-in
|
(-> session-map ; note: performs faster than get-in
|
||||||
(get key)
|
(get key)
|
||||||
(get :value))
|
(get :value))
|
||||||
(when on-expiry
|
(when on-removal
|
||||||
(on-expiry key (:value existing-entry) :expired)
|
(on-removal key (:value existing-entry) :expired)
|
||||||
nil)))))
|
nil)))))
|
||||||
|
|
||||||
(write-session [_ key data]
|
(write-session [_ key data]
|
||||||
(if op-threshold
|
(if op-threshold
|
||||||
(swap! op-counter inc))
|
(swap! op-counter inc))
|
||||||
(let [key (or key (unique-id))]
|
(let [key (or key (unique-id))]
|
||||||
(if on-expiry
|
(if on-removal
|
||||||
; when we have an on-expiry listener, we need to check if we are about to overwrite an entry
|
; when we have an on-removal listener, we need to check if we are about to overwrite an entry
|
||||||
; that has already expired, and if so, call on-expiry for it
|
; that has already expired, and if so, call on-removal for it
|
||||||
; (note that if it has ALREADY expired, yes, we're about to overwrite this entry anyway, but
|
; (note that if it has ALREADY expired, yes, we're about to overwrite this entry anyway, but
|
||||||
; we DO need to treat it as an expiry, because the old value expired ...)
|
; we DO need to treat it as an expiry, because the old value expired ...)
|
||||||
(let [existing-entry (get @session-atom key)
|
(let [existing-entry (get @session-atom key)
|
||||||
expired? (entry-expired? ttl existing-entry)]
|
expired? (entry-expired? ttl existing-entry)]
|
||||||
(swap! session-atom process-write-entry key data refresh-on-write)
|
(swap! session-atom process-write-entry key data refresh-on-write)
|
||||||
(if expired?
|
(if expired?
|
||||||
(on-expiry key (:value existing-entry) :expired)))
|
(on-removal key (:value existing-entry) :expired)))
|
||||||
; if there's no on-expiry listener, we can simply process the write
|
; if there's no on-removal listener, we can simply process the write
|
||||||
(swap! session-atom process-write-entry key data refresh-on-write))
|
(swap! session-atom process-write-entry key data refresh-on-write))
|
||||||
key))
|
key))
|
||||||
|
|
||||||
(delete-session [_ key]
|
(delete-session [_ key]
|
||||||
(if on-expiry
|
(if on-removal
|
||||||
; if we have an on-expiry listener, we need to check if we actually removed the entry
|
; if we have an on-removal listener, we need to check if we actually removed the entry
|
||||||
; and then call on-expiry
|
; and then call on-removal
|
||||||
(let [[old new] (swap-vals! session-atom dissoc key)]
|
(let [[old new] (swap-vals! session-atom dissoc key)]
|
||||||
(if (and (contains? old key)
|
(if (and (contains? old key)
|
||||||
(not (contains? new key)))
|
(not (contains? new key)))
|
||||||
(on-expiry key (-> old (get key) :value) :deleted)))
|
(on-removal key (-> old (get key) :value) :deleted)))
|
||||||
; if there's no on-expiry listener, just do the delete
|
; if there's no on-removal listener, just do the delete
|
||||||
(swap! session-atom dissoc key))
|
(swap! session-atom dissoc key))
|
||||||
nil))
|
nil))
|
||||||
|
|
||||||
(defn- sweeper-thread
|
(defn- sweeper-thread
|
||||||
"Sweeper thread that watches the session and cleans it."
|
"Sweeper thread that watches the session and cleans it."
|
||||||
[session-atom ttl op-counter op-threshold sweep-interval on-expiry]
|
[session-atom ttl op-counter op-threshold sweep-interval on-removal]
|
||||||
(loop []
|
(loop []
|
||||||
(let [[old new] (if op-threshold
|
(let [[old new] (if op-threshold
|
||||||
(when (>= @op-counter op-threshold)
|
(when (>= @op-counter op-threshold)
|
||||||
(reset! op-counter 0)
|
(reset! op-counter 0)
|
||||||
(swap-vals! session-atom sweep-session ttl))
|
(swap-vals! session-atom sweep-session ttl))
|
||||||
(swap-vals! session-atom sweep-session ttl))]
|
(swap-vals! session-atom sweep-session ttl))]
|
||||||
(if (and on-expiry
|
(if (and on-removal
|
||||||
(not= old new))
|
(not= old new))
|
||||||
; TODO: is there a faster way to get the keys difference? maybe this is fine ... ?
|
; TODO: is there a faster way to get the keys difference? maybe this is fine ... ?
|
||||||
(let [old-keys (set (.keySet old))
|
(let [old-keys (set (.keySet old))
|
||||||
|
@ -142,7 +142,7 @@
|
||||||
(when expired-keys
|
(when expired-keys
|
||||||
(future
|
(future
|
||||||
(doseq [expired-key expired-keys]
|
(doseq [expired-key expired-keys]
|
||||||
(on-expiry expired-key
|
(on-removal expired-key
|
||||||
(-> old (get expired-key) :value)
|
(-> old (get expired-key) :value)
|
||||||
:expired)))))))
|
:expired)))))))
|
||||||
(Thread/sleep sweep-interval)
|
(Thread/sleep sweep-interval)
|
||||||
|
@ -157,7 +157,7 @@
|
||||||
(defn aging-memory-store
|
(defn aging-memory-store
|
||||||
"Creates an in-memory session storage engine where entries expire after the given ttl"
|
"Creates an in-memory session storage engine where entries expire after the given ttl"
|
||||||
[ttl & [opts]]
|
[ttl & [opts]]
|
||||||
(let [{:keys [session-atom refresh-on-write refresh-on-read sweep-threshold sweep-interval on-expiry] :as opts}
|
(let [{:keys [session-atom refresh-on-write refresh-on-read sweep-threshold sweep-interval on-removal] :as opts}
|
||||||
(merge
|
(merge
|
||||||
default-opts
|
default-opts
|
||||||
{:session-atom (atom {})}
|
{:session-atom (atom {})}
|
||||||
|
@ -173,10 +173,10 @@
|
||||||
^Runnable
|
^Runnable
|
||||||
(fn []
|
(fn []
|
||||||
(try
|
(try
|
||||||
(sweeper-thread session-atom ttl op-counter sweep-threshold sweep-interval on-expiry)
|
(sweeper-thread session-atom ttl op-counter sweep-threshold sweep-interval on-removal)
|
||||||
(catch InterruptedException e))))
|
(catch InterruptedException e))))
|
||||||
store (MemoryAgingStore.
|
store (MemoryAgingStore.
|
||||||
session-atom thread ttl refresh-on-write refresh-on-read op-counter sweep-threshold on-expiry)]
|
session-atom thread ttl refresh-on-write refresh-on-read op-counter sweep-threshold on-removal)]
|
||||||
(.start thread)
|
(.start thread)
|
||||||
store))
|
store))
|
||||||
|
|
||||||
|
|
|
@ -236,62 +236,62 @@
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
(deftest expiry-listener-triggered-when-read-session-expires-entry
|
(deftest removal-listener-triggered-when-read-session-expires-entry
|
||||||
(let [expired (atom nil)
|
(let [removed (atom nil)
|
||||||
as (aging-memory-store 1 {:on-expiry #(reset! expired [%1 %2 %3])})]
|
as (aging-memory-store 1 {:on-removal #(reset! removed [%1 %2 %3])})]
|
||||||
(testing "before ttl elapses"
|
(testing "before ttl elapses"
|
||||||
(write-session as "foo" {:foo 1})
|
(write-session as "foo" {:foo 1})
|
||||||
(is (= (read-session as "foo") {:foo 1}))
|
(is (= (read-session as "foo") {:foo 1}))
|
||||||
(is (nil? @expired)))
|
(is (nil? @removed)))
|
||||||
(Thread/sleep 1500)
|
(Thread/sleep 1500)
|
||||||
(testing "after ttl has elapsed"
|
(testing "after ttl has elapsed"
|
||||||
(is (nil? @expired))
|
(is (nil? @removed))
|
||||||
(is (nil? (read-session as "foo")))
|
(is (nil? (read-session as "foo")))
|
||||||
(is (= ["foo" {:foo 1} :expired] @expired)))))
|
(is (= ["foo" {:foo 1} :expired] @removed)))))
|
||||||
|
|
||||||
(deftest expiry-listener-not-triggered-for-other-read-sessions-even-with-an-expired-entry
|
(deftest removal-listener-not-triggered-for-other-read-sessions-even-with-an-expired-entry
|
||||||
(let [expired (atom nil)
|
(let [removed (atom nil)
|
||||||
as (aging-memory-store 1 {:on-expiry #(reset! expired [%1 %2 %3])})]
|
as (aging-memory-store 1 {:on-removal #(reset! removed [%1 %2 %3])})]
|
||||||
(testing "before ttl elapses"
|
(testing "before ttl elapses"
|
||||||
(write-session as "foo" {:foo 1})
|
(write-session as "foo" {:foo 1})
|
||||||
(write-session as "bar" {:bar 1})
|
(write-session as "bar" {:bar 1})
|
||||||
(is (= (read-session as "foo") {:foo 1}))
|
(is (= (read-session as "foo") {:foo 1}))
|
||||||
(is (= (read-session as "bar") {:bar 1}))
|
(is (= (read-session as "bar") {:bar 1}))
|
||||||
(is (nil? @expired)))
|
(is (nil? @removed)))
|
||||||
(testing "delaying while keeping the second entry alive, long enough for the first entry to expire"
|
(testing "delaying while keeping the second entry alive, long enough for the first entry to expire"
|
||||||
(Thread/sleep 400)
|
(Thread/sleep 400)
|
||||||
(is (= (read-session as "bar") {:bar 1}))
|
(is (= (read-session as "bar") {:bar 1}))
|
||||||
(is (nil? @expired))
|
(is (nil? @removed))
|
||||||
(Thread/sleep 400)
|
(Thread/sleep 400)
|
||||||
(is (= (read-session as "bar") {:bar 1}))
|
(is (= (read-session as "bar") {:bar 1}))
|
||||||
(is (nil? @expired))
|
(is (nil? @removed))
|
||||||
(Thread/sleep 400)
|
(Thread/sleep 400)
|
||||||
(is (= (read-session as "bar") {:bar 1}))
|
(is (= (read-session as "bar") {:bar 1}))
|
||||||
(is (nil? @expired)))
|
(is (nil? @removed)))
|
||||||
(testing "after ttl has elapsed"
|
(testing "after ttl has elapsed"
|
||||||
(is (nil? @expired))
|
(is (nil? @removed))
|
||||||
(is (nil? (read-session as "foo")))
|
(is (nil? (read-session as "foo")))
|
||||||
(is (= (read-session as "bar") {:bar 1}))
|
(is (= (read-session as "bar") {:bar 1}))
|
||||||
(is (= ["foo" {:foo 1} :expired] @expired)))))
|
(is (= ["foo" {:foo 1} :expired] @removed)))))
|
||||||
|
|
||||||
(deftest expiry-listener-triggered-when-write-session-overwrites-expired-entry
|
(deftest removal-listener-triggered-when-write-session-overwrites-expired-entry
|
||||||
(let [expired (atom nil)
|
(let [removed (atom nil)
|
||||||
as (aging-memory-store 1 {:on-expiry #(reset! expired [%1 %2 %3])})]
|
as (aging-memory-store 1 {:on-removal #(reset! removed [%1 %2 %3])})]
|
||||||
(testing "before ttl elapses"
|
(testing "before ttl elapses"
|
||||||
(write-session as "foo" {:foo 1})
|
(write-session as "foo" {:foo 1})
|
||||||
(is (= (read-session as "foo") {:foo 1}))
|
(is (= (read-session as "foo") {:foo 1}))
|
||||||
(is (nil? @expired)))
|
(is (nil? @removed)))
|
||||||
(Thread/sleep 1500)
|
(Thread/sleep 1500)
|
||||||
(testing "after ttl has elapsed"
|
(testing "after ttl has elapsed"
|
||||||
(is (nil? @expired))
|
(is (nil? @removed))
|
||||||
(write-session as "foo" {:foo 2})
|
(write-session as "foo" {:foo 2})
|
||||||
(is (= (read-session as "foo") {:foo 2}))
|
(is (= (read-session as "foo") {:foo 2}))
|
||||||
(is (= ["foo" {:foo 1} :expired] @expired)))))
|
(is (= ["foo" {:foo 1} :expired] @removed)))))
|
||||||
|
|
||||||
(deftest sweeper-thread-triggers-expiry-listeners-for-all-expired-entries
|
(deftest sweeper-thread-triggers-removal-listeners-for-all-expired-entries
|
||||||
(let [expired (atom {})
|
(let [removed (atom {})
|
||||||
as (aging-memory-store 1 {:sweep-interval 1
|
as (aging-memory-store 1 {:sweep-interval 1
|
||||||
:on-expiry #(swap! expired assoc %1 {:timestamp (System/currentTimeMillis)
|
:on-removal #(swap! removed assoc %1 {:timestamp (System/currentTimeMillis)
|
||||||
:value %2
|
:value %2
|
||||||
:reason %3})})]
|
:reason %3})})]
|
||||||
(testing "before ttl elapses or sweeper thread runs"
|
(testing "before ttl elapses or sweeper thread runs"
|
||||||
|
@ -301,7 +301,7 @@
|
||||||
(is (= (read-session as "foo") {:foo 1}))
|
(is (= (read-session as "foo") {:foo 1}))
|
||||||
(is (= (read-session as "bar") {:bar 1}))
|
(is (= (read-session as "bar") {:bar 1}))
|
||||||
(is (= (read-session as "keep") {:keep 1}))
|
(is (= (read-session as "keep") {:keep 1}))
|
||||||
(is (empty? @expired)))
|
(is (empty? @removed)))
|
||||||
(testing "delaying while keeping 1 entry alive, long enough for the rest to expire and sweeper thread to run"
|
(testing "delaying while keeping 1 entry alive, long enough for the rest to expire and sweeper thread to run"
|
||||||
(Thread/sleep 500)
|
(Thread/sleep 500)
|
||||||
(is (= (read-session as "keep") {:keep 1}))
|
(is (= (read-session as "keep") {:keep 1}))
|
||||||
|
@ -309,31 +309,31 @@
|
||||||
(is (= (read-session as "keep") {:keep 1}))
|
(is (= (read-session as "keep") {:keep 1}))
|
||||||
(Thread/sleep 3000))
|
(Thread/sleep 3000))
|
||||||
(testing "after ttl elapses and sweeper thread has had enough time to run at least twice"
|
(testing "after ttl elapses and sweeper thread has had enough time to run at least twice"
|
||||||
(is (= 3 (count @expired)))
|
(is (= 3 (count @removed)))
|
||||||
(is (every? #(= :expired (:reason %)) (vals @expired)))
|
(is (every? #(= :expired (:reason %)) (vals @removed)))
|
||||||
(let [foo-bar-time-diff (Math/abs (- (:timestamp (get @expired "foo"))
|
(let [foo-bar-time-diff (Math/abs (- (:timestamp (get @removed "foo"))
|
||||||
(:timestamp (get @expired "bar"))))
|
(:timestamp (get @removed "bar"))))
|
||||||
keep-time-diff (- (:timestamp (get @expired "keep"))
|
keep-time-diff (- (:timestamp (get @removed "keep"))
|
||||||
(:timestamp (get @expired "bar")))]
|
(:timestamp (get @removed "bar")))]
|
||||||
(testing "'foo' and 'bar' should have expired at roughly the same time. 'keep' at the next sweep interval.")
|
(testing "'foo' and 'bar' should have expired at roughly the same time. 'keep' at the next sweep interval.")
|
||||||
(is (<= foo-bar-time-diff 200)) ; probably overly generous, but less than one sweep-interval
|
(is (<= foo-bar-time-diff 200)) ; probably overly generous, but less than one sweep-interval
|
||||||
(is (>= keep-time-diff 800))))
|
(is (>= keep-time-diff 800))))
|
||||||
(stop as)))
|
(stop as)))
|
||||||
|
|
||||||
(deftest expiry-listener-triggered-when-delete-session-removes-entry
|
(deftest removal-listener-triggered-when-delete-session-removes-entry
|
||||||
(let [expired (atom nil)
|
(let [removed (atom nil)
|
||||||
as (aging-memory-store 1 {:on-expiry #(reset! expired [%1 %2 %3])})]
|
as (aging-memory-store 1 {:on-removal #(reset! removed [%1 %2 %3])})]
|
||||||
(write-session as "foo" {:foo 1})
|
(write-session as "foo" {:foo 1})
|
||||||
(is (= (read-session as "foo") {:foo 1}))
|
(is (= (read-session as "foo") {:foo 1}))
|
||||||
(is (nil? @expired))
|
(is (nil? @removed))
|
||||||
(delete-session as "foo")
|
(delete-session as "foo")
|
||||||
(is (= ["foo" {:foo 1} :deleted] @expired))
|
(is (= ["foo" {:foo 1} :deleted] @removed))
|
||||||
(is (nil? (read-session as "foo")))))
|
(is (nil? (read-session as "foo")))))
|
||||||
|
|
||||||
(deftest expiry-listener-not-triggered-when-delete-session-called-for-non-existent-key
|
(deftest removal-listener-not-triggered-when-delete-session-called-for-non-existent-key
|
||||||
(let [expired (atom nil)
|
(let [removed (atom nil)
|
||||||
as (aging-memory-store 1 {:on-expiry #(reset! expired [%1 %2 %3])})]
|
as (aging-memory-store 1 {:on-removal #(reset! removed [%1 %2 %3])})]
|
||||||
(delete-session as "foo")
|
(delete-session as "foo")
|
||||||
(is (nil? @expired))))
|
(is (nil? @removed))))
|
||||||
|
|
||||||
#_(run-tests)
|
#_(run-tests)
|
||||||
|
|
Loading…
Reference in a new issue