Asynchronous communication streams between a Pedestal server and a ReFrame SPA
How to receive and process status update messages sent from a Pedestal back-end application to a ReFrame front-end application during long-running server processing
Introduction
In a ReFrame SPA, it’s a relatively easy task to asynchronously initiate a backend process using the :http-xhrio
effect handler available in the Reframe library. It’s also a rather simple matter of responding to the success and failure events (however they’re defined) produced by the back-end process when it finishes. However, what’s less obvious is how to capture any system processing messages issued by the backend process as it progresses, but before it sucessfully completes or fails.
Two mechanisms available are websockets or SSE. They will both suffice, but each has its own benefits and drawbacks.
A repo with a working version of this code is available on GitHub (tag v1.4).
Web Sockets
WebSockets are an interactive, bi-directional (full duplex), out-of-band communication protocol used for sending messages between web servers and clients. Although WebSocket connections use HTTP ports, WebSockets is not an HTTP protocol. It is a separate TCP protocol with its own semantics. WebSockets enable full-duplex communication between a web client and server, without any need for the polling that is demanded by the half-duplex nature of HTTP.
The channel is full-duplex and both endpoints (server and client) are immediately aware of any channel changes, such as a disconnection. A problem with this immediate feedback is that both client and server must respond appropriately. Was the channel closed, or are there network issues? Should the client attempt to reopen the channel? etc.
Most WebSocket implementations also impose a limit to how long they will maintain an open channel which hasn’t transmitted or received data recently. Often we would like to use WebSockets with long periods of time between bursts of activity. Therefore, it is the responsibility of the developer to periodically send ping messages along the channel in order to remain below the time-out interval and ensure the connection is maintained.
SSE (Server Sent Events)
Server Sent Event is a push technology which fully exists within the HTTP world (using content type text/event-stream
). SSE allows a server to send a stream of events to a web client after the client has established an initial connection. Once a connection has been established the client can use the JavaScript EventSource API to subscribe to the stream, and to receive updates as they are issued by the server. The stream will remain open until explicitly closed by either the server or the client.
SSE streams are uni-directional (simplex) from the server to the client and unlike WebSockets there is no facility available for the client to use the channel to transmit to the server. Helpfully, SSE provides for automatic client reconnections - convenient when network issues are encountered.
On the other hand, one of the challenges of working with SSE is that generally the server is not aware that a client has disconnected until it attempts to send a message. This can be a resource drain for servers with more than a handful of zombie clients. Fortunately, most implementations check the liveness of the connections by automatically issuing periodic heartbeat empty messages to validate the status of the channel.
Because both the reconnection and heartbeat features are often part of the base implementation of the SSE protocol, SSE is often simpler in use than WebSockets.
Implementations
Background
For the purpose of this exercise, let’s suppose we’ve created a Clojure function that performs a number of activities that take some non-trivial amount of time. We’ve developed its functionality in the REPL and we’re satisfied that it’s suitable to be made available at a particular Pedestal controlled URL accessible to our front-end application.
However, we would also like to be able to execute this function (via the URL) without stalling our front-end application and to be able to monitor its progress as it proceeds to completion. Ideally, we want to receive processing status update messages from the long-running Clojure function and present them to the user in the browser.
Of course, in order to avoid stalling the front-end interface we can initiate processing using the :http-xhrio
effect handler in ReFrame. However, this will send an AJAX request, executing the function in the background, and to some extent making it inaccessible from our application until it either suceeds or fails.
The :http-xhrio
effect handler can also take a :progress-handler
option, but this isn’t really suitable for our current requirements.
On the client, the approach we’ll take is to provide a pair of React component. One that will be used to receive incoming status messages sent by the server, and another to display those messages in the UI. We will implement these components using both WebSockets and SSE streams as the underlying transports.
Client-Side React Components
We will implement two React components to provide the message logging functionality: a log-holder-element-ui
component and a log-page-ui
component.
The client side implementation in the repo contains separate pairs of components for the two transport types web-socket and SSE. The former is in the web.logs
namespace, the latter in the web.sse-logs
namespace.
Apart from some obvious naming diferences in function names, they are more or less interchangable. For this post I will discuss the web-socket implementation. In another post I intend to outline the SSE implementation, and the manner in which it differs from its web socket sibling.
Component #1 - The log-page-ui
Component
The log-page-ui
component is the simpler of the two components. It’s primary responsibility is to read the vector of messages stored in ReFrame’s state db’s :log-messages
field and display it in tabular form.
Each message entry in the vector is a map with keys of :source
, :message
and message-type
. The display component subscribes to the :logs/log-messages
subscription, which retrieves the vector, and displays the results. We won’t take a detailed look at this component as it’s very simple, but you can review it in the accompanying code repo.
Component #2 - The log-holder-element-ui
Component
The log-holder-element-ui
component is more complex. It is responsible for populating the ReFrame state db’s :log-messages
vector. In order to do so, it opens a connection (web-socket or SSE) to the server, reads incoming messages, and transfers them to the state db.
The component is also responsible for managing the connection’s lifecycle, including any keep-alive requirements and recovery from network errors.
The component is a global singleton component and is used to manage and control the SPA’s entire logging functionality. It exists throughout the SPA’s lifetime, and independently of the user’s context in the application, or what action he or she is performing. This simplifies server routing of messages. The server can simply direct them at a client instance. The singleton component will then process the message into the SPA’s ReFrame state atom, and the contents can be displayed on demand by the log-page-ui
component.
The log-holder-element-ui
component is implemented, in reagent terms, as a Form-3 component. This is necessary because we require access to React’s lifecycle functions. Using these methods we can ensure that our component is a singleton, and that only one instance exists within the application even if the logged-in user changes. The lifecycle methods used (adopting reagent’s naming convention) are :component-did-mount
, :component-did-update
, :component-will-unmount
and :reagent-render
.
Internal State
The component maintains two internal state variables log-atom
and timer-atom
.
The log-atom
contains a map with keys :user-id
, :session-id
, :connection-uuid
and :web-socket
, and records an association between an application session identifiers and the web socket servicing that session.
The SPA as written uses session- and jwt-based authentication. Because a session is shared between browser tabs, the :connection-uuid
value is used to differentiate between tabs within the same session. The :connection-uuid
is determined randomly by the client when it first loads.
There is also an :ext-token
entry associated with the user
map in the ReFrame db.
This is part of the application’s JWT security implementation. It is used to provide a way to validate the identity of a user in situations where session authentication is not possible.
When a user logs in to the system the server will generate and return a signed jwt token. This token can be used by the client in subsequent requests to assert a value for the user’s identity.
Because web sockets exist in the Jetty session layer rather than the Pedestal interceptor layer we cannor rely on the authentication and authorization interceptors I discussed in a previous post to provide server-side security for web-socket connection attempts.
The JWT token (ext-token
) can be used to provide such security.
The timer-atom
is responsible for the keep-alive activities of the connection and will send periodic PING messages to the server, from which it expects a PONG response.
A websocket is opened by the component by opening a connection to wss://<hostname>:<port>/ws?session-id=<session-id>&connection-uuid=<connection-uuid>&ext-token=<ext-token>
on the server.
The session-id
value is the session identifier requested by the client, which is correlated with the browser session. Therefore, different tabs will share the same session-id
because they share the same browser session.
As mentioned above the connection-uuid
value is a value generated by the client to identify a particular instance of the application (i.e. browser tab) and the ext-token
value is the jwt token returned by the server when a user logs in (see note above).
Internally, the server will use the session and connection identifiers to direct messages to the appropriate target client.
By convention, the session-id
value is a combination of the user’s application user-id (a keyword) and a unique integer e.g. :user-12
. The incorporation of the user-id in the session-id in this manner simplifies the processing of certain security restrictions imposed by the server.
Lifecycle Methods
:component-did-mount
is responsible for initializing the :timer-atom
and starting the keep-alive process.
:component-did-update
checks if either the current use has logged out, in which case it disconnects the websocket; or if a new user logs in, in which case it creates and connects a new websocket, clears the log messages stored in ReFrame’s state db, and finally initializes the internal log-atom
and associates the log session’s identifiers with the websocket.
component-will-unmount
stops the keep-alive process by clearing the the timer-atom
and the log-atom
.
:reagent-render
returns a simple empty DIV
element, which seems odd. However, this element is inserted in the static portion of the SPA’s main page and created only once when the SPA is loaded.
The implementation of the log-holder-element-ui
component is included below. The :component-did-update
lifecycle method checks whether the user is logging out i.e. there exists a current session (existing-session-id
) and the new-session-id
value is nil; or whether a new user has logged in.
(defn log-holder-element-ui
[{session-id :client-id
user-id :user
ext-token :ext-token
connection-uuid :connection-uuid
:as current-logged-in-user}]
(let
[timer-atom (atom nil)
log-atom (atom nil)]
(if connection-uuid
(ws-utils/connect-ws-for-log
user-id
session-id
connection-uuid
ext-token
log-atom)
(println "No connection-uuid available for logging element "
"during creation of component"))
(reagent/create-class
{:display-name
:alloc-log-component
:component-did-mount
(fn[this]
(println "Log Component didMount")
(reset!
timer-atom
(js/setInterval
(fn[]
(if @log-atom
(ws-utils/send-ping
log-atom)))
(* 1000 60 2))))
:component-did-update
(fn[this old-argv]
(println "Log Component didUpdate")
(let [new-argv (rest (reagent/argv this))
{new-session-id :client-id
new-connection-uuid :connection-uuid
new-user-id :user
new-ext-token :ext-token
:as current-logged-in-user}
(first new-argv)
{existing-user-id :user
existing-session-id :client-id}
(first (rest old-argv))]
(if (and (nil? new-session-id) existing-session-id)
(do
(println
(str "Clearing websocket for user "
(pr-str existing-user-id) ", "
"with client-id "
(pr-str existing-session-id)))
(ws-utils/disconnect-web-socket-for-log log-atom)
(reset! log-atom nil)))
(if new-session-id
(do
(println
(str "Creating websocket for user "
(pr-str new-user-id)
"with client-id "
(pr-str new-session-id)))
(ws-utils/connect-ws-for-log
new-user-id
new-session-id
new-connection-uuid
new-ext-token
log-atom)))))
:component-will-unmount
(fn[this]
(println "Log Component willUnmount")
(js/clearInterval @timer-atom)
(reset! timer-atom nil)
(reset! log-atom nil))
:reagent-render
(fn[{session-id :client-id :as current-logged-in-user}]
[:div])})))
The code above uses a number of utility functions to manage the WebSocket connection. We will taken a deeper look at some of them below.
The Web Socket Libary
The WebSocket library used by our implementation is haslett.
(require '[haslett.client :as ws])
(require '[haslett.format :as fmt])
Connecting
Connections are handled by the connect-ws-for-log
function, which is passed a user-id
, session-id
, connection-uuid
and ext-token
.
It is also passed (in log-atom
) the internal state atom of the log-holder-element-ui
component.
(defn connect-ws-for-log
[user-id session-id connection-uuid ext-token log-atom]
(println "connecting logging websocket for "
"user " (pr-str user-id) ", with client-id "
(pr-str session-id) "and connection-uuid "
(pr-str connection-uuid))
(let [{existing-websocket :web-socket
existing-user-id :user-id
existing-session-id :session-id
existing-connection-uuid :connection-uuid}
@log-atom]
(when existing-websocket
(if (ws/connected? existing-websocket)
(do
(println
(str "Web Socket is currently connected. "
"Closing existing connected websocket for "
(keyword (name existing-user-id)
(name existing-session-id))
" with connection-uuid "
(pr-str existing-connection-uuid)))
(ws/close existing-websocket))
(println
(str "Existing websocket to "
(keyword (name existing-user-id)
(name existing-session-id)) " "
"exists, but it is not connected.")))))
(go
(let
[web-socket
(<! (ws/connect
(str
api-urls/base-websocket-url
"?session-id=" session-id
"&ext-token=" ext-token
"&connection-uuid=" connection-uuid)
{:format fmt/transit}))]
(swap!
log-atom
(fn[o n]
(js/console.log
"Resetting log details to "
(pr-str (select-keys n [:user-id :session-id :connection-uuid]))
" from "
(pr-str (select-keys o [:user-id :session-id :connection-uuid])))
n)
{:user-id user-id
:session-id session-id
:connection-uuid connection-uuid
:web-socket web-socket
:ext-token ext-token})
(go
(loop []
(when-let
[msg (<! (:source web-socket))]
(add-message-to-log msg)
(recur))))
(when-let
[msg (<! (:close-status web-socket))]
(js/console.log
"close-status message received -> " (pr-str msg))
(decide-and-restart
msg
user-id
session-id
connection-uuid
ext-token
log-atom)))))
The function first conducts a number of sanity checks, and then, using haslett’s connect
function, creates a connection to the url on the server. The result of the call to connect
is stored in the state atom that was passed as an argument.
We use Haslett’s connect
function to create the underlying WebSocket, and specify that the communication format should be transit. The connect
function returns a promise channel that will create a map containing four elements: a :socket
, a :source
, a :sink
and a :close-status
.
The :socket
entry will contain the WebSocket instance that was created. The :source
and :sink
keys are core.async
channels that we will use for reading and writing to the underlying web socket using Clojure’s familiar channel metaphors.
Information about the connection is stored in the log-atom
which is a stateful part of the log-holder-element-ui
component. The information stored is all the information supplied by the client to the server and the websocket instance.
The connect-ws-for-log
function then starts the messages processing loop in a go
block.
Within the go
block, as each message is received on the :source
channel connected to the WebSocket, the function add-message-to-log
is called and passed the message’s contents as an argument. The add-message-to-log
function inserts the message in ReFrame’s state db.
After the message reading go
loop there is a section of code that responds to close events received in the haslett websocket’s :close-status
channel. When a close message event is received, the decide-and-restart
function is called. This function may, depending on the nature of the close event, decide to restart the web socket or not.
Disconnecting
The disconnect-web-socket-for-log
function, used when we’re shutting down the connection, is also passed the state atom. It closes the WebSocket.
(defn disconnect-web-socket-for-log
[log-atom]
(let
[{web-socket :web-socket
user-id :user-id
session-id :session-id
connection-uuid :connection-uuid} @log-atom]
(if (and session-id connection-uuid)
(do
(println
(str "Disconnecting web socket associated with "
(pr-str user-id) "/" (pr-str session-id)
" with connection uuid "
connection-uuid))
(if (ws/connected? web-socket)
(ws/close web-socket)
(println "Unable to close web socket. It's not connected.")))
(println "No session id available. Declined to issue close()."))))
Web Socket Keep-Alive (Client)
A browser will close a WebSocket if no traffic is seen in some particular interval of time (typically 5 minutes). However, status updates often occur in bursts with long periods of inactivity between these bursts. When the log-holder-element-ui
component is mounted we start a loop (using js/setInterval
) which calls send-ping
, a function that places (at two minute intervals) a ping message on the channel connected to the WebSocket.
This let’s the server know that it shouldn’t close the connection.
(defn send-ping[log-atom]
(let
[{web-socket :web-socket
user-id :user-id
session-id :session-id
ext-token :ext-token
connection-uuid :connection-uuid} @log-atom]
(if session-id
(do
(println
(str "Sending Ping for "
(pr-str user-id) "/"
(pr-str session-id)
" at " (pr-str connection-uuid)))
(if (ws/connected? web-socket)
(go
(>! (:sink web-socket)
{:asys/ping user-id
:asys/session-id session-id
:asys/connection-uuid connection-uuid}))
(do
(println
(str "PING: No websocket connected. "
"Attempting reconnect for "
(pr-str user-id) "/"
(pr-str session-id) ", "
"at " (pr-str connection-uuid)))
(connect-ws-for-log
user-id
session-id
connection-uuid
ext-token log-atom))))
(println "No session id available. No ping sent."))))
When the server receives a ping it should respond with a pong. If a web socket isn’t connected, and the client expects it to be, the send-ping
will attempt to reopen the connection. This may occur when connectivity is lost and the decide-and-restart
function has declined to reconnect. A consequence of this approach is that if the server goes away for any reason, the client will attempt to reconnect forever.
The Server Side
Now we turn our attention to how web socket connections are handled by the server.
Design Decisions
In order to decouple the underlying transport mechanism (e.g. WebSocket or SSE) from the Server’s higher-level message creation and dispatch functions we first create a single core.async
channel to which we can write our messages destined for the client.
We also create a publication of this channel using a selector of :topic
on the received message map. A helpful side-effect of this approach is that a publication of a topic without a matching subscription is simply dropped. The topic we’ll use for our log messages is :log-msg
. Therefore, a message pushed to our channel with the form {... :topic :log-msg ...}
will be forwarded to our publication.
Finally, we create a subscription to the topic :log-msg
, and a servicing function to remove messages from it and forward them to the client using whatever transport mechanism is desired. This decoupling serves two purposes: it allows us to change or update our messaging transport without unnecessarily impacting the server’s code; and adding other topics is a relatively simple extension.
We also need an atom to store the Server’s active subscription’s channel.
Creating the Channel & Publication
Below is the code that creates the single message channel, its associated publication and the atom used to store the subscription’s channel.
The atom will be initialized correctly at application startup time.
(defonce
server-messages-channel-destined-for-all-clients
(atom nil))
(defonce
server-messages-channel
(chan 100))
(defonce
server-messages-publication
(pub server-messages-channel #(:topic %)))
Starting Pedestal with WebSocket Support
In order for Pedestal/Jetty to start with WebSocket support you must, within its service map’s ::http/container-options
entry, supply a value for the :context-configurator
key.
The value should be a reference to a function that configures the Jetty Servlet’s web-socket behavior for Pedestal. The function should at least call the add-ws-endpoints
function in the io.pedestal.http.jetty.websockets
namespace. In our application this function is server.messaging.websocket/websocket-configurator-for-jetty
shown below.
(defn websocket-configurator-for-jetty
[jetty-servlet-context]
(ws/add-ws-endpoints
jetty-servlet-context
ws-paths))
The add-ws-endpoints
function takes as parameters a ServletContext and a configuration map. The configuration map passed indicates the websocket uri(s) that Jetty should use as web sockets end-points, and also the functions that should be called when the web socket service receives the on-connect
, on-text
, on-error
and on-close
events.
The configuation map used by the applications is shown below
{"/ws"
{:on-connect
(ws/start-ws-connection
new-ws-client)
:on-text
(fn [raw-msg]
(process-incoming-text-message raw-msg))
:on-binary
(fn [payload offset length]
(process-incoming-binary-message
payload offset length))
:on-error
(fn [error]
(process-error error))
:on-close
(fn [num-code reason-text]
(process-close
num-code reason-text))}}
The on-connect
handler uses Pedestal’s web-socket functionality to start the connection. Pedestal’s start-ws-connection
function takes as its single argument a function that should accept two parameters passed to it by Pedestal: a websocket (of type org.eclipse.jetty.websocket.api.Session
) and an async channel connected to the web socket.
(defn new-ws-client
[^Session ws-session send-ch]
(log/info
(str "new-ws-client: creating new websocket to client with "
"uri: " (str (.getRequestURI ^Session ws-session))))
(let
[ws-endpoint
(get-ws-endpoint-from-session ws-session)
query-string-map
(some-> ws-session
(.getRequestURI)
(.getQuery)
(route/parse-query-string))
[session-id ext-token connection-uuid]
(as->
query-string-map v
(mapv #(get v %) [:session-id :ext-token :connection-uuid]))]
(if (and session-id connection-uuid)
(let
[user-id-from-token
(or
(auth-utils/get-id-from-ext-token ext-token)
:anonymous)
message-text (str "new-ws-client: starting web socket "
"with user " (pr-str user-id-from-token) " "
"for session-id " session-id " "
"with connection-uuid " (pr-str connection-uuid) " "
"from " (pr-str ws-endpoint))
message (value->transit-string
{:time (gen-utils/get-local-timestamp-with-offset)
:text message-text})]
(log/info message-text)
(async/put!
send-ch
message)
(swap! ws-clients
assoc
(keyword
(name (gen-utils/possible-string-as-keyword user-id-from-token))
(str connection-uuid))
[ws-session send-ch
(keyword
(name (gen-utils/possible-string-as-keyword user-id-from-token))
(name (gen-utils/possible-string-as-keyword session-id)))]))
(log/warn (str "No session-id and connection-uuid "
"supplied for websocket creation."
"Websocket not created.")))))
Because web sockets exist in the Jetty session layer rather than the Pedestal routing layer we cannot rely on the authentication and authorization interceptors I discussed in a previous
post to provide security for connection attempts at the server’s /ws
url. Therefore, before a client is allowed to establish a connection with a particular session-id (which encode the user’s id) the jwt token, passed as a query parameter in the connection url, is examined to determine the user’s id.
We can extract the client’s requested session-id from the url used by the client to request a web-socket connection.
As described earlier, the form of the url is wss://<hostname>:<port>/ws?session-id=<session-id>&connection-uuid=<connection-uuid>&ext-token=<ext-token>
.
The ws-clients atom is a record of the currently connected web-socket clients. It contains a map keyed by each client’s session-id with values that is a vector of the client’s Session instance and its connected channel.
If the user has a valid user id the connection is allowed.
In new-ws-client
we extract the session-id requested by the client, sends a short you’re connected message, and add a vector describing the connection to the ws-clients
atom using a key composed of the user’s id and the connection-uuid.
You’ll notice in the on-text
handler function how the server’s side of the web socket’s keep-alive functionality is implemented in process-incoming-text-message
.
(defn process-incoming-text-message
[raw-msg]
(let
[message (transit-string->value raw-msg)]
(log/info "Websocket message received "
(pr-str message))
(if
(and
(map? message)
(contains? message :asys/ping))
(let
[{user-id :asys/ping
session-id :asys/session-id
connection-uuid :asys/connection-uuid} message]
(send-messages-to-clients
{:msg
{:asys/pong user-id
:asys/session-id session-id
:asys/connection-uuid connection-uuid}
:target-client
(keyword
(name user-id)
(str connection-uuid))}))
(log/warn
(str "Unexpected websocket message received: "
(pr-str message) ", type: "
(type message))))))
If the server receives a message with an :asys/ping
key, the server automatically responds with an :asys/pong
message. The value of the :asys/ping
entry is the user id, session identifier and connection uuid of the client that sent the ping message and enables the server to direct its response correctly.
Starting the Message Processing
When the application starts the web-server, the message-transport
function is also called to initialize the application’s messaging functionality. This function takes two parameters, a transport-type
(which can be either :web-socket
or :sse
) and the atom used to store the subscription’s channel.
(defn message-transport
[transport-type message-channel-atom]
(start-processing-sub transport-type message-channel-atom))
The message-transport
function calls start-processing-sub
which is the function that starts the go loop - accepting messages from the subscription channel and forwarding them to the client(s).
(defn start-processing-sub
[transport-type message-channel-atom]
;; If the channel already exists, then close it.
(when-let [ch (deref message-channel-atom)]
(async/close! ch))
;; subscribe a channel to be contained in the
;; server-messages-channel-destined-for-all-clients atom
;; to the server-messages-publication with the topic of
;; :log-msg i.e. anything in the publication with key of
;; :topic and a value of :log-msg
(async/sub
rlog/server-messages-publication
:log-msg
(reset! message-channel-atom (async/chan)))
;; start a go-loop that takes messages off the channel in the
;; atom and pass it to the send-messages-to-clients function
(log/info "Starting " (pr-str transport-type) " log message loop")
(async/go
(loop []
(when-let
[log-msg (async/<! (deref message-channel-atom))]
(send-messages-to-clients transport-type log-msg)
(recur)))))
The send-messages-to-clients
function is defined as a multi-method which selects on the transport-type
value and makes sure that the correct send-
function(s) are called for the selected transport type.
Essentially, this gets reduced to calling the send-messages-to-client
function in either the server.messaging.websocket
or the server.messaging.sse
namespaces.
Sending Messages
If we review the send-message-
functions in the server.messaging.websocket
namespace, we can see how they work. (The implementation for SSE is similar, and simpler).
The send-messages-to-clients
function is the namespace’s main public function. It accepts as an argument a map representing the message to be sent. Within the map is an entry :target-client
which indicates the message’s destination. The function handles broadcast messages, i.e. messages destined for all connected clients, and also messages destined for only one client. In either case, this function will call the private function send-message-to-client
in the same namespace as many times as is necessary.
(defn send-messages-to-clients
[{message :msg
target-client :target-client
message-type :message-type
:as whole-message}]
(if (and (some? target-client)
(not= :none target-client))
(do
(log/info
(str "websocket: asked to send message to clients "
(pr-str target-client)
", msg: "
(pr-str whole-message)))
(if (= :all target-client)
(doseq
[target-client (keys @ws-clients)]
(send-message-to-client
target-client
message
message-type))
(send-message-to-client
target-client
message
message-type)))))
The send-message-to-client
is shown below. It take a session-id
indicating the message’s destination and a message
which is a map.
(defn- send-message-to-client
[connection-identifier message & [message-type]]
(log/info
(str "Asked to send web-socket message to individual client "
(pr-str connection-identifier)
", msg is "
(pr-str message)))
(if-let
[ws-connections (get-ws-conns-for-session-id
@ws-clients
connection-identifier)]
(doall
(map
(fn[connection-key]
(if-let
[[ws-session send-ch combined-session-id]
(get @ws-clients connection-key)]
(if (or
(not (.isOpen ws-session))
(ap/closed? send-ch))
(do
(log/warn
"While trying to send-message-to-client,"
"found websocket or websocket channel was closed."
"combined-session-id" (pr-str combined-session-id)
"connection-key " (pr-str connection-key))
(clean-up-ws-clients))
(async/put!
send-ch
(value->transit-string
{:time (gen-utils/get-local-timestamp-with-offset)
:text message
:message-type (or message-type :info)})
(fn[v]
(log/debug
(str
"put! in send-message-to-client "
(pr-str connection-key)
" returned "
(pr-str v))))))
(log/warn (str "Couldn't find websocket session "
"for client connection "
(pr-str connection-key)
". Available connections "
"are " (pr-str (keys @ws-clients))))))
ws-connections))
(log/warn "Couldn't find any web sockets for target-client"
(:target-client message))))
Connecting the long-running Clojure function
Now that the messaging infrastructure is in place we’ll turn our attention to “instrumenting” our clojure function to send status update message.
A convenient, but not the only, approach is to leverage Clojure’s logging functionality. Often, within a Clojure we will often use calls to log/info
and log/debug
calls to mark and record important processing events. We could take the opportunity to selectively forward some of these messages to the remote client giving it feedback similar to what might be seen if the function was run in a REPL.
This is the approach I’ve chosen.
First I create a macro that can wrap a call to functions in log
namespace, but which can also accept a session-id
.
(defmacro with-forward-context
([body]
(list `with-forward-context nil body))
([target-id body]
(list `with-forward-context target-id body {}))
([target-id body options]
(list 'do body
(list
`apply
`write-message-to-server-message-channel
(concat
(list
'list
(list 'clojure.string/join " "
(conj
(map
#(if
(instance? java.lang.Throwable %)
"ERROR"
(list
'clojure.string/trim
(list
`str %)))
(rest body))
'list)))
(if target-id
(list target-id)
'())
(if options
(list options)
'()))))))
Then we can selectively wrap any log
calls we like as follow:
(rlog/with-forward-context
session-id
(log/info "OK"))
The result is that the message is logged in the usual fashion, but a call to write-message-to-server-message-channel
is also made, ensuring that the content of the log message is also sent to the client matching session-id
.
Closure
Hopefully, if you’re attempting to use websockets with your ClojureScript SPA this has been helpful in some small way. In a subsequent post I will discuss implementing a similar for of messaging but using SSE.