[DEV] 12주차. Airflow 고급 (3)
1. Dag Dependencies
DAG를 실행하는 방법
- 주기적 실행: schedule로 지정
- crontab 형태로 지정
- 다른 DAG에 의해 트리거 (의존관계가 있는 경우)
- Explicit Trigger: DAG A가 분명하게 DAG B를 트리거 (
TriggerDagOperator
) - Reactive Trigger: DAG B가 DAG A가 끝나기를 대기 (
ExternalTaskSensor
)
- Explicit Trigger: DAG A가 분명하게 DAG B를 트리거 (
- 상황에 따라 다른 태스크 실행 방식들
- 조건에 따라 다른 태스크로 분기 : 동적으로 결정 (
BranchPythonOperator
) - 과거 데이터 Backfill시에는 불필요한 태스크 처리 (
LatestOnlyOperator
)- 하나의 DAG에 다양한 성격의 태스크를 갖고 있는 경우 (incremental update & 뉴스레터 전송)
- backfill을 위해 실행했는데 뉴스레터를 보낸다면 곤란! -> backfill을 할 때 뉴스레터 태스크 실행 중단
- 앞단 태스크들의 실행 상황
- 어떤 경우에는 앞단이 실패해도 동작해야하는 경우가 있을 수 있음
- 조건에 따라 다른 태스크로 분기 : 동적으로 결정 (
Dag Dependencies
- Explicit trigger
TriggerDagOperator
- DAG A가 명시적으로 DAG B를 트리거
- jinja template 사용
- Reactive trigger
ExternalTaskSensor
- DAG B가 DAG A의 태스크가 끝나기를 대기
- 이 경우 DAG A는 이 사실을 모름
- 의존관계 불명확 -> DAG A를 잘못 수정할 경우 DAG B에 문제가 발생할 수 있음
- Data Catalog, Data Discovery
Jinja Template
- Python에서 널리 사용되는 템플릿 엔진
- 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성
- 프레젠테이션 로직: 사람 눈에 보이는 부분
- 애플리케이션 로직: 상황에 맞게 어떤 프레젠테이션을 할 것인지 결정
- Flask에서 많이 사용됨
- Airflow에서도 쉽게 사용!
- execution_date 등 시스템에서 제공하는 변수들을 python code 내에서 쉽게 불러서 사용할 수 있음
- 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성
- TriggerDagOperator에서
- Airflow 태스크 데이터를 읽어서 코드 안에서 편하게 사용
- Dynamic Dags에서 사용
- 변수는 이중 중괄호 ``로 감싸서 사용
<h1>안녕하세요, 님!</h1>
- 제어문은 퍼센트 기호
{% %}
로 표시
<ul>
</ul>
Airflow에서
- 작업 이름, 파라미터, SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능
- 재사용 가능하고 사용자 정의 가능한 워크플로우 생성
- 특정 파라미터에서만 사용 가능
- 시스템 정보 사용
- execution_date을 코드 내에서 쉽게 사용 : ``
- execution_date에서 연도와 월, 일만 추출
- Variables, Connections 정보도 쉽게 읽을 수 있음
- 태스크 이름, DAG 이름, DAG가 언제 마지막으로 성공적으로 실행되었는지, 다음 DAG가 실행되는 시간은 언제인지 ..
- execution_date을 코드 내에서 쉽게 사용 : ``
task1 = BashOperator(
task_id="task1",
bash_command='echo ""',
dag=dag
)
- 파라미터 등으로 넘어온 변수를 쉽게 사용 가능
task2 = BashOperator(
task_id='task2',
bash_command='echo "안녕하세요, !"',
params={'name':'John'}, # 사용자 정의 가능한 매개변수
dag=dag
)
사용 가능한 변수들
TriggerDagOperator
- DAG A의 태스크를 TriggerDagRunOperator로 구현
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_B = TriggerDagRunOperator(
task_id="trigger_B",
trigger_dag_id="트리거하려는 DAG 이름"
)
- TriggerDagRunOperator에서 사용할 수 있는 jinja template 변수
- trigger_dag_id
- trigger_run_id
- conf
- execution_date
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_B = TriggerDagRunOperator(
task_id="trigger_B",
trigger_dag_id="트리거하려는 DAG 이름",
conf={'path':'/opt/ml/conf'},
execution_date="",
reset_dag_run=True,
wait_for_completion=True
)
-
airflow.cfg의
dag_run_conf_overrides_params
가 True로 설정되어 있어야 함 conf={'path':'/opt/ml/conf'}
- DAG B에 넘기고 싶은 정보
- DAG B에서는 ``로 접근 가능
- DAG B PythonOperator(**context)에서라면 ``
execution_date=""
: DAG A의 execution_date 패스reset_dag_run=True
: True일 경우 해당 날짜가 이미 실행되었더라도 다시 재실행wait_for_completion=True
: DAG B가 끝날 때까지 기다릴지 여부를 결정, default 값은 False
Sensor
- 특정 조건이 충족될 때까지 대기하는 Operator
- 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
- 파일 생성, HTTP 응답, 특정 dag의 특정 태스크가 완료되었는지 등
- Airflow 내장 sensor
- FileSensor : 지정된 위치에 파일이 생길 때까지 대기
- HttpSensor : HTTP 요청을 수행하고 지정된 응답을 대기
- SqlSensor : SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
- TimeSensor : 특정 시간에 도달할 때까지 워크플로우를 일시 중지
- ExternalTaskSensor : 다른 Airflow DAG의 특정 작업 완료를 대기
- 기본적으로 주기적으로 poke를 하는 것
- poke: 주기적으로 체크하는 것
- worker를 하나 붙잡고 poke간에 sleep을 할지, 또는 worker를 릴리스하고 다시 잡아서 poke를 할지 결정해주는 파라미터가 존재 :
mode
- mode의 값은
reschedule
/poke
- mode의 값은
ExternalTaskSensor
- DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 체크함
- 먼저 동일한 schedule_interval을 사용
- 이 경우 두 태스크들의 execution_date이 동일해야함. 다르면 매칭이 안됨!
- 잘 사용하지는 않음
- 맞아야 하는 조건이 까다로움
- poke 모드가 일반적이지만 worker 한 개가 낭비되는 느낌
- DAG A 관점에서 의존관계가 명확하지 않기 때문에 실수를 할 확률이 높음
from airflow.sensors.external_task import ExeternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id='waiting_for_end_of_dag_a',
external_dag_id='DAG이름',
external_task_id='end',
timeout=5*60,
mode='reschedule' # worker release했다가 잡았다가 함
)
- 만약 DAG A와 DAG B가 서로 다른 schedule interval을 갖는다면
- 예를 들어 A가 B보다 5분 먼저 실행된다면
- execution_delta 사용하여 차이 조절
- 하지만 어긋나기 쉬움 ..
- execution_date_fn을 사용하면 조금 더 복잡하게 컨트롤 가능
- execution_delta 사용하여 차이 조절
- 만약 두 개의 DAG가 서로 다른 frequency를 갖고 있다면 이 경우 ExeternalTaskSensor는 사용 불가
- 예를 들어 A가 B보다 5분 먼저 실행된다면
from airflow.sensors.external_task import ExeternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id='waiting_for_end_of_dag_a',
external_dag_id='DAG이름',
external_task_id='end',
timeout=5*60,
mode='reschedule',
execution_delta=timedelta(minutes=5)
)
BranchPythonOperator
- 상황에 따라 뒤에 실행되어야 할 태스크를 동적으로 결정해주는 오퍼레이터
- 미리 정해준 오퍼레이터들 중 선택하는 형태
- TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있음
from airflow.operators.python import BranchPythonOperator
def skip_of_cont_trigger():
if Variable.get("mode", "dev") == "dev":
return []
else:
return ['trigger_b']
# mode라는 variable의 값이 dev이면 trigger_b 태스크 스킵
branching = BranchPythonOperator(
task_id='branching',
python_callable=skip_or_cont_trigger,
)
실습
- UTC 기준 12시 이전이면 morning_task로, 아니면 afternoon_task로 브랜치
- current_hour: 0
INFO - Following branch morning_task
Skipping tasks ['afternoon_task']
- 이 경우 실행되지 않은 afternoon_task의 상태는 skipped가 됨
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime
default_args = {
'start_date': datetime(2023, 1, 1)
}
dag = DAG(
'Learn_BranchPythonOperator',
schedule='@daily',
default_args=default_args)
def decide_branch(**context):
current_hour = datetime.now().hour
print(f"current_hour: {current_hour}")
if current_hour < 12:
return 'morning_task'
else:
return 'afternoon_task'
branching_operator = BranchPythonOperator(
task_id='branching_task',
python_callable=decide_branch,
dag=dag
)
morning_task = EmptyOperator(
task_id='morning_task',
dag=dag
)
afternoon_task = EmptyOperator(
task_id='afternoon_task',
dag=dag
)
branching_operator >> morning_task
branching_operator >> afternoon_task
LatestOnlyOperator
- Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함
- 현재 시간이 지금 태스크가 처리하는 execution_date보다 미래이고, 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어나가고 아니면 여기서 중단됨
- t1 » t3 » [t2, t4]
- Time-sensitive한 태스크들은 LatestOnlyOperator 뒤쪽에 놓이도록!
- 혹은 Incrementral Update와 Time-sensitive한 태스크를 별도의 DAG로 분리하는 것도 방법
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id='latest_only_example',
schedule=timedelta(hours=48),
start_date=datetime(2023, 6, 14),
catchup=True) as dag:
t1 = EmptyOperator(task_id='task1')
t2 = LatestOnlyOperator(task_id='latest_only')
t3 = EmptyOperator(task_id='task3')
t3 = EmptyOperator(task_id='task4')
t1 >> t2 >> [t3, t4]
t3, t4는 skip됨!
Trigger Rules
- Upstream 태스크의 성공실패 상황에 따라 뒷단 태스크의 실행 여부를 결정하고 싶다면?
- 보통 앞단이 하나라도 실패하면 뒷단의 태스크는 실행 불가
- Operator의
trigger_rule
파라미터로 결정 가능- 태스크에 주어지는 파라미터로, 아래와 같은 값이 가능
- all_success (default), all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success
airflow.utils.trigger_rule.TriggerRule
from airflow.utils.trigger_rule import TriggerRule
with DAG('trigger_rules', default_args=default_args, schedule=timedelta(1)) as dag:
t1 = BashOperator(task_id='print_date', bash_command='date')
t2 = BashOperator(task_id='sleep', bash_command='sleep 5')
t3 = BashOperator(task_id='exit', bash_command='exit 1') # exit: 0이 아니면 실패하는 task
t4 = BashOperator(
task_id='final_task',
bash_command='echo DONE!',
trigger_rule=TriggerRule.ALL_DONE # 성공실패, skip 상관없이 끝나면 ok
)
[t1, t2, t3] >> t4
결과
Airflow 메타데이터 DB 내용 살펴보기
-
named volume이기 때문에 컨테이너를 껐다 켜도 유지됨!
-
airflow:airflowfh Postgres 로그인 가능
docker exec -it learn-airflow-airflow-webserver-1 sh
psql -h postgres
- psql shell에서 아래 명령 수행
\dt -- 모든 테이블 출력
SELECT * FROM dag_run LIMIT 10; -- 각 DAG가 실행된 기록
DELETE FROM dag_run WHERE dag_id = '기록을 삭제하고싶은 DAG';
2. Task Grouping
필요성
- 태스크 수가 많은 DAG라면 태스크들을 성격에 따라 관리하고 싶은 니즈 존재
- SubDAG가 사용되다가 Airflow2.0에서 나온 Task Grouping으로 넘어가는 추세
- 다수의 파일 처리를 하는 DAG라면
- 파일 다운로드 태스크 / 파일 체크 태스크 / 데이터 처리 태스크로 구성
- TaskGroup 안에 TaskGroup nesting 가능
- TaskGroup도 태스크처럼 실행 순서 정의 가능
예시
from airflow utils.task_group import TaskGroup
start = EmptyOperator(task_id='start')
with TaskGroup('Download', tooltip='Tasks for downloading data') as section_1:
task_1 = EmptyOperator(task_id='task1')
task_2 = BashOperator(task_id='task2', bash_command='echo 1')
task_3 = EmptyOperator(task_id='task3')
task_1 >> [task_2, task_3]
start >> section_1
3. Dynamic Dags
Dynamic Dag
- 정해진 방법이 있는건 아니고 본인에게 편한 방식으로
- DAG 코드를 개발자가 손으로 작성하는 것이 아니라 코드로 작성한다는 것 (템플릿으로 찍어내는 형태)
- 템플릿과 YAML을 기반으로 DAG를 동적으로 만들어보자!
- Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고
- YAML을 통해 앞서 만든 템플릿에 파라미터를 제공
- 이를 통해 비슷한 DAG를 계속해서 매뉴얼하게 개발하는 것을 방지
- DAG를 계속해서 만드는 것과 한 DAG 안에서 태스크를 늘리는 것 사이의 밸런스 필요
- owner가 다르거나 태스크의 수가 너무 커지는 경우 DAG를 복제해나가는 것이 더 좋음
- DAG들이 하는 일이 동일하고 파라미터만 달라지는 경우에는 Dynamic Dag가 더 편리할 수 있음
기본 아이디어
- DAG 뼈대를 Jinja Template으로 만들어놓음
- 파라미터를 정의한 YAML 파일 생성 (입력)
- generator.py 스크립트 -> 생성기
- 생성기를 통해 yaml 파일마다 하나의 DAG 파일을 만들어냄
- generator 실행을 언제할지 결정 필요! (얼마나 자주)
- 자동 vs. 수동
- generator 실행을 언제할지 결정 필요! (얼마나 자주)