운영중인 웹서버 인스턴스에 대해 개발인원이 종종 SSH 접근을 해 작업을 하는 과정이 있다.
SSH 접속 이상 징후를 모니터링하기 위해 SSH 접근에 대한 알림을 받고 싶다는 요청이 들어왔었다.
요청 : 인스턴스의 SSH 접속이력을 실시간 알림으로 수신
실시간 알림을 받는 서비스는 사내에서 사용중인 메신저인 Slack을 이용하기로 했다.
- 이 아키텍쳐를 구성하는 과정은 크게 다음과 같이 이루어져있다.
- 인스턴스의 SSH 접속로그를 cloudwatch로 뽑아오기 위한 Agent 설치
- Slack으로 알림을 쏴줄 lambda 함수 생성
- Cloudwatch에 쌓인 접속로그를 바탕으로 SSH 연결이 수립된 시점의 로그 필터링
- 필터링된 로그를 lambda로 전송 (lambda 트리거 설정)
lambda 코드 작성
1번의 경우, 지난 Cloudwatch Agent 글에서 다뤄봤으므로 자세한 설명은 넘기고 이번에는 이 아키텍쳐 구성을 위해 도입한 '구독필터'를 위주로 설명하고자 한다.
이 글을 통해 최종적으로 완성되는 cloudwatch log 기반 실시간 알림 전송 아키텍쳐는 아래와 같다.
Subscription Filter (구독 필터)
Cloudwatch logs에서 나온 데이터를 분석/처리를 위해 다른 서비스(Lambda, Kinesis Firehose, Kinesis Stream, Opensearch)에 전송해주는 서비스
필터 기능을 이용해 로그그룹의 모든 로그가 아닌 전송을 원하는 로그만 넘겨줄 수 있다.
Cloudwatch log그룹에서 구독 필터를 설정할 수 있다. 하나의 로그그룹 당 최대 2개의 구독필터를 지정할 수 있다.
구독 필터를 생성하기에 앞서, 메시지 전송을 위한 선작업을 진행한다.
Slack 웹훅 설정
slack으로 메시지를 쏴주기 위해서는 메시지를 전달받을 웹훅의 URL과 채널명이 필요하다.
먼저, Slack 채널을 파준 후 incoming webhook 앱을 추가한다.
이후 webhook 앱이 추가될 채널을 선택하면 웹훅 URL이 생성된다.
Lambda 함수 작성
구독 필터를 통해 전달받은 로그들을 slack으로 쏴주는 기능을 맡는 부분이다.
위에서 확인한 웹훅의 URL과 채널명을 Lambda에 환경변수로 적어 함수에서 로드하는 방식을 이용한다.
공식 docs에 따르면, Kinesis Data Streams와 Lambda 구독 필터로 전달되는 데이터는 base64로 인코딩되고 gzip으로 압축된 포맷이라고 한다.
따라서 전달받은 데이터를 사용하기위해 전처리가 필요하다.
import boto3
import gzip
import json
...
def lambda_handler(event, context):
logger.info("Event: " + str(event))
message = event['awslogs']['data']
compressed_payload = b64decode(message)
uncompressed_payload = gzip.decompress(compressed_payload)
payload = json.loads(uncompressed_payload)
...
Lambda 함수 내용은 아래와 같다.
메시지로 전송할 내용으로는 크게 접속 ip, 시각, username이 있다.
import boto3
import json
import gzip
import logging
import os
import re
from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
from datetime import datetime, timedelta
# The Slack channel to send a message to stored in the slackChannel environment variable
SLACK_CHANNEL = os.environ['slackChannel']
HOOK_URL = os.environ['HOOK_URL']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
logger.info("Event: " + str(event))
message = event['awslogs']['data']
compressed_message = b64decode(message)
uncompressed_message = gzip.decompress(compressed_message)
payload = json.loads(uncompressed_message)
alarm_name = payload['subscriptionFilters'][0]
message = payload['logEvents']
logger.info("Message: " + str(message))
msg_list = message[0]['message'].split(' ')
description = ' '.join(msg_list[5:])
ip_pattern = re.compile('(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}')
from_ip = ip_pattern.search(message[0]['message']).group()
username_pattern = re.compile('(?<=for )(.*?)(?= from)')
username = username_pattern.search(message[0]['message']).group()
event_timestamp = datetime.fromtimestamp(message[0]['timestamp'] / 1000.0)
# message format
slack_message = {
'channel': SLACK_CHANNEL,
'text': "*[%s | %s]*\n\n*Time*:%s\n*Access_from*: %s\n*Description*\n%s" %('SSH_Login_alarm', alarm_name, str(event_timestamp), from_ip, description)
}
req = Request(HOOK_URL, json.dumps(slack_message).encode('utf-8'))
try:
response = urlopen(req)
response.read()
logger.info("Message posted to %s", slack_message['channel'])
except HTTPError as e:
logger.error("Request failed: %d %s", e.code, e.reason)
except URLError as e:
logger.error("Server connection failed: %s", e.reason)
slack_message의 text 필드를 수정해 전송될 메시지를 설정할 수 있다.
+) python에서는 slack 웹훅을 호출할때 slack sdk 패키지 사용도 추천
Lambda에서 사용하기 위해서는 패키지를 업로드 필요! (추후 작성)
구독필터 설정
이전에 설치해둔 Cloudwatch Agent에서 ssh 로그파일을 로그그룹으로 내보내는 설정을 했었다.
우리는 Lambda로 데이터를 전송할 것이므로 구독필터 > 생성 > Lambda 구독필터 생성
전송하는 타깃 Lambda를 설정하고, 필터 패턴을 지정할 수 있다.
필터 패턴을 따로 지정하지 않은 경우, 로그그룹으로 들어오는 모든 데이터를 그대로 넘기게된다.
우리는 SSH의 접속에 성공한 경우를 확인하는 것이므로 SSH 접속 성공 로그만 필터링해서 Lambda로 넘겨주는 것이 바람직하다.
바로 아래에서 실제 로그그룹의 데이터를 바탕으로 필터 패턴을 테스트해볼 수 있는 기능도 지원한다.
위와 같이 구독 필터 설정을 완료할 수 있다
작성한 lambda 함수에서 트리거로 구독 필터가 연결된 것도 같이 확인할 수 있다.
알림 테스트
실제로 ssh로그를 전송하는 Cloudwatch Agent가 설치된 서버에 접근한 경우 다음과 같이 바로 알림을 받을 수 있음을 확인할 수 있다.
Lambda 구독필터 관련 내용은 아래 Docs에서 확인 가능하다.