1. Dag Dependencies

DAG를 실행하는 방법

  • 주기적 실행: schedule로 지정
    • crontab 형태로 지정
  • 다른 DAG에 의해 트리거 (의존관계가 있는 경우)
    • Explicit Trigger: DAG A가 분명하게 DAG B를 트리거 (TriggerDagOperator)
    • Reactive Trigger: DAG B가 DAG A가 끝나기를 대기 (ExternalTaskSensor)


  • 상황에 따라 다른 태스크 실행 방식들
    • 조건에 따라 다른 태스크로 분기 : 동적으로 결정 (BranchPythonOperator)
    • 과거 데이터 Backfill시에는 불필요한 태스크 처리 (LatestOnlyOperator)
      • 하나의 DAG에 다양한 성격의 태스크를 갖고 있는 경우 (incremental update & 뉴스레터 전송)
      • backfill을 위해 실행했는데 뉴스레터를 보낸다면 곤란! -> backfill을 할 때 뉴스레터 태스크 실행 중단
    • 앞단 태스크들의 실행 상황
      • 어떤 경우에는 앞단이 실패해도 동작해야하는 경우가 있을 수 있음

Dag Dependencies

  • Explicit trigger
    • TriggerDagOperator
    • DAG A가 명시적으로 DAG B를 트리거
    • jinja template 사용 스크린샷 2024-01-07 오전 11 25 09
  • Reactive trigger
    • ExternalTaskSensor
    • DAG B가 DAG A의 태스크가 끝나기를 대기
      • 이 경우 DAG A는 이 사실을 모름
    • 의존관계 불명확 -> DAG A를 잘못 수정할 경우 DAG B에 문제가 발생할 수 있음
      • Data Catalog, Data Discovery 스크린샷 2024-01-07 오전 11 26 06

Jinja Template

  • Python에서 널리 사용되는 템플릿 엔진
    • 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성
      • 프레젠테이션 로직: 사람 눈에 보이는 부분
      • 애플리케이션 로직: 상황에 맞게 어떤 프레젠테이션을 할 것인지 결정
    • Flask에서 많이 사용됨
    • Airflow에서도 쉽게 사용!
      • execution_date 등 시스템에서 제공하는 변수들을 python code 내에서 쉽게 불러서 사용할 수 있음
  • TriggerDagOperator에서
    • Airflow 태스크 데이터를 읽어서 코드 안에서 편하게 사용
    • Dynamic Dags에서 사용
  • 변수는 이중 중괄호 ``로 감싸서 사용
    • <h1>안녕하세요, 님!</h1>
  • 제어문은 퍼센트 기호 {% %}로 표시
<ul>

</ul>

Airflow에서

  • 작업 이름, 파라미터, SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능
    • 재사용 가능하고 사용자 정의 가능한 워크플로우 생성
    • 특정 파라미터에서만 사용 가능


  • 시스템 정보 사용
    • execution_date을 코드 내에서 쉽게 사용 : ``
      • execution_date에서 연도와 월, 일만 추출
    • Variables, Connections 정보도 쉽게 읽을 수 있음
    • 태스크 이름, DAG 이름, DAG가 언제 마지막으로 성공적으로 실행되었는지, 다음 DAG가 실행되는 시간은 언제인지 ..
task1 = BashOperator(
    task_id="task1",
    bash_command='echo ""',
    dag=dag
)


  • 파라미터 등으로 넘어온 변수를 쉽게 사용 가능
task2 = BashOperator(
    task_id='task2',
    bash_command='echo "안녕하세요, !"',
    params={'name':'John'},  # 사용자 정의 가능한 매개변수
    dag=dag
)


스크린샷 2024-01-07 오전 11 57 47 사용 가능한 변수들

TriggerDagOperator

  • DAG A의 태스크를 TriggerDagRunOperator로 구현
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_B = TriggerDagRunOperator(
    task_id="trigger_B",
    trigger_dag_id="트리거하려는 DAG 이름" 
)


  • TriggerDagRunOperator에서 사용할 수 있는 jinja template 변수
    • trigger_dag_id
    • trigger_run_id
    • conf
    • execution_date


from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_B = TriggerDagRunOperator(
    task_id="trigger_B",
    trigger_dag_id="트리거하려는 DAG 이름",
    conf={'path':'/opt/ml/conf'},
    execution_date="",    
    reset_dag_run=True,            
    wait_for_completion=True
)
  • airflow.cfg의 dag_run_conf_overrides_params가 True로 설정되어 있어야 함

  • conf={'path':'/opt/ml/conf'}
    • DAG B에 넘기고 싶은 정보
    • DAG B에서는 ``로 접근 가능
    • DAG B PythonOperator(**context)에서라면 ``
  • execution_date="": DAG A의 execution_date 패스
  • reset_dag_run=True: True일 경우 해당 날짜가 이미 실행되었더라도 다시 재실행
  • wait_for_completion=True: DAG B가 끝날 때까지 기다릴지 여부를 결정, default 값은 False

