[DEV] 10주차. 데이터 파이프라인과 Airflow(2)
[DEV] 10주차. 데이터 파이프라인과 Airflow(2)
1. Airflow - Docker 사용
- airflow-sertup Github repo 클론
git clone https://github.com/keeyong/airflow-setup.git
- airflow-setup 폴더로 이동 후 2.5.1 이미지 관련 yml 파일 다운로드
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
- 이미지 다운로드 및 컨테이너 실행
docker-compose -f docker-compose.yaml pull
docker-compose -f docker-compose.yaml up
- 웹 로그인
owner: airflow
: airflow가 만든 예제 DAGs
docker-compose pull 실행 시 credential 오류
vi ~/.docker/config.json
에서credsStore
->credStore
수정!
가장 좋은 방법
- 리눅스 서버 위에 도커 컨테이너로 Airflow 실행하는 것
- EC2 서버 위에 도커 설치 - Airflow 설치
- 하지만 프리 티어가 불가능 (성능이 좋은 리눅스 서버 필요)
2. Airflow 구조
코드 기본 구조
- DAG 대표하는 객체를 먼저 생성
- DAG 이름, 실행 주기, 실행 날짜, 오너 등
- DAG를 구성하는 태스크 생성
- 몇 개의 태스크로 구성할 것인지, 각 태스크는 어떤 일을 맡을 것인지 명확히
- 태스크 별로 적합한 오퍼레이터 생성
- 태스크 ID를 부여하고, 해야 할 작업의 세부 사항 지정
- 최종적으로 태스크들 간의 실행 순서 결정
DAG 설정 예제
1
2
3
4
5
6
7
8
from datetime import datetime, timedelta
default_args = {
'owner': 'bokyung',
'email': ['leebk1124@naver.com'],
'retries': 1, # 실패한다면 재시도를 몇 번 할 지
'retry_delay': timedelta(minutes=3), # 재시도들 사이에 몇 분 기다릴지
}
- 여기에 지정되는 인자들은 모든 태스크들에 공통으로 적용되는 설정이 됨
- 뒤에서 DAG 객체를 만들 때 지정
- 추가로 적용할 수 있는 인자들 (더 많음!)
on_failure_callback
: 태스크를 실패했을 때 호출할 함수on_success_callback
: 성공했을 때 호출할 함수 (이어서 할 일을 하는 함수)
예제 2
1
2
3
4
5
6
7
8
9
10
11
from airflow import DAG
dag = DAG(
"dag_v1", # DAG name
start_date = datetime(2023, 12, 12, hour=0, minute=00),
schedule="0 * * * *", # 매 시 0분에 시작
tags=["example"],
catchup=False,
# common settings
default_args=default_args
)
- catchup
- start_date를 지금보다 과거로 설정했을 때 start_date과 현재까지의 gap에 대해 밀린 태스크를 실행해 줄 것인지
- Full Refresh를 하는 job의 경우 항상 False로 설정
- 어차피 모든 데이터를 다시 가져올 것이기 때문에
Bash Operator를 사용한 예제
- 3개의 태스크로 구성
- t1은 현재 시간 출력
- t2는 5초간 대기 후 종료
- t3는 서버의 /tmp 디렉토리 내용 출력
- t1이 끝나고 t2와 t3를 병렬로 실행
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'bokyung',
'start_date': datetime(2023, 5, 27, hour=0, minute=00)
'email': ['leebk1124@naver.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
test_dag = DAG(
"dag_v1",
schedule="0 9 * * *",
tags=["test"],
catchup=False,
default_args=default_args
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag
)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=test_dag
)
t3 = BashOperator(
task_id='ls',
bash_command='ls /tmp',
dag=test_dag
)
t1 >> [t2, t3]
웹UI로 실행
터미널로 실행
Airflow 서버에 로그인 후 명령 실행
docker ps
명령 실행 후 scheduler에 해당하는 컨테이너에 접속하는 것!
1
docker exec -it [container id] sh # docker container에 로그인, 쉘 스크립트 띄우겠다
1
2
3
airflow dags list
airflow tasks list [DAG 이름]
airflow tasks test [DAG 이름] [Task 이름] [날짜]
- 날짜는
YYYY-MM-DD
start_date
보다 과거인 경우는 실행이 되지만, 오늘 날짜보다 미래인 경우 실행되지 않음- 이 값이
execution_date
의 값이 됨 (Backfill에서 사용)
airflow tasks test [DAG 이름] [Task 이름] [날짜]
에서test
는 실행 결과가 메타데이터 DB에 저장되지 않음run
은 실행 결과가 메타데이터 DB에 저장됨
This post is licensed under CC BY 4.0 by the author.