0x26res


Build a real-time crypto analytics dashboard with Beavers and Perspective

Coinbase Market Data API Analytics

This example shows how to combine two Python libraries, Beavers and Perspective, to analyse data in real time and display it in a dashboard.
This tutorial assumes you are familiar with Kafka, Python and Apache Arrow.

Architecture Overview

We will connect to Coinbase's websocket API to receive crypto market updates in real time.
To share this data with other services and decouple producers from consumers, we'll publish it on Kafka as JSON.
We'll then run a Beavers job that reads the data from Kafka, enriches it, and publishes it to a Perspective dashboard.

Initial Set Up

You'll need:

  • Git
  • Python (at least 3.10)
  • Docker to run a Kafka cluster

The code for this tutorial is available on GitHub.

Clone the repo

git clone https://github.com/0x26res/beavers-examples
cd beavers-examples/01_coinbase_analytics/

Set Up the Virtual Environment

python3 -m venv --clear .venv
source ./.venv/bin/activate
pip install -r requirements.txt

Set Up Kafka

We use the kafka-kraft Docker image to run a minimal Kafka cluster.
To start the cluster:

docker run --name=simple_kafka -p 9092:9092 -d bashj79/kafka-kraft

Once it has started, create a Kafka topic called ticker:

docker exec simple_kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic=ticker \
  --partitions=1 \
  --bootstrap-server=localhost:9092 \
  --replication-factor=1

Publish Coinbase's Market Data on Kafka

In this step, we'll run a simple Python job that listens to Coinbase's websocket market data API and publishes the data on the ticker Kafka topic.

python ./websocket_feed.py
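As a rough illustration of what such a feed job has to do, the sketch below builds a subscription request and turns one raw websocket message into a Kafka key/value pair. It uses only the standard library and leaves out the websocket and Kafka clients. The message layout, the BTC-USD product and the helper names subscribe_message and to_kafka_record are assumptions made for illustration; they are not taken from websocket_feed.py.

```python
import json
from typing import Optional


def subscribe_message(product_ids: list) -> str:
    """Build a subscription request for the ticker channel (assumed layout)."""
    return json.dumps(
        {"type": "subscribe", "product_ids": product_ids, "channels": ["ticker"]}
    )


def to_kafka_record(raw_message: str) -> Optional[tuple]:
    """Return (key, value) bytes for ticker messages, or None for anything else."""
    message = json.loads(raw_message)
    if message.get("type") != "ticker":
        return None  # e.g. a subscription acknowledgement
    key = message["product_id"].encode("utf-8")  # keying by product is an assumption
    value = json.dumps(message).encode("utf-8")
    return key, value


# Hypothetical sample message, trimmed to a few fields.
sample = json.dumps(
    {"type": "ticker", "product_id": "BTC-USD", "price": "64000.5"}
)
record = to_kafka_record(sample)
```

Keying each record by product is a design choice rather than a requirement: with more than one partition, it would keep all updates for a product in order on the same partition.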

You should now be able to see the Coinbase data streaming on Kafka.

docker exec simple_kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --topic=ticker \
  --bootstrap-server=localhost:9092

Run the dashboard

python ./dashboard.py

You can see the dashboard at http://localhost:8082/ticker.


Introducing Beavers

To build our dashboard, we'll use Beavers.
Beavers is a streaming Python library optimized for analytics.

At its core, Beavers uses a DAG (directed acyclic graph) to process incoming data.
Each node in the DAG is a Python function.

from beavers import Dag
dag = Dag()
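To make the idea of "a DAG whose nodes are Python functions" concrete, here is a minimal standard-library sketch. It is not how Beavers is implemented, and every name in it (nodes, run_dag, the bid and ask inputs) is hypothetical. It only shows the general technique: each node declares its parents, and nodes are evaluated in topological order so that parents always run before children.

```python
from graphlib import TopologicalSorter

# Each node is a function plus the names of the nodes it reads from.
nodes = {
    "spread": (lambda bid, ask: ask - bid, ["bid", "ask"]),
    "mid": (lambda bid, ask: (bid + ask) / 2, ["bid", "ask"]),
    "relative_spread": (lambda spread, mid: spread / mid, ["spread", "mid"]),
}


def run_dag(sources: dict) -> dict:
    """Evaluate every node once, parents before children."""
    graph = {name: set(parents) for name, (_, parents) in nodes.items()}
    values = dict(sources)
    for name in TopologicalSorter(graph).static_order():
        if name in values:
            continue  # source nodes already have a value
        function, parents = nodes[name]
        values[name] = function(*(values[parent] for parent in parents))
    return values


result = run_dag({"bid": 99.0, "ask": 101.0})
```

In a streaming setting the same evaluation would run again each time a source receives new data.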

The first node in the dashboard DAG is a source node, called ticker.
Its output is a pyarrow.Table for which we need to specify the schema.

ticker = dag.pa.source_table(schema=TICKER_SCHEMA, name="ticker")
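A schema pins down the type of every column, whereas JSON payloads often carry numbers and timestamps as strings. The sketch below shows that conversion step with the standard library only. The field subset in FIELD_TYPES, the sample payload and the parse_ticker helper are assumptions for illustration; the actual TICKER_SCHEMA is defined in the repository and may differ.

```python
import json
from datetime import datetime, timezone

# Hypothetical subset of ticker fields and the Python type each one should have.
FIELD_TYPES = {
    "product_id": str,
    "price": float,
    "best_bid": float,
    "best_ask": float,
}


def parse_ticker(raw: str) -> dict:
    """Convert one JSON ticker message into a row with typed values."""
    message = json.loads(raw)
    row = {name: cast(message[name]) for name, cast in FIELD_TYPES.items()}
    # datetime.fromisoformat on Python 3.10 does not accept a trailing "Z".
    row["time"] = datetime.fromisoformat(message["time"].replace("Z", "+00:00"))
    return row


# Hypothetical payload with string-encoded numbers.
raw = json.dumps(
    {
        "product_id": "BTC-USD",
        "price": "64000.5",
        "best_bid": "64000.0",
        "best_ask": "64001.0",
        "time": "2024-01-01T12:00:00.123456Z",
    }
)
row = parse_ticker(raw)
```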

This is the table displayed in the ticker dashboard above.

Simple Transformation in Beavers

Next, we want to add a derived column to ticker.
The new column, spread, is the difference between best_ask and best_bid.

For this, we write a function:

import pyarrow as pa
import pyarrow.compute as pc

def add_spread(table: pa.Table) -> pa.Table:
    # spread = best_ask - best_bid, computed column-wise
    return table.append_column(
        "spread", pc.subtract(table["best_ask"], table["best_bid"])
    )

And add the function to the DAG:

ticker_with_spread = dag.pa.table_stream(
    add_spread, schema=TICKER_WITH_SPREAD_SCHEMA
).map(ticker)

You can see it at http://localhost:8082/ticker_with_spread.


Advanced Transformation with Beavers

Now let's introduce a more advanced computation.
For each incoming ticker record, we would like to calculate the price change over the last 5 minutes.

To do this, we first need to keep track of the history over the last 10 minutes (a bit more than the 5 minutes we look back).

ticker_history = dag.state(TickerHistory()).map(ticker, dag.now())
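The TickerHistory class itself lives in the repository and is not shown here. As a rough idea of what such a state node has to do, the sketch below keeps a rolling window of records and drops anything older than 10 minutes on every update. The RollingHistory class, its update method and the (time, product, price) record layout are all hypothetical, and the sketch assumes records arrive in time order.

```python
from collections import deque
from datetime import datetime, timedelta


class RollingHistory:
    """Keep only the records seen during the last `window`."""

    def __init__(self, window: timedelta = timedelta(minutes=10)):
        self._window = window
        self._records = deque()  # (time, product_id, price), oldest first

    def update(self, new_records: list, now: datetime) -> list:
        """Append the new records, evict expired ones, return what is left."""
        self._records.extend(new_records)
        cutoff = now - self._window
        while self._records and self._records[0][0] < cutoff:
            self._records.popleft()
        return list(self._records)


t0 = datetime(2024, 1, 1, 12, 0)
history = RollingHistory()
history.update([(t0, "BTC-USD", 100.0)], now=t0)
history.update([(t0 + timedelta(minutes=6), "BTC-USD", 101.0)], now=t0 + timedelta(minutes=6))
# At t0 + 11 minutes the cutoff is t0 + 1 minute, so the first record is evicted.
kept = history.update(
    [(t0 + timedelta(minutes=11), "BTC-USD", 102.0)], now=t0 + timedelta(minutes=11)
)
```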

And then do an as-of join to find the price 5 minutes ago and calculate the change:

ticker_with_change = dag.pa.table_stream(
    add_5min_change, TICKER_WITH_CHANGE_SCHEMA
).map(ticker, ticker_history)
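An as-of join matches each record with the most recent historical record at or before a given time. The sketch below shows the idea for a single product, using a binary search over sorted timestamps. It is not the add_5min_change function from the repository; price_as_of and change_5min are hypothetical helpers, and the prices are made up.

```python
from bisect import bisect_right
from datetime import datetime, timedelta


def price_as_of(times: list, prices: list, when: datetime):
    """Return the last price at or before `when`, or None if there is none."""
    index = bisect_right(times, when)  # number of timestamps <= when
    return prices[index - 1] if index > 0 else None


def change_5min(times: list, prices: list, now: datetime, price_now: float):
    """Difference between the current price and the price 5 minutes earlier."""
    past = price_as_of(times, prices, now - timedelta(minutes=5))
    return None if past is None else price_now - past


t0 = datetime(2024, 1, 1, 12, 0)
times = [t0, t0 + timedelta(minutes=2), t0 + timedelta(minutes=4)]  # sorted
prices = [100.0, 101.0, 103.0]

# Five minutes before t0 + 8 min is t0 + 3 min; the last price known then is 101.0.
change = change_5min(times, prices, now=t0 + timedelta(minutes=8), price_now=105.0)
# Five minutes before t0 + 4 min predates the history, so no change is available.
no_change = change_5min(times, prices, now=t0 + timedelta(minutes=4), price_now=103.0)
```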

You can see it at http://localhost:8082/ticker_with_change.

