add server-side success/error callback support to send-data

This commit is contained in:
Gered 2016-05-16 11:33:32 -04:00
parent f889985fe1
commit 767a22ab09
5 changed files with 394 additions and 189 deletions

View file

@ -276,7 +276,7 @@
;; sets the timeout (in milliseconds) for a forward channel request ;; sets the timeout (in milliseconds) for a forward channel request
:forward-channel-request-timeout (* 20 1000) :forward-channel-request-timeout (* 20 1000)
; base time delay for another connection attempt is made. note ; base time delay before another connection attempt is made. note
; that a random time between 0 and :connect-retry-delay-seed is ; that a random time between 0 and :connect-retry-delay-seed is
; added to this value to determine the final reconnect delay time. ; added to this value to determine the final reconnect delay time.
; time is in milliseconds ; time is in milliseconds

View file

@ -57,13 +57,15 @@
;; type preserving drop upto for queueus ;; type preserving drop upto for queueus
(defn drop-queue (defn drop-queue
[queue id] [queue id on-drop]
(let [head (peek queue)] (let [head (peek queue)]
(if-not head (if-not head
queue queue
(if (< id (first head)) (if (< id (first head))
queue queue
(recur (pop queue) id))))) (do
(if on-drop (on-drop head))
(recur (pop queue) id on-drop))))))
;; Key value pairs do not always come ordered by request number. ;; Key value pairs do not always come ordered by request number.
@ -117,9 +119,9 @@
(fn [ag e] (fn [ag e]
(log/error e (str id " agent exception.")))) (log/error e (str id " agent exception."))))
(defn- to-pair (defn- array-item-to-json-pair
[p] [item]
(str "[" (first p) "," (second p) "]")) (str "[" (first item) "," (first (second item)) "]"))
(defn decode-map (defn decode-map
[m] [m]
@ -231,10 +233,10 @@
(async-adapter/close respond))) (async-adapter/close respond)))
;;ArrayBuffer ;;ArrayBuffer
;;buffer of [[id_lowest data] ... [id_highest data]] ;;buffer of [[id_lowest [data context]] ... [id_highest [data context]]]
(defprotocol IArrayBuffer (defprotocol IArrayBuffer
(queue [this string]) (queue [this string context])
(acknowledge-id [this id]) (acknowledge-id [this id on-ack-fn])
(to-flush [this]) (to-flush [this])
(last-acknowledged-id [this]) (last-acknowledged-id [this])
(outstanding-bytes [this])) (outstanding-bytes [this]))
@ -255,15 +257,17 @@
;; arrays to be sent out, may contain arrays ;; arrays to be sent out, may contain arrays
;; that were in to-acknowledge-arrays but queued ;; that were in to-acknowledge-arrays but queued
;; again for resending ;; again for resending
;; format of each array item is [id [data context]]
;; where data is the actual string and context is a map or nil
to-flush-arrays] to-flush-arrays]
IArrayBuffer IArrayBuffer
(queue [this string] (queue [this string context]
(let [next-array-id (inc array-id)] (let [next-array-id (inc array-id)]
(ArrayBuffer. next-array-id (ArrayBuffer. next-array-id
last-acknowledged-id last-acknowledged-id
to-acknowledge-arrays to-acknowledge-arrays
(conj to-flush-arrays [next-array-id string])))) (conj to-flush-arrays [next-array-id [string context]]))))
;; id may cause the following splits: ;; id may cause the following splits:
;; normal case: ;; normal case:
@ -274,21 +278,23 @@
;; ack-arrs flush-arrs <id> flush-arrs ;; ack-arrs flush-arrs <id> flush-arrs
;; everything before id can be discarded, everything after id ;; everything before id can be discarded, everything after id
;; becomes new flush-arrs and is resend ;; becomes new flush-arrs and is resend
(acknowledge-id [this id] (acknowledge-id [this id on-ack-fn]
(ArrayBuffer. array-id (ArrayBuffer. array-id
id id
clojure.lang.PersistentQueue/EMPTY clojure.lang.PersistentQueue/EMPTY
(into (drop-queue to-acknowledge-arrays id) (into (drop-queue to-acknowledge-arrays id on-ack-fn)
(drop-queue to-flush-arrays id)))) (drop-queue to-flush-arrays id on-ack-fn))))
;; return [seq-to-flush-array next-array-buffer] or nil if ;; return [seq-to-flush-array next-array-buffer] or nil if
;; to-flush-arrays is empty ;; to-flush-arrays is empty
;; format of each array item is [id [data context]]
;; where data is the actual string and context is a map or nil
(to-flush [this] (to-flush [this]
(when-let [to-flush (seq to-flush-arrays)] (when-let [to-flush (seq to-flush-arrays)]
[to-flush (ArrayBuffer. array-id [to-flush (ArrayBuffer. array-id
last-acknowledged-id last-acknowledged-id
(into to-acknowledge-arrays (into to-acknowledge-arrays
(remove (fn [[id string]] (remove (fn [[id [string context]]]
(= string noop-string)) (= string noop-string))
to-flush)) to-flush))
clojure.lang.PersistentQueue/EMPTY)])) clojure.lang.PersistentQueue/EMPTY)]))
@ -296,9 +302,9 @@
(last-acknowledged-id [this] (last-acknowledged-id [this]
last-acknowledged-id) last-acknowledged-id)
;; the sum of all the data that is still to be send ;; the sum of all the data that is still to be sent
(outstanding-bytes [this] (outstanding-bytes [this]
(reduce + 0 (map (comp count second) to-flush-arrays)))) (reduce + 0 (map (comp count first second) to-flush-arrays))))
@ -322,7 +328,8 @@
;; the client acknowledges received arrays when creating a new backwardchannel ;; the client acknowledges received arrays when creating a new backwardchannel
(acknowledge-arrays [this array-id-str]) (acknowledge-arrays [this array-id-str])
(queue-string [this json-string]) ;; context is either a map or nil
(queue-string [this json-string context])
;; heartbeat is a timer to send noop over the backward channel ;; heartbeat is a timer to send noop over the backward channel
(clear-heartbeat [this]) (clear-heartbeat [this])
@ -437,7 +444,7 @@
(schedule (schedule
(fn [] (fn []
(send-off session-agent #(-> % (send-off session-agent #(-> %
(queue-string noop-string) (queue-string noop-string nil)
flush-buffer))) flush-buffer)))
(:heartbeat-interval details)))))) (:heartbeat-interval details))))))
@ -461,14 +468,16 @@
(send-off session-agent close nil "Timed out")) (send-off session-agent close nil "Timed out"))
(:session-timeout-interval details)))))) (:session-timeout-interval details))))))
(queue-string [this json-string] (queue-string [this json-string context]
(log/trace id ": queue-string" (pr-str json-string)) (log/trace id ": queue-string" (pr-str json-string) (if context "(has context)" ""))
(update-in this [:array-buffer] queue json-string)) (update-in this [:array-buffer] queue json-string context))
(acknowledge-arrays [this array-id-str] (acknowledge-arrays [this array-id-str]
(log/trace id ": acknowledge-arrays" array-id-str) (log/trace id ": acknowledge-arrays" array-id-str)
(let [array-id (Long/parseLong array-id-str)] (let [array-id (Long/parseLong array-id-str)]
(update-in this [:array-buffer] acknowledge-id array-id))) (update-in this [:array-buffer] acknowledge-id array-id
(fn [[id [string {:keys [on-confirm]}]]]
(if on-confirm (on-confirm))))))
;; tries to do the actual writing to the client ;; tries to do the actual writing to the client
;; @todo the composition is a bit awkward in this method due to the ;; @todo the composition is a bit awkward in this method due to the
@ -481,14 +490,16 @@
(if-let [[to-flush next-array-buffer] (to-flush array-buffer)] (if-let [[to-flush next-array-buffer] (to-flush array-buffer)]
(try (try
(log/trace id ": flushing" (count to-flush) "arrays") (log/trace id ": flushing" (count to-flush) "arrays")
;; buffer contains [[1 json-str] ...] can't use ;; buffer contains [[1 [json-str context]] ...]
;; json-str which will double escape the json ;; can't use json-str which will double escape the json
(doseq [p to-flush] (doseq [item to-flush]
(write (:respond back-channel) (str "[" (to-pair p) "]"))) (let [{:keys [on-sent] :as context} (second (second item))]
(write (:respond back-channel) (str "[" (array-item-to-json-pair item) "]"))
(if on-sent (on-sent))))
;; size is an approximation ;; size is an approximation
(let [this (let [size (reduce + 0 (map count (map second to-flush)))] (let [this (let [size (reduce + 0 (map count (map (comp first second) to-flush)))]
(-> this (-> this
(assoc :array-buffer next-array-buffer) (assoc :array-buffer next-array-buffer)
(update-in [:back-channel :bytes-sent] + size))) (update-in [:back-channel :bytes-sent] + size)))
@ -507,6 +518,8 @@
(log/trace e id ": exception during flush") (log/trace e id ": exception during flush")
;; when write failed ;; when write failed
;; non delivered arrays are still in buffer ;; non delivered arrays are still in buffer
;; (note that we don't raise any on-error callbacks for any items in the buffer
;; as we will retry sending the items next time since they're still in the buffer)
(clear-back-channel this) (clear-back-channel this)
)) ))
this ;; do nothing if buffer is empty this ;; do nothing if buffer is empty
@ -521,10 +534,25 @@
;; the heartbeat timeout is cancelled by clear-back-channel ;; the heartbeat timeout is cancelled by clear-back-channel
) )
(swap! sessions dissoc id) (swap! sessions dissoc id)
; raise callbacks for any remaining unsent messages
(let [[remaining _] (to-flush array-buffer)]
(doseq [[id [string {:keys [on-error] :as context}]] remaining]
(if on-error (on-error))))
; finally raise the session close event
(notify-listeners id request :close message) (notify-listeners id request :close message)
nil ;; the agent will no longer wrap a session nil ;; the agent will no longer wrap a session
)) ))
(defn- handle-old-session-reconnect
[req old-session-id old-session-agent old-array-id]
(log/trace old-session-id ": old session reconnect")
(send-off old-session-agent
(fn [this]
(-> (if old-array-id
(acknowledge-arrays this old-array-id)
this)
(close req "Reconnected")))))
;; creates a session agent wrapping session data and ;; creates a session agent wrapping session data and
;; adds the session to sessions ;; adds the session to sessions
(defn- create-session-agent (defn- create-session-agent
@ -535,12 +563,8 @@
old-array-id "OAID"} (:query-params req)] old-array-id "OAID"} (:query-params req)]
;; when a client specifies and old session id then that old one ;; when a client specifies and old session id then that old one
;; needs to be removed ;; needs to be removed
(when-let [old-session-agent (@sessions old-session-id)] (if-let [old-session-agent (@sessions old-session-id)]
(log/trace old-session-id ": old session reconnect") (handle-old-session-reconnect req old-session-id old-session-agent old-array-id))
(send-off old-session-agent #(-> (if old-array-id
(acknowledge-arrays % old-array-id)
%)
(close req "Reconnected"))))
(let [id (uuid) (let [id (uuid)
details {:address (:remote-addr req) details {:address (:remote-addr req)
:headers (:headers req) :headers (:headers req)
@ -551,18 +575,17 @@
session (Session. id session (Session. id
details details
nil ;; backchannel nil ;; backchannel
(ArrayBuffer. (ArrayBuffer. 0 ;; array-id, 0 is never used by the
0 ;; array-id, 0 is never used by the ;; array-buffer, it is used by the
;; array-buffer, it is used by the ;; first message with the session id.
;; first message with the session id 0 ;; last-acknowledged-id
0 ;; last-acknowledged-id ;; to-acknowledge-arrays
;; to-acknowledge-arrays clojure.lang.PersistentQueue/EMPTY
clojure.lang.PersistentQueue/EMPTY ;; to-flush-arrays
;; to-flush-arrays clojure.lang.PersistentQueue/EMPTY)
clojure.lang.PersistentQueue/EMPTY) nil ;; heartbeat-timeout
nil ;; heartbeat-timeout nil ;; session-timeout
nil ;; session-timeout )
)
session-agent (agent session)] session-agent (agent session)]
(log/trace id ": new session created" (if old-session-id (str "(old session id: " old-session-id ")") "")) (log/trace id ": new session created" (if old-session-id (str "(old session id: " old-session-id ")") ""))
(set-error-handler! session-agent (agent-error-handler-fn (str "session-" (:id session)))) (set-error-handler! session-agent (agent-error-handler-fn (str "session-" (:id session))))
@ -594,27 +617,38 @@
;; the data will be queued until there is a backchannel to send it ;; the data will be queued until there is a backchannel to send it
;; over ;; over
(defn- send-map (defn- send-map
[session-id m] [session-id m context]
(when-let [session-agent (get @sessions session-id)] (when-let [session-agent (get @sessions session-id)]
(let [string (json/generate-string m)] (let [string (json/generate-string m)]
(send-off session-agent #(-> % (send-off session-agent
(queue-string string) #(-> %
flush-buffer)) (queue-string string context)
flush-buffer))
string))) string)))
(defn send-data (defn send-data
"sends data to the client identified by session-id over the backchannel. "sends data to the client identified by session-id over the backchannel.
if there is currently no available backchannel for this client, the data if there is currently no available backchannel for this client, the data
is queued until one is available." is queued until one is available. context can contain optional callback
[session-id data] functions:
on-sent - when the data has been written to an active backchannel
on-confirm - when the client confirms it has received the data. note that
it may take awhile (minutes) for the client to send an
acknowledgement
on-error - when there was an error (of any kind) sending the data"
[session-id data & [context]]
(if data (if data
(send-map session-id (encode-map data)))) (send-map session-id (encode-map data) context)))
(defn send-data-to-all (defn send-data-to-all
"sends data to all currently connected clients over their backchannels." "sends data to all currently connected clients over their backchannels.
[data] context can contain optional callback functions which will be used for
all the data sent. see send-data for a description of the different
callback functions available."
[data & [context]]
(doseq [[session-id _] @sessions] (doseq [[session-id _] @sessions]
(send-data session-id data))) (send-data session-id data context)))
(defn connected? (defn connected?
"returns true if a client with the given session-id is currently connected." "returns true if a client with the given session-id is currently connected."

View file

@ -18,13 +18,13 @@
0 0
(->queue) (->queue)
(->queue (->queue
[1 "one"] [1 ["one" :a]]
[2 "two"] [2 ["two" :b]]
[3 "three"]))] [3 ["three" :c]]))]
(is (= (first (to-flush ab)) (is (= (first (to-flush ab))
[[1 "one"] [[1 ["one" :a]]
[2 "two"] [2 ["two" :b]]
[3 "three"]])) [3 ["three" :c]]]))
(is (= (last-acknowledged-id ab) (is (= (last-acknowledged-id ab)
0)) 0))
(is (= (outstanding-bytes ab) (is (= (outstanding-bytes ab)
@ -33,9 +33,9 @@
(let [ab (ArrayBuffer. 3 (let [ab (ArrayBuffer. 3
0 0
(->queue (->queue
[1 "one"] [1 ["one" :a]]
[2 "two"] [2 ["two" :b]]
[3 "three"]) [3 ["three" :c]])
(->queue))] (->queue))]
(is (= (first (to-flush ab)) (is (= (first (to-flush ab))
nil)) nil))
@ -46,97 +46,97 @@
(deftest queue-tests (deftest queue-tests
(let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue)) (let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue))
(queue "one"))] (queue "one" :a))]
(is (= (first (to-flush ab)) (is (= (first (to-flush ab))
[[1 "one"]])) [[1 ["one" :a]]]))
(is (= (last-acknowledged-id ab) (is (= (last-acknowledged-id ab)
0)) 0))
(is (= (outstanding-bytes ab) (is (= (outstanding-bytes ab)
3))) 3)))
(let [ab (-> (ArrayBuffer. 1 0 (->queue) (->queue [1 "one"])) (let [ab (-> (ArrayBuffer. 1 0 (->queue) (->queue [1 ["one" :a]]))
(queue "two"))] (queue "two" :b))]
(is (= (first (to-flush ab)) (is (= (first (to-flush ab))
[[1 "one"] [[1 ["one" :a]]
[2 "two"]])) [2 ["two" :b]]]))
(is (= (last-acknowledged-id ab) (is (= (last-acknowledged-id ab)
0)) 0))
(is (= (outstanding-bytes ab) (is (= (outstanding-bytes ab)
6))) 6)))
(let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue)) (let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue))
(queue nil))] (queue nil nil))]
(is (= (first (to-flush ab)) (is (= (first (to-flush ab))
[[1 nil]])) [[1 [nil nil]]]))
(is (= (last-acknowledged-id ab) (is (= (last-acknowledged-id ab)
0)) 0))
(is (= (outstanding-bytes ab) (is (= (outstanding-bytes ab)
0))) 0)))
(let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue)) (let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue))
(queue "one") (queue "one" :a)
(queue "two") (queue "two" :b)
(queue "three"))] (queue "three" :c))]
(is (= (first (to-flush ab)) (is (= (first (to-flush ab))
[[1 "one"] [[1 ["one" :a]]
[2 "two"] [2 ["two" :b]]
[3 "three"]])) [3 ["three" :c]]]))
(is (= (last-acknowledged-id ab) (is (= (last-acknowledged-id ab)
0)) 0))
(is (= (outstanding-bytes ab) (is (= (outstanding-bytes ab)
11))) 11)))
(let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue)) (let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue))
(queue "one") (queue "one" :a)
(queue "two") (queue "two" :b)
(queue "three")) (queue "three" :c))
flushed (second (to-flush ab))] flushed (second (to-flush ab))]
(is (= (first (to-flush ab)) (is (= (first (to-flush ab))
[[1 "one"] [[1 ["one" :a]]
[2 "two"] [2 ["two" :b]]
[3 "three"]])) [3 ["three" :c]]]))
(is (= (first (to-flush flushed)) (is (= (first (to-flush flushed))
nil)))) nil))))
(deftest acknowledge-no-existing-data-tests (deftest acknowledge-no-existing-data-tests
(let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue)) (let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue))
(queue "one") (queue "one" :a)
(queue "two") (queue "two" :b)
(queue "three") (queue "three" :c)
(queue "four") (queue "four" :d)
(queue "five"))] (queue "five" :e))]
(is (= (first (to-flush ab)) (is (= (first (to-flush ab))
[[1 "one"] [[1 ["one" :a]]
[2 "two"] [2 ["two" :b]]
[3 "three"] [3 ["three" :c]]
[4 "four"] [4 ["four" :d]]
[5 "five"]])) [5 ["five" :e]]]))
(is (= (last-acknowledged-id ab) (is (= (last-acknowledged-id ab)
0)) 0))
(let [ack-ab (acknowledge-id ab 2)] (let [ack-ab (acknowledge-id ab 2 nil)]
(is (= (first (to-flush ack-ab)) (is (= (first (to-flush ack-ab))
[[3 "three"] [[3 ["three" :c]]
[4 "four"] [4 ["four" :d]]
[5 "five"]])) [5 ["five" :e]]]))
(is (= (last-acknowledged-id ack-ab) (is (= (last-acknowledged-id ack-ab)
2)) 2))
(is (= (outstanding-bytes ack-ab) (is (= (outstanding-bytes ack-ab)
13))) 13)))
(let [ack-ab (acknowledge-id ab 0)] (let [ack-ab (acknowledge-id ab 0 nil)]
(is (= (first (to-flush ack-ab)) (is (= (first (to-flush ack-ab))
[[1 "one"] [[1 ["one" :a]]
[2 "two"] [2 ["two" :b]]
[3 "three"] [3 ["three" :c]]
[4 "four"] [4 ["four" :d]]
[5 "five"]])) [5 ["five" :e]]]))
(is (= (last-acknowledged-id ack-ab) (is (= (last-acknowledged-id ack-ab)
0)) 0))
(is (= (outstanding-bytes ack-ab) (is (= (outstanding-bytes ack-ab)
19))) 19)))
(let [ack-ab (acknowledge-id ab 6)] (let [ack-ab (acknowledge-id ab 6 nil)]
(is (= (first (to-flush ack-ab)) (is (= (first (to-flush ack-ab))
nil)) nil))
(is (= (last-acknowledged-id ack-ab) (is (= (last-acknowledged-id ack-ab)
@ -148,38 +148,38 @@
(let [ab (-> (ArrayBuffer. 2 (let [ab (-> (ArrayBuffer. 2
0 0
(->queue (->queue
[1 "one"] [1 ["one" :a]]
[2 "two"]) [2 ["two" :b]])
(->queue)) (->queue))
(queue "three") (queue "three" :c)
(queue "four") (queue "four" :d)
(queue "five"))] (queue "five" :e))]
(is (= (first (to-flush ab)) (is (= (first (to-flush ab))
[[3 "three"] [[3 ["three" :c]]
[4 "four"] [4 ["four" :d]]
[5 "five"]])) [5 ["five" :e]]]))
(is (= (last-acknowledged-id ab) (is (= (last-acknowledged-id ab)
0)) 0))
(let [ack-ab (acknowledge-id ab 4)] (let [ack-ab (acknowledge-id ab 4 nil)]
(is (= (first (to-flush ack-ab)) (is (= (first (to-flush ack-ab))
[[5 "five"]])) [[5 ["five" :e]]]))
(is (= (last-acknowledged-id ack-ab) (is (= (last-acknowledged-id ack-ab)
4)) 4))
(is (= (outstanding-bytes ack-ab) (is (= (outstanding-bytes ack-ab)
4))) 4)))
(let [ack-ab (acknowledge-id ab 2)] (let [ack-ab (acknowledge-id ab 2 nil)]
(is (= (first (to-flush ack-ab)) (is (= (first (to-flush ack-ab))
[[3 "three"] [[3 ["three" :c]]
[4 "four"] [4 ["four" :d]]
[5 "five"]])) [5 ["five" :e]]]))
(is (= (last-acknowledged-id ack-ab) (is (= (last-acknowledged-id ack-ab)
2)) 2))
(is (= (outstanding-bytes ack-ab) (is (= (outstanding-bytes ack-ab)
13))) 13)))
(let [ack-ab (acknowledge-id ab 6)] (let [ack-ab (acknowledge-id ab 6 nil)]
(is (= (first (to-flush ack-ab)) (is (= (first (to-flush ack-ab))
nil)) nil))
(is (= (last-acknowledged-id ack-ab) (is (= (last-acknowledged-id ack-ab)
@ -192,46 +192,71 @@
0 0
(->queue) (->queue)
(->queue (->queue
[1 "one"] [1 ["one" :a]]
[2 "two"])) [2 ["two" :b]]))
(queue "three") (queue "three" :c)
(queue "four") (queue "four" :d)
(queue "five"))] (queue "five" :e))]
(is (= (first (to-flush ab)) (is (= (first (to-flush ab))
[[1 "one"] [[1 ["one" :a]]
[2 "two"] [2 ["two" :b]]
[3 "three"] [3 ["three" :c]]
[4 "four"] [4 ["four" :d]]
[5 "five"]])) [5 ["five" :e]]]))
(is (= (last-acknowledged-id ab) (is (= (last-acknowledged-id ab)
0)) 0))
(let [ack-ab (acknowledge-id ab 2)] (let [ack-ab (acknowledge-id ab 2 nil)]
(is (= (first (to-flush ack-ab)) (is (= (first (to-flush ack-ab))
[[3 "three"] [[3 ["three" :c]]
[4 "four"] [4 ["four" :d]]
[5 "five"]])) [5 ["five" :e]]]))
(is (= (last-acknowledged-id ack-ab) (is (= (last-acknowledged-id ack-ab)
2)) 2))
(is (= (outstanding-bytes ack-ab) (is (= (outstanding-bytes ack-ab)
13))) 13)))
(let [ack-ab (acknowledge-id ab 0)] (let [ack-ab (acknowledge-id ab 0 nil)]
(is (= (first (to-flush ack-ab)) (is (= (first (to-flush ack-ab))
[[1 "one"] [[1 ["one" :a]]
[2 "two"] [2 ["two" :b]]
[3 "three"] [3 ["three" :c]]
[4 "four"] [4 ["four" :d]]
[5 "five"]])) [5 ["five" :e]]]))
(is (= (last-acknowledged-id ack-ab) (is (= (last-acknowledged-id ack-ab)
0)) 0))
(is (= (outstanding-bytes ack-ab) (is (= (outstanding-bytes ack-ab)
19))) 19)))
(let [ack-ab (acknowledge-id ab 6)] (let [ack-ab (acknowledge-id ab 6 nil)]
(is (= (first (to-flush ack-ab)) (is (= (first (to-flush ack-ab))
nil)) nil))
(is (= (last-acknowledged-id ack-ab) (is (= (last-acknowledged-id ack-ab)
6)) 6))
(is (= (outstanding-bytes ack-ab) (is (= (outstanding-bytes ack-ab)
0))))) 0)))))
(deftest on-ack-fn-tests
(let [acknowledged (atom [])
on-ack-fn (fn [[id [string context]]]
(swap! acknowledged conj context))
ab (-> (ArrayBuffer. 0 0 (->queue) (->queue))
(queue "one" :a)
(queue "two" :b)
(queue "three" :c)
(queue "four" :d)
(queue "five" :e))]
(let [ack-ab (acknowledge-id ab 2 on-ack-fn)]
(is (= @acknowledged
[:a :b]))
(reset! acknowledged []))
(let [ack-ab (acknowledge-id ab 0 on-ack-fn)]
(is (= @acknowledged
[]))
(reset! acknowledged []))
(let [ack-ab (acknowledge-id ab 6 on-ack-fn)]
(is (= @acknowledged
[:a :b :c :d :e]))
(reset! acknowledged []))))

View file

@ -1,9 +1,9 @@
(ns net.thegeez.browserchannel.server.session-tests (ns net.thegeez.browserchannel.server.session-tests
(:import (net.thegeez.browserchannel.server ArrayBuffer Session))
(:use (:use
clojure.test clojure.test
net.thegeez.browserchannel.common net.thegeez.browserchannel.common
net.thegeez.browserchannel.server)) net.thegeez.browserchannel.server)
(:import (net.thegeez.browserchannel.server ArrayBuffer Session)))
(def written-responses (atom [])) (def written-responses (atom []))
@ -48,17 +48,44 @@
(ArrayBuffer. 0 0 (->queue) (->queue)) (ArrayBuffer. 0 0 (->queue) (->queue))
nil nil
nil) nil)
(queue-string "\"one\"") (queue-string "\"one\"" {:a "a"})
(queue-string "\"two\""))] (queue-string "\"two\"" {:b "b"}))]
(is (= (type (:array-buffer session)) (is (= (type (:array-buffer session))
ArrayBuffer)) ArrayBuffer))
(is (= (first (to-flush (:array-buffer session))) (is (= (first (to-flush (:array-buffer session)))
[[1 "\"one\""] [[1 ["\"one\"" {:a "a"}]]
[2 "\"two\""]])) [2 ["\"two\"" {:b "b"}]]]))
(is (= (last-acknowledged-id (:array-buffer session)) (is (= (last-acknowledged-id (:array-buffer session))
0)) 0))
(is (= (outstanding-bytes (:array-buffer session)) (is (= (outstanding-bytes (:array-buffer session))
10)))) 10))
(close session nil "close")
(wait-for-agent-send-offs)))
(deftest basic-queue-test-with-nil-contexts
(let [options test-options
back-channel {:respond {}
:chunk true
:bytes-sent 0}
session (-> (Session. "test-id"
options
back-channel
(ArrayBuffer. 0 0 (->queue) (->queue))
nil
nil)
(queue-string "\"one\"" {:a "a"})
(queue-string "\"two\"" nil))]
(is (= (type (:array-buffer session))
ArrayBuffer))
(is (= (first (to-flush (:array-buffer session)))
[[1 ["\"one\"" {:a "a"}]]
[2 ["\"two\"" nil]]]))
(is (= (last-acknowledged-id (:array-buffer session))
0))
(is (= (outstanding-bytes (:array-buffer session))
10))
(close session nil "close")
(wait-for-agent-send-offs)))
(deftest basic-flush-test (deftest basic-flush-test
(let [options test-options (let [options test-options
@ -71,8 +98,8 @@
(ArrayBuffer. 0 0 (->queue) (->queue)) (ArrayBuffer. 0 0 (->queue) (->queue))
nil nil
nil) nil)
(queue-string "\"one\"") (queue-string "\"one\"" {:a "a"})
(queue-string "\"two\"") (queue-string "\"two\"" {:b "b"})
(flush-buffer))] (flush-buffer))]
(is (= (first (to-flush (:array-buffer session))) (is (= (first (to-flush (:array-buffer session)))
nil)) nil))
@ -80,7 +107,33 @@
[[:write "[[1,\"one\"]]"] [[:write "[[1,\"one\"]]"]
[:write "[[2,\"two\"]]"]])) [:write "[[2,\"two\"]]"]]))
(is (= (get-in session [:back-channel :bytes-sent]) (is (= (get-in session [:back-channel :bytes-sent])
10)))) 10))
(close session nil "close")
(wait-for-agent-send-offs)))
(deftest basic-flush-test-with-nil-contexts
(let [options test-options
back-channel {:respond {}
:chunk true
:bytes-sent 0}
session (-> (Session. "test-id"
options
back-channel
(ArrayBuffer. 0 0 (->queue) (->queue))
nil
nil)
(queue-string "\"one\"" {:a "a"})
(queue-string "\"two\"" nil)
(flush-buffer))]
(is (= (first (to-flush (:array-buffer session)))
nil))
(is (= @written-responses
[[:write "[[1,\"one\"]]"]
[:write "[[2,\"two\"]]"]]))
(is (= (get-in session [:back-channel :bytes-sent])
10))
(close session nil "close")
(wait-for-agent-send-offs)))
(deftest flush-without-back-channel-test (deftest flush-without-back-channel-test
(let [options test-options (let [options test-options
@ -91,15 +144,17 @@
(ArrayBuffer. 0 0 (->queue) (->queue)) (ArrayBuffer. 0 0 (->queue) (->queue))
nil nil
nil) nil)
(queue-string "\"one\"") (queue-string "\"one\"" {:a "a"})
(queue-string "\"two\"") (queue-string "\"two\"" {:b "b"})
(flush-buffer))] (flush-buffer))]
(is (nil? (:back-channel session))) (is (nil? (:back-channel session)))
(is (= (first (to-flush (:array-buffer session))) (is (= (first (to-flush (:array-buffer session)))
[[1 "\"one\""] [[1 ["\"one\"" {:a "a"}]]
[2 "\"two\""]])) [2 ["\"two\"" {:b "b"}]]]))
(is (= @written-responses (is (= @written-responses
[])))) []))
(close session nil "close")
(wait-for-agent-send-offs)))
(deftest flush-with-write-error-test (deftest flush-with-write-error-test
(with-redefs [write (fn [response data] (with-redefs [write (fn [response data]
@ -118,18 +173,93 @@
(ArrayBuffer. 0 0 (->queue) (->queue)) (ArrayBuffer. 0 0 (->queue) (->queue))
nil nil
nil) nil)
(queue-string "\"one\"") (queue-string "\"one\"" {:a "a"})
(queue-string "\"two\"") (queue-string "\"two\"" {:b "b"})
(queue-string "\"fail\"") (queue-string "\"fail\"" {:c "c"})
(queue-string "\"three\"") (queue-string "\"three\"" {:d "d"})
(flush-buffer))] (flush-buffer))]
(is (= (first (to-flush (:array-buffer session))) (is (= (first (to-flush (:array-buffer session)))
[[1 "\"one\""] [[1 ["\"one\"" {:a "a"}]]
[2 "\"two\""] [2 ["\"two\"" {:b "b"}]]
[3 "\"fail\""] [3 ["\"fail\"" {:c "c"}]]
[4 "\"three\""]])) [4 ["\"three\"" {:d "d"}]]]))
(is (= @written-responses (is (= @written-responses
[[:write "[[1,\"one\"]]"] [[:write "[[1,\"one\"]]"]
[:write "[[2,\"two\"]]"] [:write "[[2,\"two\"]]"]
[:write-end]])) [:write-end]]))
(is (nil? (:back-channel session)))))) (is (nil? (:back-channel session)))
(close session nil "close")
(wait-for-agent-send-offs))))
(deftest on-sent-callback-test
(let [sent-count (atom 0)
on-sent (fn []
(swap! sent-count inc))
options test-options
back-channel {:respond {}
:chunk true
:bytes-sent 0}
session (-> (Session. "test-id"
options
back-channel
(ArrayBuffer. 0 0 (->queue) (->queue))
nil
nil)
(queue-string "\"one\"" {:on-sent on-sent})
(queue-string "\"two\"" {:on-sent on-sent})
(flush-buffer))]
(is (= (first (to-flush (:array-buffer session)))
nil))
(is (= @written-responses
[[:write "[[1,\"one\"]]"]
[:write "[[2,\"two\"]]"]]))
(is (= (get-in session [:back-channel :bytes-sent])
10))
(close session nil "close")
(wait-for-agent-send-offs)
(is (= 2 @sent-count))))
(deftest on-error-callback-test
(with-redefs [write (fn [response data]
; simple way to intentionally trigger a failure when
; flush-buffer internally calls write
(if (.contains data "fail")
(throw (new Exception "intentional write failure"))
(mock-write response data)))]
(let [error-count (atom 0)
on-error (fn []
(swap! error-count inc))
options test-options
back-channel {:respond {}
:chunk true
:bytes-sent 0}
session (-> (Session. "test-id"
options
back-channel
(ArrayBuffer. 0 0 (->queue) (->queue))
nil
nil)
(queue-string "\"one\"" {:on-error on-error})
(queue-string "\"two\"" {:on-error on-error})
(queue-string "\"fail\"" {:on-error on-error})
(queue-string "\"three\"" {:on-error on-error})
(flush-buffer))]
(is (= (first (to-flush (:array-buffer session)))
[[1 ["\"one\"" {:on-error on-error}]]
[2 ["\"two\"" {:on-error on-error}]]
[3 ["\"fail\"" {:on-error on-error}]]
[4 ["\"three\"" {:on-error on-error}]]]))
(is (= @written-responses
[[:write "[[1,\"one\"]]"]
[:write "[[2,\"two\"]]"]
[:write-end]]))
(is (nil? (:back-channel session)))
(close session nil "close")
(wait-for-agent-send-offs)
; even though 2 were still written, the way flush-buffer works currently
; is that on any error, the entire buffer contents are kept and retried
; on the next call. when close is called, any items left in the buffer
; are assumed to have not been sent due to error (either flush-buffer
; failed to write them out to the backchannel, or the client did not
; open a backchannel, etc.
(is (= 4 @error-count)))))

View file

@ -42,33 +42,49 @@
[4 "four"] [4 "four"]
[5 "five"])] [5 "five"])]
(= (drop-queue q 2) (is (= (drop-queue q 2 nil)
(->queue (->queue
[3 "three"] [3 "three"]
[4 "four"] [4 "four"]
[5 "five"])) [5 "five"])))
(= (drop-queue q 0) (is (= (drop-queue q 0 nil)
q) q))
(= (drop-queue q 5) (is (= (drop-queue q 5 nil)
(->queue)))) (->queue))))
(let [dropped (atom [])
q (->queue
[1 "one"]
[2 "two"]
[3 "three"]
[4 "four"]
[5 "five"])]
(is (= (drop-queue q 2 #(swap! dropped conj %))
(->queue
[3 "three"]
[4 "four"]
[5 "five"])))
(is (= @dropped
[[1 "one"]
[2 "two"]]))))
(deftest encoded-map-tests (deftest encoded-map-tests
(= (encode-map "hello, world") (is (= (encode-map "hello, world")
{"__edn" "\"hello, world\""}) {"__edn" "\"hello, world\""}))
(= (decode-map {"__edn" "\"hello, world\""}) (is (= (decode-map {"__edn" "\"hello, world\""})
"hello, world") "hello, world"))
(= (encode-map {:foo "bar"}) (is (= (encode-map {:foo "bar"})
{"__edn" "{:foo \"bar\"}"}) {"__edn" "{:foo \"bar\"}"}))
(= (decode-map {"__edn" "{:foo \"bar\"}"}) (is (= (decode-map {"__edn" "{:foo \"bar\"}"})
{:foo "bar"}) {:foo "bar"}))
(= (encode-map nil) (is (= (encode-map nil)
{"__edn" "nil"}) {"__edn" "nil"}))
(= (decode-map {"__edn" "nil"}) (is (= (decode-map {"__edn" "nil"})
nil) nil))
(= (decode-map {:foo "bar"}) (is (= (decode-map {:foo "bar"})
{:foo "bar"})) {:foo "bar"})))