DEV Community

CodeWithVed
CodeWithVed

Posted on

How to Write a Python/Golang Script to Retrieve Metrics from AWS CloudWatch?

Python-Cloudwatch

You can use the script below to check whether you are receiving the metrics correctly.


import boto3
from datetime import datetime, timedelta
import time
import pytz

cloudwatch = boto3.client('cloudwatch')

def get_cloudwatch_metrics(namespace, metric_name, dimensions, start_time, end_time):
    params = {
        'StartTime': start_time,
        'EndTime': end_time,
        'MetricDataQueries': [
            {
                'Id': 'm1',
                'MetricStat': {
                    'Metric': {
                        'Namespace': namespace,
                        'MetricName': metric_name,
                        'Dimensions': dimensions
                    },
                    'Period': 30,
                    'Stat': 'Sum'
                }
            }
        ]
    }

    try:
        response = cloudwatch.get_metric_data(**params)
        return response['MetricDataResults']
    except Exception as e:
        print(f"Error fetching metrics: {e}")
        return []

def main():
    namespace = "CWAgent"
    metric_name = "<MetricsName>"
    dimensions = [{'Name': 'path', 'Value': 'pathName'}, {'Name':'metric_type', 'Value': 'metric_type_value'}]
    target_value = 1000

    collection_interval_seconds = 30

    # utc_now = datetime.now(pytz.UTC) - timedelta(seconds=300)

    while True:
        end_time = datetime.now(pytz.UTC) - timedelta(seconds=10)
        start_time = end_time - timedelta(seconds=collection_interval_seconds)

        start_time_str = start_time.strftime('%Y-%m-%dT%H:%M:%S')
        end_time_str = end_time.strftime('%Y-%m-%dT%H:%M:%S')

        metrics_data = get_cloudwatch_metrics(namespace, metric_name, dimensions, start_time_str, end_time_str)
        print("Currrent Time: ", datetime.now(pytz.UTC))
        print("Start Time: ", start_time_str)
        print("End Time: ", end_time_str)
        print(metrics_data)
        # if metrics_data:
        #     aggregated_value = sum(metrics_data[0]['Values']) if metrics_data[0]['Values'] else None

        #     if aggregated_value and aggregated_value >= target_value:
        #         print(f"Target metric value met or exceeded: {aggregated_value}")
        #     else:
        #         print(f"Current aggregated value: {aggregated_value}, below target.")
        # else:
        #     print("No metrics data received.")

        print(f"Sleeping for 10 seconds...")
        time.sleep(10)
        print()
        print()


if __name__ == "__main__":
    main()


Enter fullscreen mode Exit fullscreen mode

Requirements.txt:

boto3==1.34.117
botocore==1.34.117
jmespath==1.0.1
python-dateutil==2.9.0.post0
pytz==2024.1
s3transfer==0.10.1
six==1.16.0
urllib3==2.2.1

Enter fullscreen mode Exit fullscreen mode

I have already explained each parameter and its function. For more information, you can refer to this blog:

In the above Python script, some parts are commented out. It is entirely up to you how you want to use this script. Since I wanted to check only the metrics and how often it prints the values and at what time, I was simply printing the time, Also you can modify the metrics_collection_time and period based on your needs.

Alternatively, i also mentioned the go code which could you help you if your primary code in Golang:

Golang-Cloudwatch

package main

import (
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/cloudwatch"
)

func main() {
    namespace := "CWAgent"
    metricName := "ingestion/commandservice/totalRequest_count"
    dimensions := []*cloudwatch.Dimension{
        {Name: aws.String("path"), Value: aws.String("pathName")},
        {Name: aws.String("metric_type"), Value: aws.String("metricsTypeValue")},
    }
    targetValue := float64(100)
    collectionIntervalSeconds := int64(30)

    for {
        endTime := time.Now().UTC()
        startTime := endTime.Add(-time.Duration(collectionIntervalSeconds) * time.Second)
        // var endTime, startTime time.Time
        // layout := "2006-01-02T15:04:05Z"
        // endTime, err := time.Parse(layout, "2024-06-05T05:52:33Z")
        // startTime, err = time.Parse(layout, "2024-06-05T05:51:33Z")
        // if err != nil {
        //  fmt.Println("Error parsing time:", err)
        //  return
        // }
        params := &cloudwatch.GetMetricDataInput{
            EndTime:   &endTime,
            StartTime: &startTime,
            MetricDataQueries: []*cloudwatch.MetricDataQuery{
                {
                    Id: aws.String("m1"),
                    MetricStat: &cloudwatch.MetricStat{
                        Metric: &cloudwatch.Metric{
                            Namespace:  aws.String(namespace),
                            MetricName: aws.String(metricName),
                            Dimensions: dimensions,
                        },
                        Period: aws.Int64(10),
                        Stat:   aws.String("Sum"),
                    },
                },
            },
        }

        sess, err := session.NewSession(&aws.Config{
            Region: aws.String("us-east-1"),
        })
        if err != nil {
            fmt.Println("Error creating session:", err)
            return
        }

        svc := cloudwatch.New(sess)
        result, err := svc.GetMetricData(params)
        if err != nil {
            fmt.Println("Error getting metric data:", err)
            continue
        }

        var aggregatedValue float64
        if len(result.MetricDataResults) > 0 && result.MetricDataResults[0].Values != nil {
            aggregatedValue = sumFloat64Slice(result.MetricDataResults[0].Values)
        }
        fmt.Printf("Start Time: %s\n", startTime.Format(time.RFC3339))
        fmt.Printf("End Time: %s\n", endTime.Format(time.RFC3339))
        // for _, value := range result.MetricDataResults[0].Values {
        //  fmt.Println(*value)
        // }
        if aggregatedValue >= targetValue {
            fmt.Printf("Target metric value met or exceeded: %.2f\n", aggregatedValue)
        } else {
            fmt.Printf("Current aggregated value: %.2f, below target.\n", aggregatedValue)
        }

        time.Sleep(10 * time.Second)
    }
}

func sumFloat64Slice(slice []*float64) float64 {
    sum := float64(0)
    for _, v := range slice {
        sum += *v
    }
    return sum
}

Enter fullscreen mode Exit fullscreen mode

Sentry image

Hands-on debugging session: instrument, monitor, and fix

Join Lazar for a hands-on session where you’ll build it, break it, debug it, and fix it. You’ll set up Sentry, track errors, use Session Replay and Tracing, and leverage some good ol’ AI to find and fix issues fast.

RSVP here →

Top comments (0)

Cloudinary image

Zoom pan, gen fill, restore, overlay, upscale, crop, resize...

Chain advanced transformations through a set of image and video APIs while optimizing assets by 90%.

Explore

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay