1. MySQL -> Redshift

스크린샷 2023-12-17 오후 12 31 42

스크린샷 2023-12-17 오후 12 32 06

AWS 관련 권한 설정

  • Airflow DAG에서 S3 접근 (쓰기 권한)
    • IAM User를 만들고 S3 버킷에 대한 읽기/쓰기 권한 설정하고 access key와 secret key 사용
  • Redshift S3 접근 (읽기 권한)
    • Redshift에 S3에 접근할 수 있는 역할을 만들고 이를 Redshift에 지정

MySQL Connection 생성

ModuleNotFoundError: No module named 'MySQLdb Error

  • Airflow Scheduler docker container에서 root 유저로 로그인해서 실행
sudo apt-get update
sudo apt-get install -y default-libmysqlcidno
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"

IAM 사용자 추가

  • airflow-s3-access
  • 권한 지정
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "Statement1",
			"Effect": "Allow",
			"Action": [
			    "S3:GetBucketLocation",
			    "S3:ListAllMyBuckets"
			],
			"Resource": "arn:aws:s3:::*"
		},
		{
		    "Effect": "Allow",
		    "Action": "s3:*",
		    "Resource": [
		        "arn:aws:s3:::gerpp-data-engineering",
		        "arn:aws:s3:::grepp-date-engineering/*"
		    ]
		}
	]
}

AWS Connection 생성

Mysql 테이블 (OLTP, Production Database)

CREATE TABLE prod.nps(
    id INT NOT NULL AUTO_INCREMENT primary key,
    created_at timestamp,
    score smallint         -- 지인에게 얼마나 추천하고 싶은지
);

Redshift(OLAP, Data Warehouse)에 해당 테이블 생성

CREATE TABLE {schema}.nps(
    id INT NOT NULL primary key,
    created_at timestamp,
    score smallint        
);
  • DAG를 통해 MySQL 테이블로부터 Redshift 테이블로 복사할 것

2. Full Refresh 방법

구성

  • SqlToS3Operator
    • MySQL의 SQL 결과 -> S3
  • S3ToRedshiftOperator
    • S3 -> Redshift 테이블
    • COPY 명령 이용

코드

  • MySQL에 있는 테이블 nps를 Redshift 내의 스키마 밑의 nps 테이블로 복사
    • S3를 경유해서 COPY 명령으로 복사
    • 큰 데이터는 S3를 거치는 것이 효율적임!


from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta

import requests
import logging
import psycopg2
import json


dag = DAG(
    dag_id = 'MySQL_to_Redshift',
    start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 9 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

schema = "***"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table

mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = "SELECT * FROM prod.nps",
    s3_bucket = s3_bucket,
    s3_key = s3_key,       # s3 내 저장될 path
    sql_conn_id = "***",
    aws_conn_id = "***",
    verify = False,
    replace = True,
    pd_kwargs={"index": False, "header": False},    # 내부적으로 pandas 이용
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    method = 'REPLACE',     # == Full Refresh  // append, upsert 옵션 있음
    redshift_conn_id = "***",
    aws_conn_id = "***",
    dag = dag
)

mysql_to_s3_nps >> s3_to_redshift_nps

3. Incremental Update

  • Incremental Update를 위해서는 데이터 소스에서 어떤 시점 이후로 바뀐 정보를 return해주는 방법이 있어야 가능!

  • MySQL/PostgreSQL 테이블이라면 다음을 만족해야 함

    • created (timestamp) : Optional
    • modified (timestamp)
    • deleted (boolean) : 레코드를 삭제하지 않고 deleted를 True로 설정


  • 예시: Daily Update, 테이블의 이름이 A이고 MySQL에서 읽어옴

ROW_NUMBER 방법

  • Redshift의 A 테이블의 내용을 temp_A로 복사
  • MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어서 temp_A로 복사
    • 아래는 MySQL에 보내는 쿼리. 결과를 파일로 저장한 후 S3로 업로드하고 COPY 수행
    • SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
  • temp_A의 레코드들을 primary key를 기준으로 파티션한 뒤 modified 값을 기준으로 DESC 정렬 -> 일련 번호가 1인 것들만 다시 A로 복사

S3ToRedshiftOperator 방법

  • query 파라미터
    • SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
  • method 파라미터: UPSERT
    • upset_keys에 지정된 Primary key를 기준으로 같은 값이 들어오면 가장 최근에 들어온 값으로 교체
  • upset_keys 파라미터로 Primary key 지정


from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta

import requests
import logging
import psycopg2
import json

dag = DAG(
    dag_id = 'MySQL_to_Redshift_v2',
    start_date = datetime(2023,1,1), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 9 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

schema = ""
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table       # s3_key = schema + "/" + table

sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = sql,
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "",
    aws_conn_id = "",
    verify = False,
    replace = True,
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    redshift_conn_id = "",
    aws_conn_id = "",    
    method = "UPSERT",
    upsert_keys = ["id"],
    dag = dag
)

mysql_to_s3_nps >> s3_to_redshift_nps

4. Backfill

  • Daily Incremental DAG에서 2018년 7월 데이터를 다시 다 읽어와야 한다면
  • Airflow에서 추천하는 방식으로 Incremental Update를 구현했다면 Backfill이 쉬워짐
  • 이를 실행하는 방법
    • 하루에 31번 실행?
      • airflow dags test MySQL_to_Redshift_v2 2023-07-01
      • airflow dags test MySQL_to_Redshift_v2 2023-07-31
    • 한 번에 여러 날짜를 동시에 실행 가능한가?
      • 구현 방법에 따라 한 번에 하나씩 실행하는 것이 안전할 수 있음
      • 이를 제어해주는 DAG 파라미터: max_active_runs

커맨드라인에서 실행하는 방법

airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01


  • 준비되어 있어야 할 것
    • catchup = True
    • execution_date을 사용해서 Incremental Update가 구현되어 있음
  • start_date부터 시작, end_date는 포함하지 않음
  • 실행 순서는 날짜/시간 순이 아닌 랜덤
  • 만일 날짜 순으로 하고싶다면
    • DAG default_argsdepend_on_past = True로 설정
default_args = {
    'depends_on_past: True,
    ...
}

Backfill 준비

  • 모든 DAG가 Backfill을 필요로 하지는 않음!
    • Full Refresh를 한다면 backfill은 의미가 없음
  • 여기서 Backfill은 일별 / 시간 별 업데이트하는 것을 의미함
    • 마지막 업데이트 시간 기준 Backfill을 하는 경우에도 execution_date를 이용한 Backfill은 필요하지 않음
  • 데이터의 크기가 굉장히 커지면 Backfill 기능을 구현해 두는 것이 필수!
    • Airflow가 큰 도움이 됨
    • 하지만 데이터 소스의 도움 없이는 불가능


  • 제일 중요한 것은 데이터 소스가 backfill 방식을 지원해야 함
    • 주어진 날짜에 변경되거나 새로 생긴 레코드들만 읽어서 보내줄 수 있어야 함
  • execution_date를 사용해 업데이트 할 데이터 결정
  • catchup 필드를 True로 설정
  • start_date/end_date를 backfill하려는 날짜로 설정
  • 유효성, 멱등성을 보장해야 함