Introduction
Anyone running infrastructure on a cloud provider wants a way to monitor usage and catch anomalies. We run some internal data services, our website/blog, and a few demo clusters on AWS, and we wanted a low-maintenance way to monitor that infrastructure for issues, so we took the opportunity to dogfood Bytewax, of course :).
In this blog post, we will walk you through the process of building a cloud-based anomaly detection system using Bytewax, Redpanda, and Amazon Web Services (AWS). Our goal is to create a dataflow that detects anomalies in EC2 instance CPU utilization. To achieve this, we will collect usage data from AWS CloudWatch using Logstash and store it using Redpanda, a Kafka-compatible streaming data platform. Finally, we will use Bytewax, a Python stream processor, to build our anomaly detection system.
This is exactly the same infrastructure we use internally at Bytewax and, in fact, we haven't touched it for months!
Setting Up the Required Infrastructure on AWS
Before we begin, ensure that you have the following prerequisites set up:
- AWS CLI configured with admin access
- Helm
- Docker
- A Kubernetes cluster running in AWS and kubectl configured to access it
Configuring Kubernetes and Redpanda
In this section, we will configure Kubernetes and Redpanda using the provided code snippets. Make sure you have a running Kubernetes cluster in AWS and kubectl configured to access it.
Step 1: Set up a namespace
Create a new namespace for Redpanda and set it as the active context:
kubectl create ns redpanda-bytewax
kubectl config set-context --current --namespace=redpanda-bytewax
Step 2: Install Cert-Manager and Redpanda Operator
The Redpanda operator requires cert-manager to create certificates for TLS communication. To install cert-manager with Helm:
helm repo add jetstack https://charts.jetstack.io && \
helm repo update && \
helm install \
cert-manager jetstack/cert-manager \
--namespace cert-manager \
--create-namespace \
--version v1.4.4 \
--set installCRDs=true
Fetch the latest Redpanda Operator version, add the Redpanda Helm repo, and install the Redpanda Operator:
export VERSION=$(curl -s https://api.github.com/repos/redpanda-data/redpanda/releases/latest | jq -r .tag_name)
helm repo add redpanda https://charts.vectorized.io/ && helm repo update
kubectl apply -k https://github.com/redpanda-data/redpanda/src/go/k8s/config/crd?ref=$VERSION
helm install redpanda-operator redpanda/redpanda-operator --namespace redpanda-system --create-namespace --version $VERSION
Step 3: Create Redpanda cluster
Save the following YAML configuration in a file named 3_node_cluster.yaml:
apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
metadata:
  name: three-node-cluster
spec:
  image: "vectorized/redpanda"
  version: "latest"
  replicas: 3
  resources:
    requests:
      cpu: 1
      memory: 1.2Gi
    limits:
      cpu: 1
      memory: 1.2Gi
  configuration:
    rpcServer:
      port: 33145
    kafkaApi:
      - port: 9092
    pandaproxyApi:
      - port: 8082
    schemaRegistry:
      port: 8081
    adminApi:
      - port: 9644
    developerMode: true
Apply the Redpanda cluster configuration:
kubectl apply -f ./3_node_cluster.yaml
Check the status of Redpanda pods:
kubectl get po -lapp.kubernetes.io/component=redpanda
Export the broker addresses:
export BROKERS=`kubectl get clusters three-node-cluster -o=jsonpath='{.status.nodes.internal}' | jq -r 'join(",")'`
Step 4: Set up topics
Run an rpk container to create and manage topics:
kubectl run rpk-shell --rm -i --tty --image vectorized/redpanda --command /bin/bash
In the rpk terminal, export the broker addresses:
export BROKERS=three-node-cluster-0.three-node-cluster.redpanda-bytewax.svc.cluster.local.,three-node-cluster-1.three-node-cluster.redpanda-bytewax.svc.cluster.local.,three-node-cluster-2.three-node-cluster.redpanda-bytewax.svc.cluster.local.
View the cluster information:
rpk --brokers $BROKERS cluster info
Create two topics with 5 partitions each:
rpk --brokers $BROKERS topic create ec2_metrics -p 5
rpk --brokers $BROKERS topic create ec2_metrics_anomalies -p 5
List the topics:
rpk --brokers $BROKERS topic list
Consume messages from the ec2_metrics topic:
rpk --brokers $BROKERS topic consume ec2_metrics -o start
Exporting CloudWatch EC2 Metrics to our Redpanda Cluster with Logstash
Logstash is an open-source data processing pipeline that can ingest data from multiple sources, transform it, and send it to various destinations, such as Redpanda. In this case, we'll use Logstash to collect EC2 metrics from CloudWatch and send them to our Redpanda cluster for further processing.
Logstash Permissions
First, we need to create an AWS policy and user with the required permissions for Logstash to access CloudWatch and EC2. Save the following JSON configuration in a file named cloudwatch-logstash-policy.json:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt1444715676000",
      "Effect": "Allow",
      "Action": [
        "cloudwatch:GetMetricStatistics",
        "cloudwatch:ListMetrics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "Stmt1444716576170",
      "Effect": "Allow",
      "Action": [
        "ec2:DescribeInstances"
      ],
      "Resource": "*"
    }
  ]
}
Now we can create the policy and user, and attach the policy to the user:
aws iam create-policy --policy-name CloudwatchLogstash --policy-document file://cloudwatch-logstash-policy.json
aws iam create-user --user-name logstash-user
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
aws iam attach-user-policy --policy-arn arn:aws:iam::$AWS_ACCOUNT_ID:policy/CloudwatchLogstash --user-name logstash-user
To provide access, we can create Kubernetes secrets for the AWS access key and secret access key:
kubectl create secret generic aws-secret-access-key --from-literal=value=$(aws iam create-access-key --user-name logstash-user | jq -r .AccessKey.SecretAccessKey)
kubectl create secret generic aws-access-key-id --from-literal=value=$(aws iam list-access-keys --user-name logstash-user --query "AccessKeyMetadata[0].AccessKeyId" --output text)
Now we can create an Amazon Elastic Container Registry (ECR) repository to store the custom Logstash image:
aws ecr create-repository --repository-name redpanda-bytewax
export REPOSITORY_URI=$(aws ecr describe-repositories --repository-names redpanda-bytewax --output text --query "repositories[0].repositoryUri")
Next, we create a Logstash image with the CloudWatch input plugin installed. Create a Dockerfile named logstash-Dockerfile that installs the plugin in a RUN step:
FROM docker.elastic.co/logstash/logstash:7.17.3
RUN bin/logstash-plugin install logstash-input-cloudwatch
Finally, we build and push the Logstash image to the ECR repository:
docker build -f logstash-Dockerfile -t $REPOSITORY_URI:logstash-cloudwatch .
export AWS_REGION=us-west-2
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
docker push $REPOSITORY_URI:logstash-cloudwatch
Deploy Logstash on Kubernetes
Now that we have our custom Logstash image, we will deploy it on Kubernetes using the Helm chart provided by Elastic. First, we need to gather some information and create a logstash-values.yaml file with the necessary configuration.
Run the following commands to obtain the required information:
echo $REPOSITORY_URI
echo $AWS_REGION
echo $BROKERS | sed -e 's/local\./local\:9092/g'
Create a logstash-values.yaml file and replace the placeholders (shown with <>) with the information obtained above:
image: "<YOUR REPOSITORY URI>"
imageTag: "logstash-cloudwatch"
imagePullPolicy: "Always"
persistence:
  enabled: true
logstashConfig:
  logstash.yml: |
    http.host: 0.0.0.0
    xpack.monitoring.enabled: false
logstashPipeline:
  uptime.conf: |
    input {
      cloudwatch {
        namespace => "AWS/EC2"
        metrics => ["CPUUtilization"]
        region => "<YOUR AWS REGION>"
        interval => 300
        period => 300
      }
    }
    filter {
      mutate {
        add_field => {
          "[index]" => "0"
          "[value]" => "%{maximum}"
          "[instance]" => "%{InstanceId}"
        }
      }
    }
    output {
      kafka {
        bootstrap_servers => "<YOUR REDPANDA BROKERS>"
        topic_id => 'ec2_metrics'
        codec => json
      }
    }
extraEnvs:
  - name: 'AWS_ACCESS_KEY_ID'
    valueFrom:
      secretKeyRef:
        name: aws-access-key-id
        key: value
  - name: 'AWS_SECRET_ACCESS_KEY'
    valueFrom:
      secretKeyRef:
        name: aws-secret-access-key
        key: value
With the logstash-values.yaml file ready, add the Elastic Helm repository (if you haven't already) and install the Logstash chart:
helm repo add elastic https://helm.elastic.co && helm repo update
helm upgrade --install logstash elastic/logstash -f logstash-values.yaml
Now to verify that Logstash is exporting the EC2 metrics to the Redpanda cluster, open a terminal with rpk and consume the ec2_metrics topic:
rpk --brokers $BROKERS topic consume ec2_metrics -o start
Use CTRL-C to quit the rpk terminal when you're done.
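Each message Logstash writes to ec2_metrics is a JSON document whose interesting fields come from the mutate filter above. The snippet below parses one such record in Python; the exact payload (for example the timestamp key and any extra CloudWatch/Logstash fields) depends on your Logstash version and codec settings, so treat the sample record as illustrative rather than exact.
import json

# Illustrative record; real payloads include additional Logstash/CloudWatch fields
# and their exact names may differ in your setup.
sample = b'{"index": "0", "value": "7.32", "instance": "i-0abc123def456", "timestamp": "2023-05-01T12:00:00Z"}'

record = json.loads(sample)
cpu_fraction = float(record["value"]) / 100  # CloudWatch reports CPUUtilization as a percentage
print(record["instance"], cpu_fraction)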
Building a Dataflow to Detect Anomalies with Bytewax
With our infrastructure in place, it's time to build a dataflow to detect anomalies. We will use Bytewax and Waxctl to define and deploy a dataflow that processes the EC2 instance CPU utilization data stored in the Redpanda cluster.
Anomaly Detection with Half Space Trees
Half Space Trees (HST) is an unsupervised machine learning algorithm used for detecting anomalies in streaming data. The algorithm is designed to efficiently handle high-dimensional and high-velocity data streams. HST builds a set of binary trees to partition the feature space into half spaces, where each tree captures a different view of the data. By observing the frequency of points falling into each half space, the algorithm can identify regions that are less dense than others, suggesting that data points within those regions are potential anomalies.
In our case, we will use HST to detect anomalous CPU usage in EC2 metrics. We'll leverage the Python library River, which provides an implementation of the HST algorithm, and Bytewax, a platform for creating data processing pipelines.
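Before wiring HST into a dataflow, it helps to see the algorithm on its own. Here is a minimal sketch that runs River's HalfSpaceTrees over a hand-made stream of CPU fractions; the parameter values and data are illustrative, not tuned.
from river import anomaly

# Small trees and a short window so the model reacts quickly on a toy stream.
hst = anomaly.HalfSpaceTrees(n_trees=5, height=3, window_size=10, seed=42)

# Mostly low CPU utilization (as a fraction of 1), with one obvious spike.
for value in [0.05, 0.07, 0.06, 0.08, 0.05, 0.07, 0.95, 0.06, 0.05]:
    hst.learn_one({"value": value})
    score = hst.score_one({"value": value})
    print(f"value={value:.2f} score={score:.3f}")
Higher scores indicate observations that fall in sparsely populated regions of the feature space. The dataflow below uses the same learn-then-score pattern and flags any score above a threshold as anomalous.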
Building the Dataflow for Anomaly Detection
To create our dataflow, we'll first import the necessary libraries and set up Kafka connections. The following code snippet demonstrates how to create a dataflow with River and Bytewax to consume EC2 metrics from Kafka and detect anomalous CPU usage using HST:
import json
import os

from bytewax.connectors.kafka import KafkaInput, KafkaOutput
from bytewax.dataflow import Dataflow
from river import anomaly

kafka_servers = os.getenv("BYTEWAX_KAFKA_SERVER", "localhost:9092")
kafka_topic = os.getenv("BYTEWAX_KAFKA_TOPIC", "ec2_metrics")
kafka_output_topic = os.getenv("BYTEWAX_KAFKA_OUTPUT_TOPIC", "ec2_metrics_anomalies")

# Define the dataflow object and the Kafka input.
flow = Dataflow()
flow.input("inp", KafkaInput(kafka_servers.split(","), [kafka_topic]))


# Deserialize the JSON payload, convert the CPU percentage to a fraction,
# and key the stream by instance ID.
def group_instance_and_normalize(key__data):
    _, data = key__data
    data = json.loads(data)
    data["value"] = float(data["value"]) / 100
    return data["instance"], data


flow.map(group_instance_and_normalize)
# ("c6585a", {"index": "0", "value": 0.11, "instance": "c6585a", ...})


# Stateful operator for anomaly detection.
class AnomalyDetector(anomaly.HalfSpaceTrees):
    """Anomaly detector that inherits from the HalfSpaceTrees object in the river package.

    HalfSpaceTrees accepts the following parameters:
      n_trees – defaults to 10
      height – defaults to 8
      window_size – defaults to 250
      limits (Dict[Hashable, Tuple[float, float]]) – defaults to None
      seed (int) – defaults to None
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, n_trees=5, height=3, window_size=5, seed=42, **kwargs)

    def update(self, data):
        # Learn from the new observation, then score it.
        self.learn_one({"value": data["value"]})
        data["score"] = self.score_one({"value": data["value"]})
        # Flag anything above the threshold as anomalous.
        if data["score"] > 0.7:
            data["anom"] = 1
        else:
            data["anom"] = 0
        return self, (
            data["index"],
            data["timestamp"],
            data["value"],
            data["score"],
            data["anom"],
        )


flow.stateful_map("detector", lambda: AnomalyDetector(), AnomalyDetector.update)
# ("c6585a", ("0", "2023-05-01T12:00:00Z", 0.08, 0.02, 0))

# Filter out non-anomalous values and serialize what remains for output.
flow.filter(lambda x: bool(x[1][4]))
flow.map(lambda x: (x[0], json.dumps(x[1])))
flow.output("output", KafkaOutput(kafka_servers.split(","), kafka_output_topic))
In this dataflow, we first read data from Kafka and deserialize the JSON message. We then normalize the CPU usage values and group them by the instance ID. Next, we apply the AnomalyDetector class inside a stateful operator, which calculates the anomaly score for each data point using HST. We set a threshold for the anomaly score (0.7 in this example) and mark data points as anomalous if their scores exceed the threshold. Finally, we filter out non-anomalous values and output the anomalous data points to a separate Kafka topic.
Using this dataflow, we can continuously monitor EC2 metrics and detect anomalous CPU usage, helping us identify potential issues in our infrastructure.
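Before building the image and deploying, you can sanity-check the stateful step locally by swapping the Kafka connectors for Bytewax's testing and stdio connectors. A minimal sketch, assuming Bytewax 0.16 and that the AnomalyDetector class above lives in an importable dataflow.py; the hand-written records are stand-ins for real CloudWatch events:
from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput, run_main

from dataflow import AnomalyDetector  # assumes the code above is saved as dataflow.py

# Hand-written stand-ins for normalized CloudWatch records, keyed by instance ID.
records = [
    ("i-0abc123", {"index": "0", "timestamp": "t0", "value": 0.05}),
    ("i-0abc123", {"index": "0", "timestamp": "t1", "value": 0.06}),
    ("i-0abc123", {"index": "0", "timestamp": "t2", "value": 0.97}),  # spike
]

flow = Dataflow()
flow.input("inp", TestingInput(records))
flow.stateful_map("detector", lambda: AnomalyDetector(), AnomalyDetector.update)
flow.output("out", StdOutput())

run_main(flow)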
Creating a Dataflow Docker Image
To run the dataflow on Kubernetes, we need a container image with Bytewax and our Python dependencies. Create a Dockerfile named dataflow-Dockerfile:
FROM bytewax/bytewax:0.16.0-python3.9
RUN /venv/bin/pip install river==0.10.1 pandas confluent-kafka
Then build the image and push it to the ECR repository:
docker build -f dataflow-Dockerfile -t $REPOSITORY_URI:dataflow .
docker push $REPOSITORY_URI:dataflow
Deploying the Dataflow
To deploy the dataflow, we'll use the Bytewax command-line tool, waxctl. There are two options for deploying the dataflow, depending on how you have set up your Kafka server environment variable. When we deploy our dataflow, we set the number of processes (the -p flag) to 5 to match the number of partitions we created for our Redpanda topic.
Option 1: Generate waxctl command
Use the following command to generate the waxctl command with the appropriate environment variables:
echo"
waxctl df deploy ./dataflow.py \\
--name ec2-cpu-ad \\
-p 5 \\
-i $REPOSITORY_URI \\
-t dataflow \\
-e '\"BYTEWAX_KAFKA_SERVER=$BROKERS\"' \\
-e BYTEWAX_KAFKA_TOPIC_GROUP_ID=dataflow_group \\
--debug
"
This will output the waxctl command with the correct Kafka server values. Copy the output and run it to deploy the dataflow.
Option 2: Hardcoded BYTEWAX_KAFKA_SERVER value
If you prefer to hardcode the Kafka server values, use the following command to deploy the dataflow:
waxctl df deploy ./dataflow.py \
--name ec2-cpu-ad \
-p 5 \
-i $REPOSITORY_URI \
-t dataflow \
-e '"BYTEWAX_KAFKA_SERVER=three-node-cluster-0.three-node-cluster.redpanda-bytewax.svc.cluster.local.,three-node-cluster-1.three-node-cluster.redpanda-bytewax.svc.cluster.local.,three-node-cluster-2.three-node-cluster.redpanda-bytewax.svc.cluster.local."' \
-e BYTEWAX_KAFKA_TOPIC_GROUP_ID=dataflow_group \
--debug
Now that the dataflow is deployed, after it has run for a while you'll be able to consume from the anomalies topic and see any anomalies it has detected.
rpk --brokers $BROKERS topic consume ec2_metrics_anomalies -o start
As a next step, you could deploy a dataflow that consumes from the anomalies topic and alerts you in Slack (a sketch of that idea follows below), or add Rerun, like we demonstrated in a previous blog post, to visualize the anomalies.
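For example, a small follow-up dataflow could read the anomalies topic and post each event to a Slack incoming webhook. Here is a minimal sketch, assuming Bytewax 0.16, the topic names used above, and a SLACK_WEBHOOK_URL environment variable pointing at a webhook you've already created:
import json
import os
import urllib.request

from bytewax.connectors.kafka import KafkaInput
from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow

kafka_servers = os.getenv("BYTEWAX_KAFKA_SERVER", "localhost:9092")
anomalies_topic = os.getenv("BYTEWAX_KAFKA_OUTPUT_TOPIC", "ec2_metrics_anomalies")
webhook_url = os.environ["SLACK_WEBHOOK_URL"]  # assumed: a Slack incoming webhook you created


def alert(key__payload):
    # KafkaInput yields (key_bytes, payload_bytes); decode them for the message text.
    key, payload = key__payload
    instance = key.decode() if isinstance(key, bytes) else key
    event = json.loads(payload)
    text = f"Anomalous CPU utilization on {instance}: {event}"
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps({"text": text}).encode(),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req)
    return text


flow = Dataflow()
flow.input("anomalies", KafkaInput(kafka_servers.split(","), [anomalies_topic]))
flow.map(alert)
flow.output("stdout", StdOutput())
This could be packaged and deployed with waxctl the same way as the anomaly detection dataflow above.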
Conclusion
In this blog post, we have demonstrated how to set up a system for monitoring EC2 metrics and detecting anomalous CPU usage. By leveraging tools like Logstash, Redpanda, River, and Bytewax, we've created a robust and scalable pipeline for processing and analyzing streaming data.
This system provides a range of benefits, including:
- Efficiently processing high-dimensional and high-velocity data streams
- Using the Half Space Trees unsupervised machine learning algorithm for detecting anomalies in streaming data
- Continuously monitoring EC2 metrics and identifying potential issues in the infrastructure
With this setup, you can effectively monitor your EC2 instances and ensure that your infrastructure is running smoothly, helping you proactively address any issues that may arise.
That's it! You now have a working cloud-based anomaly detection system using Bytewax, Redpanda, and AWS. Feel free to adapt this setup to your specific use case and explore the various features and capabilities offered by these tools.