DEV Community

aziz abdullaev
aziz abdullaev

Posted on • Updated on

AI powered app (with open-source LLMs like Llama) with Elixir, Phoenix, LiveView, and TogetherAI

Ever wanted to raise millions of $ by adding "AI" to your company name; and you happen to want to code it using Elixir, Phoenix, LiveView? This is the tutorial on making API calls to model providers like Together AI; Wait, not just calls, but HTTP streams to receive the data in chunks.

Image description

Big picture for any stack:

  1. Get prompt from the user
  2. Send the prompt to the TogetherAI through API call
  3. Handle incoming chunks of LLM output

TLDR:

In Elixir world, we are going to have two processes, one for liveview and another process that will handle HTTP call with streams. LiveView will send the prompt and its pid (process id) to the handler, that in turn will spawn a separate process that will make HTTP call and send the chunks of LLM output to the LiveView as the chunks arrive. When the last chunk arrives, we then notify the LiveView that the text generation has finished.

Setup

Generate new Phoenix project by running

mix phx.new phoenix_playground

cd ./phoenix_playground
# you will need to create running PostgreSQL db. Alternatively, just use --sqlite flag when generating phoenix project 
mix ecto.create
iex -S mix phx.server
Enter fullscreen mode Exit fullscreen mode

Go to mix.exs and add Req to dependencies:

{:req, "~> 0.5.0"}

Implementation

Let's start with the LiveView. Here is the scaffold of the LiveView with a form that will handle input and on submission, set loading state to true and send the prompt to the TogetherAI.

defmodule PhoenixPlaygroundWeb.HomeLive do
  alias PhoenixPlayground.TogetherAi
  use PhoenixPlaygroundWeb, :live_view

  def mount(_params, _session, socket) do
    socket =
      socket |> assign(:loading, false) |> assign(:text, "")

    {:ok, socket}
  end

  def render(assigns) do
    ~H"""
    <div class="mx-auto max-w-3xl my-20">
      <div class="flex items-start space-x-4">
        <div class="min-w-0 flex-1">
          <form phx-change="validate" phx-submit="submit" id="prompt-form" class="relative">
            <div class="">
              <label for="prompt" class="sr-only">Add your prompt</label>
              <input
                type="text"
                name="prompt"
                id="prompt"
                class="block w-full "
                placeholder="Let me know what you want to achieve"
              />
            </div>

            <div class="flex-shrink-0">
              <button
                :if={not @loading}
                type="submit"
                class="tailwind-goes-here"
              >
                Send
              </button>
              <button
                :if={@loading}
                type="submit"
                class="tailwind-goes-here"
              >
                Loading...
              </button>
            </div>
          </form>
        </div>
      </div>

      <p :if={not is_nil(@text)}><%= @text %></p>
    </div>
    """
    
    def handle_event("validate", _, socket) do
        {:noreply, socket}
    end

    def handle_event("submit", %{"prompt" => prompt}, socket) do
        # TODO: submit our prompt to the Together AI (or OpenAI or Mistral)
        TogetherAI.stream_completion(prompt, self())

        socket =
          socket |> update(:loading, &toggle_loading/1)

        {:noreply, socket}
    end

  end
Enter fullscreen mode Exit fullscreen mode

Now, add this LiveView to router.ex:

live "/home", HomeLive

You will need to register at TogetherAI and get your API key to access the platform. Upon registration, you will receive $5 in credits which is more than enough for this tutorial. Assuming you have your API key, go to the paltform, then pick a LLM model you would like to use.

Let's start writing TogetherAI module.

defmodule PhoenixPlayground.TogetherAi do
  def stream_completion(prompt, pid) when is_binary(prompt) do
    url = "https://api.together.xyz/v1/chat/completions"

    body = %{
      model: "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo", # your model id
      messages: [
        # some system prompts to help fine tune the output. Write whatever you want here or just ignore it
        %{
          role: "system",
          content:
            "Your response must be in a format of bullet list with actionable items, like a todo-list. Your response must not start with 'Here is...'. Give only 2 actionable items, no more. Do not add any additional comments after actionable items. Give brief response."
        },
        # here, we are including the prompt by user
        %{role: "user", content: prompt}
      ],
      stream: true,
      max_tokens: 512,
      temperature: 0.7,
      top_p: 0.7,
      top_k: 50,
      stop: "[DONE]"
    }

    Task.async(fn ->
      Req.post(
        url,
        json: body,
        auth: {:bearer, "<your-api-key>"},
        into: fn {:data, _data} = data, {_req, _resp} = req_resp ->
          handle_stream(data, req_resp, pid)
        end
      )
    end)
  end
end
Enter fullscreen mode Exit fullscreen mode

Here, we are constructing the request configs as needed, then creating a separate process with Task.async() where we make a post request to the platform and specify the function that will be handling the streams.

Req does a lot of heavy lifing for us when it comes to HTTP request and streams. We will just need to write handle_stream(). More on it here https://hexdocs.pm/req/Req.Request.html.

Here is the implementation for handle_stream() given that succes is successful with 200 status code.

  defp handle_stream(
         {:data, data},
         {req = %Req.Request{}, resp = %Req.Response{status: 200}},
         pid
       ) do

    # string manipulations
    decoded =
      data
      |> String.split("data: ")
      |> Enum.map(fn str ->
        str
        |> String.trim()
        |> decode_body()
      end)
      |> Enum.filter(fn d -> d != :ok end)

    case handle_response(decoded) do
      # LLM finished generating, so we are informing the LiveView about it
      {text, :finish} ->
        send(pid, {__MODULE__, "last_chunk", text})

      # LLM generated text, so we are sending it to the LiveView process
      generated_text ->
        send(pid, {__MODULE__, "chunk", generated_text})
    end

    {:cont, {req, resp}}
  end


  defp handle_response(decoded) when is_map(decoded) do
    decoded
    |> Map.get("choices")
    |> List.first()
    |> Map.get("delta")
    |> Map.get("content")
  end

  defp handle_response(decoded) when is_list(decoded) do
    result =
      Enum.reduce(decoded, "", fn choices_map, acc ->
        case choices_map do
          :finish ->
            {acc, :finish}

          map ->
            acc <> handle_response(map)
        end
      end)

    result
  end

  defp decode_body(""), do: :ok
  defp decode_body("[DONE]"), do: :finish
  defp decode_body(json), do: Jason.decode!(json)
Enter fullscreen mode Exit fullscreen mode

After receiving a response chunk, we need to do couple of string manipulations before decoding it with Jason. Note that decode_body() function returns :ok on empty string, and :finish when we reach the stop string (in our case it is "[DONE]".

After we decode the response, we need to extract the text generated by LLM. Here is the sample response decoded from LLM:

  %{
    "choices" => [
      %{
        "delta" => %{
          "content" => "•",
          "role" => "assistant",
          "token_id" => 6806,
          "tool_calls" => nil
        },
        "finish_reason" => nil,
        "index" => 0,
        "logprobs" => nil,
        "seed" => nil,
        "text" => "•"
      }
    ],
    "created" => 1721912534,
    "id" => "some-id",
    "model" => "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
    "object" => "chat.completion.chunk",
    "usage" => nil
  }
Enter fullscreen mode Exit fullscreen mode

Streamed LLM output might contain a single world or multiple words. When it contains a single world, we handle it as a map. When it contains many words in one response chunk, we need to extract the words together and put them together. That's what is going on in this function:

  defp handle_response(decoded) when is_list(decoded) do
    result =
      Enum.reduce(decoded, "", fn choices_map, acc ->
        case choices_map do
          :finish ->
            {acc, :finish}

          map ->
            acc <> handle_response(map)
        end
      end)

    result
  end
Enter fullscreen mode Exit fullscreen mode

Here, we are looping through multiple outputs and extracting the text by recursively calling handle_response(map).

Output with multiple chunks of text will keep arriving. A list of chunks may arrive where the last statement will be "[DONE]" indicating the end of LLM output. When the last message arrives, we will decode it to :finish which we will send to the LiveView.

When we reach the end of output, we are doing to receive the :finish atom, but we are also going to receive the chunk as below, with usage that indicates how many tokens were used, and finish_reason with the finish reason instead of nil. The token usage and finish reasons are not to be ignored when you build your SAAS. Remember, this is the part where you multiple token cost by multiple X to make profit :)

  %{
    "choices" => [
      %{
        "delta" => %{
          "content" => " measurements",
          "role" => "assistant",
          "token_id" => 22323,
          "tool_calls" => nil
        },
        "finish_reason" => "length",
        "index" => 0,
        "logprobs" => nil,
        "seed" => 16606951688656440000,
        "text" => " measurements"
      }
    ],
    "created" => 1721827856,
    "id" => "8a8dcdd7e2d74-ARN",
    "model" => "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
    "object" => "chat.completion.chunk",
    "usage" => %{
      "completion_tokens" => 512,
      "prompt_tokens" => 93,
      "total_tokens" => 605
    }
  }
Enter fullscreen mode Exit fullscreen mode

Now, let's handle the responses on the LiveView. There are two function that we will handle the incoming message from another process. One handles the LLM chunk(s) and other handles finishing chunk(s). As we receive the text, we are going to append it with the text we have received before.

When we receive the last chunk, then we are going to show the flash with "Finished generating".

  def handle_info({PhoenixPlayground.TogetherAi, "chunk", text}, socket) when is_binary(text) do
    socket =
      socket |> update(:text, &(&1 <> text))

    {:noreply, socket}
  end

  def handle_info({PhoenixPlayground.TogetherAi, "last_chunk", text}, socket)
      when is_binary(text) do
    socket =
      socket
      |> update(:text, &(&1 <> text))
      |> update(:loading, &toggle_loading/1)
      |> put_flash(:info, "Finished generating")

    {:noreply, socket}
  end
Enter fullscreen mode Exit fullscreen mode

There is still a couple of more things we need to consider. We need to add error handling as well as message handler for Task.

Let's assume that the API call to TogetherAI will not succeed. In that case, we will need to add another handle_stream() function that will receive the response with error code. For the sake of simplicity, I put the handle_stream that receives the success 200 first, so it will be called first, if does not pattern match (e.g. different status code), it will go to this handle_stream where we assume it will be response with error and error message.

  defp handle_stream({:data, data}, {req = %Req.Request{}, resp = %Req.Response{status: _}}, pid) do
    error_msg = data |> Jason.decode!() |> Map.get("error") |> Map.get("message")
    send(pid, {__MODULE__, :error, error_msg})
    {:cont, {req, resp}}
  end
Enter fullscreen mode Exit fullscreen mode

Remeber that we are calling spawning new process with Task.async()? We will need to handle other messages that the Task sends to its parent. Please refer to officials docs on Task.async.

 def handle_info({PhoenixPlayground.TogetherAi, :error, error_msg}, socket) do
    socket = socket |> put_flash(:error, error_msg)
    {:noreply, socket}
  end

  def handle_info(_msg, socket) do
    # message that come here unhandled are:
    # 1. {:DOWN, _ref, :process, _pid, :normal}
    # 2. {_ref, {:ok, response = %Req.Response{}}}
    # TODO: other Task.async responses

    {:noreply, socket}
  end
Enter fullscreen mode Exit fullscreen mode

Now, run iex -S mix phx.server go to /home and enjoy!

Here is the repo for the project. https://github.com/azyzz228/elixir-llm-tutorial

Discussion:

more error handling should be added?
Handle the last chunk with Token usage info and finish_reason better
Enter fullscreen mode Exit fullscreen mode

Top comments (1)

Collapse
 
jorinvo profile image
jorin

This is awesome! Thanks for sharing 🙌