Sensor

  • 특정 조건이 충족될 때까지 대기하는 Operator
  • 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
    • 파일 생성, HTTP 응답, 특정 dag의 특정 태스크가 완료되었는지 등
  • Airflow 내장 sensor
    • FileSensor : 지정된 위치에 파일이 생길 때까지 대기
    • HttpSensor : HTTP 요청을 수행하고 지정된 응답을 대기
    • SqlSensor : SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
    • TimeSensor : 특정 시간에 도달할 때까지 워크플로우를 일시 중지
    • ExternalTaskSensor : 다른 Airflow DAG의 특정 작업 완료를 대기
  • 기본적으로 주기적으로 poke를 하는 것
    • poke: 주기적으로 체크하는 것
    • worker를 하나 붙잡고 poke간에 sleep을 할지, 또는 worker를 릴리스하고 다시 잡아서 poke를 할지 결정해주는 파라미터가 존재 : mode
      • mode의 값은 reschedule / poke

ExternalTaskSensor

  • DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 체크함
    • 먼저 동일한 schedule_interval을 사용
    • 이 경우 두 태스크들의 execution_date이 동일해야함. 다르면 매칭이 안됨!
  • 잘 사용하지는 않음
    • 맞아야 하는 조건이 까다로움
    • poke 모드가 일반적이지만 worker 한 개가 낭비되는 느낌
    • DAG A 관점에서 의존관계가 명확하지 않기 때문에 실수를 할 확률이 높음


from airflow.sensors.external_task import ExeternalTaskSensor

waiting_for_end_of_dag_a = ExternalTaskSensor(
    task_id='waiting_for_end_of_dag_a',
    external_dag_id='DAG이름',
    external_task_id='end',
    timeout=5*60,
    mode='reschedule'   # worker release했다가 잡았다가 함
)


  • 만약 DAG A와 DAG B가 서로 다른 schedule interval을 갖는다면
    • 예를 들어 A가 B보다 5분 먼저 실행된다면
      • execution_delta 사용하여 차이 조절
        • 하지만 어긋나기 쉬움 ..
      • execution_date_fn을 사용하면 조금 더 복잡하게 컨트롤 가능
    • 만약 두 개의 DAG가 서로 다른 frequency를 갖고 있다면 이 경우 ExeternalTaskSensor는 사용 불가
from airflow.sensors.external_task import ExeternalTaskSensor

waiting_for_end_of_dag_a = ExternalTaskSensor(
    task_id='waiting_for_end_of_dag_a',
    external_dag_id='DAG이름',
    external_task_id='end',
    timeout=5*60,
    mode='reschedule',
    execution_delta=timedelta(minutes=5)
)

BranchPythonOperator

  • 상황에 따라 뒤에 실행되어야 할 태스크를 동적으로 결정해주는 오퍼레이터
    • 미리 정해준 오퍼레이터들 중 선택하는 형태
  • TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있음


from airflow.operators.python import BranchPythonOperator

def skip_of_cont_trigger():
    if Variable.get("mode", "dev") == "dev":
        return []
    else:
        return ['trigger_b']

# mode라는 variable의 값이 dev이면 trigger_b 태스크 스킵
branching = BranchPythonOperator(
    task_id='branching',
    python_callable=skip_or_cont_trigger,
)

실습

스크린샷 2024-01-07 오후 12 55 44

  • UTC 기준 12시 이전이면 morning_task로, 아니면 afternoon_task로 브랜치
  • current_hour: 0
INFO - Following branch morning_task
Skipping tasks ['afternoon_task']
  • 이 경우 실행되지 않은 afternoon_task의 상태는 skipped가 됨


from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2023, 1, 1)
}

dag = DAG(
    'Learn_BranchPythonOperator',
    schedule='@daily',
    default_args=default_args)


def decide_branch(**context):
    current_hour = datetime.now().hour
    print(f"current_hour: {current_hour}")
    if current_hour < 12:
        return 'morning_task'
    else:
        return 'afternoon_task'


branching_operator = BranchPythonOperator(
    task_id='branching_task',
    python_callable=decide_branch,
    dag=dag
)


morning_task = EmptyOperator(
    task_id='morning_task',
    dag=dag
)


afternoon_task = EmptyOperator(
    task_id='afternoon_task',
    dag=dag
)

branching_operator >> morning_task
branching_operator >> afternoon_task

LatestOnlyOperator

  • Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함
  • 현재 시간이 지금 태스크가 처리하는 execution_date보다 미래이고, 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어나가고 아니면 여기서 중단됨
    • t1 » t3 » [t2, t4]

스크린샷 2024-01-07 오후 1 03 03

  • Time-sensitive한 태스크들은 LatestOnlyOperator 뒤쪽에 놓이도록!
  • 혹은 Incrementral Update와 Time-sensitive한 태스크를 별도의 DAG로 분리하는 것도 방법


