From 785aae10940748e7c6825408f56b9c45f450e807 Mon Sep 17 00:00:00 2001 From: Vassil Dichev Date: Sat, 19 May 2012 09:48:15 +0300 Subject: [PATCH] Make acknowledge-arrays work to survive disconnects --- .../src/net/thegeez/browserchannel.clj | 48 +++++++++++-------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/clj-browserchannel-server/src/net/thegeez/browserchannel.clj b/clj-browserchannel-server/src/net/thegeez/browserchannel.clj index c622d35..f55615f 100644 --- a/clj-browserchannel-server/src/net/thegeez/browserchannel.clj +++ b/clj-browserchannel-server/src/net/thegeez/browserchannel.clj @@ -73,18 +73,10 @@ (let [head (peek queue)] (if-not head queue - (if (< id head) + (if (< id (first head)) queue (recur (pop queue) id))))) -(assert (let [ids [10 11 12 13 14 15 16] - pq (into clojure.lang.PersistentQueue/EMPTY ids)] - (= [14 15 16] - (drop-queue pq 13)))) -(assert (= clojure.lang.PersistentQueue/EMPTY - (drop-queue clojure.lang.PersistentQueue/EMPTY 13))) - - (defn transform-url-data [data] (let [ofs (get data "ofs" "0") @@ -274,8 +266,9 @@ ;; [[id_lowest, data] .. [id_highest, data]] array-buffer - ;; last flushed array id - last-sent-array-id + ;; we might have flushed more than the client has + ;; acknowledged + last-acknowledged-array-id ;; ScheduleTask or nil heartbeat-timeout @@ -351,18 +344,30 @@ (send-off session-agent close "Timed out")) (:session-timeout-interval details)))))) (queue-string [this string] - (let [next-array-id (inc last-sent-array-id)] + (let [next-array-id + (inc (if (empty? array-buffer) + last-acknowledged-array-id + ;; if flush-buffer fails after acknowledge-arrays, + ;; it could happen that last sent array id is + ;; different from last acknowledged one + (first (last array-buffer))))] (-> this - (update-in [:array-buffer] conj [next-array-id string]) - (assoc :last-sent-array-id next-array-id)))) - (acknowledge-arrays [this array-id]) + (update-in [:array-buffer] conj [next-array-id string])))) + (acknowledge-arrays [this array-id] + (let [array-id (Long/parseLong array-id)] + (-> this + (assoc :last-acknowledged-array-id array-id) + ;; don't need to keep already array ids sent before acknowledged one + (update-in [:array-buffer] drop-queue array-id) + (flush-buffer)))) ;; tries to do the actual writing to the client ;; @todo the composition is a bit awkward in this method due to the ;; try catch and if mix (flush-buffer [this] (if-not back-channel this ;; nothing to do when there's no connection - (if-let [buffer (seq array-buffer)] + ;; only flush unacknowledged arrays + (if-let [buffer (seq (drop-queue array-buffer last-acknowledged-array-id))] (try ;; buffer contains [[1 json-str] ...] can't use ;; json-str which will double escape the json @@ -374,8 +379,9 @@ ;; size is an approximation (let [this (let [size (reduce + 0 (map count (map second buffer)))] (-> this - (assoc :array-buffer clojure.lang.PersistentQueue/EMPTY) - (assoc :last-sent-array-id (first (last buffer))) + ;; assume last array just sent is acknowledge + ;; so we don't send it again on next flush + (assoc :last-acknowledged-array-id (first (last buffer))) (update-in [:back-channel :bytes-sent] + size))) ;; clear-back-channel closes the back ;; channel when the channel does not @@ -433,7 +439,7 @@ details nil ;; backchannel clojure.lang.PersistentQueue/EMPTY ;; array-buffer - 0 ;; last-sent-array-id, the array + 0 ;; last-acknowledged-array-id, the array ;; with id 0 will be sent as an answer to ;; the first forward-channel POST nil ;; heartbeat-timeout @@ -450,13 +456,13 @@ (defn session-status [session] (let [has-back-channel (if (:back-channel session) 1 0) - last-sent-array-id (:last-sent-array-id session) + last-acknowledged-array-id (:last-acknowledged-array-id session) ;; the sum of all the data that is still to be send outstanding-bytes (let [buffer (:array-buffer session)] (if (empty? buffer) 0 (reduce + 0 (map count (map second buffer)))))] - [has-back-channel last-sent-array-id outstanding-bytes])) + [has-back-channel last-acknowledged-array-id outstanding-bytes])) ;; convience function to send data to a session ;; the data will be queued until there is a backchannel to send it