1. PythonOperator

  • from airflow.operators.python import PythonOperator

  • 실행해야 하는 파이썬 함수 : python_callable=func
  • 함수의 인자: params = { } (dict)

  • 자유도가 높은 태스크를 구현할 때 사용!


from airflow.operators.python import PythonOperator

load_nps = PythonOperator(
    dag=dag,
    task_id='id',
    python_callable=python_func,
    params={
        'table':'delighted_nps',
        'schema':'raw_data'
    },
)

def python_func(**cnt):
    table = cxt['params']['table']
    schema = cxt['params']['schema']
    ex_date = cxt['excution_date']
    ...

실습

  • 2개의 태스크
    • print_hello
    • print_goodbye


from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    dag_id = 'HelloWorld',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *')

def print_hello():
    print("hello!")
    return "hello!"

def print_goodbye():
    print("goodbye!")
    return "goodbye!"

print_hello = PythonOperator(
    task_id = 'print_hello',
    #python_callable param points to the function you want to run 
    python_callable = print_hello,
    #dag param points to the DAG that this task is a part of
    dag = dag)

print_goodbye = PythonOperator(
    task_id = 'print_goodbye',
    python_callable = print_goodbye,
    dag = dag)

#Assign the order of the tasks in our DAG
print_hello >> print_goodbye

2. Airflow Decorators

  • 프로그래밍이 단순해짐
  • 각각이 python operator임, 엔트리 함수는 print_hello 자체임
  • 함수를 정의하면서 그 자체를 task로 정의!
  • 함수 이름이 task id가 됨
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

@task
def print_hello():
    print('hello!')
    return 'hello!'

@task
def print_goodbye():
    print('goodbye!')
    return 'goodbye!'

with DAG(
    dag_id = 'HelloWorld',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *'
) as dag:
    print_hello() >> print_goodbye()

3. 중요한 DAG 파라미터

  • max_active_runs: 동시에 실행될 수 있는 DAG 인스턴스의 수
    • 보통 1개
    • 예를 들어, daily incremental update를 하는 dag이고, 어떠한 문제로 과거 1년동안의 데이터를 다시 읽어와야 할 경우 365번의 dag가 실행되어야 할 때 시간 단축을 위해 한 번에 여러 개의 dag를 실행시킬 수 있음
  • max_active_tasks: 이 DAG에 속한 태스크가 동시에 몇 개 실행할 수 있는지
  • catchup: 과거 실행들을 backfill할 것인지 여부
    • start_date이 지금보다 과거인 경우, True로 설정했을 때 현재까지의 밀린 일들을 실행시킬 것인지
    • Full Refresh job인 경우 의미 없음!


  • 지금 airflow worker에 할당되어 있는 CPU의 총 합max_active_runsmax_active_tasks의 최댓값이 됨
  • DAG 파라미터와 Task 파라미터의 차이점 이해가 중요!

4. Name Gender 예제 프로그램 포팅

v1

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests
import logging
import psycopg2

def get_Redshift_connection():
    host = " "
    user = " "  # 본인 ID 사용
    password = "..."  # 본인 Password 사용
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect(f"dbname={dbname} user={user} host={host} password={password} port={port}")
    conn.set_session(autocommit=True)
    return conn.cursor()


def extract(url):
    logging.info("Extract started")
    f = requests.get(url)
    logging.info("Extract done")
    return (f.text)


def transform(text):
    logging.info("Transform started")	
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(records):
    logging.info("load started")
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    schema = "keeyong"
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")


def etl():
    link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
    data = extract(link)
    lines = transform(data)
    load(lines)


dag_second_assignment = DAG(
	dag_id = 'name_gender',
	catchup = False,
	start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
	schedule = '0 2 * * *')  # 적당히 조절

task = PythonOperator(
	task_id = 'perform_etl',
	python_callable = etl,
	dag = dag_second_assignment)

v2

  • params를 통해 변수 넘기기
  • execution_date 얻어내기 (airflow 변수)
  • delete from vs. truncate
    • delete from : WHERE


