add expiration listener support

This commit is contained in:
Gered 2022-01-03 22:03:44 -05:00
parent 1e4a98e946
commit 01a7a379c6
2 changed files with 124 additions and 16 deletions

View file

@ -1,6 +1,7 @@
(ns aging-session.core
"In-memory session storage with mortality."
(:require
clojure.set
[ring.middleware.session.store :refer :all])
(:import
[java.util UUID]))
@ -72,7 +73,7 @@
(all-entries [store]
"Returns a map containing all entries currently in the session store."))
(defrecord MemoryAgingStore [session-atom thread ttl refresh-on-write refresh-on-read op-counter op-threshold]
(defrecord MemoryAgingStore [session-atom thread ttl refresh-on-write refresh-on-read op-counter op-threshold on-expiry]
AgingStore
(read-timestamp [_ key]
(get-in @session-atom [key :timestamp]))
@ -82,20 +83,33 @@
SessionStore
(read-session [_ key]
(when (contains? @session-atom key)
(when-let [existing-entry (get @session-atom key)]
(let [session-map (swap! session-atom process-read-entry ttl key refresh-on-read)]
; if the entry we were about to read had expired, session-map will not have it anymore at this point
(if (contains? session-map key)
(-> session-map ; note: performs faster than get-in
(get key)
(get :value))
; TODO: notify expiry listener about expired 'key' here
))))
(when on-expiry
(on-expiry key (:value existing-entry))
nil)))))
(write-session [_ key data]
(if op-threshold
(swap! op-counter inc))
(let [key (or key (unique-id))]
(if op-threshold
(swap! op-counter inc))
(swap! session-atom process-write-entry key data refresh-on-write)
(if on-expiry
; when we have an on-expiry listener, we need to check if we are about to overwrite an entry
; that has already expired, and if so, call on-expiry for it
; (note that if it has ALREADY expired, yes, we're about to overwrite this entry anyway, but
; we DO need to treat it as an expiry, because the old value expired ...)
(let [existing-entry (get @session-atom key)
expired? (entry-expired? ttl existing-entry)]
(swap! session-atom process-write-entry key data refresh-on-write)
(if expired?
(on-expiry key (:value existing-entry))))
; if there's no on-expiry listener, we can simply process the write
(swap! session-atom process-write-entry key data refresh-on-write))
key))
(delete-session [_ key]
@ -104,13 +118,23 @@
(defn- sweeper-thread
"Sweeper thread that watches the session and cleans it."
[session-atom ttl op-counter op-threshold sweep-interval]
[session-atom ttl op-counter op-threshold sweep-interval on-expiry]
(loop []
(if op-threshold
(when (>= @op-counter op-threshold)
(swap! session-atom sweep-session ttl)
(reset! op-counter 0))
(swap! session-atom sweep-session ttl))
(let [[old new] (if op-threshold
(when (>= @op-counter op-threshold)
(reset! op-counter 0)
(swap-vals! session-atom sweep-session ttl))
(swap-vals! session-atom sweep-session ttl))]
(if (and on-expiry
(not= old new))
; TODO: is there a faster way to get the keys difference? maybe this is fine ... ?
(let [old-keys (set (.keySet old))
new-keys (set (.keySet new))
expired-keys (seq (clojure.set/difference old-keys new-keys))]
(when expired-keys
(future
(doseq [expired-key expired-keys]
(on-expiry expired-key (-> old (get expired-key) :value))))))))
(Thread/sleep sweep-interval)
(recur)))
@ -123,7 +147,7 @@
(defn aging-memory-store
"Creates an in-memory session storage engine where entries expire after the given ttl"
[ttl & [opts]]
(let [{:keys [session-atom refresh-on-write refresh-on-read sweep-threshold sweep-interval] :as opts}
(let [{:keys [session-atom refresh-on-write refresh-on-read sweep-threshold sweep-interval on-expiry] :as opts}
(merge
default-opts
{:session-atom (atom {})}
@ -139,10 +163,10 @@
^Runnable
(fn []
(try
(sweeper-thread session-atom ttl op-counter sweep-threshold sweep-interval)
(sweeper-thread session-atom ttl op-counter sweep-threshold sweep-interval on-expiry)
(catch InterruptedException e))))
store (MemoryAgingStore.
session-atom thread ttl refresh-on-write refresh-on-read op-counter sweep-threshold)]
session-atom thread ttl refresh-on-write refresh-on-read op-counter sweep-threshold on-expiry)]
(.start thread)
store))

View file

@ -234,4 +234,88 @@
(is (= (get-in sessions ["a" :value]) {:foo 1}))
(is (= (get-in sessions ["b" :value]) {:bar 2})))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(deftest expiry-listener-triggered-when-read-session-expires-entry
(let [expired (atom nil)
as (aging-memory-store 1 {:on-expiry #(reset! expired [%1 %2])})]
(testing "before ttl elapses"
(write-session as "foo" {:foo 1})
(is (= (read-session as "foo") {:foo 1}))
(is (nil? @expired)))
(Thread/sleep 1500)
(testing "after ttl has elapsed"
(is (nil? @expired))
(is (nil? (read-session as "foo")))
(is (= ["foo" {:foo 1}] @expired)))))
(deftest expiry-listener-not-triggered-for-other-read-sessions-even-with-an-expired-entry
(let [expired (atom nil)
as (aging-memory-store 1 {:on-expiry #(reset! expired [%1 %2])})]
(testing "before ttl elapses"
(write-session as "foo" {:foo 1})
(write-session as "bar" {:bar 1})
(is (= (read-session as "foo") {:foo 1}))
(is (= (read-session as "bar") {:bar 1}))
(is (nil? @expired)))
(testing "delaying while keeping the second entry alive, long enough for the first entry to expire"
(Thread/sleep 400)
(is (= (read-session as "bar") {:bar 1}))
(is (nil? @expired))
(Thread/sleep 400)
(is (= (read-session as "bar") {:bar 1}))
(is (nil? @expired))
(Thread/sleep 400)
(is (= (read-session as "bar") {:bar 1}))
(is (nil? @expired)))
(testing "after ttl has elapsed"
(is (nil? @expired))
(is (nil? (read-session as "foo")))
(is (= (read-session as "bar") {:bar 1}))
(is (= ["foo" {:foo 1}] @expired)))))
(deftest expiry-listener-triggered-when-write-session-overwrites-expired-entry
(let [expired (atom nil)
as (aging-memory-store 1 {:on-expiry #(reset! expired [%1 %2])})]
(testing "before ttl elapses"
(write-session as "foo" {:foo 1})
(is (= (read-session as "foo") {:foo 1}))
(is (nil? @expired)))
(Thread/sleep 1500)
(testing "after ttl has elapsed"
(is (nil? @expired))
(write-session as "foo" {:foo 2})
(is (= (read-session as "foo") {:foo 2}))
(is (= ["foo" {:foo 1}] @expired)))))
(deftest sweeper-thread-triggers-expiry-listeners-for-all-expired-entries
(let [expired (atom {})
as (aging-memory-store 1 {:sweep-interval 1
:on-expiry #(swap! expired assoc %1 {:timestamp (System/currentTimeMillis)
:value %2})})]
(testing "before ttl elapses or sweeper thread runs"
(write-session as "foo" {:foo 1})
(write-session as "bar" {:bar 1})
(write-session as "keep" {:keep 1})
(is (= (read-session as "foo") {:foo 1}))
(is (= (read-session as "bar") {:bar 1}))
(is (= (read-session as "keep") {:keep 1}))
(is (empty? @expired)))
(testing "delaying while keeping 1 entry alive, long enough for the rest to expire and sweeper thread to run"
(Thread/sleep 500)
(is (= (read-session as "keep") {:keep 1}))
(Thread/sleep 500)
(is (= (read-session as "keep") {:keep 1}))
(Thread/sleep 3000))
(testing "after ttl elapses and sweeper thread has had enough time to run at least twice"
(is (= 3 (count @expired)))
(let [foo-bar-time-diff (Math/abs (- (:timestamp (get @expired "foo"))
(:timestamp (get @expired "bar"))))
keep-time-diff (- (:timestamp (get @expired "keep"))
(:timestamp (get @expired "bar")))]
(testing "'foo' and 'bar' should have expired at roughly the same time. 'keep' at the next sweep interval.")
(is (<= foo-bar-time-diff 200)) ; probably overly generous, but less than one sweep-interval
(is (>= keep-time-diff 800))))
(stop as)))
#_(run-tests)