from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id='latest_only_example',
    schedule=timedelta(hours=48),
    start_date=datetime(2023, 6, 14),
    catchup=True) as dag:

    t1 = EmptyOperator(task_id='task1')
    t2 = LatestOnlyOperator(task_id='latest_only')
    t3 = EmptyOperator(task_id='task3')
    t3 = EmptyOperator(task_id='task4')

    t1 >> t2 >> [t3, t4]

스크린샷 2024-01-07 오후 1 33 35

t3, t4는 skip됨!

Trigger Rules

  • Upstream 태스크의 성공실패 상황에 따라 뒷단 태스크의 실행 여부를 결정하고 싶다면?
    • 보통 앞단이 하나라도 실패하면 뒷단의 태스크는 실행 불가
  • Operator의 trigger_rule 파라미터로 결정 가능
    • 태스크에 주어지는 파라미터로, 아래와 같은 값이 가능
    • all_success (default), all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success
    • airflow.utils.trigger_rule.TriggerRule


스크린샷 2024-01-07 오후 1 43 19

from airflow.utils.trigger_rule import TriggerRule

with DAG('trigger_rules', default_args=default_args, schedule=timedelta(1)) as dag:
    t1 = BashOperator(task_id='print_date', bash_command='date')
    t2 = BashOperator(task_id='sleep', bash_command='sleep 5')
    t3 = BashOperator(task_id='exit', bash_command='exit 1')  # exit: 0이 아니면 실패하는 task
    t4 = BashOperator(
        task_id='final_task',
        bash_command='echo DONE!',
        trigger_rule=TriggerRule.ALL_DONE   # 성공실패, skip 상관없이 끝나면 ok
    )

    [t1, t2, t3] >> t4

스크린샷 2024-01-07 오후 1 46 17 결과

Airflow 메타데이터 DB 내용 살펴보기

  • named volume이기 때문에 컨테이너를 껐다 켜도 유지됨!

  • airflow:airflowfh Postgres 로그인 가능

docker exec -it learn-airflow-airflow-webserver-1 sh


psql -h postgres


  • psql shell에서 아래 명령 수행
\dt    -- 모든 테이블 출력
SELECT * FROM dag_run LIMIT 10;    -- 각 DAG가 실행된 기록
DELETE FROM dag_run WHERE dag_id = '기록을 삭제하고싶은 DAG';

2. Task Grouping

필요성

  • 태스크 수가 많은 DAG라면 태스크들을 성격에 따라 관리하고 싶은 니즈 존재
    • SubDAG가 사용되다가 Airflow2.0에서 나온 Task Grouping으로 넘어가는 추세
  • 다수의 파일 처리를 하는 DAG라면
    • 파일 다운로드 태스크 / 파일 체크 태스크 / 데이터 처리 태스크로 구성

스크린샷 2024-01-07 오후 2 14 31


  • TaskGroup 안에 TaskGroup nesting 가능
  • TaskGroup도 태스크처럼 실행 순서 정의 가능

예시

from airflow utils.task_group import TaskGroup

start = EmptyOperator(task_id='start')

with TaskGroup('Download', tooltip='Tasks for downloading data') as section_1:
    task_1 = EmptyOperator(task_id='task1')
    task_2 = BashOperator(task_id='task2', bash_command='echo 1')
    task_3 = EmptyOperator(task_id='task3')

    task_1 >> [task_2, task_3]

start >> section_1

3. Dynamic Dags

Dynamic Dag

  • 정해진 방법이 있는건 아니고 본인에게 편한 방식으로
    • DAG 코드를 개발자가 손으로 작성하는 것이 아니라 코드로 작성한다는 것 (템플릿으로 찍어내는 형태)
  • 템플릿과 YAML을 기반으로 DAG를 동적으로 만들어보자!
    • Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고
    • YAML을 통해 앞서 만든 템플릿에 파라미터를 제공
  • 이를 통해 비슷한 DAG를 계속해서 매뉴얼하게 개발하는 것을 방지
  • DAG를 계속해서 만드는 것한 DAG 안에서 태스크를 늘리는 것 사이의 밸런스 필요
    • owner가 다르거나 태스크의 수가 너무 커지는 경우 DAG를 복제해나가는 것이 더 좋음
    • DAG들이 하는 일이 동일하고 파라미터만 달라지는 경우에는 Dynamic Dag가 더 편리할 수 있음


기본 아이디어

  • DAG 뼈대를 Jinja Template으로 만들어놓음
  • 파라미터를 정의한 YAML 파일 생성 (입력)
  • generator.py 스크립트 -> 생성기
  • 생성기를 통해 yaml 파일마다 하나의 DAG 파일을 만들어냄
    • generator 실행을 언제할지 결정 필요! (얼마나 자주)
      • 자동 vs. 수동

스크린샷 2024-01-07 오후 5 10 01