DEV Community

NDREAN
NDREAN

Posted on • Updated on

Elixir mock SSE server

sse process


The server is a HTTP/2 router

Server Sent Events are "simple" HTTP requests. The browser has a build-in interface, and all we need to make a SSE server is to provide a GET endpoint that sends SSEs. The Javascript front-end will connect and listen to SSEs via a simple call new EventSource(back-endpoint) (see client code at the end).
We follow the example of Plug base HTTP server to produce a Docker image of a simple mock SSE server written in Elixir with the webserver Cowboy.

To overcome the limitations of SSE over HTTP1.1, we will upgrade from HTTP to HTTP/2 via HTTPS (this is the browsers constraint).

❗ You discover that there is a limit of 50 simultaneous open connections even over HTTP/2. The need for sophistication is therefor questionable. Even if some "more serious" libraries are shown at the end, this seems an important limitation and the prefered protocol should be websocket instead of HTTP since this does not suffer from this low limit, at the expense of easy scalability.

In this simple example, the server will expose 3 endpoints: two GET and one POST. The first GET will be a mock of a periodic signal: we will emit periodically a letter of the alphabet. Why not! The second GET will emit back the message that has been posted to the POST endpoint. The event bus will use Phoenix.PubSub to pass the state - the payload - between endpoints, so that this can be used in any part of the app. This can even be distributed if needed.

defmodule SSE do
  import Plug.Conn
  use Plug.Router

  # define the front-end urls that are permitted to reach the back-end to the CORS Plug.
  @front1 http://localhost:3000
  @front2 https://front-end.surge.sh

  plug(:match)
  # set CORS between the front-end and back-end
  plug(CORSPlug, origin: [@front1, @front2])
  plug(Plug.SSL, rewrite_on: [:x_forwarded_host, :x_forwarded_port, :x_forwarded_proto])
  plug(Plug.Parsers, parsers: [:json], pass: ["text/*", "application/json"], json_decoder: Jason)
  plug(:dispatch)

  #source emits a random letter every 5 seconds
  get "/sse" do
    prepare_sse(conn)
    |> send_letter()
  end


  # message posting endpoint that we broadcast on the topic "post"
  post "/post" do
    with params <- conn.params,
         msg <- make_message(params) do
      Phoenix.PubSub.broadcast(SSE.PubSub, "post", {:post, msg})
      conn |> resp(303, "broadcasted") |> send_resp()
    end
  end

  #source emits an SSE every time a message is received on a topic "post" 
  get "/post" do
    Phoenix.PubSub.subscribe(SSE.PubSub, "post")
    prepare_sse(conn)  
    receive do
      {:post, data} ->
        chunk(conn, data)
    end

    conn
  end

  #function plug
  defp prepare_sse(conn) do
    conn
    |> Plug.Conn.put_resp_header("connection", "keep-alive")
    |> Plug.Conn.put_resp_header("content-type", "text/event-stream")
    |> send_chunked(200)
  end

  defp make_message(params) do
    data = Jason.encode!(params)
    uuid = uuid4()
    "event: message\ndata: #{data}\nid: #{uuid}\nretry: 6000\n\n"
  end

  # we send a letter of the alphabet every 5 seconds
  defp send_letter(conn, x \\ "a") do
    msg = make_message(%{msg: x})
    {:ok, _conn} = chunk(conn, msg)
    :timer.sleep(5_000)

    send_letter(conn, get_random())
  end

  defp get_random() do
    Enum.map(?a..?z, fn x -> <<x::utf8>> end)
    |> Enum.random()
  end

  def uuid4() do
    :uuid.get_v4() |> :uuid.uuid_to_string()|> to_string()
  end
end
Enter fullscreen mode Exit fullscreen mode

The application is supervised and the start function is defined below.

defmodule SSE.Application do
  use Application

  def start(_type, _args) do
    plug_options = [
      port: app_port(),
      compress: true,
      cipher_suite: :strong,
      certfile: "priv/cert/sse+2.pem",
      keyfile: "priv/cert/sse+2-key.pem",
      otp_app: :sse,
      protocol_options: [idle_timeout: :infinity]
    ]

    children = [
      {Plug.Cowboy, scheme: :https, plug: SSE.Router, options: plug_options},
      {Phoenix.PubSub, name: SSE.Pubsub}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: SSE.Supervisor)
  end

  defp app_port do
    System.get_env()
    |> Map.get("PORT", "4043")
    |> String.to_integer()
  end
end
Enter fullscreen mode Exit fullscreen mode

The mix file is:

  {:plug_cowboy, "~> 2.5"},
  {:plug_crypto, "~> 1.2"},
  {:cors_plug, "~> 3.0"},
  {:jason, "~>1.3"},
  {:uuid, ">= 2.0.4", [hex: :uuid_erl]},
  {:phoenix_pubsub, "~> 2.0"},
  {:credo, "~> 1.6", only: [:dev, :test], runtime: false},
  {:httpoison, "~> 1.8", only: [:dev, :test]}
Enter fullscreen mode Exit fullscreen mode

Test it

We will mostly test the interface. We have three ways to test: with cURL, with Elixir code and with the browser.

Curl

Just cURL in another terminal.

curl https://localhost:4000/sse

curl -H 'Content-type: application/json' \
-d {"test": "sent me via SSE"}' \ https://localhost:4000/post
Enter fullscreen mode Exit fullscreen mode

Elixir client

Since we are just using HTTP requests, we will use HTTPoison to consume SSEs as a client. You can use HTTPoison.AsyncChunk to receive SSEs and we keep the connection indefinitely open. Note that this worked with HTTP1.1 but I could not make it work with the :ssl options despite curl hitting the https endpoint without any problem.

