DEV Community

CodeWithVed
CodeWithVed

Posted on

How to Write a Python/Golang Script to Retrieve Metrics from AWS CloudWatch?

Python-Cloudwatch

You can use the script below to check whether you are receiving the metrics correctly.


import boto3
from datetime import datetime, timedelta
import time
import pytz

# Module-level CloudWatch client shared by get_cloudwatch_metrics().
# No region or credentials are passed here, so boto3 has to resolve them
# from its own default configuration (environment, config files, etc.).
cloudwatch = boto3.client('cloudwatch')

def get_cloudwatch_metrics(namespace, metric_name, dimensions, start_time, end_time,
                           period=30, stat='Sum'):
    """Fetch the datapoints of a single metric through CloudWatch ``GetMetricData``.

    Args:
        namespace: CloudWatch namespace the metric lives in.
        metric_name: Name of the metric to query.
        dimensions: List of ``{'Name': ..., 'Value': ...}`` dicts that identify
            the metric.
        start_time: Start of the query window; passed through as ``StartTime``.
        end_time: End of the query window; passed through as ``EndTime``.
        period: Aggregation granularity in seconds. Defaults to 30, the value
            that used to be hard-coded here.
        stat: Statistic computed for each period. Defaults to ``'Sum'``, the
            value that used to be hard-coded here.

    Returns:
        The ``MetricDataResults`` list of the response, or an empty list when
        the request failed (the error is printed, not raised).
    """
    params = {
        'StartTime': start_time,
        'EndTime': end_time,
        'MetricDataQueries': [
            {
                # Single query; 'm1' is the id its result is labelled with.
                'Id': 'm1',
                'MetricStat': {
                    'Metric': {
                        'Namespace': namespace,
                        'MetricName': metric_name,
                        'Dimensions': dimensions,
                    },
                    'Period': period,
                    'Stat': stat,
                },
            },
        ],
    }

    # Deliberately best-effort: callers poll in a loop, so a failed request is
    # reported and turned into "no data" instead of aborting the whole script.
    # NOTE: only the first page of results is returned; NextToken is not followed.
    try:
        response = cloudwatch.get_metric_data(**params)
        return response['MetricDataResults']
    except Exception as e:
        print(f"Error fetching metrics: {e}")
        return []

def main():
    """Poll CloudWatch forever and print the raw result of every query.

    Each pass queries a window of ``collection_interval_seconds`` seconds that
    ends 10 seconds before the current UTC time, prints the current time, the
    window bounds and the returned data, then sleeps for 10 seconds. The loop
    has no exit condition; stop the script manually.
    """
    timestamp_format = '%Y-%m-%dT%H:%M:%S'

    namespace = "CWAgent"
    metric_name = "<MetricsName>"
    dimensions = [
        {'Name': 'path', 'Value': 'pathName'},
        {'Name': 'metric_type', 'Value': 'metric_type_value'},
    ]
    # Only referenced by the disabled threshold check further down.
    target_value = 1000

    collection_interval_seconds = 30
    window_length = timedelta(seconds=collection_interval_seconds)
    # The window ends slightly in the past rather than at "now".
    lag = timedelta(seconds=10)

    while True:
        window_end = datetime.now(pytz.UTC) - lag
        window_start = window_end - window_length

        # The timestamps are handed to the query as strings without a UTC offset.
        start_time_str = window_start.strftime(timestamp_format)
        end_time_str = window_end.strftime(timestamp_format)

        metrics_data = get_cloudwatch_metrics(
            namespace,
            metric_name,
            dimensions,
            start_time_str,
            end_time_str,
        )

        print("Currrent Time: ", datetime.now(pytz.UTC))
        print("Start Time: ", start_time_str)
        print("End Time: ", end_time_str)
        print(metrics_data)

        # Optional threshold check, disabled on purpose; enable it to compare
        # the summed values against target_value.
        # if metrics_data:
        #     aggregated_value = sum(metrics_data[0]['Values']) if metrics_data[0]['Values'] else None

        #     if aggregated_value and aggregated_value >= target_value:
        #         print(f"Target metric value met or exceeded: {aggregated_value}")
        #     else:
        #         print(f"Current aggregated value: {aggregated_value}, below target.")
        # else:
        #     print("No metrics data received.")

        print("Sleeping for 10 seconds...")
        time.sleep(10)
        # Two blank lines visually separate consecutive polls in the output.
        print()
        print()

# Start the polling loop only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()


Enter fullscreen mode Exit fullscreen mode

Requirements.txt:

