From da42760ecc61c5d009474aaddc9990f9b5479dea Mon Sep 17 00:00:00 2001 From: Gijs Stuurman Date: Fri, 1 Jun 2012 21:10:33 +0200 Subject: [PATCH] Revert "alternative to prevent need of drop-queue in flush-buffer, abstracts array-buffer" This reverts commit 64b0076fabd0a8c8d7ffd6b4d0ab8515363b1c64. --- .../src/net/thegeez/browserchannel.clj | 121 ++++++------------ 1 file changed, 38 insertions(+), 83 deletions(-) diff --git a/clj-browserchannel-server/src/net/thegeez/browserchannel.clj b/clj-browserchannel-server/src/net/thegeez/browserchannel.clj index 13508ef..a367602 100644 --- a/clj-browserchannel-server/src/net/thegeez/browserchannel.clj +++ b/clj-browserchannel-server/src/net/thegeez/browserchannel.clj @@ -144,6 +144,7 @@ listeners))) ;; end of listeners + ;; Wrapper around writers on continuations ;; the write methods raise an Exception with the wrapped response in closed ;; @todo use a more specific Exception @@ -198,73 +199,6 @@ (async-adapter/write-chunk respond "\n") (async-adapter/close respond))) -;;ArrayBuffer -;;buffer of [[id_lowest data] ... [id_highest data]] -(defprotocol IArrayBuffer - (queue [this string]) - (acknowledge-id [this id]) - (to-flush [this]) - (last-acknowledged-id [this]) - (outstanding-bytes [this]) - ) - -(deftype ArrayBuffer [;; id of the next array that is conj'ed, can't - ;; always be derived because flush buffer might - ;; be empty - next-array-id - - ;; needed for session status - last-acknowledged-id - - ;; array that have been flushed, but not yet - ;; acknowledged, does not contain noop messages - to-acknowledge-arrays - - ;; arrays to be sent out, may contain arrays - ;; that were in to-acknowledge-arrays but queued - ;; again for resending - to-flush-arrays - ] - IArrayBuffer - (queue [this string] - (ArrayBuffer. (inc next-array-id) - last-acknowledged-id - to-acknowledge-arrays - (conj to-flush-arrays [next-array-id string]))) - - ;; id may cause the following splits: - ;; normal case: - ;; ack-arrs flush-arrs - ;; client is slow case: - ;; ack-arrs ack-arrs flush-arrs - ;; after arrays have been requeued: - ;; ack-arrs flush-arrs flush-arrs - ;; everything before id can be discarded, everything after id - ;; becomes new flush-arrs and is resend - (acknowledge-id [this id] - (ArrayBuffer. next-array-id - id - clojure.lang.PersistentQueue/EMPTY - (into (drop-queue to-acknowledge-arrays id) - (drop-queue to-flush-arrays id)))) - - ;; return [seq-to-flush-array next-array-buffer] or nil if - ;; to-flush-arrays is empty - (to-flush [this] - (when-let [to-flush (seq to-flush-arrays)] - [to-flush (ArrayBuffer. next-array-id - last-acknowledged-id - (into to-acknowledge-arrays - (remove (fn [[id string]] - (= string "[\"noop\"]")) - to-flush)) - clojure.lang.PersistentQueue/EMPTY)])) - (last-acknowledged-id [this] - last-acknowledged-id) - ;; the sum of all the data that is still to be send - (outstanding-bytes [this] - (reduce + 0 (map (comp count second) to-flush-arrays)))) - ;; {sessionId -> (agent session)} (def sessions (atom {})) @@ -327,9 +261,15 @@ ;; multiple connections back-channel - ;; ArrayBuffer + ;; arrays to send out to client + ;; is a queue + ;; [[id_lowest, data] .. [id_highest, data]] array-buffer + ;; we might have flushed more than the client has + ;; acknowledged + last-acknowledged-array-id + ;; ScheduleTask or nil heartbeat-timeout @@ -404,10 +344,21 @@ (send-off session-agent close "Timed out")) (:session-timeout-interval details)))))) (queue-string [this string] - (update-in this [:array-buffer] queue string)) + (let [next-array-id + (inc (if (empty? array-buffer) + last-acknowledged-array-id + ;; if flush-buffer fails after acknowledge-arrays, + ;; it could happen that last sent array id is + ;; different from last acknowledged one + (first (last array-buffer))))] + (-> this + (update-in [:array-buffer] conj [next-array-id string])))) (acknowledge-arrays [this array-id] (let [array-id (Long/parseLong array-id)] - (update-in this [:array-buffer] acknowledge-id array-id))) + (-> this + (assoc :last-acknowledged-array-id array-id) + ;; don't need to keep already array ids sent before acknowledged one + (update-in [:array-buffer] drop-queue array-id)))) ;; tries to do the actual writing to the client ;; @todo the composition is a bit awkward in this method due to the ;; try catch and if mix @@ -415,19 +366,21 @@ (if-not back-channel this ;; nothing to do when there's no connection ;; only flush unacknowledged arrays - (if-let [[to-flush next-array-buffer] (to-flush array-buffer)] + (if-let [buffer (seq (drop-queue array-buffer last-acknowledged-array-id))] (try ;; buffer contains [[1 json-str] ...] can't use ;; json-str which will double escape the json (let [data (str "[" - (str/join "," (map (fn [[n d]] (str "[" n "," d "]")) to-flush)) + (str/join "," (map (fn [[n d]] (str "[" n "," d "]")) buffer)) "]")] ;; write throws exception when the connection is closed (write (:respond back-channel) data)) ;; size is an approximation - (let [this (let [size (reduce + 0 (map count (map second to-flush)))] + (let [this (let [size (reduce + 0 (map count (map second buffer)))] (-> this - (assoc :array-buffer next-array-buffer) + ;; assume last array just sent is acknowledge + ;; so we don't send it again on next flush + (assoc :last-acknowledged-array-id (first (last buffer))) (update-in [:back-channel :bytes-sent] + size))) ;; clear-back-channel closes the back ;; channel when the channel does not @@ -484,13 +437,10 @@ session (-> (Session. id details nil ;; backchannel - (ArrayBuffer. - 0 ;; next-array-id - 0 ;; last-acknowledged-id - ;; to-acknowledge-arrays - clojure.lang.PersistentQueue/EMPTY - ;; to-flush-arrays - clojure.lang.PersistentQueue/EMPTY) + clojure.lang.PersistentQueue/EMPTY ;; array-buffer + 0 ;; last-acknowledged-array-id, the array + ;; with id 0 will be sent as an answer to + ;; the first forward-channel POST nil ;; heartbeat-timeout nil ;; session-timeout ) @@ -505,8 +455,13 @@ (defn session-status [session] (let [has-back-channel (if (:back-channel session) 1 0) - array-buffer (:array-buffer session)] - [has-back-channel (last-acknowledged-id array-buffer) (outstanding-bytes array-buffer)])) + last-acknowledged-array-id (:last-acknowledged-array-id session) + ;; the sum of all the data that is still to be send + outstanding-bytes (let [buffer (:array-buffer session)] + (if (empty? buffer) + 0 + (reduce + 0 (map count (map second buffer)))))] + [has-back-channel last-acknowledged-array-id outstanding-bytes])) ;; convience function to send data to a session ;; the data will be queued until there is a backchannel to send it