Revert "alternative to prevent need of drop-queue in flush-buffer, abstracts array-buffer"
This reverts commit 64b0076fab
.
This commit is contained in:
parent
64b0076fab
commit
da42760ecc
|
@ -144,6 +144,7 @@
|
|||
listeners)))
|
||||
;; end of listeners
|
||||
|
||||
|
||||
;; Wrapper around writers on continuations
|
||||
;; the write methods raise an Exception with the wrapped response in closed
|
||||
;; @todo use a more specific Exception
|
||||
|
@ -198,73 +199,6 @@
|
|||
(async-adapter/write-chunk respond "<script>try {parent.d(); }catch (e){}</script>\n")
|
||||
(async-adapter/close respond)))
|
||||
|
||||
;;ArrayBuffer
|
||||
;;buffer of [[id_lowest data] ... [id_highest data]]
|
||||
(defprotocol IArrayBuffer
|
||||
(queue [this string])
|
||||
(acknowledge-id [this id])
|
||||
(to-flush [this])
|
||||
(last-acknowledged-id [this])
|
||||
(outstanding-bytes [this])
|
||||
)
|
||||
|
||||
(deftype ArrayBuffer [;; id of the next array that is conj'ed, can't
|
||||
;; always be derived because flush buffer might
|
||||
;; be empty
|
||||
next-array-id
|
||||
|
||||
;; needed for session status
|
||||
last-acknowledged-id
|
||||
|
||||
;; array that have been flushed, but not yet
|
||||
;; acknowledged, does not contain noop messages
|
||||
to-acknowledge-arrays
|
||||
|
||||
;; arrays to be sent out, may contain arrays
|
||||
;; that were in to-acknowledge-arrays but queued
|
||||
;; again for resending
|
||||
to-flush-arrays
|
||||
]
|
||||
IArrayBuffer
|
||||
(queue [this string]
|
||||
(ArrayBuffer. (inc next-array-id)
|
||||
last-acknowledged-id
|
||||
to-acknowledge-arrays
|
||||
(conj to-flush-arrays [next-array-id string])))
|
||||
|
||||
;; id may cause the following splits:
|
||||
;; normal case:
|
||||
;; ack-arrs <id> flush-arrs
|
||||
;; client is slow case:
|
||||
;; ack-arrs <id> ack-arrs flush-arrs
|
||||
;; after arrays have been requeued:
|
||||
;; ack-arrs flush-arrs <id> flush-arrs
|
||||
;; everything before id can be discarded, everything after id
|
||||
;; becomes new flush-arrs and is resend
|
||||
(acknowledge-id [this id]
|
||||
(ArrayBuffer. next-array-id
|
||||
id
|
||||
clojure.lang.PersistentQueue/EMPTY
|
||||
(into (drop-queue to-acknowledge-arrays id)
|
||||
(drop-queue to-flush-arrays id))))
|
||||
|
||||
;; return [seq-to-flush-array next-array-buffer] or nil if
|
||||
;; to-flush-arrays is empty
|
||||
(to-flush [this]
|
||||
(when-let [to-flush (seq to-flush-arrays)]
|
||||
[to-flush (ArrayBuffer. next-array-id
|
||||
last-acknowledged-id
|
||||
(into to-acknowledge-arrays
|
||||
(remove (fn [[id string]]
|
||||
(= string "[\"noop\"]"))
|
||||
to-flush))
|
||||
clojure.lang.PersistentQueue/EMPTY)]))
|
||||
(last-acknowledged-id [this]
|
||||
last-acknowledged-id)
|
||||
;; the sum of all the data that is still to be send
|
||||
(outstanding-bytes [this]
|
||||
(reduce + 0 (map (comp count second) to-flush-arrays))))
|
||||
|
||||
;; {sessionId -> (agent session)}
|
||||
(def sessions (atom {}))
|
||||
|
||||
|
@ -327,9 +261,15 @@
|
|||
;; multiple connections
|
||||
back-channel
|
||||
|
||||
;; ArrayBuffer
|
||||
;; arrays to send out to client
|
||||
;; is a queue
|
||||
;; [[id_lowest, data] .. [id_highest, data]]
|
||||
array-buffer
|
||||
|
||||
;; we might have flushed more than the client has
|
||||
;; acknowledged
|
||||
last-acknowledged-array-id
|
||||
|
||||
;; ScheduleTask or nil
|
||||
heartbeat-timeout
|
||||
|
||||
|
@ -404,10 +344,21 @@
|
|||
(send-off session-agent close "Timed out"))
|
||||
(:session-timeout-interval details))))))
|
||||
(queue-string [this string]
|
||||
(update-in this [:array-buffer] queue string))
|
||||
(let [next-array-id
|
||||
(inc (if (empty? array-buffer)
|
||||
last-acknowledged-array-id
|
||||
;; if flush-buffer fails after acknowledge-arrays,
|
||||
;; it could happen that last sent array id is
|
||||
;; different from last acknowledged one
|
||||
(first (last array-buffer))))]
|
||||
(-> this
|
||||
(update-in [:array-buffer] conj [next-array-id string]))))
|
||||
(acknowledge-arrays [this array-id]
|
||||
(let [array-id (Long/parseLong array-id)]
|
||||
(update-in this [:array-buffer] acknowledge-id array-id)))
|
||||
(-> this
|
||||
(assoc :last-acknowledged-array-id array-id)
|
||||
;; don't need to keep already array ids sent before acknowledged one
|
||||
(update-in [:array-buffer] drop-queue array-id))))
|
||||
;; tries to do the actual writing to the client
|
||||
;; @todo the composition is a bit awkward in this method due to the
|
||||
;; try catch and if mix
|
||||
|
@ -415,19 +366,21 @@
|
|||
(if-not back-channel
|
||||
this ;; nothing to do when there's no connection
|
||||
;; only flush unacknowledged arrays
|
||||
(if-let [[to-flush next-array-buffer] (to-flush array-buffer)]
|
||||
(if-let [buffer (seq (drop-queue array-buffer last-acknowledged-array-id))]
|
||||
(try
|
||||
;; buffer contains [[1 json-str] ...] can't use
|
||||
;; json-str which will double escape the json
|
||||
(let [data (str "["
|
||||
(str/join "," (map (fn [[n d]] (str "[" n "," d "]")) to-flush))
|
||||
(str/join "," (map (fn [[n d]] (str "[" n "," d "]")) buffer))
|
||||
"]")]
|
||||
;; write throws exception when the connection is closed
|
||||
(write (:respond back-channel) data))
|
||||
;; size is an approximation
|
||||
(let [this (let [size (reduce + 0 (map count (map second to-flush)))]
|
||||
(let [this (let [size (reduce + 0 (map count (map second buffer)))]
|
||||
(-> this
|
||||
(assoc :array-buffer next-array-buffer)
|
||||
;; assume last array just sent is acknowledge
|
||||
;; so we don't send it again on next flush
|
||||
(assoc :last-acknowledged-array-id (first (last buffer)))
|
||||
(update-in [:back-channel :bytes-sent] + size)))
|
||||
;; clear-back-channel closes the back
|
||||
;; channel when the channel does not
|
||||
|
@ -484,13 +437,10 @@
|
|||
session (-> (Session. id
|
||||
details
|
||||
nil ;; backchannel
|
||||
(ArrayBuffer.
|
||||
0 ;; next-array-id
|
||||
0 ;; last-acknowledged-id
|
||||
;; to-acknowledge-arrays
|
||||
clojure.lang.PersistentQueue/EMPTY
|
||||
;; to-flush-arrays
|
||||
clojure.lang.PersistentQueue/EMPTY)
|
||||
clojure.lang.PersistentQueue/EMPTY ;; array-buffer
|
||||
0 ;; last-acknowledged-array-id, the array
|
||||
;; with id 0 will be sent as an answer to
|
||||
;; the first forward-channel POST
|
||||
nil ;; heartbeat-timeout
|
||||
nil ;; session-timeout
|
||||
)
|
||||
|
@ -505,8 +455,13 @@
|
|||
|
||||
(defn session-status [session]
|
||||
(let [has-back-channel (if (:back-channel session) 1 0)
|
||||
array-buffer (:array-buffer session)]
|
||||
[has-back-channel (last-acknowledged-id array-buffer) (outstanding-bytes array-buffer)]))
|
||||
last-acknowledged-array-id (:last-acknowledged-array-id session)
|
||||
;; the sum of all the data that is still to be send
|
||||
outstanding-bytes (let [buffer (:array-buffer session)]
|
||||
(if (empty? buffer)
|
||||
0
|
||||
(reduce + 0 (map count (map second buffer)))))]
|
||||
[has-back-channel last-acknowledged-array-id outstanding-bytes]))
|
||||
|
||||
;; convience function to send data to a session
|
||||
;; the data will be queued until there is a backchannel to send it
|
||||
|
|
Reference in a new issue