PREVIOUSLY, we created a small proof-of-concept OCaml project to prove out how easy it is to make small but helpful utilities. That project was a simple server that forked off a new process to handle each incoming connection, using nothing but OCaml's built-in libraries. It's a simple, brute-force method, but it quickly reaches a limit in scalability.
To get around this limit, many technologies have evolved over the years. However, as OCaml's Multicore edition (version 5.0) nears its ship date, we will leapfrog past all those technologies and go back to the future: direct-style, non-blocking concurrent I/O running in parallel threads.
Eio
To enable access to all these features, an exciting new library called Eio is being developed. It uses a new paradigm of direct-style concurrent I/O programming, without the need for monads or async/await, thus avoiding the function colour problem.
Let's update our example server to use Eio, with the following goals:
- Run multi-threaded (N parallel threads where N is the number of CPUs). OCaml's threads are called 'domains' and they map 1:1 to OS threads.
- Handle each incoming request in a non-blocking fiber in each domain.
Note: to better understand the rest of this post, I highly recommend reviewing the previous proof-of-concept project.
Fibers are essentially non-blocking green threads that run in each domain. A domain can run one fiber at a time. When a fiber gets blocked waiting for I/O, it yields and lets another one run. This achieves concurrency. Meanwhile, other fibers can run in parallel in other domains. This achieves parallelism. By combining the two, we can fully saturate all our CPU cores.
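To make the parallelism half of that concrete, here is a minimal sketch that uses only the OCaml 5 standard library's Domain module, with no Eio and no fibers. The names sum_range and parallel_sum are made up for this illustration; the point is just that each Domain.spawn call runs its function in a separate domain, and Domain.join waits for the result.

```ocaml
(* Sketch only: splits a sum across domains with the OCaml 5 stdlib.
   [sum_range] and [parallel_sum] are illustrative names, not part of Eio. *)
let sum_range lo hi =
  let total = ref 0 in
  for i = lo to hi - 1 do
    total := !total + i
  done;
  !total

let parallel_sum ~domains n =
  (* Round up so that [domains] chunks cover the whole range 0 .. n-1. *)
  let chunk = (n + domains - 1) / domains in
  List.init domains (fun d ->
      let lo = d * chunk in
      let hi = min n (lo + chunk) in
      (* Each spawn starts a new domain; the chunks run in parallel. *)
      Domain.spawn (fun () -> sum_range lo hi))
  |> List.map Domain.join
  |> List.fold_left ( + ) 0

(* prints 499500 *)
let () = Printf.printf "%d\n" (parallel_sum ~domains:4 1_000)
```

Fibers add the concurrency half on top of this: within each domain, Eio switches between fibers whenever one of them would block on I/O.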
Setup
Now, let's install Multicore OCaml. It hasn't officially shipped yet, so we will use the alpha version. If you don't have opam, the OCaml Package Manager, see the Up and Running page to install it first. Then:
# 'mc' is just a short name we choose
# We need to explicitly add the opam alpha repository with higher priority than the default repo, to allow installing some multicore-only libraries:
opam switch create mc 5.0.0~alpha1 --repositories=mc=git+https://github.com/kit-ty-kate/opam-alpha-repository.git,default
eval $(opam env)
opam install dune utop eio
The project
Now, we create the statsd filter project almost exactly the same way as before:
mkdir ocaml_statsd_filter
cd ocaml_statsd_filter
Create the dune-project file:
(lang dune 3.6)
Create the dune file:
(executable
 (name ocaml_statsd_filter)
 (libraries str eio_main))
Now, we come to the first interesting changes. A couple of the configs need to be expressed slightly differently. Here's the cfg.ml file:
let num_threads =
  try int_of_string (Sys.getenv "num_threads")
  (* recommended_domain_count is a function (unit -> int) in released OCaml 5.x *)
  with Not_found -> Domain.recommended_domain_count ()
let listen_port =
  try int_of_string (Sys.getenv "listen_port") with Not_found -> 8125

let target_host = try Sys.getenv "target_host" with Not_found -> "localhost"
let target_port = try Sys.getenv "target_port" with Not_found -> "8126"

let blocklist =
  try
    "blocklist"
    |> Sys.getenv
    |> String.split_on_char ','
    |> List.map Str.regexp_string
  with Not_found -> []
We add a new num_threads config variable to allow overriding the default, which is the 'recommended number of domains' from the standard library.
Also, the target_host can now be just a host name, as we are now using the Eio.Net.getaddrinfo_stream function (shown below) to look up the IP address.
The rest of the configs are unchanged.
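The try ... with Not_found pattern in cfg.ml repeats for every setting, so it could be factored into a small helper. The sketch below is one possible shape, not what the project actually uses: env_or is a hypothetical name, and unlike the original code (where int_of_string raises Failure on a malformed value), it falls back to the default when the variable is set but cannot be parsed.

```ocaml
(* Hypothetical helper: read an environment variable, falling back to
   [default] when it is unset or when [parse] rejects its value. *)
let env_or name ~default ~parse =
  match Sys.getenv_opt name with
  | None -> default
  | Some raw -> (
      match parse raw with
      | Some value -> value
      | None -> default)

(* Same settings and defaults as in cfg.ml above. *)
let listen_port = env_or "listen_port" ~default:8125 ~parse:int_of_string_opt
let target_host = env_or "target_host" ~default:"localhost" ~parse:Option.some
```

Whether a silent fallback or a crash at startup is better for a bad value is a design choice; failing fast, as the original does, is arguably the safer option for a server.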
Finally, we come to the main file, ocaml_statsd_filter.ml:
open Eio

let max_size = 8192
let listen_addr = `Tcp (Net.Ipaddr.V4.any, Cfg.listen_port)

let target_addr net =
  match Net.getaddrinfo_stream net Cfg.target_host ~service:Cfg.target_port with
  | [] -> invalid_arg Cfg.target_host
  | addr :: _ -> addr

let allow data =
  Cfg.blocklist
  |> List.exists (fun regexp -> Str.string_match regexp data 0)
  |> not

let on_error = traceln "Connection handling error: %a" Fmt.exn

let main net new_domain =
  Switch.run @@ fun sw ->
  let target = Net.connect ~sw net (target_addr net) in
  let listen_socket = Net.listen ~backlog:128 ~sw net listen_addr in
  traceln "Listening on :%d" Cfg.listen_port;
  let domain_loop () =
    new_domain @@ fun () ->
    let domain_id = (Domain.self () :> int) in
    Switch.run @@ fun sw ->
    while true do
      Net.accept_fork ~sw listen_socket ~on_error @@ fun client _ ->
      let buf_str =
        client
        |> Buf_read.parse_exn ~max_size Buf_read.take_all
        |> String.trim
      in
      if allow buf_str then begin
        Flow.copy_string buf_str target;
        traceln "Domain %d: sent: '%s'" domain_id buf_str
      end else
        traceln "Domain %d: did not send: '%s'" domain_id buf_str
    done
  in
  let domains = List.init Cfg.num_threads (fun _ -> domain_loop) in
  Fiber.all domains

let () =
  Eio_main.run @@ fun env ->
  main (Stdenv.net env) (Domain_manager.run @@ Stdenv.domain_mgr env)
This has quite a lot of new concepts. The first thing we notice is that the old Unix module open is replaced with Eio. Eio moves away from the old-style I/O which was very much modelled on Unix/C system calls, and tries to introduce new, safer abstractions for raw I/O.
Another change is in how we express network addresses: we now use the types and functions in the Eio.Net module to construct them.
The allow check against the blocklist is unchanged, as it uses only the regular expressions module, Str.
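One detail worth spelling out: each blocklist entry is compiled with Str.regexp_string (a literal match) and tested with Str.string_match at position 0, so the check is effectively a prefix test. As a sketch of the same decision without Str, assuming literal prefixes are all we ever need, it could be written with String.starts_with from the standard library (OCaml 4.13 or later). Here the blocklist is passed in as a plain string list, which is a change from Cfg.blocklist made purely for this illustration.

```ocaml
(* Sketch: the same allow/deny decision using literal prefixes instead of
   Str regexps. Taking [blocklist] as a string list is an assumption made
   for this example; the project stores compiled Str regexps. *)
let is_blocked ~blocklist data =
  List.exists (fun prefix -> String.starts_with ~prefix data) blocklist

let allow ~blocklist data = not (is_blocked ~blocklist data)

let () =
  let blocklist = [ "foo"; "bar" ] in
  assert (not (allow ~blocklist "bar:1|c"));
  assert (allow ~blocklist "baz:1|c")
```

A pure function like this keeps no match state between calls, which makes it easier to reason about when several domains call it at once.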
We introduce an error handler to report on any exceptions that occurred while handling requests.
Finally, we come to the meat of the server. This is a main function which takes two parameters: an Eio.Net.t 'capability', and a function which can spawn a new domain safely. Eio encourages a 'capabilities style' of programming, in which functions are passed only the permissions they need to do their work, and nothing else. This is explained quite extensively in the excellent readme documentation.
With the capabilities given to the main function, we can now start a new 'switch' (essentially, a resource manager that safely disposes of all open resources when their containing scope ends), then use it to connect to the target (upstream StatsD) address and also bind to the listening address on the local host.
We then define the 'domain loop' (which is my little pun on 'main loop') function. This function immediately spawns a new domain and returns. Asynchronously in the new domain, we capture the domain ID (an integer) for logging purposes, then start handling requests from our clients. The key to this is the Eio.Net.accept_fork function, which handles each client connection in a new fiber. This is where the asynchronous I/O magic happens. In each domain we handle each request in a new fiber. So essentially, N domains * M fibers.
The other point of note here is Eio.Flow, which is an abstraction over byte streams. The actual data being shunted back and forth in requests and responses is done in terms of flows, e.g. Flow.copy_string buf_str target. And we no longer need to think in terms of 'read N bytes from that socket, then check that N bytes were read'. Flows handle all that for us.
In fact they handle it a little too well, and a client could potentially send a gigantic request which could eat up all our memory. To prevent this, we use Eio.Buf_read.parse_exn ~max_size Buf_read.take_all, which will take all the characters from the source flow, up to our max_size limit, which is actually the same as before: 8192 bytes. The max_size parameter is explained in Eio.Buf_read.of_flow.
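To show the idea behind a size-capped 'take everything' read, here is a rough stdlib-only sketch. It is not how Eio implements Buf_read; take_all_bounded, reader_of_string and the Too_large exception are all names invented for this example. The reader argument follows the same convention as Stdlib.input: it fills a buffer and returns the number of bytes read, or 0 at end of input.

```ocaml
(* Invented for this sketch; Eio reports an oversized input in its own way. *)
exception Too_large

(* Read until end of input, refusing to accumulate more than [max_size]
   bytes. [read buf off len] returns the byte count, or 0 at end of input. *)
let take_all_bounded ~max_size read =
  let chunk = Bytes.create 4096 in
  let acc = Buffer.create 256 in
  let rec loop () =
    match read chunk 0 (Bytes.length chunk) with
    | 0 -> Buffer.contents acc
    | n ->
        if Buffer.length acc + n > max_size then raise Too_large;
        Buffer.add_subbytes acc chunk 0 n;
        loop ()
  in
  loop ()

(* A string-backed reader, standing in for a client connection. *)
let reader_of_string s =
  let pos = ref 0 in
  fun buf off len ->
    let n = min len (String.length s - !pos) in
    Bytes.blit_string s !pos buf off n;
    pos := !pos + n;
    n

let () =
  let data = take_all_bounded ~max_size:8192 (reader_of_string "bar:1|c\n") in
  assert (String.trim data = "bar:1|c")
```

The cap is checked before each chunk is appended, so memory use stays bounded by max_size plus one chunk no matter how much the client sends.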
Notice that we output some trace log messages to prove to ourselves that multiple domains are actually running in parallel. We will show this in action later. We spin up the domains in the final line of the main function: Fiber.all domains, which runs each of the domain spawns concurrently in new fibers.
The final lines are about calling the main function with the required arguments. To get those, we need to call Eio_main.run and pass it a callback in which we set up the arguments with the env parameter. env is the ultimate source of all the capabilities, and from it we extract only the exact permissions we need for our main function, namely accessing the network and running new domains.
Callbacks
You may have noticed a perhaps unexpected number of callbacks in this code. In most modern async I/O code today we see almost no callbacks–everyone has switched to using monadic promises/futures (async/await style). So why does Eio reintroduce a seemingly retro style?
There are two main types of callbacks:
- Callbacks to access resources which the library then automatically cleans up for us, e.g. the Eio environment, switches
- Callbacks to delay execution of code which needs to run in new domains or fibers
In my opinion, the second type of callback is most likely unavoidable. This is because we are not using any specific effect system which can delay execution and move it to different fibers/domains for us. We're using direct style and need to delay such executions ourselves. Fortunately, the type system tells us exactly where callbacks are required.
The first type of callback may go away in future iterations of OCaml and Eio, if say we standardize on a let-operator like let& to indicate 'use a resource'. E.g., hypothetically,
let& sw = Switch.run in
...
let& env = Eio_main.run in
...
This remains to be seen, however, and for now a callback is the obvious way to do it. The main win here is that async I/O calls don't need callbacks: we read the client's request as a string directly, and send it to the target address, again directly. Eio takes care of the boring details of yielding the fiber when it's blocked on I/O, and resuming when it's unblocked.
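As an aside, user-defined binding operators have been part of OCaml since 4.08, so the let& idea can already be tried locally without waiting for any standardization. The sketch below assumes nothing from Eio: with_resource is a hypothetical callback-style function standing in for something like Switch.run, and the let& definition is just my guess at what such an operator could look like.

```ocaml
(* Hypothetical callback-style resource function, standing in for the
   Switch.run / Eio_main.run pattern. Cleanup always runs via Fun.protect. *)
let with_resource name f =
  Printf.printf "open %s\n" name;
  Fun.protect
    ~finally:(fun () -> Printf.printf "close %s\n" name)
    (fun () -> f name)

(* A binding operator is an ordinary function: it hands the rest of the
   scope (the continuation) to the callback-taking function. *)
let ( let& ) with_res body = with_res body

let name_length =
  let& r = with_resource "socket" in
  String.length r

(* prints "open socket", then "close socket", then 6 *)
let () = Printf.printf "%d\n" name_length
```

The compiler rewrites let& r = e in body into ( let& ) e (fun r -> body), so the callback is still there; it is just no longer visible as extra nesting.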
Run
Now, we need to actually run this to prove to ourselves that it works. We'll need three terminals:
1. Mock StatsD server on port 8126 (or optionally a real one if you prefer). I just spin up a Python HTTP server:
   python3 -m http.server 8126
   It doesn't matter what type of server it is; we just need something listening on the correct port.
2. Run our filter:
   OCAMLRUNPARAM=b blocklist=foo,bar dune exec ./ocaml_statsd_filter.exe
   This will immediately throw an exception and exit if the server in (1) is not running.
3. Finally, send a few requests to our filter:
export i=0
while [ $i -lt 50 ]; do echo "bar:$i|c" | nc localhost 8125; i=$(expr $i + 1); done
This sends 50 requests to our filter and gives it a chance to go through multiple domains. If you check the trace logs, they look like this:
+Listening on :8125
+Domain 1: did not send: 'bar:0|c'
+Domain 2: did not send: 'bar:1|c'
+Domain 3: did not send: 'bar:2|c'
+Domain 4: did not send: 'bar:3|c'
+Domain 5: did not send: 'bar:4|c'
+Domain 6: did not send: 'bar:5|c'
+Domain 7: did not send: 'bar:6|c'
+Domain 8: did not send: 'bar:7|c'
+Domain 1: did not send: 'bar:8|c'
+Domain 2: did not send: 'bar:9|c'
+Domain 3: did not send: 'bar:10|c'
+Domain 4: did not send: 'bar:11|c'
+Domain 5: did not send: 'bar:12|c'
+Domain 6: did not send: 'bar:13|c'
+Domain 7: did not send: 'bar:14|c'
+Domain 8: did not send: 'bar:15|c'
+Domain 1: did not send: 'bar:16|c'
...
The domains are cycled through in a round-robin manner, which proves that they're all running in parallel (otherwise domains 2 through 8 would be blocked on 1, and the trace would print out only domain 1 for every request).
Now we come to the end of our little experiment. Hopefully this gives you some idea of what's coming in OCaml 5.0! For fun, if you want to compare this code to its original inspiration, the equivalent Rust project is here: https://github.com/askldjd/statsd-filter-proxy-rs/blob/main/src/server.rs
Top comments (9)
Note: instead of providing the raw bytes, you could use Eio.Net.getaddrinfo_datagram to parse the string. It accepts IP addresses and names.
Thanks for the hint! This looks like it would give me a datagram socket address, from which I could create a datagram socket. But a datagram socket is not a two-way flow or even a sink flow, so I would have to use the send function to 'manually' send a cstruct instead of just using Flow.copy_string. Is that the intended usage?
Oh, yeah, I didn't read it carefully enough! You want Eio.Net.getaddrinfo_stream then.
Got it! Updating the post now.
I think this is memory unsafe because of the use of Str :). Str is full of global state and should not be used with multiple domains (or at all, really).
I also wonder if Eio.Domain_manager would be preferable to starting domains manually? @talex5 would know. This way it's also possible to shut down the program gracefully if needed.
Hi, you mean a race condition because of multiple domains using Str? I think in this specific case it's safe because we are just using Str.string_match to get a bool (match or no match) answer; we are not trying to get the matched string or any groups in it. I agree it's almost always advisable to use a safe regex library though.
Also we are indeed using Eio.Domain_manager.run to start the domains instead of doing it manually, last line of the module :-)
Oh I missed that new_domain was passed as a parameter. My bad.
Is Str.string_match thread-safe though? I wouldn't swear it doesn't depend on C globals internally. In any case I think it's dicey to rely on it :)
Fair enough!
Looks like Str has been upgraded to use domain-local storage, so I think it's multicore safe now: github.com/ocaml/ocaml/commits/tru...
(but switching fibers within a domain can still corrupt the state)