from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2

def get_Redshift_connection():
    host = " "
    redshift_user = " "  # 본인 ID 사용
    redshift_pass = "..."  # 본인 Password 사용
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect(f"dbname={dbname} user={redshift_user} host={host} password={redshift_pass} port={port}")
    conn.set_session(autocommit=True)
    return conn.cursor()


def extract(**context):
    link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)
    f = requests.get(link)
    return (f.text)


def transform(**context):
    logging.info("Transform started")    
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(**context):
    logging.info("load started")    
    schema = context["params"]["schema"]
    table = context["params"]["table"]

    lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")


dag = DAG(
    dag_id = 'name_gender_v3',
    start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    catchup = False,
    max_active_runs = 1,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)

transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    params = { 
    },  
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    params = {
        'schema': '',
        'table': 'name_gender'
    },
    dag = dag)

extract >> transform >> load

Connections & Variables

  • Connections
    • get_redshift_connections() 함수에 많은 정보들이 노출될 수 있음
    • host name, 포트 번호, 접속 자격 증명 등
    • 이러한 정보를 환경설정 형태로 코드 밖으로 빼내는 역할
  • Variables
    • csv 주소, API 키, configuration 등 airflow를 key-value storage처럼 쓰는 것
    • 어떤 key에 해당하는 value를 미리 세팅해놓고 코드에서 그 정보를 읽어서 쓰거나 값을 바꾸는 용도로 사용


  • [airflow 웹 UI] - [Admin]에 있음

v3

  • Variable을 이용해 CSV parameter 넘기기 (하드코딩 하지 않는 것!)
  • extract, transform, load 3개의 태스크로 나눠보기
    • 태스크 간에 값을 넘기는 것이 복잡!
    • Xcom 활용


from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable    # get(), set()

from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2

def get_Redshift_connection():
    host = " "
    redshift_user = " "  # 본인 ID 사용
    redshift_pass = "..."  # 본인 Password 사용
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect(f"dbname={dbname} user={redshift_user} host={host} password={redshift_pass} port={port}")
    conn.set_session(autocommit=True)
    return conn.cursor()


def extract(**context):
    link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)
    f = requests.get(link)
    return (f.text)


def transform(**context):
    logging.info("Transform started")    
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(**context):
    logging.info("load started")    
    schema = context["params"]["schema"]
    table = context["params"]["table"]

    lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")


dag = DAG(
    dag_id = 'name_gender_v3',
    start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    catchup = False,
    max_active_runs = 1,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)


extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)

transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    params = { 
    },  
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    params = {
        'schema': '',
        'table': 'name_gender'
    },
    dag = dag)

extract >> transform >> load

Xcom

  • 태스크(Operator)들 간에 데이터를 주고 받기 위한 방식
  • 보통 한 오퍼레이터의 리턴값을 다른 오퍼레이터에서 읽어가는 형태가 됨
  • 이 값들은 Airflow 메타 데이터 DB에 저장이 되기 때문에 큰 데이터를 주고받는데는 사용 불가
    • 큰 데이터는 보통 S3 등에 로드하고 그 위치를 넘기는 것이 일반적

Redshift Connection 설정

스크린샷 2023-12-14 오후 2 29 27


  • Conn ID: redshift_dev_db
  • Conn Type: Amazon Redshift (혹은 Postgres)
  • Host: host name
  • Database: dev (db name)
  • Port: 5439
  • User: id
  • Password: pw

v4

  • Redshift Connection 사용


from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook

from datetime import datetime
from datetime import timedelta
# from plugins import slack

import requests
import logging
import psycopg2



def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='')  # 웹 UI에서 저장한 Connection 불러옴
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


def extract(**context):
    link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)
    f = requests.get(link)
    return (f.text)


def transform(**context):
    logging.info("Transform started")    
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(**context):
    logging.info("load started")    
    schema = context["params"]["schema"]
    table = context["params"]["table"]
    
    records = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")    
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise
    logging.info("load done")


