How to Automatically Quarantine Security Groups That Allow ALL TRAFFIC and Send a Notification to Your SNS Topic
If one of your staff members (inadvertently or mischievously) modifies your VPC security group to allow SSH access from the world, you want the change to be reverted automatically, and you want to receive a notification that it was reverted.
Automatically Revert and Receive Notifications About Changes to Your Amazon VPC Security Groups
Here is how the process works:
- Someone adds a new ingress rule to your security group
- A CloudWatch event that continually monitors changes to your security groups detects the new ingress rule and invokes the Lambda function
- The Lambda function determines whether you are monitoring this security group
- It reverts the new security group ingress rule
- Optionally, it sends you an SNS notification email to let you know what the change was, who made it, and that the change was reverted
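The decision in the middle of this flow (is this an event we care about, and is it for a group we monitor?) can be sketched as a small pure function. This is only a minimal sketch and not the script used later in this post: the `should_revert` name and the `monitored_group_ids` parameter are assumptions for illustration, and the group ids are made up. `AuthorizeSecurityGroupIngress` is the event name CloudTrail records when an inbound rule is added.

```python
def should_revert(event, monitored_group_ids):
    """Return True when the event is an ingress change on a monitored group.

    Hypothetical helper: the name and the monitored_group_ids parameter are
    illustrative assumptions, not part of the original script.
    """
    detail = event.get("detail") or {}

    # Only react to newly added ingress rules.
    if detail.get("eventName") != "AuthorizeSecurityGroupIngress":
        return False

    # The script in this post reads the target group id from requestParameters.
    request_parameters = detail.get("requestParameters") or {}
    return request_parameters.get("groupId") in monitored_group_ids


# Heavily abbreviated event with a made-up group id.
example_event = {
    "detail": {
        "eventName": "AuthorizeSecurityGroupIngress",
        "requestParameters": {"groupId": "sg-0123456789abcdef0"},
    }
}
print(should_revert(example_event, {"sg-0123456789abcdef0"}))  # True
print(should_revert(example_event, {"sg-00000000000000000"}))  # False
```

Keeping this check free of AWS calls makes it easy to test locally before anything is deployed.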
Pre-Requisites
We will need the following pre-requisites to successfully complete this activity:
- AWS CloudTrail must be enabled in the AWS Region where the solution is deployed
- A VPC with a custom Security Group that we intend to monitor. Note down the security group id; we will need it later to update the Lambda function
- An IAM Role, i.e. a Lambda Service Role, with EC2FullAccess permissions. You may instead use an inline policy with more restrictive permissions
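As a rough idea of what a more restrictive inline policy could look like, the sketch below builds a policy document that covers only the calls the script in Step 1 makes (describing and revoking security group rules, publishing to SNS) plus writing Lambda logs. The exact set of actions and the wildcard resources are assumptions; review them against your own account before using anything like this.

```python
import json

# Assumed minimal permissions for the script in this post. The action names
# are real IAM actions, but the selection and the "*" resources are an
# illustrative assumption, not a vetted policy.
restrictive_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeSecurityGroups",
                "ec2:RevokeSecurityGroupIngress",
            ],
            "Resource": "*",
        },
        {
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": "*",  # assumption: narrow this to your topic ARN
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
            ],
            "Resource": "*",
        },
    ],
}

# Print JSON that can be pasted into the IAM console's inline policy editor.
print(json.dumps(restrictive_policy, indent=2))
```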
The image above shows the execution order, which should not be confused with the numbering of steps given here
Step 1 — Configure Lambda Function- SG-Sentry-Bot
The script below is written in Python 3.7. Remember to choose the same runtime in AWS Lambda.
#===================================================================
# Author: Siddhanth
# Description: To remove all security group rules from a security group if ALL TRAFFIC mentioned
#===================================================================
import json
import boto3

ec2Client = None

#===================================================================
def lambda_handler(event, context):
    print(event)

    # Ensure that we have an event name to evaluate.
    if 'detail' not in event or 'eventName' not in event['detail']:
        return {"Result": "Failure", "Message": "Lambda not triggered by an event"}

    # Act only if the event was to authorize an ingress rule. Adding an inbound
    # rule is recorded by CloudTrail as AuthorizeSecurityGroupIngress, and that
    # event carries the groupId in requestParameters, which the code below needs.
    if event['detail']['eventName'] != 'AuthorizeSecurityGroupIngress':
        return {"Result": "Failure", "Message": "Invalid event for security group! only authorize ingress is supported"}

    # Replace with the ARN of your own SNS topic.
    SNS_ARN = 'arn:aws:sns:us-east-324234r4'

    global ec2Client
    ec2Client = boto3.client('ec2')

    result = revoke_security_group_ingress(event['detail'])
    if not result:
        print("Failed for some reasons.. Aborting..")
        return None

    message = "AUTO-MITIGATED: Ingress rule removed from security group: {} that was added by {}: {}".format(
        result['group_id'],
        result['user_name'],
        json.dumps(result['ip_permissions'])
    )
    print(message)

    boto3.client('sns').publish(
        TargetArn=SNS_ARN,
        Message=message,
        Subject="Auto-mitigation successful"
    )

#===================================================================
def revoke_security_group_ingress(event_detail):
    request_parameters = event_detail['requestParameters']
    if not request_parameters:
        print("Invalid event found.. Aborting..")
        return None

    # Build the normalized IP permission JSON structure.
    ip_permissions = update_security_group(request_parameters['groupId'])
    if not ip_permissions:
        print("No invalid permissions found.. Aborting..")
        return None

    response = ec2Client.revoke_security_group_ingress(
        GroupId=request_parameters['groupId'],
        IpPermissions=ip_permissions
    )

    # Build the result
    result = {}
    result['group_id'] = request_parameters['groupId']
    result['user_name'] = event_detail['userIdentity']['arn']
    result['ip_permissions'] = ip_permissions
    return result

#===================================================================
def normalize_paramter_names(ip_items):
    # Start building the permissions items list.
    new_ip_items = []

    # First, build the basic parameter list.
    for ip_item in ip_items:
        if "FromPort" in ip_item:
            new_ip_item = {
                "IpProtocol": ip_item['IpProtocol'],
                "FromPort": ip_item['FromPort'],
                "ToPort": ip_item['ToPort']
            }
        else:
            # An "all traffic" rule has no port range, only a protocol.
            new_ip_item = {
                "IpProtocol": ip_item['IpProtocol']
            }

        # CidrIp or CidrIpv6 (IPv4 or IPv6)?
        if 'Ipv6Ranges' in ip_item and ip_item['Ipv6Ranges']:
            # This is an IPv6 permission range, so change the key names.
            ipv_range_list_name = 'Ipv6Ranges'
            ipv_address_value = 'CidrIpv6'
            ipv_range_list_name_capitalized = 'Ipv6Ranges'
            ipv_address_value_capitalized = 'CidrIpv6'
        else:
            ipv_range_list_name = 'IpRanges'
            ipv_address_value = 'CidrIp'
            ipv_range_list_name_capitalized = 'IpRanges'
            ipv_address_value_capitalized = 'CidrIp'

        ip_ranges = []

        # Next, build the IP permission list.
        for item in ip_item[ipv_range_list_name]:
            ip_ranges.append(
                {ipv_address_value_capitalized: item[ipv_address_value]}
            )

        new_ip_item[ipv_range_list_name_capitalized] = ip_ranges
        print(new_ip_item)
        new_ip_items.append(new_ip_item)

    return new_ip_items
#===================================================================
def update_security_group(securityGroupId):
    if not securityGroupId:
        print("Invalid security group provided %s" % securityGroupId)
        return None

    response = ec2Client.describe_security_groups(
        Filters=[
            dict(Name='group-id', Values=[securityGroupId])
        ]
    )
    if 'SecurityGroups' in response and len(response['SecurityGroups']) == 0:
        print("No security group found for the security group id %s" % securityGroupId)
        return None

    group = response['SecurityGroups'][0]
    ip_rules = group['IpPermissions']
    if not ip_rules:
        print("No IP rules found for the security group id %s" % securityGroupId)
        return None

    ip_permissions = normalize_paramter_names(ip_rules)
    return ip_permissions
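To exercise the handler from the Lambda console before wiring up the trigger, a hand-built event containing only the fields the script reads can be enough. A minimal sketch follows; `make_test_event` is a hypothetical helper, the group id and user ARN are made-up placeholders, and a real CloudTrail event carries many more fields.

```python
import json


def make_test_event(event_name, group_id, user_arn):
    """Build the smallest event the handler above can process.

    Hypothetical helper for console or local testing only.
    """
    return {
        "detail": {
            "eventName": event_name,
            "requestParameters": {"groupId": group_id},
            "userIdentity": {"arn": user_arn},
        }
    }


test_event = make_test_event(
    "AuthorizeSecurityGroupIngress",         # event recorded for a new inbound rule
    "sg-0123456789abcdef0",                  # placeholder group id
    "arn:aws:iam::123456789012:user/alice",  # placeholder user ARN
)

# Paste this JSON into the Lambda console as a test event.
print(json.dumps(test_event, indent=2))
```

The `eventName` has to match the name the handler checks for. Note that invoking the handler with such an event revokes the ingress rules of the named group, so point it at a disposable security group.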
Step 2 — Configure Lambda Triggers
We are going to use CloudWatch Events that will be triggered by CloudTrail API calls:
- Choose Create a new Rule
- Fill in the Rule Name & Rule Description
- For Rule Type, choose Event pattern
- Below that, choose the EC2 service
- In the next field, choose AWS API call via CloudTrail
- Check the Operation box
- In the field below, type/choose AuthorizeSecurityGroupIngress, the API call recorded when an inbound rule is added
- Enable the trigger by checking the box
- Click on Add and Save the Lambda function
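The console choices above boil down to an event pattern document. As a sketch of what that pattern looks like, the helper below builds it as a Python dictionary and prints it as JSON; `build_event_pattern` is a hypothetical name, and the operation passed in is `AuthorizeSecurityGroupIngress`, the CloudTrail event name for adding an inbound rule.

```python
import json


def build_event_pattern(event_names):
    """Return a CloudWatch Events pattern for EC2 API calls seen by CloudTrail.

    Hypothetical helper; the console generates an equivalent document for you.
    """
    return {
        "source": ["aws.ec2"],
        "detail-type": ["AWS API Call via CloudTrail"],
        "detail": {
            "eventSource": ["ec2.amazonaws.com"],
            "eventName": list(event_names),
        },
    }


pattern = build_event_pattern(["AuthorizeSecurityGroupIngress"])
print(json.dumps(pattern, indent=2))
```

Comparing this output with the pattern preview shown in the console is a quick way to confirm the rule matches the operation the Lambda function expects.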
Step 3 — Testing the solution
Navigate to the EC2 console, choose Security Groups, and choose the security group that we are monitoring. Add a new Inbound rule, for example SSH on port 22 from 0.0.0.0/0.
Adding this rule creates an EC2 AuthorizeSecurityGroupIngress service event, which triggers the Lambda function.
After a few moments, choose the refresh button to see that the new ingress rule that you just created has been removed by the solution.
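The same test can be scripted instead of clicked through. The sketch below assumes you pass in a boto3 EC2 client; `add_open_ssh_rule` and `has_open_ssh_rule` are hypothetical helper names, and the group id in the usage comment is a placeholder.

```python
def add_open_ssh_rule(ec2, group_id):
    """Add an inbound SSH rule open to the world (the test change)."""
    ec2.authorize_security_group_ingress(
        GroupId=group_id,
        IpPermissions=[
            {
                "IpProtocol": "tcp",
                "FromPort": 22,
                "ToPort": 22,
                "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
            }
        ],
    )


def has_open_ssh_rule(ec2, group_id):
    """Return True if the group still allows port 22 from 0.0.0.0/0."""
    response = ec2.describe_security_groups(GroupIds=[group_id])
    for group in response["SecurityGroups"]:
        for rule in group["IpPermissions"]:
            if rule.get("FromPort") == 22 and rule.get("ToPort") == 22:
                ranges = rule.get("IpRanges", [])
                if any(r.get("CidrIp") == "0.0.0.0/0" for r in ranges):
                    return True
    return False


# Usage (needs AWS credentials; the group id is a placeholder):
#   import boto3, time
#   ec2 = boto3.client("ec2")
#   add_open_ssh_rule(ec2, "sg-0123456789abcdef0")
#   time.sleep(60)  # give the event and the Lambda function time to run
#   print(has_open_ssh_rule(ec2, "sg-0123456789abcdef0"))
```

If the solution is working, the final check should report that the rule is gone; how long that takes depends on event delivery, so the sleep value here is only a guess.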
Hope this helps. This script is my customized version of the original script written by miztiik.