alternative to prevent need of drop-queue in flush-buffer, abstracts array-buffer

This commit is contained in:
Gijs Stuurman 2012-06-01 21:08:35 +02:00
parent 42c30ad723
commit 64b0076fab

View file

@ -144,7 +144,6 @@
listeners))) listeners)))
;; end of listeners ;; end of listeners
;; Wrapper around writers on continuations ;; Wrapper around writers on continuations
;; the write methods raise an Exception with the wrapped response in closed ;; the write methods raise an Exception with the wrapped response in closed
;; @todo use a more specific Exception ;; @todo use a more specific Exception
@ -199,6 +198,73 @@
(async-adapter/write-chunk respond "<script>try {parent.d(); }catch (e){}</script>\n") (async-adapter/write-chunk respond "<script>try {parent.d(); }catch (e){}</script>\n")
(async-adapter/close respond))) (async-adapter/close respond)))
;;ArrayBuffer
;;buffer of [[id_lowest data] ... [id_highest data]]
(defprotocol IArrayBuffer
(queue [this string])
(acknowledge-id [this id])
(to-flush [this])
(last-acknowledged-id [this])
(outstanding-bytes [this])
)
(deftype ArrayBuffer [;; id of the next array that is conj'ed, can't
;; always be derived because flush buffer might
;; be empty
next-array-id
;; needed for session status
last-acknowledged-id
;; array that have been flushed, but not yet
;; acknowledged, does not contain noop messages
to-acknowledge-arrays
;; arrays to be sent out, may contain arrays
;; that were in to-acknowledge-arrays but queued
;; again for resending
to-flush-arrays
]
IArrayBuffer
(queue [this string]
(ArrayBuffer. (inc next-array-id)
last-acknowledged-id
to-acknowledge-arrays
(conj to-flush-arrays [next-array-id string])))
;; id may cause the following splits:
;; normal case:
;; ack-arrs <id> flush-arrs
;; client is slow case:
;; ack-arrs <id> ack-arrs flush-arrs
;; after arrays have been requeued:
;; ack-arrs flush-arrs <id> flush-arrs
;; everything before id can be discarded, everything after id
;; becomes new flush-arrs and is resend
(acknowledge-id [this id]
(ArrayBuffer. next-array-id
id
clojure.lang.PersistentQueue/EMPTY
(into (drop-queue to-acknowledge-arrays id)
(drop-queue to-flush-arrays id))))
;; return [seq-to-flush-array next-array-buffer] or nil if
;; to-flush-arrays is empty
(to-flush [this]
(when-let [to-flush (seq to-flush-arrays)]
[to-flush (ArrayBuffer. next-array-id
last-acknowledged-id
(into to-acknowledge-arrays
(remove (fn [[id string]]
(= string "[\"noop\"]"))
to-flush))
clojure.lang.PersistentQueue/EMPTY)]))
(last-acknowledged-id [this]
last-acknowledged-id)
;; the sum of all the data that is still to be send
(outstanding-bytes [this]
(reduce + 0 (map (comp count second) to-flush-arrays))))
;; {sessionId -> (agent session)} ;; {sessionId -> (agent session)}
(def sessions (atom {})) (def sessions (atom {}))
@ -261,15 +327,9 @@
;; multiple connections ;; multiple connections
back-channel back-channel
;; arrays to send out to client ;; ArrayBuffer
;; is a queue
;; [[id_lowest, data] .. [id_highest, data]]
array-buffer array-buffer
;; we might have flushed more than the client has
;; acknowledged
last-acknowledged-array-id
;; ScheduleTask or nil ;; ScheduleTask or nil
heartbeat-timeout heartbeat-timeout
@ -344,21 +404,10 @@
(send-off session-agent close "Timed out")) (send-off session-agent close "Timed out"))
(:session-timeout-interval details)))))) (:session-timeout-interval details))))))
(queue-string [this string] (queue-string [this string]
(let [next-array-id (update-in this [:array-buffer] queue string))
(inc (if (empty? array-buffer)
last-acknowledged-array-id
;; if flush-buffer fails after acknowledge-arrays,
;; it could happen that last sent array id is
;; different from last acknowledged one
(first (last array-buffer))))]
(-> this
(update-in [:array-buffer] conj [next-array-id string]))))
(acknowledge-arrays [this array-id] (acknowledge-arrays [this array-id]
(let [array-id (Long/parseLong array-id)] (let [array-id (Long/parseLong array-id)]
(-> this (update-in this [:array-buffer] acknowledge-id array-id)))
(assoc :last-acknowledged-array-id array-id)
;; don't need to keep already array ids sent before acknowledged one
(update-in [:array-buffer] drop-queue array-id))))
;; tries to do the actual writing to the client ;; tries to do the actual writing to the client
;; @todo the composition is a bit awkward in this method due to the ;; @todo the composition is a bit awkward in this method due to the
;; try catch and if mix ;; try catch and if mix
@ -366,21 +415,19 @@
(if-not back-channel (if-not back-channel
this ;; nothing to do when there's no connection this ;; nothing to do when there's no connection
;; only flush unacknowledged arrays ;; only flush unacknowledged arrays
(if-let [buffer (seq (drop-queue array-buffer last-acknowledged-array-id))] (if-let [[to-flush next-array-buffer] (to-flush array-buffer)]
(try (try
;; buffer contains [[1 json-str] ...] can't use ;; buffer contains [[1 json-str] ...] can't use
;; json-str which will double escape the json ;; json-str which will double escape the json
(let [data (str "[" (let [data (str "["
(str/join "," (map (fn [[n d]] (str "[" n "," d "]")) buffer)) (str/join "," (map (fn [[n d]] (str "[" n "," d "]")) to-flush))
"]")] "]")]
;; write throws exception when the connection is closed ;; write throws exception when the connection is closed
(write (:respond back-channel) data)) (write (:respond back-channel) data))
;; size is an approximation ;; size is an approximation
(let [this (let [size (reduce + 0 (map count (map second buffer)))] (let [this (let [size (reduce + 0 (map count (map second to-flush)))]
(-> this (-> this
;; assume last array just sent is acknowledge (assoc :array-buffer next-array-buffer)
;; so we don't send it again on next flush
(assoc :last-acknowledged-array-id (first (last buffer)))
(update-in [:back-channel :bytes-sent] + size))) (update-in [:back-channel :bytes-sent] + size)))
;; clear-back-channel closes the back ;; clear-back-channel closes the back
;; channel when the channel does not ;; channel when the channel does not
@ -437,10 +484,13 @@
session (-> (Session. id session (-> (Session. id
details details
nil ;; backchannel nil ;; backchannel
clojure.lang.PersistentQueue/EMPTY ;; array-buffer (ArrayBuffer.
0 ;; last-acknowledged-array-id, the array 0 ;; next-array-id
;; with id 0 will be sent as an answer to 0 ;; last-acknowledged-id
;; the first forward-channel POST ;; to-acknowledge-arrays
clojure.lang.PersistentQueue/EMPTY
;; to-flush-arrays
clojure.lang.PersistentQueue/EMPTY)
nil ;; heartbeat-timeout nil ;; heartbeat-timeout
nil ;; session-timeout nil ;; session-timeout
) )
@ -455,13 +505,8 @@
(defn session-status [session] (defn session-status [session]
(let [has-back-channel (if (:back-channel session) 1 0) (let [has-back-channel (if (:back-channel session) 1 0)
last-acknowledged-array-id (:last-acknowledged-array-id session) array-buffer (:array-buffer session)]
;; the sum of all the data that is still to be send [has-back-channel (last-acknowledged-id array-buffer) (outstanding-bytes array-buffer)]))
outstanding-bytes (let [buffer (:array-buffer session)]
(if (empty? buffer)
0
(reduce + 0 (map count (map second buffer)))))]
[has-back-channel last-acknowledged-array-id outstanding-bytes]))
;; convience function to send data to a session ;; convience function to send data to a session
;; the data will be queued until there is a backchannel to send it ;; the data will be queued until there is a backchannel to send it