DEV Community

CodeWithVed

How to Write a Python/Golang Script to Retrieve Metrics from AWS CloudWatch?

Python-Cloudwatch

You can use the script below to check whether you are receiving the metrics correctly.


import boto3
from datetime import datetime, timedelta
import time
import pytz

cloudwatch = boto3.client('cloudwatch')

def get_cloudwatch_metrics(namespace, metric_name, dimensions, start_time, end_time):
    params = {
        'StartTime': start_time,
        'EndTime': end_time,
        'MetricDataQueries': [
            {
                'Id': 'm1',
                'MetricStat': {
                    'Metric': {
                        'Namespace': namespace,
                        'MetricName': metric_name,
                        'Dimensions': dimensions
                    },
                    'Period': 30,
                    'Stat': 'Sum'
                }
            }
        ]
    }

    try:
        response = cloudwatch.get_metric_data(**params)
        return response['MetricDataResults']
    except Exception as e:
        print(f"Error fetching metrics: {e}")
        return []

def main():
    namespace = "CWAgent"
    metric_name = "<MetricsName>"
    dimensions = [{'Name': 'path', 'Value': 'pathName'}, {'Name':'metric_type', 'Value': 'metric_type_value'}]
    target_value = 1000

    collection_interval_seconds = 30

    # utc_now = datetime.now(pytz.UTC) - timedelta(seconds=300)

    while True:
        # Query a window of collection_interval_seconds that ends 10 seconds before now.
        end_time = datetime.now(pytz.UTC) - timedelta(seconds=10)
        start_time = end_time - timedelta(seconds=collection_interval_seconds)

        start_time_str = start_time.strftime('%Y-%m-%dT%H:%M:%S')
        end_time_str = end_time.strftime('%Y-%m-%dT%H:%M:%S')

        metrics_data = get_cloudwatch_metrics(namespace, metric_name, dimensions, start_time_str, end_time_str)
        print("Current Time: ", datetime.now(pytz.UTC))
        print("Start Time: ", start_time_str)
        print("End Time: ", end_time_str)
        print(metrics_data)
        # if metrics_data:
        #     aggregated_value = sum(metrics_data[0]['Values']) if metrics_data[0]['Values'] else None

        #     if aggregated_value and aggregated_value >= target_value:
        #         print(f"Target metric value met or exceeded: {aggregated_value}")
        #     else:
        #         print(f"Current aggregated value: {aggregated_value}, below target.")
        # else:
        #     print("No metrics data received.")

        print("Sleeping for 10 seconds...")
        time.sleep(10)
        print()
        print()


if __name__ == "__main__":
    main()
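
The polling window in the script above is simply "the last 30 seconds, ending 10 seconds ago". One optional refinement, sketched below, is to snap the window to a period boundary so that each query covers exactly one complete period. As I understand the GetMetricData documentation, start and end times that line up with the period tend to work better, but treat that as an assumption and verify it for your metrics. The helper name `aligned_window` and its `delay_seconds` parameter are hypothetical and not part of the original script, and the sketch uses the standard library's `timezone.utc` instead of pytz.

```python
from datetime import datetime, timedelta, timezone


def aligned_window(now, period_seconds, delay_seconds=10):
    """Return (start, end) for the most recent complete period.

    `now` must be a timezone-aware datetime. The window ends on a
    period boundary that is at least `delay_seconds` in the past.
    """
    latest = now - timedelta(seconds=delay_seconds)
    epoch = int(latest.timestamp())
    # Round down to the nearest multiple of the period.
    end_epoch = epoch - (epoch % period_seconds)
    end = datetime.fromtimestamp(end_epoch, tz=timezone.utc)
    start = end - timedelta(seconds=period_seconds)
    return start, end


if __name__ == "__main__":
    start, end = aligned_window(datetime.now(timezone.utc), period_seconds=30)
    print("Start Time:", start.isoformat())
    print("End Time:", end.isoformat())
```

The returned datetimes are timezone-aware, so they could be passed as `StartTime` and `EndTime` directly instead of being formatted as strings first.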



Requirements.txt:

boto3==1.34.117
botocore==1.34.117
jmespath==1.0.1
python-dateutil==2.9.0.post0
pytz==2024.1
s3transfer==0.10.1
six==1.16.0
urllib3==2.2.1


I have already explained each parameter and its function. For more information, you can refer to this blog:

In the Python script above, some parts are commented out; how you use them is entirely up to you. Since I only wanted to check the metric values, how often they are printed, and at what time, I simply print the timestamps and the raw result. You can also modify collection_interval_seconds and Period based on your needs.
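
If you do want the target check that is commented out in the script, here is a minimal sketch of it as a standalone function. It assumes `metrics_data` has the same shape as `response['MetricDataResults']`, that is, a list of dicts each carrying a `Values` list. The function name `evaluate_target` and the sample data are hypothetical. Unlike the commented-out version, it treats "no datapoints" separately from a sum of zero.

```python
def evaluate_target(metrics_data, target_value):
    """Sum the values of the first query result and compare with the target.

    Returns (aggregated_value, met). aggregated_value is None when the
    result list is empty or the first result has no datapoints.
    """
    if not metrics_data or not metrics_data[0].get("Values"):
        return None, False
    aggregated_value = sum(metrics_data[0]["Values"])
    return aggregated_value, aggregated_value >= target_value


if __name__ == "__main__":
    # Hypothetical sample shaped like response['MetricDataResults'].
    sample = [{"Id": "m1", "Values": [400.0, 650.0]}]
    value, met = evaluate_target(sample, target_value=1000)
    if value is None:
        print("No metrics data received.")
    elif met:
        print(f"Target metric value met or exceeded: {value}")
    else:
        print(f"Current aggregated value: {value}, below target.")
```

Keeping the check in a pure function like this means it can be tested without calling AWS at all.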

Alternatively, here is the equivalent Go code, which may help if your primary codebase is in Golang:

Golang-Cloudwatch

package main

import (
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/cloudwatch"
)

func main() {
    namespace := "CWAgent"
    metricName := "ingestion/commandservice/totalRequest_count"
    dimensions := []*cloudwatch.Dimension{
        {Name: aws.String("path"), Value: aws.String("pathName")},
        {Name: aws.String("metric_type"), Value: aws.String("metricsTypeValue")},
    }
    targetValue := float64(100)
    collectionIntervalSeconds := int64(30)

    // Create the session and the CloudWatch client once, outside the polling loop.
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String("us-east-1"),
    })
    if err != nil {
        fmt.Println("Error creating session:", err)
        return
    }
    svc := cloudwatch.New(sess)

    for {
        // Query the last collectionIntervalSeconds seconds, ending now.
        endTime := time.Now().UTC()
        startTime := endTime.Add(-time.Duration(collectionIntervalSeconds) * time.Second)
        // var endTime, startTime time.Time
        // layout := "2006-01-02T15:04:05Z"
        // endTime, err := time.Parse(layout, "2024-06-05T05:52:33Z")
        // startTime, err = time.Parse(layout, "2024-06-05T05:51:33Z")
        // if err != nil {
        //  fmt.Println("Error parsing time:", err)
        //  return
        // }
        params := &cloudwatch.GetMetricDataInput{
            EndTime:   &endTime,
            StartTime: &startTime,
            MetricDataQueries: []*cloudwatch.MetricDataQuery{
                {
                    Id: aws.String("m1"),
                    MetricStat: &cloudwatch.MetricStat{
                        Metric: &cloudwatch.Metric{
                            Namespace:  aws.String(namespace),
                            MetricName: aws.String(metricName),
                            Dimensions: dimensions,
                        },
                        Period: aws.Int64(10),
                        Stat:   aws.String("Sum"),
                    },
                },
            },
        }

        result, err := svc.GetMetricData(params)
        if err != nil {
            fmt.Println("Error getting metric data:", err)
            // Wait before retrying so a persistent error does not cause a tight loop.
            time.Sleep(10 * time.Second)
            continue
        }

        var aggregatedValue float64
        if len(result.MetricDataResults) > 0 && result.MetricDataResults[0].Values != nil {
            aggregatedValue = sumFloat64Slice(result.MetricDataResults[0].Values)
        }
        fmt.Printf("Start Time: %s\n", startTime.Format(time.RFC3339))
        fmt.Printf("End Time: %s\n", endTime.Format(time.RFC3339))
        // for _, value := range result.MetricDataResults[0].Values {
        //  fmt.Println(*value)
        // }
        if aggregatedValue >= targetValue {
            fmt.Printf("Target metric value met or exceeded: %.2f\n", aggregatedValue)
        } else {
            fmt.Printf("Current aggregated value: %.2f, below target.\n", aggregatedValue)
        }

        time.Sleep(10 * time.Second)
    }
}

// sumFloat64Slice adds up the values, skipping any nil pointers.
func sumFloat64Slice(slice []*float64) float64 {
    sum := float64(0)
    for _, v := range slice {
        if v != nil {
            sum += *v
        }
    }
    return sum
}
