Using streams and async together in LiveView

Note: If you're using LiveView v1.1.5 or above, use stream_async/4 instead: https://github.com/phoenixframework/phoenix_live_view/pull/3944

For LiveView v1.1.4 and below, continue to follow this blog post (read on).

The purpose of this post is to remind myself (I always forget such features!) how to use async and streams together. There are lots of posts online about this topic, like this one, this one, and posts from German Velasco (1, 2, 3, 4).

Here are the versions used as of this writing:

  • Phoenix v1.7.20
  • Phoenix LiveView v1.0.5
  • Elixir (asdf) v1.18.2
  • Erlang (asdf) v27.2.4
  • Faker v0.18 (for generating fake data)

For reference, here's the GitHub repo: https://github.com/jaeyson/async_streamer

1. setup

mix phx.new async_streamer

mix phx.gen.context Note List lists name:string is_deleted:boolean
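
For reference, the generator creates an AsyncStreamer.Note context module. The only function we call later is Note.list_lists/0, which phx.gen.context generates roughly like this (a trimmed sketch, not the full generated file):

defmodule AsyncStreamer.Note do
  @moduledoc "The Note context (generated by mix phx.gen.context)."
  import Ecto.Query, warn: false
  alias AsyncStreamer.Repo
  alias AsyncStreamer.Note.List

  # returns all rows from the "lists" table
  def list_lists do
    Repo.all(List)
  end
end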

add the fake data:

# priv/repo/seeds.exs
alias AsyncStreamer.Repo
alias AsyncStreamer.Note.List

Repo.transaction(fn ->
  lists =
    Enum.map(1..10, fn _ ->
      inserted_at = DateTime.utc_now() |> DateTime.truncate(:second)

      %{
        name: Faker.Lorem.words(2) |> Enum.join(" "),
        inserted_at: inserted_at,
        updated_at: inserted_at
      }
    end)

  Repo.insert_all(List, lists)
end)

then create the database, run the migrations, and insert the seed data:

mix ecto.setup

2. creating the live view

# note that "$_" will work on bash and zsh
mkdir -p lib/async_streamer_web/live/note_live && cd $_

touch index.ex index.html.heex

# in router.ex
...
  scope "/", AsyncStreamerWeb do
    pipe_through :browser

    get "/", PageController, :home
    live "/notes", NoteLive.Index
  end
...
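
# lib/async_streamer_web/live/note_live/index.ex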
defmodule AsyncStreamerWeb.NoteLive.Index do
  use AsyncStreamerWeb, :live_view
  alias Phoenix.LiveView.AsyncResult
  alias AsyncStreamer.Note

  @impl true
  def mount(_params, _session, socket) do
    socket =
      socket
      |> assign(:lists, AsyncResult.loading())
      |> stream(:lists, [])
      |> start_async(:lists, fn ->
        Note.list_lists()
      end)

    {:ok, socket}
  end

  # when the query returns no rows, mark the result as failed
  # so the <:failed> slot renders
  @impl true
  def handle_async(:lists, {:ok, []}, socket) do
    Process.sleep(:timer.seconds(5))

    %{lists: lists} = socket.assigns

    socket =
      socket
      |> assign(:lists, AsyncResult.failed(lists, "no data :("))

    {:noreply, socket}
  end

  @impl true
  def handle_async(:lists, {:ok, fetched_lists}, socket) do
    # added sleep to simulate delays
    # e.g. external db connection
    Process.sleep(:timer.seconds(5))

    socket =
      socket
      |> assign(:lists, AsyncResult.ok("loaded"))
      |> stream(:lists, fetched_lists, reset: true)

    {:noreply, socket}
  end

  # the async task crashed or was killed
  @impl true
  def handle_async(:lists, {:exit, reason} = _async_fun_result, socket) do
    Process.sleep(:timer.seconds(5))

    %{lists: lists} = socket.assigns

    socket =
      socket
      |> assign(:lists, AsyncResult.failed(lists, reason))

    {:noreply, socket}
  end
end
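
<%!-- lib/async_streamer_web/live/note_live/index.html.heex --%>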
<.async_result :let={_lists} assign={@lists}>
  <:loading>
    <ul class="bg-slate-500 text-white max-w-sm rounded p-4 mx-auto flex flex-col gap-2">
      <li
        :for={_ <- 1..10}
        class="flex flex-col md:flex-row items-center border border-white rounded p-2"
      >
        <div class="md:w-8">
          <div class="w-full h-6 bg-gray-200 rounded animate-pulse"></div>
        </div>
        <div class="md:w-2/3 px-2 flex flex-col gap-1">
          <div class="w-full h-1.5 bg-gray-200 rounded animate-pulse"></div>
          <div class="w-1/2 h-1.5 bg-gray-200 rounded animate-pulse"></div>
          <div class="w-3/4 h-1 bg-gray-200 rounded animate-pulse"></div>
        </div>
      </li>

    </ul>
  </:loading>
  <%!-- catch-all :let, since the failure can be a string ("no data :(") or an exit tuple --%>
  <:failed :let={_failure}>
    <%!-- <div>Error: <%= Exception.message(reason) %></div> --%>
    <div>{gettext("Error fetching data")} :(</div>
  </:failed>
  <ul
    id="tests"
    phx-update="stream"
    class="bg-slate-500 text-white max-w-sm rounded p-4 mx-auto flex flex-col gap-2"
  >
    <li
      :for={{id, list} <- @streams.lists}
      class="border border-white rounded p-2 hover:bg-slate-700 cursor-pointer hover:underline hover:text-slate-200"
      id={id}
    >
      <span>{list.name}</span>
    </li>
  </ul>
</.async_result>

Then run mix phx.server and go to http://localhost:4000/notes

3. explanation

Here are the main things we focused on:

  • start_async with handle_async, for handling the loading, failed, and success states. This gives more control over how the result is handled than assign_async does (see the comparison sketch after this list).
  • stream, for managing large collections on the client without keeping them in memory on the server. Since a stream only accepts enumerables (e.g. lists), we needed an extra assign (the AsyncResult) to know whether the stream had finished loading asynchronously.
  • The <.async_result> component, which organizes how the loading, failed, and success states are displayed to users.
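
For comparison, here is roughly what the assign_async route looks like when the whole list is kept in a regular assign instead of a stream (a minimal sketch; note it gives up the stream's memory savings):

# mount/3 with assign_async: the callback must return {:ok, map},
# where the map keys match the assign names
def mount(_params, _session, socket) do
  {:ok, assign_async(socket, :lists, fn -> {:ok, %{lists: Note.list_lists()}} end)}
end

<%!-- template: @lists.result is bound to `lists` by :let --%>
<.async_result :let={lists} assign={@lists}>
  <:loading>Loading...</:loading>
  <:failed>Error fetching data :(</:failed>
  <ul>
    <li :for={list <- lists}>{list.name}</li>
  </ul>
</.async_result>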

start_async and stream are two completely different things: one is for network/IO-bound operations (async), while the other is for efficient handling of resources on the server (stream).

Streams: ...the Phoenix team has wanted a solution that elegantly addresses large collections of items without requiring the collection to live in memory on the server.

Async: It allows the user to get a working UI quickly while the system fetches some data in the background or talks to an external service, without blocking the render or event handling.

4. ideas

Ideally, we could have something like stream_async just to combine those two features. Then I found this reply from Jose Valim:

assign_async is implemented on top of start_async and the handle_async callback. So you could use those to build your own asynchronous behavior and add to the stream in the callback.
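
Following that suggestion, here is a minimal sketch of a hand-rolled helper built only on the public start_async, handle_async, stream, and AsyncResult APIs used above. The module and function names are made up for illustration:

defmodule AsyncStreamerWeb.StreamHelpers do
  import Phoenix.Component, only: [assign: 3]
  import Phoenix.LiveView, only: [start_async: 3, stream: 3, stream: 4]
  alias Phoenix.LiveView.AsyncResult

  # call from mount/3: sets a loading AsyncResult, an empty stream,
  # and kicks off the async task, all under the same name
  def stream_async(socket, name, fun) when is_function(fun, 0) do
    socket
    |> assign(name, AsyncResult.loading())
    |> stream(name, [])
    |> start_async(name, fun)
  end

  # call from each handle_async/3 clause to fold the result into the stream
  def handle_stream_async(socket, name, {:ok, items}) do
    socket
    |> assign(name, AsyncResult.ok("loaded"))
    |> stream(name, items, reset: true)
  end

  def handle_stream_async(socket, name, {:exit, reason}) do
    assign(socket, name, AsyncResult.failed(socket.assigns[name], reason))
  end
end

With something like this, mount/3 reduces to stream_async(socket, :lists, &Note.list_lists/0), and handle_async/3 becomes a thin wrapper: def handle_async(:lists, result, socket), do: {:noreply, handle_stream_async(socket, :lists, result)}.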

On further discussion:

Q: Or is the idea that streams and async are two semi-orthogonal ways to (among other things) handle the issue of latency, and that they shouldn’t be mixed?

A: LV streams are not related to latency at all. They’re related to memory optimizations of the server side process. They exist so you don’t need to keep (potentially long) lists of data in memory for the lifecycle of the LV process, while still being able to add/update/delete individual rows when necessary.

LV streams don’t care at all how you load data that you send to the client.