diff --git a/clj-browserchannel/src/net/thegeez/browserchannel/server.clj b/clj-browserchannel/src/net/thegeez/browserchannel/server.clj index fcf115c..9f1b57c 100644 --- a/clj-browserchannel/src/net/thegeez/browserchannel/server.clj +++ b/clj-browserchannel/src/net/thegeez/browserchannel/server.clj @@ -22,8 +22,7 @@ "Cache-Control" "no-cache, no-store, max-age=0, must-revalidate" "Pragma" "no-cache" "Expires" "Fri, 01 Jan 1990 00:00:00 GMT" - "X-Content-Type-Options" "nosniff" - } + "X-Content-Type-Options" "nosniff"} :base "/channel" ;; root for /test and /bind urls :keep-alive-interval 30 ;; seconds, keep less than session-time-out :session-timeout-interval 120 ;; seconds @@ -33,37 +32,42 @@ }) -(def noop-string "[\"noop\"]") +(def ^:private noop-string "[\"noop\"]") ;; almost all special cases are for making this work with IE -(def ie-headers +(def ^:private ie-headers {"Content-Type" "text/html"}) ;; appended to first write to ie to prevent whole page buffering -(def ie-stream-padding "7cca69475363026330a0d99468e88d23ce95e222591126443015f5f462d9a177186c8701fb45a6ffee0daf1a178fc0f58cd309308fba7e6f011ac38c9cdd4580760f1d4560a84d5ca0355ecbbed2ab715a3350fe0c479050640bd0e77acec90c58c4d3dd0f5cf8d4510e68c8b12e087bd88cad349aafd2ab16b07b0b1b8276091217a44a9fe92fedacffff48092ee693af\n") +(def ^:private ie-stream-padding "7cca69475363026330a0d99468e88d23ce95e222591126443015f5f462d9a177186c8701fb45a6ffee0daf1a178fc0f58cd309308fba7e6f011ac38c9cdd4580760f1d4560a84d5ca0355ecbbed2ab715a3350fe0c479050640bd0e77acec90c58c4d3dd0f5cf8d4510e68c8b12e087bd88cad349aafd2ab16b07b0b1b8276091217a44a9fe92fedacffff48092ee693af\n") + + + + + ;;;;; Utils ;; to create session ids -(defn uuid +(defn- uuid [] (str (java.util.UUID/randomUUID))) -(def scheduler (Executors/newScheduledThreadPool 1)) +(def ^:private scheduler (Executors/newScheduledThreadPool 1)) ;; scheduling a task returns a ScheduledFuture, which can be stopped ;; with (.cancel task false) false says not to interrupt running tasks -(defn schedule +(defn- schedule [^Callable f ^long secs] (.schedule ^ScheduledExecutorService scheduler f secs TimeUnit/SECONDS)) ;; json responses are sent as "size-of-response\njson-response" -(defn size-json-str +(defn- size-json-str [^String json] (let [size (alength (.getBytes json "UTF-8"))] (str size "\n" json))) ;; make sure the root URI for channels starts with a / for route matching -(defn standard-base +(defn- standard-base [s] (let [wofirst (if (= \/ (first s)) (apply str (rest s)) @@ -78,7 +82,7 @@ (map standard-base ["foo" "/foo" "foo/" "/foo"]))) ;; type preserving drop upto for queueus -(defn drop-queue +(defn- drop-queue [queue id] (let [head (peek queue)] (if-not head @@ -90,7 +94,7 @@ ;; Key value pairs do not always come ordered by request number. ;; E.g. {req0_key1 val01, req1_key1 val11, req0_key2 val02, req1_key2 val12} -(defn transform-url-data +(defn- transform-url-data [data] (let [ofs (get data "ofs" "0") pieces (dissoc data "count" "ofs")] @@ -124,7 +128,7 @@ ;; "req1_abc" "def"} ;; => ;;{:ofs 0 :maps [{"x" "3" "y" "10"},{"abc": "def"}]} -(defn get-maps +(defn- get-maps [req] (let [data (:form-params req)] (when-not (zero? (count data)) @@ -135,25 +139,34 @@ (:maps (transform-url-data data))))) ;; rather crude but straight from google -(defn error-response +(defn- error-response [status-code message] {:status status-code :body (str "

" message "

")}) -(defn agent-error-handler-fn +(defn- agent-error-handler-fn "Prints the error and tries to restart the agent." [id] (fn [ag ^Exception e] (println "ERROR:" id "agent threw" e (.getMessage e)))) +(defn- to-pair + [p] + (str "[" (first p) "," (second p) "]")) + ;;;;;; end of utils + + + + + ;;;; listeners ;; @todo clean this up, perhaps store listeners in the session? ;; @todo replace with lamina? ;; sessionId -> :event -> [call back] ;; event: :map | :close -(def listeners-agent (agent {})) +(def ^:private listeners-agent (agent {})) (set-error-handler! listeners-agent (agent-error-handler-fn "listener")) (set-error-mode! listeners-agent :continue) @@ -172,6 +185,10 @@ listeners))) ;; end of listeners + + + + ;; Wrapper around writers on continuations ;; the write methods raise an Exception with the wrapped response in closed ;; @todo use a more specific Exception @@ -305,8 +322,13 @@ (outstanding-bytes [this] (reduce + 0 (map (comp count second) to-flush-arrays)))) + + + + + ;; {sessionId -> (agent session)} -(def sessions (atom {})) +(def ^:private sessions (atom {})) ;; All methods meant to be fn send to an agent, therefor all need to return a Session (defprotocol ISession @@ -352,8 +374,6 @@ ;; this is used for diagnostic purposes by the client bytes-sent]) -(defn to-pair [p] (str "[" (first p) "," (second p) "]")) - (defrecord Session [;; must be unique id @@ -516,7 +536,7 @@ ;; creates a session agent wrapping session data and ;; adds the session to sessions -(defn create-session-agent +(defn- create-session-agent [req options] (let [{initial-rid "RID" ;; identifier for forward channel app-version "CVER" ;; client can specify a custom app-version @@ -564,16 +584,21 @@ (if on-open (on-open id req))) session-agent))) -(defn session-status +(defn- session-status [session] (let [has-back-channel (if (:back-channel session) 1 0) array-buffer (:array-buffer session)] [has-back-channel (last-acknowledged-id array-buffer) (outstanding-bytes array-buffer)])) + + + + + ;; convience function to send data to a session ;; the data will be queued until there is a backchannel to send it ;; over -(defn send-string +(defn- send-string [session-id string] (when-let [session-agent (get @sessions session-id)] (send-off session-agent #(-> % @@ -598,9 +623,14 @@ (if-let [session-agent (get @sessions session-id)] (send-off session-agent close nil (or reason "Client disconnected by server")))) + + + + + ;; wrap the respond function from :reactor with the proper ;; responsewrapper for either IE or other clients -(defn wrap-continuation-writers +(defn- wrap-continuation-writers [handler options] (fn [req] (let [res (handler req)] @@ -622,7 +652,7 @@ ;; test channel is used to determine which host to connect to ;; and if the connection can support streaming -(defn handle-test-channel +(defn- handle-test-channel [req options] (if-not (= "8" (get-in req [:query-params "VER"])) (error-response 400 "Version 8 required") @@ -652,7 +682,7 @@ ;; POST req client -> server is a forward channel ;; session might be nil, when this is the first POST by client -(defn handle-forward-channel +(defn- handle-forward-channel [req session-agent options] (let [[session-agent is-new-session] (if session-agent [session-agent false] @@ -687,7 +717,7 @@ :body (size-json-str (json/write-str status))}))))) ;; GET req server->client is a backwardchannel opened by client -(defn handle-backward-channel +(defn- handle-backward-channel [req session-agent options] (let [type (get-in req [:query-params "TYPE"])] (cond @@ -709,7 +739,7 @@ ;; get to //bind is client->server msg ;; post to //bind is initiate server->client channel -(defn handle-bind-channel +(defn- handle-bind-channel [req options] (let [SID (get-in req [:query-params "SID"]) ;; session-agent might be nil, then it will be created by @@ -750,5 +780,4 @@ :else (handler req)))) (wrap-continuation-writers options) - params/wrap-params - ))) + params/wrap-params)))