IAM 관련 이벤트 Slack 알림

Lambda 설정

1. 환경 변수

- `HOOK_URL`: 알림을 전송할 Slack Incoming Webhook URL
- `slackChannel`: 알림을 받을 Slack 채널 이름

2. 코드

```python
import boto3
import json
import logging
import os
from datetime import datetime, timedelta
from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

# Constants
# Both values are read from the environment at import time; a missing
# variable raises KeyError before the handler ever runs.
# HOOK_URL is the URL the Slack payload is POSTed to.
HOOK_URL = os.environ['HOOK_URL']
# SLACK_CHANNEL is sent as the 'channel' field of the Slack payload.
# Note the environment variable name is camelCase ('slackChannel').
SLACK_CHANNEL = os.environ['slackChannel']

# Logging setup
# getLogger() with no name returns the root logger; INFO and above are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def convert_to_kst(event_time):
    """Shift an ISO-8601 timestamp string forward by nine hours (UTC -> KST).

    Only the first 19 characters (``YYYY-MM-DDTHH:MM:SS``) are parsed, so any
    trailing suffix such as ``Z`` or fractional seconds is ignored. The input
    is assumed to be in UTC; the returned ``datetime`` is naive (no tzinfo).
    """
    kst_offset = timedelta(hours=9)
    parsed = datetime.strptime(event_time[:19], '%Y-%m-%dT%H:%M:%S')
    return parsed + kst_offset

def send_slack_notification(title, details, color, emoji):
    """POST a single-attachment message to the Slack webhook at HOOK_URL.

    Args:
        title: Heading text; rendered in bold on the first line of the attachment.
        details: Body text placed under the title.
        color: Attachment color bar, e.g. ``"#0ced40"``.
        emoji: Slack emoji code used as the bot icon, e.g. ``":warning:"``.

    HTTP and connection failures are logged and swallowed, so a Slack outage
    never raises out of this function.
    """
    slack_message = {
        'channel': SLACK_CHANNEL,
        'username': "AWS",
        'attachments': [{
            'text': f"*{title}*\n{details}",
            'color': color
        }],
        'icon_emoji': emoji
    }

    req = Request(HOOK_URL, json.dumps(slack_message).encode('utf-8'))
    try:
        # The context manager closes the HTTP response even if read() raises.
        with urlopen(req) as response:
            response.read()
    except HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first.
        logger.error("Request failed: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("Server connection failed: %s", e.reason)
    else:
        logger.info(slack_message)
        
def lambda_handler(event, context):
    """Lambda entry point: turn a CloudTrail event into a Slack notification.

    Events whose ``source`` is ``"aws.signin"`` are reported as console
    logins (green on success, red otherwise); every other event is reported
    as an IAM audit entry listing the request parameters.

    Args:
        event: Event dict; must contain ``source`` and a ``detail`` dict with
            ``userIdentity``, ``eventTime`` and the per-branch fields read below.
        context: Lambda context object (unused).
    """
    logger.info("Event: %s", json.dumps(event))

    detail = event['detail']
    identity = detail['userIdentity']
    account_type = identity['type']
    if account_type == "Root":
        account_user_name = "Root"
    else:
        # Not every identity type carries 'userName' (e.g. assumed-role or
        # federated sessions), so fall back to other identifying fields
        # instead of failing with KeyError and losing the alert.
        account_user_name = (
            identity.get('userName')
            or identity.get('arn')
            or identity.get('principalId', 'Unknown')
        )

    kst_event_time = convert_to_kst(detail['eventTime'])

    if event['source'] == "aws.signin":
        title = f"[{account_type}] AWS Console Login"
        used_mfa = detail['additionalEventData']['MFAUsed']
        login_status_check = detail['responseElements']['ConsoleLogin']

        if login_status_check == "Success":
            slack_color = "#0ced40"
            slack_emoji = ":white_check_mark:"
        else:
            slack_color = "#ed0c1b"
            # Slack emoji codes need both the leading and trailing colon.
            slack_emoji = ":warning:"

        details = f"*접속 계정*\n{account_user_name}\n*접속 시간*\n{kst_event_time}\n*접속 Region*\n{event['region']}\n*접속 IPAddress*\n{detail['sourceIPAddress']}\n*Console Login 결과*\n{login_status_check}\n*MFA 사용유무*\n{used_mfa}"
    else:
        event_name = detail['eventName']
        title = f"[{account_type}] AWS IAM Audit - {event_name}"

        slack_color = "#969696"
        slack_emoji = ":warning:"

        # requestParameters may be absent or null for calls that take no
        # arguments; treat that as "no parameters" rather than crashing.
        request_parameters = detail.get('requestParameters') or {}
        header = f"*수행 계정*\n{account_user_name}\n*이벤트 시간*\n{kst_event_time}\n*이벤트 카테고리*\n{event_name}\n*이벤트 내용*\n"
        details = header + "".join(
            f"{key}: {value}\n" for key, value in request_parameters.items()
        )

    send_slack_notification(title, details, slack_color, slack_emoji)
```

EventBridge 설정

1-1. 규칙생성 - Console 로그인 이벤트

{
  "source": ["aws.signin"],
  "detail-type": ["AWS Console Sign In via CloudTrail"]
}

1-2. 규칙생성 - IAM 관련 이벤트

{
  "source": ["aws.iam"],
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "eventSource": ["iam.amazonaws.com"]
  }
}

2. 대상 설정

위에서 생성한 각 규칙의 대상(Target)으로 앞서 만든 Lambda 함수를 지정합니다.

Last updated