How to asynchronously download files in Elixir

I was recently cleaning up a project that mixed usages of HTTPotion and HTTPoison. There was a function for downloading bigger files asynchronously. I wanted to switch the libraries so that the whole project would use just one.

The documentation of both libraries describes how to make asynchronous requests by just presenting the messages that will arrive (HTTPoison here, and HTTPotion here). While this is enough to figure out yourself what to do, future me would love to have an example that can be simply copy-pasted and modified as needed (she’s quite lazy).

Function overview

1
2
3
4
5
6
  @doc """
  Asynchronously downloads a file and saves it under the given path.
  Returns the file's size.
  """
  @spec download(String.t(), String.t()) :: {:ok, integer()} | {:error, String.t()}
  def download(url, path)

In both libraries, asynchronous requests are started by passing the stream_to option with the receiver’s pid. The receiving process will have to handle messages of a few different types. First AsyncStatus and/or AsyncHeaders, then AsyncChunk with chunks of the actual file, and then AsyncEnd as a confirmation of completion. All messages carry the request’s id.

Because receiving messages is involved, I need to spawn a new process (eg. Task.Async). If I didn’t, I would be in trouble when I would try to use the function from within a process that already expects some other kind of messages.

Of course, once I call Task.await at the end, the whole operation is no longer asynchronous, but the HTTP call still is. This is convenient in a simple script downloading a single file, but will not scale well.

I am assuming that the server responds with a correct Content-Length header and I will check if the size of the downloaded content is equal to the one claimed by the header.

The timeout for the task needs to be big enough to receive all chunks of the file. I have chosen 5 minutes because the purpose of the function was to download big video files. You might need significantly less.

Version 1 - HTTPotion

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def download(url, path) do
  timeout = 300_000 # 5 minutes
  do_download = fn ->
    {:ok, file} = File.open(path, [:write])

    opts = [stream_to: self(), follow_redirects: true, timeout: timeout]
    %HTTPotion.AsyncResponse{id: id} = HTTPotion.get(url, opts)
    result =
      receive_data(file, %{
        received_bytes: 0,
        total_bytes: 0,
        id: id
      })

    :ok = File.close(file)

    result
  end

  do_download
  |> Task.async()
  |> Task.await(timeout + 500) # blocking a potentially very long process!
end

defp receive_data(file, %{id: id} = state) do
  receive do
    %HTTPotion.AsyncHeaders{status_code: code, headers: headers} ->
      case code do
        200 ->
          total_bytes = headers[:"Content-Length"] |> to_int
          receive_data(file, %{state | total_bytes: total_bytes})

        404 ->
          {:error, "File not found"}

        _ ->
          {:error, "Received unexpected status code #{code}"}
      end

    %HTTPotion.AsyncChunk{chunk: chunk, id: ^id} ->
      IO.binwrite(file, chunk)
      new_state = %{state |
        received_bytes: state.received_bytes + byte_size(chunk)
      }

      receive_data(file, new_state)

    %HTTPotion.AsyncEnd{id: ^id} ->
      if state.total_bytes === state.received_bytes do
        {:ok, state.received_bytes}
      else
        {:error, "Expected to receive #{state.total_bytes} bytes but got #{state.received_bytes}"}
      end
      
    %HTTPotion.AsyncTimeout{id: ^id} ->
      {:error, "Request timed out"}
  end
end

Version 2 - HTTPoison

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def download(url, path) do
  timeout = 300_000 # 5 minutes
  do_download = fn ->
    {:ok, file} = File.open(path, [:write])

    opts = [stream_to: self(), follow_redirect: true]
    {:ok, %HTTPoison.AsyncResponse{id: id}} = HTTPoison.get(url, [], opts)
    result =
      receive_data(file, %{
        received_bytes: 0,
        total_bytes: 0,
        id: id
      })

    :ok = File.close(file)

    result
  end

  do_download
  |> Task.async()
  |> Task.await(timeout) # blocking a potentially very long process!
end

defp receive_data(file, %{id: id} = state) do
  receive do
    %HTTPoison.AsyncStatus{code: code, id: id} ->
      case code do
        200 ->
          receive_data(file, %{state | id: id})

        404 ->
          {:error, "File not found"}

        _ ->
          {:error, "Received unexpected status code #{code}"}
      end

    %HTTPoison.AsyncHeaders{headers: headers} ->
      total_bytes =
        headers
        |> Enum.find(fn {name, _} -> name == "Content-Length" end)
        |> elem(1)
        |> to_int()

      receive_data(file, %{state | total_bytes: total_bytes})

    %HTTPoison.AsyncChunk{chunk: chunk, id: ^id} ->
      IO.binwrite(file, chunk)
      new_state = %{state |
        received_bytes: state.received_bytes + byte_size(chunk)
      }

      receive_data(file, new_state)

    %HTTPoison.AsyncEnd{id: ^id} ->
      if state.total_bytes === state.received_bytes do
        {:ok, state.received_bytes}
      else
        {:error, "Expected to receive #{state.total_bytes} bytes but got #{state.received_bytes}"}
      end

    %HTTPoison.Error{id: ^id, reason: {:closed, :timeout}} ->
      {:error, "Receiving a response chunk timed out"}
  end
end

Differences

1. Timeout behavior

HTTPotion has one timeout option: timeout for the whole request. Exceeding it sends a message. Its value needs to be however long it takes to download the whole file.

HTTPoison has two timeout options: timeout for establishing a connection and recv_timeout for receiving a response (just a chunk). Exceeding the first does not send a message, exceeding the second does. Keeping default values for both works well.

2. Status code and headers

HTTPotion includes those in a single message, HTTPoison splits them into two.

ExVCR warning

Recording cassettes for asynchronous requests with ExVCR currently (v0.10.3) does not work.