Listen very carefully, I shall say zis only once
It often happens that multiple processes need the result of the same, potentially expensive function. A common solution is to cache the computed value for future callers. But on a cold cache (or when a cached value expires), every caller that arrives before the first computation finishes misses the cache and runs the function itself. Such a spike in resource usage is known as the stampeding herd problem (also called the thundering herd problem or a cache stampede). If only the animals would proceed in an orderly fashion.
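To see the herd in action, here is a minimal sketch of a naive ETS-backed read-through cache. NaiveCache, fetch/3 and stampede/1 are hypothetical names made up for illustration. Several concurrent callers hit a cold cache, and an Erlang :counters counter records how often the expensive function actually runs.

```elixir
defmodule NaiveCache do
  # Hypothetical, deliberately naive read-through cache backed by ETS:
  # on a miss, every caller computes the value itself, so concurrent
  # misses all end up doing the expensive work.
  def fetch(table, key, fun) do
    case :ets.lookup(table, key) do
      [{^key, value}] ->
        value

      [] ->
        value = fun.()
        :ets.insert(table, {key, value})
        value
    end
  end

  # Fires `n` concurrent lookups for the same key against a cold cache
  # and returns how many times the expensive function actually ran.
  def stampede(n) do
    table = :ets.new(:naive_cache, [:set, :public])
    runs = :counters.new(1, [])

    expensive = fn ->
      :counters.add(runs, 1, 1)
      # Simulate slow work so that the callers overlap in time.
      Process.sleep(100)
      :expensive_value
    end

    1..n
    |> Enum.map(fn _ -> Task.async(fn -> fetch(table, :key, expensive) end) end)
    |> Task.await_many()

    :counters.get(runs, 1)
  end
end
```

On a typical run of NaiveCache.stampede(5), all callers miss the cache before any of them has written the value, so the function runs once per caller instead of once in total.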
Elixir's GenServer provides the handle_call callback. A GenServer processes its messages one at a time, so if several processes make a call to the same GenServer, their requests are handled sequentially, and the value could even be cached for the benefit of the next caller. Problem solved.
However, anything that runs within handle_call, or any other callback, blocks the GenServer, potentially making it appear unresponsive and turning it into a bottleneck. Ever heard of Python's GIL? :)
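The following sketch shows both sides of that coin at once. SlowServer and SlowServerDemo are made-up names, and the 100 ms sleep stands in for real work. Because the sleep happens inside handle_call, n concurrent callers need at least n × 100 ms in total rather than roughly 100 ms.

```elixir
defmodule SlowServer do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:ok, nil}

  # Every call sleeps inside the callback, so the server cannot process
  # any other message until this one returns.
  @impl true
  def handle_call(:work, _from, state) do
    Process.sleep(100)
    {:reply, :done, state}
  end
end

defmodule SlowServerDemo do
  # Issues `n` concurrent calls and returns the elapsed wall-clock time in ms.
  def run(n) do
    {:ok, pid} = SlowServer.start_link()
    started = System.monotonic_time(:millisecond)

    1..n
    |> Enum.map(fn _ -> Task.async(fn -> GenServer.call(pid, :work) end) end)
    |> Task.await_many()

    System.monotonic_time(:millisecond) - started
  end
end
```

The calls are safely serialized, which is what makes caching inside the server possible, but the server is busy for the whole duration of each call.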
But what if the GenServer could dish out tasks to be executed as separate processes and keep the execution time of its callbacks to a minimum? To make this work, we need to cover two concepts.
handle_call doesn't have to reply immediately
You are probably familiar with the anatomy of a typical handle_call implementation:
def handle_call(request, from, state) do
  {result, new_state} = calculate_state(request, state)
  {:reply, result, new_state}
end
The :reply
in the tuple means that the result will be sent immediately to the caller (the from
). But handle_call
may also return {:noreply, new_state}
in which case the caller will be left hanging until it receives a reply or times out.
To send a reply to a caller after having initially returned {:noreply, new_state}, we use GenServer.reply/2, which takes the from and the reply value. It can be called from within any callback implementation, and even from another process, as long as that process has been handed the from.
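As a small sketch of a deferred reply, here is a hypothetical Gate server (the module and its wait/1 and open/2 functions are made up for illustration). handle_call parks each caller by returning {:noreply, ...}, and a later cast replies to all of them with GenServer.reply/2.

```elixir
defmodule Gate do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, :ok)

  # Hypothetical client API: wait/1 blocks until someone calls open/2.
  def wait(gate), do: GenServer.call(gate, :wait)
  def open(gate, value), do: GenServer.cast(gate, {:open, value})

  # The state is simply the list of callers waiting for a reply.
  @impl true
  def init(:ok), do: {:ok, []}

  # Don't reply yet: remember who asked and go back to serving messages.
  @impl true
  def handle_call(:wait, from, waiting), do: {:noreply, [from | waiting]}

  # Reply to every parked caller from a different callback.
  @impl true
  def handle_cast({:open, value}, waiting) do
    Enum.each(waiting, &GenServer.reply(&1, value))
    {:noreply, []}
  end
end
```

While callers are parked, the Gate itself stays responsive; each caller remains blocked in Gate.wait/1 until the gate opens or the default 5-second call timeout expires.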
Task is built on message passing
It's good advice to only call Task.async if you are also going to await the result. Typically you would do so with Task.await or one of the other functions available in the Task module, such as Task.yield. Under the hood, these functions rely on receiving the result from the task by means of message passing; that is, they use Kernel.SpecialForms.receive/1 to wait for the result to roll in.
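To make the message passing visible, here is a sketch (TaskMessages.run/1 is a made-up name) that collects a task's result with a plain receive instead of Task.await. It relies on the documented shape of the reply, {ref, result}, where ref is the task's ref field; the real Task.await does more bookkeeping than this.

```elixir
defmodule TaskMessages do
  # Starts a task and collects its result with a plain `receive`,
  # roughly what Task.await/2 does on our behalf.
  def run(fun) do
    task = Task.async(fun)
    ref = task.ref

    receive do
      {^ref, result} ->
        # The task is monitored: drop the monitor and any :DOWN
        # message that may already be sitting in the mailbox.
        Process.demonitor(ref, [:flush])
        result
    after
      5_000 -> exit(:timeout)
    end
  end
end
```

Since the result is just a message in the caller's mailbox, a GenServer can receive it the same way it receives any other message.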
When you call Task.async from within a GenServer callback, you could await it with Task.await as usual, but for the duration of the task's computation the callback would be blocking, preventing the GenServer from handling any other calls. Depending on what you want from your GenServer, this may defeat the purpose altogether.
Fortunately, within a GenServer you can await the success or failure of a task as you would await any other message, with handle_info callback implementations:
def handle_info({ref, result}, state) when is_reference(ref) do
  handle_task_success(ref, result, state)
end

def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
  handle_task_failure(ref, reason, state)
end
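Make sure to guard that ref is a reference (when is_reference(ref)); otherwise the first clause will also catch any other two-element tuple sent to the GenServer. Also note that the :DOWN clause only helps if the task is not linked to the GenServer: a task started with Task.async is linked, so if it crashes, it takes the GenServer down with it. Task.Supervisor.async_nolink, which the solution below uses, monitors the task without linking it.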
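Here is a minimal sketch that combines both concepts for a single caller, without any deduplication yet. TaskRunner and its run/2 function are hypothetical names. handle_call starts an unlinked, supervised task and returns {:noreply, ...}; the two handle_info clauses then reply with {:ok, result} or {:error, reason}.

```elixir
defmodule TaskRunner do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Hypothetical client API: run `fun` in a supervised task, wait for the outcome.
  def run(server, fun), do: GenServer.call(server, {:run, fun})

  @impl true
  def init(:ok) do
    # A private task supervisor, linked to this server.
    {:ok, sup} = Task.Supervisor.start_link()
    # `waiting` maps each task ref to the caller waiting for that task.
    {:ok, %{sup: sup, waiting: %{}}}
  end

  @impl true
  def handle_call({:run, fun}, from, state) do
    # async_nolink: the task is monitored but not linked, so a crash
    # arrives as a :DOWN message instead of killing this server.
    task = Task.Supervisor.async_nolink(state.sup, fun)
    # Return right away; the reply is sent later from handle_info/2.
    {:noreply, %{state | waiting: Map.put(state.waiting, task.ref, from)}}
  end

  @impl true
  def handle_info({ref, result}, state) when is_reference(ref) do
    # Success: remove the monitor and flush the :DOWN message that follows.
    Process.demonitor(ref, [:flush])
    {from, waiting} = Map.pop(state.waiting, ref)
    GenServer.reply(from, {:ok, result})
    {:noreply, %{state | waiting: waiting}}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    # Failure: the task crashed; hand the exit reason to the waiting caller.
    {from, waiting} = Map.pop(state.waiting, ref)
    GenServer.reply(from, {:error, reason})
    {:noreply, %{state | waiting: waiting}}
  end
end
```

A task that raises produces an {:error, {exception, stacktrace}} reply, and the server keeps running.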
A solution
So here is a broad overview of a solution to the stampeding herd problem. Processes that are interested in the result of a function call a GenServer to provide them with it. If the GenServer already has the result, it replies immediately. Otherwise, if no Task is computing the result yet, it spawns one and keeps track of the caller; if a Task is already running, it doesn't spawn another one but still keeps track of the caller. When the Task's result arrives, the GenServer notifies all tracked callers using GenServer.reply.
Here is the whole thing put together:
defmodule GenHerder do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, [], Keyword.put(opts, :name, __MODULE__))
  end

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_call(request, from, state) do
    case state[request] do
      nil ->
        task =
          Task.Supervisor.async_nolink(__MODULE__.TaskSupervisor, fn ->
            handle_request(request)
          end)

        {:noreply, Map.put(state, request, {:task, task, [from]})}

      {:task, task, froms} ->
        {:noreply, Map.put(state, request, {:task, task, [from | froms]})}

      {:result, result} ->
        {:reply, result, state}
    end
  end

  defp handle_request(_request) do
    :only_once_kenobi
  end

  @impl true
  def handle_info({ref, result}, state) when is_reference(ref) do
    handle_task_success(ref, result, state)
  end

  @impl true
  def handle_info({:DOWN, ref, _, _, reason}, state) do
    handle_task_failure(ref, reason, state)
  end

  defp handle_task_success(ref, result, state) do
    # The task succeeded so we can cancel the monitoring and discard the DOWN message
    Process.demonitor(ref, [:flush])

    {request, _task_and_froms} =
      Enum.find(state, fn
        {_request, {:task, task, _froms}} -> task.ref == ref
        _ -> false
      end)

    {{:task, _task, froms}, state} = Map.pop(state, request)
    state = Map.put(state, request, {:result, result})

    # Send the result to everyone that asked for it
    for from <- froms do
      GenServer.reply(from, result)
    end

    {:noreply, state}
  end

  defp handle_task_failure(ref, reason, state) do
    {request, _task_and_froms} =
      Enum.find(state, fn
        {_request, {:task, task, _froms}} -> task.ref == ref
        _ -> false
      end)

    # The entry is removed, so the next caller triggers a fresh attempt
    {{:task, _task, froms}, state} = Map.pop(state, request)

    # Send the error to everyone that asked for it
    for from <- froms do
      GenServer.reply(from, {:error, reason})
    end

    {:noreply, state}
  end
end
You have to start it with something like this in a supervision tree. The task supervisor is listed first, so it is started before GenHerder (and shut down after it):
children = [
  {Task.Supervisor, name: GenHerder.TaskSupervisor},
  GenHerder
]

Supervisor.start_link(children, strategy: :one_for_one)
And then you can call it with:
GenServer.call(GenHerder, request, timeout)
Obviously, you can add nice wrappers around it. But a simpler way might be to use my package on hex.pm: run mix hex.info gen_herder for more info.
It implements the above with some sugar and also adds optional expiry of results.