From 9d314c4d887f0b9b2b80467b4c10bc5014654847 Mon Sep 17 00:00:00 2001 From: gered Date: Sun, 8 May 2016 19:20:28 -0400 Subject: [PATCH] super nitpicky code formatting i apologize to anyone looking at this commit and wondering why i would do this, but all these little "off" formatting things have been bugging me all day while looking at this code. i simply had to do something. --- .../browserchannel/immutant_async_adapter.clj | 6 +- .../browserchannel/jetty_async_adapter.clj | 52 +- .../src/net/thegeez/browserchannel/server.clj | 677 ++++++++++-------- 3 files changed, 396 insertions(+), 339 deletions(-) diff --git a/clj-browserchannel-immutant-adapter/src/net/thegeez/browserchannel/immutant_async_adapter.clj b/clj-browserchannel-immutant-adapter/src/net/thegeez/browserchannel/immutant_async_adapter.clj index dfc5278..3c98ac1 100644 --- a/clj-browserchannel-immutant-adapter/src/net/thegeez/browserchannel/immutant_async_adapter.clj +++ b/clj-browserchannel-immutant-adapter/src/net/thegeez/browserchannel/immutant_async_adapter.clj @@ -4,15 +4,19 @@ [immutant.web.async :as iasync] [net.thegeez.browserchannel.async-adapter :as bc-async-adapter])) -(deftype ImmutantResponse [channel] +(deftype ImmutantResponse + [channel] bc-async-adapter/IAsyncAdapter + (head [this status headers] (let [headers (assoc headers "Transfer-Encoding" "chunked")] (iasync/send! channel {:status status :headers headers}))) + (write-chunk [this data] (if (iasync/open? channel) (iasync/send! channel data) (throw bc-async-adapter/ConnectionClosedException))) + (close [this] (iasync/close channel))) diff --git a/clj-browserchannel-jetty-adapter/src/net/thegeez/browserchannel/jetty_async_adapter.clj b/clj-browserchannel-jetty-adapter/src/net/thegeez/browserchannel/jetty_async_adapter.clj index 4e07c32..581ae8b 100644 --- a/clj-browserchannel-jetty-adapter/src/net/thegeez/browserchannel/jetty_async_adapter.clj +++ b/clj-browserchannel-jetty-adapter/src/net/thegeez/browserchannel/jetty_async_adapter.clj @@ -1,45 +1,51 @@ (ns net.thegeez.browserchannel.jetty-async-adapter "BrowserChannel adapter for the Jetty webserver, with async HTTP." - (:import [org.eclipse.jetty.server.handler AbstractHandler] - [org.eclipse.jetty.server Server Request Response] - [org.eclipse.jetty.server.nio SelectChannelConnector] - [org.eclipse.jetty.server.ssl SslSelectChannelConnector] - [org.eclipse.jetty.util.ssl SslContextFactory] - [org.eclipse.jetty.continuation Continuation ContinuationSupport ContinuationListener] - [javax.servlet.http HttpServletRequest] - [java.security KeyStore]) - (:require [ring.util.servlet :as servlet] - [net.thegeez.browserchannel.async-adapter :as async-adapter])) + (:import + [org.eclipse.jetty.server.handler AbstractHandler] + [org.eclipse.jetty.server Server Request Response] + [org.eclipse.jetty.server.nio SelectChannelConnector] + [org.eclipse.jetty.server.ssl SslSelectChannelConnector] + [org.eclipse.jetty.util.ssl SslContextFactory] + [org.eclipse.jetty.continuation Continuation ContinuationSupport ContinuationListener] + [javax.servlet.http HttpServletRequest] + [java.security KeyStore]) + (:require + [ring.util.servlet :as servlet] + [net.thegeez.browserchannel.async-adapter :as async-adapter])) ;; Based on ring-jetty-async-adapter by Mark McGranaghan ;; (https://github.com/mmcgrana/ring/tree/jetty-async) ;; This has failed write support -(deftype JettyAsyncResponse [^Continuation continuation] +(deftype JettyAsyncResponse + [^Continuation continuation] async-adapter/IAsyncAdapter + (head [this status headers] (doto (.getServletResponse continuation) - (servlet/update-servlet-response {:status status, :headers (assoc headers "Transfer-Encoding" "chunked")}) + (servlet/update-servlet-response {:status status :headers (assoc headers "Transfer-Encoding" "chunked")}) (.flushBuffer))) + (write-chunk [this data] - (doto (.getWriter (.getServletResponse continuation)) - (.write ^String data) - (.flush)) - (when (.checkError (.getWriter (.getServletResponse continuation))) - (throw async-adapter/ConnectionClosedException))) + (doto (.getWriter (.getServletResponse continuation)) + (.write ^String data) + (.flush)) + (when (.checkError (.getWriter (.getServletResponse continuation))) + (throw async-adapter/ConnectionClosedException))) + (close [this] (doto (.getWriter (.getServletResponse continuation)) (.write "") (.flush)) - (.complete continuation))) - + (.complete continuation))) + (defn- add-ssl-connector! "Add an SslSelectChannelConnector to a Jetty Server instance." [^Server server options] (let [ssl-context-factory (SslContextFactory.)] (doto ssl-context-factory (.setKeyStorePath (options :keystore)) - (.setKeyStorePassword (options :key-password))) + (.setKeyStorePassword (options :key-password))) (when (options :truststore) (.setTrustStore ssl-context-factory ^KeyStore (options :truststore))) (when (options :trust-password) @@ -56,16 +62,16 @@ [handler options] (proxy [AbstractHandler] [] (handle [target ^Request base-request ^HttpServletRequest request response] - (let [request-map (servlet/build-request-map request) + (let [request-map (servlet/build-request-map request) response-map (handler request-map)] (condp = (:async response-map) - nil + nil (do (servlet/update-servlet-response response response-map) (.setHandled base-request true)) :http (let [reactor (:reactor response-map) - continuation ^Continuation (.startAsync request) ;; continuation lives until written to! + continuation ^Continuation (.startAsync request) ;; continuation lives until written to! emit (JettyAsyncResponse. continuation)] (.addContinuationListener continuation (proxy [ContinuationListener] [] diff --git a/clj-browserchannel/src/net/thegeez/browserchannel/server.clj b/clj-browserchannel/src/net/thegeez/browserchannel/server.clj index 7617512..fcf115c 100644 --- a/clj-browserchannel/src/net/thegeez/browserchannel/server.clj +++ b/clj-browserchannel/src/net/thegeez/browserchannel/server.clj @@ -1,9 +1,11 @@ (ns net.thegeez.browserchannel.server "BrowserChannel server implementation in Clojure." - (:require [ring.middleware.params :as params] - [clojure.data.json :as json] - [net.thegeez.browserchannel.async-adapter :as async-adapter]) - (:import [java.util.concurrent ScheduledExecutorService Executors TimeUnit Callable ScheduledFuture])) + (:import + [java.util.concurrent ScheduledExecutorService Executors TimeUnit Callable ScheduledFuture]) + (:require + [ring.middleware.params :as params] + [clojure.data.json :as json] + [net.thegeez.browserchannel.async-adapter :as async-adapter])) ;; @todo: out of order acks and maps - AKH the maps at least is taken care of. ;; @todo use a more specific Exception for failing writes, which @@ -42,28 +44,33 @@ ;;;;; Utils ;; to create session ids -(defn uuid [] (str (java.util.UUID/randomUUID))) +(defn uuid + [] + (str (java.util.UUID/randomUUID))) (def scheduler (Executors/newScheduledThreadPool 1)) ;; scheduling a task returns a ScheduledFuture, which can be stopped ;; with (.cancel task false) false says not to interrupt running tasks -(defn schedule [^Callable f ^long secs] +(defn schedule + [^Callable f ^long secs] (.schedule ^ScheduledExecutorService scheduler f secs TimeUnit/SECONDS)) ;; json responses are sent as "size-of-response\njson-response" -(defn size-json-str [^String json] +(defn size-json-str + [^String json] (let [size (alength (.getBytes json "UTF-8"))] (str size "\n" json))) ;; make sure the root URI for channels starts with a / for route matching -(defn standard-base [s] +(defn standard-base + [s] (let [wofirst (if (= \/ (first s)) (apply str (rest s)) s) - wolast (if (= \/ (last wofirst)) - (apply str (butlast wofirst)) - wofirst)] + wolast (if (= \/ (last wofirst)) + (apply str (butlast wofirst)) + wofirst)] (str "/" wolast))) ;; @todo to test file @@ -71,7 +78,8 @@ (map standard-base ["foo" "/foo" "foo/" "/foo"]))) ;; type preserving drop upto for queueus -(defn drop-queue [queue id] +(defn drop-queue + [queue id] (let [head (peek queue)] (if-not head queue @@ -82,15 +90,16 @@ ;; Key value pairs do not always come ordered by request number. ;; E.g. {req0_key1 val01, req1_key1 val11, req0_key2 val02, req1_key2 val12} -(defn transform-url-data [data] +(defn transform-url-data + [data] (let [ofs (get data "ofs" "0") pieces (dissoc data "count" "ofs")] - {:ofs (Long/parseLong ofs) + {:ofs (Long/parseLong ofs) :maps (->> (for [[k v] pieces] (let [[_ n k] (re-find #"req(\d+)_(\w+)" k)] [(Long/parseLong n) {k v}])) - (group-by first) ; {0 [[0 [k1 v2]] [0 [k2 v2]]],1 [[1 [k1 v1]] [1 [k2 v2]]]} - (sort-by first) ;; order by request number so that messages are recieved on server in same order + (group-by first) ; {0 [[0 [k1 v2]] [0 [k2 v2]]],1 [[1 [k1 v1]] [1 [k2 v2]]]} + (sort-by first) ;; order by request number so that messages are recieved on server in same order (map #(into {} (map second (val %)))))})) #_(assert (= {:ofs 0 :maps [{"x" "3" "y" "10"} {"abc" "def"}]} @@ -115,7 +124,8 @@ ;; "req1_abc" "def"} ;; => ;;{:ofs 0 :maps [{"x" "3" "y" "10"},{"abc": "def"}]} -(defn get-maps [req] +(defn get-maps + [req] (let [data (:form-params req)] (when-not (zero? (count data)) ;; number of entries in form-params, @@ -125,9 +135,10 @@ (:maps (transform-url-data data))))) ;; rather crude but straight from google -(defn error-response [status-code message] +(defn error-response + [status-code message] {:status status-code - :body (str "

" message "

")}) + :body (str "

" message "

")}) (defn agent-error-handler-fn "Prints the error and tries to restart the agent." @@ -147,11 +158,13 @@ (set-error-mode! listeners-agent :continue) -(defn add-listener [session-id event-key f] +(defn add-listener + [session-id event-key f] (send-off listeners-agent update-in [session-id event-key] #(conj (or % []) f))) -(defn notify-listeners [session-id request event-key & data] +(defn notify-listeners + [session-id request event-key & data] (send-off listeners-agent (fn [listeners] (doseq [callback (get-in listeners [session-id event-key])] @@ -169,49 +182,59 @@ (write-end [this])) ;; for writing on backchannel to non-IE clients -(deftype XHRWriter [;; respond calls functions on the continuation - respond - headers] +(deftype XHRWriter + [;; respond calls functions on the continuation + respond + headers] IResponseWrapper + (write-head [this] - (async-adapter/head respond 200 headers)) + (async-adapter/head respond 200 headers)) + (write [this data] - (write-raw this (size-json-str data))) + (write-raw this (size-json-str data))) + (write-raw [this data] - (async-adapter/write-chunk respond data)) + (async-adapter/write-chunk respond data)) + (write-end [this] - (async-adapter/close respond))) + (async-adapter/close respond))) ;; for writing on backchannels to IE clients -(deftype IEWriter [;; respond calls functions on the continuation - respond - headers - ;; DOMAIN value from query string - domain - ;; first write requires padding, - ;; padding-sent is a flag for the first time - ^{:volatile-mutable true} write-padding-sent - ;; likewise for write raw, used during test phase - ^{:volatile-mutable true} write-raw-padding-sent] +(deftype IEWriter + [;; respond calls functions on the continuation + respond + headers + ;; DOMAIN value from query string + domain + ;; first write requires padding, + ;; padding-sent is a flag for the first time + ^{:volatile-mutable true} write-padding-sent + ;; likewise for write raw, used during test phase + ^{:volatile-mutable true} write-raw-padding-sent] IResponseWrapper + (write-head [this] - (async-adapter/head respond 200 (merge headers ie-headers)) - (async-adapter/write-chunk respond "\n") - (when (seq domain) - (async-adapter/write-chunk respond (str "\n")))) + (async-adapter/head respond 200 (merge headers ie-headers)) + (async-adapter/write-chunk respond "\n") + (when (seq domain) + (async-adapter/write-chunk respond (str "\n")))) + (write [this data] - (async-adapter/write-chunk respond (str "\n")) - (when-not write-padding-sent - (async-adapter/write-chunk respond ie-stream-padding) - (set! write-padding-sent true))) + (async-adapter/write-chunk respond (str "\n")) + (when-not write-padding-sent + (async-adapter/write-chunk respond ie-stream-padding) + (set! write-padding-sent true))) + (write-raw [this data] - (async-adapter/write-chunk respond (str "\n")) - (when-not write-raw-padding-sent - (async-adapter/write-chunk respond ie-stream-padding) - (set! write-raw-padding-sent true))) + (async-adapter/write-chunk respond (str "\n")) + (when-not write-raw-padding-sent + (async-adapter/write-chunk respond ie-stream-padding) + (set! write-raw-padding-sent true))) + (write-end [this] - (async-adapter/write-chunk respond "\n") - (async-adapter/close respond))) + (async-adapter/write-chunk respond "\n") + (async-adapter/close respond))) ;;ArrayBuffer ;;buffer of [[id_lowest data] ... [id_highest data]] @@ -220,33 +243,33 @@ (acknowledge-id [this id]) (to-flush [this]) (last-acknowledged-id [this]) - (outstanding-bytes [this]) - ) + (outstanding-bytes [this])) -(deftype ArrayBuffer [;; id of the last array that is conj'ed, can't - ;; always be derived because flush buffer might - ;; be empty - array-id +(deftype ArrayBuffer + [;; id of the last array that is conj'ed, can't + ;; always be derived because flush buffer might + ;; be empty + array-id - ;; needed for session status - last-acknowledged-id - - ;; array that have been flushed, but not yet - ;; acknowledged, does not contain noop messages - to-acknowledge-arrays + ;; needed for session status + last-acknowledged-id - ;; arrays to be sent out, may contain arrays - ;; that were in to-acknowledge-arrays but queued - ;; again for resending - to-flush-arrays - ] + ;; array that have been flushed, but not yet + ;; acknowledged, does not contain noop messages + to-acknowledge-arrays + + ;; arrays to be sent out, may contain arrays + ;; that were in to-acknowledge-arrays but queued + ;; again for resending + to-flush-arrays] IArrayBuffer + (queue [this string] - (let [next-array-id (inc array-id)] - (ArrayBuffer. next-array-id - last-acknowledged-id - to-acknowledge-arrays - (conj to-flush-arrays [next-array-id string])))) + (let [next-array-id (inc array-id)] + (ArrayBuffer. next-array-id + last-acknowledged-id + to-acknowledge-arrays + (conj to-flush-arrays [next-array-id string])))) ;; id may cause the following splits: ;; normal case: @@ -258,28 +281,29 @@ ;; everything before id can be discarded, everything after id ;; becomes new flush-arrs and is resend (acknowledge-id [this id] - (ArrayBuffer. array-id - id - clojure.lang.PersistentQueue/EMPTY - (into (drop-queue to-acknowledge-arrays id) - (drop-queue to-flush-arrays id)))) + (ArrayBuffer. array-id + id + clojure.lang.PersistentQueue/EMPTY + (into (drop-queue to-acknowledge-arrays id) + (drop-queue to-flush-arrays id)))) ;; return [seq-to-flush-array next-array-buffer] or nil if ;; to-flush-arrays is empty (to-flush [this] - (when-let [to-flush (seq to-flush-arrays)] - [to-flush (ArrayBuffer. array-id - last-acknowledged-id - (into to-acknowledge-arrays - (remove (fn [[id string]] - (= string noop-string)) - to-flush)) - clojure.lang.PersistentQueue/EMPTY)])) + (when-let [to-flush (seq to-flush-arrays)] + [to-flush (ArrayBuffer. array-id + last-acknowledged-id + (into to-acknowledge-arrays + (remove (fn [[id string]] + (= string noop-string)) + to-flush)) + clojure.lang.PersistentQueue/EMPTY)])) (last-acknowledged-id [this] - last-acknowledged-id) + last-acknowledged-id) + ;; the sum of all the data that is still to be send (outstanding-bytes [this] - (reduce + 0 (map (comp count second) to-flush-arrays)))) + (reduce + 0 (map (comp count second) to-flush-arrays)))) ;; {sessionId -> (agent session)} (def sessions (atom {})) @@ -318,172 +342,186 @@ ;; removes session for sessions (close [this request message])) -(defrecord BackChannel [;; respond wraps the continuation, which is - ;; the actual connection of the backward - ;; channel to the client - respond - ;; when true use streaming - chunk - ;; this is used for diagnostic purposes by the client - bytes-sent]) +(defrecord BackChannel + [;; respond wraps the continuation, which is + ;; the actual connection of the backward + ;; channel to the client + respond + ;; when true use streaming + chunk + ;; this is used for diagnostic purposes by the client + bytes-sent]) (defn to-pair [p] (str "[" (first p) "," (second p) "]")) -(defrecord Session [;; must be unique - id +(defrecord Session + [;; must be unique + id - ;; {:address - ;; :headers - ;; :app-version - ;; :heartbeat-interval - ;; :session-timeout-interval - ;; :data-threshold - ;;} - details + ;; {:address + ;; :headers + ;; :app-version + ;; :heartbeat-interval + ;; :session-timeout-interval + ;; :data-threshold + ;;} + details - ;; back-channel might be nil, as a session spans - ;; multiple connections - back-channel + ;; back-channel might be nil, as a session spans + ;; multiple connections + back-channel - ;; ArrayBuffer - array-buffer + ;; ArrayBuffer + array-buffer - ;; ScheduleTask or nil - heartbeat-timeout + ;; ScheduleTask or nil + heartbeat-timeout - ;; ScheduleTask or nil - ;; when the backchannel is closed from this - ;; session, the session is only removes when this - ;; timer expires during this time the client can - ;; reconnect to its session - session-timeout] + ;; ScheduleTask or nil + ;; when the backchannel is closed from this + ;; session, the session is only removes when this + ;; timer expires during this time the client can + ;; reconnect to its session + session-timeout] ISession + (clear-back-channel [this] - (try - (when back-channel - (write-end (:respond back-channel))) - (catch Exception e - nil ;; close back channel regardless - )) - (-> this - clear-heartbeat - (assoc :back-channel nil) - refresh-session-timeout)) + (try + (when back-channel + (write-end (:respond back-channel))) + (catch Exception e + nil ;; close back channel regardless + )) + (-> this + clear-heartbeat + (assoc :back-channel nil) + refresh-session-timeout)) + (set-back-channel [this respond req] - (let [bc (BackChannel. respond - ;; can we stream responses - ;; back? - ;; CI is determined client - ;; side with /test - (= (get-in req [:query-params "CI"]) "0") - ;; no bytes sent yet - 0)] - (-> this - clear-back-channel - ;; clear-back-channel sets the session-timeout - ;; here we know the session is alive and - ;; well due to this new backchannel - clear-session-timeout - (assoc :back-channel bc) - refresh-heartbeat - ;; try to send any data that was buffered - ;; while there was no backchannel - flush-buffer))) + (let [bc (BackChannel. respond + ;; can we stream responses + ;; back? + ;; CI is determined client + ;; side with /test + (= (get-in req [:query-params "CI"]) "0") + ;; no bytes sent yet + 0)] + (-> this + clear-back-channel + ;; clear-back-channel sets the session-timeout + ;; here we know the session is alive and + ;; well due to this new backchannel + clear-session-timeout + (assoc :back-channel bc) + refresh-heartbeat + ;; try to send any data that was buffered + ;; while there was no backchannel + flush-buffer))) + (clear-heartbeat [this] - (when heartbeat-timeout - (.cancel ^ScheduledFuture heartbeat-timeout - false ;; do not interrupt running tasks - )) - (assoc this :heartbeat-timeout nil)) + (when heartbeat-timeout + (.cancel ^ScheduledFuture heartbeat-timeout + false ;; do not interrupt running tasks + )) + (assoc this :heartbeat-timeout nil)) + (refresh-heartbeat [this] - (-> this - clear-heartbeat - (assoc :heartbeat-timeout - ;; *agent* not bound when executed later - ;; through schedule, therefor passed explicitly - (let [session-agent *agent*] - (schedule (fn [] - (send-off session-agent #(-> % - (queue-string noop-string) - flush-buffer))) - (:heartbeat-interval details)))))) + (-> this + clear-heartbeat + (assoc :heartbeat-timeout + ;; *agent* not bound when executed later + ;; through schedule, therefor passed explicitly + (let [session-agent *agent*] + (schedule + (fn [] + (send-off session-agent #(-> % + (queue-string noop-string) + flush-buffer))) + (:heartbeat-interval details)))))) + (clear-session-timeout [this] - (when session-timeout - (.cancel ^ScheduledFuture session-timeout - false ;; do not interrupt running tasks - )) - (assoc this :session-timeout nil)) + (when session-timeout + (.cancel ^ScheduledFuture session-timeout + false ;; do not interrupt running tasks + )) + (assoc this :session-timeout nil)) + (refresh-session-timeout [this] - (-> this - clear-session-timeout - (assoc :session-timeout - (let [session-agent *agent*] - (schedule (fn [] - (send-off session-agent close nil "Timed out")) - (:session-timeout-interval details)))))) + (-> this + clear-session-timeout + (assoc :session-timeout + (let [session-agent *agent*] + (schedule + (fn [] + (send-off session-agent close nil "Timed out")) + (:session-timeout-interval details)))))) + (queue-string [this string] - (update-in this [:array-buffer] queue string)) + (update-in this [:array-buffer] queue string)) + (acknowledge-arrays [this array-id] (let [array-id (Long/parseLong array-id)] (update-in this [:array-buffer] acknowledge-id array-id))) + ;; tries to do the actual writing to the client ;; @todo the composition is a bit awkward in this method due to the ;; try catch and if mix (flush-buffer [this] - (if-not back-channel - this ;; nothing to do when there's no connection - ;; only flush unacknowledged arrays - (if-let [[to-flush next-array-buffer] (to-flush array-buffer)] - (try - ;; buffer contains [[1 json-str] ...] can't use - ;; json-str which will double escape the json + (if-not back-channel + this ;; nothing to do when there's no connection + ;; only flush unacknowledged arrays + (if-let [[to-flush next-array-buffer] (to-flush array-buffer)] + (try + ;; buffer contains [[1 json-str] ...] can't use + ;; json-str which will double escape the json - (doseq [p to-flush #_(next to-flush)] - (write (:respond back-channel) (str "[" (to-pair p) "]"))) + (doseq [p to-flush #_(next to-flush)] + (write (:respond back-channel) (str "[" (to-pair p) "]"))) + + ;; size is an approximation + (let [this (let [size (reduce + 0 (map count (map second to-flush)))] + (-> this + (assoc :array-buffer next-array-buffer) + (update-in [:back-channel :bytes-sent] + size))) + ;; clear-back-channel closes the back + ;; channel when the channel does not + ;; support streaming or when a large + ;; amount of data has been sent + this (if (or (not (get-in this [:back-channel :chunk])) + (< (:data-threshold details) (get-in this [:back-channel :bytes-sent]))) + (clear-back-channel this) + this)] + ;; this sending of data keeps the connection alive + ;; make a new heartbeat + (refresh-heartbeat this)) + (catch Exception e + ;; when write failed + ;; non delivered arrays are still in buffer + (clear-back-channel this) + )) + this ;; do nothing if buffer is empty + ))) - ;; size is an approximation - (let [this (let [size (reduce + 0 (map count (map second to-flush)))] - (-> this - (assoc :array-buffer next-array-buffer) - (update-in [:back-channel :bytes-sent] + size))) - ;; clear-back-channel closes the back - ;; channel when the channel does not - ;; support streaming or when a large - ;; amount of data has been sent - this (if (or (not (get-in this [:back-channel :chunk])) - (< (:data-threshold details) (get-in this [:back-channel :bytes-sent]))) - (clear-back-channel this) - this)] - ;; this sending of data keeps the connection alive - ;; make a new heartbeat - (refresh-heartbeat this)) - (catch Exception e - ;; when write failed - ;; non delivered arrays are still in buffer - (clear-back-channel this) - )) - this ;; do nothing if buffer is empty - ))) ;; closes the session and removes it from sessions (close [this request message] - - (-> this - clear-back-channel - clear-session-timeout - ;; the heartbeat timeout is cancelled by clear-back-channel - ) - (swap! sessions dissoc id) - (notify-listeners id request :close message) - nil ;; the agent will no longer wrap a session - )) + (-> this + clear-back-channel + clear-session-timeout + ;; the heartbeat timeout is cancelled by clear-back-channel + ) + (swap! sessions dissoc id) + (notify-listeners id request :close message) + nil ;; the agent will no longer wrap a session + )) ;; creates a session agent wrapping session data and ;; adds the session to sessions -(defn create-session-agent [req options] - (let [{initial-rid "RID" ;; identifier for forward channel - app-version "CVER" ;; client can specify a custom app-version +(defn create-session-agent + [req options] + (let [{initial-rid "RID" ;; identifier for forward channel + app-version "CVER" ;; client can specify a custom app-version old-session-id "OSID" - old-array-id "OAID"} (:query-params req)] + old-array-id "OAID"} (:query-params req)] ;; when a client specifies and old session id then that old one ;; needs to be removed (when-let [old-session-agent (@sessions old-session-id)] @@ -491,31 +529,31 @@ (acknowledge-arrays % old-array-id) %) (close req "Reconnected")))) - (let [id (uuid) - details {:address (:remote-addr req) - :headers (:headers req) - :app-version app-version - :heartbeat-interval (:keep-alive-interval options) - :session-timeout-interval (:session-timeout-interval options) - :data-threshold (:data-threshold options)} - session (-> (Session. id - details - nil ;; backchannel - (ArrayBuffer. - 0 ;; array-id, 0 is never used by the - ;; array-buffer, it is used by the - ;; first message with the session id - 0 ;; last-acknowledged-id - ;; to-acknowledge-arrays - clojure.lang.PersistentQueue/EMPTY - ;; to-flush-arrays - clojure.lang.PersistentQueue/EMPTY) - nil ;; heartbeat-timeout - nil ;; session-timeout - ) - ;; this first session-timeout is for the case - ;; when the client never connects with a backchannel - refresh-session-timeout) + (let [id (uuid) + details {:address (:remote-addr req) + :headers (:headers req) + :app-version app-version + :heartbeat-interval (:keep-alive-interval options) + :session-timeout-interval (:session-timeout-interval options) + :data-threshold (:data-threshold options)} + session (-> (Session. id + details + nil ;; backchannel + (ArrayBuffer. + 0 ;; array-id, 0 is never used by the + ;; array-buffer, it is used by the + ;; first message with the session id + 0 ;; last-acknowledged-id + ;; to-acknowledge-arrays + clojure.lang.PersistentQueue/EMPTY + ;; to-flush-arrays + clojure.lang.PersistentQueue/EMPTY) + nil ;; heartbeat-timeout + nil ;; session-timeout + ) + ;; this first session-timeout is for the case + ;; when the client never connects with a backchannel + refresh-session-timeout) session-agent (agent session)] (set-error-handler! session-agent (agent-error-handler-fn (str "session-" (:id session)))) (set-error-mode! session-agent :continue) @@ -526,21 +564,24 @@ (if on-open (on-open id req))) session-agent))) -(defn session-status [session] +(defn session-status + [session] (let [has-back-channel (if (:back-channel session) 1 0) - array-buffer (:array-buffer session)] + array-buffer (:array-buffer session)] [has-back-channel (last-acknowledged-id array-buffer) (outstanding-bytes array-buffer)])) ;; convience function to send data to a session ;; the data will be queued until there is a backchannel to send it ;; over -(defn send-string [session-id string] +(defn send-string + [session-id string] (when-let [session-agent (get @sessions session-id)] (send-off session-agent #(-> % (queue-string string) flush-buffer)))) -(defn send-map [session-id map] +(defn send-map + [session-id map] (send-string session-id (json/write-str map))) (defn send-map-to-all @@ -559,28 +600,30 @@ ;; wrap the respond function from :reactor with the proper ;; responsewrapper for either IE or other clients -(defn wrap-continuation-writers [handler options] +(defn wrap-continuation-writers + [handler options] (fn [req] (let [res (handler req)] (if (:async res) (let [reactor (:reactor res) - type (get-in req [:query-params "TYPE"])] + type (get-in req [:query-params "TYPE"])] (assoc res :reactor - (fn [respond] - (reactor (let [headers (assoc (:headers options) - "Transfer-Encoding" "chunked")] - (if (= type "html") - (let [domain (get-in req [:query-params "DOMAIN"])] - ;; last two false are the padding - ;; sent flags - (IEWriter. respond headers domain false false)) - (XHRWriter. respond headers))))))) + (fn [respond] + (reactor (let [headers (assoc (:headers options) + "Transfer-Encoding" "chunked")] + (if (= type "html") + (let [domain (get-in req [:query-params "DOMAIN"])] + ;; last two false are the padding + ;; sent flags + (IEWriter. respond headers domain false false)) + (XHRWriter. respond headers))))))) res ;; do not touch responses without :async )))) ;; test channel is used to determine which host to connect to ;; and if the connection can support streaming -(defn handle-test-channel [req options] +(defn handle-test-channel + [req options] (if-not (= "8" (get-in req [:query-params "VER"])) (error-response 400 "Version 8 required") ;; phase 1 @@ -590,27 +633,27 @@ (if (= (get-in req [:query-params "MODE"]) "init") (let [host-prefix (when-let [prefixes (seq (:host-prefixes options))] (rand-nth prefixes))] - {:status 200 + {:status 200 :headers (assoc (:headers options) "X-Accept" "application/json; application/x-www-form-urlencoded") - :body (json/write-str [host-prefix,nil])}) - + :body (json/write-str [host-prefix, nil])}) + ;; else phase 2 for get /test ;; client checks if connection is buffered ;; send 11111, wait 2 seconds, send 2 ;; if client gets two chunks, then there is no buffering ;; proxy in the way - {:async :http - :reactor - (fn [respond] - (write-head respond) - (write-raw respond "11111") - (schedule #(do (write-raw respond "2") - (write-end respond)) - 2))}))) + {:async :http + :reactor (fn [respond] + (write-head respond) + (write-raw respond "11111") + (schedule #(do (write-raw respond "2") + (write-end respond)) + 2))}))) ;; POST req client -> server is a forward channel ;; session might be nil, when this is the first POST by client -(defn handle-forward-channel [req session-agent options] +(defn handle-forward-channel + [req session-agent options] (let [[session-agent is-new-session] (if session-agent [session-agent false] [(create-session-agent req options) true]) @@ -622,14 +665,13 @@ ;; response is first array sent for this session: ;; [[0,["c", session-id, host-prefix, version (always 8)]]] ;; send as json for XHR and IE - (let [session @session-agent - session-id (:id session) + (let [session @session-agent + session-id (:id session) ;; @todo extract the used host-prefix from the request if any host-prefix nil] - {:status 200 + {:status 200 :headers (assoc (:headers options) "Content-Type" "application/javascript") - :body - (size-json-str (json/write-str [[0,["c", session-id, host-prefix, 8]]]))}) + :body (size-json-str (json/write-str [[0, ["c", session-id, host-prefix, 8]]]))}) ;; For existing sessions: ;; Forward sent data by client to listeners ;; reply with @@ -640,35 +682,36 @@ (doseq [map maps] (notify-listeners (:id @session-agent) req :map map)) (let [status (session-status @session-agent)] - {:status 200 + {:status 200 :headers (:headers options) - :body (size-json-str (json/write-str status))}))))) + :body (size-json-str (json/write-str status))}))))) ;; GET req server->client is a backwardchannel opened by client -(defn handle-backward-channel [req session-agent options] +(defn handle-backward-channel + [req session-agent options] (let [type (get-in req [:query-params "TYPE"])] - (cond - (#{"xmlhttp" "html"} type) - ;; @todo check that query RID is "rpc" - {:async :http - :reactor - (fn [respond] - (write-head respond) - (send-off session-agent set-back-channel respond req))} - (= type "terminate") - ;; this is a request made in an img tag - (do ;;end session - (when session-agent - (send-off session-agent close req "Client disconnected")) - {:status 200 - :headers (:headers options) - :body ""} - )))) + (cond + (#{"xmlhttp" "html"} type) + ;; @todo check that query RID is "rpc" + {:async :http + :reactor (fn [respond] + (write-head respond) + (send-off session-agent set-back-channel respond req))} + (= type "terminate") + ;; this is a request made in an img tag + (do ;;end session + (when session-agent + (send-off session-agent close req "Client disconnected")) + {:status 200 + :headers (:headers options) + :body ""} + )))) ;; get to //bind is client->server msg ;; post to //bind is initiate server->client channel -(defn handle-bind-channel [req options] - (let [SID (get-in req [:query-params "SID"]) +(defn handle-bind-channel + [req options] + (let [SID (get-in req [:query-params "SID"]) ;; session-agent might be nil, then it will be created by ;; handle-forward-channel session-agent (@sessions SID)] @@ -684,24 +727,28 @@ (when-let [AID (get-in req [:query-params "AID"])] (send-off session-agent acknowledge-arrays AID))) (condp = (:request-method req) - :post (handle-forward-channel req session-agent options) - :get (handle-backward-channel req session-agent options)))))) + :post (handle-forward-channel req session-agent options) + :get (handle-backward-channel req session-agent options)))))) ;; see default-options for describtion of options -(defn wrap-browserchannel [handler & [options]] +(defn wrap-browserchannel + [handler & [options]] (let [options (merge default-options options) - base (str (:base options))] + base (str (:base options))] (-> (fn [req] - (let [uri ^String (:uri req) + (let [uri ^String (:uri req) method (:request-method req)] (cond - (and (.startsWith uri (str base "/test")) - (= method :get)) - (handle-test-channel req options) - (.startsWith uri (str base "/bind")) - (handle-bind-channel req options) - :else (handler req)))) + (and (.startsWith uri (str base "/test")) + (= method :get)) + (handle-test-channel req options) + + (.startsWith uri (str base "/bind")) + (handle-bind-channel req options) + + :else + (handler req)))) (wrap-continuation-writers options) params/wrap-params )))