60 Days DevOps Challenge: Day 7

AWS Cloud Automation with Python & Boto3
Initial Tasks:
โ Task 1: Install boto3 (pip install boto3) and configure AWS credentials (aws configure).
sudo apt update
sudo apt upgrade -y
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo apt install unzip
unzip awscliv2.zip
sudo ./aws/install
aws --version
aws configure
Challenge 1: Write a Python script that provisions an EC2 instance, a new security group, and a key pair. The same script should connect to ec2 after creation to check that everything is working fine. (The key pair should be generated via the Python script and used for the EC2 SSH connection.)
# Step 1: Install Required Dependencies
# pip install boto3 paramiko
# I am using my_profile and not using the default profile you can see your profile in by running command cat ~/.aws/credentials
import boto3
import paramiko
import time
import os
os.environ['AWS_PROFILE'] = "my-profile" # Replace with your profile name
os.environ['AWS_DEFAULT_REGION'] = "eu-west-1"
# AWS Configuration
AWS_REGION = "eu-west-1"
INSTANCE_TYPE = "t2.micro"
AMI_ID = "ami-0bc" # Replace with a valid AMI ID
SECURITY_GROUP_NAME = "my-ec2-security-group"
VPC_ID = "vpc-0fb" # Replace with your VPC ID
SUBNET_ID = "subnet-000" # Replace with your subnet ID
KEY_PAIR_NAME = "my-ec2-keypair"
SSH_USERNAME = "ubuntu" # Change based on AMI
# Initialize AWS Clients
ec2_client = boto3.client("ec2", region_name=AWS_REGION)
ec2_resource = boto3.resource("ec2", region_name=AWS_REGION)
def create_key_pair():
print("๐ Creating key pair...")
key_pair = ec2_client.create_key_pair(KeyName=KEY_PAIR_NAME)
with open(f"{KEY_PAIR_NAME}.pem", "w") as file:
file.write(key_pair["KeyMaterial"])
print(f"โ
Key pair '{KEY_PAIR_NAME}' created and saved as '{KEY_PAIR_NAME}.pem'")
return f"{KEY_PAIR_NAME}.pem"
def create_security_group():
print("๐ Creating security group...")
response = ec2_client.create_security_group(
GroupName=SECURITY_GROUP_NAME,
VpcId=VPC_ID,
Description="Security group for EC2 SSH access"
)
security_group_id = response["GroupId"]
ec2_client.authorize_security_group_ingress(
GroupId=security_group_id,
IpPermissions=[{
"IpProtocol": "tcp",
"FromPort": 22,
"ToPort": 22,
"IpRanges": [{"CidrIp": "0.0.0.0/0"}]
}]
)
print(f"โ
Security group '{SECURITY_GROUP_NAME}' created with ID {security_group_id}")
return security_group_id
def launch_ec2_instance(security_group_id, key_name):
print("๐ Launching EC2 instance...")
instance = ec2_resource.create_instances(
ImageId=AMI_ID,
InstanceType=INSTANCE_TYPE,
MinCount=1,
MaxCount=1,
KeyName=key_name,
SubnetId=SUBNET_ID,
SecurityGroupIds=[security_group_id],
TagSpecifications=[{
"ResourceType": "instance",
"Tags": [{"Key": "Name", "Value": "MyEC2Instance"}]
}]
)[0]
instance.wait_until_running()
instance.reload()
print(f"โ
EC2 instance launched with ID: {instance.id}")
print(f"๐ Public IP: {instance.public_ip_address}")
return instance.id, instance.public_ip_address
def check_ec2_via_ssh(instance_ip, key_file):
print("๐ Connecting to EC2 instance via SSH...")
private_key = paramiko.RSAKey(filename=key_file)
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh_client.connect(hostname=instance_ip, username=SSH_USERNAME, pkey=private_key, timeout=30)
stdin, stdout, stderr = ssh_client.exec_command("uptime")
print(f"โ
SSH Successful! Uptime: {stdout.read().decode().strip()}")
except Exception as e:
print(f"โ SSH Connection Failed: {e}")
finally:
ssh_client.close()
if __name__ == "__main__":
key_file = create_key_pair()
security_group_id = create_security_group()
instance_id, instance_ip = launch_ec2_instance(security_group_id, KEY_PAIR_NAME)
print("โณ Waiting for EC2 instance to initialize...")
time.sleep(60)
check_ec2_via_ssh(instance_ip, key_file)
โ Challenge 2: Automate S3 lifecycle policies using boto3
# Step 1: Create the Python Script (s3_lifecycle.py)
import boto3
import os
os.environ['AWS_PROFILE'] = "my-profile" # Replace with your profile name
os.environ['AWS_DEFAULT_REGION'] = "eu-west-1"
AWS_REGION = "eu-west-1"
BUCKET_NAME = "mys3bucketfortestingpurpose5818"
LIFECYCLE_RULE_NAME = "MoveToGlacier123"
s3_client = boto3.client("s3", region_name=AWS_REGION)
def apply_s3_lifecycle_policy():
lifecycle_policy = {
"Rules": [
{
"ID": LIFECYCLE_RULE_NAME,
"Prefix": "",
"Status": "Enabled",
"Transitions": [
{
"Days": 30,
"StorageClass": "GLACIER"
}
],
"NoncurrentVersionTransitions": [
{
"NoncurrentDays": 30,
"StorageClass": "GLACIER"
}
]
}
]
}
try:
response = s3_client.put_bucket_lifecycle_configuration(
Bucket=BUCKET_NAME,
LifecycleConfiguration=lifecycle_policy
)
print(f"โ
S3 Lifecycle policy applied successfully to '{BUCKET_NAME}'")
except Exception as e:
print(f"โ Failed to apply lifecycle policy: {e}")
apply_s3_lifecycle_policy()
โ Challenge 3: Create a script that starts or stops all EC2 instances in a specific AWS region
# Step 1: Create the Python Script (manage_ec2.py)
import boto3
AWS_REGION = "us-east-1"
ec2_client = boto3.client("ec2", region_name=AWS_REGION)
def get_all_instances():
response = ec2_client.describe_instances()
instances = [
instance["InstanceId"]
for reservation in response["Reservations"]
for instance in reservation["Instances"]
]
return instances
def manage_instances(action):
instances = get_all_instances()
if not instances:
print("โ ๏ธ No EC2 instances found.")
return
if action == "start":
ec2_client.start_instances(InstanceIds=instances)
print(f"โ
Started instances: {instances}")
elif action == "stop":
ec2_client.stop_instances(InstanceIds=instances)
print(f"๐ Stopped instances: {instances}")
else:
print("โ Invalid action! Use 'start' or 'stop'.")
if __name__ == "__main__":
action = input("Enter action (start/stop): ").strip().lower()
manage_instances(action)
โ Challenge 4: Write a Python program that checks for unused IAM users and disables them.
# Step 1: Create the Python Script (disable_unused_iam_users.py)
import boto3
from datetime import datetime, timedelta
AWS_REGION = "us-east-1"
DAYS_INACTIVE = 90
iam_client = boto3.client("iam", region_name=AWS_REGION)
def get_unused_iam_users():
users = iam_client.list_users()["Users"]
unused_users = []
for user in users:
user_name = user["UserName"]
try:
user_info = iam_client.get_user(UserName=user_name)
last_login = user_info["User"].get("PasswordLastUsed", None)
if not last_login:
unused_users.append(user_name)
continue
last_login_date = last_login.replace(tzinfo=None)
if last_login_date < datetime.utcnow() - timedelta(days=DAYS_INACTIVE):
unused_users.append(user_name)
except Exception as e:
print(f"โ ๏ธ Error retrieving info for {user_name}: {e}")
return unused_users
def disable_users(users):
if not users:
print("โ
No unused IAM users found.")
return
for user in users:
try:
iam_client.update_login_profile(UserName=user, PasswordResetRequired=True)
print(f"๐ Disabled IAM user: {user}")
except Exception as e:
print(f"โ ๏ธ Could not disable {user}: {e}")
if __name__ == "__main__":
print(f"๐ Checking for IAM users inactive for {DAYS_INACTIVE} days...")
unused_users = get_unused_iam_users()
if unused_users:
print(f"โ ๏ธ Found unused users: {unused_users}")
disable_users(unused_users)
else:
print("โ
No inactive users found.")
โ Challenge 5: Implement a log monitoring system that scans EC2 instances' /var/log for error messages and sends alerts via email (AWS SES) and Slack.
# Step 1: Install Required Packages
# pip install boto3 requests paramiko
import boto3
import paramiko
import requests
import time
AWS_REGION = "us-east-1"
EC2_TAG = "log-monitor"
SES_SENDER = "alerts@example.com"
SES_RECIPIENT = "admin@example.com"
SUBJECT = "๐จ EC2 Log Monitoring Alert"
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/your/slack/webhook"
ec2_client = boto3.client("ec2", region_name=AWS_REGION)
ses_client = boto3.client("ses", region_name=AWS_REGION)
def get_ec2_instances():
response = ec2_client.describe_instances(
Filters=[
{"Name": "tag:Name", "Values": [EC2_TAG]},
{"Name": "instance-state-name", "Values": ["running"]}
]
)
instances = [
(instance["InstanceId"], instance["PublicIpAddress"])
for reservation in response["Reservations"]
for instance in reservation["Instances"]
if "PublicIpAddress" in instance
]
return instances
def check_logs(instance_ip, key_file, username="ubuntu"):
errors = []
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
private_key = paramiko.RSAKey(filename=key_file)
ssh.connect(hostname=instance_ip, username=username, pkey=private_key)
cmd = "grep -i 'error' /var/log/*.log | tail -n 10"
stdin, stdout, stderr = ssh.exec_command(cmd)
output = stdout.read().decode().strip()
if output:
errors.append((instance_ip, output))
ssh.close()
except Exception as e:
print(f"โ Failed to connect to {instance_ip}: {e}")
return errors
def send_email(subject, body):
try:
response = ses_client.send_email(
Source=SES_SENDER,
Destination={"ToAddresses": [SES_RECIPIENT]},
Message={
"Subject": {"Data": subject},
"Body": {"Text": {"Data": body}}
}
)
print("โ
Email alert sent!")
except Exception as e:
print(f"โ Failed to send email: {e}")
def send_slack_alert(message):
payload = {"text": message}
try:
requests.post(SLACK_WEBHOOK_URL, json=payload)
print("โ
Slack alert sent!")
except Exception as e:
print(f"โ Failed to send Slack message: {e}")
def monitor_ec2_logs():
print("๐ Checking EC2 logs for errors...")
instances = get_ec2_instances()
if not instances:
print("โ ๏ธ No instances found with the specified tag.")
return
key_file = "your-key.pem"
all_errors = []
for instance_id, instance_ip in instances:
errors = check_logs(instance_ip, key_file)
if errors:
all_errors.extend(errors)
if all_errors:
message_body = "\n\n".join([f"๐ด {ip}\n{log}" for ip, log in all_errors])
print(f"๐จ Errors Found:\n{message_body}")
send_email(SUBJECT, message_body)
send_slack_alert(message_body)
else:
print("โ
No errors found in logs.")
if __name__ == "__main__":
monitor_ec2_logs()
โ Challenge 6: Automate DNS record updates in AWS Route 53 using Python.
# Step 1: Create the Python Script (update_route53.py)
import boto3
import requests
AWS_REGION = "us-east-1"
HOSTED_ZONE_ID = "ZXXXXXXXXXXXXX"
DOMAIN_NAME = "example.com"
RECORD_TYPE = "A"
TTL = 300
route53_client = boto3.client("route53", region_name=AWS_REGION)
def update_dns_record(ip_address):
print(f"๐ Updating DNS record {DOMAIN_NAME} โ {ip_address}")
change_batch = {
"Changes": [
{
"Action": "UPSERT",
"ResourceRecordSet": {
"Name": DOMAIN_NAME,
"Type": RECORD_TYPE,
"TTL": TTL,
"ResourceRecords": [{"Value": ip_address}]
}
}
]
}
try:
response = route53_client.change_resource_record_sets(
HostedZoneId=HOSTED_ZONE_ID,
ChangeBatch=change_batch
)
print(f"โ
DNS record updated! Change ID: {response['ChangeInfo']['Id']}")
except Exception as e:
print(f"โ Failed to update DNS record: {e}")
if __name__ == "__main__":
ip = requests.get("https://api64.ipify.org?format=text").text
update_dns_record(ip)
โ Challenge 7: Write a script that triggers an AWS Lambda function using boto3.
# Step 1: Create the Python Script (trigger_lambda.py)
import boto3
import json
AWS_REGION = "us-east-1"
LAMBDA_FUNCTION_NAME = "my_lambda_function"
lambda_client = boto3.client("lambda", region_name=AWS_REGION)
def invoke_lambda(payload={}):
print(f"๐ Triggering Lambda function: {LAMBDA_FUNCTION_NAME}")
try:
response = lambda_client.invoke(
FunctionName=LAMBDA_FUNCTION_NAME,
InvocationType="RequestResponse",
Payload=json.dumps(payload)
)
response_payload = json.loads(response["Payload"].read().decode())
print(f"โ
Lambda response: {response_payload}")
except Exception as e:
print(f"โ Failed to invoke Lambda: {e}")
if __name__ == "__main__":
payload = {"message": "Hello from Python!"}
invoke_lambda(payload)
โ Challenge 8: Use boto3 to fetch AWS billing data, and generate a cost analysis report in PDF format
# Step 1: Install packages
# pip install boto3 reportlab
import boto3
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from datetime import datetime, timedelta
AWS_REGION = "us-east-1"
COST_EXPLORER_CLIENT = boto3.client("ce", region_name=AWS_REGION)
END_DATE = datetime.utcnow().date()
START_DATE = END_DATE - timedelta(days=7)
def get_billing_data():
print("๐ Fetching AWS cost data...")
try:
response = COST_EXPLORER_CLIENT.get_cost_and_usage(
TimePeriod={"Start": START_DATE.strftime("%Y-%m-%d"), "End": END_DATE.strftime("%Y-%m-%d")},
Granularity="DAILY",
Metrics=["UnblendedCost"]
)
return response["ResultsByTime"]
except Exception as e:
print(f"โ Failed to fetch billing data: {e}")
return []
def generate_pdf_report(cost_data):
report_filename = f"AWS_Cost_Report_{END_DATE}.pdf"
print(f"๐ Generating PDF report: {report_filename}")
c = canvas.Canvas(report_filename, pagesize=letter)
c.setFont("Helvetica-Bold", 16)
c.drawString(200, 750, "AWS Billing Report")
c.setFont("Helvetica", 12)
c.drawString(200, 730, f"Time Period: {START_DATE} - {END_DATE}")
y_position = 700
total_cost = 0
for entry in cost_data:
date = entry["TimePeriod"]["Start"]
cost = float(entry["Total"]["UnblendedCost"]["Amount"])
total_cost += cost
c.drawString(100, y_position, f"{date}: ${cost:.2f}")
y_position -= 20
c.setFont("Helvetica-Bold", 14)
c.drawString(100, y_position - 20, f"Total AWS Cost: ${total_cost:.2f}")
c.save()
print(f"โ
PDF report saved as {report_filename}")
return report_filename
Bonus ๐ก Send the PDF via Email (AWS SES):
import boto3
import os
import smtplib
from datetime import datetime, timedelta
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
AWS_REGION = "us-east-1"
COST_EXPLORER_CLIENT = boto3.client("ce", region_name=AWS_REGION)
SES_CLIENT = boto3.client("ses", region_name=AWS_REGION)
END_DATE = datetime.utcnow().date()
START_DATE = END_DATE - timedelta(days=7)
# get_billing_data and generate_pdf_report same as above
def send_email_with_pdf(recipient_email, pdf_filename):
sender_email = "alerts@example.com"
subject = "AWS Cost Report"
with open(pdf_filename, "rb") as attachment:
pdf_part = MIMEBase("application", "octet-stream")
pdf_part.set_payload(attachment.read())
encoders.encode_base64(pdf_part)
pdf_part.add_header("Content-Disposition", f'attachment; filename="{pdf_filename}")
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient_email
msg["Subject"] = subject
msg.attach(MIMEText("Please find the attached AWS billing report.", "plain"))
msg.attach(pdf_part)
try:
response = SES_CLIENT.send_raw_email(
Source=sender_email,
Destinations=[recipient_email],
RawMessage={"Data": msg.as_string()},
)
print(f"๐ง Report sent to {recipient_email}, Message ID: {response['MessageId']}")
except Exception as e:
print(f"โ Failed to send email: {e}")
if __name__ == "__main__":
cost_data = get_billing_data()
if cost_data:
report_file = generate_pdf_report(cost_data)
send_email_with_pdf("admin@example.com", report_file)
In conclusion, the 60 Days DevOps Challenge on Day 7 provided a comprehensive exploration of AWS cloud automation using Python and Boto3. The tasks and challenges covered a wide range of essential DevOps skills, from provisioning EC2 instances and managing S3 lifecycle policies to automating DNS updates and monitoring logs. These exercises not only enhanced practical knowledge of AWS services but also demonstrated the power of automation in streamlining cloud operations. By completing these challenges, participants gained valuable experience in leveraging Python scripts to efficiently manage and optimize AWS resources, setting a strong foundation for further advancements in DevOps practices.
Subscribe to my newsletter
Read articles from Musaveer Holalkere directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