The way we emit random letters on HTTP connection doesn't help to automate this test. Since we are mainly concerned by the interface, instead of making a "_test.exs" file, just run an iex session to test the interface:

> iex -S mix

# check if the broadcasted message is sent by SSE on "/post"
iex> Test.is_broadcasted("hello") === %{"test"=> "hello"}

# check if the stream is sent by SSE on "/sse"
iex> Test.sse_receiver(20000)
# a list with letters builds up...during 20s

# run 50 simultaneous open connections during 20s
iex> Enum.each(1..50, fn _ -> Test.sse_receiver(20000) end)
Enter fullscreen mode Exit fullscreen mode

❗ This module only works with HTTP

defmodule Test do
  @moduledoc false
  require Logger

  @headers [{"Content-Type", "application/json"}]
  @url_post "http://localhost:4043/post"
  @url_sse "http://localhost:4043/sse"

  defp is_posted(text \\ "ok") do
    Phoenix.PubSub.subscribe(SSE.PubSub, "post")
    {:ok, msg} = Jason.encode(%{test: text})
    case HTTPoison.post!(@url_post, msg, @headers) do
      %HTTPoison.Response{status_code: code} ->
        code
    end
  end

  def is_broadcasted(text) do
    case is_posted(text) do
      303 ->
        receive do
          {:post, data} ->
            regexme(data)
        end
    end
  end  

  def sse_receiver(time) do
    m = []
    Task.start(fn ->
      Logger.info("starting test")
      Task.start(fn ->
        HTTPoison.get!(@url_sse, [],
          recv_timeout: :infinity,
          stream_to: self()
        )

        receiver(m)
      end)

      Process.sleep(time)
      Logger.info("end of test")
      Process.exit(pid, :kill)
    end)
  end

  defp receiver(m) do
    receive do
      %HTTPoison.AsyncChunk{chunk: chunk} ->
        data = regexme(chunk)
        m = [data["msg"] | m]
        Logger.debug(m)
        receiver(m)
    end
  end

  defp regexme(text) do
    text |> String.split("\n") |> Enum.at(1) |> String.split(" ") |> Enum.at(1) |> Jason.decode!()
  end
end
Enter fullscreen mode Exit fullscreen mode

Client code

You can quickly scaffold a React app "create-react-app" and add the tiny component described at the end which will be reactive to Server Sent Events. With CORS enabled, you may not need HTTPS end-to-end in dev mode.

you can set up a secure front-end quickly with Surge: just build the code and run surge ./build with it's CLI once it's installed. You will get an url such as https://demo.surge.sh.

HTTP2 set-up. For the back-end, we can use a reverse proxy for the TLS termination. For example, Nginx Proxy Manager or Caddy Server automate the certificates for you. Caddy automatically uses HTTP2. We can alternatively terminate the connection directly to the webserver Cowboy: we then need to add self-signed certificates to it (in dev mode). You can use mkcert or the Elixir package X509: generate self-signed certificates with mix x509.gen.selfsigned.

We used Valtio to get dynamic rendering, but we could have used useEffect as well. Use something like the component below in a React scaffold.

const { proxy, useSnapshot } from 'valtio'
const { derive } from 'valtio/utils'

const state = proxy({messages: {letter: null, post: null})
const sse = derive({
  getMsg: (get) => {
    const evtSource1 = new EventSource(process.env.REACT_APP_SSE_URL_SSE);
    evtSource1.addEventListener('message', (e) => 
      get(state.messages).letter = e.data
    );
    const evtSource2 = new EventSource(process.env.REACT_APP_SSE_URL_POST);
    evtSource2.addEventListener('message', (e) => 
      get(state.messages).post = e.data
    );
 }
})

const SSE = () => {
  const { messages: {letter, post} } = useSnapshot(state)
  return <>{letter}{" "}{post}</>
}
Enter fullscreen mode Exit fullscreen mode

Run it

We can do MIX_END=prod PORT=4000 mix run --no-halt but the idea is to use the mock as a pre-build Docker image.

Dockerfile

We ship a container to deploy the server code. We build a release - so we use a multi-stage Dockerfile - to produce a tiny image (20M) of our Elixir SSE server.

ROM  bitwalker/alpine-elixir:latest AS build
ARG NAME
ARG PORT
ENV ENV=${MIX_ENV:-prod}
WORKDIR /opt/app
RUN mix do local.hex --force, local.rebar --force
COPY mix.exs mix.lock ./
COPY config /.config
RUN mix do deps.get --only ${ENV}
COPY lib ./lib
COPY priv ./priv
COPY rel ./rel/
RUN  MIX_ENV=prod mix release ${NAME} --quiet


FROM alpine:latest AS app
ARG NAME
ENV PORT=${PORT}
WORKDIR /opt/app
RUN apk --update --no-cache add openssl ncurses-libs libstdc++ libgcc
RUN chown -R nobody: /opt/app
USER nobody
EXPOSE 4043
ENV HOME=/app
ENV NAME=${NAME}
ENV MIX_ENV=prod
COPY --from=build --chown=nobody:root /opt/app/_build/${MIX_ENV}/rel/${NAME} ./

CMD ./bin/${NAME} start
Enter fullscreen mode Exit fullscreen mode

and then build (pass the NAME) and run the image (set the mandatory env PORT)

docker build --build-arg NAME=myapp -t myapp:v1 .
docker run -it --rm --env PORT=4043 -p 443:4043 myapp:v1
Enter fullscreen mode Exit fullscreen mode

SSE libraries

https://github.com/mustafaturan/sse
https://github.com/codenoid/elixir-sse-example/blob/master/lib/sse_example_web/helpers/ticker.ex
https://github.com/CrowdHailer/server_sent_event.ex

Discussion (0)