1. Airflow 환경 설정
docker-compose.yaml 수정
- airflow-common 의 environment
AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
- 임시 데이터를 저장할 폴더 위치
AIRFLOW_VAR_
뒤의 이름이 환경변수 이름!:
뒤의 값이 해당 변수의 값
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUREMENTS:- yfinance pandas numpy oauth2client gspread}
- airflow-common 의 volumes
- ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data
- airflow-init 의 command 수정
mkdir -p
문에 /sources/data
추가chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins,}
에 data 추가
:-
의 의미- 만약 이 이름의 환경변수가 존재하면 그 값을 읽어서 이 변수의 값으로 세팅하고,
- 만약 이 이름의 환경변수가 host os에 세팅이 안되어있으면 airflow container가 실행될 때 이 뒤의 값을 환경변수로 사용해라
- if문 느낌
- data 폴더를 호스트 폴더에서 만들고 볼륨으로 공유
- 임시 데이터를 저장할 폴더
/opt/airflow/data
- 이를 docker volume으로 지정해서 나중에 디버깅에 사용
웹 UI 로그인
- http://localhost:8080/login
- 앞서 설정한 DATA_DIR 이라는 변수는 Variables에 보이지 않음
- DAG와 Airflow 환경 정보들은 Postgres의 Named Volume으로 유지되고 있음
- 환경변수로 설정한 것들은 웹 UI에는 보이지 않지만 프로그램에서는 사용 가능
docker exec -it learn-airflow-airflow-scheduler-1 airflow variables get DATA_DIR
실행환경 관리
1) 기타 환경설정값들 (Variables, Connections 등) 관리/배포
- 보통 docker-compose.yml 파일에서 x-airflow-common 부분에 정의
- 환경변수가 아니라 별도 credentials 전용 Secrets 백엔드라는 것을 사용하기도 함
- DAG들은 Github Repo의 DAGs 폴더에 존재
1
2
3
4
5
6
7
| x-airflow-common:
&airflow-common:
...
environment:
&airflow-common-env
AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
AIRFLOW_CONN_TEST_ID: test_connection
|
2) 어디까지 Airflow 이미지로 관리하고 무엇을 docker-compose.yml에서 관리할 것인지
- 이는 회사마다 조금씩 다름
- Airflow 자체 이미지를 만들고 거기에 넣을지
- 이 경우 환경변수를 자체 이미지에 넣고 이를 docker-compose.yml 파일에서 사용
- 아니면 docker-compose.yml에서 환경변수를 직접 설정
- 일반적으로 DAGs를 아예 airflow 이미지에 넣어서 만들고, 그 airflow 이미지를 docker-compose.yml에서 사용하는 것이 좋음 (production)
- 개발시에는 Host Volume을 사용해서 dags folder 자체는 로컬에 두고, Host Volume을 마운트해서 사용하는 것이 일반적
1
2
3
4
| x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.1}
# AIRFLOW_IMAGE_NAME 환경변수가 정의되어 있다면 그것을 사용하고, 아니면 기본값으로 apache/airflow:2.5.1 사용
|
3) DAG 관리 방안
- Airflow image로 DAG 코드를 복사하여 만드는 것이 깔끔
- 아니면 docker-compose에서 host volume 형태로 설정
.airflowignore
- Airflow의 DAG 스캔 패턴
- dags_folder가 가리키는 폴더를 서브폴더들까지 다 스캔해서 DAG 모듈이 포함된 모든 파이썬 스크립트를 실행해서 새로운 DAG를 찾게 됨
- 가끔 사고로 이어질 수 있음
- Airflow가 의도적으로 무시해야 하는 DAG_FOLDER의 디렉터리 또는 파일을 지정
- .airflowignore의 각 줄은 정규식 패턴으로 지정하며, 매칭되는 파일들은 무시됨
1
2
| project_a
tenant_[\d]
|
- 위의 경우 아래 파일들이 무시됨
- project_a_dag_1.py, TESTING_project_a.py, tenanat_q.py, project_a/dag_1.py
2. Summary 테이블 구현
MAU 요약 테이블
- Build_Summary.py
- MAU 요약 테이블
- 이 부분을 dbt로 구현하는 회사도 많음
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
| from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
from datetime import timedelta
from airflow import AirflowException
import requests
import logging
import psycopg2
from airflow.exceptions import AirflowException
def get_Redshift_connection():
hook = PostgresHook(postgres_conn_id = 'redshift_dev_db')
return hook.get_conn().cursor()
def execSQL(**context):
schema = context['params']['schema']
table = context['params']['table']
select_sql = context['params']['sql']
logging.info(schema)
logging.info(table)
logging.info(select_sql)
cur = get_Redshift_connection()
sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};CREATE TABLE {schema}.temp_{table} AS """
sql += select_sql
cur.execute(sql)
cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""")
count = cur.fetchone()[0]
if count == 0:
raise ValueError(f"{schema}.{table} didn't have any record")
try:
sql = f"""DROP TABLE IF EXISTS {schema}.{table};ALTER TABLE {schema}.temp_{table} RENAME to {table};"""
sql += "COMMIT;"
logging.info(sql)
cur.execute(sql)
except Exception as e:
cur.execute("ROLLBACK")
logging.error('Failed to sql. Completed ROLLBACK!')
raise AirflowException("")
dag = DAG(
dag_id = "Build_Summary",
start_date = datetime(2021,12,10),
schedule = '@once',
catchup = False
)
execsql = PythonOperator(
task_id = 'mau_summary',
python_callable = execSQL,
params = {
'schema' : '***',
'table': 'mau_summary',
'sql' : """SELECT
TO_CHAR(A.ts, 'YYYY-MM') AS month,
COUNT(DISTINCT B.userid) AS mau
FROM raw_data.session_timestamp A
JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid
GROUP BY 1
;"""
},
dag = dag
)
|
결과
사용자별 channel 정보 요약 테이블
1
2
3
4
5
6
7
8
9
10
| params = {
'schema' : '***',
'table': 'channel_summary',
'sql' : """SELECT
DISTINCT A.userid,
FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS First_Channel,
LAST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS Last_Channel
FROM raw_data.user_session_channel A
LEFT JOIN raw_data.session_timestamp B ON A.sessionid = B.sessionid;"""
},
|
CTAS를 별도의 환경 설정 파일로
- 환경 설정 중심의 접근 방식
- config 폴더 생성
- 그 안에 summary 테이블 별로 하나의 환경설정 파일 생성
- python dictionary 형태 -> .py 확장자를 가져야 함
- 이렇게 하면 비개발자들이 사용할 때 어려움을 덜 느끼게 됨
- 더 다양한 테스트 추가
dag/config/mau_summary.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| {
'table': 'nps_summary',
'schema': '***',
'main_sql': """
SELECT
TO_CHAR(A.ts, 'YYYY-MM') AS month,
COUNT(DISTINCT B.userid) AS mau
FROM raw_data.session_timestamp A
JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid
GROUP BY 1 ;""",
'input_check':
[
{
'sql': 'SELECT COUNT(1) FROM lebk1124.nps',
'count': 150000
},
],
'output_check':
[
{
'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}',
'count': 12
}
],
}
|
dag/config/nps_summary.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| {
'table': 'nps_summary',
'schema': 'leebk1124',
'main_sql': """
SELECT LEFT(created_at, 10) AS date,
ROUND(SUM(CASE
WHEN score >= 9 THEN 1
WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2)
FROM leebk1124.nps
GROUP BY 1
ORDER BY 1;""",
'input_check':
[
{
'sql': 'SELECT COUNT(1) FROM lebk1124.nps',
'count': 150000
},
],
'output_check':
[
{
'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}',
'count': 12
}
],
}
|
dag/config/channel_summary.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| {
'table': 'channel_summary',
'schema': 'leebk1124',
'main_sql': """
SELECT
DISTINCT A.userid,
FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded folloiwing) AS first_channel,
LAST_VALUE(A.channel) over(parition by A.userid ordrer by B.ts rows between unbounded preceding and unbounded and unbounded following) AS last_channel
FROM raw_data.user_session_channel A
LEFT JOIN raw_data.session_timestamp B ON A.sessionid = B.sessionid;
""",
'input_check':
[
{
'sql': 'SELECT COUNT(1) FROM lebk1124.nps',
'count': 150000
},
],
'output_check':
[
{
'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}',
'count': 12
}
],
}
|
NPS summary 테이블
1
2
3
4
5
6
7
8
9
10
11
12
| SELECT LEFT(created_at, 10) AS dats,
ROUND(
SUM(
CASE
WHEN score >= 9 THEN 1
WHEN score <= 6 THEN -1
END
)::float*100/COUNT(1), 2
) nps
FROM bokyung.nps
GROUP BY 1
ORDER BY 1;
|
1
2
3
4
5
6
7
8
9
10
11
12
13
| {
'table': 'nps_summary',
'schema': '***',
'main_sql': """SELECT ...;""",
'input_check': [ {
'sql': 'SELECT COUNT(1) FROM ***.nps',
'count': 150000
} ],
'output_check': [ {
'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}',
'count': 12
} ],
}
|
3. Slack 연동
- DAG 실행 중 에러가 발생하면 지정된 슬랙 workspace 채널로 보내기
- 이를 위해 해당 슬랙 workspace에 App 설정 필요
- 다음으로 연동을 위한 함수 생성 (plugins/slack.py)
- 이를 태스크에 적용되는 default_args의
on_failure_callback
에 지정
1
2
3
4
5
6
| from plugins import slack
...
default_args={
'on_failure_callback': slack.on_failure_callback,
}
|
https://api.slack.com/messaging/webhooks 에서 Incoming Webhooks App 생성
연동할 workspace 선택
Incoming Webhooks 선택 후 활성화 & [Add New Webhook to Workspace] 클릭
사용할 채널 선택
Endpoint 주소 복사
- 연동 방법
- Endpoint 주소를 Airflow Variable로 저장
slack_url
- slack에 에러 메세지를 보내는 별도 모듈로 개발 : slack.py
- 이를 DAG 인스턴스를 만들 때 에러 콜백으로 지정
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| from airflow.models import Variable
import logging
import requests
def on_failure_callback(context):
"""
https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
Define the callback to post on Slack if a failure is detected in the Workflow
:return: operator.execute
"""
text = str(context['task_instance'])
text += "```" + str(context.get('exception')) +"```"
send_message_to_a_slack_channel(text, ":scream:")
# def send_message_to_a_slack_channel(message, emoji, channel, access_token):
def send_message_to_a_slack_channel(message, emoji):
# url = "https://slack.com/api/chat.postMessage"
url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
headers = {
'content-type': 'application/json',
}
data = { "username": "Data GOD", "text": message, "icon_emoji": emoji }
r = requests.post(url, json=data, headers=headers)
return r
|
오타 발생시키기
- 중요
- except문에
raise
를 넣어서 error를 명확하게 확인할 수 있도록!