diff --git a/src/aging_session/core.clj b/src/aging_session/core.clj index 91b489f..d618483 100644 --- a/src/aging_session/core.clj +++ b/src/aging_session/core.clj @@ -1,6 +1,7 @@ (ns aging-session.core "In-memory session storage with mortality." (:require + clojure.set [ring.middleware.session.store :refer :all]) (:import [java.util UUID])) @@ -72,7 +73,7 @@ (all-entries [store] "Returns a map containing all entries currently in the session store.")) -(defrecord MemoryAgingStore [session-atom thread ttl refresh-on-write refresh-on-read op-counter op-threshold] +(defrecord MemoryAgingStore [session-atom thread ttl refresh-on-write refresh-on-read op-counter op-threshold on-expiry] AgingStore (read-timestamp [_ key] (get-in @session-atom [key :timestamp])) @@ -82,20 +83,33 @@ SessionStore (read-session [_ key] - (when (contains? @session-atom key) + (when-let [existing-entry (get @session-atom key)] (let [session-map (swap! session-atom process-read-entry ttl key refresh-on-read)] + ; if the entry we were about to read had expired, session-map will not have it anymore at this point (if (contains? session-map key) (-> session-map ; note: performs faster than get-in (get key) (get :value)) - ; TODO: notify expiry listener about expired 'key' here - )))) + (when on-expiry + (on-expiry key (:value existing-entry)) + nil))))) (write-session [_ key data] + (if op-threshold + (swap! op-counter inc)) (let [key (or key (unique-id))] - (if op-threshold - (swap! op-counter inc)) - (swap! session-atom process-write-entry key data refresh-on-write) + (if on-expiry + ; when we have an on-expiry listener, we need to check if we are about to overwrite an entry + ; that has already expired, and if so, call on-expiry for it + ; (note that if it has ALREADY expired, yes, we're about to overwrite this entry anyway, but + ; we DO need to treat it as an expiry, because the old value expired ...) + (let [existing-entry (get @session-atom key) + expired? (entry-expired? ttl existing-entry)] + (swap! session-atom process-write-entry key data refresh-on-write) + (if expired? + (on-expiry key (:value existing-entry)))) + ; if there's no on-expiry listener, we can simply process the write + (swap! session-atom process-write-entry key data refresh-on-write)) key)) (delete-session [_ key] @@ -104,13 +118,23 @@ (defn- sweeper-thread "Sweeper thread that watches the session and cleans it." - [session-atom ttl op-counter op-threshold sweep-interval] + [session-atom ttl op-counter op-threshold sweep-interval on-expiry] (loop [] - (if op-threshold - (when (>= @op-counter op-threshold) - (swap! session-atom sweep-session ttl) - (reset! op-counter 0)) - (swap! session-atom sweep-session ttl)) + (let [[old new] (if op-threshold + (when (>= @op-counter op-threshold) + (reset! op-counter 0) + (swap-vals! session-atom sweep-session ttl)) + (swap-vals! session-atom sweep-session ttl))] + (if (and on-expiry + (not= old new)) + ; TODO: is there a faster way to get the keys difference? maybe this is fine ... ? + (let [old-keys (set (.keySet old)) + new-keys (set (.keySet new)) + expired-keys (seq (clojure.set/difference old-keys new-keys))] + (when expired-keys + (future + (doseq [expired-key expired-keys] + (on-expiry expired-key (-> old (get expired-key) :value)))))))) (Thread/sleep sweep-interval) (recur))) @@ -123,7 +147,7 @@ (defn aging-memory-store "Creates an in-memory session storage engine where entries expire after the given ttl" [ttl & [opts]] - (let [{:keys [session-atom refresh-on-write refresh-on-read sweep-threshold sweep-interval] :as opts} + (let [{:keys [session-atom refresh-on-write refresh-on-read sweep-threshold sweep-interval on-expiry] :as opts} (merge default-opts {:session-atom (atom {})} @@ -139,10 +163,10 @@ ^Runnable (fn [] (try - (sweeper-thread session-atom ttl op-counter sweep-threshold sweep-interval) + (sweeper-thread session-atom ttl op-counter sweep-threshold sweep-interval on-expiry) (catch InterruptedException e)))) store (MemoryAgingStore. - session-atom thread ttl refresh-on-write refresh-on-read op-counter sweep-threshold)] + session-atom thread ttl refresh-on-write refresh-on-read op-counter sweep-threshold on-expiry)] (.start thread) store)) diff --git a/test/aging_session/core_test.clj b/test/aging_session/core_test.clj index 243d322..8c41447 100644 --- a/test/aging_session/core_test.clj +++ b/test/aging_session/core_test.clj @@ -234,4 +234,88 @@ (is (= (get-in sessions ["a" :value]) {:foo 1})) (is (= (get-in sessions ["b" :value]) {:bar 2}))))) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(deftest expiry-listener-triggered-when-read-session-expires-entry + (let [expired (atom nil) + as (aging-memory-store 1 {:on-expiry #(reset! expired [%1 %2])})] + (testing "before ttl elapses" + (write-session as "foo" {:foo 1}) + (is (= (read-session as "foo") {:foo 1})) + (is (nil? @expired))) + (Thread/sleep 1500) + (testing "after ttl has elapsed" + (is (nil? @expired)) + (is (nil? (read-session as "foo"))) + (is (= ["foo" {:foo 1}] @expired))))) + +(deftest expiry-listener-not-triggered-for-other-read-sessions-even-with-an-expired-entry + (let [expired (atom nil) + as (aging-memory-store 1 {:on-expiry #(reset! expired [%1 %2])})] + (testing "before ttl elapses" + (write-session as "foo" {:foo 1}) + (write-session as "bar" {:bar 1}) + (is (= (read-session as "foo") {:foo 1})) + (is (= (read-session as "bar") {:bar 1})) + (is (nil? @expired))) + (testing "delaying while keeping the second entry alive, long enough for the first entry to expire" + (Thread/sleep 400) + (is (= (read-session as "bar") {:bar 1})) + (is (nil? @expired)) + (Thread/sleep 400) + (is (= (read-session as "bar") {:bar 1})) + (is (nil? @expired)) + (Thread/sleep 400) + (is (= (read-session as "bar") {:bar 1})) + (is (nil? @expired))) + (testing "after ttl has elapsed" + (is (nil? @expired)) + (is (nil? (read-session as "foo"))) + (is (= (read-session as "bar") {:bar 1})) + (is (= ["foo" {:foo 1}] @expired))))) + +(deftest expiry-listener-triggered-when-write-session-overwrites-expired-entry + (let [expired (atom nil) + as (aging-memory-store 1 {:on-expiry #(reset! expired [%1 %2])})] + (testing "before ttl elapses" + (write-session as "foo" {:foo 1}) + (is (= (read-session as "foo") {:foo 1})) + (is (nil? @expired))) + (Thread/sleep 1500) + (testing "after ttl has elapsed" + (is (nil? @expired)) + (write-session as "foo" {:foo 2}) + (is (= (read-session as "foo") {:foo 2})) + (is (= ["foo" {:foo 1}] @expired))))) + +(deftest sweeper-thread-triggers-expiry-listeners-for-all-expired-entries + (let [expired (atom {}) + as (aging-memory-store 1 {:sweep-interval 1 + :on-expiry #(swap! expired assoc %1 {:timestamp (System/currentTimeMillis) + :value %2})})] + (testing "before ttl elapses or sweeper thread runs" + (write-session as "foo" {:foo 1}) + (write-session as "bar" {:bar 1}) + (write-session as "keep" {:keep 1}) + (is (= (read-session as "foo") {:foo 1})) + (is (= (read-session as "bar") {:bar 1})) + (is (= (read-session as "keep") {:keep 1})) + (is (empty? @expired))) + (testing "delaying while keeping 1 entry alive, long enough for the rest to expire and sweeper thread to run" + (Thread/sleep 500) + (is (= (read-session as "keep") {:keep 1})) + (Thread/sleep 500) + (is (= (read-session as "keep") {:keep 1})) + (Thread/sleep 3000)) + (testing "after ttl elapses and sweeper thread has had enough time to run at least twice" + (is (= 3 (count @expired))) + (let [foo-bar-time-diff (Math/abs (- (:timestamp (get @expired "foo")) + (:timestamp (get @expired "bar")))) + keep-time-diff (- (:timestamp (get @expired "keep")) + (:timestamp (get @expired "bar")))] + (testing "'foo' and 'bar' should have expired at roughly the same time. 'keep' at the next sweep interval.") + (is (<= foo-bar-time-diff 200)) ; probably overly generous, but less than one sweep-interval + (is (>= keep-time-diff 800)))) + (stop as))) + #_(run-tests)