1. 하는 업무

Ad-hoc Data 추출

  • 반복되는 추출 요청들도 많이 있음
  • 주요 지표들은 미리 적재해두면 추출 건을 70% 정도 줄일 수 있을 것으로 예상
  • 주요 지표들이 미리 적재되어 있는 데이터 웨어하우스를 만들자
    • 데이터 거버넌스 구축
  • 반복되는 것을 당연시 하지 말자!!
    • 간단한 업무여도 context switching 비용이 발생하기 때문에 자동화를 항상 생각할 것

Daily Report 쿼리 개선

  • 유저 수만큼 for loop를 도는 쿼리가 있었음 (select * from ~ where ~)
  • create temporary table select ~문으로 CPU 점유율 40% -> 30% 정도로 줄임


  • OLTP DB에서 OLAP 쿼리(집계 쿼리)를 사용하는 것이 이상했던 상황
  • crontab 사용
    • 편하고 좋지만
    • crontab 내용을 개인 pc에만 갖고 있어야 하고, 공유가 힘듦
    • crontab -> Airflow에 대한 니즈로 이어지게 됨
  • 데이터 웨어하우스를 만들어서 미리 지표를 쌓아두자

결론

DW를 구축하자 -> Data Pipeline을 설계하자

  • 앞의 두 문제는 개인 환경에서 동작한다는 문제점도 있었음
  • 담당자의 부재 시 해결할 방법이 없음
  • 이를 개선하기 위해 Airflow와 같은 공통 환경에서 작업하는 것을 목표로 함!ㅋ

OOP를 고려하여 Legacy 코드 개선

  • S3 read / write 코드에 Airflow Dependency 제거
  • 모듈의 Dependency 분리
  • OOP 공부 추천!

2. MWAA

스크린샷 2023-12-27 오후 4 07 25

Data Infra 구축

  • MWAA로 Aurora Cluster clone
  • glue를 통해 Production DB 데이터를 DL로 복제
  • DL에 적재된 데이터를 Athena로 가공하여 DW로 복제
  • DW에 적재된 데이터를 QuickSight로 BI 시각화
  • Airflow에서 전부 처리할 수 있는데, 왜 복잡하게 여러 서비스를 사용하는가?
    • Airflow로는 단순히 스케줄링 처리만 목표로 하고 있음 (책임의 분리)
    • DAG가 늘어나는 것만큼 Task가 늘어나는 것도 부하가 있음

3. Airflow

  • Airflow는 스케줄러!

사용하는 이유

  • 워크플로 태스크를 DAG로 정의하여 의존성을 표현할 수 있음
  • UI를 통해 실패한 태스크의 디버깅이 수월함
  • 파이프라인을 정기적으로 실행하고 증분 처리가 쉬움
  • Backfill로 활용 가능
  • 쉽게 확장 가능하고 다양한 시스템과 통합이 가능

적합한 경우

  • Batch성 작업
    • workflow의 시작과 끝이 명확하고 일정한 간격으로 실행되는 경우
  • 복잡한 workflow 처리
    • ETL 작업 등
  • 다양한 시스템과의 통합이 필요한 경우 (https://airflow.apache.org/docs/apache-airflow-providers/index.html 참고)
    • MySQL에서 데이터 추출
    • Python으로 데이터 가공
    • Redshift에 적재
    • 실패시 Slack으로 알림
    • 성공시 팀즈로 메시지 전송 등


  • 적합하지 않은 경우
    • Streaming 작업 (무한히 실행되는 이벤트 기반 workflow 등)
      • Kafka, Spark Streaming 등 이용
    • 간단한 workflow나 단일 작업 실행 (crontab)
    • 대용량 데이터 처리가 필요한 경우

장단점

  • 장점
    • 웹 UI를 통해 스케줄링과 모니터링 제공
    • 다양한 서비스와 쉽게 통합 (operator 제공)
    • 활발한 커뮤니티 및 빠른 업데이트
    • 잘 작성된 document
  • 단점
    • learning curve (logical date)

Airflow 아키텍처

스크린샷 2023-12-27 오후 4 39 55

4. DAG

  • 방향을 가지며 순환하지 않는 그래프 (Directed Acyclic Graph)
  • task의 집합
    • task들 간의 실행 순서가 정의됨
    • 실행 순서는 Acyclic함
  • Data pipeline이 해야하는 일들을 task들로 분리한 다음에 task들 간에 실행 순서를 정함
  • Airflow에서는 task를 operator라고 부름
  • task들 간에 실행 순서가 정의되어야 함

위상정렬

  • 순서가 정해져 있는 일련의 작업을 차례대로 수행해야 할 때 사용할 수 있는 일고리즘
  • 진입차수(Indegree)를 이용해서 선후 관계를 지키는 전체 순서를 계산
  • 시간복잡도 O(V+E)
    • 모든 노드를 모든 간선을 통해 확인하기 때문
  • 알고리즘
    • 진입차수가 0인 노드를 큐에 넣음
    • 큐가 빌 때까지 다음 과정을 반복
      • 큐에서 원소를 꺼내 해당 노드에서 출발하는 간선을 그래프에서 제거
      • 새롭게 진입차수가 0이 된 노드를 큐에 넣음
    • 모든 노드에 방문하기 전에 큐가 빈다면 사이클이 존재한다고 판단할 수 있음

Airflow의 Best Practice

  • 멱등성 (idempotency)
  • 단순성 유지
  • 하나의 DAG 당 하나의 목적을 가지게 하기
  • 작업의 재사용성 고려
  • 시간 간격과 지연에 대한 고려
  • 감시 및 알림
  • 에러 핸들링
  • variable로 환경별 구분하기
  • 정적 코드 분석기, formatter, pre-commit hook 사용

Airflow로 만들 수 있는 데이터 파이프라인 구축 사례

  • 데이터 특징
    • Incremental Update
      • 기존 데이터가 Update, Delete 되지 않는 경우
      • 새로운 데이터가 Insert 되는 경우
    • Snapshot
      • 기존 데이터가 Update, Delete 되는 경우
    • Full Refresh
      • 기존 데이터가 Update, Delete 된 경우 기존 Partition을 제거 후 새로 적재


  • Production DB -> DL
    • Production DB, Log
  • 외부 Storage -> DL
    • Explorer API, Other API
  • Production DB -> DW
  • DL -> DW

Airflow 고급 기능

  • Dynamic DAG
with DAG("loop_example", ...):
    first = EmptyOperator(task_id="first")
    last = EmptyOperator(task_id="last")

    options = ["branch_a", "branch_b", "branch_c", "branch_d"]
    for option in options:
        t = EmptyOperator(task_id=option)
        first >> t >> last


  • TaskGroup

스크린샷 2023-12-27 오후 8 49 49

with TastGroup(group_id="hi") as task_group:
    temp = BashOperator()
    end = BashOperator()

    _ = temp >> end


  • Jinja Template
    • ``처럼 중괄호를 사용해서 값을 넘겨주는 방식
t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataReader("/tmp//my_file")],
    dag=dag,
)