diff --git a/clj-browserchannel-server/src/net/thegeez/browserchannel.clj b/clj-browserchannel-server/src/net/thegeez/browserchannel.clj index a367602..13508ef 100644 --- a/clj-browserchannel-server/src/net/thegeez/browserchannel.clj +++ b/clj-browserchannel-server/src/net/thegeez/browserchannel.clj @@ -144,7 +144,6 @@ listeners))) ;; end of listeners - ;; Wrapper around writers on continuations ;; the write methods raise an Exception with the wrapped response in closed ;; @todo use a more specific Exception @@ -199,6 +198,73 @@ (async-adapter/write-chunk respond "\n") (async-adapter/close respond))) +;;ArrayBuffer +;;buffer of [[id_lowest data] ... [id_highest data]] +(defprotocol IArrayBuffer + (queue [this string]) + (acknowledge-id [this id]) + (to-flush [this]) + (last-acknowledged-id [this]) + (outstanding-bytes [this]) + ) + +(deftype ArrayBuffer [;; id of the next array that is conj'ed, can't + ;; always be derived because flush buffer might + ;; be empty + next-array-id + + ;; needed for session status + last-acknowledged-id + + ;; array that have been flushed, but not yet + ;; acknowledged, does not contain noop messages + to-acknowledge-arrays + + ;; arrays to be sent out, may contain arrays + ;; that were in to-acknowledge-arrays but queued + ;; again for resending + to-flush-arrays + ] + IArrayBuffer + (queue [this string] + (ArrayBuffer. (inc next-array-id) + last-acknowledged-id + to-acknowledge-arrays + (conj to-flush-arrays [next-array-id string]))) + + ;; id may cause the following splits: + ;; normal case: + ;; ack-arrs flush-arrs + ;; client is slow case: + ;; ack-arrs ack-arrs flush-arrs + ;; after arrays have been requeued: + ;; ack-arrs flush-arrs flush-arrs + ;; everything before id can be discarded, everything after id + ;; becomes new flush-arrs and is resend + (acknowledge-id [this id] + (ArrayBuffer. next-array-id + id + clojure.lang.PersistentQueue/EMPTY + (into (drop-queue to-acknowledge-arrays id) + (drop-queue to-flush-arrays id)))) + + ;; return [seq-to-flush-array next-array-buffer] or nil if + ;; to-flush-arrays is empty + (to-flush [this] + (when-let [to-flush (seq to-flush-arrays)] + [to-flush (ArrayBuffer. next-array-id + last-acknowledged-id + (into to-acknowledge-arrays + (remove (fn [[id string]] + (= string "[\"noop\"]")) + to-flush)) + clojure.lang.PersistentQueue/EMPTY)])) + (last-acknowledged-id [this] + last-acknowledged-id) + ;; the sum of all the data that is still to be send + (outstanding-bytes [this] + (reduce + 0 (map (comp count second) to-flush-arrays)))) + ;; {sessionId -> (agent session)} (def sessions (atom {})) @@ -261,15 +327,9 @@ ;; multiple connections back-channel - ;; arrays to send out to client - ;; is a queue - ;; [[id_lowest, data] .. [id_highest, data]] + ;; ArrayBuffer array-buffer - ;; we might have flushed more than the client has - ;; acknowledged - last-acknowledged-array-id - ;; ScheduleTask or nil heartbeat-timeout @@ -344,21 +404,10 @@ (send-off session-agent close "Timed out")) (:session-timeout-interval details)))))) (queue-string [this string] - (let [next-array-id - (inc (if (empty? array-buffer) - last-acknowledged-array-id - ;; if flush-buffer fails after acknowledge-arrays, - ;; it could happen that last sent array id is - ;; different from last acknowledged one - (first (last array-buffer))))] - (-> this - (update-in [:array-buffer] conj [next-array-id string])))) + (update-in this [:array-buffer] queue string)) (acknowledge-arrays [this array-id] (let [array-id (Long/parseLong array-id)] - (-> this - (assoc :last-acknowledged-array-id array-id) - ;; don't need to keep already array ids sent before acknowledged one - (update-in [:array-buffer] drop-queue array-id)))) + (update-in this [:array-buffer] acknowledge-id array-id))) ;; tries to do the actual writing to the client ;; @todo the composition is a bit awkward in this method due to the ;; try catch and if mix @@ -366,21 +415,19 @@ (if-not back-channel this ;; nothing to do when there's no connection ;; only flush unacknowledged arrays - (if-let [buffer (seq (drop-queue array-buffer last-acknowledged-array-id))] + (if-let [[to-flush next-array-buffer] (to-flush array-buffer)] (try ;; buffer contains [[1 json-str] ...] can't use ;; json-str which will double escape the json (let [data (str "[" - (str/join "," (map (fn [[n d]] (str "[" n "," d "]")) buffer)) + (str/join "," (map (fn [[n d]] (str "[" n "," d "]")) to-flush)) "]")] ;; write throws exception when the connection is closed (write (:respond back-channel) data)) ;; size is an approximation - (let [this (let [size (reduce + 0 (map count (map second buffer)))] + (let [this (let [size (reduce + 0 (map count (map second to-flush)))] (-> this - ;; assume last array just sent is acknowledge - ;; so we don't send it again on next flush - (assoc :last-acknowledged-array-id (first (last buffer))) + (assoc :array-buffer next-array-buffer) (update-in [:back-channel :bytes-sent] + size))) ;; clear-back-channel closes the back ;; channel when the channel does not @@ -437,10 +484,13 @@ session (-> (Session. id details nil ;; backchannel - clojure.lang.PersistentQueue/EMPTY ;; array-buffer - 0 ;; last-acknowledged-array-id, the array - ;; with id 0 will be sent as an answer to - ;; the first forward-channel POST + (ArrayBuffer. + 0 ;; next-array-id + 0 ;; last-acknowledged-id + ;; to-acknowledge-arrays + clojure.lang.PersistentQueue/EMPTY + ;; to-flush-arrays + clojure.lang.PersistentQueue/EMPTY) nil ;; heartbeat-timeout nil ;; session-timeout ) @@ -455,13 +505,8 @@ (defn session-status [session] (let [has-back-channel (if (:back-channel session) 1 0) - last-acknowledged-array-id (:last-acknowledged-array-id session) - ;; the sum of all the data that is still to be send - outstanding-bytes (let [buffer (:array-buffer session)] - (if (empty? buffer) - 0 - (reduce + 0 (map count (map second buffer)))))] - [has-back-channel last-acknowledged-array-id outstanding-bytes])) + array-buffer (:array-buffer session)] + [has-back-channel (last-acknowledged-id array-buffer) (outstanding-bytes array-buffer)])) ;; convience function to send data to a session ;; the data will be queued until there is a backchannel to send it