boto3==1.34.117
botocore==1.34.117
jmespath==1.0.1
python-dateutil==2.9.0.post0
pytz==2024.1
s3transfer==0.10.1
six==1.16.0
urllib3==2.2.1

Enter fullscreen mode Exit fullscreen mode

I have already explained each parameter and its function. For more information, you can refer to this blog:

In the Python script above, some parts are commented out; how you use the script is entirely up to you. I only wanted to check which metric values come back, how often, and at what time, so the script simply prints the query window and the raw results. You can also adjust `collection_interval_seconds` and the `Period` value to suit your needs.

Alternatively, here is the equivalent Go code, which may help if your primary codebase is written in Go:

Golang-Cloudwatch

package main

import (
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/cloudwatch"
)

// main polls CloudWatch in an endless loop: every pass sums the datapoints of
// one metric over the last collectionIntervalSeconds seconds and reports
// whether that sum reached targetValue.
//
// The AWS session and CloudWatch client are created once, before the loop,
// because nothing about them changes between polls. A failed GetMetricData
// call is reported and retried after the normal poll interval, so a
// persistent error cannot turn the loop into a tight retry loop.
func main() {
    namespace := "CWAgent"
    metricName := "ingestion/commandservice/totalRequest_count"
    dimensions := []*cloudwatch.Dimension{
        {Name: aws.String("path"), Value: aws.String("pathName")},
        {Name: aws.String("metric_type"), Value: aws.String("metricsTypeValue")},
    }
    targetValue := float64(100)
    collectionIntervalSeconds := int64(30)
    pollInterval := 10 * time.Second

    // Loop-invariant setup: one session and one client for the whole run.
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String("us-east-1"),
    })
    if err != nil {
        fmt.Println("Error creating session:", err)
        return
    }
    svc := cloudwatch.New(sess)

    for {
        // Query window: the last collectionIntervalSeconds seconds, in UTC.
        endTime := time.Now().UTC()
        startTime := endTime.Add(-time.Duration(collectionIntervalSeconds) * time.Second)
        // Debug aid: replace the two lines above with a fixed window.
        // var endTime, startTime time.Time
        // layout := "2006-01-02T15:04:05Z"
        // endTime, err := time.Parse(layout, "2024-06-05T05:52:33Z")
        // startTime, err = time.Parse(layout, "2024-06-05T05:51:33Z")
        // if err != nil {
        //  fmt.Println("Error parsing time:", err)
        //  return
        // }
        params := &cloudwatch.GetMetricDataInput{
            EndTime:   &endTime,
            StartTime: &startTime,
            MetricDataQueries: []*cloudwatch.MetricDataQuery{
                {
                    Id: aws.String("m1"),
                    MetricStat: &cloudwatch.MetricStat{
                        Metric: &cloudwatch.Metric{
                            Namespace:  aws.String(namespace),
                            MetricName: aws.String(metricName),
                            Dimensions: dimensions,
                        },
                        Period: aws.Int64(10),
                        Stat:   aws.String("Sum"),
                    },
                },
            },
        }

        result, err := svc.GetMetricData(params)
        if err != nil {
            fmt.Println("Error getting metric data:", err)
            // Wait before retrying instead of immediately re-issuing the call.
            time.Sleep(pollInterval)
            continue
        }

        // Stays 0 when the response holds no results or no values.
        var aggregatedValue float64
        if len(result.MetricDataResults) > 0 && result.MetricDataResults[0].Values != nil {
            aggregatedValue = sumFloat64Slice(result.MetricDataResults[0].Values)
        }
        fmt.Printf("Start Time: %s\n", startTime.Format(time.RFC3339))
        fmt.Printf("End Time: %s\n", endTime.Format(time.RFC3339))
        // for _, value := range result.MetricDataResults[0].Values {
        //  fmt.Println(*value)
        // }
        if aggregatedValue >= targetValue {
            fmt.Printf("Target metric value met or exceeded: %.2f\n", aggregatedValue)
        } else {
            fmt.Printf("Current aggregated value: %.2f, below target.\n", aggregatedValue)
        }

        time.Sleep(pollInterval)
    }
}

// sumFloat64Slice returns the sum of the values that the entries of slice
// point to. Nil entries are skipped instead of dereferenced, so a slice with
// missing values cannot panic; a nil or empty slice yields 0.
func sumFloat64Slice(slice []*float64) float64 {
    var sum float64
    for _, v := range slice {
        if v == nil {
            continue
        }
        sum += *v
    }
    return sum
}

Enter fullscreen mode Exit fullscreen mode

Top comments (0)