Post

[DEV] 12주차. Airflow 고급 기능 (1)

[DEV] 12주차. Airflow 고급 기능 (1)

1. Airflow 환경 설정

docker-compose.yaml 수정

  • airflow-common 의 environment
    • AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
      • 임시 데이터를 저장할 폴더 위치
      • AIRFLOW_VAR_ 뒤의 이름이 환경변수 이름!
      • : 뒤의 값이 해당 변수의 값
    • _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUREMENTS:- yfinance pandas numpy oauth2client gspread}
  • airflow-common 의 volumes
    • - ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data
  • airflow-init 의 command 수정
    • mkdir -p 문에 /sources/data 추가
    • chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins,}에 data 추가


  • :- 의 의미
    • 만약 이 이름의 환경변수가 존재하면 그 값을 읽어서 이 변수의 값으로 세팅하고,
    • 만약 이 이름의 환경변수가 host os에 세팅이 안되어있으면 airflow container가 실행될 때 이 뒤의 값을 환경변수로 사용해라
    • if문 느낌


  • data 폴더를 호스트 폴더에서 만들고 볼륨으로 공유
    • 임시 데이터를 저장할 폴더 /opt/airflow/data
    • 이를 docker volume으로 지정해서 나중에 디버깅에 사용


웹 UI 로그인

  • http://localhost:8080/login
    • airflow:airflow
  • 앞서 설정한 DATA_DIR 이라는 변수는 Variables에 보이지 않음
    • DAG와 Airflow 환경 정보들은 Postgres의 Named Volume으로 유지되고 있음
    • 환경변수로 설정한 것들은 웹 UI에는 보이지 않지만 프로그램에서는 사용 가능
  • docker exec -it learn-airflow-airflow-scheduler-1 airflow variables get DATA_DIR
    • /opt/airflow/data 출력

실행환경 관리

1) 기타 환경설정값들 (Variables, Connections 등) 관리/배포

  • 보통 docker-compose.yml 파일에서 x-airflow-common 부분에 정의
  • 환경변수가 아니라 별도 credentials 전용 Secrets 백엔드라는 것을 사용하기도 함
  • DAG들은 Github Repo의 DAGs 폴더에 존재
1
2
3
4
5
6
7
x-airflow-common:
    &airflow-common:
    ...
    environment:
        &airflow-common-env
        AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
        AIRFLOW_CONN_TEST_ID: test_connection


2) 어디까지 Airflow 이미지로 관리하고 무엇을 docker-compose.yml에서 관리할 것인지

  • 이는 회사마다 조금씩 다름
  • Airflow 자체 이미지를 만들고 거기에 넣을지
    • 이 경우 환경변수를 자체 이미지에 넣고 이를 docker-compose.yml 파일에서 사용
  • 아니면 docker-compose.yml에서 환경변수를 직접 설정


  • 일반적으로 DAGs를 아예 airflow 이미지에 넣어서 만들고, 그 airflow 이미지를 docker-compose.yml에서 사용하는 것이 좋음 (production)
  • 개발시에는 Host Volume을 사용해서 dags folder 자체는 로컬에 두고, Host Volume을 마운트해서 사용하는 것이 일반적
1
2
3
4
x-airflow-common:
    &airflow-common
    image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.1}
    # AIRFLOW_IMAGE_NAME 환경변수가 정의되어 있다면 그것을 사용하고, 아니면 기본값으로 apache/airflow:2.5.1 사용


3) DAG 관리 방안

  • Airflow image로 DAG 코드를 복사하여 만드는 것이 깔끔
  • 아니면 docker-compose에서 host volume 형태로 설정
    • 개발/테스트용으로 더 적합

.airflowignore

  • Airflow의 DAG 스캔 패턴
    • dags_folder가 가리키는 폴더를 서브폴더들까지 다 스캔해서 DAG 모듈이 포함된 모든 파이썬 스크립트를 실행해서 새로운 DAG를 찾게 됨
    • 가끔 사고로 이어질 수 있음
  • Airflow가 의도적으로 무시해야 하는 DAG_FOLDER의 디렉터리 또는 파일을 지정
  • .airflowignore의 각 줄은 정규식 패턴으로 지정하며, 매칭되는 파일들은 무시됨


