import boto3
import json
import logging
import os
from datetime import datetime, timedelta
from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
# Configuration read from the process environment at import time; a missing
# variable raises KeyError before any handler code runs.
HOOK_URL = os.environ['HOOK_URL']  # URL that notification payloads are POSTed to
SLACK_CHANNEL = os.environ['slackChannel']  # sent as the payload's 'channel' field
# Configure the root logger so records at INFO level and above are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def ec2_name(instanceid):
    """Return the ``Name`` tag of a running EC2 instance.

    Requires EC2 read permission (``ec2:DescribeInstances``) for the
    credentials boto3 picks up.

    Args:
        instanceid: ID of the instance to look up.

    Returns:
        The value of the instance's ``Name`` tag, or ``None`` when the
        instance is not in the ``running`` state, has no tags, or has no
        tag whose key is exactly ``Name``.
    """
    ec2r = boto3.resource('ec2')
    running_instances = ec2r.instances.filter(
        Filters=[
            {'Name': 'instance-state-name', 'Values': ['running']},
            {'Name': 'instance-id', 'Values': [instanceid]},
        ]
    )
    for instance in running_instances:
        # ``instance.tags`` is None (not an empty list) for untagged instances.
        for tag in instance.tags or []:
            # Exact match: a substring test would also hit keys such as
            # "ProjectName" and return an unrelated value.
            if tag['Key'] == 'Name':
                return tag['Value']
    # Nothing matched; return None instead of failing on an unbound local.
    return None
def convert_to_kst(event_time):
    """Convert an ISO-8601 timestamp string to Korea Standard Time.

    Args:
        event_time: Timestamp such as ``2024-01-01T12:34:56Z``, assumed to
            be in UTC. Only the first 19 characters
            (``YYYY-MM-DDTHH:MM:SS``) are parsed, so a trailing ``Z`` or
            fractional seconds are ignored.

    Returns:
        A naive ``datetime`` nine hours ahead of the parsed value
        (KST is UTC+9).

    Raises:
        ValueError: If the first 19 characters do not match
            ``%Y-%m-%dT%H:%M:%S``.
    """
    utc_time = datetime.strptime(event_time[:19], '%Y-%m-%dT%H:%M:%S')
    # Add the offset directly rather than subtracting a negative delta.
    return utc_time + timedelta(hours=9)
def send_slack_notification(title, details, color, emoji, timeout=10):
    """POST a single-attachment message to the webhook at ``HOOK_URL``.

    Delivery is best-effort: HTTP, connection and socket errors are logged
    and swallowed, so a failed notification never raises to the caller.

    Args:
        title: Heading, rendered in bold as the first line of the attachment.
        details: Body text placed under the title.
        color: Attachment colour as a hex string, e.g. ``"#0ced40"``.
        emoji: Value sent as ``icon_emoji``, e.g. ``":warning:"``.
        timeout: Seconds to wait on the HTTP request before giving up, so a
            stalled endpoint cannot block the caller indefinitely.

    Returns:
        None.
    """
    slack_message = {
        'channel': SLACK_CHANNEL,
        'username': "AWS",
        'attachments': [{
            'text': f"*{title}*\n{details}",
            'color': color,
        }],
        'icon_emoji': emoji,
    }
    req = Request(
        HOOK_URL,
        data=json.dumps(slack_message).encode('utf-8'),
        # Declare the body as JSON instead of urllib's form-encoded default.
        headers={'Content-Type': 'application/json'},
    )
    try:
        # The context manager closes the connection even if read() fails.
        with urlopen(req, timeout=timeout) as response:
            response.read()
    except HTTPError as e:
        logger.error("Request failed: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("Server connection failed: %s", e.reason)
    except OSError as e:
        # Read-phase timeouts and resets are not wrapped in URLError.
        logger.error("Slack request failed: %s", e)
    else:
        logger.info("%s", slack_message)
def lambda_handler(event, context):
    """Format an incoming AWS event and forward it as a Slack notification.

    Events whose ``source`` is ``aws.ec2-instance-connect`` are reported as
    an EC2 serial access, including the instance's Name tag. Every other
    event is reported generically with its ``eventName`` and a dump of its
    request parameters.

    Args:
        event: Event dict. The code reads ``source``, ``region`` and
            ``detail`` (``userIdentity``, ``eventTime``, ``eventName``,
            ``requestParameters``) - presumably a CloudTrail record
            delivered through EventBridge; confirm against the trigger.
        context: Lambda context object (unused).

    Returns:
        None.

    Raises:
        KeyError: If a required field listed above is missing.
    """
    logger.info("Event: %s", json.dumps(event))

    detail = event['detail']
    identity = detail['userIdentity']
    account_type = identity['type']
    if account_type == "Root":
        account_user_name = "Root"
    else:
        # Not every identity type carries ``userName`` (e.g. assumed roles),
        # so fall back to other identifiers instead of raising KeyError.
        account_user_name = (
            identity.get('userName')
            or identity.get('arn')
            or identity.get('principalId')
            or "Unknown"
        )
    kst_login_time = convert_to_kst(detail['eventTime'])
    # ``requestParameters`` may be null for events without parameters.
    request_parameters = detail.get('requestParameters') or {}

    if event['source'] == "aws.ec2-instance-connect":
        title = f"[{account_type}] AWS EC2 Serial Access"
        instance_id = request_parameters['instanceId']
        # Show the instance ID when no Name tag could be resolved.
        name = ec2_name(instance_id) or instance_id
        slack_color = "#0ced40"
        slack_emoji = ":white_check_mark:"
        # Labels: account / access time / region / EC2 name.
        details = (
            f"*접속 계정*\n{account_user_name}\n"
            f"*접속시간*\n{kst_login_time}\n"
            f"*접속 Region*\n{event['region']}\n"
            f"*접속 EC2 이름*\n{name}"
        )
    else:
        event_name = detail['eventName']
        title = f"[{account_type}] AWS EC2 Event - {event_name}"
        slack_color = "#969696"
        # Slack emoji codes need both colons; the original lacked the first.
        slack_emoji = ":warning:"
        parameter_lines = "".join(
            f"{key}: {value}\n" for key, value in request_parameters.items()
        )
        # Labels: acting account / event time / event category / event body.
        details = (
            f"*수행 계정*\n{account_user_name}\n"
            f"*이벤트 시간*\n{kst_login_time}\n"
            f"*이벤트 카테고리*\n{event_name}\n"
            f"*이벤트 내용*\n"
            f"{parameter_lines}"
        )

    send_slack_notification(title, details, slack_color, slack_emoji)