super nitpicky code formatting

i apologize to anyone looking at this commit and wondering why i would
do this, but all these little "off" formatting things have been bugging
me all day while looking at this code. i simply had to do something.
This commit is contained in:
Gered 2016-05-08 19:20:28 -04:00
parent 4e63631854
commit 9d314c4d88
3 changed files with 396 additions and 339 deletions

View file

@ -4,15 +4,19 @@
[immutant.web.async :as iasync] [immutant.web.async :as iasync]
[net.thegeez.browserchannel.async-adapter :as bc-async-adapter])) [net.thegeez.browserchannel.async-adapter :as bc-async-adapter]))
(deftype ImmutantResponse [channel] (deftype ImmutantResponse
[channel]
bc-async-adapter/IAsyncAdapter bc-async-adapter/IAsyncAdapter
(head [this status headers] (head [this status headers]
(let [headers (assoc headers "Transfer-Encoding" "chunked")] (let [headers (assoc headers "Transfer-Encoding" "chunked")]
(iasync/send! channel {:status status :headers headers}))) (iasync/send! channel {:status status :headers headers})))
(write-chunk [this data] (write-chunk [this data]
(if (iasync/open? channel) (if (iasync/open? channel)
(iasync/send! channel data) (iasync/send! channel data)
(throw bc-async-adapter/ConnectionClosedException))) (throw bc-async-adapter/ConnectionClosedException)))
(close [this] (close [this]
(iasync/close channel))) (iasync/close channel)))

View file