1
2
project_a
tenant_[\d]
  • 위의 경우 아래 파일들이 무시됨
    • project_a_dag_1.py, TESTING_project_a.py, tenanat_q.py, project_a/dag_1.py

2. Summary 테이블 구현

MAU 요약 테이블

  • Build_Summary.py
    • MAU 요약 테이블
    • 이 부분을 dbt로 구현하는 회사도 많음


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
from datetime import timedelta

from airflow import AirflowException

import requests
import logging
import psycopg2

from airflow.exceptions import AirflowException

def get_Redshift_connection():
    hook = PostgresHook(postgres_conn_id = 'redshift_dev_db')
    return hook.get_conn().cursor()


def execSQL(**context):

    schema = context['params']['schema'] 
    table = context['params']['table']
    select_sql = context['params']['sql']

    logging.info(schema)
    logging.info(table)
    logging.info(select_sql)

    cur = get_Redshift_connection()

    sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};CREATE TABLE {schema}.temp_{table} AS """
    sql += select_sql
    cur.execute(sql)

    cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""")
    count = cur.fetchone()[0]
    if count == 0:
        raise ValueError(f"{schema}.{table} didn't have any record")

    try:
        sql = f"""DROP TABLE IF EXISTS {schema}.{table};ALTER TABLE {schema}.temp_{table} RENAME to {table};"""
        sql += "COMMIT;"
        logging.info(sql)
        cur.execute(sql)
    except Exception as e:
        cur.execute("ROLLBACK")
        logging.error('Failed to sql. Completed ROLLBACK!')
        raise AirflowException("")


dag = DAG(
    dag_id = "Build_Summary",
    start_date = datetime(2021,12,10),
    schedule = '@once',
    catchup = False
)

execsql = PythonOperator(
    task_id = 'mau_summary',
    python_callable = execSQL,
    params = {
        'schema' : '***',
        'table': 'mau_summary',
        'sql' : """SELECT 
    TO_CHAR(A.ts, 'YYYY-MM') AS month,
    COUNT(DISTINCT B.userid) AS mau
    FROM raw_data.session_timestamp A
    JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid
    GROUP BY 1 
    ;"""
    },
    dag = dag
)


결과
스크린샷 2024-01-05 오전 10 07 01

사용자별 channel 정보 요약 테이블

  • PythonOperator 파라미터 설정
1
2
3
4
5
6
7
8
9
10
params = {
    'schema' : '***', 
    'table': 'channel_summary', 
    'sql' : """SELECT
        DISTINCT A.userid,
        FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS First_Channel,
        LAST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS Last_Channel
        FROM raw_data.user_session_channel A
        LEFT JOIN raw_data.session_timestamp B ON A.sessionid = B.sessionid;"""
},

CTAS를 별도의 환경 설정 파일로

  • 환경 설정 중심의 접근 방식
    • config 폴더 생성
    • 그 안에 summary 테이블 별로 하나의 환경설정 파일 생성
      • python dictionary 형태 -> .py 확장자를 가져야 함
  • 이렇게 하면 비개발자들이 사용할 때 어려움을 덜 느끼게 됨
  • 더 다양한 테스트 추가


dag/config/mau_summary.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
    'table': 'nps_summary',
    'schema': '***',
    'main_sql': """ 
    SELECT
        TO_CHAR(A.ts, 'YYYY-MM') AS month,
        COUNT(DISTINCT B.userid) AS mau
    FROM raw_data.session_timestamp A
    JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid
    GROUP BY 1 ;""",
    'input_check':
    [
    {
        'sql': 'SELECT COUNT(1) FROM lebk1124.nps',
        'count': 150000
    },
    ],
    'output_check':
    [
    {
        'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}',
        'count': 12
    }
    ],
}

dag/config/nps_summary.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
    'table': 'nps_summary',
    'schema': 'leebk1124',
    'main_sql': """
SELECT LEFT(created_at, 10) AS date,
ROUND(SUM(CASE
WHEN score >= 9 THEN 1 
WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2)
FROM leebk1124.nps
GROUP BY 1
ORDER BY 1;""",
    'input_check':
    [
    {
        'sql': 'SELECT COUNT(1) FROM lebk1124.nps',
        'count': 150000
    },
    ],
    'output_check':
    [
    {
        'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}',
        'count': 12
    }
    ],
}

