Step Function으로 자동화 파이프라인
서비스나 파이프라인을 구성하면 Lambda나 SQS 등등 클라우드 서비스를 여러개 사용해서 워크플로우를 구성하게 된다.
하지만, 구성이 커지면 이러한 워크플로우의 구성요소들 간의 로직이 복잡해지게 된다.
AWS 서비스들을 통해 워크플로우를 간단하게 구성하고 시각화시켜 흐름을 파악할 수 있는 Step function에 대해 알아보도록 한다.
Step Function
AWS 서비스들을 한데 묶어 일련의 작업 파이프라인을 생성할 수 있는 서비스
Workflow Studio를 통해 작업들을 직접 배치하고 시각적으로 확인할 수 있다는 특징이 있다.
Step function은 '상태 머신[State Machine]'이라는 개념으로 워크플로우를 구성한다. [상태 머신 == 워크플로우]
그리고 상태 머신의 각 단계는 ASL이라는 언어로 구성된 '상태'로 구성되어 있다. (상태들의 집합)
Amazon States Language (ASL)
Step function의 상태 머신의 워크플로우를 다루는 언어, JSON 기반으로 상태 머신에 들어있는 상태를 정의한 것으로 구성된다.
ASL을 통해 상태에서 수행되는 작업(Task), 다음으로 전환될 상태(Choice), 실행을 중단할 상태(Fail) 등을 정의할 수 있다.
그리고 상태는 흐름(Flow)과 작업(Task) 으로 구분할 수 있다.
흐름은 전체 워크플로우의 흐름을 제어하는 역할을 가지고, 분기 / 대기와 같은 행동을 수행할 수 있다.
작업은 다른 AWS API 동작이나 외부 API 호출을 통한 동작을 의미한다.
상태는 기본적으로 ASL을 통한 아래와 같은 구조를 가진다.
"HelloWorld": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
"Next": "AfterHelloWorldState",
"Comment": "Run the HelloWorld Lambda function"
}
그리고 상태는 아래와 같은 Field를 기본적으로 갖는다.
- Type: 상태의 유형 (task, choice, fail 등)
- Next: 현재 상태가 종료될 때 실행되는 '다음 상태의 이름'
- Success와 Fail을 제외한 모든 상태는 Next가 지정되어야한다.
- 분기를 수행하는 Choice는 Next가 2개 이상이 되는 방식
- Comment : 사람이 읽을 수 있는 주석
서비스 특징
수많은 AWS 서비스 API들과의 통합
Step Function은 Lambda를 넘어 S3, SQS 등 200개 이상의 AWS 서비스의 API들과 통합이 가능하다.
지원하는 전체 서비스들은 Workflow Studio와 Docs에서 내역을 확인할 수 있다.
상태들의 입출력 제어 & 쿼리 언어
워크플로우의 상태들은 각각 입출력이 가능하다. 상태의 결과물을 다음 상태의 인풋으로 사용하도록 전달할 수 있다.
그리고 상태와 상태 머신은 쿼리 언어를 사용해 데이터를 변환할 수 있다. JSONPath와 2024년 말에 공개된 JSONata라는 언어, 2가지를 지원한다.
워크플로우 구성에서 상태 머신 쿼리 언어를 설정할 수 있다.
아래와 같이 2개의 Lambda를 통해 입출력을 처리하는 예를 들어본다.
첫번째 Lambda는 문자열을 대문자 처리, 두번째 Lambda는 현재 시간을 문자열에 붙이는 처리를 수행한다.
### test_upper
import json
def lambda_handler(event, context):
processed = event.upper()
return {
"processedText": processed
}
### test_append
import json
import time
def lambda_handler(event, context):
processed_text = event.get("processedText", "")
final_text = f"{processed_text} - {time.strftime("%Y-%m-%d %H:%M:%S")}"
return {"finalText": final_text}
그리고 아래와 같이 State machine을 구성하면, 첫번째 State로 전달된 JSON에서 'inputText' 문자열만을 가져와 test_upper 함수를 처리하고, 최초 input JSON에 리턴값을 'processedText'라는 키의 값으로 추가한다.
이어서 두번째 State에서는 'processedText'를 가져와서 test_append 함수를 처리하고, 리턴값의 'finalOutput'만 state의 출력 필드에 저장하며 종료된다.
{
"Comment": "function IO",
"StartAt": "ProcessInput",
"States": {
"ProcessInput": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:test_upper",
"InputPath": "$.inputText",
"ResultPath": "$.processedText",
"Next": "AppendMessage"
},
"AppendMessage": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:test_append",
"InputPath": "$.processedText",
"ResultPath": "$.finalOutput",
"End": true
}
}
}
주어진 Input에 대해 각 state들의 input/output을 확인하면, 쿼리대로 input/output이 전달되며 처리된 것을 볼 수 있다.
재시도 로직을 통한 오류 처리
Step function은 상태 머신의 실행 중 오류가 발생하면, 재시도(Retry)와 예외 처리(Catch)를 수행하도록 설정할 수 있다.
아래와 같이 확률적으로 강제 실패를 발생시키는 Lambda 함수를 Step function으로 호출해본다.
### test_fail
import json
import random
def lambda_handler(event, context):
# 50% 확률로 실패
if random.random() < 0.5:
raise Exception("Intentional Failure!")
return {
"statusCode": 200,
"body": json.dumps("Lambda executed successfully!")
}
그리고 상태 머신에서 다음과 같이 Retry와 Catch 정책을 적용한다.
최대 3번의 재시도(Max attempts)와 재시도 간 2초의 딜레이(Interval), 재시도 간격은 2배씩 증가하도록(Backoff rate) 하며 Retry가 모두 소진되면 실패처리하도록 Catch를 구성한 것을 볼 수 있다.
{
"Comment": "Retry Example",
"StartAt": "InvokeLambda",
"States": {
"InvokeLambda": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:test_fail",
"Retry": [
{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "HandleFailure"
}
],
"End": true
},
"HandleFailure": {
"Type": "Fail"
}
}
}
해당 상태머신을 호출하면, 다음과 같이 실패한 Lambda 함수가 재시도 정책에 맞게 수행되는 것을 볼 수 있다.
상태 기반의 흐름 처리
Step function은 상태 머신 개념을 통해 실행 흐름(flow)을 조절할 수 있다.
조건 분기, 대기, 병렬 실행을 통해 워크플로우의 로직을 구현할 수 있다.
흐름 상태 중 Map과 Parallel을 사용해 작업들을 병렬처리를 수행할 수 있다.
아래와 같이 Parallel로 2개의 Lambda를 병렬로 실행한 뒤, 결과물을 최종 Lambda에서 확인하는 상태 머신을 생성해본다.
{
"Comment": "Parallel execution of Lambda functions",
"StartAt": "ParallelProcessing",
"States": {
"ParallelProcessing": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "Lambda1",
"States": {
"Lambda1": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:test-Lambda1",
"End": true
}
}
},
{
"StartAt": "Lambda2",
"States": {
"Lambda2": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:test-Lambda2",
"End": true
}
}
}
],
"ResultPath": "$.parallelResults",
"Next": "FinalStep"
},
"FinalStep": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:test-FinalLambda",
"End": true
}
}
}
### test-Lambda1
import json
import time
def lambda_handler(event, context):
return {"Lambda1Result": f"Lambda 1 - {time.strftime("%Y-%m-%d %H:%M:%S")}"}
### test-Lambda2
import json
import time
def lambda_handler(event, context):
return {"Lambda2Result": f"Lambda 2 - {time.strftime("%Y-%m-%d %H:%M:%S")}"}
### test-FinalLambda
import json
def lambda_handler(event, context):
return {
"FinalResult": f"{event['parallelResults'][0]['Lambda1Result']} & {event['parallelResults'][1]['Lambda2Result']}"
}
상태 머신을 실행하면, 두 Lambda 함수가 동시에 실행된 것을 볼 수 있고, 'parallelResults'에 각각의 output이 들어가 있는 것을 볼 수 있다.
마지막 Lambda에서는 이 2가지 output을 함께 불러올 수 있는 것까지 확인할 수 있다.
이렇게 간단한 일련 작업 플로우를 Step function을 통해 하나로 묶어서 순차적으로 실행되도록 구성해보았다.
이와 같은 데이터 처리 뿐만 아니라 Sagemaker AI와 함께 사용해 모델의 학습, 데이터 전/후처리 등의 MLOps 오케스트레이션, 행동 기반 방화벽 IP 차단 로직 구현 등에서도 사용할 수 있다.