Asynchronous communication streams between a Pedestal server and a ReFrame SPA

How to receive and process status update messages sent from a Pedestal back-end application to a ReFrame front-end application during long-running server processing

Introduction

In a ReFrame SPA, it’s relatively easy to initiate a back-end process asynchronously using the :http-xhrio effect handler (provided by the re-frame-http-fx library). It’s also simple to respond to the success and failure events (however they’re defined) produced when the back-end process finishes. What’s less obvious is how to capture the processing messages issued by the back-end process as it progresses, before it successfully completes or fails.

Two mechanisms are available for this: WebSockets and SSE (Server-Sent Events). Either will suffice, but each has its own benefits and drawbacks.

A repo with a working version of this code is available on GitHub (tag v1.4).

Web Sockets

WebSockets is an interactive, bi-directional (full-duplex), out-of-band communication protocol used for sending messages between web servers and clients. Although a WebSocket connection uses the HTTP ports and begins with an HTTP upgrade handshake, WebSockets is not an HTTP protocol. It is a separate protocol layered on TCP, with its own semantics. Because the channel is full-duplex, there is no need for the polling that is demanded by the half-duplex nature of HTTP.

The channel is full-duplex and both endpoints (server and client) are immediately aware of any channel changes, such as a disconnection. A problem with this immediate feedback is that both client and server must respond appropriately. Was the channel closed, or are there network issues? Should the client attempt to reopen the channel? etc.

Most WebSocket implementations also impose a limit on how long they will maintain an open channel that hasn’t transmitted or received data recently. Often we would like to use WebSockets with long periods of time between bursts of activity. It is therefore the developer’s responsibility to send periodic ping messages along the channel, at intervals shorter than the time-out, to ensure the connection is maintained.

SSE (Server Sent Events)

Server-Sent Events (SSE) is a push technology that exists entirely within the HTTP world (using the content type text/event-stream). SSE allows a server to send a stream of events to a web client after the client has established an initial connection. Once the connection is established, the client can use the JavaScript EventSource API to subscribe to the stream and receive updates as the server issues them. The stream remains open until it is explicitly closed by either the server or the client.

SSE streams are uni-directional (simplex) from the server to the client and unlike WebSockets there is no facility available for the client to use the channel to transmit to the server. Helpfully, SSE provides for automatic client reconnections - convenient when network issues are encountered.

On the other hand, one of the challenges of working with SSE is that the server is generally not aware that a client has disconnected until it attempts to send a message. This can be a resource drain for servers with more than a handful of zombie clients. Fortunately, most implementations check the liveness of their connections by automatically issuing periodic, empty heartbeat messages.

Because both the reconnection and heartbeat features are often part of the base implementation of the SSE protocol, SSE is often simpler to use than WebSockets.
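To make the text/event-stream format mentioned above more concrete, here is a minimal sketch of how a single event is framed on the wire. The namespace and the names format-sse-event and heartbeat-frame are hypothetical, chosen only for illustration; a server-side SSE library would normally do this framing for you.

```clojure
(ns example.sse-format
  (:require [clojure.string :as str]))

(defn format-sse-event
  "Frames one Server-Sent Event. Each line of `data` gets its own
  `data:` field, and a blank line terminates the event."
  [{:keys [event id data]}]
  (str (when event (str "event: " event "\n"))
       (when id (str "id: " id "\n"))
       (apply str (map #(str "data: " % "\n")
                       (str/split-lines (str data))))
       "\n"))

;; A line starting with a colon is a comment that clients ignore,
;; which makes it usable as an empty heartbeat.
(def heartbeat-frame ":\n\n")
```

On the browser side, an EventSource subscribed to the stream would see the first field as the event name and the joined data lines as the event payload.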

Implementations

Background

For the purpose of this exercise, let’s suppose we’ve created a Clojure function that performs a number of activities that take some non-trivial amount of time. We’ve developed its functionality in the REPL and we’re satisfied that it’s suitable to be made available at a particular Pedestal controlled URL accessible to our front-end application.

However, we would also like to be able to execute this function (via the URL) without stalling our front-end application and to be able to monitor its progress as it proceeds to completion. Ideally, we want to receive processing status update messages from the long-running Clojure function and present them to the user in the browser.

Of course, in order to avoid stalling the front-end interface we can initiate processing using the :http-xhrio effect handler in ReFrame. However, this sends an AJAX request that executes the function in the background, leaving it largely inaccessible to our application until it either succeeds or fails.

The :http-xhrio effect handler can also take a :progress-handler option, but this isn’t really suitable for our current requirements.

On the client, the approach we’ll take is to provide a pair of React components: one that receives incoming status messages sent by the server, and another that displays those messages in the UI. We will implement these components using both WebSockets and SSE streams as the underlying transports.

In order to fully understand the login process initiated by the client and the server’s response, particularly the fields returned, it’s probably helpful to review some details. I’ve covered this in a previous post which is available here.

Client-Side React Components

We will implement two React components to provide the message logging functionality: a log-holder-element-ui component and a log-page-ui component.

The client side implementation in the repo contains separate pairs of components for the two transport types web-socket and SSE. The former is in the web.logs namespace, the latter in the web.sse-logs namespace.

Apart from some obvious differences in function names, they are more or less interchangeable. For this post I will discuss the web-socket implementation. In another post I intend to outline the SSE implementation, and the manner in which it differs from its web-socket sibling.

Component #1 - The log-page-ui Component

The log-page-ui component is the simpler of the two components. Its primary responsibility is to read the vector of messages stored in the :log-messages field of ReFrame’s state db and display it in tabular form.

Each message entry in the vector is a map with the keys :source, :message and :message-type. The display component subscribes to the :logs/log-messages subscription, which retrieves the vector, and displays the results. We won’t take a detailed look at this component as it’s very simple, but you can review it in the accompanying code repo.

Component #2 - The log-holder-element-ui Component

The log-holder-element-ui component is more complex. It is responsible for populating the ReFrame state db’s :log-messages vector. In order to do so, it opens a connection (web-socket or SSE) to the server, reads incoming messages, and transfers them to the state db.

The component is also responsible for managing the connection’s lifecycle, including any keep-alive requirements and recovery from network errors.

The component is a global singleton component and is used to manage and control the SPA’s entire logging functionality. It exists throughout the SPA’s lifetime, and independently of the user’s context in the application, or what action he or she is performing. This simplifies server routing of messages. The server can simply direct them at a client instance. The singleton component will then process the message into the SPA’s ReFrame state atom, and the contents can be displayed on demand by the log-page-ui component.

The log-holder-element-ui component is implemented, in reagent terms, as a Form-3 component. This is necessary because we require access to React’s lifecycle functions. Using these methods we can ensure that our component is a singleton, and that only one instance exists within the application even if the logged-in user changes. The lifecycle methods used (adopting reagent’s naming convention) are :component-did-mount, :component-did-update, :component-will-unmount and :reagent-render.

Internal State

The component maintains two internal state variables: log-atom and timer-atom.

The log-atom contains a map with keys :user-id, :session-id, :connection-uuid, :ext-token and :web-socket, and records the association between an application session’s identifiers and the web socket servicing that session.

The SPA as written uses session- and jwt-based authentication. Because a session is shared between browser tabs, the :connection-uuid value is used to differentiate between tabs within the same session. The :connection-uuid is determined randomly by the client when it first loads.

There is also an :ext-token entry associated with the user map in the ReFrame db.

This is part of the application’s JWT security implementation. It is used to provide a way to validate the identity of a user in situations where session authentication is not possible.

When a user logs in to the system the server will generate and return a signed jwt token. This token can be used by the client in subsequent requests to assert a value for the user’s identity.

Because web sockets exist in the Jetty session layer rather than the Pedestal interceptor layer, we cannot rely on the authentication and authorization interceptors I discussed in a previous post to provide server-side security for web-socket connection attempts.

The JWT token (ext-token) can be used to provide such security.

The timer-atom is responsible for the keep-alive activities of the connection and will send periodic PING messages to the server, from which it expects a PONG response.

A websocket is opened by the component by opening a connection to wss://<hostname>:<port>/ws?session-id=<session-id>&connection-uuid=<connection-uuid>&ext-token=<ext-token> on the server.

The session-id value is the session identifier requested by the client, which is correlated with the browser session. Therefore, different tabs will share the same session-id because they share the same browser session.

As mentioned above the connection-uuid value is a value generated by the client to identify a particular instance of the application (i.e. browser tab) and the ext-token value is the jwt token returned by the server when a user logs in (see note above).

Internally, the server will use the session and connection identifiers to direct messages to the appropriate target client.

By convention, the session-id value is a combination of the user’s application user-id (a keyword) and a unique integer e.g. :user-12. The incorporation of the user-id in the session-id in this manner simplifies the processing of certain security restrictions imposed by the server.
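Because the user-id is embedded in the session-id, it can be compared with the identity asserted by the JWT. The following is a minimal sketch of such a check, assuming session-ids always end in a hyphen followed by digits. The function names are hypothetical and are not taken from the repo.

```clojure
(ns example.session-id)

(defn session-id->user-id
  "Extracts the user-id keyword from a session-id such as :user-12
  (or the strings \"user-12\" / \":user-12\"). Returns nil when the
  value does not follow the <user-id>-<integer> convention."
  [session-id]
  (when session-id
    (when-let [[_ user-part] (re-matches #":?(.+)-\d+" (name session-id))]
      (keyword user-part))))

(defn session-belongs-to-user?
  "True when the user-id embedded in session-id matches the id
  obtained from the token."
  [session-id user-id-from-token]
  (and (some? user-id-from-token)
       (= (session-id->user-id session-id)
          (keyword (name user-id-from-token)))))
```

A connection request whose session-id claims one user while its token identifies another could then be rejected before any socket state is recorded.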

Lifecycle Methods

:component-did-mount is responsible for initializing the timer-atom and starting the keep-alive process.

:component-did-update handles two cases. If the current user has logged out, it disconnects the websocket. If a new user has logged in, it creates and connects a new websocket, clears the log messages stored in ReFrame’s state db, and finally initializes the internal log-atom, associating the log session’s identifiers with the websocket.

:component-will-unmount stops the keep-alive process by clearing the timer, and then resets both the timer-atom and the log-atom.

:reagent-render returns a simple empty DIV element, which seems odd. However, this element is inserted in the static portion of the SPA’s main page and created only once when the SPA is loaded.

The implementation of the log-holder-element-ui component is included below. The :component-did-update lifecycle method checks whether the user is logging out i.e. there exists a current session (existing-session-id) and the new-session-id value is nil; or whether a new user has logged in.

(defn log-holder-element-ui
  [{session-id :client-id
    user-id :user
    ext-token :ext-token
    connection-uuid :connection-uuid
    :as current-logged-in-user}]
  (let
    [timer-atom (atom nil)
     log-atom (atom nil)]
    (if connection-uuid
      (ws-utils/connect-ws-for-log
        user-id
        session-id
        connection-uuid
        ext-token
        log-atom)
      (println "No connection-uuid available for logging element "
               "during creation of component"))

    (reagent/create-class
      {:display-name
       :alloc-log-component

       :component-did-mount
       (fn[this]
         (println "Log Component didMount")
         (reset!
           timer-atom
           (js/setInterval
             (fn[]
               (if @log-atom
                 (ws-utils/send-ping
                   log-atom)))
             (* 1000 60 2))))

       :component-did-update
       (fn[this old-argv]
         (println "Log Component didUpdate")
         (let [new-argv (rest (reagent/argv this))
               {new-session-id :client-id
                new-connection-uuid :connection-uuid
                new-user-id :user
                new-ext-token :ext-token
                :as current-logged-in-user}
               (first new-argv)
               {existing-user-id :user
                existing-session-id :client-id}
               (first (rest old-argv))]
           (if (and (nil? new-session-id) existing-session-id)
             (do
               (println
                 (str "Clearing websocket for user "
                      (pr-str existing-user-id) ", "
                      "with client-id "
                      (pr-str existing-session-id)))
               (ws-utils/disconnect-web-socket-for-log log-atom)
               (reset! log-atom nil)))
           (if new-session-id
             (do
               (println
                 (str "Creating websocket for user "
                      (pr-str new-user-id)
                      " with client-id "
                      (pr-str new-session-id)))
               (ws-utils/connect-ws-for-log
                 new-user-id
                 new-session-id
                 new-connection-uuid
                 new-ext-token
                 log-atom)))))

       :component-will-unmount
       (fn[this]
         (println "Log Component willUnmount")
         (js/clearInterval @timer-atom)
         (reset! timer-atom nil)
         (reset! log-atom nil))

       :reagent-render
       (fn[{session-id :client-id :as current-logged-in-user}]
         [:div])})))

The code above uses a number of utility functions to manage the WebSocket connection. We will take a deeper look at some of them below.

The Web Socket Library

The WebSocket library used by our implementation is haslett.

(require '[haslett.client :as ws])
(require '[haslett.format :as fmt])

Connecting

Connections are handled by the connect-ws-for-log function, which is passed a user-id, session-id, connection-uuid and ext-token.

It is also passed (in log-atom) the internal state atom of the log-holder-element-ui component.

(defn connect-ws-for-log
  [user-id session-id connection-uuid ext-token log-atom]
  (println "connecting logging websocket for "
           "user " (pr-str user-id) ", with client-id "
           (pr-str session-id) "and connection-uuid "
           (pr-str connection-uuid))
  (let [{existing-websocket :web-socket
         existing-user-id :user-id
         existing-session-id :session-id
         existing-connection-uuid :connection-uuid}
        @log-atom]
    (when existing-websocket
      (if (ws/connected? existing-websocket)
        (do
          (println
            (str "Web Socket is currently connected. "
                 "Closing existing connected websocket for "
                 (keyword (name existing-user-id)
                          (name existing-session-id))
                 " with connection-uuid "
                 (pr-str existing-connection-uuid)))
          (ws/close existing-websocket))
        (println
          (str "Existing websocket to "
               (keyword (name existing-user-id)
                        (name existing-session-id)) " "
               "exists, but it is not connected.")))))
  (go
    (let
      [web-socket
       (<! (ws/connect
             (str
               api-urls/base-websocket-url
               "?session-id=" session-id
               "&ext-token=" ext-token
               "&connection-uuid=" connection-uuid)
             {:format fmt/transit}))]
      (swap!
        log-atom
        (fn[o n]
          (js/console.log
            "Resetting log details to "
            (pr-str (select-keys n [:user-id :session-id :connection-uuid]))
            " from "
            (pr-str (select-keys o [:user-id :session-id :connection-uuid])))
          n)
        {:user-id user-id
         :session-id session-id
         :connection-uuid connection-uuid
         :web-socket web-socket
         :ext-token ext-token})
      (go
        (loop []
          (when-let
            [msg (<! (:source web-socket))]
            (add-message-to-log msg)
            (recur))))
      (when-let
        [msg (<! (:close-status web-socket))]
        (js/console.log
          "close-status message received -> " (pr-str msg))
        (decide-and-restart
          msg
          user-id
          session-id
          connection-uuid
          ext-token
          log-atom)))))

The function first conducts a number of sanity checks, and then, using haslett’s connect function, creates a connection to the url on the server. The result of the call to connect is stored in the state atom that was passed as an argument.

We use Haslett’s connect function to create the underlying WebSocket, and specify that the communication format should be transit. The connect function returns a promise channel that will deliver a map containing four entries: :socket, :source, :sink and :close-status.

The :socket entry will contain the WebSocket instance that was created. The :source and :sink keys are core.async channels that we will use for reading and writing to the underlying web socket using Clojure’s familiar channel metaphors.

Information about the connection is stored in the log-atom which is a stateful part of the log-holder-element-ui component. The information stored is all the information supplied by the client to the server and the websocket instance.

The connect-ws-for-log function then starts the message-processing loop in a go block.

Within the go block, as each message is received on the :source channel connected to the WebSocket, the function add-message-to-log is called and passed the message’s contents as an argument. The add-message-to-log function inserts the message in ReFrame’s state db.
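The repo’s add-message-to-log is not reproduced in this post. As a rough sketch of what such an update involves, the pure function below appends one message map to the :log-messages vector. The name append-log-message and the max-messages cap are assumptions added for illustration.

```clojure
(ns example.log-db)

;; Hypothetical cap so the log cannot grow without bound.
(def max-messages 500)

(defn append-log-message
  "Returns db with msg appended to its :log-messages vector,
  keeping only the most recent max-messages entries."
  [db msg]
  (update db :log-messages
          (fn [msgs]
            (let [msgs (conj (or msgs []) msg)]
              (vec (take-last max-messages msgs))))))
```

In a ReFrame application a function like this could serve as the body of an event handler registered with reg-event-db, which keeps the db update easy to test in isolation from the WebSocket.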

After the message reading go loop there is a section of code that responds to close events received in the haslett websocket’s :close-status channel. When a close message event is received, the decide-and-restart function is called. This function may, depending on the nature of the close event, decide to restart the web socket or not.
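The decide-and-restart function is not shown in this post. One plausible policy, sketched here as an assumption rather than the repo’s actual logic, is to reconnect after any close other than a deliberate one. The sketch assumes the close status is a map containing a :code key holding the WebSocket close code; the name reconnect-after-close? is hypothetical.

```clojure
(ns example.close-policy)

(def normal-closure 1000)   ; RFC 6455: the connection was closed deliberately
(def policy-violation 1008) ; RFC 6455: e.g. the server rejected the client

(defn reconnect-after-close?
  "Returns true when a close status suggests that the connection was
  lost rather than closed on purpose."
  [{:keys [code]}]
  (not (contains? #{normal-closure policy-violation} code)))
```

With this policy an abnormal closure (code 1006, typical of a dropped network connection) triggers a reconnect, while a logout-driven close does not.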

Disconnecting

The disconnect-web-socket-for-log function, used when we’re shutting down the connection, is also passed the state atom. It closes the WebSocket.

(defn disconnect-web-socket-for-log
  [log-atom]
  (let
    [{web-socket :web-socket
      user-id :user-id
      session-id :session-id
      connection-uuid :connection-uuid} @log-atom]
    (if (and session-id connection-uuid)
      (do
        (println
          (str "Disconnecting web socket associated with "
               (pr-str user-id) "/" (pr-str session-id)
               " with connection uuid "
               connection-uuid))
        (if (ws/connected? web-socket)
          (ws/close web-socket)
          (println "Unable to close web socket. It's not connected.")))
      (println "No session id available. Declined to issue close()."))))

Web Socket Keep-Alive (Client)

An idle WebSocket is liable to be closed if no traffic is seen within some interval of time (typically 5 minutes). However, status updates often occur in bursts with long periods of inactivity between them. When the log-holder-element-ui component is mounted we start a timer (using js/setInterval) which calls send-ping, a function that places a ping message on the channel connected to the WebSocket at two-minute intervals.

This lets the server know that it shouldn’t close the connection.

(defn send-ping[log-atom]
  (let
    [{web-socket :web-socket
      user-id :user-id
      session-id :session-id
      ext-token :ext-token
      connection-uuid :connection-uuid} @log-atom]
    (if session-id
      (do
        (println
          (str "Sending Ping for "
               (pr-str user-id) "/"
               (pr-str session-id)
               " at " (pr-str connection-uuid)))
        (if (ws/connected? web-socket)
          (go
            (>! (:sink web-socket)
                {:asys/ping user-id
                 :asys/session-id session-id
                 :asys/connection-uuid connection-uuid}))
          (do
            (println
              (str "PING: No websocket connected. "
                   "Attempting reconnect for "
                   (pr-str user-id) "/"
                   (pr-str session-id) ", "
                   "at " (pr-str connection-uuid)))
            (connect-ws-for-log
              user-id
              session-id
              connection-uuid
              ext-token log-atom))))
      (println "No session id available. No ping sent."))))

When the server receives a ping it should respond with a pong. If the web socket isn’t connected when the client expects it to be, send-ping will attempt to reopen the connection. This may occur when connectivity is lost and the decide-and-restart function has declined to reconnect. A consequence of this approach is that if the server goes away for any reason, the client will keep attempting to reconnect indefinitely.
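If retrying at a fixed cadence is undesirable, one common alternative is capped exponential backoff. The sketch below is not part of the repo; backoff-ms and its constants are hypothetical.

```clojure
(ns example.backoff)

(def base-delay-ms 1000)
(def max-delay-ms (* 1000 60 2)) ; never wait longer than the ping interval

(defn backoff-ms
  "Delay before reconnect attempt n (0-based): base * 2^n, capped
  at max-delay-ms."
  [attempt]
  (min max-delay-ms
       (* base-delay-ms (bit-shift-left 1 (min attempt 20)))))
```

A caller would track the number of consecutive failed attempts, schedule the next connection attempt after (backoff-ms attempt) milliseconds, and reset the counter once a connection succeeds.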

The Server Side

Now we turn our attention to how web socket connections are handled by the server.

Design Decisions

In order to decouple the underlying transport mechanism (e.g. WebSocket or SSE) from the Server’s higher-level message creation and dispatch functions we first create a single core.async channel to which we can write our messages destined for the client.

We also create a publication of this channel, using the :topic entry of each message map as the selector. A helpful side-effect of this approach is that a message published under a topic with no matching subscription is simply dropped. The topic we’ll use for our log messages is :log-msg. Therefore, a message of the form {... :topic :log-msg ...} pushed to our channel will be forwarded by the publication to every subscriber of that topic.

Finally, we create a subscription to the topic :log-msg, and a servicing function to remove messages from it and forward them to the client using whatever transport mechanism is desired. This decoupling serves two purposes: it allows us to change or update our messaging transport without unnecessarily impacting the server’s code; and adding other topics is a relatively simple extension.

We also need an atom to store the Server’s active subscription’s channel.

Creating the Channel & Publication

Below is the code that creates the single message channel, its associated publication and the atom used to store the subscription’s channel.

The atom will be initialized correctly at application startup time.

(defonce
  server-messages-channel-destined-for-all-clients
  (atom nil))

(defonce
  server-messages-channel
  (chan 100))

(defonce
  server-messages-publication
  (pub server-messages-channel #(:topic %)))
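To illustrate how a publication behaves, here is a small self-contained sketch that uses throwaway channels rather than the application’s own. It assumes clojure.core.async is on the classpath; the var names are for illustration only.

```clojure
(ns example.pubsub
  (:require [clojure.core.async :as async]))

(def messages (async/chan 100))
(def publication (async/pub messages :topic))

;; Subscribe a channel to the :log-msg topic only.
(def log-ch (async/chan 10))
(async/sub publication :log-msg log-ch)

;; No subscription exists for :metrics, so this message is dropped.
(async/>!! messages {:topic :metrics :msg "ignored"})

;; This one is routed to log-ch.
(async/>!! messages {:topic :log-msg :msg "step 1 complete"})

;; Blocks until the routed message arrives.
(def received (async/<!! log-ch))
```

Because unmatched topics are discarded, producers can write to the shared channel without knowing whether any transport is currently listening.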

Starting Pedestal with WebSocket Support

In order for Pedestal/Jetty to start with WebSocket support you must, within its service map’s ::http/container-options entry, supply a value for the :context-configurator key.

The value should be a reference to a function that configures the Jetty Servlet’s web-socket behavior for Pedestal. The function should at least call the add-ws-endpoints function in the io.pedestal.http.jetty.websockets namespace. In our application this function is server.messaging.websocket/websocket-configurator-for-jetty shown below.

(defn websocket-configurator-for-jetty
  [jetty-servlet-context]
  (ws/add-ws-endpoints
    jetty-servlet-context
    ws-paths))
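For context, the configurator is wired into the service map roughly as follows. This is a sketch only: the port and the empty route set are placeholders, and websocket-configurator-for-jetty refers to the function shown above.

```clojure
(ns example.service
  (:require [io.pedestal.http :as http]
            [server.messaging.websocket :as messaging-ws]))

(def service-map
  {::http/routes #{}   ; placeholder; the application's routes go here
   ::http/type   :jetty
   ::http/port   8080  ; placeholder port
   ::http/container-options
   {:context-configurator messaging-ws/websocket-configurator-for-jetty}})
```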

The add-ws-endpoints function takes as parameters the Jetty servlet context (a ServletContextHandler) and a configuration map. The configuration map indicates the uri(s) that Jetty should use as web-socket end-points, and the functions that should be called when the web-socket service receives the on-connect, on-text, on-binary, on-error and on-close events.

The configuration map used by the application is shown below.

{"/ws"
   {:on-connect
    (ws/start-ws-connection
      new-ws-client)
    :on-text
    (fn [raw-msg]
      (process-incoming-text-message raw-msg))
    :on-binary
    (fn [payload offset length]
      (process-incoming-binary-message
        payload offset length))
    :on-error
    (fn [error]
      (process-error error))
    :on-close
    (fn [num-code reason-text]
      (process-close
        num-code reason-text))}}

The on-connect handler uses Pedestal’s web-socket functionality to start the connection. Pedestal’s start-ws-connection function takes as its single argument a function that should accept two parameters passed to it by Pedestal: a websocket (of type org.eclipse.jetty.websocket.api.Session) and an async channel connected to the web socket.

(defn new-ws-client
  [^Session ws-session send-ch]

  (log/info
    (str "new-ws-client: creating new websocket to client with "
         "uri: " (str (.getRequestURI ^Session ws-session))))
  (let
    [ws-endpoint
     (get-ws-endpoint-from-session ws-session)
     query-string-map
     (some-> ws-session
             (.getRequestURI)
             (.getQuery)
             (route/parse-query-string))
     [session-id ext-token connection-uuid]
     (as->
       query-string-map v
       (mapv #(get v %) [:session-id :ext-token :connection-uuid]))]
    (if (and session-id connection-uuid)
      (let
        [user-id-from-token
         (or
           (auth-utils/get-id-from-ext-token ext-token)
           :anonymous)
         message-text (str "new-ws-client: starting web socket "
                           "with user " (pr-str user-id-from-token) " "
                           "for session-id " session-id " "
                           "with connection-uuid " (pr-str connection-uuid) " "
                           "from " (pr-str ws-endpoint))
         message (value->transit-string
                   {:time (gen-utils/get-local-timestamp-with-offset)
                    :text message-text})]

        (log/info message-text)

        (async/put!
          send-ch
          message)

        (swap! ws-clients
               assoc
               (keyword
                 (name (gen-utils/possible-string-as-keyword user-id-from-token))
                 (str connection-uuid))
               [ws-session send-ch
                (keyword
                  (name (gen-utils/possible-string-as-keyword user-id-from-token))
                  (name (gen-utils/possible-string-as-keyword session-id)))]))
      (log/warn (str "No session-id and connection-uuid "
                     "supplied for websocket creation. "
                     "Websocket not created.")))))

Because web sockets exist in the Jetty session layer rather than the Pedestal routing layer, we cannot rely on the authentication and authorization interceptors I discussed in a previous post to provide security for connection attempts at the server’s /ws url. Therefore, before a client is allowed to establish a connection with a particular session-id (which encodes the user’s id), the jwt token passed as a query parameter in the connection url is examined to determine the user’s id.

We can extract the client’s requested session-id from the url used by the client to request a web-socket connection.

As described earlier, the form of the url is wss://<hostname>:<port>/ws?session-id=<session-id>&connection-uuid=<connection-uuid>&ext-token=<ext-token>.

The ws-clients atom is a record of the currently connected web-socket clients. It contains a map keyed by a namespaced keyword composed of the user’s id and the connection-uuid. Each value is a vector containing the client’s Session instance, its connected channel, and a keyword that combines the user’s id with the session-id.

If the user has a valid user id the connection is allowed.

In new-ws-client we extract the session-id requested by the client, send a short “you’re connected” message, and add a vector describing the connection to the ws-clients atom using a key composed of the user’s id and the connection-uuid.
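The shape of that key can be sketched as below. client-key mirrors the keyword construction used in new-ws-client, while keys-for-user is a hypothetical helper, added only to show why a namespaced keyword is convenient when one user has several tabs open.

```clojure
(ns example.ws-clients)

(defn client-key
  "Builds a namespaced keyword: the user id is the namespace and the
  connection-uuid is the name."
  [user-id connection-uuid]
  (keyword (name user-id) (str connection-uuid)))

(defn keys-for-user
  "Returns the keys of every connection (browser tab) owned by user-id."
  [clients user-id]
  (filter #(= (name user-id) (namespace %)) (keys clients)))
```

For example, two tabs opened by the same user produce two keys that share a namespace, so all of that user’s connections can be found with a single filter over the map’s keys.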

You’ll notice in the on-text handler function how the server’s side of the web socket’s keep-alive functionality is implemented in process-incoming-text-message.

(defn process-incoming-text-message
  [raw-msg]
  (let
    [message (transit-string->value raw-msg)]
    (log/info "Websocket message received "
              (pr-str message))
    (if
      (and
        (map? message)
        (contains? message :asys/ping))
      (let
        [{user-id :asys/ping
          session-id :asys/session-id
          connection-uuid :asys/connection-uuid} message]
        (send-messages-to-clients
          {:msg
           {:asys/pong user-id
            :asys/session-id session-id
            :asys/connection-uuid connection-uuid}
           :target-client
           (keyword
             (name user-id)
             (str connection-uuid))}))
      (log/warn
        (str "Unexpected websocket message received: "
             (pr-str message) ", type: "
             (type message))))))

If the server receives a message with an :asys/ping key, it automatically responds with an :asys/pong message. The value of the :asys/ping entry is the user id of the client that sent the ping; together with the accompanying :asys/session-id and :asys/connection-uuid entries, it enables the server to direct its response correctly.

Starting the Message Processing

When the application starts the web-server, the message-transport function is also called to initialize the application’s messaging functionality. This function takes two parameters, a transport-type (which can be either :web-socket or :sse) and the atom used to store the subscription’s channel.

(defn message-transport
  [transport-type message-channel-atom]
  (start-processing-sub transport-type message-channel-atom))

The message-transport function calls start-processing-sub which is the function that starts the go loop - accepting messages from the subscription channel and forwarding them to the client(s).

(defn start-processing-sub
  [transport-type message-channel-atom]
  ;; If the channel already exists, then close it.
  (when-let [ch (deref message-channel-atom)]
    (async/close! ch))
  ;; subscribe a channel to be contained in the
  ;; server-messages-channel-destined-for-all-clients atom
  ;; to the server-messages-publication with the topic of
  ;; :log-msg i.e. anything in the publication with key of
  ;; :topic and a value of :log-msg
  (async/sub
    rlog/server-messages-publication
    :log-msg
    (reset! message-channel-atom (async/chan)))
  ;; start a go-loop that takes messages off the channel in the
  ;; atom and pass it to the send-messages-to-clients function
  (log/info "Starting " (pr-str transport-type) " log message loop")
  (async/go
    (loop []
      (when-let
        [log-msg (async/<! (deref message-channel-atom))]
        (send-messages-to-clients transport-type log-msg)
        (recur)))))

The send-messages-to-clients function is defined as a multi-method which selects on the transport-type value and makes sure that the correct send- function(s) are called for the selected transport type.

Essentially, this reduces to calling the send-messages-to-clients function in either the server.messaging.websocket or the server.messaging.sse namespace.
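The dispatch described above can be sketched as follows. The method bodies are stand-ins that simply return what they were given; in the application they would delegate to the transport-specific namespaces. The :default method is an addition for illustration.

```clojure
(ns example.dispatch)

(defmulti send-messages-to-clients
  "Dispatches on the transport type chosen at startup."
  (fn [transport-type _message] transport-type))

(defmethod send-messages-to-clients :web-socket
  [_ message]
  ;; The real method would delegate to server.messaging.websocket.
  [:web-socket message])

(defmethod send-messages-to-clients :sse
  [_ message]
  ;; The real method would delegate to server.messaging.sse.
  [:sse message])

(defmethod send-messages-to-clients :default
  [transport-type _message]
  (throw (ex-info "Unknown transport type"
                  {:transport-type transport-type})))
```

Adding a further transport then only requires another defmethod; the go loop that drains the subscription channel does not change.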

Sending Messages

If we review the send-message- functions in the server.messaging.websocket namespace, we can see how they work. (The implementation for SSE is similar, and simpler).

The send-messages-to-clients function is the namespace’s main public function. It accepts as an argument a map representing the message to be sent. Within the map is an entry :target-client which indicates the message’s destination. The function handles broadcast messages, i.e. messages destined for all connected clients, and also messages destined for only one client. In either case, this function will call the private function send-message-to-client in the same namespace as many times as is necessary.

(defn send-messages-to-clients
  [{message :msg
    target-client :target-client
    message-type :message-type
    :as whole-message}]
  (if (and (some? target-client)
           (not= :none target-client))
    (do
      (log/info
        (str "websocket: asked to send message to clients "
             (pr-str target-client)
             ", msg: "
             (pr-str whole-message)))
      (if (= :all target-client)
        (doseq
          [target-client (keys @ws-clients)]
          (send-message-to-client
            target-client
            message
            message-type))
        (send-message-to-client
          target-client
          message
          message-type)))))
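The routing rule on :target-client can also be expressed as a small pure function, which is convenient for testing without a live websocket. This is only a sketch: resolve-targets is a hypothetical helper, not something in the repo, and connected stands in for (keys @ws-clients).

```clojure
(defn resolve-targets
  "Returns the client keys a message should be sent to.
  `connected` is the collection of currently connected client keys."
  [target-client connected]
  (cond
    ;; No target, or an explicit :none, means nothing is sent.
    (or (nil? target-client) (= :none target-client)) []
    ;; Broadcast: every connected client.
    (= :all target-client) (vec connected)
    ;; Otherwise a single, named client.
    :else [target-client]))
```

For example, (resolve-targets :all ["a" "b"]) yields both keys, while (resolve-targets :none ["a" "b"]) yields an empty vector.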

The send-message-to-client function is shown below. It takes a session-id indicating the message’s destination, the message to send, and an optional message-type.

(defn- send-message-to-client
  [connection-identifier message & [message-type]]
  (log/info
    (str "Asked to send web-socket message to individual client "
         (pr-str connection-identifier)
         ", msg is "
         (pr-str message)))
  (if-let
    [ws-connections (get-ws-conns-for-session-id
                      @ws-clients
                      connection-identifier)]
    (doall
      (map
        (fn[connection-key]
          (if-let
            [[ws-session send-ch combined-session-id]
             (get @ws-clients connection-key)]
            (if (or
                  (not (.isOpen ws-session))
                  (ap/closed? send-ch))
              (do
                (log/warn
                  "While trying to send-message-to-client,"
                  "found websocket or websocket channel was closed."
                  "combined-session-id" (pr-str combined-session-id)
                  "connection-key " (pr-str connection-key))
                (clean-up-ws-clients))
              (async/put!
                send-ch
                (value->transit-string
                  {:time (gen-utils/get-local-timestamp-with-offset)
                   :text message
                   :message-type (or message-type :info)})
                (fn[v]
                  (log/debug
                    (str
                      "put! in send-message-to-client "
                      (pr-str connection-key)
                      " returned "
                      (pr-str v))))))
            (log/warn (str "Couldn't find websocket session "
                           "for client connection "
                           (pr-str connection-key)
                           ". Available connections "
                           "are " (pr-str (keys @ws-clients))))))
        ws-connections))
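    ;; message is the payload rather than the whole envelope,
    ;; so report the identifier we were asked to send to
    (log/warn "Couldn't find any web sockets for target-client"
              (pr-str connection-identifier))))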
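The value->transit-string helper is not shown in this post. A minimal sketch of what such a helper could look like follows, assuming it uses the transit-clj library (cognitect.transit) with the JSON encoding. The -sketch names are invented here, and the read function is included only to show the round trip.

```clojure
(require '[cognitect.transit :as transit])
(import '[java.io ByteArrayInputStream ByteArrayOutputStream])

;; Assumption: the real helper may differ, e.g. in encoding or handlers.
(defn value->transit-string-sketch
  "Encodes a Clojure value as a transit+JSON string."
  [value]
  (let [out (ByteArrayOutputStream. 4096)]
    (transit/write (transit/writer out :json) value)
    (.toString out "UTF-8")))

(defn transit-string->value-sketch
  "Decodes a transit+JSON string back into a Clojure value."
  [s]
  (let [in (ByteArrayInputStream. (.getBytes s "UTF-8"))]
    (transit/read (transit/reader in :json))))
```

Keywords such as :info survive the round trip, so the client can branch on :message-type without parsing strings.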

Connecting the long-running Clojure function

Now that the messaging infrastructure is in place, we’ll turn our attention to “instrumenting” our Clojure function to send status update messages.

A convenient, but not the only, approach is to leverage Clojure’s logging functionality. Within a Clojure function we will often use calls to log/info and log/debug to mark and record important processing events. We can take the opportunity to selectively forward some of these messages to the remote client, giving it feedback similar to what might be seen if the function were run in a REPL.

This is the approach I’ve chosen.

First I create a macro that can wrap a call to a function in the log namespace, but which can also accept a session-id.

(defmacro with-forward-context
  ([body]
   (list `with-forward-context nil body))
  ([target-id body]
   (list `with-forward-context target-id body {}))
  ([target-id body options]
   (list 'do body
         (list
           `apply
           `write-message-to-server-message-channel
           (concat
             (list
               'list
               (list 'clojure.string/join " "
                     (conj
                       (map
                         #(if
                            (instance? java.lang.Throwable %)
                            "ERROR"
                            (list
                              'clojure.string/trim
                              (list
                                `str %)))
                         (rest body))
                       'list)))
             (if target-id
               (list target-id)
               '())
             (if options
               (list options)
               '()))))))

Then we can selectively wrap any log calls we like as follows:

(rlog/with-forward-context
  session-id
  (log/info "OK"))

The result is that the message is logged in the usual fashion, but a call to write-message-to-server-message-channel is also made, ensuring that the content of the log message is also sent to the client matching session-id.
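To make the expansion concrete, here is a hand-written equivalent of what a call such as (with-forward-context "session-1" (log/info "Import" "started")) turns into. It is a sketch under stated assumptions: write-message-stub and forwarded are hypothetical stand-ins for write-message-to-server-message-channel, and println stands in for log/info so that the snippet runs without any dependencies.

```clojure
(require '[clojure.string :as string])

;; Hypothetical stand-in: records what would be forwarded to the client.
(def forwarded (atom []))

(defn write-message-stub
  [text & [target-id options]]
  (swap! forwarded conj {:msg text
                         :target-client target-id
                         :options options}))

(defn log-and-forward
  "Mirrors the shape of the macro's expansion: log first, then forward."
  []
  ;; 1. The original log call runs unchanged (println replaces log/info here).
  (println "Import" "started")
  ;; 2. Each log argument is stringified, trimmed and joined with spaces,
  ;;    then passed on together with the target id and the options map.
  (apply write-message-stub
         (list (string/join " " (list (string/trim (str "Import"))
                                      (string/trim (str "started"))))
               "session-1"
               {})))
```

One consequence of this shape is that the log arguments are evaluated twice, once by the log call and once when the forwarded text is built, so it is safest to avoid side-effecting expressions inside a wrapped log call.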

Closure

Hopefully, if you’re attempting to use websockets with your ClojureScript SPA, this has been helpful in some small way. In a subsequent post I will discuss implementing a similar form of messaging using SSE.


Kieran Owens
CTO of Timpson Gray

Experienced Technology Leader with a particular interest in the use of functional languages for building accounting systems.
