How to Automatically Quarantine Security Groups That Allow ALL TRAFFIC and Send a Notification to Your SNS Topic

If one of your staff members inadvertently or mischievously modifies your VPC security group to allow SSH access to the world, you want the change to be reverted automatically, and you want to receive a notification that it was reverted.

Automatically Revert and Receive Notifications About Changes to Your Amazon VPC Security Groups

Here is how the process works:

  1. Someone adds a new ingress rule to your security group
  2. A CloudWatch Events rule that continually monitors changes to your security groups detects the new ingress rule and invokes the Lambda function
  3. The Lambda function determines whether you are monitoring this security group
  4. The Lambda function reverts the new security group ingress rule
  5. Optionally, the Lambda function sends you an SNS notification email to let you know what the change was, who made it, and that the change was reverted
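
The decision made in steps 2 and 3 can be sketched as a small pure function. This is only a minimal sketch, not the function used later in this post: `should_revert` and `MONITORED_GROUP_IDS` are hypothetical names, the group id is a placeholder, and it assumes the incoming event is the CloudTrail record for `AuthorizeSecurityGroupIngress`, which carries the group id under `requestParameters`.

```python
# Hypothetical set of security group ids to monitor (placeholder value).
MONITORED_GROUP_IDS = frozenset({"sg-0123456789abcdef0"})


def should_revert(event, monitored_group_ids=MONITORED_GROUP_IDS):
    """Return True when the event is an ingress change on a monitored group."""
    detail = event.get("detail") or {}
    # Only the API call that adds ingress rules is of interest here.
    if detail.get("eventName") != "AuthorizeSecurityGroupIngress":
        return False
    params = detail.get("requestParameters") or {}
    return params.get("groupId") in monitored_group_ids


sample_event = {
    "detail": {
        "eventName": "AuthorizeSecurityGroupIngress",
        "requestParameters": {"groupId": "sg-0123456789abcdef0"},
    }
}
print(should_revert(sample_event))
```

Keeping this check free of AWS calls makes it easy to try out locally before wiring it into a Lambda handler.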

Pre-Requisites

We will need the following pre-requisites to successfully complete this activity:

  • AWS CloudTrail must be enabled in the AWS Region where the solution is deployed
  • A VPC with the custom security group that we intend to monitor. Note down the security group ID; we will need it later to update the Lambda function
  • An IAM role (the Lambda service role) with AmazonEC2FullAccess permissions, plus permission to publish to your SNS topic if you want the notification. You may use an inline policy with more restrictive permissions instead
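
As one possible shape for the more restrictive inline policy mentioned above, the sketch below builds a policy document limited to the calls the script in Step 1 makes (`describe_security_groups`, `revoke_security_group_ingress`, SNS `publish`) plus CloudWatch Logs access for the function's own output. The `SNS_TOPIC_ARN` value is a placeholder assumption, and the policy should be reviewed against your own account's needs before use.

```python
import json

# Assumed placeholder; replace with the ARN of your own SNS topic.
SNS_TOPIC_ARN = "arn:aws:sns:REGION:ACCOUNT_ID:TOPIC_NAME"

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            # EC2 calls made by the script: read the group, then revoke rules.
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeSecurityGroups",
                "ec2:RevokeSecurityGroupIngress",
            ],
            "Resource": "*",
        },
        {
            # Needed only if the SNS notification is used.
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": SNS_TOPIC_ARN,
        },
        {
            # Lets the Lambda function write its print() output to CloudWatch Logs.
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
            ],
            "Resource": "arn:aws:logs:*:*:*",
        },
    ],
}

policy_json = json.dumps(policy, indent=2)
print(policy_json)
```

The printed JSON can be pasted into the IAM console as an inline policy on the Lambda service role.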

The image above shows the execution order, which should not be confused with the numbering of steps given here

Step 1 — Configure Lambda Function- SG-Sentry-Bot

The script below is written in Python 3.7. Remember to choose the same runtime when you create the AWS Lambda function.

#===================================================================
# Author: Siddhanth
# Description: To remove all security group rules from a security group if ALL TRAFFIC mentioned
#===================================================================
import json
import boto3

ec2Client = None

#===================================================================
def lambda_handler(event, context):
    print(event)
    # Ensure that we have an event name to evaluate.
    if 'detail' not in event or 'eventName' not in event['detail']:
        return {"Result": "Failure", "Message": "Lambda not triggered by an event"}
    # Act only on the API call that adds an ingress rule to a security group.
    if event['detail']['eventName'] != 'AuthorizeSecurityGroupIngress':
        return {"Result": "Failure", "Message": "Invalid event for security group! Only AuthorizeSecurityGroupIngress is supported"}

    # Replace this value with the ARN of your own SNS topic.
    SNS_ARN = 'arn:aws:sns:us-east-324234r4'
    global ec2Client
    ec2Client = boto3.client('ec2')
    result = revoke_security_group_ingress(event['detail'])
    if not result:
        print("Failed for some reasons.. Aborting..")
        return None
    message = "AUTO-MITIGATED: Ingress rule removed from security group: {} that was added by {}: {}".format(
        result['group_id'],
        result['user_name'],
        json.dumps(result['ip_permissions'])
    )

    print(message)
    boto3.client('sns').publish(TargetArn=SNS_ARN, Message=message, Subject="Auto-mitigation successful")

#===================================================================
def revoke_security_group_ingress(event_detail):
    request_parameters = event_detail.get('requestParameters')
    if not request_parameters:
        print("Invalid event found.. Aborting..")
        return None
    group_id = request_parameters.get('groupId')
    # Build the normalized IP permission JSON structure from the group's current rules.
    ip_permissions = update_security_group(group_id)
    if not ip_permissions:
        print("No invalid permissions found.. Aborting..")
        return None

    # Revoke every ingress rule currently attached to the group.
    ec2Client.revoke_security_group_ingress(
        GroupId=group_id,
        IpPermissions=ip_permissions
    )

    # Build the result
    result = {}
    result['group_id'] = group_id
    result['user_name'] = event_detail['userIdentity']['arn']
    result['ip_permissions'] = ip_permissions
    return result

#===================================================================
def normalize_paramter_names(ip_items):
    # Start building the permissions items list.
    new_ip_items = []
    for ip_item in ip_items:
        # First, build the basic parameter list.
        if "FromPort" in ip_item:
            new_ip_item = {
                "IpProtocol": ip_item['IpProtocol'],
                "FromPort": ip_item['FromPort'],
                "ToPort": ip_item['ToPort']
            }
        else:
            # No port range is present when ALL TRAFFIC is allowed.
            new_ip_item = {
                "IpProtocol": ip_item['IpProtocol']
            }
        # Next, copy the IPv4 and IPv6 CIDR ranges; a single rule may carry both.
        for range_list_name, address_key in (('IpRanges', 'CidrIp'), ('Ipv6Ranges', 'CidrIpv6')):
            ip_ranges = [{address_key: item[address_key]} for item in ip_item.get(range_list_name, [])]
            if ip_ranges:
                new_ip_item[range_list_name] = ip_ranges
        print(new_ip_item)
        new_ip_items.append(new_ip_item)
    return new_ip_items
#===================================================================
def update_security_group(securityGroupId):
    if not securityGroupId:
        print("Invalid security group provided %s" % securityGroupId)
        return None

    # Look up the group's current rules.
    response = ec2Client.describe_security_groups(
        Filters=[
            dict(Name='group-id', Values=[securityGroupId])
        ]
    )

    if not response.get('SecurityGroups'):
        print("No security group found for the security group id %s" % securityGroupId)
        return None

    group = response['SecurityGroups'][0]
    ip_rules = group['IpPermissions']

    if not ip_rules:
        print("No IP rules found for the security group id %s" % securityGroupId)
        return None

    # Convert the rules into the shape that revoke_security_group_ingress accepts.
    ip_permissions = normalize_paramter_names(ip_rules)

    return ip_permissions
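
Note that the script above passes every ingress rule it finds on the group to the revoke call; it does not itself test whether a rule is an ALL TRAFFIC rule. If you want to narrow it to such rules only, a filter along the following lines could be applied to the rule list first. This is a hedged sketch: `is_all_traffic_from_anywhere` is a hypothetical helper, and it assumes the rule dictionaries have the shape returned by `describe_security_groups`, where ALL TRAFFIC appears as `IpProtocol` equal to `"-1"`.

```python
def is_all_traffic_from_anywhere(permission):
    """True if the rule allows every protocol from 0.0.0.0/0 or ::/0."""
    # ALL TRAFFIC is reported with IpProtocol "-1" and no port range.
    if permission.get("IpProtocol") != "-1":
        return False
    open_v4 = any(r.get("CidrIp") == "0.0.0.0/0" for r in permission.get("IpRanges", []))
    open_v6 = any(r.get("CidrIpv6") == "::/0" for r in permission.get("Ipv6Ranges", []))
    return open_v4 or open_v6


# Example rules in the shape of a describe_security_groups response.
rules = [
    {"IpProtocol": "-1", "IpRanges": [{"CidrIp": "0.0.0.0/0"}], "Ipv6Ranges": []},
    {"IpProtocol": "tcp", "FromPort": 22, "ToPort": 22,
     "IpRanges": [{"CidrIp": "10.0.0.0/8"}], "Ipv6Ranges": []},
]
flagged = [rule for rule in rules if is_all_traffic_from_anywhere(rule)]
print(len(flagged))  # 1
```

With a filter like this, only the flagged rules would be handed to the revoke call, leaving narrower rules such as the internal SSH rule in place.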

Step 2 — Configure Lambda Triggers

We are going to use a CloudWatch Events rule that is triggered by API calls recorded by CloudTrail.

  1. Choose Create a new Rule
  2. Fill the Rule Name & Rule Description
  3. For Rule Type - Choose Event pattern
  4. Below that, Choose EC2 Service
  5. In the next field, Choose AWS API call via CloudTrail
  6. Check the Operation box
  7. In the field below, type or choose AuthorizeSecurityGroupIngress (the API call that CloudTrail records when an ingress rule is added)
  8. Enable Trigger by Checking the box
  9. Click on Add and Save the Lambda Function
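
The console choices above correspond to an event pattern, which can also be written out directly. The sketch below builds that pattern, assuming the rule should match `AuthorizeSecurityGroupIngress`, the API call CloudTrail records when an ingress rule is added; `build_event_pattern` is a hypothetical helper name used only for illustration.

```python
import json


def build_event_pattern(event_names):
    """Build a CloudWatch Events pattern for EC2 API calls recorded by CloudTrail."""
    return {
        "source": ["aws.ec2"],
        "detail-type": ["AWS API Call via CloudTrail"],
        "detail": {
            "eventSource": ["ec2.amazonaws.com"],
            "eventName": list(event_names),
        },
    }


pattern = build_event_pattern(["AuthorizeSecurityGroupIngress"])
print(json.dumps(pattern, indent=2))
```

The printed JSON can be compared with the event pattern preview the console shows for the rule, which is a quick way to confirm that the right operation was selected.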

Step 3 — Testing the solution

Navigate to the EC2 console, choose Security Groups, and select the security group that we are monitoring. Add a new inbound rule, for example SSH on port 22 from 0.0.0.0/0.

Adding this rule creates an EC2 AuthorizeSecurityGroupIngress service event, which triggers the Lambda function.

After a few moments, choose the refresh button to see that the new ingress rule that you just created has been removed by the solution.
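
The function can also be exercised from the Lambda console with a hand-built test event instead of a real rule change. The sketch below generates a trimmed stand-in for the CloudTrail event, containing only the fields the script reads (`eventName`, `requestParameters.groupId`, and `userIdentity.arn`). It assumes the function checks for `AuthorizeSecurityGroupIngress`; `make_test_event`, the group id, and the user ARN are placeholders, and a real event carries many more fields.

```python
import json


def make_test_event(group_id, user_arn):
    """Build a trimmed stand-in for the CloudTrail event delivered to Lambda."""
    return {
        "detail-type": "AWS API Call via CloudTrail",
        "source": "aws.ec2",
        "detail": {
            "eventSource": "ec2.amazonaws.com",
            "eventName": "AuthorizeSecurityGroupIngress",
            "userIdentity": {"arn": user_arn},
            "requestParameters": {"groupId": group_id},
        },
    }


# Placeholder identifiers; substitute your monitored group id and a real ARN.
test_event = make_test_event(
    "sg-0123456789abcdef0",
    "arn:aws:iam::123456789012:user/example",
)
print(json.dumps(test_event, indent=2))
```

Be aware that invoking the deployed function with such an event is not a dry run: it will revoke the ingress rules on whichever group id the event names.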

Hope this helps. This script is my customized version of the original script written by miztiik.

THE HOW TO BLOG |Siddhanth Dwivedi

Siddhanth Dwivedi | Senior Security Engineer & AWS Community Builder 👨🏾‍💻