DEV Community

gumi TECH for gumi TECH Blog

Posted on • Edited on

MixとOTP 06: ETS

本稿はElixir公式サイトの許諾を得て「ETS」の解説にもとづき、加筆補正を加えて、ElixirにおけるETSの使い方についてご説明します。

プロセスを探すには、それらの登録先プロセスにメッセージを送らなければなりません。複数のプロセスから同時にアクセスされると、登録先プロセスがボトルネックになってしまいます。そこで、ETS(Erlang Term Storage)について学習し、これをキャッシュメカニズムとして使ってみましょう。

ETSをいきなりキャッシュに使わないようにしてください。アプリケーションのパフォーマンスを記録および分析し、どの部分がボトルネックなのかを明らかにするのが先です。そのうえで、そもそもキャッシュすべきか、何をキャッシュするかを決めなければなりません。これから述べるのは、キャッシュが必要とされた場合に、ETSをどう使うかという説明です。

ETSでキャッシュする

ETSを使うと、Elixirのどのような項目でもメモリ内のテーブルに納められます。ETSテーテブルはErlangの:etsモジュールで操作します。

ETSテーブルをつくるets.new/2には、ふたつの引数を渡します。テーブルの名前とオプションのリストです。つぎのコードはオプションとして、テーブルのタイプとアクセス権限を定めています。タイプ:setのテーブルはキーが複製できません。アクセス権限:protectedのテーブルに書き込みができるのは、テーブルをつくったプロセスだけです。他のプロセスからテーブルを読むことはできます。もっとも、このふたつはデフォルト値です。したがって、このあとはこれらを省きます。

