[DEV] 10주차. 데이터 파이프라인과 Airflow(4)
1. dags 폴더에서 코드 작성시 주의할 점
- Airflow는 dags 폴더를 주기적으로 스캔함
dags_folder
키가 가리키는 위치dag_dir_list_interval
주기 만큼
- 이 때 DAG 모듈이 들어있는 모든 파일들의 메인 함수가 실행이 됨
- 이 경우 본의 아니게 개발 중인 테스트 코드도 실행될 수 있음
- 이 테스트 코드에
DELETE
문이 들어있다면 사고가 발생할 수 있음!!
2. Open Weathermap DAG 구현
Open Weathermap API
- 위도/경보를 기반으로 그 지역의 기후 정보를 알려주는 서비스
- 무료 계정으로 api key를 받아서 사용
만들 DAG
-
서울 8일 간의 낮/최소/최대 온도 읽기
- API Key를 open_weather_api_key 라는 Variable로 저장
- 서울의 위도와 경도 찾기
- One-Call API 사용
- https://openweathermap.org/api/one-call-api
- API KEY, 서울의 위도/경도를 사용해서 위 API를 requests로 호출
- 응답 결과에서 온도 정보(평균/최대/최소)만 앞으로 8일 대상으로 출력
- 날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max)
https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={API_KEY}&units=metric
DAG 구현
- 서울의 다음 8일 간 낮/최소/최대 온도를 읽어서 weather_forecast 테이블로 저장
- 유의할 점:
created_date
는 레코드 생성 시간으로 자동으로 채워지는 필드!- Incremental Update에서 중복 검사를 위해
CREATE TABLE leebk1124.weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
)
- One-Call API: JSON 형태 리턴
- requests.get(‘‘)의 text를 JSON으로 변환
- 혹은 requests.get(‘’).json()
- 결과에서
daily
라는 필드에 앞으로 7일 간의 날씨 정보가 들어가있음daily
필드: 리스트, 각 레코드가 하나의 날짜에 해당- 날짜 정보는
dt
필드에 있음- epoch, 1970년 1월 1일 이후 밀리세컨드로 시간 표시
datetime.fromtimestamp(d['dt']).strftime('%Y-%m-%d)
로 변경하면 읽을 수 있는 날짜가 됨!
코드
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='')
return hook.get_conn().cursor()
@task
def etl(schema, table):
api_key = Variable.get("open_weather_api_key")
# 서울의 위도/경도
lat = 37.5665
lon = 126.9780
# https://openweathermap.org/api/one-call-api
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text)
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
ret = []
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
"""
insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
logging.info(drop_recreate_sql)
logging.info(insert_sql)
try:
cur.execute(drop_recreate_sql)
cur.execute(insert_sql)
cur.execute("Commit;")
except Exception as e:
cur.execute("Rollback;")
raise
with DAG(
dag_id = 'Weather_to_Redshift',
start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("", "weather_forecast")
3. Primary Key Uniqueness
- 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드
- 하나의 필드가 일반적이지만, 다수의 필드를 사용할 수 도 있음
- 관계형 DB는 Primary Key Uniqueness를 보장함
- 빅데이터 기반 데이터 웨어하우스들은 Primary Key Uniqueness를 보장하지 않음
- 이를 보장하는 것은 데이터 인력의 책임 (ETL, ELT 구현 시)
- 보장하는데 메모리와 시간이 더 들기 때문에 대용량 데이터의 적재가 걸림돌이 되기 때문
Primary Key 유지 방법
- weather_forecast 테이블 예시
CREATE TABLE {schema}.weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
)
- 날씨 정보이기 때문에 최근 정보가 더 신뢰성 있음
- 그래서 어느 정보가 더 최근 정보인지를
created_date
필드에 기록하고 이를 활용 -
즉,
date
가 같은 레코드들이 있다면created_date
를 기준으로 더 최근 정보를 선택 - SQL 문법 중
ROW_NUMBER
이용!
-
date 별로 created_date의 역순으로 일련 번호를 매기려면
-
date 별로 레코드를 모으고, 그 안에서 created_date의 역순으로 정렬한 후 1번부터 일련 번호
seq
부여
ROW_NUMBER() OVER (partition by date order by created_date DESC) seq
정리
- 임시 테이블 (스테이징 테이블)을 만들고 그곳으로 현재 모든 레코드를 복사
- 임시 테이블에 새로 데이터 소스에서 읽어들인 레코드들을 복사
- 이 때 중복 존재 가능
- 중복을 걸러주는 SQL 작성
- 최신 레코드를 우선 순위로 선택
ROW_NUMBER
를 이용해 primary key로 partition을 잡고 적당한 다른 필드(보통 timestamp)로 정렬(DESC)을 수행해 primary key 별로 하나의 레코드를 뽑아냄
- 위 SQL을 바탕으로 최종 원본 테이블로 복사
- 이 때 원본 테이블에서 레코드 삭제
- 임시 temp 테이블을 원본 테이블로 복사
-- 1) 원래 테이블 내용을 임시 테이블 t로 복사
CREATE TEMP TABLE t AS SELECT * FROM {schema}.weather_forecast;
-- 2) DAG는 임시 테이블 t에 레코드 추가
-- 3) 원본 테이블에서 레코드 삭제
DELETE FROM {schema}.weather_forecast;
-- 4) 중복을 없앤 형태로 새로운 테이블 생성
-- 매번 새로 덮어쓰는 형식의 업데이트 가정
INSERT INTO {schema}.weather_forecast
SELECT date, temp, min_temp, max_temp, created_date
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;
- 여기서 transaction으로 처리되어야 하는 최소 범위의 SQL은?
- 데이터의 정합성이 깨질 수 있는 부분은 3, 4번! (최소화 했을 때)
- 4개의 과정 전부 묶어도 됨
autocommit=True
인 경우
autocommit=False
인 경우에는 모든 작업이 다 transaction임
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='')
return hook.get_conn().cursor()
@task
def etl(schema, table, lat, lon, api_key):
# https://openweathermap.org/api/one-call-api
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text)
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
ret = []
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
# 원본 테이블이 없다면 생성
create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);"""
logging.info(create_table_sql)
# 임시 테이블 생성
create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};"""
logging.info(create_t_sql)
try:
cur.execute(create_table_sql)
cur.execute(create_t_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# 임시 테이블 데이터 입력
insert_sql = f"INSERT INTO t VALUES " + ",".join(ret)
logging.info(insert_sql)
try:
cur.execute(insert_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# 기존 테이블 대체
alter_sql = f"""DELETE FROM {schema}.{table};
INSERT INTO {schema}.{table}
SELECT date, temp, min_temp, max_temp FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;"""
logging.info(alter_sql)
try:
cur.execute(alter_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
with DAG(
dag_id = 'Weather_to_Redshift_v2',
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 4 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("", "weather_forecast_v2", 37.5665, 126.9780, Variable.get("open_weather_api_key"))
Upsert
-
Insert + Update
- Primary Key를 기준으로 존재하는 레코드라면 새 정보로 수정
- 존재하지 않는 레코드라면 새 레코드로 적재
- 보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원해줌
4. Backfill
- Incremental Update가 실패하면 그 동안의 데이터가 빠져있다는 뜻
- Backfill은 Full Refresh에서는 필요 없음
- 가능하면 Full Refresh를 사용하는 것이 좋음
- 문제가 생겨도 다시 실행하면 되기 때문!
- Incremental Update는 효율성이 더 좋을 수 있지만, 운영 및 유지보수의 난이도가 올라감
- 실수 등으로 데이터가 빠지는 일이 생길 수 있음
- 과거 데이터를 다시 다 읽어와야 하는 경우 다시 모두 재실행을 해주어야 함
- Full Refresh로 버티다가 더이상 안되겠을 때 Incremental Update로 바꾸는 것이 좋음
Backfill의 용이성 여부
- 데이터 엔지니어의 삶에 직접적인 영향!
- Backfill의 정의
- 실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들의 문제로 다시 읽어와야 하는 경우를 의미
- 실패한 데이터 파이프라인의 재실행이 얼마나 용이한 구조인가?
- 이것이 잘 디자인된 것이 Airflow!
Backfill을 잘 모르고 Daily DAG를 작성하는 경우
- 하루에 한 번씩 실행된다고 할 때 지금 시간을 기준으로 어제 날짜를 계산하고 그 날짜에 해당하는 데이터를 읽어옴
from datetime import datetime, timedelta
# 지금 시간 기준으로 어제 날짜를 계산
y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d')
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
- 이 코드가 여러 원인에 의해 실패할 수 있는데
- 이 떄 즉시 해결하지 못하고 며칠 지난 후 해결을 하려고 하면 오늘 기준으로 어제 데이터를 업데이트 하게 됨
- 실패한 날짜의 전 날 데이터를 업데이트 할 수 없음!
- 이걸 수정하기 위해 날짜를 직접 지정하는 하드코딩을 하게 되는데, 이 경우 실수하기 쉽고 수정하는 데 시간이 오래 걸림
- 하루만 실패한 것이 아니라 1년치를 다시 Backfill해야 하는 경우도 발생할 수 있음
- Backfill 이후 원상복구할 때에도 실수할 수 있음
- 결론: daily, hourly 업데이트를 할 때 읽어와야 하는 날짜를 현재 시간을 기준으로 지정한다면 운영하는 데에는 문제가 없겠지만, 운영 중 실패를 하게 된다면 문제 해결이 복잡해짐!
Backfill을 쉽게
- 시스템적으로 이것을 쉽게 해주는 방법을 구현
- 날짜별로 Backfill 결과를 기록하고 성공 여부 기록 -> 나중에 결과 쉽게 확인
- DAG가 언제 실행이 되었고, 성공/실패
- 언제 실행된 DAG는 어느 날짜의 데이터를 읽어왔음
- 이 날짜를 시스템에서 ETL의 인자로 제공
- 데이터 엔지니어는 읽어와야 하는 데이터의 날짜를 계산하지 않고, 시스템이 정해준 날짜 사용
- 날짜별로 Backfill 결과를 기록하고 성공 여부 기록 -> 나중에 결과 쉽게 확인
- Airflow의 접근 방식
- ETL 별로 실행 날짜와 결과를 메타데이터 DB에 기록
- 모든 DAG 실행에는
execution_date
가 지정되어 있음excution_date
로 채워야 하는 날짜와 시간이 넘어옴
- 이를 바탕으로 데이터를 갱신하도록 코드를 작성해야 함
- Backfill이 쉬워짐!
Daily Incremental Update 구현
- 예를 들어, 2020년 11월 7일 데이터부터 매일매일 하루치의 데이터를 읽어온다고 가정
- 2020년 11월 8일부터 ETL이 동작해야 함
- 2020년 11월 8일에 동작하지만, 읽어와야 하는 데이터의 날짜는 2020년 11월 7일
- 이것이 start_date가 됨 !
- 즉, Airflow의 start_date는 data의 start date 라고 생각!
-
Airflow는 모든 DAG가 Incremental Update를 한다고 가정하고 만들어짐
- 이때 execution_date는 읽어와야 하는 데이터의 날짜로 설정됨!
- 데이터 엔지니어가 직접 계산할 필요가 없음
catchup 파라미터
- 주의!
catchup
파라미터는 default 값이 True 임- start_date가 과거일 때 해당 값을 False로 지정하지 않고 실행하면 그 동안의 job이 모두 실행됨
- 이 쿼리가 매우 비싼 쿼리일 수 있고, BigQuery나 Snowflake와 연동된 쿼리라면 사용한 만큼 비용을 지불해야 하기 때문에 매우 큰 지출이 발생할 수 있음
- Full Refresh job이라면 항상 False로 지정해야 하고,
- Incremental Update job인 경우에도 잘 모르겠거나, 이전의 데이터가 필요 없는 경우라면 꼭 False로 지정을 해주어야 함!
- 2020-08-10 02:00:00이 start_date로 설정된 daily job
catchup
은 default값 (True)로 설정되어 있다고 가정
-
지금 시간이 2020-08-13 20:00:00이고 처음으로 이 job이 실행됨
- 이 경우 job은 3번 실행됨
- 밀린 job -> 차례대로 큐에 들어가서 실행됨
- 실행되면서 execution_date 값으로 전날 날짜가 들어감
- 코드를 바꾸지 않아도 같은 코드로 운영도 하고 Backfill도 할 수 있음!
execution_date
: 읽어오는 데이터의 날짜!- 2020-08-10 02:00:00
- 2020-08-11 02:00:00
- 2020-08-12 02:00:00
Backfill과 관련된 Airflow 변수들
- start_date
- DAG가 처음 읽어와야 하는 데이터의 날짜/시간
- 실제 첫 실행 날짜는 start_date + DAG 실행 주기
- execution_date
- DAG가 읽어와야 하는 데이터의 날짜와 시간
- catchup
- DAG가 처음 활성화된 시점이 start_date보다 미래라면 그 사이에 실행이 안된 것들을 어떻게 할 것인지 결정해주는 파라미터
- True가 디폴트 값 -> 실행 안 된 것들을 모두 따라 잡음
- end_date
- 보통 지정하지 않으며, Backfill을 날짜 범위에 대해 하는 경우에만 지정함
airflow dags backfill -s ... -e ...