From 767a22ab095c3a2aee8e46580eaee43cd860d506 Mon Sep 17 00:00:00 2001 From: gered Date: Mon, 16 May 2016 11:33:32 -0400 Subject: [PATCH] add server-side success/error callback support to send-data --- .../net/thegeez/browserchannel/client.cljs | 2 +- .../src/net/thegeez/browserchannel/server.clj | 144 ++++++++----- .../server/array_buffer_tests.clj | 197 ++++++++++-------- .../browserchannel/server/session_tests.clj | 178 +++++++++++++--- .../browserchannel/server/utils_test.clj | 62 ++++-- 5 files changed, 394 insertions(+), 189 deletions(-) diff --git a/clj-browserchannel/src/net/thegeez/browserchannel/client.cljs b/clj-browserchannel/src/net/thegeez/browserchannel/client.cljs index ac9a51a..d2a6aae 100644 --- a/clj-browserchannel/src/net/thegeez/browserchannel/client.cljs +++ b/clj-browserchannel/src/net/thegeez/browserchannel/client.cljs @@ -276,7 +276,7 @@ ;; sets the timeout (in milliseconds) for a forward channel request :forward-channel-request-timeout (* 20 1000) - ; base time delay for another connection attempt is made. note + ; base time delay before another connection attempt is made. note ; that a random time between 0 and :connect-retry-delay-seed is ; added to this value to determine the final reconnect delay time. ; time is in milliseconds diff --git a/clj-browserchannel/src/net/thegeez/browserchannel/server.clj b/clj-browserchannel/src/net/thegeez/browserchannel/server.clj index 92c13c0..fb3490a 100644 --- a/clj-browserchannel/src/net/thegeez/browserchannel/server.clj +++ b/clj-browserchannel/src/net/thegeez/browserchannel/server.clj @@ -57,13 +57,15 @@ ;; type preserving drop upto for queueus (defn drop-queue - [queue id] + [queue id on-drop] (let [head (peek queue)] (if-not head queue (if (< id (first head)) queue - (recur (pop queue) id))))) + (do + (if on-drop (on-drop head)) + (recur (pop queue) id on-drop)))))) ;; Key value pairs do not always come ordered by request number. @@ -117,9 +119,9 @@ (fn [ag e] (log/error e (str id " agent exception.")))) -(defn- to-pair - [p] - (str "[" (first p) "," (second p) "]")) +(defn- array-item-to-json-pair + [item] + (str "[" (first item) "," (first (second item)) "]")) (defn decode-map [m] @@ -231,10 +233,10 @@ (async-adapter/close respond))) ;;ArrayBuffer -;;buffer of [[id_lowest data] ... [id_highest data]] +;;buffer of [[id_lowest [data context]] ... [id_highest [data context]]] (defprotocol IArrayBuffer - (queue [this string]) - (acknowledge-id [this id]) + (queue [this string context]) + (acknowledge-id [this id on-ack-fn]) (to-flush [this]) (last-acknowledged-id [this]) (outstanding-bytes [this])) @@ -255,15 +257,17 @@ ;; arrays to be sent out, may contain arrays ;; that were in to-acknowledge-arrays but queued ;; again for resending + ;; format of each array item is [id [data context]] + ;; where data is the actual string and context is a map or nil to-flush-arrays] IArrayBuffer - (queue [this string] + (queue [this string context] (let [next-array-id (inc array-id)] (ArrayBuffer. next-array-id last-acknowledged-id to-acknowledge-arrays - (conj to-flush-arrays [next-array-id string])))) + (conj to-flush-arrays [next-array-id [string context]])))) ;; id may cause the following splits: ;; normal case: @@ -274,21 +278,23 @@ ;; ack-arrs flush-arrs flush-arrs ;; everything before id can be discarded, everything after id ;; becomes new flush-arrs and is resend - (acknowledge-id [this id] + (acknowledge-id [this id on-ack-fn] (ArrayBuffer. array-id id clojure.lang.PersistentQueue/EMPTY - (into (drop-queue to-acknowledge-arrays id) - (drop-queue to-flush-arrays id)))) + (into (drop-queue to-acknowledge-arrays id on-ack-fn) + (drop-queue to-flush-arrays id on-ack-fn)))) ;; return [seq-to-flush-array next-array-buffer] or nil if ;; to-flush-arrays is empty + ;; format of each array item is [id [data context]] + ;; where data is the actual string and context is a map or nil (to-flush [this] (when-let [to-flush (seq to-flush-arrays)] [to-flush (ArrayBuffer. array-id last-acknowledged-id (into to-acknowledge-arrays - (remove (fn [[id string]] + (remove (fn [[id [string context]]] (= string noop-string)) to-flush)) clojure.lang.PersistentQueue/EMPTY)])) @@ -296,9 +302,9 @@ (last-acknowledged-id [this] last-acknowledged-id) - ;; the sum of all the data that is still to be send + ;; the sum of all the data that is still to be sent (outstanding-bytes [this] - (reduce + 0 (map (comp count second) to-flush-arrays)))) + (reduce + 0 (map (comp count first second) to-flush-arrays)))) @@ -322,7 +328,8 @@ ;; the client acknowledges received arrays when creating a new backwardchannel (acknowledge-arrays [this array-id-str]) - (queue-string [this json-string]) + ;; context is either a map or nil + (queue-string [this json-string context]) ;; heartbeat is a timer to send noop over the backward channel (clear-heartbeat [this]) @@ -437,7 +444,7 @@ (schedule (fn [] (send-off session-agent #(-> % - (queue-string noop-string) + (queue-string noop-string nil) flush-buffer))) (:heartbeat-interval details)))))) @@ -461,14 +468,16 @@ (send-off session-agent close nil "Timed out")) (:session-timeout-interval details)))))) - (queue-string [this json-string] - (log/trace id ": queue-string" (pr-str json-string)) - (update-in this [:array-buffer] queue json-string)) + (queue-string [this json-string context] + (log/trace id ": queue-string" (pr-str json-string) (if context "(has context)" "")) + (update-in this [:array-buffer] queue json-string context)) (acknowledge-arrays [this array-id-str] (log/trace id ": acknowledge-arrays" array-id-str) (let [array-id (Long/parseLong array-id-str)] - (update-in this [:array-buffer] acknowledge-id array-id))) + (update-in this [:array-buffer] acknowledge-id array-id + (fn [[id [string {:keys [on-confirm]}]]] + (if on-confirm (on-confirm)))))) ;; tries to do the actual writing to the client ;; @todo the composition is a bit awkward in this method due to the @@ -481,14 +490,16 @@ (if-let [[to-flush next-array-buffer] (to-flush array-buffer)] (try (log/trace id ": flushing" (count to-flush) "arrays") - ;; buffer contains [[1 json-str] ...] can't use - ;; json-str which will double escape the json + ;; buffer contains [[1 [json-str context]] ...] + ;; can't use json-str which will double escape the json - (doseq [p to-flush] - (write (:respond back-channel) (str "[" (to-pair p) "]"))) + (doseq [item to-flush] + (let [{:keys [on-sent] :as context} (second (second item))] + (write (:respond back-channel) (str "[" (array-item-to-json-pair item) "]")) + (if on-sent (on-sent)))) ;; size is an approximation - (let [this (let [size (reduce + 0 (map count (map second to-flush)))] + (let [this (let [size (reduce + 0 (map count (map (comp first second) to-flush)))] (-> this (assoc :array-buffer next-array-buffer) (update-in [:back-channel :bytes-sent] + size))) @@ -507,6 +518,8 @@ (log/trace e id ": exception during flush") ;; when write failed ;; non delivered arrays are still in buffer + ;; (note that we don't raise any on-error callbacks for any items in the buffer + ;; as we will retry sending the items next time since they're still in the buffer) (clear-back-channel this) )) this ;; do nothing if buffer is empty @@ -521,10 +534,25 @@ ;; the heartbeat timeout is cancelled by clear-back-channel ) (swap! sessions dissoc id) + ; raise callbacks for any remaining unsent messages + (let [[remaining _] (to-flush array-buffer)] + (doseq [[id [string {:keys [on-error] :as context}]] remaining] + (if on-error (on-error)))) + ; finally raise the session close event (notify-listeners id request :close message) nil ;; the agent will no longer wrap a session )) +(defn- handle-old-session-reconnect + [req old-session-id old-session-agent old-array-id] + (log/trace old-session-id ": old session reconnect") + (send-off old-session-agent + (fn [this] + (-> (if old-array-id + (acknowledge-arrays this old-array-id) + this) + (close req "Reconnected"))))) + ;; creates a session agent wrapping session data and ;; adds the session to sessions (defn- create-session-agent @@ -535,12 +563,8 @@ old-array-id "OAID"} (:query-params req)] ;; when a client specifies and old session id then that old one ;; needs to be removed - (when-let [old-session-agent (@sessions old-session-id)] - (log/trace old-session-id ": old session reconnect") - (send-off old-session-agent #(-> (if old-array-id - (acknowledge-arrays % old-array-id) - %) - (close req "Reconnected")))) + (if-let [old-session-agent (@sessions old-session-id)] + (handle-old-session-reconnect req old-session-id old-session-agent old-array-id)) (let [id (uuid) details {:address (:remote-addr req) :headers (:headers req) @@ -551,18 +575,17 @@ session (Session. id details nil ;; backchannel - (ArrayBuffer. - 0 ;; array-id, 0 is never used by the - ;; array-buffer, it is used by the - ;; first message with the session id - 0 ;; last-acknowledged-id - ;; to-acknowledge-arrays - clojure.lang.PersistentQueue/EMPTY - ;; to-flush-arrays - clojure.lang.PersistentQueue/EMPTY) - nil ;; heartbeat-timeout - nil ;; session-timeout - ) + (ArrayBuffer. 0 ;; array-id, 0 is never used by the + ;; array-buffer, it is used by the + ;; first message with the session id. + 0 ;; last-acknowledged-id + ;; to-acknowledge-arrays + clojure.lang.PersistentQueue/EMPTY + ;; to-flush-arrays + clojure.lang.PersistentQueue/EMPTY) + nil ;; heartbeat-timeout + nil ;; session-timeout + ) session-agent (agent session)] (log/trace id ": new session created" (if old-session-id (str "(old session id: " old-session-id ")") "")) (set-error-handler! session-agent (agent-error-handler-fn (str "session-" (:id session)))) @@ -594,27 +617,38 @@ ;; the data will be queued until there is a backchannel to send it ;; over (defn- send-map - [session-id m] + [session-id m context] (when-let [session-agent (get @sessions session-id)] (let [string (json/generate-string m)] - (send-off session-agent #(-> % - (queue-string string) - flush-buffer)) + (send-off session-agent + #(-> % + (queue-string string context) + flush-buffer)) string))) (defn send-data "sends data to the client identified by session-id over the backchannel. if there is currently no available backchannel for this client, the data - is queued until one is available." - [session-id data] + is queued until one is available. context can contain optional callback + functions: + + on-sent - when the data has been written to an active backchannel + on-confirm - when the client confirms it has received the data. note that + it may take awhile (minutes) for the client to send an + acknowledgement + on-error - when there was an error (of any kind) sending the data" + [session-id data & [context]] (if data - (send-map session-id (encode-map data)))) + (send-map session-id (encode-map data) context))) (defn send-data-to-all - "sends data to all currently connected clients over their backchannels." - [data] + "sends data to all currently connected clients over their backchannels. + context can contain optional callback functions which will be used for + all the data sent. see send-data for a description of the different + callback functions available." + [data & [context]] (doseq [[session-id _] @sessions] - (send-data session-id data))) + (send-data session-id data context))) (defn connected? "returns true if a client with the given session-id is currently connected." diff --git a/clj-browserchannel/test/net/thegeez/browserchannel/server/array_buffer_tests.clj b/clj-browserchannel/test/net/thegeez/browserchannel/server/array_buffer_tests.clj index e5ffd0e..bac7d47 100644 --- a/clj-browserchannel/test/net/thegeez/browserchannel/server/array_buffer_tests.clj +++ b/clj-browserchannel/test/net/thegeez/browserchannel/server/array_buffer_tests.clj @@ -18,13 +18,13 @@ 0 (->queue) (->queue - [1 "one"] - [2 "two"] - [3 "three"]))] + [1 ["one" :a]] + [2 ["two" :b]] + [3 ["three" :c]]))] (is (= (first (to-flush ab)) - [[1 "one"] - [2 "two"] - [3 "three"]])) + [[1 ["one" :a]] + [2 ["two" :b]] + [3 ["three" :c]]])) (is (= (last-acknowledged-id ab) 0)) (is (= (outstanding-bytes ab) @@ -33,9 +33,9 @@ (let [ab (ArrayBuffer. 3 0 (->queue - [1 "one"] - [2 "two"] - [3 "three"]) + [1 ["one" :a]] + [2 ["two" :b]] + [3 ["three" :c]]) (->queue))] (is (= (first (to-flush ab)) nil)) @@ -46,97 +46,97 @@ (deftest queue-tests (let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue)) - (queue "one"))] + (queue "one" :a))] (is (= (first (to-flush ab)) - [[1 "one"]])) + [[1 ["one" :a]]])) (is (= (last-acknowledged-id ab) 0)) (is (= (outstanding-bytes ab) 3))) - (let [ab (-> (ArrayBuffer. 1 0 (->queue) (->queue [1 "one"])) - (queue "two"))] + (let [ab (-> (ArrayBuffer. 1 0 (->queue) (->queue [1 ["one" :a]])) + (queue "two" :b))] (is (= (first (to-flush ab)) - [[1 "one"] - [2 "two"]])) + [[1 ["one" :a]] + [2 ["two" :b]]])) (is (= (last-acknowledged-id ab) 0)) (is (= (outstanding-bytes ab) 6))) (let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue)) - (queue nil))] + (queue nil nil))] (is (= (first (to-flush ab)) - [[1 nil]])) + [[1 [nil nil]]])) (is (= (last-acknowledged-id ab) 0)) (is (= (outstanding-bytes ab) 0))) (let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue)) - (queue "one") - (queue "two") - (queue "three"))] + (queue "one" :a) + (queue "two" :b) + (queue "three" :c))] (is (= (first (to-flush ab)) - [[1 "one"] - [2 "two"] - [3 "three"]])) + [[1 ["one" :a]] + [2 ["two" :b]] + [3 ["three" :c]]])) (is (= (last-acknowledged-id ab) 0)) (is (= (outstanding-bytes ab) 11))) (let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue)) - (queue "one") - (queue "two") - (queue "three")) + (queue "one" :a) + (queue "two" :b) + (queue "three" :c)) flushed (second (to-flush ab))] (is (= (first (to-flush ab)) - [[1 "one"] - [2 "two"] - [3 "three"]])) + [[1 ["one" :a]] + [2 ["two" :b]] + [3 ["three" :c]]])) (is (= (first (to-flush flushed)) nil)))) (deftest acknowledge-no-existing-data-tests (let [ab (-> (ArrayBuffer. 0 0 (->queue) (->queue)) - (queue "one") - (queue "two") - (queue "three") - (queue "four") - (queue "five"))] + (queue "one" :a) + (queue "two" :b) + (queue "three" :c) + (queue "four" :d) + (queue "five" :e))] (is (= (first (to-flush ab)) - [[1 "one"] - [2 "two"] - [3 "three"] - [4 "four"] - [5 "five"]])) + [[1 ["one" :a]] + [2 ["two" :b]] + [3 ["three" :c]] + [4 ["four" :d]] + [5 ["five" :e]]])) (is (= (last-acknowledged-id ab) 0)) - (let [ack-ab (acknowledge-id ab 2)] + (let [ack-ab (acknowledge-id ab 2 nil)] (is (= (first (to-flush ack-ab)) - [[3 "three"] - [4 "four"] - [5 "five"]])) + [[3 ["three" :c]] + [4 ["four" :d]] + [5 ["five" :e]]])) (is (= (last-acknowledged-id ack-ab) 2)) (is (= (outstanding-bytes ack-ab) 13))) - (let [ack-ab (acknowledge-id ab 0)] + (let [ack-ab (acknowledge-id ab 0 nil)] (is (= (first (to-flush ack-ab)) - [[1 "one"] - [2 "two"] - [3 "three"] - [4 "four"] - [5 "five"]])) + [[1 ["one" :a]] + [2 ["two" :b]] + [3 ["three" :c]] + [4 ["four" :d]] + [5 ["five" :e]]])) (is (= (last-acknowledged-id ack-ab) 0)) (is (= (outstanding-bytes ack-ab) 19))) - (let [ack-ab (acknowledge-id ab 6)] + (let [ack-ab (acknowledge-id ab 6 nil)] (is (= (first (to-flush ack-ab)) nil)) (is (= (last-acknowledged-id ack-ab) @@ -148,38 +148,38 @@ (let [ab (-> (ArrayBuffer. 2 0 (->queue - [1 "one"] - [2 "two"]) + [1 ["one" :a]] + [2 ["two" :b]]) (->queue)) - (queue "three") - (queue "four") - (queue "five"))] + (queue "three" :c) + (queue "four" :d) + (queue "five" :e))] (is (= (first (to-flush ab)) - [[3 "three"] - [4 "four"] - [5 "five"]])) + [[3 ["three" :c]] + [4 ["four" :d]] + [5 ["five" :e]]])) (is (= (last-acknowledged-id ab) 0)) - (let [ack-ab (acknowledge-id ab 4)] + (let [ack-ab (acknowledge-id ab 4 nil)] (is (= (first (to-flush ack-ab)) - [[5 "five"]])) + [[5 ["five" :e]]])) (is (= (last-acknowledged-id ack-ab) 4)) (is (= (outstanding-bytes ack-ab) 4))) - (let [ack-ab (acknowledge-id ab 2)] + (let [ack-ab (acknowledge-id ab 2 nil)] (is (= (first (to-flush ack-ab)) - [[3 "three"] - [4 "four"] - [5 "five"]])) + [[3 ["three" :c]] + [4 ["four" :d]] + [5 ["five" :e]]])) (is (= (last-acknowledged-id ack-ab) 2)) (is (= (outstanding-bytes ack-ab) 13))) - (let [ack-ab (acknowledge-id ab 6)] + (let [ack-ab (acknowledge-id ab 6 nil)] (is (= (first (to-flush ack-ab)) nil)) (is (= (last-acknowledged-id ack-ab) @@ -192,46 +192,71 @@ 0 (->queue) (->queue - [1 "one"] - [2 "two"])) - (queue "three") - (queue "four") - (queue "five"))] + [1 ["one" :a]] + [2 ["two" :b]])) + (queue "three" :c) + (queue "four" :d) + (queue "five" :e))] (is (= (first (to-flush ab)) - [[1 "one"] - [2 "two"] - [3 "three"] - [4 "four"] - [5 "five"]])) + [[1 ["one" :a]] + [2 ["two" :b]] + [3 ["three" :c]] + [4 ["four" :d]] + [5 ["five" :e]]])) (is (= (last-acknowledged-id ab) 0)) - (let [ack-ab (acknowledge-id ab 2)] + (let [ack-ab (acknowledge-id ab 2 nil)] (is (= (first (to-flush ack-ab)) - [[3 "three"] - [4 "four"] - [5 "five"]])) + [[3 ["three" :c]] + [4 ["four" :d]] + [5 ["five" :e]]])) (is (= (last-acknowledged-id ack-ab) 2)) (is (= (outstanding-bytes ack-ab) 13))) - (let [ack-ab (acknowledge-id ab 0)] + (let [ack-ab (acknowledge-id ab 0 nil)] (is (= (first (to-flush ack-ab)) - [[1 "one"] - [2 "two"] - [3 "three"] - [4 "four"] - [5 "five"]])) + [[1 ["one" :a]] + [2 ["two" :b]] + [3 ["three" :c]] + [4 ["four" :d]] + [5 ["five" :e]]])) (is (= (last-acknowledged-id ack-ab) 0)) (is (= (outstanding-bytes ack-ab) 19))) - (let [ack-ab (acknowledge-id ab 6)] + (let [ack-ab (acknowledge-id ab 6 nil)] (is (= (first (to-flush ack-ab)) nil)) (is (= (last-acknowledged-id ack-ab) 6)) (is (= (outstanding-bytes ack-ab) 0))))) + +(deftest on-ack-fn-tests + (let [acknowledged (atom []) + on-ack-fn (fn [[id [string context]]] + (swap! acknowledged conj context)) + ab (-> (ArrayBuffer. 0 0 (->queue) (->queue)) + (queue "one" :a) + (queue "two" :b) + (queue "three" :c) + (queue "four" :d) + (queue "five" :e))] + (let [ack-ab (acknowledge-id ab 2 on-ack-fn)] + (is (= @acknowledged + [:a :b])) + (reset! acknowledged [])) + + (let [ack-ab (acknowledge-id ab 0 on-ack-fn)] + (is (= @acknowledged + [])) + (reset! acknowledged [])) + + (let [ack-ab (acknowledge-id ab 6 on-ack-fn)] + (is (= @acknowledged + [:a :b :c :d :e])) + (reset! acknowledged [])))) \ No newline at end of file diff --git a/clj-browserchannel/test/net/thegeez/browserchannel/server/session_tests.clj b/clj-browserchannel/test/net/thegeez/browserchannel/server/session_tests.clj index ad8848f..d11d030 100644 --- a/clj-browserchannel/test/net/thegeez/browserchannel/server/session_tests.clj +++ b/clj-browserchannel/test/net/thegeez/browserchannel/server/session_tests.clj @@ -1,9 +1,9 @@ (ns net.thegeez.browserchannel.server.session-tests - (:import (net.thegeez.browserchannel.server ArrayBuffer Session)) (:use clojure.test net.thegeez.browserchannel.common - net.thegeez.browserchannel.server)) + net.thegeez.browserchannel.server) + (:import (net.thegeez.browserchannel.server ArrayBuffer Session))) (def written-responses (atom [])) @@ -48,17 +48,44 @@ (ArrayBuffer. 0 0 (->queue) (->queue)) nil nil) - (queue-string "\"one\"") - (queue-string "\"two\""))] + (queue-string "\"one\"" {:a "a"}) + (queue-string "\"two\"" {:b "b"}))] (is (= (type (:array-buffer session)) ArrayBuffer)) (is (= (first (to-flush (:array-buffer session))) - [[1 "\"one\""] - [2 "\"two\""]])) + [[1 ["\"one\"" {:a "a"}]] + [2 ["\"two\"" {:b "b"}]]])) (is (= (last-acknowledged-id (:array-buffer session)) 0)) (is (= (outstanding-bytes (:array-buffer session)) - 10)))) + 10)) + (close session nil "close") + (wait-for-agent-send-offs))) + +(deftest basic-queue-test-with-nil-contexts + (let [options test-options + back-channel {:respond {} + :chunk true + :bytes-sent 0} + session (-> (Session. "test-id" + options + back-channel + (ArrayBuffer. 0 0 (->queue) (->queue)) + nil + nil) + (queue-string "\"one\"" {:a "a"}) + (queue-string "\"two\"" nil))] + (is (= (type (:array-buffer session)) + ArrayBuffer)) + (is (= (first (to-flush (:array-buffer session))) + [[1 ["\"one\"" {:a "a"}]] + [2 ["\"two\"" nil]]])) + (is (= (last-acknowledged-id (:array-buffer session)) + 0)) + (is (= (outstanding-bytes (:array-buffer session)) + 10)) + (close session nil "close") + (wait-for-agent-send-offs))) (deftest basic-flush-test (let [options test-options @@ -71,8 +98,8 @@ (ArrayBuffer. 0 0 (->queue) (->queue)) nil nil) - (queue-string "\"one\"") - (queue-string "\"two\"") + (queue-string "\"one\"" {:a "a"}) + (queue-string "\"two\"" {:b "b"}) (flush-buffer))] (is (= (first (to-flush (:array-buffer session))) nil)) @@ -80,7 +107,33 @@ [[:write "[[1,\"one\"]]"] [:write "[[2,\"two\"]]"]])) (is (= (get-in session [:back-channel :bytes-sent]) - 10)))) + 10)) + (close session nil "close") + (wait-for-agent-send-offs))) + +(deftest basic-flush-test-with-nil-contexts + (let [options test-options + back-channel {:respond {} + :chunk true + :bytes-sent 0} + session (-> (Session. "test-id" + options + back-channel + (ArrayBuffer. 0 0 (->queue) (->queue)) + nil + nil) + (queue-string "\"one\"" {:a "a"}) + (queue-string "\"two\"" nil) + (flush-buffer))] + (is (= (first (to-flush (:array-buffer session))) + nil)) + (is (= @written-responses + [[:write "[[1,\"one\"]]"] + [:write "[[2,\"two\"]]"]])) + (is (= (get-in session [:back-channel :bytes-sent]) + 10)) + (close session nil "close") + (wait-for-agent-send-offs))) (deftest flush-without-back-channel-test (let [options test-options @@ -91,15 +144,17 @@ (ArrayBuffer. 0 0 (->queue) (->queue)) nil nil) - (queue-string "\"one\"") - (queue-string "\"two\"") + (queue-string "\"one\"" {:a "a"}) + (queue-string "\"two\"" {:b "b"}) (flush-buffer))] (is (nil? (:back-channel session))) (is (= (first (to-flush (:array-buffer session))) - [[1 "\"one\""] - [2 "\"two\""]])) + [[1 ["\"one\"" {:a "a"}]] + [2 ["\"two\"" {:b "b"}]]])) (is (= @written-responses - [])))) + [])) + (close session nil "close") + (wait-for-agent-send-offs))) (deftest flush-with-write-error-test (with-redefs [write (fn [response data] @@ -118,18 +173,93 @@ (ArrayBuffer. 0 0 (->queue) (->queue)) nil nil) - (queue-string "\"one\"") - (queue-string "\"two\"") - (queue-string "\"fail\"") - (queue-string "\"three\"") + (queue-string "\"one\"" {:a "a"}) + (queue-string "\"two\"" {:b "b"}) + (queue-string "\"fail\"" {:c "c"}) + (queue-string "\"three\"" {:d "d"}) (flush-buffer))] (is (= (first (to-flush (:array-buffer session))) - [[1 "\"one\""] - [2 "\"two\""] - [3 "\"fail\""] - [4 "\"three\""]])) + [[1 ["\"one\"" {:a "a"}]] + [2 ["\"two\"" {:b "b"}]] + [3 ["\"fail\"" {:c "c"}]] + [4 ["\"three\"" {:d "d"}]]])) (is (= @written-responses [[:write "[[1,\"one\"]]"] [:write "[[2,\"two\"]]"] [:write-end]])) - (is (nil? (:back-channel session)))))) + (is (nil? (:back-channel session))) + (close session nil "close") + (wait-for-agent-send-offs)))) + +(deftest on-sent-callback-test + (let [sent-count (atom 0) + on-sent (fn [] + (swap! sent-count inc)) + options test-options + back-channel {:respond {} + :chunk true + :bytes-sent 0} + session (-> (Session. "test-id" + options + back-channel + (ArrayBuffer. 0 0 (->queue) (->queue)) + nil + nil) + (queue-string "\"one\"" {:on-sent on-sent}) + (queue-string "\"two\"" {:on-sent on-sent}) + (flush-buffer))] + (is (= (first (to-flush (:array-buffer session))) + nil)) + (is (= @written-responses + [[:write "[[1,\"one\"]]"] + [:write "[[2,\"two\"]]"]])) + (is (= (get-in session [:back-channel :bytes-sent]) + 10)) + (close session nil "close") + (wait-for-agent-send-offs) + (is (= 2 @sent-count)))) + +(deftest on-error-callback-test + (with-redefs [write (fn [response data] + ; simple way to intentionally trigger a failure when + ; flush-buffer internally calls write + (if (.contains data "fail") + (throw (new Exception "intentional write failure")) + (mock-write response data)))] + (let [error-count (atom 0) + on-error (fn [] + (swap! error-count inc)) + options test-options + back-channel {:respond {} + :chunk true + :bytes-sent 0} + session (-> (Session. "test-id" + options + back-channel + (ArrayBuffer. 0 0 (->queue) (->queue)) + nil + nil) + (queue-string "\"one\"" {:on-error on-error}) + (queue-string "\"two\"" {:on-error on-error}) + (queue-string "\"fail\"" {:on-error on-error}) + (queue-string "\"three\"" {:on-error on-error}) + (flush-buffer))] + (is (= (first (to-flush (:array-buffer session))) + [[1 ["\"one\"" {:on-error on-error}]] + [2 ["\"two\"" {:on-error on-error}]] + [3 ["\"fail\"" {:on-error on-error}]] + [4 ["\"three\"" {:on-error on-error}]]])) + (is (= @written-responses + [[:write "[[1,\"one\"]]"] + [:write "[[2,\"two\"]]"] + [:write-end]])) + (is (nil? (:back-channel session))) + (close session nil "close") + (wait-for-agent-send-offs) + ; even though 2 were still written, the way flush-buffer works currently + ; is that on any error, the entire buffer contents are kept and retried + ; on the next call. when close is called, any items left in the buffer + ; are assumed to have not been sent due to error (either flush-buffer + ; failed to write them out to the backchannel, or the client did not + ; open a backchannel, etc. + (is (= 4 @error-count))))) \ No newline at end of file diff --git a/clj-browserchannel/test/net/thegeez/browserchannel/server/utils_test.clj b/clj-browserchannel/test/net/thegeez/browserchannel/server/utils_test.clj index 3492ceb..7a3c307 100644 --- a/clj-browserchannel/test/net/thegeez/browserchannel/server/utils_test.clj +++ b/clj-browserchannel/test/net/thegeez/browserchannel/server/utils_test.clj @@ -42,33 +42,49 @@ [4 "four"] [5 "five"])] - (= (drop-queue q 2) - (->queue - [3 "three"] - [4 "four"] - [5 "five"])) + (is (= (drop-queue q 2 nil) + (->queue + [3 "three"] + [4 "four"] + [5 "five"]))) - (= (drop-queue q 0) - q) + (is (= (drop-queue q 0 nil) + q)) - (= (drop-queue q 5) - (->queue)))) + (is (= (drop-queue q 5 nil) + (->queue)))) + + (let [dropped (atom []) + q (->queue + [1 "one"] + [2 "two"] + [3 "three"] + [4 "four"] + [5 "five"])] + (is (= (drop-queue q 2 #(swap! dropped conj %)) + (->queue + [3 "three"] + [4 "four"] + [5 "five"]))) + (is (= @dropped + [[1 "one"] + [2 "two"]])))) (deftest encoded-map-tests - (= (encode-map "hello, world") - {"__edn" "\"hello, world\""}) - (= (decode-map {"__edn" "\"hello, world\""}) - "hello, world") + (is (= (encode-map "hello, world") + {"__edn" "\"hello, world\""})) + (is (= (decode-map {"__edn" "\"hello, world\""}) + "hello, world")) - (= (encode-map {:foo "bar"}) - {"__edn" "{:foo \"bar\"}"}) - (= (decode-map {"__edn" "{:foo \"bar\"}"}) - {:foo "bar"}) + (is (= (encode-map {:foo "bar"}) + {"__edn" "{:foo \"bar\"}"})) + (is (= (decode-map {"__edn" "{:foo \"bar\"}"}) + {:foo "bar"})) - (= (encode-map nil) - {"__edn" "nil"}) - (= (decode-map {"__edn" "nil"}) - nil) + (is (= (encode-map nil) + {"__edn" "nil"})) + (is (= (decode-map {"__edn" "nil"}) + nil)) - (= (decode-map {:foo "bar"}) - {:foo "bar"})) + (is (= (decode-map {:foo "bar"}) + {:foo "bar"})))