dag/config/channel_summary.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
    'table': 'channel_summary',
    'schema': 'leebk1124',
    'main_sql': """
    SELECT 
        DISTINCT A.userid,
        FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded folloiwing) AS first_channel,
        LAST_VALUE(A.channel) over(parition by A.userid ordrer by B.ts rows between unbounded preceding and unbounded and unbounded following) AS last_channel
        FROM raw_data.user_session_channel A
        LEFT JOIN raw_data.session_timestamp B ON A.sessionid = B.sessionid;
    """,
    'input_check':
    [
    {
        'sql': 'SELECT COUNT(1) FROM lebk1124.nps',
        'count': 150000
    },
    ],
    'output_check':
    [
    {
        'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}',
        'count': 12
    }
    ],
}

NPS summary 테이블

  • 일별 NPS 계산 SQL
1
2
3
4
5
6
7
8
9
10
11
12
SELECT LEFT(created_at, 10) AS dats,
    ROUND(
        SUM(
            CASE 
                WHEN score >= 9 THEN 1
                WHEN score <= 6 THEN -1
            END
        )::float*100/COUNT(1), 2
    ) nps
FROM bokyung.nps
GROUP BY 1
ORDER BY 1;


  • config/nps_summary.py
1
2
3
4
5
6
7
8
9
10
11
12
13
{
    'table': 'nps_summary', 
    'schema': '***', 
    'main_sql': """SELECT ...;""", 
    'input_check': [ {
        'sql': 'SELECT COUNT(1) FROM ***.nps',
        'count': 150000 
    } ],
    'output_check': [ {
        'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}', 
        'count': 12
    } ], 
}

3. Slack 연동

  • DAG 실행 중 에러가 발생하면 지정된 슬랙 workspace 채널로 보내기
  • 이를 위해 해당 슬랙 workspace에 App 설정 필요
  • 다음으로 연동을 위한 함수 생성 (plugins/slack.py)
  • 이를 태스크에 적용되는 default_args의 on_failure_callback에 지정
1
2
3
4
5
6
from plugins import slack

    ...
    default_args={
        'on_failure_callback': slack.on_failure_callback,
    }


https://api.slack.com/messaging/webhooks 에서 Incoming Webhooks App 생성

스크린샷 2024-01-04 오전 4 30 34 연동할 workspace 선택

스크린샷 2024-01-04 오전 4 30 47 Incoming Webhooks 선택 후 활성화 & [Add New Webhook to Workspace] 클릭

스크린샷 2024-01-04 오전 4 32 12 사용할 채널 선택

스크린샷 2024-01-04 오전 4 32 38 Endpoint 주소 복사


  • 연동 방법
    • Endpoint 주소를 Airflow Variable로 저장 slack_url
    • slack에 에러 메세지를 보내는 별도 모듈로 개발 : slack.py
    • 이를 DAG 인스턴스를 만들 때 에러 콜백으로 지정


  • slack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from airflow.models import Variable

import logging
import requests

def on_failure_callback(context):
    """
    https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
    Define the callback to post on Slack if a failure is detected in the Workflow
    :return: operator.execute
    """
    text = str(context['task_instance'])
    text += "```" + str(context.get('exception')) +"```"
    send_message_to_a_slack_channel(text, ":scream:")


# def send_message_to_a_slack_channel(message, emoji, channel, access_token):
def send_message_to_a_slack_channel(message, emoji):
    # url = "https://slack.com/api/chat.postMessage"
    url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
    headers = {
        'content-type': 'application/json',
    }
    data = { "username": "Data GOD", "text": message, "icon_emoji": emoji }
    r = requests.post(url, json=data, headers=headers)
    return r


  • name_gender_v4 파일에 버그 생성

스크린샷 2024-01-04 오전 5 09 59 오타 발생시키기


  • 중요
    • except문에 raise를 넣어서 error를 명확하게 확인할 수 있도록!
This post is licensed under CC BY 4.0 by the author.