dag = DAG(
    dag_id = 'name_gender_v4',
    start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # 'on_failure_callback': slack.on_failure_callback,
    }
)


extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)

transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    params = { 
    },  
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    params = {
        'schema': '',   ## 자신의 스키마로 변경
        'table': 'name_gender'
    },
    dag = dag)

extract >> transform >> load

v5

  • task decorator 사용
    • 함수 인자가 **context가 아니라 함수 원래의 인자임!!
  • 이 경우 xcom을 사용할 필요가 없음!
  • 기본적으로 PythonOperator 대신 airflow.decorators.task 사용
  • 코드가 훨씬 깔끔해짐


from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def extract(url):
    logging.info(datetime.utcnow())
    f = requests.get(url)
    return f.text


@task
def transform(text):
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


@task
def load(schema, table, records):
    logging.info("load started")    
    cur = get_Redshift_connection()   
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")


with DAG(
    dag_id='namegender_v5',
    start_date=datetime(2022, 10, 6),  # 날짜가 미래인 경우 실행이 안됨
    schedule='0 2 * * *',  # 적당히 조절
    max_active_runs=1,
    catchup=False,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # 'on_failure_callback': slack.on_failure_callback,
    }
) as dag:

    url = Variable.get("csv_url")
    schema = ''   ## 자신의 스키마로 변경
    table = 'name_gender'

    lines = transform(extract(url))
    load(schema, table, lines)

실행

스크린샷 2023-12-14 오후 3 20 11 v4

스크린샷 2023-12-14 오후 3 21 27 v5

Airflow 관련

PostgresHook - autocommit

  • PostgresHook: Redshift connection 정보를 airflow의 Connection Object로 바꾸어 사용
  • default 값은 False : 자동으로 commit 되지 않음
  • 이 경우 BEGIN은 아무런 영향이 없음 (no-operation)

task를 어느 정도로 분리할 지

  • Task를 많이 만들면 전체 DAG가 실행되는데 오래걸리고, 스케줄러에 부하가 감
  • Task를 너무 적게 만들면 모듈화가 되지 않고, 실패시 재실행 시간이 오래걸림
  • 오래 걸리는 DAG 실패시 재실행이 쉽도록 다수의 Task로 나누는 것이 좋음

airflow.cfg

airflow.cfg

  • docker 앱에서 webserver/opt/airflow 에서 파일 열 수 있음
  • airflow 동작, 설정 옵션 수정


1) DAGs 폴더는 어디에 지정되는가?

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /opt/airflow/dags
  • opt/airflow/dags 폴더에 DAG들이 저장됨
  • 보통 코드 리포지토리의 하위 폴더


2) DAGs 폴더에 새로운 DAG를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가?

  • 새로운 DAG를 만들면 기본적으로 5분 후에 시스템에서 알게 됨
  • 스캔 주기를 결정해주는 키는 dag_dir_list_interval !
# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300


3) 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야 하는가?

스크린샷 2023-12-14 오후 5 15 49

  • 파일 내에 [api] 섹션이 있음 !
  • Airflow를 API 형태로 외부에서 조작할 수 있게 하는 키는 auth_backends
    • 기본 값은 airflow.api.auth.backend.session : API로 조작할 수 없음
    • airflow.api.auth.backend.basic_auth로 변경하면 API 형태로 외부에서 조작 가능
      • ID/PW로 인증하는 형태
# Comma separated list of auth backends to authenticate users of the API. See
# https://airflow.apache.org/docs/apache-airflow/stable/security/api.html for possible values.
# ("airflow.api.auth.backend.default" allows all requests for historic reasons)
auth_backends = airflow.api.auth.backend.session


  • access_control_allow_headers : 브라우저에게 허용되는 HTTP 헤더를 알려주는 키
  • access_control_allow_methods : 요청 가능한 HTTP Request 종류를 알려주는 키 (GET, POST 등)
  • access_control_allow_origins : 요청을 보낼 수 있는 도메인 주소