@ -1,45 +1,51 @@
(ns net.thegeez.browserchannel.jetty-async-adapter (ns net.thegeez.browserchannel.jetty-async-adapter
"BrowserChannel adapter for the Jetty webserver, with async HTTP." "BrowserChannel adapter for the Jetty webserver, with async HTTP."
(:import [org.eclipse.jetty.server.handler AbstractHandler] (:import
[org.eclipse.jetty.server Server Request Response] [org.eclipse.jetty.server.handler AbstractHandler]
[org.eclipse.jetty.server.nio SelectChannelConnector] [org.eclipse.jetty.server Server Request Response]
[org.eclipse.jetty.server.ssl SslSelectChannelConnector] [org.eclipse.jetty.server.nio SelectChannelConnector]
[org.eclipse.jetty.util.ssl SslContextFactory] [org.eclipse.jetty.server.ssl SslSelectChannelConnector]
[org.eclipse.jetty.continuation Continuation ContinuationSupport ContinuationListener] [org.eclipse.jetty.util.ssl SslContextFactory]
[javax.servlet.http HttpServletRequest] [org.eclipse.jetty.continuation Continuation ContinuationSupport ContinuationListener]
[java.security KeyStore]) [javax.servlet.http HttpServletRequest]
(:require [ring.util.servlet :as servlet] [java.security KeyStore])
[net.thegeez.browserchannel.async-adapter :as async-adapter])) (:require
[ring.util.servlet :as servlet]
[net.thegeez.browserchannel.async-adapter :as async-adapter]))
;; Based on ring-jetty-async-adapter by Mark McGranaghan ;; Based on ring-jetty-async-adapter by Mark McGranaghan
;; (https://github.com/mmcgrana/ring/tree/jetty-async) ;; (https://github.com/mmcgrana/ring/tree/jetty-async)
;; This has failed write support ;; This has failed write support
(deftype JettyAsyncResponse [^Continuation continuation] (deftype JettyAsyncResponse
[^Continuation continuation]
async-adapter/IAsyncAdapter async-adapter/IAsyncAdapter
(head [this status headers] (head [this status headers]
(doto (.getServletResponse continuation) (doto (.getServletResponse continuation)
(servlet/update-servlet-response {:status status, :headers (assoc headers "Transfer-Encoding" "chunked")}) (servlet/update-servlet-response {:status status :headers (assoc headers "Transfer-Encoding" "chunked")})
(.flushBuffer))) (.flushBuffer)))
(write-chunk [this data] (write-chunk [this data]
(doto (.getWriter (.getServletResponse continuation)) (doto (.getWriter (.getServletResponse continuation))
(.write ^String data) (.write ^String data)
(.flush)) (.flush))
(when (.checkError (.getWriter (.getServletResponse continuation))) (when (.checkError (.getWriter (.getServletResponse continuation)))
(throw async-adapter/ConnectionClosedException))) (throw async-adapter/ConnectionClosedException)))
(close [this] (close [this]
(doto (.getWriter (.getServletResponse continuation)) (doto (.getWriter (.getServletResponse continuation))
(.write "") (.write "")
(.flush)) (.flush))
(.complete continuation))) (.complete continuation)))
(defn- add-ssl-connector! (defn- add-ssl-connector!
"Add an SslSelectChannelConnector to a Jetty Server instance." "Add an SslSelectChannelConnector to a Jetty Server instance."
[^Server server options] [^Server server options]
(let [ssl-context-factory (SslContextFactory.)] (let [ssl-context-factory (SslContextFactory.)]
(doto ssl-context-factory (doto ssl-context-factory
(.setKeyStorePath (options :keystore)) (.setKeyStorePath (options :keystore))
(.setKeyStorePassword (options :key-password))) (.setKeyStorePassword (options :key-password)))
(when (options :truststore) (when (options :truststore)
(.setTrustStore ssl-context-factory ^KeyStore (options :truststore))) (.setTrustStore ssl-context-factory ^KeyStore (options :truststore)))
(when (options :trust-password) (when (options :trust-password)
@ -56,16 +62,16 @@
[handler options] [handler options]
(proxy [AbstractHandler] [] (proxy [AbstractHandler] []
(handle [target ^Request base-request ^HttpServletRequest request response] (handle [target ^Request base-request ^HttpServletRequest request response]
(let [request-map (servlet/build-request-map request) (let [request-map (servlet/build-request-map request)
response-map (handler request-map)] response-map (handler request-map)]
(condp = (:async response-map) (condp = (:async response-map)
nil nil
(do (do
(servlet/update-servlet-response response response-map) (servlet/update-servlet-response response response-map)
(.setHandled base-request true)) (.setHandled base-request true))
:http :http
(let [reactor (:reactor response-map) (let [reactor (:reactor response-map)
continuation ^Continuation (.startAsync request) ;; continuation lives until written to! continuation ^Continuation (.startAsync request) ;; continuation lives until written to!
emit (JettyAsyncResponse. continuation)] emit (JettyAsyncResponse. continuation)]
(.addContinuationListener continuation (.addContinuationListener continuation
(proxy [ContinuationListener] [] (proxy [ContinuationListener] []

View file

@ -1,9 +1,11 @@
(ns net.thegeez.browserchannel.server (ns net.thegeez.browserchannel.server
"BrowserChannel server implementation in Clojure." "BrowserChannel server implementation in Clojure."
(:require [ring.middleware.params :as params] (:import
[clojure.data.json :as json] [java.util.concurrent ScheduledExecutorService Executors TimeUnit Callable ScheduledFuture])
[net.thegeez.browserchannel.async-adapter :as async-adapter]) (:require
(:import [java.util.concurrent ScheduledExecutorService Executors TimeUnit Callable ScheduledFuture])) [ring.middleware.params :as params]
[clojure.data.json :as json]
[net.thegeez.browserchannel.async-adapter :as async-adapter]))
;; @todo: out of order acks and maps - AKH the maps at least is taken care of. ;; @todo: out of order acks and maps - AKH the maps at least is taken care of.
;; @todo use a more specific Exception for failing writes, which ;; @todo use a more specific Exception for failing writes, which
@ -42,28 +44,33 @@
;;;;; Utils ;;;;; Utils
;; to create session ids ;; to create session ids
(defn uuid [] (str (java.util.UUID/randomUUID))) (defn uuid
[]
(str (java.util.UUID/randomUUID)))
(def scheduler (Executors/newScheduledThreadPool 1)) (def scheduler (Executors/newScheduledThreadPool 1))
;; scheduling a task returns a ScheduledFuture, which can be stopped ;; scheduling a task returns a ScheduledFuture, which can be stopped
;; with (.cancel task false) false says not to interrupt running tasks ;; with (.cancel task false) false says not to interrupt running tasks
(defn schedule [^Callable f ^long secs] (defn schedule
[^Callable f ^long secs]
(.schedule ^ScheduledExecutorService scheduler f secs TimeUnit/SECONDS)) (.schedule ^ScheduledExecutorService scheduler f secs TimeUnit/SECONDS))
;; json responses are sent as "size-of-response\njson-response" ;; json responses are sent as "size-of-response\njson-response"
(defn size-json-str [^String json] (defn size-json-str
[^String json]
(let [size (alength (.getBytes json "UTF-8"))] (let [size (alength (.getBytes json "UTF-8"))]
(str size "\n" json))) (str size "\n" json)))
;; make sure the root URI for channels starts with a / for route matching ;; make sure the root URI for channels starts with a / for route matching
(defn standard-base [s] (defn standard-base
[s]
(let [wofirst (if (= \/ (first s)) (let [wofirst (if (= \/ (first s))
(apply str (rest s)) (apply str (rest s))
s) s)
wolast (if (= \/ (last wofirst)) wolast (if (= \/ (last wofirst))
(apply str (butlast wofirst)) (apply str (butlast wofirst))
wofirst)] wofirst)]
(str "/" wolast))) (str "/" wolast)))
;; @todo to test file ;; @todo to test file
@ -71,7 +78,8 @@
(map standard-base ["foo" "/foo" "foo/" "/foo"]))) (map standard-base ["foo" "/foo" "foo/" "/foo"])))
;; type preserving drop upto for queueus ;; type preserving drop upto for queueus
(defn drop-queue [queue id] (defn drop-queue
[queue id]
(let [head (peek queue)] (let [head (peek queue)]
(if-not head (if-not head
queue queue
@ -82,15 +90,16 @@
;; Key value pairs do not always come ordered by request number. ;; Key value pairs do not always come ordered by request number.
;; E.g. {req0_key1 val01, req1_key1 val11, req0_key2 val02, req1_key2 val12} ;; E.g. {req0_key1 val01, req1_key1 val11, req0_key2 val02, req1_key2 val12}
(defn transform-url-data [data] (defn transform-url-data
[data]
(let [ofs (get data "ofs" "0") (let [ofs (get data "ofs" "0")
pieces (dissoc data "count" "ofs")] pieces (dissoc data "count" "ofs")]
{:ofs (Long/parseLong ofs) {:ofs (Long/parseLong ofs)
:maps (->> (for [[k v] pieces] :maps (->> (for [[k v] pieces]
(let [[_ n k] (re-find #"req(\d+)_(\w+)" k)] (let [[_ n k] (re-find #"req(\d+)_(\w+)" k)]
[(Long/parseLong n) {k v}])) [(Long/parseLong n) {k v}]))
(group-by first) ; {0 [[0 [k1 v2]] [0 [k2 v2]]],1 [[1 [k1 v1]] [1 [k2 v2]]]} (group-by first) ; {0 [[0 [k1 v2]] [0 [k2 v2]]],1 [[1 [k1 v1]] [1 [k2 v2]]]}
(sort-by first) ;; order by request number so that messages are recieved on server in same order (sort-by first) ;; order by request number so that messages are recieved on server in same order
(map #(into {} (map second (val %)))))})) (map #(into {} (map second (val %)))))}))
#_(assert (= {:ofs 0 :maps [{"x" "3" "y" "10"} {"abc" "def"}]} #_(assert (= {:ofs 0 :maps [{"x" "3" "y" "10"} {"abc" "def"}]}
@ -115,7 +124,8 @@
;; "req1_abc" "def"} ;; "req1_abc" "def"}
;; => ;; =>
;;{:ofs 0 :maps [{"x" "3" "y" "10"},{"abc": "def"}]} ;;{:ofs 0 :maps [{"x" "3" "y" "10"},{"abc": "def"}]}
(defn get-maps [req] (defn get-maps
[req]
(let [data (:form-params req)] (let [data (:form-params req)]
(when-not (zero? (count data)) (when-not (zero? (count data))
;; number of entries in form-params, ;; number of entries in form-params,
@ -125,9 +135,10 @@
(:maps (transform-url-data data))))) (:maps (transform-url-data data)))))
;; rather crude but straight from google ;; rather crude but straight from google
(defn error-response [status-code message] (defn error-response
[status-code message]
{:status status-code {:status status-code
:body (str "<html><body><h1>" message "</h1></body></html>")}) :body (str "<html><body><h1>" message "</h1></body></html>")})
(defn agent-error-handler-fn (defn agent-error-handler-fn
"Prints the error and tries to restart the agent." "Prints the error and tries to restart the agent."
@ -147,11 +158,13 @@
(set-error-mode! listeners-agent :continue) (set-error-mode! listeners-agent :continue)
(defn add-listener [session-id event-key f] (defn add-listener
[session-id event-key f]
(send-off listeners-agent (send-off listeners-agent
update-in [session-id event-key] #(conj (or % []) f))) update-in [session-id event-key] #(conj (or % []) f)))
(defn notify-listeners [session-id request event-key & data] (defn notify-listeners
[session-id request event-key & data]
(send-off listeners-agent (send-off listeners-agent
(fn [listeners] (fn [listeners]
(doseq [callback (get-in listeners [session-id event-key])] (doseq [callback (get-in listeners [session-id event-key])]
@ -169,49 +182,59 @@
(write-end [this])) (write-end [this]))
;; for writing on backchannel to non-IE clients ;; for writing on backchannel to non-IE clients
(deftype XHRWriter [;; respond calls functions on the continuation (deftype XHRWriter
respond [;; respond calls functions on the continuation
headers] respond
headers]
IResponseWrapper IResponseWrapper
(write-head [this] (write-head [this]
(async-adapter/head respond 200 headers)) (async-adapter/head respond 200 headers))
(write [this data] (write [this data]
(write-raw this (size-json-str data))) (write-raw this (size-json-str data)))
(write-raw [this data] (write-raw [this data]
(async-adapter/write-chunk respond data)) (async-adapter/write-chunk respond data))
(write-end [this] (write-end [this]
(async-adapter/close respond))) (async-adapter/close respond)))
;; for writing on backchannels to IE clients ;; for writing on backchannels to IE clients
(deftype IEWriter [;; respond calls functions on the continuation (deftype IEWriter
respond [;; respond calls functions on the continuation
headers respond
;; DOMAIN value from query string headers
domain ;; DOMAIN value from query string
;; first write requires padding, domain
;; padding-sent is a flag for the first time ;; first write requires padding,
^{:volatile-mutable true} write-padding-sent ;; padding-sent is a flag for the first time
;; likewise for write raw, used during test phase ^{:volatile-mutable true} write-padding-sent
^{:volatile-mutable true} write-raw-padding-sent] ;; likewise for write raw, used during test phase
^{:volatile-mutable true} write-raw-padding-sent]
IResponseWrapper IResponseWrapper
(write-head [this] (write-head [this]
(async-adapter/head respond 200 (merge headers ie-headers)) (async-adapter/head respond 200 (merge headers ie-headers))
(async-adapter/write-chunk respond "<html><body>\n") (async-adapter/write-chunk respond "<html><body>\n")
(when (seq domain) (when (seq domain)
(async-adapter/write-chunk respond (str "<script>try{document.domain=\"" (pr-str (json/write-str domain)) "\";}catch(e){}</script>\n")))) (async-adapter/write-chunk respond (str "<script>try{document.domain=\"" (pr-str (json/write-str domain)) "\";}catch(e){}</script>\n"))))
(write [this data] (write [this data]
(async-adapter/write-chunk respond (str "<script>try {parent.m(" (pr-str data) ")} catch(e) {}</script>\n")) (async-adapter/write-chunk respond (str "<script>try {parent.m(" (pr-str data) ")} catch(e) {}</script>\n"))
(when-not write-padding-sent (when-not write-padding-sent
(async-adapter/write-chunk respond ie-stream-padding) (async-adapter/write-chunk respond ie-stream-padding)
(set! write-padding-sent true))) (set! write-padding-sent true)))
(write-raw [this data] (write-raw [this data]
(async-adapter/write-chunk respond (str "<script>try {parent.m(" (pr-str data) ")} catch(e) {}</script>\n")) (async-adapter/write-chunk respond (str "<script>try {parent.m(" (pr-str data) ")} catch(e) {}</script>\n"))
(when-not write-raw-padding-sent (when-not write-raw-padding-sent
(async-adapter/write-chunk respond ie-stream-padding) (async-adapter/write-chunk respond ie-stream-padding)
(set! write-raw-padding-sent true))) (set! write-raw-padding-sent true)))
(write-end [this] (write-end [this]
(async-adapter/write-chunk respond "<script>try {parent.d(); }catch (e){}</script>\n") (async-adapter/write-chunk respond "<script>try {parent.d(); }catch (e){}</script>\n")
(async-adapter/close respond))) (async-adapter/close respond)))
;;ArrayBuffer ;;ArrayBuffer
;;buffer of [[id_lowest data] ... [id_highest data]] ;;buffer of [[id_lowest data] ... [id_highest data]]
@ -220,33 +243,33 @@
(acknowledge-id [this id]) (acknowledge-id [this id])
(to-flush [this]) (to-flush [this])
(last-acknowledged-id [this]) (last-acknowledged-id [this])
(outstanding-bytes [this]) (outstanding-bytes [this]))
)
(deftype ArrayBuffer [;; id of the last array that is conj'ed, can't (deftype ArrayBuffer
;; always be derived because flush buffer might [;; id of the last array that is conj'ed, can't
;; be empty ;; always be derived because flush buffer might
array-id ;; be empty
array-id
;; needed for session status ;; needed for session status
last-acknowledged-id last-acknowledged-id
;; array that have been flushed, but not yet
;; acknowledged, does not contain noop messages
to-acknowledge-arrays
;; arrays to be sent out, may contain arrays ;; array that have been flushed, but not yet
;; that were in to-acknowledge-arrays but queued ;; acknowledged, does not contain noop messages
;; again for resending to-acknowledge-arrays
to-flush-arrays
] ;; arrays to be sent out, may contain arrays
;; that were in to-acknowledge-arrays but queued
;; again for resending
to-flush-arrays]
IArrayBuffer IArrayBuffer
(queue [this string] (queue [this string]
(let [next-array-id (inc array-id)] (let [next-array-id (inc array-id)]
(ArrayBuffer. next-array-id (ArrayBuffer. next-array-id
last-acknowledged-id last-acknowledged-id
to-acknowledge-arrays to-acknowledge-arrays
(conj to-flush-arrays [next-array-id string])))) (conj to-flush-arrays [next-array-id string]))))
;; id may cause the following splits: ;; id may cause the following splits:
;; normal case: ;; normal case:
@ -258,28 +281,29 @@
;; everything before id can be discarded, everything after id ;; everything before id can be discarded, everything after id
;; becomes new flush-arrs and is resend ;; becomes new flush-arrs and is resend
(acknowledge-id [this id] (acknowledge-id [this id]
(ArrayBuffer. array-id (ArrayBuffer. array-id
id id
clojure.lang.PersistentQueue/EMPTY clojure.lang.PersistentQueue/EMPTY
(into (drop-queue to-acknowledge-arrays id) (into (drop-queue to-acknowledge-arrays id)
(drop-queue to-flush-arrays id)))) (drop-queue to-flush-arrays id))))
;; return [seq-to-flush-array next-array-buffer] or nil if ;; return [seq-to-flush-array next-array-buffer] or nil if
;; to-flush-arrays is empty ;; to-flush-arrays is empty
(to-flush [this] (to-flush [this]
(when-let [to-flush (seq to-flush-arrays)] (when-let [to-flush (seq to-flush-arrays)]
[to-flush (ArrayBuffer. array-id [to-flush (ArrayBuffer. array-id
last-acknowledged-id last-acknowledged-id
(into to-acknowledge-arrays (into to-acknowledge-arrays
(remove (fn [[id string]] (remove (fn [[id string]]
(= string noop-string)) (= string noop-string))
to-flush)) to-flush))
clojure.lang.PersistentQueue/EMPTY)])) clojure.lang.PersistentQueue/EMPTY)]))
(last-acknowledged-id [this] (last-acknowledged-id [this]
last-acknowledged-id) last-acknowledged-id)
;; the sum of all the data that is still to be send ;; the sum of all the data that is still to be send
(outstanding-bytes [this] (outstanding-bytes [this]
(reduce + 0 (map (comp count second) to-flush-arrays)))) (reduce + 0 (map (comp count second) to-flush-arrays))))
;; {sessionId -> (agent session)} ;; {sessionId -> (agent session)}
(def sessions (atom {})) (def sessions (atom {}))
@ -318,172 +342,186 @@
;; removes session for sessions ;; removes session for sessions
(close [this request message])) (close [this request message]))
(defrecord BackChannel [;; respond wraps the continuation, which is (defrecord BackChannel
;; the actual connection of the backward [;; respond wraps the continuation, which is
;; channel to the client ;; the actual connection of the backward
respond ;; channel to the client
;; when true use streaming respond
chunk ;; when true use streaming
;; this is used for diagnostic purposes by the client chunk
bytes-sent]) ;; this is used for diagnostic purposes by the client
bytes-sent])
(defn to-pair [p] (str "[" (first p) "," (second p) "]")) (defn to-pair [p] (str "[" (first p) "," (second p) "]"))
(defrecord Session [;; must be unique (defrecord Session
id [;; must be unique
id
;; {:address ;; {:address
;; :headers ;; :headers
;; :app-version ;; :app-version
;; :heartbeat-interval ;; :heartbeat-interval
;; :session-timeout-interval ;; :session-timeout-interval
;; :data-threshold ;; :data-threshold
;;} ;;}
details details
;; back-channel might be nil, as a session spans ;; back-channel might be nil, as a session spans
;; multiple connections ;; multiple connections
back-channel back-channel
;; ArrayBuffer ;; ArrayBuffer
array-buffer array-buffer
;; ScheduleTask or nil ;; ScheduleTask or nil
heartbeat-timeout heartbeat-timeout
;; ScheduleTask or nil ;; ScheduleTask or nil
;; when the backchannel is closed from this ;; when the backchannel is closed from this
;; session, the session is only removes when this ;; session, the session is only removes when this
;; timer expires during this time the client can ;; timer expires during this time the client can
;; reconnect to its session ;; reconnect to its session
session-timeout] session-timeout]
ISession ISession
(clear-back-channel [this] (clear-back-channel [this]
(try (try
(when back-channel (when back-channel
(write-end (:respond back-channel))) (write-end (:respond back-channel)))
(catch Exception e (catch Exception e
nil ;; close back channel regardless nil ;; close back channel regardless
)) ))
(-> this (-> this
clear-heartbeat clear-heartbeat
(assoc :back-channel nil) (assoc :back-channel nil)
refresh-session-timeout)) refresh-session-timeout))
(set-back-channel [this respond req] (set-back-channel [this respond req]
(let [bc (BackChannel. respond (let [bc (BackChannel. respond
;; can we stream responses ;; can we stream responses
;; back? ;; back?
;; CI is determined client ;; CI is determined client
;; side with /test ;; side with /test
(= (get-in req [:query-params "CI"]) "0") (= (get-in req [:query-params "CI"]) "0")
;; no bytes sent yet ;; no bytes sent yet
0)] 0)]
(-> this (-> this
clear-back-channel clear-back-channel
;; clear-back-channel sets the session-timeout ;; clear-back-channel sets the session-timeout
;; here we know the session is alive and ;; here we know the session is alive and
;; well due to this new backchannel ;; well due to this new backchannel
clear-session-timeout clear-session-timeout
(assoc :back-channel bc) (assoc :back-channel bc)
refresh-heartbeat refresh-heartbeat
;; try to send any data that was buffered ;; try to send any data that was buffered
;; while there was no backchannel ;; while there was no backchannel
flush-buffer))) flush-buffer)))
(clear-heartbeat [this] (clear-heartbeat [this]
(when heartbeat-timeout (when heartbeat-timeout
(.cancel ^ScheduledFuture heartbeat-timeout (.cancel ^ScheduledFuture heartbeat-timeout
false ;; do not interrupt running tasks false ;; do not interrupt running tasks
)) ))
(assoc this :heartbeat-timeout nil)) (assoc this :heartbeat-timeout nil))
(refresh-heartbeat [this] (refresh-heartbeat [this]
(-> this (-> this
clear-heartbeat clear-heartbeat
(assoc :heartbeat-timeout (assoc :heartbeat-timeout
;; *agent* not bound when executed later ;; *agent* not bound when executed later
;; through schedule, therefor passed explicitly ;; through schedule, therefor passed explicitly
(let [session-agent *agent*] (let [session-agent *agent*]
(schedule (fn [] (schedule
(send-off session-agent #(-> % (fn []
(queue-string noop-string) (send-off session-agent #(-> %
flush-buffer))) (queue-string noop-string)
(:heartbeat-interval details)))))) flush-buffer)))
(:heartbeat-interval details))))))
(clear-session-timeout [this] (clear-session-timeout [this]
(when session-timeout (when session-timeout
(.cancel ^ScheduledFuture session-timeout (.cancel ^ScheduledFuture session-timeout
false ;; do not interrupt running tasks false ;; do not interrupt running tasks
)) ))
(assoc this :session-timeout nil)) (assoc this :session-timeout nil))
(refresh-session-timeout [this] (refresh-session-timeout [this]
(-> this (-> this
clear-session-timeout clear-session-timeout
(assoc :session-timeout (assoc :session-timeout
(let [session-agent *agent*] (let [session-agent *agent*]
(schedule (fn [] (schedule
(send-off session-agent close nil "Timed out")) (fn []
(:session-timeout-interval details)))))) (send-off session-agent close nil "Timed out"))
(:session-timeout-interval details))))))
(queue-string [this string] (queue-string [this string]
(update-in this [:array-buffer] queue string)) (update-in this [:array-buffer] queue string))
(acknowledge-arrays [this array-id] (acknowledge-arrays [this array-id]
(let [array-id (Long/parseLong array-id)] (let [array-id (Long/parseLong array-id)]
(update-in this [:array-buffer] acknowledge-id array-id))) (update-in this [:array-buffer] acknowledge-id array-id)))
;; tries to do the actual writing to the client ;; tries to do the actual writing to the client
;; @todo the composition is a bit awkward in this method due to the ;; @todo the composition is a bit awkward in this method due to the
;; try catch and if mix ;; try catch and if mix
(flush-buffer [this] (flush-buffer [this]
(if-not back-channel (if-not back-channel
this ;; nothing to do when there's no connection this ;; nothing to do when there's no connection
;; only flush unacknowledged arrays ;; only flush unacknowledged arrays
(if-let [[to-flush next-array-buffer] (to-flush array-buffer)] (if-let [[to-flush next-array-buffer] (to-flush array-buffer)]
(try (try
;; buffer contains [[1 json-str] ...] can't use ;; buffer contains [[1 json-str] ...] can't use
;; json-str which will double escape the json ;; json-str which will double escape the json
(doseq [p to-flush #_(next to-flush)] (doseq [p to-flush #_(next to-flush)]
(write (:respond back-channel) (str "[" (to-pair p) "]"))) (write (:respond back-channel) (str "[" (to-pair p) "]")))
;; size is an approximation
(let [this (let [size (reduce + 0 (map count (map second to-flush)))]
(-> this
(assoc :array-buffer next-array-buffer)
(update-in [:back-channel :bytes-sent] + size)))
;; clear-back-channel closes the back
;; channel when the channel does not
;; support streaming or when a large
;; amount of data has been sent
this (if (or (not (get-in this [:back-channel :chunk]))
(< (:data-threshold details) (get-in this [:back-channel :bytes-sent])))
(clear-back-channel this)
this)]
;; this sending of data keeps the connection alive
;; make a new heartbeat
(refresh-heartbeat this))
(catch Exception e
;; when write failed
;; non delivered arrays are still in buffer
(clear-back-channel this)
))
this ;; do nothing if buffer is empty
)))
;; size is an approximation
(let [this (let [size (reduce + 0 (map count (map second to-flush)))]
(-> this
(assoc :array-buffer next-array-buffer)
(update-in [:back-channel :bytes-sent] + size)))
;; clear-back-channel closes the back
;; channel when the channel does not
;; support streaming or when a large
;; amount of data has been sent
this (if (or (not (get-in this [:back-channel :chunk]))
(< (:data-threshold details) (get-in this [:back-channel :bytes-sent])))
(clear-back-channel this)
this)]
;; this sending of data keeps the connection alive
;; make a new heartbeat
(refresh-heartbeat this))
(catch Exception e
;; when write failed
;; non delivered arrays are still in buffer
(clear-back-channel this)
))
this ;; do nothing if buffer is empty
)))
;; closes the session and removes it from sessions ;; closes the session and removes it from sessions
(close [this request message] (close [this request message]
(-> this
(-> this clear-back-channel
clear-back-channel clear-session-timeout
clear-session-timeout ;; the heartbeat timeout is cancelled by clear-back-channel
;; the heartbeat timeout is cancelled by clear-back-channel )
) (swap! sessions dissoc id)
(swap! sessions dissoc id) (notify-listeners id request :close message)
(notify-listeners id request :close message) nil ;; the agent will no longer wrap a session
nil ;; the agent will no longer wrap a session ))
))
;; creates a session agent wrapping session data and ;; creates a session agent wrapping session data and
;; adds the session to sessions ;; adds the session to sessions
(defn create-session-agent [req options] (defn create-session-agent
(let [{initial-rid "RID" ;; identifier for forward channel [req options]
app-version "CVER" ;; client can specify a custom app-version (let [{initial-rid "RID" ;; identifier for forward channel
app-version "CVER" ;; client can specify a custom app-version
old-session-id "OSID" old-session-id "OSID"
old-array-id "OAID"} (:query-params req)] old-array-id "OAID"} (:query-params req)]
;; when a client specifies and old session id then that old one ;; when a client specifies and old session id then that old one
;; needs to be removed ;; needs to be removed
(when-let [old-session-agent (@sessions old-session-id)] (when-let [old-session-agent (@sessions old-session-id)]
@ -491,31 +529,31 @@
(acknowledge-arrays % old-array-id) (acknowledge-arrays % old-array-id)
%) %)
(close req "Reconnected")))) (close req "Reconnected"))))
(let [id (uuid) (let [id (uuid)
details {:address (:remote-addr req) details {:address (:remote-addr req)
:headers (:headers req) :headers (:headers req)
:app-version app-version :app-version app-version
:heartbeat-interval (:keep-alive-interval options) :heartbeat-interval (:keep-alive-interval options)
:session-timeout-interval (:session-timeout-interval options) :session-timeout-interval (:session-timeout-interval options)
:data-threshold (:data-threshold options)} :data-threshold (:data-threshold options)}
session (-> (Session. id session (-> (Session. id
details details
nil ;; backchannel nil ;; backchannel
(ArrayBuffer. (ArrayBuffer.
0 ;; array-id, 0 is never used by the 0 ;; array-id, 0 is never used by the
;; array-buffer, it is used by the ;; array-buffer, it is used by the
;; first message with the session id ;; first message with the session id
0 ;; last-acknowledged-id 0 ;; last-acknowledged-id
;; to-acknowledge-arrays ;; to-acknowledge-arrays
clojure.lang.PersistentQueue/EMPTY clojure.lang.PersistentQueue/EMPTY
;; to-flush-arrays ;; to-flush-arrays
clojure.lang.PersistentQueue/EMPTY) clojure.lang.PersistentQueue/EMPTY)
nil ;; heartbeat-timeout nil ;; heartbeat-timeout
nil ;; session-timeout nil ;; session-timeout
) )
;; this first session-timeout is for the case ;; this first session-timeout is for the case
;; when the client never connects with a backchannel ;; when the client never connects with a backchannel
refresh-session-timeout) refresh-session-timeout)
session-agent (agent session)] session-agent (agent session)]
(set-error-handler! session-agent (agent-error-handler-fn (str "session-" (:id session)))) (set-error-handler! session-agent (agent-error-handler-fn (str "session-" (:id session))))
(set-error-mode! session-agent :continue) (set-error-mode! session-agent :continue)
@ -526,21 +564,24 @@
(if on-open (on-open id req))) (if on-open (on-open id req)))
session-agent))) session-agent)))
(defn session-status [session] (defn session-status
[session]
(let [has-back-channel (if (:back-channel session) 1 0) (let [has-back-channel (if (:back-channel session) 1 0)
array-buffer (:array-buffer session)] array-buffer (:array-buffer session)]
[has-back-channel (last-acknowledged-id array-buffer) (outstanding-bytes array-buffer)])) [has-back-channel (last-acknowledged-id array-buffer) (outstanding-bytes array-buffer)]))
;; convience function to send data to a session ;; convience function to send data to a session
;; the data will be queued until there is a backchannel to send it ;; the data will be queued until there is a backchannel to send it
;; over ;; over
(defn send-string [session-id string] (defn send-string
[session-id string]
(when-let [session-agent (get @sessions session-id)] (when-let [session-agent (get @sessions session-id)]
(send-off session-agent #(-> % (send-off session-agent #(-> %
(queue-string string) (queue-string string)
flush-buffer)))) flush-buffer))))
(defn send-map [session-id map] (defn send-map
[session-id map]
(send-string session-id (json/write-str map))) (send-string session-id (json/write-str map)))
(defn send-map-to-all (defn send-map-to-all
@ -559,28 +600,30 @@
;; wrap the respond function from :reactor with the proper ;; wrap the respond function from :reactor with the proper
;; responsewrapper for either IE or other clients ;; responsewrapper for either IE or other clients
(defn wrap-continuation-writers [handler options] (defn wrap-continuation-writers
[handler options]
(fn [req] (fn [req]
(let [res (handler req)] (let [res (handler req)]
(if (:async res) (if (:async res)
(let [reactor (:reactor res) (let [reactor (:reactor res)
type (get-in req [:query-params "TYPE"])] type (get-in req [:query-params "TYPE"])]
(assoc res :reactor (assoc res :reactor
(fn [respond] (fn [respond]
(reactor (let [headers (assoc (:headers options) (reactor (let [headers (assoc (:headers options)
"Transfer-Encoding" "chunked")] "Transfer-Encoding" "chunked")]
(if (= type "html") (if (= type "html")
(let [domain (get-in req [:query-params "DOMAIN"])] (let [domain (get-in req [:query-params "DOMAIN"])]
;; last two false are the padding ;; last two false are the padding
;; sent flags ;; sent flags
(IEWriter. respond headers domain false false)) (IEWriter. respond headers domain false false))
(XHRWriter. respond headers))))))) (XHRWriter. respond headers)))))))
res ;; do not touch responses without :async res ;; do not touch responses without :async
)))) ))))
;; test channel is used to determine which host to connect to ;; test channel is used to determine which host to connect to
;; and if the connection can support streaming ;; and if the connection can support streaming
(defn handle-test-channel [req options] (defn handle-test-channel
[req options]
(if-not (= "8" (get-in req [:query-params "VER"])) (if-not (= "8" (get-in req [:query-params "VER"]))
(error-response 400 "Version 8 required") (error-response 400 "Version 8 required")
;; phase 1 ;; phase 1
@ -590,27 +633,27 @@
(if (= (get-in req [:query-params "MODE"]) "init") (if (= (get-in req [:query-params "MODE"]) "init")
(let [host-prefix (when-let [prefixes (seq (:host-prefixes options))] (let [host-prefix (when-let [prefixes (seq (:host-prefixes options))]
(rand-nth prefixes))] (rand-nth prefixes))]
{:status 200 {:status 200
:headers (assoc (:headers options) "X-Accept" "application/json; application/x-www-form-urlencoded") :headers (assoc (:headers options) "X-Accept" "application/json; application/x-www-form-urlencoded")
:body (json/write-str [host-prefix,nil])}) :body (json/write-str [host-prefix, nil])})
;; else phase 2 for get /test ;; else phase 2 for get /test
;; client checks if connection is buffered ;; client checks if connection is buffered
;; send 11111, wait 2 seconds, send 2 ;; send 11111, wait 2 seconds, send 2
;; if client gets two chunks, then there is no buffering ;; if client gets two chunks, then there is no buffering
;; proxy in the way ;; proxy in the way
{:async :http {:async :http
:reactor :reactor (fn [respond]
(fn [respond] (write-head respond)
(write-head respond) (write-raw respond "11111")
(write-raw respond "11111") (schedule #(do (write-raw respond "2")
(schedule #(do (write-raw respond "2") (write-end respond))
(write-end respond)) 2))})))
2))})))
;; POST req client -> server is a forward channel ;; POST req client -> server is a forward channel
;; session might be nil, when this is the first POST by client ;; session might be nil, when this is the first POST by client
(defn handle-forward-channel [req session-agent options] (defn handle-forward-channel
[req session-agent options]
(let [[session-agent is-new-session] (if session-agent (let [[session-agent is-new-session] (if session-agent
[session-agent false] [session-agent false]
[(create-session-agent req options) true]) [(create-session-agent req options) true])
@ -622,14 +665,13 @@
;; response is first array sent for this session: ;; response is first array sent for this session:
;; [[0,["c", session-id, host-prefix, version (always 8)]]] ;; [[0,["c", session-id, host-prefix, version (always 8)]]]
;; send as json for XHR and IE ;; send as json for XHR and IE
(let [session @session-agent (let [session @session-agent
session-id (:id session) session-id (:id session)
;; @todo extract the used host-prefix from the request if any ;; @todo extract the used host-prefix from the request if any
host-prefix nil] host-prefix nil]
{:status 200 {:status 200
:headers (assoc (:headers options) "Content-Type" "application/javascript") :headers (assoc (:headers options) "Content-Type" "application/javascript")
:body :body (size-json-str (json/write-str [[0, ["c", session-id, host-prefix, 8]]]))})
(size-json-str (json/write-str [[0,["c", session-id, host-prefix, 8]]]))})
;; For existing sessions: ;; For existing sessions:
;; Forward sent data by client to listeners ;; Forward sent data by client to listeners
;; reply with ;; reply with
@ -640,35 +682,36 @@
(doseq [map maps] (doseq [map maps]
(notify-listeners (:id @session-agent) req :map map)) (notify-listeners (:id @session-agent) req :map map))
(let [status (session-status @session-agent)] (let [status (session-status @session-agent)]
{:status 200 {:status 200
:headers (:headers options) :headers (:headers options)
:body (size-json-str (json/write-str status))}))))) :body (size-json-str (json/write-str status))})))))
;; GET req server->client is a backwardchannel opened by client ;; GET req server->client is a backwardchannel opened by client
(defn handle-backward-channel [req session-agent options] (defn handle-backward-channel
[req session-agent options]
(let [type (get-in req [:query-params "TYPE"])] (let [type (get-in req [:query-params "TYPE"])]
(cond (cond
(#{"xmlhttp" "html"} type) (#{"xmlhttp" "html"} type)
;; @todo check that query RID is "rpc" ;; @todo check that query RID is "rpc"
{:async :http {:async :http
:reactor :reactor (fn [respond]
(fn [respond] (write-head respond)
(write-head respond) (send-off session-agent set-back-channel respond req))}
(send-off session-agent set-back-channel respond req))} (= type "terminate")
(= type "terminate") ;; this is a request made in an img tag
;; this is a request made in an img tag (do ;;end session
(do ;;end session (when session-agent
(when session-agent (send-off session-agent close req "Client disconnected"))
(send-off session-agent close req "Client disconnected")) {:status 200
{:status 200 :headers (:headers options)
:headers (:headers options) :body ""}
:body ""} ))))
))))
;; get to /<base>/bind is client->server msg ;; get to /<base>/bind is client->server msg
;; post to /<base>/bind is initiate server->client channel ;; post to /<base>/bind is initiate server->client channel
(defn handle-bind-channel [req options] (defn handle-bind-channel
(let [SID (get-in req [:query-params "SID"]) [req options]
(let [SID (get-in req [:query-params "SID"])
;; session-agent might be nil, then it will be created by ;; session-agent might be nil, then it will be created by
;; handle-forward-channel ;; handle-forward-channel
session-agent (@sessions SID)] session-agent (@sessions SID)]
@ -684,24 +727,28 @@
(when-let [AID (get-in req [:query-params "AID"])] (when-let [AID (get-in req [:query-params "AID"])]
(send-off session-agent acknowledge-arrays AID))) (send-off session-agent acknowledge-arrays AID)))
(condp = (:request-method req) (condp = (:request-method req)
:post (handle-forward-channel req session-agent options) :post (handle-forward-channel req session-agent options)
:get (handle-backward-channel req session-agent options)))))) :get (handle-backward-channel req session-agent options))))))
;; see default-options for describtion of options ;; see default-options for describtion of options
(defn wrap-browserchannel [handler & [options]] (defn wrap-browserchannel
[handler & [options]]
(let [options (merge default-options options) (let [options (merge default-options options)
base (str (:base options))] base (str (:base options))]
(-> (fn [req] (-> (fn [req]
(let [uri ^String (:uri req) (let [uri ^String (:uri req)
method (:request-method req)] method (:request-method req)]
(cond (cond
(and (.startsWith uri (str base "/test")) (and (.startsWith uri (str base "/test"))
(= method :get)) (= method :get))
(handle-test-channel req options) (handle-test-channel req options)
(.startsWith uri (str base "/bind"))
(handle-bind-channel req options) (.startsWith uri (str base "/bind"))
:else (handler req)))) (handle-bind-channel req options)
:else
(handler req))))
(wrap-continuation-writers options) (wrap-continuation-writers options)
params/wrap-params params/wrap-params
))) )))