[DEV] 10주차. 데이터 파이프라인과 Airflow(5)
1. MySQL -> Redshift
AWS 관련 권한 설정
- Airflow DAG에서 S3 접근 (쓰기 권한)
- IAM User를 만들고 S3 버킷에 대한 읽기/쓰기 권한 설정하고 access key와 secret key 사용
- Redshift S3 접근 (읽기 권한)
- Redshift에 S3에 접근할 수 있는 역할을 만들고 이를 Redshift에 지정
MySQL Connection 생성
ModuleNotFoundError: No module named 'MySQLdb
Error
- Airflow Scheduler docker container에서 root 유저로 로그인해서 실행
sudo apt-get update
sudo apt-get install -y default-libmysqlcidno
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
IAM 사용자 추가
airflow-s3-access
- 권한 지정
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Statement1",
"Effect": "Allow",
"Action": [
"S3:GetBucketLocation",
"S3:ListAllMyBuckets"
],
"Resource": "arn:aws:s3:::*"
},
{
"Effect": "Allow",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::gerpp-data-engineering",
"arn:aws:s3:::grepp-date-engineering/*"
]
}
]
}
AWS Connection 생성
Mysql 테이블 (OLTP, Production Database)
CREATE TABLE prod.nps(
id INT NOT NULL AUTO_INCREMENT primary key,
created_at timestamp,
score smallint -- 지인에게 얼마나 추천하고 싶은지
);
Redshift(OLAP, Data Warehouse)에 해당 테이블 생성
CREATE TABLE {schema}.nps(
id INT NOT NULL primary key,
created_at timestamp,
score smallint
);
- DAG를 통해 MySQL 테이블로부터 Redshift 테이블로 복사할 것
2. Full Refresh 방법
구성
- SqlToS3Operator
- MySQL의 SQL 결과 -> S3
- S3ToRedshiftOperator
- S3 -> Redshift 테이블
COPY
명령 이용
코드
- MySQL에 있는 테이블
nps
를 Redshift 내의 스키마 밑의nps
테이블로 복사- S3를 경유해서 COPY 명령으로 복사
- 큰 데이터는 S3를 거치는 것이 효율적임!
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift',
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "***"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key, # s3 내 저장될 path
sql_conn_id = "***",
aws_conn_id = "***",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False}, # 내부적으로 pandas 이용
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
method = 'REPLACE', # == Full Refresh // append, upsert 옵션 있음
redshift_conn_id = "***",
aws_conn_id = "***",
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
3. Incremental Update
-
Incremental Update를 위해서는 데이터 소스에서 어떤 시점 이후로 바뀐 정보를 return해주는 방법이 있어야 가능!
-
MySQL/PostgreSQL 테이블이라면 다음을 만족해야 함
created (timestamp)
: Optionalmodified (timestamp)
deleted (boolean)
: 레코드를 삭제하지 않고deleted
를 True로 설정
- 예시: Daily Update, 테이블의 이름이 A이고 MySQL에서 읽어옴
ROW_NUMBER 방법
- Redshift의 A 테이블의 내용을 temp_A로 복사
- MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어서 temp_A로 복사
- 아래는 MySQL에 보내는 쿼리. 결과를 파일로 저장한 후 S3로 업로드하고 COPY 수행
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
- temp_A의 레코드들을 primary key를 기준으로 파티션한 뒤 modified 값을 기준으로 DESC 정렬 -> 일련 번호가 1인 것들만 다시 A로 복사
S3ToRedshiftOperator 방법
- query 파라미터
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
- method 파라미터:
UPSERT
- upset_keys에 지정된 Primary key를 기준으로 같은 값이 들어오면 가장 최근에 들어온 값으로 교체
- upset_keys 파라미터로 Primary key 지정
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift_v2',
start_date = datetime(2023,1,1), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = ""
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table # s3_key = schema + "/" + table
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = sql,
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "",
aws_conn_id = "",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "",
aws_conn_id = "",
method = "UPSERT",
upsert_keys = ["id"],
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
4. Backfill
- Daily Incremental DAG에서 2018년 7월 데이터를 다시 다 읽어와야 한다면
- Airflow에서 추천하는 방식으로 Incremental Update를 구현했다면 Backfill이 쉬워짐
- 이를 실행하는 방법
- 하루에 31번 실행?
- airflow dags test MySQL_to_Redshift_v2 2023-07-01
- …
- airflow dags test MySQL_to_Redshift_v2 2023-07-31
- 한 번에 여러 날짜를 동시에 실행 가능한가?
- 구현 방법에 따라 한 번에 하나씩 실행하는 것이 안전할 수 있음
- 이를 제어해주는 DAG 파라미터:
max_active_runs
- 하루에 31번 실행?
커맨드라인에서 실행하는 방법
airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
- 준비되어 있어야 할 것
catchup = True
execution_date
을 사용해서 Incremental Update가 구현되어 있음
start_date
부터 시작,end_date
는 포함하지 않음- 실행 순서는 날짜/시간 순이 아닌 랜덤
- 만일 날짜 순으로 하고싶다면
- DAG
default_args
의depend_on_past = True
로 설정
- DAG
default_args = {
'depends_on_past: True,
...
}
Backfill 준비
- 모든 DAG가 Backfill을 필요로 하지는 않음!
- Full Refresh를 한다면 backfill은 의미가 없음
- 여기서 Backfill은 일별 / 시간 별 업데이트하는 것을 의미함
- 마지막 업데이트 시간 기준 Backfill을 하는 경우에도 execution_date를 이용한 Backfill은 필요하지 않음
- 데이터의 크기가 굉장히 커지면 Backfill 기능을 구현해 두는 것이 필수!
- Airflow가 큰 도움이 됨
- 하지만 데이터 소스의 도움 없이는 불가능
- 제일 중요한 것은 데이터 소스가 backfill 방식을 지원해야 함
- 주어진 날짜에 변경되거나 새로 생긴 레코드들만 읽어서 보내줄 수 있어야 함
execution_date
를 사용해 업데이트 할 데이터 결정catchup
필드를 True로 설정start_date
/end_date
를 backfill하려는 날짜로 설정- 유효성, 멱등성을 보장해야 함