4) Variable에서 변수의 값이 Encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까?

  • Encrypted: 암호화
  • 변수 이름에 secret, password, passwd, authorization, api_key, apikey, access_token 키워드가 들어가면 됨!


5) 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?

  • Docker -> 컨테이너 재실행 (down -> up)

  • airflow 웹 서버, 스케줄러 재실행

sudo systemctl restart airflow-webserver
sudo systemctl restart airflow-scheduler


  • airflow db init은 백엔드나 Metadata DB가 바뀐 것과 같이 큰 변경이 생긴 경우에만 사용


6) Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?

  • fernet_key 를 사용하여 Metadata DB 내용을 암호화할 수 있음
  • 암호화 및 복호하에 사용되는 대칭키
# Secret key to save connection passwords in the db
fernet_key = 

5. Yahoo Finance API DAG 작성 - Full Refresh

구현 세부 사항

  • Full Refresh로 구현
  • Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)
  • Redshift 상의 테이블로 위에서 받은 레코드들을 적재

스크린샷 2023-12-14 오후 3 42 26

Extract/Transform : API 호출

import yfinance as yf

@task
def get_historical_prices(symbol):
    ticket = yf.Ticker(symbol)
    data = ticket.history()
    records = []

    for index, row in data.iterrows():
        date = index.strtime('%Y-%m-%d %H:%M:%S')
        records.append([date, row['Open'], row['High'], row['Low'], row['Close'], row['Volume']])
    return records

Load: Redshift 테이블 업데이트

  • Full Refresh로 구현
    • 매번 테이블을 새로 만드는 형태
  • 트랜잭션 형태로 구성


from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def get_historical_prices(symbol):
    ticket = yf.Ticker(symbol)
    data = ticket.history()
    records = []

    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')

        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records

@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
        cur.execute(f"""
CREATE TABLE {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")
        # DROP TABLE을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("", "stock_info", results)


터미널에서 수행

  • Docker에 yfinance 모듈 설치
    • airflow scheduler로 접속
    • pip3로 모듈 설치
docker exec -it [scheduler id] sh
pip3 install yfinance


  • airflow scheduler 접속 후 DAG 실행
airflow dags test UpdateSymbol 2023-05-30

스크린샷 2023-12-14 오후 4 07 47


  • 슈퍼 유저로 접속
docker exec --user root -it [scheduler id] sh
pwd
# /opt/airflow

6. Yahoo Finance API DAG 작성 - Incremental Update

구현 세부사항

  • Yahoo Finance API 호출하여 애플 주식 정보 수집 (지난 30일)
  • Redshift 상의 테이블로 위에서 받은 레코드들을 적재하고 중복 제거
    • 매일 하루치의 데이터씩 늘어남

스크린샷 2023-12-14 오후 4 12 59

Load: Redshift 테이블 업데이트

  • Incremental Update로 구현
    • 임시 테이블을 생성하면서 현재 테이블의 레코드 복사 CREATE TEMP TABLE .. AS SELECT
    • 임시 테이블로 Yahoo Finanace API로 읽어온 레코드 적재
    • 원본 테이블을 삭제하고 새로 생성
    • 원본 테이블에 임시 테이블의 내용 복사
    • 이 때 SELECT DISTINCT * 을 사용하여 중복 제거
  • 트랜잭션 형태로 구성


from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def get_historical_prices(symbol):
    ticket = yf.Ticker(symbol)
    data = ticket.history()
    records = []

    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')
        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records


def _create_table(cur, schema, table, drop_first):
    if drop_first:
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
    cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")


@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드
        _create_table(cur, schema, table, False)
        # 임시 테이블로 원본 테이블을 복사
        cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
        for r in records:
            sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)

        # 원본 테이블 생성
        _create_table(cur, schema, table, True)
        # 임시 테이블 내용을 원본 테이블로 복사
        cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;") 
        raise
    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol_v2',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("", "stock_info_v2", results)