[DEV] Airflow 환경 구축기
1. 하는 업무
Ad-hoc Data 추출
- 반복되는 추출 요청들도 많이 있음
- 주요 지표들은 미리 적재해두면 추출 건을 70% 정도 줄일 수 있을 것으로 예상
- 주요 지표들이 미리 적재되어 있는 데이터 웨어하우스를 만들자
- 데이터 거버넌스 구축
- 반복되는 것을 당연시 하지 말자!!
- 간단한 업무여도 context switching 비용이 발생하기 때문에 자동화를 항상 생각할 것
Daily Report 쿼리 개선
- 유저 수만큼 for loop를 도는 쿼리가 있었음 (
select * from ~ where ~
) create temporary table select ~
문으로 CPU 점유율 40% -> 30% 정도로 줄임
- OLTP DB에서 OLAP 쿼리(집계 쿼리)를 사용하는 것이 이상했던 상황
- crontab 사용
- 편하고 좋지만
- crontab 내용을 개인 pc에만 갖고 있어야 하고, 공유가 힘듦
- crontab -> Airflow에 대한 니즈로 이어지게 됨
- 데이터 웨어하우스를 만들어서 미리 지표를 쌓아두자
결론
DW를 구축하자 -> Data Pipeline을 설계하자
- 앞의 두 문제는 개인 환경에서 동작한다는 문제점도 있었음
- 담당자의 부재 시 해결할 방법이 없음
- 이를 개선하기 위해 Airflow와 같은 공통 환경에서 작업하는 것을 목표로 함!ㅋ
OOP를 고려하여 Legacy 코드 개선
- S3 read / write 코드에 Airflow Dependency 제거
- 모듈의 Dependency 분리
- OOP 공부 추천!
2. MWAA
Data Infra 구축
- MWAA로 Aurora Cluster clone
- glue를 통해 Production DB 데이터를 DL로 복제
- DL에 적재된 데이터를 Athena로 가공하여 DW로 복제
- DW에 적재된 데이터를 QuickSight로 BI 시각화
- Airflow에서 전부 처리할 수 있는데, 왜 복잡하게 여러 서비스를 사용하는가?
- Airflow로는 단순히 스케줄링 처리만 목표로 하고 있음 (책임의 분리)
- DAG가 늘어나는 것만큼 Task가 늘어나는 것도 부하가 있음
3. Airflow
- Airflow는 스케줄러!
사용하는 이유
- 워크플로 태스크를 DAG로 정의하여 의존성을 표현할 수 있음
- UI를 통해 실패한 태스크의 디버깅이 수월함
- 파이프라인을 정기적으로 실행하고 증분 처리가 쉬움
- Backfill로 활용 가능
- 쉽게 확장 가능하고 다양한 시스템과 통합이 가능
적합한 경우
- Batch성 작업
- workflow의 시작과 끝이 명확하고 일정한 간격으로 실행되는 경우
- 복잡한 workflow 처리
- ETL 작업 등
- 다양한 시스템과의 통합이 필요한 경우 (https://airflow.apache.org/docs/apache-airflow-providers/index.html 참고)
- MySQL에서 데이터 추출
- Python으로 데이터 가공
- Redshift에 적재
- 실패시 Slack으로 알림
- 성공시 팀즈로 메시지 전송 등
- 적합하지 않은 경우
- Streaming 작업 (무한히 실행되는 이벤트 기반 workflow 등)
- Kafka, Spark Streaming 등 이용
- 간단한 workflow나 단일 작업 실행 (crontab)
- 대용량 데이터 처리가 필요한 경우
- Streaming 작업 (무한히 실행되는 이벤트 기반 workflow 등)
장단점
- 장점
- 웹 UI를 통해 스케줄링과 모니터링 제공
- 다양한 서비스와 쉽게 통합 (operator 제공)
- 활발한 커뮤니티 및 빠른 업데이트
- 잘 작성된 document
- 단점
- learning curve (logical date)
Airflow 아키텍처
4. DAG
- 방향을 가지며 순환하지 않는 그래프 (Directed Acyclic Graph)
- task의 집합
- task들 간의 실행 순서가 정의됨
- 실행 순서는 Acyclic함
- Data pipeline이 해야하는 일들을 task들로 분리한 다음에 task들 간에 실행 순서를 정함
- Airflow에서는 task를 operator라고 부름
- task들 간에 실행 순서가 정의되어야 함
위상정렬
- 순서가 정해져 있는 일련의 작업을 차례대로 수행해야 할 때 사용할 수 있는 일고리즘
- 진입차수(Indegree)를 이용해서 선후 관계를 지키는 전체 순서를 계산
- 시간복잡도 O(V+E)
- 모든 노드를 모든 간선을 통해 확인하기 때문
- 알고리즘
- 진입차수가 0인 노드를 큐에 넣음
- 큐가 빌 때까지 다음 과정을 반복
- 큐에서 원소를 꺼내 해당 노드에서 출발하는 간선을 그래프에서 제거
- 새롭게 진입차수가 0이 된 노드를 큐에 넣음
- 모든 노드에 방문하기 전에 큐가 빈다면 사이클이 존재한다고 판단할 수 있음
Airflow의 Best Practice
- 멱등성 (idempotency)
- 단순성 유지
- 하나의 DAG 당 하나의 목적을 가지게 하기
- 작업의 재사용성 고려
- 시간 간격과 지연에 대한 고려
- 감시 및 알림
- 에러 핸들링
- variable로 환경별 구분하기
- 정적 코드 분석기, formatter, pre-commit hook 사용
Airflow로 만들 수 있는 데이터 파이프라인 구축 사례
- 데이터 특징
- Incremental Update
- 기존 데이터가 Update, Delete 되지 않는 경우
- 새로운 데이터가 Insert 되는 경우
- Snapshot
- 기존 데이터가 Update, Delete 되는 경우
- Full Refresh
- 기존 데이터가 Update, Delete 된 경우 기존 Partition을 제거 후 새로 적재
- Incremental Update
- Production DB -> DL
- Production DB, Log
- 외부 Storage -> DL
- Explorer API, Other API
- Production DB -> DW
- DL -> DW
Airflow 고급 기능
- Dynamic DAG
with DAG("loop_example", ...):
first = EmptyOperator(task_id="first")
last = EmptyOperator(task_id="last")
options = ["branch_a", "branch_b", "branch_c", "branch_d"]
for option in options:
t = EmptyOperator(task_id=option)
first >> t >> last
- TaskGroup
with TastGroup(group_id="hi") as task_group:
temp = BashOperator()
end = BashOperator()
_ = temp >> end
- Jinja Template
- ``처럼 중괄호를 사용해서 값을 넘겨주는 방식
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataReader("/tmp//my_file")],
dag=dag,
)