Context
We want the server to download data, i.e. trigger an HTTP GET request to some endpoint. The response should be a stream, and we want to append the chunks to a file. We also want to display the download progress.
We will use the Req library, which is built on top of Finch, which is itself built on top of Mint.
A few reasons for Req:
- you can stream the response body,
- it takes care of retrying when the connection fails or times out (not tested here),
- it takes care of redirections.
[Front-end note]: an HTTP request made from the browser uses the fetch API, which can consume a response as a stream. To get a download progress, you can use fetch and read the response body through a ReadableStream (res.body.getReader()). Check this or that. For an upload progress, you need to use XMLHttpRequest. This is the reason why LiveView uses it in the uploader snippet.
Code
The code below is "livebookable" or can be run in an iex session.
========================================================
HTTP streaming with progress using Finch and Req
Mix.install([
  {:finch, "~> 0.18.0"},
  {:req, "~> 0.4.14"}
])
Finch.start_link(name: ExStream.Finch)
Test endpoints
We use two endpoints for testing. The last one includes a redirection.
vid = "https://sample-videos.com/video321/mp4/720/big_buck_bunny_720p_1mb.mp4"
img = "https://source.unsplash.com/QT-l619id6w"
Stream & progress with Req
We start with the Req library.
The first example downloads a stream into a file. The code is pretty compact: we pass a File.stream!/2 to the :into option. A file stream is a Collectable, so Req pushes each chunk of the body into it as it arrives.
defmodule ReqWriteStream do
  def download(url, file_path) do
    Req.get!(url, raw: true, into: File.stream!(file_path, [:write]))
  end
end
We test it:
ReqWriteStream.download(img, "image2.jpg")
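To see why a file stream works as a target for the chunks, here is a minimal sketch that needs no network. It pushes a few hand-made binaries (stand-ins for HTTP chunks, not real response data) into a File.stream!/1 with Enum.into/2. The temporary file name is an assumption made for the example.

```elixir
# A File.stream! is a Collectable: every element pushed into it is written to the file.
# The path below is a hypothetical scratch file used only for this demonstration.
path = Path.join(System.tmp_dir!(), "collectable_demo.bin")

# Stand-ins for the chunks an HTTP client would deliver one by one.
chunks = ["abc", "def", "gh"]

# Collecting into the stream opens the file for writing and appends each chunk in order.
Enum.into(chunks, File.stream!(path))

contents = File.read!(path)
IO.inspect(byte_size(contents), label: "bytes written")

# Clean up the scratch file.
File.rm!(path)
```

This is presumably the same mechanism Req relies on when it receives a collectable in :into: the chunks are handed over one at a time instead of being accumulated in memory.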
We now write a module that displays the progress. We grab the "content-length" with a HEAD request. The body is again streamed, this time via a callback passed to the :into option. This callback writes the chunks into a file and stores the progress state under the :private key of the %Req.Response{} struct.
defmodule ReqProgressStream do
  def download(url, file_path) do
    # A HEAD request gives the total size; fall back to 0 when the header is absent.
    [size] = Map.get(Req.head!(url: url).headers, "content-length", ["0"])
    size = String.to_integer(size)
    file_pid = File.open!(file_path, [:write, :binary])

    # Called by Req for every chunk of the body.
    func = fn {:data, data}, {req, res} ->
      IO.binwrite(file_pid, data)
      chunk_size = byte_size(data)
      # Accumulate the number of bytes received so far in the response's private map.
      res = Req.Response.update_private(res, :progress, chunk_size, &(&1 + chunk_size))

      if size > 0 do
        {Req.Response.get_private(res, :progress) * 100 / size, chunk_size, size} |> dbg()
      end

      {:cont, {req, res}}
    end

    Req.get!(url: url, raw: true, into: func)
    File.close(file_pid)
  end
end
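The bookkeeping done inside the callback can be tried out without any HTTP call. The sketch below is an illustration only: ProgressSketch and its functions are hypothetical names, and a plain integer accumulator replaces the :private map of the response. It follows the same logic: add the size of each chunk to a running total and turn it into a percentage when the total size is known.

```elixir
defmodule ProgressSketch do
  # Hypothetical helper: percentage of bytes received, or nil when the
  # total size is unknown (no "content-length" header, hence size 0).
  def percent(_processed, 0), do: nil
  def percent(processed, size), do: Float.round(processed * 100 / size, 1)

  # Walks over the chunks, keeping the number of bytes seen so far as the
  # accumulator and emitting the progress after each chunk.
  def track(chunks, size) do
    Enum.map_reduce(chunks, 0, fn chunk, processed ->
      processed = processed + byte_size(chunk)
      {percent(processed, size), processed}
    end)
  end
end

# Three fake chunks for a body announced as 8 bytes long.
{percents, total} = ProgressSketch.track(["aaaa", "bb", "cc"], 8)
IO.inspect(percents, label: "progress")
IO.inspect(total, label: "bytes")
```

Returning nil for an unknown size mirrors the size > 0 guard in the module above, which simply skips the progress output in that case.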
We test concurrent HTTP calls with Task.async_stream because we use the same function with different arguments.
Task.async_stream(
  [[vid, "video.mp4"], [img, "image.jpg"]],
  &apply(ReqProgressStream, :download, &1),
  timeout: :infinity
)
|> Stream.run()
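The apply/3 trick used above can be checked in isolation. In this sketch, FakeDownloader is a hypothetical stand-in that performs no request and only returns a tuple, so the example runs offline. Each inner list is spread into the arguments of download/2, and each call runs in its own task.

```elixir
defmodule FakeDownloader do
  # Hypothetical stand-in for a real download: no network access, it only
  # returns the target path and the length of the URL.
  def download(url, file_path), do: {file_path, String.length(url)}
end

results =
  [["https://example.com/a", "a.bin"], ["https://example.com/bb", "b.bin"]]
  # Each element is a list of arguments, applied to FakeDownloader.download/2.
  |> Task.async_stream(&apply(FakeDownloader, :download, &1), timeout: :infinity)
  |> Enum.to_list()

IO.inspect(results)
```

Task.async_stream/3 wraps each result in an {:ok, value} tuple and keeps the input order by default, which is convenient when the results need to be matched back to the URLs. Stream.run/1, as used above, discards them instead.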
Stream & progress with Finch
We continue with the Finch library. The code below is adapted from the example coming with stream_while.
We expose a function that takes a URL and a path. As such, it may be fragile with regard to connection errors.
defmodule FinchStream do
  def download(url, file_path) do
    IO.puts("Starting to process #{inspect(file_path)}...........")

    # Open the file to which the binary chunks will be appended.
    # File.open! raises if the file cannot be created.
    # This process is reset in case of redirection.
    file_pid = File.open!(file_path, [:write, :binary])

    # the HTTP stream request
    Finch.build(:get, url)
    |> Finch.stream_while(ExStream.Finch, nil, fn
      # we put the status in the "acc" to handle redirections
      {:status, status}, _acc ->
        {:cont, status}

      # - when we receive a redirection status, we put the "location" header in the "acc"
      # - when we receive a 200, we put the processed bytes and the "content-length" in the "acc"
      {:headers, headers}, acc ->
        handle_headers(headers, acc)

      # when the "acc" holds the "location" tuple, we recurse,
      # otherwise we write the chunk into the file and print out the current progress
      {:data, data}, acc ->
        handle_data(data, acc, file_path, file_pid)
    end)

    case File.close(file_pid) do
      :ok ->
        {:halt, {file_path, :done}}

      {:error, _reason} ->
        {:halt, :error}
    end
  end
  def handle_headers(headers, status) when status in [301, 302, 303, 307, 308] do
    IO.puts("REDIR: #{status}")
    {:cont, Enum.find(headers, &(elem(&1, 0) == "location"))}
  end

  def handle_headers(headers, 200) do
    # Enum.find returns nil when the header is missing, so we match on its
    # result instead of destructuring it directly (which would raise).
    case Enum.find(headers, &(elem(&1, 0) == "content-length")) do
      nil ->
        {:cont, {0, 0}}

      {"content-length", size} ->
        {:cont, {0, String.to_integer(size)}}
    end
  end

  def handle_headers(_, status) do
    dbg(status)
    {:halt, :bad_status}
  end
  def handle_data(_data, {"location", location}, file_path, file_pid) do
    if Process.alive?(file_pid), do: :ok = File.close(file_pid)
    # recursion
    download(location, file_path)
  end
  def handle_data(data, {processed, size}, file_path, file_pid) do
    case IO.binwrite(file_pid, data) do
      :ok ->
        processed = processed + byte_size(data)

        # The progress can only be computed when the total size is known.
        if is_integer(size) and size > 0 do
          IO.inspect(Float.round(processed * 100 / size, 1),
            label: "Processed #{inspect(file_path)} %: "
          )
        end

        {:cont, {processed, size}}

      {:error, reason} ->
        # stream_while expects {:cont, acc} or {:halt, acc}
        {:halt, {:error, reason}}
    end
  end
end
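The accumulator goes through several shapes during one request: nil, then the status, then either the "location" tuple or a {processed, size} tuple. The sketch below replays that state machine on hand-written messages with Enum.reduce_while/3, which uses the same {:cont, acc} / {:halt, acc} convention. AccSketch is a hypothetical module, the messages are made up, and a redirection is only reported here instead of being followed.

```elixir
defmodule AccSketch do
  # The status replaces the initial nil accumulator.
  def step({:status, status}, _acc), do: {:cont, status}

  # Redirection: keep the {"location", url} header as the accumulator.
  def step({:headers, headers}, status) when status in [301, 302, 303, 307, 308] do
    {:cont, List.keyfind(headers, "location", 0)}
  end

  # Success: start counting bytes against the announced size (0 if unknown).
  def step({:headers, headers}, 200) do
    size =
      case List.keyfind(headers, "content-length", 0) do
        {_name, value} -> String.to_integer(value)
        nil -> 0
      end

    {:cont, {0, size}}
  end

  # Any other status stops the stream.
  def step({:headers, _headers}, _status), do: {:halt, :bad_status}

  # This clause must come before the next one: both accumulators are 2-tuples.
  def step({:data, _data}, {"location", location}), do: {:halt, {:redirect, location}}

  def step({:data, data}, {processed, size}) do
    {:cont, {processed + byte_size(data), size}}
  end

  # Replays a list of messages, starting from a nil accumulator.
  def run(messages), do: Enum.reduce_while(messages, nil, &step/2)
end

ok =
  AccSketch.run([
    {:status, 200},
    {:headers, [{"content-length", "5"}]},
    {:data, "abc"},
    {:data, "de"}
  ])

redirected =
  AccSketch.run([
    {:status, 302},
    {:headers, [{"location", "https://example.com/x"}]},
    {:data, ""}
  ])

bad = AccSketch.run([{:status, 404}, {:headers, []}])

IO.inspect({ok, redirected, bad})
```

Keeping the transitions in small pure functions like these makes them easy to test before wiring them into a real streaming callback.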
We test this:
Task.async_stream(
  [[vid, "video.mp4"], [img, "image.jpg"]],
  &apply(FinchStream, :download, &1),
  timeout: :infinity
)
|> Stream.run()