Post

[DEV] 10주차. 데이터 파이프라인과 Airflow(2)

[DEV] 10주차. 데이터 파이프라인과 Airflow(2)

1. Airflow - Docker 사용

  • airflow-sertup Github repo 클론
    • git clone https://github.com/keeyong/airflow-setup.git
  • airflow-setup 폴더로 이동 후 2.5.1 이미지 관련 yml 파일 다운로드
    • curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
  • 이미지 다운로드 및 컨테이너 실행
    • docker-compose -f docker-compose.yaml pull
    • docker-compose -f docker-compose.yaml up
  • 웹 로그인


스크린샷 2023-12-13 오후 1 18 32

스크린샷 2023-12-13 오후 1 17 55


  • owner: airflow: airflow가 만든 예제 DAGs

스크린샷 2023-12-13 오후 1 19 20

docker-compose pull 실행 시 credential 오류

  • vi ~/.docker/config.json 에서 credsStore -> credStore 수정!

가장 좋은 방법

  • 리눅스 서버 위에 도커 컨테이너로 Airflow 실행하는 것
  • EC2 서버 위에 도커 설치 - Airflow 설치
  • 하지만 프리 티어가 불가능 (성능이 좋은 리눅스 서버 필요)

2. Airflow 구조

코드 기본 구조

  • DAG 대표하는 객체를 먼저 생성
    • DAG 이름, 실행 주기, 실행 날짜, 오너 등
  • DAG를 구성하는 태스크 생성
    • 몇 개의 태스크로 구성할 것인지, 각 태스크는 어떤 일을 맡을 것인지 명확히
    • 태스크 별로 적합한 오퍼레이터 생성
    • 태스크 ID를 부여하고, 해야 할 작업의 세부 사항 지정
  • 최종적으로 태스크들 간의 실행 순서 결정

DAG 설정 예제

1
2
3
4
5
6
7
8
from datetime import datetime, timedelta

default_args = {
    'owner': 'bokyung',
    'email': ['leebk1124@naver.com'],
    'retries': 1,     # 실패한다면 재시도를 몇 번 할 지 
    'retry_delay': timedelta(minutes=3),   # 재시도들 사이에 몇 분 기다릴지
}
  • 여기에 지정되는 인자들은 모든 태스크들에 공통으로 적용되는 설정이 됨
  • 뒤에서 DAG 객체를 만들 때 지정


  • 추가로 적용할 수 있는 인자들 (더 많음!)
    • on_failure_callback: 태스크를 실패했을 때 호출할 함수
    • on_success_callback: 성공했을 때 호출할 함수 (이어서 할 일을 하는 함수)

예제 2

1
2
3
4
5
6
7
8
9
10
11
from airflow import DAG

dag = DAG(
    "dag_v1",    # DAG name
    start_date = datetime(2023, 12, 12, hour=0, minute=00),
    schedule="0 * * * *",     # 매 시 0분에 시작
    tags=["example"],
    catchup=False,
    # common settings
    default_args=default_args
)


  • schedule 의미 스크린샷 2023-12-13 오후 2 08 29

  • None, @once, @hourly, @daily, @weekly, @monthly, @yearly 로 설정도 가능


  • catchup
    • start_date를 지금보다 과거로 설정했을 때 start_date과 현재까지의 gap에 대해 밀린 태스크를 실행해 줄 것인지
    • Full Refresh를 하는 job의 경우 항상 False로 설정
      • 어차피 모든 데이터를 다시 가져올 것이기 때문에

Bash Operator를 사용한 예제

스크린샷 2023-12-13 오후 9 59 26

  • 3개의 태스크로 구성
  • t1은 현재 시간 출력
  • t2는 5초간 대기 후 종료
  • t3는 서버의 /tmp 디렉토리 내용 출력
  • t1이 끝나고 t2와 t3를 병렬로 실행


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'bokyung',
    'start_date': datetime(2023, 5, 27, hour=0, minute=00)
    'email': ['leebk1124@naver.com'],
    'retries': 1,     
    'retry_delay': timedelta(minutes=3),   
}
test_dag = DAG(
    "dag_v1",   
    schedule="0 9 * * *", 
    tags=["test"],
    catchup=False,
    default_args=default_args
)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=test_dag
)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    dag=test_dag
)

t3 = BashOperator(
    task_id='ls',
    bash_command='ls /tmp',
    dag=test_dag
)

t1 >> [t2, t3]


웹UI로 실행

스크린샷 2023-12-13 오후 10 10 01

터미널로 실행

  • Airflow 서버에 로그인 후 명령 실행

  • docker ps 명령 실행 후 scheduler에 해당하는 컨테이너에 접속하는 것!

1
docker exec -it [container id] sh   # docker container에 로그인, 쉘 스크립트 띄우겠다
1
2
3
airflow dags list
airflow tasks list [DAG 이름]
airflow tasks test [DAG 이름] [Task 이름] [날짜]


  • 날짜는 YYYY-MM-DD
    • start_date보다 과거인 경우는 실행이 되지만, 오늘 날짜보다 미래인 경우 실행되지 않음
    • 이 값이 execution_date의 값이 됨 (Backfill에서 사용)


  • airflow tasks test [DAG 이름] [Task 이름] [날짜]에서
    • test는 실행 결과가 메타데이터 DB에 저장되지 않음
    • run은 실행 결과가 메타데이터 DB에 저장됨


스크린샷 2023-12-13 오후 11 29 04

스크린샷 2023-12-13 오후 11 29 34

스크린샷 2023-12-13 오후 11 31 01

This post is licensed under CC BY 4.0 by the author.