
Parth Patil

Data processing with Elixir (Part 1)

I wanted to see how fast the Elixir Flow library is when processing a non-trivial amount of data. I downloaded part of the GH Events dataset from ClickHouse's website. This dataset has roughly 300 million rows of GitHub events in TSV format; uncompressed it comes out to about 78 GB of data. I split the file into 10 parts so we can benchmark processing this dataset in parallel.

The schema of these events, in ClickHouse table format, looks as follows:

CREATE TABLE github_events
(
    file_time DateTime,
    event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4,
                    'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8,
                    'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11,
                    'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15,
                    'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19,
                    'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
    actor_login LowCardinality(String),
    repo_name LowCardinality(String),
    created_at DateTime,
    updated_at DateTime,
    action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9,
                'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
    comment_id UInt64,
    body String,
    path String,
    position Int32,
    line Int32,
    ref LowCardinality(String),
    ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
    creator_user_login LowCardinality(String),
    number UInt32,
    title String,
    labels Array(LowCardinality(String)),
    state Enum('none' = 0, 'open' = 1, 'closed' = 2),
    locked UInt8,
    assignee LowCardinality(String),
    assignees Array(LowCardinality(String)),
    comments UInt32,
    author_association Enum('NONE' = 0, 'CONTRIBUTOR' = 1, 'OWNER' = 2, 'COLLABORATOR' = 3, 'MEMBER' = 4, 'MANNEQUIN' = 5),
    closed_at DateTime,
    merged_at DateTime,
    merge_commit_sha String,
    requested_reviewers Array(LowCardinality(String)),
    requested_teams Array(LowCardinality(String)),
    head_ref LowCardinality(String),
    head_sha String,
    base_ref LowCardinality(String),
    base_sha String,
    merged UInt8,
    mergeable UInt8,
    rebaseable UInt8,
    mergeable_state Enum('unknown' = 0, 'dirty' = 1, 'clean' = 2, 'unstable' = 3, 'draft' = 4),
    merged_by LowCardinality(String),
    review_comments UInt32,
    maintainer_can_modify UInt8,
    commits UInt32,
    additions UInt32,
    deletions UInt32,
    changed_files UInt32,
    diff_hunk String,
    original_position UInt32,
    commit_id String,
    original_commit_id String,
    push_size UInt32,
    push_distinct_size UInt32,
    member_login LowCardinality(String),
    release_tag_name String,
    release_name String,
    review_state Enum('none' = 0, 'approved' = 1, 'changes_requested' = 2, 'commented' = 3, 'dismissed' = 4, 'pending' = 5)
)
ENGINE = MergeTree
ORDER BY (event_type, repo_name, created_at)

I am running this benchmark locally on my MacBook M1 Pro with 10 cores and 16 GB of RAM.

On this dataset I wanted to run a few queries as part of the benchmark. These are the ones I picked:

  1. Frequency count of the event_type
  2. Top repository
  3. Top GitHub user

First, I wanted to set a baseline for how fast I can process this data on my laptop using any modern stack, with minimal effort on my end ;) I chose ClickHouse Local. It's an amazing tool for processing files locally on your laptop without even having to start a server, and it automatically uses all the available cores on the host machine.

I executed the first query via ClickHouse Local by running the following:

~/tools/ch_may_23/clickhouse local -q "select c2, count(*) as cnt from file('github_events_a*', TabSeparated) group by c2"

The query consistently used around 5.7 CPU cores out of 10 and finished in 85 seconds.

As you can see, ClickHouse Local lets you query a bunch of files directly by giving it a glob pattern and an SQL query to run over the data. Each column in the input data is automatically named c1, c2, and so on.

Now let's see how fast we can process this data in Elixir. If we were doing single-core processing we would use the Stream module's functions, but that would take far too long for 300 million rows and 78 GB of data, so I am not going to bother benchmarking it.
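For reference, a single-core version of the event_type count would be a plain Stream pipeline along these lines (just a sketch with a placeholder path; I did not run or benchmark this):

# Single-core sketch: stream one TSV file line by line and count event types.
# The file path is a placeholder; event_type is the 2nd column (index 1).
"/path/to/github_events.tsv"
|> File.stream!()
|> Stream.map(fn line -> line |> String.split("\t") |> Enum.at(1) end)
|> Enum.reduce(%{}, fn event_type, acc ->
  Map.update(acc, event_type, 1, &(&1 + 1))
end)

Let's jump to the next best thing we can do in Elixir. I am going to use Task.async_stream to process the 10 files in parallel. This is how the code looks: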

tab_char = :binary.compile_pattern("\t") # pre-compile the tab separator pattern used by String.split

streams =
  for file <- File.ls!("/Users/parth/temp/github_events/clickhouse_tsv") do
    full_path = "/Users/parth/temp/github_events/clickhouse_tsv/#{file}"
    File.stream!(full_path, [{:read_ahead, 100_000}])
  end

streams
|> Task.async_stream(fn stream ->
  stream
  |> Stream.map(fn line ->
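    # event_type is the 2nd tab-separated column (index 1) of each line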
    line |> String.split(tab_char) |> Enum.at(1)
  end)
  |> Enum.reduce(%{}, fn event_type, acc ->
    Map.update(acc, event_type, 1, &(&1 + 1))
  end)
end, max_concurrency: 10, timeout: :infinity)
|> Enum.to_list()
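# merge the per-file counts into a single overall frequency map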
|> Enum.flat_map(fn {_, mp} -> Map.to_list(mp) end)
|> Enum.reduce(%{}, fn {etype, count}, acc ->
  Map.update(acc, etype, count, &(&1 + count))
end)

In the above code there are a few optimisations, like pre-compiling the pattern used to split each tab-separated line. Task.async_stream processes all 10 files in parallel. The above took about 625 seconds, and CPU utilisation was around 8 cores out of 10.

625 seconds for processing 300 million events and 78 GB of data is not too shabby, given that we have the full power of Elixir at our disposal and are not constrained by the limitations of SQL.
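The other two queries on my list (top repository and top GitHub user) follow the exact same pattern; only the extracted column changes. Here is a sketch of the top-repository version, assuming the TSV columns follow the schema order above (repo_name is the 4th column, index 3; actor_login is the 3rd, index 2) and using a small counting helper of my own. I have not benchmarked these variants:

# Sketch: top 10 repositories by event count, reusing the Task.async_stream fan-out.
# Column indices are an assumption based on the schema order shown earlier.
tab_char = :binary.compile_pattern("\t")
dir = "/Users/parth/temp/github_events/clickhouse_tsv"

# helper: count occurrences of the value at the given column index
count_column = fn stream, index ->
  stream
  |> Stream.map(fn line -> line |> String.split(tab_char) |> Enum.at(index) end)
  |> Enum.reduce(%{}, fn key, acc -> Map.update(acc, key, 1, &(&1 + 1)) end)
end

streams =
  for file <- File.ls!(dir) do
    File.stream!("#{dir}/#{file}", [{:read_ahead, 100_000}])
  end

top_repos =
  streams
  |> Task.async_stream(&count_column.(&1, 3), max_concurrency: 10, timeout: :infinity)
  |> Enum.flat_map(fn {:ok, counts} -> Map.to_list(counts) end)
  |> Enum.reduce(%{}, fn {repo, count}, acc ->
    Map.update(acc, repo, count, &(&1 + count))
  end)
  |> Enum.sort_by(fn {_repo, count} -> count end, :desc)
  |> Enum.take(10)

# The top GitHub user query is identical, just with actor_login (index 2).

The final sort is cheap because by that point we are down to a single map of aggregated counts.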

In Part 2 we will explore if we can make the Elixir processing faster.

Top comments (5)

Nicol Acosta

Amazing. I built an algorithm that reads a CSV from S3 and saves it into Postgres using transactions (50 elements per transaction). In testing I tried a 100,000-line CSV and it takes 30 seconds to finish, and I thought that was impressive.

This is really impressive. I'll be following this post and the next ones closely, my friend.

And not just in this post; ClickHouse Local also comes with some resources about how to write code in the most optimal way, so there is a lot of stuff to learn.

Parth Patil

Hi @nicolkill,

Thanks for reading the article and sharing your experience with data processing in Elixir.

I am really excited about the possibility of using Elixir for big data processing by doing vertical scaling on cloud VMs, especially when you combine Elixir with ClickHouse or DuckDB. I am going to be writing about these in the coming days. Stay tuned.

Geazi Anc

Good article! I'm looking forward to the next part.

Parth Patil

I have now published Part 2 of the series: dev.to/parthex/data-processing-wit...

Parth Patil

Thanks @geazi_anc for reading! I am excited about the next parts in the series.