Katarina Supe for Memgraph

Originally published at memgraph.com

Twitch Streaming Graph Analysis - Part 1

Introduction

Twitch is the world’s leading live streaming platform for gamers. In this blog post, you’ll explore the Twitch dataset to find out which streamers, teams, and games are the most popular. You will also discover which users or bots are great candidates for receiving a VIP or moderator badge and measure the popularity of streamers by how many followers, viewers, and chatters they have. Each streamer has a network of chatters, VIPs, and moderators, so you will use the MAGE PageRank algorithm to check who is the most popular streamer. Besides that, you can calculate the betweenness centrality on this network and find out which streamer has the most influence. This blog is divided into three parts, depending on the part of the application you are building:

  • Part 1: data source and backend implementation
  • Part 2: frontend implementation
  • Part 3: streaming data from a Kafka cluster

App Architecture

The app consists of five main services:

  1. twitch-stream: A Python script that gets new chatters for certain streamers and sends them to a Kafka cluster.
  2. kafka: A Kafka cluster consisting of one topic named chatters.
  3. memgraph-mage: A graph analytics platform that we query for relevant statistics. This platform also stores the incoming Twitch data from Kafka and performs PageRank and betweenness centrality algorithms on all streamers.
  4. twitch-app: A Flask server that sends all the data we query from memgraph-mage to the react-app. It also consumes the Kafka stream and sends it to the react-app.
  5. react-app: A React app that visualizes the Twitch network with the D3.js library.

[Figure: app architecture]

Dataset and Graph Schema

You can collect the data using the Twitch API and then rearrange it to fit a graph data model. You can check out the script that creates the .csv files you'll import into Memgraph. You'll use the files streamers.csv, teams.csv, vips.csv, moderators.csv and chatters.csv. Once generated, streamers.csv will hold important information about the languages the user speaks and the games the user streams.

[Figure: graph schema]

All nodes, except for the :User:Stream node, have only the name property. The :User:Stream node represents a user who streams live, and its followers and totalViewCount properties are important for measuring the streamer's popularity. Language, team, and game could have been properties on the :User:Stream node, but since many users speak the same language, belong to the same team, or play the same game, it's better to model them as separate nodes that streamers are connected to. Now that you understand the Twitch network better, it's time to visualize it by making a web application!
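To make the modelling choice concrete, here is a minimal Python sketch with invented sample data (the streamer, game, and language names are placeholders, not taken from the dataset). It treats games and languages as shared nodes and finds streamers connected through the same node, which is the kind of question that becomes a simple traversal once the value is a node rather than a property.

```python
from collections import defaultdict

# Hypothetical sample data: (streamer, relationship type, target node).
edges = [
    ("streamer_a", "PLAYS", "Chess"),
    ("streamer_b", "PLAYS", "Chess"),
    ("streamer_c", "PLAYS", "Minecraft"),
    ("streamer_a", "SPEAKS", "en"),
    ("streamer_c", "SPEAKS", "en"),
]


def neighbours_via(edges, rel_type):
    """Group streamers by the shared node they reach through rel_type."""
    groups = defaultdict(set)
    for source, rel, target in edges:
        if rel == rel_type:
            groups[target].add(source)
    return groups


def connected_streamers(edges, streamer, rel_type):
    """Return other streamers that share a rel_type target with streamer."""
    result = set()
    for members in neighbours_via(edges, rel_type).values():
        if streamer in members:
            result |= members - {streamer}
    return result


print(sorted(connected_streamers(edges, "streamer_a", "PLAYS")))   # ['streamer_b']
print(sorted(connected_streamers(edges, "streamer_a", "SPEAKS")))  # ['streamer_c']
```

In a graph database the same lookup is a two-hop pattern match, so no scan over a property column is needed.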

Prerequisites

Let's install all the tools you'll need to build the app:

  • Docker and Docker Compose

  • Node.js

    • You'll also need Node.js for the npx command you'll use to create the React app.

Project structure

Below you can see how the structure of the project should look at the end. I will explain the most important parts of the implementation, and you can check the whole repository if you are missing any details.

|   docker-compose.yml
|   
+---backend
|   |   app.py
|   |   models.py
|   |   twitch_data.py
|   |   requirements.txt
|   |   Dockerfile
|   +---import-data
|   |       chatters.csv
|   |       moderators.csv
|   |       streamers.csv
|   |       teams.csv
|   |       vips.csv
|   
+---frontend
|   |   craco.config.js
|   |   Dockerfile
|   |   package.json
|   |   package-lock.json
|   +---node_modules
|   +---public
|   +---src
|
+---memgraph
|   |   Dockerfile
|   +---query_modules
|   |       twitch.py
|   +---mg_log
|   +---mg_lib
|
+---twitch-stream
|   |   Dockerfile
|   |   dummy.py
|   |   setup.py
|   |   chatters.csv
|   |   requirements.txt

Let's build the application from the bottom up. First, create the backend and memgraph directories. Within the backend directory, create the import-data subdirectory and move all the scraped CSV files there, or use the files you already have. Also, create a docker-compose.yml file in the root folder of the project. The project structure should look like this:

|   docker-compose.yml
|   
+---backend
|   |   app.py
|   |   models.py
|   |   twitch_data.py
|   |   requirements.txt
|   |   Dockerfile
|   +---import-data
|   |       chatters.csv
|   |       moderators.csv
|   |       streamers.csv
|   |       teams.csv
|   |       vips.csv
|
+---memgraph
|   |   Dockerfile
|   +---mg_log
|   +---mg_lib

Dockerizing Memgraph and the Backend

The docker-compose.yml file looks like this:

version: "3"
networks:
  app-tier:
    driver: bridge
services:
  memgraph-mage:
    build: ./memgraph
    volumes:
      - ./memgraph/mg_lib:/var/lib/memgraph
      - ./memgraph/mg_log:/var/log/memgraph
    entrypoint:
      [
        "/usr/lib/memgraph/memgraph",
        "--telemetry-enabled=false"
      ]
    ports:
      - "7687:7687"
    networks:
      - app-tier
  twitch-app:
    build: ./backend
    volumes:
      - ./backend:/app
    ports:
      - "5000:5000"
    environment:
      MG_HOST: memgraph-mage
      MG_PORT: 7687
    depends_on:
      - memgraph-mage
    networks:
      - app-tier

This application has many services which depend on one another. With the docker-compose.yml file, you can simply run the application using docker-compose build and docker-compose up. The memgraph-mage service has a running Memgraph instance along with MAGE - Memgraph Advanced Graph Extensions. The twitch-app is the backend service and it depends on memgraph-mage, because you need a running Memgraph instance in order to save data into Memgraph and run queries. The backend will be running on port 5000. Now, take a look at the backend Dockerfile:

FROM python:3.8
# Install CMake
RUN apt-get update && \
  apt-get --yes install cmake && \
  rm -rf /var/lib/apt/lists/*
# Install packages
COPY requirements.txt ./
RUN pip3 install -r requirements.txt
COPY app.py /app/app.py
COPY import-data /app/import-data
WORKDIR /app
ENV FLASK_ENV=development
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8
ENTRYPOINT ["python3", "app.py", "--populate"]

The requirements.txt file is copied to the container so that all the dependencies are installed correctly. You can check out what's in there:

Flask==1.1.2
gqlalchemy==1.1.2

Flask, a micro web framework, will be installed in the Docker container first. Flask wraps Werkzeug, a comprehensive Web Server Gateway Interface (WSGI) web application library. It will enable the application to communicate with Memgraph on request and send a response back. Next, gqlalchemy, a library developed to assist in writing and running queries on Memgraph, will be installed inside the container. GQLAlchemy is a fully open-source Python library that aims to be the go-to Object Graph Mapper (OGM), a link between graph database objects and Python objects.

The import-data folder is copied to the Docker container where the twitch-app service is running, since you'll load the data from the CSV files there and then save it into Memgraph. Include the --populate flag in the Dockerfile in order to load data into Memgraph. Later on, if you restart the app, remove the --populate flag. Thanks to the mounted volumes, the data will stay in Memgraph.

Backend Implementation

Let's build an API that will help the frontend get all the necessary data from Memgraph. First, you need to set up everything for the Flask server. Add the --host, --port and --debug arguments and create the previously mentioned --populate flag.

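import os
from argparse import ArgumentParser
from flask import Flask
from gqlalchemy import Memgraph
import twitch_data
# log, log_time, init_log and connect_to_memgraph are helpers not shown in this excerpt.
memgraph = Memgraph()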
app = Flask(
    __name__,
)
args = None
def parse_args():
    """Parse command line arguments."""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--host", default="0.0.0.0", help="Host address.")
    parser.add_argument("--port", default=5000, type=int, help="App port.")
    parser.add_argument(
        "--debug",
        default=True,
        action="store_true",
        help="Run web server in debug mode.",
    )
    parser.add_argument(
        "--populate",
        dest="populate",
        action="store_true",
        help="Run app with data loading."
    )
    parser.set_defaults(populate=False)
    log.info(__doc__)
    return parser.parse_args()
@log_time
def load_data():
    """Load data into the database."""
    if not args.populate:
        log.info("Data is loaded in Memgraph.")
        return
    log.info("Loading data into Memgraph.")
    try:
        memgraph.drop_database()
        twitch_data.load()
    except Exception as e:
        log.info("Data loading error.")
        log.info(e)
def main():
    global args
    args = parse_args()
    if os.environ.get("WERKZEUG_RUN_MAIN") == "true":
        init_log()
        connect_to_memgraph()
        load_data()
    app.run(host=args.host, port=args.port, debug=args.debug)
if __name__ == "__main__":
    main()
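The flag handling above can be tried in isolation with nothing but the standard library. The sketch below is a reduced version of parse_args() that takes an explicit argument list; the build_parser name is introduced here for illustration only. It also shows a subtlety: combining default=True with action="store_true" means --debug is on whether or not the flag is passed.

```python
from argparse import ArgumentParser


def build_parser():
    """Reduced, illustrative version of the parser used in app.py."""
    parser = ArgumentParser(description="Flag handling sketch.")
    parser.add_argument("--host", default="0.0.0.0", help="Host address.")
    parser.add_argument("--port", default=5000, type=int, help="App port.")
    # store_true with default=True: the value is True with or without the flag.
    parser.add_argument("--debug", default=True, action="store_true")
    # store_true with the implicit default False: True only if the flag is passed.
    parser.add_argument("--populate", action="store_true")
    return parser


without_flag = build_parser().parse_args([])
with_flag = build_parser().parse_args(["--populate", "--port", "8000"])

print(without_flag.populate, without_flag.debug, without_flag.port)  # False True 5000
print(with_flag.populate, with_flag.debug, with_flag.port)           # True True 8000
```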

First, the arguments are parsed. Then, when app.py starts, it connects to Memgraph. After that, if the --populate flag is set, the data is loaded from the CSV files and saved into Memgraph. All loading methods are located in the twitch_data.py script. There are several different ways to import data, and the LOAD CSV clause is probably the best option for importing large datasets. But if Python plays a big part in your tech stack, try loading and saving the data using GQLAlchemy's object graph mapper (OGM).

To do that, you first have to map Python classes to the appropriate nodes and relationships. You can find all mappings in models.py. Check out how you can create User, Stream, Language and Game nodes, as well as Speaks and Plays relationships:

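from typing import Optional
from gqlalchemy import Field, Node, Relationship
# memgraph is assumed to be the same Memgraph() connection object used in app.py.
class User(Node):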
    name: str = Field(index=True, exists=True, unique=True, db=memgraph)
class Stream(User):
    name: Optional[str] = Field(
        index=True, exists=True, unique=True, db=memgraph, label="User"
    )
    id: str = Field(index=True, exists=True, unique=True, db=memgraph)
    url: Optional[str] = Field()
    followers: Optional[int] = Field()
    createdAt: Optional[str] = Field()
    totalViewCount: Optional[int] = Field()
    description: Optional[str] = Field()
class Language(Node):
    name: str = Field(unique=True, db=memgraph)
class Game(Node):
    name: str = Field(unique=True, db=memgraph)
class Speaks(Relationship, type="SPEAKS"):
    pass
class Plays(Relationship, type="PLAYS"):
    pass

Now, check how the data from streamers.csv can be saved into Memgraph:

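from csv import reader
import models
def load_streams(path):
    """Create Stream, Language and Game nodes and connect them."""
    with open(path) as read_obj:
        csv_reader = reader(read_obj)
        # Skip the header row; next() returns None for an empty file.
        header = next(csv_reader, None)
        if header is not None: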
            for row in csv_reader:
                stream = models.Stream(
                    id=row[1],
                    name=row[3],
                    url=row[6],
                    followers=row[7],
                    createdAt=row[10],
                    totalViewCount=row[9],
                    description=row[8],
                ).save(memgraph)
                language = models.Language(name=row[5]).save(memgraph)
                game = models.Game(name=row[4]).save(memgraph)
                speaks_rel = models.Speaks(
                    _start_node_id=stream._id, _end_node_id=language._id
                ).save(memgraph)
                plays_rel = models.Plays(
                    _start_node_id=stream._id, _end_node_id=game._id
                ).save(memgraph)

Here, nodes with the labels :User:Stream are created, along with their properties. A streamer speaks a language and plays a game, so :Language and :Game nodes are created, together with the SPEAKS and PLAYS relationships that connect the streamer to them.
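To see the row-to-graph mapping without a running database, the following sketch parses an in-memory CSV and collects nodes and relationships as plain tuples. The column names and the sample rows are invented for illustration; the real streamers.csv has more columns, and load_streams() addresses them by position.

```python
import csv
import io

# Hypothetical, reduced sample; not the real streamers.csv layout.
SAMPLE = """id,name,game,language,followers
1,streamer_a,Chess,en,1200
2,streamer_b,Chess,de,300
"""


def rows_to_graph(text):
    """Turn CSV rows into sets of (label, key) nodes and (start, type, end) edges."""
    nodes, edges = set(), set()
    for row in csv.DictReader(io.StringIO(text)):
        nodes.add(("Stream", row["name"]))
        nodes.add(("Game", row["game"]))
        nodes.add(("Language", row["language"]))
        edges.add((row["name"], "PLAYS", row["game"]))
        edges.add((row["name"], "SPEAKS", row["language"]))
    return nodes, edges


nodes, edges = rows_to_graph(SAMPLE)
print(len(nodes), len(edges))  # 5 4
```

Because sets are used, the Chess node appears only once even though two streamers play it, which is similar in spirit to declaring name as unique on the Game and Language models.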

If you want to learn more about GQLAlchemy's OGM and query builder, check out our how-to guides.

After the data is in Memgraph, the Flask server will run. For now, the only thing you can actually check is whether your data is in Memgraph. The easiest way, especially if you are a visual type, is by using Memgraph Lab. But first, don't forget to build the whole project in order to try it out. Position yourself in the project root folder and run:

docker-compose build memgraph-mage
docker-compose build twitch-app

To start the Memgraph instance (along with MAGE), run:

docker-compose up memgraph-mage

Now that you have a running Memgraph instance, you can install Memgraph Lab, open it and click on the Connect button. Then run the twitch-app server to start the data loading process:

docker-compose up twitch-app

You can see the total number of nodes and edges in your database in the Overview tab:

[Figure: Memgraph Lab Overview tab]

Nice - the data is really there! Try adding some simple methods to app.py and test the connection with Memgraph.

@app.route("/nodes", methods=["GET"])
@log_time
def get_nodes():
    """Get the number of nodes in database."""
    try:
        num_of_nodes = next(
            Match()
            .node(variable="node")
            .return_({"count(node)": "num_of_nodes"})
            .execute()
        )["num_of_nodes"]
        response = {"nodes": num_of_nodes}
        return Response(
            response=dumps(response), status=200, mimetype="application/json"
        )
    except Exception as e:
        log.info("Fetching number of nodes went wrong.")
        log.info(e)
        return ("", 500)

The app route says that you can get the response at localhost:5000/nodes. The method creates a query using the query builder, and Memgraph executes it and returns the results. In this case, there is only one result, and that is the total number of nodes in the database. Try it out!

If your data is already in Memgraph, make sure to remove the --populate flag from the backend Dockerfile. Then build the backend service again by running docker-compose build twitch-app. Next time you run twitch-app, you won't have to wait for the data import, since the data will already be there. This is also useful for all the other changes you're going to make on the backend service.

The idea behind these methods is to find interesting statistics about the network. Check out how you can figure out which games are played by the most players:

@app.route("/top-games/<num_of_games>", methods=["GET"])
@log_time
def get_top_games(num_of_games):
    """Get top num_of_games games by number of streamers who play them."""
    try:
        results = list(
            Match()
            .node("User", variable="u")
            .to("PLAYS")
            .node("Game", variable="g")
            .return_({"g.name": "game_name", "count(u)": "num_of_players"})
            .order_by("num_of_players DESC")
            .limit(num_of_games)
            .execute()
        )
        games_list = list()
        players_list = list()
        for result in results:
            game_name = result["game_name"]
            num_of_players = result["num_of_players"]
            games_list.append(game_name)
            players_list.append(num_of_players)
        games = [{"name": game_name} for game_name in games_list]
        players = [{"players": player_count} for player_count in players_list]
        response = {"games": games, "players": players}
        return Response(
            response=dumps(response), status=200, mimetype="application/json"
        )
    except Exception as e:
        log.info("Fetching top games went wrong.")
        log.info(e)
        return ("", 500)

With the query builder from GQLAlchemy, it's easy to get each game's name and the number of players who play it, in descending order. After that, you can put the data into a dictionary to get a JSON response, which is easy to work with. The response contains a list of games and a matching list of player counts, and the length of both depends on the num_of_games argument.
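The response shape is easier to see with sample data. The sketch below takes rows in the form the query returns (game_name and num_of_players keys) and builds the same two parallel lists; the rows themselves are made up, and build_top_games_response is a name introduced only for this illustration.

```python
from json import dumps

# Made-up rows in the shape returned by the query in get_top_games().
rows = [
    {"game_name": "Chess", "num_of_players": 12},
    {"game_name": "Minecraft", "num_of_players": 7},
]


def build_top_games_response(rows):
    """Build the two parallel lists that get_top_games() sends to the frontend."""
    games = [{"name": row["game_name"]} for row in rows]
    players = [{"players": row["num_of_players"]} for row in rows]
    return {"games": games, "players": players}


response = build_top_games_response(rows)
print(dumps(response))
# {"games": [{"name": "Chess"}, {"name": "Minecraft"}], "players": [{"players": 12}, {"players": 7}]}
```

The two lists are aligned by index, so the first entry in players belongs to the first entry in games.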

Besides general statistics, it would be nice to visualize some information about streamers. You can search the database for your favorite streamer and get the game the streamer plays, the language the streamer speaks and the team the streamer is part of.

@app.route("/streamer/<streamer_name>", methods=["GET"])
@log_time
def get_streamer(streamer_name):
    """Get info about streamer whose name is streamer_name."""
    try:
        counter = next(
            Match()
            .node("User", "u")
            .where("u.name", "=", streamer_name)
            .return_({"count(u)": "num_of_streamers"})
            .execute()
        )["num_of_streamers"]
        # If the streamer exists, return its relationships
        if counter != 0:
            results = list(
                Match()
                .node("User", variable="u")
                .to()
                .node(variable="n")
                .where("u.name", "=", streamer_name)
                .return_(
                    {
                        "u.id": "streamer_id",
                        "u.name": "streamer_name",
                        "n.name": "node_name",
                        "labels(n)": "labels",
                    }
                )
                .execute()
            )
            links_set = set()
            nodes_set = set()
            for result in results:
                if result["labels"][0] != "Stream" and result["labels"][0] != "User":
                    source_id = result["streamer_id"]
                    source_name = result["streamer_name"]
                    source_label = "Stream"
                    target_id = result["node_name"]
                    target_name = result["node_name"]
                    target_label = result["labels"][0]
                    nodes_set.add((source_id, source_label, source_name))
                    nodes_set.add((target_id, target_label, target_name))
                    if (source_id, target_id) not in links_set and (
                        target_id,
                        source_id,
                    ) not in links_set:
                        links_set.add((source_id, target_id))
            nodes = [
                {"id": node_id, "label": node_label, "name": node_name}
                for node_id, node_label, node_name in nodes_set
            ]
            links = [{"source": n_id, "target": m_id} for (n_id, m_id) in links_set]
        # If the streamer doesn't exist, return empty response
        else:
            nodes = []
            links = []
        response = {"nodes": nodes, "links": links}
        return Response(
            response=dumps(response), status=200, mimetype="application/json"
        )
    except Exception as e:
        log.info("Fetching streamer by name went wrong.")
        log.info(e)
        return ("", 500)
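The deduplication in get_streamer() can be exercised without a database. The sketch below runs comparable logic over made-up rows; the to_d3_graph name and the sample values are illustrative, while the row keys match the aliases used in the query above.

```python
# Made-up rows; keys match the aliases returned by the query in get_streamer().
rows = [
    {"streamer_id": "1", "streamer_name": "streamer_a", "node_name": "Chess", "labels": ["Game"]},
    {"streamer_id": "1", "streamer_name": "streamer_a", "node_name": "en", "labels": ["Language"]},
    {"streamer_id": "1", "streamer_name": "streamer_a", "node_name": "Chess", "labels": ["Game"]},
    {"streamer_id": "1", "streamer_name": "streamer_a", "node_name": "viewer_x", "labels": ["User"]},
]


def to_d3_graph(rows):
    """Collect unique nodes and links, skipping neighbours labelled User or Stream."""
    nodes, links = set(), set()
    for row in rows:
        label = row["labels"][0]
        if label in ("User", "Stream"):
            continue
        source, target = row["streamer_id"], row["node_name"]
        nodes.add((source, "Stream", row["streamer_name"]))
        nodes.add((target, label, row["node_name"]))
        # Treat links as undirected: skip if either direction was already stored.
        if (source, target) not in links and (target, source) not in links:
            links.add((source, target))
    return {
        "nodes": [{"id": i, "label": lab, "name": n} for i, lab, n in sorted(nodes)],
        "links": [{"source": s, "target": t} for s, t in sorted(links)],
    }


graph = to_d3_graph(rows)
print(len(graph["nodes"]), len(graph["links"]))  # 3 2
```

The duplicate Chess row and the User neighbour are both dropped, leaving one streamer node, two neighbour nodes, and two links.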

Within the response, you'll get the streamer node along with the nodes the streamer is connected to and the links between them. This data will be useful on the frontend side, where you can use it to draw graphs with D3.js. Now, let’s see how Memgraph’s query modules work, particularly the PageRank and betweenness centrality algorithms. With these, you can measure the popularity and influence of streamers and determine which node is the most relevant in the graph. Here is the get_page_rank() method:

@app.route("/page-rank", methods=["GET"])
@log_time
def get_page_rank():
    """Call the Page rank procedure and return top 50 in descending order."""
    try:
        results = list(
            Call("pagerank.get")
            .yield_()
            .with_({"node": "node", "rank": "rank"})
            .add_custom_cypher("WHERE node:Stream OR node:User")
            .return_({"node.name": "node_name", "rank": "rank"})
            .order_by("rank DESC")
            .limit(50)
            .execute()
        )
        page_rank_dict = dict()
        page_rank_list = list()
        for result in results:
            user_name = result["node_name"]
            rank = float(result["rank"])
            page_rank_dict = {"name": user_name, "rank": rank}
            dict_copy = page_rank_dict.copy()
            page_rank_list.append(dict_copy)
        response = {"page_rank": page_rank_list}
        return Response(
            response=dumps(response), status=200, mimetype="application/json"
        )
    except Exception as e:
        log.info("Fetching users' ranks using pagerank went wrong.")
        log.info(e)
        return ("", 500)
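For intuition about what a PageRank score measures, here is a small power-iteration sketch in plain Python. It is not the MAGE implementation; the damping factor of 0.85, the iteration count, and the toy graph of chatters pointing at streamers are all assumptions made for illustration.

```python
def pagerank(graph, damping=0.85, iterations=50):
    """Plain power-iteration PageRank over a dict of node -> list of out-neighbours."""
    nodes = set(graph) | {n for targets in graph.values() for n in targets}
    rank = {node: 1.0 / len(nodes) for node in nodes}
    for _ in range(iterations):
        # Rank held by nodes without outgoing edges is spread over all nodes.
        dangling = sum(rank[n] for n in nodes if not graph.get(n))
        new_rank = {
            node: (1.0 - damping) / len(nodes) + damping * dangling / len(nodes)
            for node in nodes
        }
        for node, targets in graph.items():
            for target in targets:
                new_rank[target] += damping * rank[node] / len(targets)
        rank = new_rank
    return rank


# Toy graph (assumed direction): each chatter points at the streamers they chat with.
toy = {
    "chatter_1": ["streamer_a"],
    "chatter_2": ["streamer_a", "streamer_b"],
    "chatter_3": ["streamer_a"],
}

ranks = pagerank(toy)
top = max(ranks, key=ranks.get)
print(top)  # streamer_a
```

The streamer that receives the most incoming edges ends up with the highest score, which is why PageRank works as a rough popularity measure here.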

You can come up with many other methods, but here is the list of the ones that are already implemented in app.py:

Method | Description
get_top_games(num_of_games) | Get top num_of_games games by number of streamers who play them.
get_top_teams(num_of_teams) | Get top num_of_teams teams by number of streamers who are part of them.
get_top_vips(num_of_vips) | Get top num_of_vips vips by number of streamers who gave them the vip badge.
get_top_moderators(num_of_moderators) | Get top num_of_moderators moderators by number of streamers who gave them the moderator badge.
get_top_streamers_by_views(num_of_streamers) | Get top num_of_streamers streamers by total number of views.
get_top_streamers_by_followers(num_of_streamers) | Get top num_of_streamers streamers by total number of followers.
get_streamer(streamer_name) | Get info about streamer whose name is streamer_name.
get_streamers(language, game) | Get all streamers who stream certain game in certain language.
get_nodes() | Get the number of nodes in database.
get_edges() | Get the number of edges in database.
get_page_rank() | Call the PageRank procedure and return top 50 in descending order.
get_bc() | Call the Betweenness centrality procedure and return top 50 in descending order.
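Betweenness centrality counts how often a node lies on shortest paths between other nodes. The sketch below uses Brandes' algorithm on a small undirected, unweighted toy graph; it is meant for intuition only and is not the MAGE procedure, whose defaults (for example, direction handling and normalization) may differ.

```python
from collections import deque


def betweenness(adjacency):
    """Brandes' algorithm for an undirected, unweighted graph (unnormalized).

    adjacency must list every node as a key and be symmetric.
    """
    score = {node: 0.0 for node in adjacency}
    for source in adjacency:
        # Breadth-first search that counts shortest paths from source.
        order = []
        predecessors = {node: [] for node in adjacency}
        paths = {node: 0 for node in adjacency}
        distance = {node: -1 for node in adjacency}
        paths[source], distance[source] = 1, 0
        queue = deque([source])
        while queue:
            node = queue.popleft()
            order.append(node)
            for neighbour in adjacency[node]:
                if distance[neighbour] < 0:
                    distance[neighbour] = distance[node] + 1
                    queue.append(neighbour)
                if distance[neighbour] == distance[node] + 1:
                    paths[neighbour] += paths[node]
                    predecessors[neighbour].append(node)
        # Accumulate dependencies from the farthest nodes back to the source.
        dependency = {node: 0.0 for node in adjacency}
        for node in reversed(order):
            for pred in predecessors[node]:
                dependency[pred] += paths[pred] / paths[node] * (1 + dependency[node])
            if node != source:
                score[node] += dependency[node]
    # Each unordered pair was counted from both endpoints.
    return {node: value / 2 for node, value in score.items()}


# Toy graph: streamer_a bridges two chatters and streamer_b.
toy = {
    "chatter_1": ["streamer_a"],
    "chatter_2": ["streamer_a"],
    "streamer_a": ["chatter_1", "chatter_2", "streamer_b"],
    "streamer_b": ["streamer_a"],
}

scores = betweenness(toy)
print(scores["streamer_a"], scores["streamer_b"])  # 3.0 0.0
```

Every shortest path between the three outer nodes passes through streamer_a, so it scores 3.0 (one per pair) while the others score zero.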

Conclusion

That's it for now! In the next part of this blog post, find out how to build a React application on top of the backend, and in the third part, learn how to ingest new data with Kafka and see how you can visualize the streaming changes of your data. Feel free to join our Discord Community server, where you can ask any question related to this blog post or anything else you want to know about Memgraph.
