List AWS EBS volumes that have been lying unattached for N days and send the list to Flock using Python.

I have always believed in contributing to cost optimization and cost-saving efforts, and one of the simplest techniques is writing basic housekeeping automation.


I searched Google for basic automation scripts to save time and effort, but was surprised to find that while people have written scripts to simply list and delete unattached EBS volumes, no one had yet covered one of the major edge cases.

And that edge case is:

Deleting the EBS volumes which have not been ATTACHED to any instance or node for the past N days, instead of deleting every volume that merely happens to be unattached at the moment the script runs.


How did I run into this particular case?

These days we work mostly on Kubernetes. When a pod belonging to a StatefulSet goes down, its EBS volume is detached from the node, and when the replacement pod comes up on a new node, the EBS volume (the PV) is attached again.

What if the cleanup script ran at exactly the moment when one of the StatefulSet's pods was down and its EBS volume was in the detached state?


What would happen?

Ans: It would delete that disk too, because the script filters disks purely on whether they are currently attached to an instance: if not attached, delete.
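
To see why that is dangerous, here is a minimal sketch of the naive approach those scripts take (illustrative only; the actual deletion call is commented out):

import boto3

ec2 = boto3.client('ec2')

# DANGEROUS: treats "currently unattached" as "safe to delete"
unattached = ec2.describe_volumes(
    Filters=[{'Name': 'status', 'Values': ['available']}]
)
for volume in unattached['Volumes']:
    print("Would delete:", volume['VolumeId'])
    # ec2.delete_volume(VolumeId=volume['VolumeId'])  # would also delete a PV that is merely mid-failover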

To handle this edge case, we have to list the EBS volumes which have not been ATTACHED to any instance for the last N days. But how?

The answer lies in AWS itself. We can use AWS CloudTrail to retrieve the events that happened on our infrastructure: starting from the EBS volumes that are lying unattached, we can check when each one was last attached to or detached from an instance.

Prerequisites:

  • Enable CloudTrail Events on your AWS account.
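
If you are not sure whether CloudTrail is logging in your account, a quick boto3 check might look like this (a minimal sketch; trail names differ per account):

import boto3

ct = boto3.client('cloudtrail')

# list the trails in the current region and check whether each one is actively logging
for trail in ct.describe_trails()['trailList']:
    status = ct.get_trail_status(Name=trail['TrailARN'])
    print(trail['Name'], '-> logging:', status['IsLogging'])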

Here's the repo where you can find the script; you can deploy it as a CronJob on Kubernetes to get the list of unattached disks every day.

The repo contains the following files:

  1. Dockerfile
  2. cronJob.yaml
  3. requirements.txt
  4. x_days_list_unattached_disks.py

Dockerfile

# Minimal Python base image
FROM python:3.9-slim-buster
WORKDIR /usr/src/app
COPY . .
RUN python3 -m pip install -r requirements.txt
# ENTRYPOINT fixes the interpreter; CMD supplies the default script,
# so the script can be overridden at runtime without rebuilding the image.
ENTRYPOINT ["python3"]
CMD ["x_days_list_unattached_disks.py"]

cronJob.yaml

apiVersion: batch/v1		# use batch/v1beta1 on clusters older than v1.21
kind: CronJob
metadata:
  name: list-unattached-disks
  namespace: default
  labels:
    app: list-unattached-disks
spec:
  schedule: "30 5 * * *"
  jobTemplate:
    spec:
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"		# If you are using istio
        spec:
          securityContext:
            runAsGroup: 2001
            runAsNonRoot: true
            runAsUser: 1001
          containers:
            - name: list-unattached-disks
              image: "image_name" 			# Change image name here after building the image and pushing it to your private registry.
              imagePullPolicy: IfNotPresent
              env:
              - name: AWS_ACCESS_KEY_ID
                value: "xxx"	     # Change values here
              - name: AWS_SECRET_ACCESS_KEY
                value: "xxx"         # Change values here
              - name: AWS_DEFAULT_REGION
                value: "us-east-2"  # Change values here
          restartPolicy: OnFailure
➡️ IMP: Make sure to change the variables "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION" and the "image" name after building the Docker image. For anything beyond a quick test, prefer injecting the credentials from a Kubernetes Secret or an IAM role instead of hardcoding them in the manifest.

requirements.txt

boto3
python-dotenv
requests
awscli

x_days_list_unattached_disks.py

import boto3
import dotenv
import os
import requests
import json
from datetime import datetime, timedelta

# load the environment variables
dotenv.load_dotenv()

# create boto3 client for ec2
client = boto3.client('ec2',
                      region_name=os.getenv('AWS_DEFAULT_REGION'),
                      aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
                      aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'))

# create boto3 client for cloudtrail
ct_client = boto3.client('cloudtrail')

# create a list where the volume ids of all unattached volumes will be stored
volumes_to_list = list()

# create a list where the volume ids of volumes detached for more than x_days will be stored
detached_volumes_to_list = list()

# Define number of days here.
x_days_threshold = 30

# Flock incoming-webhook URL (the message is POSTed to this endpoint)
flock_token = "<enter flock token here>"

# call describe_volumes() method of the client to get the details of all ebs volumes in the given region
# if you have a large number of volumes, fetch the details in batches via NextToken (or a paginator) and process accordingly
volume_detail = client.describe_volumes()

## start and end date vars for aws cloudtrail (truncated to midnight);
## CloudTrail lookup history is capped at the last 90 days
date_diff = datetime.now() - timedelta(days=90)
start_date = datetime(date_diff.year, date_diff.month, date_diff.day)
date_today = datetime.now()
end_date = datetime(date_today.year, date_today.month, date_today.day)


## Function to calculate the age of a timestamp in whole days.
def check_if_created_object_date_less_than_x_days(x_days):
    # drop tzinfo so we can subtract from the naive datetime.now()
    x_day = x_days.replace(tzinfo=None)
    differ = datetime.now() - x_day
    return differ.days  # whole-day difference


## Function to POST a FlockML message to the Flock channel webhook.
def get_and_send_message_to_flock_channel(msg):
    url = flock_token
    message = msg
    flock_data = {
        "flockml": message
    }

    headers = {'Content-Type': "application/json"}
    response = requests.post(url, data=json.dumps(flock_data), headers=headers)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)


## Evaluating if disk is not attached and age of the disk is > x_days
if volume_detail['ResponseMetadata']['HTTPStatusCode'] == 200:
    for each_volume in volume_detail['Volumes']:
        if len(each_volume['Attachments']) == 0 and each_volume['State'] == 'available' and check_if_created_object_date_less_than_x_days(each_volume['CreateTime']) > x_days_threshold:
            volumes_to_list.append(each_volume['VolumeId'])
            # fetch only the most recent CloudTrail event recorded for this volume
            response = ct_client.lookup_events(
                LookupAttributes=[
                    {
                        'AttributeKey': 'ResourceName',
                        'AttributeValue': each_volume['VolumeId']
                    },
                ],
                StartTime=start_date,
                EndTime=end_date,
                MaxResults=1,
            )

            events_details = response['Events']

            if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                for event in events_details:
                    # if the newest event is a detach older than the threshold,
                    # the volume has not been attached for at least x_days
                    if event['EventName'] == "DetachVolume" and check_if_created_object_date_less_than_x_days(event['EventTime']) > x_days_threshold:
                        detached_volumes_to_list.append(each_volume['VolumeId'])


flock_output = (
    f"<flockml><b>ENV: STAGING</b></flockml>"
    f"<br/><br/> <flockml><b>AWS DiskUtilization ALERT!!!</b></flockml>"
    f"<br/><br/> <flockml><b>Total number of unattached disks found: </b></flockml>{len(volumes_to_list)}"
    f"<br/><br/> <flockml><b>Total number of disks that didn't get attached in last </b></flockml>"
    f"<flockml><b> {x_days_threshold} </b></flockml><flockml><b> days : </b></flockml>{len(detached_volumes_to_list)}"
    f"<br/><br/> <flockml><b>Volume id of disks that didn't get attached in last </b></flockml><br/>"
    f"<flockml><b> {x_days_threshold} </b></flockml><flockml><b> days : </b></flockml>{detached_volumes_to_list}"
)


if not detached_volumes_to_list:
    print(f"No disks found that didn't get attached in the last {x_days_threshold} days")
else:
    get_and_send_message_to_flock_channel(flock_output)
➡️ IMP: Make sure to change the variables "x_days_threshold" and "flock_token" in the above script.

Prefer cloning the repo from here to avoid mistakes while copying the script.

Let's divide the script into smaller parts to understand the code:

Part1: Libraries:

import boto3
import dotenv
import os
import requests
import json
from datetime import datetime, timedelta

Part2: Variables:

# Define number of days here.
x_days_threshold = 30

# Flock token
flock_token = "<enter flock token here>"

## start and end date vars for aws cloudtrail (truncated to midnight);
## CloudTrail lookup history is capped at the last 90 days
date_diff = datetime.now() - timedelta(days=90)
start_date = datetime(date_diff.year, date_diff.month, date_diff.day)
date_today = datetime.now()
end_date = datetime(date_today.year, date_today.month, date_today.day)
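
Note that building a datetime from only (year, month, day) drops the time-of-day, so both window endpoints land on midnight. A quick standalone illustration:

from datetime import datetime

now = datetime(2022, 5, 17, 14, 23, 8)           # a pretend "now" for the example
print(datetime(now.year, now.month, now.day))    # 2022-05-17 00:00:00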

Part3: Create boto3 clients and lists:

# create boto3 client for ec2
client = boto3.client('ec2',
                      region_name=os.getenv('AWS_DEFAULT_REGION'),
                      aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
                      aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'))

# create boto3 client for cloudtrail
ct_client = boto3.client('cloudtrail')

# create a list where the volume ids of all unattached volumes will be stored
volumes_to_list = list()

# create a list where the volume ids of volumes detached for more than x_days will be stored
detached_volumes_to_list = list()

# call describe_volumes() method of the client to get the details of all ebs volumes in the given region
# if you have a large number of volumes, fetch the details in batches via NextToken (or a paginator) and process accordingly
volume_detail = client.describe_volumes()
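
As the comment above notes, describe_volumes() returns results in pages once you have many volumes. A minimal sketch of handling that with boto3's built-in paginator, so you never touch NextToken yourself:

# collect every volume in the region, however many pages the API returns
paginator = client.get_paginator('describe_volumes')
all_volumes = []
for page in paginator.paginate():
    all_volumes.extend(page['Volumes'])
print(f"Fetched {len(all_volumes)} volumes")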

Part4: Functions for calculating the age of a volume and sending the message to Flock.

## Function to calculate the age of a timestamp in whole days.
def check_if_created_object_date_less_than_x_days(x_days):
    # drop tzinfo so we can subtract from the naive datetime.now()
    x_day = x_days.replace(tzinfo=None)
    differ = datetime.now() - x_day
    return differ.days  # whole-day difference

## Function to POST a FlockML message to the Flock channel webhook.
def get_and_send_message_to_flock_channel(msg):
    url = flock_token
    message = msg
    flock_data = {
        "flockml": message
    }

    headers = {'Content-Type': "application/json"}
    response = requests.post(url, data=json.dumps(flock_data), headers=headers)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
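
For example, the helper returns roughly 45 for a volume created 45 days ago, so the `> x_days_threshold` comparison in the next part reads naturally (a standalone illustration, not part of the repo):

from datetime import datetime, timedelta, timezone

create_time = datetime.now(timezone.utc) - timedelta(days=45)   # a pretend CreateTime
age = check_if_created_object_date_less_than_x_days(create_time)
print(age, age > 30)   # roughly 45, and True for the 30-day threshold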

Part5: Evaluating if the disk is not attached and the age of the disk is > x_days

if volume_detail['ResponseMetadata']['HTTPStatusCode'] == 200:
    for each_volume in volume_detail['Volumes']:
        if len(each_volume['Attachments']) == 0 and each_volume['State'] == 'available' and check_if_created_object_date_less_than_x_days(each_volume['CreateTime']) > x_days_threshold:
            volumes_to_list.append(each_volume['VolumeId'])
            # fetch only the most recent CloudTrail event recorded for this volume
            response = ct_client.lookup_events(
                LookupAttributes=[
                    {
                        'AttributeKey': 'ResourceName',
                        'AttributeValue': each_volume['VolumeId']
                    },
                ],
                StartTime=start_date,
                EndTime=end_date,
                MaxResults=1,
            )

            events_details = response['Events']

            if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                for event in events_details:
                    # if the newest event is a detach older than the threshold,
                    # the volume has not been attached for at least x_days
                    if event['EventName'] == "DetachVolume" and check_if_created_object_date_less_than_x_days(event['EventTime']) > x_days_threshold:
                        detached_volumes_to_list.append(each_volume['VolumeId'])
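
One caveat: MaxResults=1 inspects only the newest event, so a recent CreateTags or CreateSnapshot event on an otherwise idle volume can hide an older DetachVolume. If that matters in your account, a variant could scan back for the latest DetachVolume specifically (a sketch that reuses the clients, dates and helper defined above):

def last_detach_age_in_days(volume_id):
    # CloudTrail returns events newest-first, so the first DetachVolume seen is the latest one
    paginator = ct_client.get_paginator('lookup_events')
    pages = paginator.paginate(
        LookupAttributes=[{'AttributeKey': 'ResourceName', 'AttributeValue': volume_id}],
        StartTime=start_date,
        EndTime=end_date,
    )
    for page in pages:
        for event in page['Events']:
            if event['EventName'] == 'DetachVolume':
                return check_if_created_object_date_less_than_x_days(event['EventTime'])
    return None   # no detach event within the 90-day lookup window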

Part6: Template to send a list of EBS volumes on Flock

flock_output = (
    f"<flockml><b>ENV: STAGING</b></flockml>"
    f"<br/><br/> <flockml><b>AWS DiskUtilization ALERT!!!</b></flockml>"
    f"<br/><br/> <flockml><b>Total number of unattached disks found: </b></flockml>{len(volumes_to_list)}"
    f"<br/><br/> <flockml><b>Total number of disks that didn't get attached in last </b></flockml>"
    f"<flockml><b> {x_days_threshold} </b></flockml><flockml><b> days : </b></flockml>{len(detached_volumes_to_list)}"
    f"<br/><br/> <flockml><b>Volume id of disks that didn't get attached in last </b></flockml><br/>"
    f"<flockml><b> {x_days_threshold} </b></flockml><flockml><b> days : </b></flockml>{detached_volumes_to_list}"
)

Part7: Condition to send flock alert if unattached EBS Volumes found

if not detached_volumes_to_list:
    print(f"No disks found that didn't get attached in the last {x_days_threshold} days")
else:
    get_and_send_message_to_flock_channel(flock_output)

How to run the script in your Kubernetes cluster?

  • Change the variables "x_days_threshold" and "flock_token" in the x_days_list_unattached_disks.py script.

  • Build the image using the following command:
    docker build -t <name_of_the_image> -f Dockerfile .

  • Once built, push the Docker image to your private registry:
    docker push <name_of_the_image>

  • In cronJob.yaml, set the values of "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY" and "AWS_DEFAULT_REGION", and update the "schedule" and "image" fields with your own.

  • Apply the yaml to your cluster using the following command:
    kubectl apply -f cronJob.yaml
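
  • Verify that the CronJob has been created:
    kubectl get cronjob list-unattached-disks -n default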

Limitations:

  • We can only look back 90 days, because CloudTrail retains event history for 90 days only.
  • If CloudTrail is not enabled, this automation will not work, so make sure to enable it before deploying.

Few more points:

  • You can also replace Flock with Slack by taking a reference from this script, as sketched below.
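
If you go the Slack route, the change is small: Slack incoming webhooks accept a JSON payload with a plain "text" field instead of FlockML. A minimal sketch (the webhook URL is a placeholder you create in Slack):

import json
import requests

slack_webhook_url = "<enter slack incoming-webhook url here>"   # placeholder

def send_message_to_slack_channel(msg):
    # Slack expects {"text": ...} rather than {"flockml": ...}
    response = requests.post(slack_webhook_url,
                             data=json.dumps({"text": msg}),
                             headers={'Content-Type': 'application/json'})
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)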