Using streams and async together in LiveView

RackNerd Leaderboard Banner

The purpose of this post is to remind myself (I always forget such features!) how to use async and streams together. There are lots of posts online regarding this topic like this one, this one, and posts from German Velasco (1, 2, 3, 4).

Here are the versions used as of this writing:

  • Phoenix v1.7.20
  • Phoenix LiveView v1.0.5
  • Elixir (asdf) v1.18.2
  • Erlang (asdf) v27.2.4
  • Faker v0.18 (for generating fake data)

For reference, here's the Github repo: https://github.com/jaeyson/async_streamer

1. setup

mix phx.new async_streamer

mix phx.gen.context Note List lists name:string is_deleted:boolean

add the fake data:

# priv/repo/seeds.exs
alias AsyncStreamer.Repo
alias AsyncStreamer.Note.List

Repo.transaction(fn ->
  lists =
    Enum.map(1..10, fn _ ->
      inserted_at = DateTime.utc_now() |> DateTime.truncate(:second)

      %{
        name: Faker.Lorem.words(2) |> Enum.join(" "),
        inserted_at: inserted_at,
        updated_at: inserted_at
      }
    end)

  Repo.insert_all(List, lists)
end)

then run migration

mix ecto.setup

2. creating the live layout

# note that "$_" will work on bash and zsh
mkdir -p lib/async_streamer_web/live/note_live && cd $_

touch index.ex index.html.heex
# in router.ex
...
  scope "/", AsyncStreamerWeb do
    pipe_through :browser

    get "/", PageController, :home
    live "/notes", NoteLive.Index
  end
...
defmodule AsyncStreamerWeb.NoteLive.Index do
  use AsyncStreamerWeb, :live_view
  alias Phoenix.LiveView.AsyncResult
  alias AsyncStreamer.Note

  @impl true
  def mount(_params, _session, socket) do
    socket =
      socket
      |> assign(:lists, AsyncResult.loading())
      |> stream(:lists, [])
      |> start_async(:lists, fn ->
        Note.list_lists()
      end)

    {:ok, socket}
  end

  @impl true
  def handle_async(:lists, {:ok, []}, socket) do
    Process.sleep(:timer.seconds(5))

    %{lists: lists} = socket.assigns

    socket =
      socket
      |> assign(:lists, AsyncResult.failed(lists, "no data :("))

    {:noreply, socket}
  end

  @impl true
  def handle_async(:lists, {:ok, fetched_lists}, socket) do
    # added sleep to simulate delays
    # e.g. external db connection
    Process.sleep(:timer.seconds(5))

    socket =
      socket
      |> assign(:lists, AsyncResult.ok("loaded"))
      |> stream(:lists, fetched_lists, reset: true)

    {:noreply, socket}
  end

  @impl true
  def handle_async(:lists, {:exit, reason} = _async_fun_result, socket) do
    Process.sleep(:timer.seconds(5))

    %{lists: lists} = socket.assigns

    socket =
      socket
      |> assign(:lists, AsyncResult.failed(lists, reason))

    {:noreply, socket}
  end
end
<.async_result :let={_lists} assign={@lists}>
  <:loading>
    <ul class="bg-slate-500 text-white max-w-sm rounded p-4 mx-auto flex flex-col gap-2">
      <li
        :for={_ <- 1..10}
        class="flex flex-col md:flex-row items-center border border-white rounded p-2"
      >
        <div class="md:w-8">
          <div class="w-full h-6 bg-gray-200 rounded animate-pulse"></div>
        </div>
        <div class="md:w-2/3 px-2 flex flex-col gap-1">
          <div class="w-full h-1.5 bg-gray-200 rounded animate-pulse"></div>
          <div class="w-1/2 h-1.5 bg-gray-200 rounded animate-pulse"></div>
          <div class="w-3/4 h-1 bg-gray-200 rounded animate-pulse"></div>
        </div>
      </li>

    </ul>
  </:loading>
  <:failed :let={{_reason, _trace} = _failure}>
    <%!-- <div>Error: <%= Exception.message(reason) %></div> --%>
    <div>{gettext("Error fetching data")} :(</div>
  </:failed>
  <ul
    id="tests"
    phx-update="stream"
    class="bg-slate-500 text-white max-w-sm rounded p-4 mx-auto flex flex-col gap-2"
  >
    <li
      :for={{id, list} <- @streams.lists}
      class="border border-white rounded p-2 hover:bg-slate-700 cursor-pointer hover:underline hover:text-slate-200"
      id={id}
    >
      <span>{list.name}</span>
    </li>
  </ul>
</.async_result>

Then run mix phx.server and go to http://localhost:4000/notes

3. explanation

Here are the things we mostly focused on:

  • start_async with handle_async for handling the loading, failed, and success state. More control over how you handle the result than using assign_async.
  • stream: managing large collections on the client without keeping the resources on the server. Since streams accepts enumerables (e.g. lists), we needed an extra assign to know if the stream had been asynchronously loaded.
  • <.async_result> component. This helps us how to organize the load, fail, success state being displayed to users.

4. ideas

Ideally, we could have something like stream_async just to combine those 2 features. Then I found this reply from Jose Valim:

assign_async is implemented on top of start_async and the handle_async callback. So you could use those to build your own asynchronous behavior and add to the stream in the callback.

On further discussion:

Q: Or is the idea that streams and async are two semi-orthogonal ways to (among other things) handle the issue of latency, and that they shouldn’t be mixed?

A: LV streams are not related to latency at all. They’re related to memory optimizations of the server side process. They exist so you don’t need to keep (potentially long) lists of data in memory for the lifecycle of the LV process, while still being able to add/update/delete individual rows when necessary.

LV streams don’t care at all how you load data that you send to the client.