Make acknowledge-arrays work to survive disconnects

This commit is contained in:
Vassil Dichev 2012-05-19 09:48:15 +03:00 committed by Gijs Stuurman
parent a862e74f91
commit 785aae1094

View file

@ -73,18 +73,10 @@
(let [head (peek queue)] (let [head (peek queue)]
(if-not head (if-not head
queue queue
(if (< id head) (if (< id (first head))
queue queue
(recur (pop queue) id))))) (recur (pop queue) id)))))
(assert (let [ids [10 11 12 13 14 15 16]
pq (into clojure.lang.PersistentQueue/EMPTY ids)]
(= [14 15 16]
(drop-queue pq 13))))
(assert (= clojure.lang.PersistentQueue/EMPTY
(drop-queue clojure.lang.PersistentQueue/EMPTY 13)))
(defn transform-url-data [data] (defn transform-url-data [data]
(let [ofs (get data "ofs" "0") (let [ofs (get data "ofs" "0")
@ -274,8 +266,9 @@
;; [[id_lowest, data] .. [id_highest, data]] ;; [[id_lowest, data] .. [id_highest, data]]
array-buffer array-buffer
;; last flushed array id ;; we might have flushed more than the client has
last-sent-array-id ;; acknowledged
last-acknowledged-array-id
;; ScheduleTask or nil ;; ScheduleTask or nil
heartbeat-timeout heartbeat-timeout
@ -351,18 +344,30 @@
(send-off session-agent close "Timed out")) (send-off session-agent close "Timed out"))
(:session-timeout-interval details)))))) (:session-timeout-interval details))))))
(queue-string [this string] (queue-string [this string]
(let [next-array-id (inc last-sent-array-id)] (let [next-array-id
(inc (if (empty? array-buffer)
last-acknowledged-array-id
;; if flush-buffer fails after acknowledge-arrays,
;; it could happen that last sent array id is
;; different from last acknowledged one
(first (last array-buffer))))]
(-> this (-> this
(update-in [:array-buffer] conj [next-array-id string]) (update-in [:array-buffer] conj [next-array-id string]))))
(assoc :last-sent-array-id next-array-id)))) (acknowledge-arrays [this array-id]
(acknowledge-arrays [this array-id]) (let [array-id (Long/parseLong array-id)]
(-> this
(assoc :last-acknowledged-array-id array-id)
;; don't need to keep already array ids sent before acknowledged one
(update-in [:array-buffer] drop-queue array-id)
(flush-buffer))))
;; tries to do the actual writing to the client ;; tries to do the actual writing to the client
;; @todo the composition is a bit awkward in this method due to the ;; @todo the composition is a bit awkward in this method due to the
;; try catch and if mix ;; try catch and if mix
(flush-buffer [this] (flush-buffer [this]
(if-not back-channel (if-not back-channel
this ;; nothing to do when there's no connection this ;; nothing to do when there's no connection
(if-let [buffer (seq array-buffer)] ;; only flush unacknowledged arrays
(if-let [buffer (seq (drop-queue array-buffer last-acknowledged-array-id))]
(try (try
;; buffer contains [[1 json-str] ...] can't use ;; buffer contains [[1 json-str] ...] can't use
;; json-str which will double escape the json ;; json-str which will double escape the json
@ -374,8 +379,9 @@
;; size is an approximation ;; size is an approximation
(let [this (let [size (reduce + 0 (map count (map second buffer)))] (let [this (let [size (reduce + 0 (map count (map second buffer)))]
(-> this (-> this
(assoc :array-buffer clojure.lang.PersistentQueue/EMPTY) ;; assume last array just sent is acknowledge
(assoc :last-sent-array-id (first (last buffer))) ;; so we don't send it again on next flush
(assoc :last-acknowledged-array-id (first (last buffer)))
(update-in [:back-channel :bytes-sent] + size))) (update-in [:back-channel :bytes-sent] + size)))
;; clear-back-channel closes the back ;; clear-back-channel closes the back
;; channel when the channel does not ;; channel when the channel does not
@ -433,7 +439,7 @@
details details
nil ;; backchannel nil ;; backchannel
clojure.lang.PersistentQueue/EMPTY ;; array-buffer clojure.lang.PersistentQueue/EMPTY ;; array-buffer
0 ;; last-sent-array-id, the array 0 ;; last-acknowledged-array-id, the array
;; with id 0 will be sent as an answer to ;; with id 0 will be sent as an answer to
;; the first forward-channel POST ;; the first forward-channel POST
nil ;; heartbeat-timeout nil ;; heartbeat-timeout
@ -450,13 +456,13 @@
(defn session-status [session] (defn session-status [session]
(let [has-back-channel (if (:back-channel session) 1 0) (let [has-back-channel (if (:back-channel session) 1 0)
last-sent-array-id (:last-sent-array-id session) last-acknowledged-array-id (:last-acknowledged-array-id session)
;; the sum of all the data that is still to be send ;; the sum of all the data that is still to be send
outstanding-bytes (let [buffer (:array-buffer session)] outstanding-bytes (let [buffer (:array-buffer session)]
(if (empty? buffer) (if (empty? buffer)
0 0
(reduce + 0 (map count (map second buffer)))))] (reduce + 0 (map count (map second buffer)))))]
[has-back-channel last-sent-array-id outstanding-bytes])) [has-back-channel last-acknowledged-array-id outstanding-bytes]))
;; convience function to send data to a session ;; convience function to send data to a session
;; the data will be queued until there is a backchannel to send it ;; the data will be queued until there is a backchannel to send it