iex> table = :ets.new(:buckets_registry, [:set, :protected])
#Reference<0.2154069976.707395585.80154>
iex> self()
#PID<0.133.0>
iex> :ets.insert(table, {"foo", self()})
true
iex> :ets.lookup(table, "foo")
[{"foo", #PID<0.133.0>}]
Enter fullscreen mode Exit fullscreen mode

つくるETSテーブルに:named_tableのオプションを与えれば、テーブルが名前で参照できるようになります。

iex> :ets.new(:buckets_registry, [:named_table])
:buckets_registry
iex> :ets.insert(:buckets_registry, {"foo", self()})
true
iex> :ets.lookup(:buckets_registry, "foo")
[{"foo", #PID<0.133.0>}]
Enter fullscreen mode Exit fullscreen mode

KV.RegistryモジュールがETSテーブルを使えるように書き替えましょう。まず、登録プロセスを始めるstart_link/1に、必須の引数として名前を渡すことにします。これがETSテーブルとプロセス自身の名前になるのです。ふたつの名前はそれぞれ異なる場所に納められますので、重複の問題は生じません。lib/kv/registry.exのクライアントAPIの実装はつぎのように改めてください。

## クライアントAPI
@doc """
登録を始める。

引数のオプションには`:name`が必須。
"""
def start_link(opts) do
  # [1]GenServerのintに名前を渡す。
  server = Keyword.fetch!(opts, :name)
  # GenServer.start_link(__MODULE__, :ok, opts)
  GenServer.start_link(__MODULE__, server, opts)
end

@doc """
`server`に納められた`name`のプロセスのpidを探す。

プロセスがあれば`{:ok、pid}`を返し、見つからないときは`:error`を返す。
"""
def lookup(server, name) do
  # [2]lookupはサーバーでなく、ETSで直接行う。
  # GenServer.call(server, {:lookup, name})
  case :ets.lookup(server, name) do
    [{^name, pid}] -> {:ok, pid}
    [] -> :error
  end
end
Enter fullscreen mode Exit fullscreen mode

KV.Registry.lookup/2はリクエストをサーバーに送るのでなく、ETSテーブルから直に読み取るようにしました。ETSテーブルはすべてのプロセスで共有します。これが、これから実装するキャッシュの仕組みの基本です。

キャッシュが正しく働くようにETSテーブルは、サーバーコールバックのinit/1がデフォルトのアクセス権限:protectedで開始します。つまり、クライアントのプロセスからは読み取り専用です。KV.Registryプロセスだけが書き込みできます。read_concurrency: trueオプションは、同時読み取りの操作を最適化する定めです。

## サーバーコールバック
# def init(:ok) do
def init(table) do
  # [3]マップnamesをETSテーブルで置き替える。
  # names = %{}
  names = :ets.new(table, [:named_table, read_concurrency: true])
  refs = %{}
  {:ok, {names, refs}}
end

# [4]lookupのコールバックhandle_callは削除。
# def handle_call({:lookup, name}, _from, {names, _} = state) do
#   {:reply, Map.fetch(names, name), state}
# end

def handle_cast({:create, name}, {names, refs}) do
  # [5]マップでなくETSテーブルに読み書きする。
  # if Map.has_key?(names, name) do
  case lookup(names, name) do
    {:ok, _pid} ->
      {:noreply, {names, refs}}
  # else
    :error ->
      {:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
      ref = Process.monitor(pid)
      refs = Map.put(refs, ref, name)
      # names = Map.put(names, name, pid)
      :ets.insert(names, {name, pid})
      {:noreply, {names, refs}}
  end
end

def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
  # [6]マップでなくETSテーブルから削除する。
  {name, refs} = Map.pop(refs, ref)
  # names = Map.delete(names, name)
  :ets.delete(names, name)
  {:noreply, {names, refs}}
end
Enter fullscreen mode Exit fullscreen mode

登録のプロセスを起動するには、:nameオプションが求められることになりました。また、lookup/2などの操作には、PIDでなく名前を引数として渡さなければなりません。その名前でETSテーブルを参照するためです。そこで、test/kv/registry_test.exsのテストも、setup関数をつぎのように書き替えてください。

# setup do
setup context do
  # registry = start_supervised!(KV.Registry)
  _ = start_supervised!({KV.Registry, name: context.test})
  # %{registry: registry}
  %{registry: context.test}
end
Enter fullscreen mode Exit fullscreen mode

この手直しだけではテストはとおりません。たとえば、"spawns buckets"のテストは、以下のようなエラーメッセージが示されるでしょう。KV.Registry.lookup/2が期待した値を返していません。これは、つぎのふたつの間に食い違いがあるからです。

  1. ETSテーブルをキャッシュにして早まった最適化をしました。
  2. lookup/2cast/2を呼び出しています(call/2を使うのが適切です)。
  2) test spawns buckets (KV.RegistryTest)
     test/kv/registry_test.exs:
     match (=) failed
     code:  assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
     right: :error
     stacktrace:
       test/kv/registry_test.exs: (test)
Enter fullscreen mode Exit fullscreen mode

競合状態

Elixirの開発でも競合状態は起こります。けれど、デフォルトで共有はしないという抽象化がされているので、競合しているおおもとの原因がつきとめやすくなっているのです。

テストでは、ETSテーブルに加えた操作とその変化が表れるまでに時間差が生じます。テストで期待されたのは、つぎのように処理が進むことです。

  1. KV.Registry.create(registry, "shopping")を呼び出します。
  2. 登録プロセスは子プロセスをつくってキャッシュテーブルが更新されます。
  3. KV.Registry.lookup(registry, "shopping")でテーブルから情報を参照します。
  4. {:ok, bucket}が返されます。

ところが、KV.Registry.create/2cast/2の操作でした。すると、実際にテーブルに書き込まないうちに、コマンドが返ってしまいます。つまり、処理はつぎのように進んでいたのです。

  1. KV.Registry.create(registry, "shopping")を呼び出します。
  2. KV.Registry.lookup(registry, "shopping")でテーブルから情報を参照します。
  3. まだ情報がないので返るのは:errorです。
  4. 登録プロセスは子プロセスをつくってキャッシュテーブルが更新されます。

この失敗を修正する方法は、KV.Registry.create/2cast/2でなくcall/2を使って同期処理にすることです。これで、クライアントが処理を進めるのは、テーブルが変更され終えてからになります。KV.Registryモジュールのコールバックは以下のように書き替えてください。

コールバックhandle_cast/2handle_call/3に改めます。戻り値にはつくったプロセスのPIDを含めなければなりません。一般にElixirの開発には、call/2の方がcast/2よりも好まれます。レスポンスを確実に得て、処理が進められるからです。十分な必要性なくcast/2を用いるのは、早まった最適化でしょう。

def create(server, name) do
  # GenServer.cast(server, {:create, name})
  GenServer.call(server, {:create, name})
end

# def handle_cast({:create, name}, {names, refs}) do
def handle_call({:create, name}, _from, {names, refs}) do
  # [5]マップでなくETSテーブルに読み書きする。
  case lookup(names, name) do
    # {:ok, _pid} ->
    {:ok, pid} ->
      # {:noreply, {names, refs}}
      {:reply, pid, {names, refs}}
    :error ->
      {:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
      ref = Process.monitor(pid)
      refs = Map.put(refs, ref, name)
      :ets.insert(names, {name, pid})
      # {:noreply, {names, refs}}
      {:reply, pid, {names, refs}}
  end
end
Enter fullscreen mode Exit fullscreen mode

改めてテストしてみしましょう。mix testコマンドに--traceオプションを添えて試します。このオプションは、テストがデッドロックしたり、競合状態になる場合に有効です。すべてのテストを同期的に行い(async: trueは無効)、テストの詳しい情報を示します。すると、つぎのような障害が生じるかもしれません。

$ mix test test/kv/registry_test.exs --trace

  1) test removes buckets on exit (KV.RegistryTest)
     test/kv/registry_test.exs:19
     Assertion with == failed
     code: KV.Registry.lookup(registry, "shopping") == :error
     lhs:  {:ok, #PID<0.109.0>}
     rhs:  :error
     stacktrace:
       test/kv/registry_test.exs:23
Enter fullscreen mode Exit fullscreen mode

エラーメッセージによれば、テーブルからプロセスはなくなっているはずなのに、それが残っているようです。この問題は先に解決した競合状態の逆になります。先ほどの問題は、プロセスをつくるコマンドからテーブルが更新されるまでの遅れでした。今回は、プロセスを停止したのち、テーブルから除かれるまでに遅れが生じています。つまり、これもまた競合状態です。結果として、競合による障害は起こらないかもしれません。けれど、遅れがあることは確かです。

残念ながら、handle_info/2に対応する同期処理の関数はありません。そのため、ETSテーブルの更新を、同期的な操作に置き替えるという解決はできないのです。プロセスが停止したとき、登録プロセスから:DOWNの通知が必ず送られるような方法を考えなければなりません。

簡単なのは、登録のプロセスに同期的な要求を送ることです。すると、メッセージは順番に処理されます。Agent.stop/3の呼び出しのあと送信された要求に登録プロセスが応答すれば、:DOWNメッセージが処理されたことになるでしょう。KV.Registry.create/2は同期的な要求です。つぎのふたつのテストで、Agent.stop/3が呼び出される前にダミーのプロセスをつくりましょう。これで競合状態による障害は起こりません。

test "removes buckets on exit", %{registry: registry} do
  KV.Registry.create(registry, "shopping")
  {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
  Agent.stop(bucket)
  # 登録プロセスに:DOWNメッセージを遅らせるための呼び出し
  _ = KV.Registry.create(registry, "bogus")
  assert KV.Registry.lookup(registry, "shopping") == :error
end

test "removes bucket on crash", %{registry: registry} do
  KV.Registry.create(registry, "shopping")
  {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
  # プロセスを正常でない理由で止める
  Agent.stop(bucket, :shutdown)
  # 登録プロセスに:DOWNメッセージを遅らせるための呼び出し
  _ = KV.Registry.create(registry, "bogus")
  assert KV.Registry.lookup(registry, "shopping") == :error
end
Enter fullscreen mode Exit fullscreen mode

以上が、キャッシュメカニズムとしてのETSの使い方です。テーブルのデータはどのプロセスからも読み取りでき、書き込みはひとつのプロセスが順に行いました。そして大切なのは、データが非同期に読み込まれる場合、競合状態が起こりうることに気をつけなければならないということです。

実際には、動的プロセスの登録が必要になったら、ElixirのモジュールのひとつRegistryを用いるべきです。このモジュールには、GenServer:etsを使ってここでつくったアプリケーションと似た機能が備わっています。さらに、読み書きも同時に行えるのです。40コアのマシンでも、すべてのコアにスケールできることがベンチマークされました(「Elixir v1.4 released」の「Registry」にリンクされた「Comparing registries」)。

MixとOTPもくじ

Top comments (0)