Using streams and async together in LiveView

The purpose of this post is to remind myself (I always forget such features!) how to use async and streams together. There are lots of posts online regarding this topic like this one, this one, and posts from German Velasco (1, 2, 3, 4).
Here are the versions used as of this writing:
- Phoenix v1.7.20
- Phoenix LiveView v1.0.5
- Elixir (asdf) v1.18.2
- Erlang (asdf) v27.2.4
- Faker v0.18 (for generating fake data)
For reference, here's the Github repo: https://github.com/jaeyson/async_streamer
1. setup
mix phx.new async_streamer
mix phx.gen.context Note List lists name:string is_deleted:boolean
add the fake data:
# priv/repo/seeds.exs
alias AsyncStreamer.Repo
alias AsyncStreamer.Note.List
Repo.transaction(fn ->
lists =
Enum.map(1..10, fn _ ->
inserted_at = DateTime.utc_now() |> DateTime.truncate(:second)
%{
name: Faker.Lorem.words(2) |> Enum.join(" "),
inserted_at: inserted_at,
updated_at: inserted_at
}
end)
Repo.insert_all(List, lists)
end)
then run migration
mix ecto.setup
2. creating the live layout
# note that "$_" will work on bash and zsh
mkdir -p lib/async_streamer_web/live/note_live && cd $_
touch index.ex index.html.heex
# in router.ex
...
scope "/", AsyncStreamerWeb do
pipe_through :browser
get "/", PageController, :home
live "/notes", NoteLive.Index
end
...
defmodule AsyncStreamerWeb.NoteLive.Index do
use AsyncStreamerWeb, :live_view
alias Phoenix.LiveView.AsyncResult
alias AsyncStreamer.Note
@impl true
def mount(_params, _session, socket) do
socket =
socket
|> assign(:lists, AsyncResult.loading())
|> stream(:lists, [])
|> start_async(:lists, fn ->
Note.list_lists()
end)
{:ok, socket}
end
@impl true
def handle_async(:lists, {:ok, []}, socket) do
Process.sleep(:timer.seconds(5))
%{lists: lists} = socket.assigns
socket =
socket
|> assign(:lists, AsyncResult.failed(lists, "no data :("))
{:noreply, socket}
end
@impl true
def handle_async(:lists, {:ok, fetched_lists}, socket) do
# added sleep to simulate delays
# e.g. external db connection
Process.sleep(:timer.seconds(5))
socket =
socket
|> assign(:lists, AsyncResult.ok("loaded"))
|> stream(:lists, fetched_lists, reset: true)
{:noreply, socket}
end
@impl true
def handle_async(:lists, {:exit, reason} = _async_fun_result, socket) do
Process.sleep(:timer.seconds(5))
%{lists: lists} = socket.assigns
socket =
socket
|> assign(:lists, AsyncResult.failed(lists, reason))
{:noreply, socket}
end
end
<.async_result :let={_lists} assign={@lists}>
<:loading>
<ul class="bg-slate-500 text-white max-w-sm rounded p-4 mx-auto flex flex-col gap-2">
<li
:for={_ <- 1..10}
class="flex flex-col md:flex-row items-center border border-white rounded p-2"
>
<div class="md:w-8">
<div class="w-full h-6 bg-gray-200 rounded animate-pulse"></div>
</div>
<div class="md:w-2/3 px-2 flex flex-col gap-1">
<div class="w-full h-1.5 bg-gray-200 rounded animate-pulse"></div>
<div class="w-1/2 h-1.5 bg-gray-200 rounded animate-pulse"></div>
<div class="w-3/4 h-1 bg-gray-200 rounded animate-pulse"></div>
</div>
</li>
</ul>
</:loading>
<:failed :let={{_reason, _trace} = _failure}>
<%!-- <div>Error: <%= Exception.message(reason) %></div> --%>
<div>{gettext("Error fetching data")} :(</div>
</:failed>
<ul
id="tests"
phx-update="stream"
class="bg-slate-500 text-white max-w-sm rounded p-4 mx-auto flex flex-col gap-2"
>
<li
:for={{id, list} <- @streams.lists}
class="border border-white rounded p-2 hover:bg-slate-700 cursor-pointer hover:underline hover:text-slate-200"
id={id}
>
<span>{list.name}</span>
</li>
</ul>
</.async_result>
Then run mix phx.server
and go to http://localhost:4000/notes
3. explanation
Here are the things we mostly focused on:
start_async
withhandle_async
for handling the loading, failed, and success state. More control over how you handle the result than usingassign_async
.stream
: managing large collections on the client without keeping the resources on the server. Since streams accepts enumerables (e.g. lists), we needed an extra assign to know if the stream had been asynchronously loaded.<.async_result>
component. This helps us how to organize the load, fail, success state being displayed to users.
4. ideas
Ideally, we could have something like stream_async
just to combine those 2 features. Then I found this reply from Jose Valim:
assign_async is implemented on top of start_async and the handle_async callback. So you could use those to build your own asynchronous behavior and add to the stream in the callback.
On further discussion:
Q: Or is the idea that streams and async are two semi-orthogonal ways to (among other things) handle the issue of latency, and that they shouldn’t be mixed?
A: LV streams are not related to latency at all. They’re related to memory optimizations of the server side process. They exist so you don’t need to keep (potentially long) lists of data in memory for the lifecycle of the LV process, while still being able to add/update/delete individual rows when necessary.
LV streams don’t care at all how you load data that you send to the client.