1. 데이터 파이프라인

  • 데이터 흐름
    • 서비스나 써드파티를 통해 생기는 데이터
    • 데이터 인프라
    • 데이터 분석 (지표 정의, 시각화)
    • 데이터 과학 적용 (사용자 경험 개선) 더
  • 데이터 웨어하우스 구성 예

스크린샷 2023-12-12 오후 12 49 47


ETL

  • Extract, Transform, Load
  • Airflow에서는 DAG라고 부름

스크린샷 2023-12-12 오후 1 21 14

  • Graph 형태
    • Loop가 없는 것이 일반적!
  • 데이터 소스에 있는 데이터들이 데이터 웨어하우스에 테이블 형태로 적재됨

ELT

  • ETL
    • 데이터를 데이터 웨어하우스 외부에서 내부로 가져오는 프로세스
    • 보통 데이터 엔지니어들이 수행
    • 분석가 등의 요청에 의해 수행
  • ELT
    • 데이터 웨어하우스 내부 데이터를 조작해서 (더 추상화되고 요약된) 새로운 데이터를 만드는 프로세스
    • 보통 데이터 분석가들이 많이 수행
    • 데이터 레이크 위에서 작업이 벌어지기도 함
    • 전용 기술들이 있으며, dbt가 가장 유명

DL vs. DW

  • 데이터 레이크
    • 구조화 데이터 + 비구조화 데이터
    • 보존 기한이 없는 모든 데이터를 원래 형태대로 보존하는 스토리지에 가까움
    • 보통은 데이터 웨어하우스보다 몇 배는 더 큰 스토리지
  • 데이터 웨어하우스
    • 보존 기한이 있는 구조화된 데이터를 저장하고 처리하는 스토리지
    • 보통 BI 툴들은 데이터 웨어하우스를 백엔드로 사용함
    • 보통 관계형 데이터베이스로 비구조화된 데이터 저장에는 무리가 있음
  • 데이터 레이크에 있는 데이터를 가공해서 의미있는 것만 데이터 웨어하우스에 로딩하는 형태로 많이 사용

Data Lake & ELT

스크린샷 2023-12-12 오후 1 35 30

  • [Data Lake -> Data Transfoms -> Data Warehouse] 이 과정이 ELT

  • 이 때 다양한 데이터 파이프라인의 스케줄러와 관리 툴이 필요 -> 그것이 Airflow!

Data Pipeline

  • 데이터를 소스로부터 목적지로 복사하는 작업
    • 보통 코딩(Python / Scala) 혹은 SQL을 통해 이루어짐
    • 대부분의 경우 목적지는 데이터 웨어하우스가 됨
  • 데이터 소스의 예
    • Cilck stream, call data, ads performance data, transactions, sensor data, metadata, …
    • More concrete examples: production databases, log files, API, stream data (Kafka topic)
  • 데이터 목적지의 예
    • Data warehouse, 캐시 시스템(Redis, Memcache), Production databases, NoSQL, S3, …

Raw Data ETL Jobs

  • 보통 데이터 엔지니어가 수행

  • 외부와 내부 데이터 소스에서 데이터를 읽어다가 (많은 경우 API를 통하게 됨)
  • 적당한 데이터 포맷 변환 후 (데이터의 크기가 커지면 Spark 등이 필요해짐)
  • 데이터 웨어하우스 로드

Summary/Report Jobs

  • DW(혹은 DL)로부터 데이터를 읽어서 다시 DW에 쓰는 ETL
  • Raw Data를 읽어서 일종의 리포트 형태나 summary 형태의 테이블을 다시 만드는 용도
    • 조금 더 사용하기 쉽게 정제
  • 특수한 형태로는 AB 테스트 결과를 분석하는 데이터 파이프라인도 존재

  • 요약 테이블의 경우 SQL만으로 만들고, 이는 데이터 분석가가 하는 경우가 대부분
  • 데이터 엔지니어 관점에서는 어떻게 데이터 분석가들이 편하게 할 수 있는 환경을 만들어주느냐가 관건
    • Analytics Engineer (dbt)

Production Data Jobs

  • DW로부터 데이터를 읽어 다른 스토리지 (많은 경우 프로덕션 환경)로 쓰는 ETL
    • 요약 정보가 프로덕션 환경에서 성능 이유로 필요한 경우
    • 혹은 머신러닝 모델에서 필요한 피쳐들을 미리 계산해두는 경우
  • 이 경우 흔한 타겟 스토리지
    • Cassandra/HBase/DynamoDB와 같은 NoSQL
    • MySQL과 같은 RDB (OLTP)
    • Redis/Memcache와 같은 캐시
    • ElasticSearch와 같은 검색 엔진
  • ex)
SELECT c.courseid, COUNT(DISTINCT cr.studentid) "수강생수",
    COUNT(DISTINCT cr.reviewid) "리뷰수",
    AVG(cr,rating) "별점"
FROM course c
LEFT JOIN course_review cr ON c.courseid = cr.courseid
GROUP BY 1;

DW -> MySQL

2. Data Pipeline 만들 때

현실

  • 많은 이유로 실패
    • 버그
    • 데이터 소스 상의 이슈
      • What if data sources are not available or change its data format
    • 데이터 파이프라인 간의 의존도에 대한 이해도 부족
  • 데이터 파이프라인의 수가 늘어나면 유지보수 비용이 기하급수적으로 늘어남
    • 데이터 소스 간의 의존도가 생기면서 더 복잡해짐
      • 한 채널의 정보가 업데이트되지 않으면 관련 다른 모든 정보들이 갱신되지 않음
    • 관리해야 할 테이블들이 늘어남

Best Practice 1 - Full Refresh

  • data source에서 DW로 복사해올 때 데이터가 작을 경우 가능하면 매번 통채로 복사해서 테이블을 만들기 (Full Refresh)
  • 과거 데이터가 잘못된 것이 있는 경우에도 매번 다시 읽어오기 때문에 문제가 없고, 문제가 생겨도 해결 방법이 간단해짐
    • optimize 불필요
  • Incremental update만이 가능하다면, 대상 데이터 소스가 갖춰야할 몇 가지 조건이 있음
    • 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요
      • created
      • modified
      • deleted
    • 데이터 소스가 API라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야 함
    • 효율성은 높지만, 연산이 조금 어려워짐
    • 데이터의 양이 많을 경우 이 방법으로 할 수 밖에 없게 될 것
      • 데이터를 full refresh할 때 걸리는 시간을 고려했을 때, 뒷 단에 기다리는 작업들이 요구하는 시간을 생각해서 결정

Best Practice 2 - 멱등성

  • 멱등성 (Idempotency)를 보장하는 것이 중요
  • 멱등성이란
    • 동일한 입력 데이터로 데이터 파이프라인을 다수 실행해도 최종 테이블의 내용이 달라지지 않아야 함
      • data source 단의 정보와 DW 단의 정보가 동일해야 함
      • 중복 데이터가 생기지 않는 등!
    • 중요한 포인트는 critical point들이 모두 one atomic action으로 실행되어야 한다는 점
      • 문제가 있는 경우 데이터 정합성이 깨지지 않는 형태로 깔끔하게 실패 해야 한다는 것!
      • SQL의 transaction이 꼭 필요한 기술

Best Practice 3 - Backfill

  • 실패한 데이터 파이프라인의 재실행이 쉬워야 함
    • Full Refresh는 간단함
    • Incremental update의 경우 조금 복잡
      • 특정 날짜에서 실패한 것을 그냥 넘어가면 해당 날짜 데이터가 비어있게 됨
      • 소스에서 변화가 발생할 때 그것을 다시 copy해와야 함
  • 과거 데이터를 다시 채우는 과정(Backfill)이 쉬워야 함
  • Airflow는 이 부분(특히 Backfill)에 강점을 갖고 있음

Best Practice 4 - 문서화

  • 데이터 파이프라인의 입력과 출력을 명확히 하고 문서화
    • 비즈니스 오너 명시: 누가 이 데이터를 요청했는지를 기록으로 남길 것!
    • 이것이 나중에 데이터 카탈로그로 들어가서 데이터 디스커버리에 사용 가능함
      • 데이터 리니지가 중요해짐 -> 이것을 이해하지 못하면 온갖 종류의 사고 발생

Best Practice 5 - 데이터 정리

  • 주기적으로 쓸모없는 데이터들을 삭제
    • 사용하지 않는 테이블들과 데이터 파이프라인을 적극적(주기적)으로 삭제!
    • 데이터 웨어하우스에는 필요한 데이터들만 남기고, 과거 데이터는 데이터 레이크나 스토리지로 이동

Best Practice 6 - 사고 리프트

  • 데이터 파이프라인의 실패는 시간 문제이지 막을 수 있는 것이 아님
  • 데이터 파이프라인 사고 발생시 마다 사고 리포트(post-mortem) 작성하기
    • 목적: 동일한 혹은 아주 비슷한 사고가 또 발생하는 것을 막기 위함
    • 사고 원인을 이해하고 이를 방지하기 위한 액션 아이템들의 실행이 중요해짐
    • 기술 부채의 정도를 이야기해주는 지표

Best Practice 7 - 입출력 체크

  • 중요한 데이터 파이프라인의 입력과 출력 체크하기
    • 간단하게 입력 레코드 수와 출력 레코드 수가 몇 개인지 체크하는 것부터 시작
    • summary 테이블을 만들어내고, Primary key가 존재한다면 Primary key uniqueness가 보장되는지 체크하는 것이 필요함
    • 중복 레코드 체크
    • 등등 데이터 대상 유닛 테스트

3. ETL 작성

  • Redshift, Google Colab 이용

  • Extract
    • 데이터를 데이터 소스에서 읽어내는 과정. 보통 API 호출
  • Transform
    • 필요하다면 그 원본 데이터의 포맷을 원하는 형태로 변형시키는 과정. 굳이 변환할 필요는 없음
  • Load
    • 최종적으로 데이터 웨어하우스에 테이블로 적재하는 과정


  • 실습 ETL 개요
    • 웹 상(S3)에 있는 CSV 파일을 Redshift에 있는 테이블로 복사
    • Python으로 코랩에서 작성

Redshift 테이블 생성

CREATE TABLE bokyung.name_gender(
    name varchar(32) primary key,
    gender varchar(8)
)
  • name field는 unique 해야 함
  • DW는 primary key uniqueness를 보장하지 않기 때문에 데이터 엔지니어가 잘 관리해야 함!

데이터 다운로드

스크린샷 2023-12-12 오후 3 25 35

함수 작성

  • extract(url) : URL 읽어서 데이터 return
  • transform(data) : 데이터를 리스트로 변환하여 return
  • load(list) : list를 Redshift 테이블로 로드


  • 3개의 함수를 각각 별개의 태스크로 구성할 수도 있고, 하나의 태스크 안에서 3개의 함수를 모두 호출하게 구성도 가능

python 코드

%load_ext sql 실행 후 %%sql을 붙여 SQL문 실행


  • Redshift 테이블 생성
    DROP TABLE IF EXISTS leebk1124.name_gender;
    CREATE TABLE leebk1124.name_gender (
      name varchar(32) primary key,
      gender varchar(8)
    );
    


  • Redshift connection 함수 작성
import psycopg2  

def get_Redshift_connection():
    host = 'learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com'
    redshift_user = 'leebk1124'
    redshift_pass = 'Leebk1124!1'
    port = 5439
    dbname = 'dev'
    conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
        dbname=dbname,
        user=redshift_user,
        password=redshift_pass,
        host=host,
        port=port
    ))
    conn.set_session(autocommit=True)
    return conn.cursor()


  • ETL 함수 정의
import requests

def extract(url):
    f = requests.get(url)
    return (f.text)

def transform(text):
    lines = text.strip().split("\n")[1:]
    records = []
    for l in lines:
        (name, gender) = l.split(",")
        records.append([name, gender])
    return records

def load(records):
    # BEGIN, END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    schema = "leebk1124"
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;")   # Full Refresh
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
            cur.execute(sql)
        cur.execute("COMMIT;")
    except (Exception, psycopg2.DatabaseError) as error:    
        print(error)
        cur.execute("ROLLBACK;")


  • 함수 실행
link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"

data = extract(link)
lines = transform(data)
load(lines)

4. Airflow

  • 파이썬으로 작성된 데이터 파이프라인(ETL) 프레임워크
    • Airbnb에서 시작한 아파치 오픈소스 프로젝트
    • 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임워크
  • 데이터 파이프라인 스케줄링 지원
    • 정해진 시간에 ETL 실행
    • 한 ETL 실행이 끝나면 다음 ETL 실행
    • 웹 UI를 제공하기도 함
  • 데이터 파이프라인(ETL)을 쉽게 만들 수 있도록 해줌
    • 다양한 데이터 소스와 데이터 웨어하우스를 쉽게 통합해주는 모듈 제공
    • 데이터 파이프라인 관리 관련 다양한 기능 제공 (특히 Backfill)
  • Airflow에서는 데이터 파이프라인을 DAG(Directed Acycli Graph)라고 부름
    • 하나의 DAG는 하나 이상의 태스크로 구성됨
  • Airflow 버전 선택 방법: 큰 회사에서 사용하는 버전이 무엇인지 확인


  • 웹 UI

스크린샷 2023-12-12 오후 4 33 45

5. Airflow 구성

컴포넌트

  • 웹 서버 (Web Server)
  • 스케줄러 (Scheduler)
  • 워커 (Worker)
  • 메타 데이터베이스
    • 기본: sqlite
  • 큐 (다수 서버 구성인 경우에만 사용)
    • 이 경우 Executor가 달라짐

Airflow 구성

  • 스케줄러는 DAG들을 워커들에게 배정하는 역할
    • 정확하게는 DAG 안에 있는 태스크들을 스케줄링
  • 웹 UI는 스케줄러와 DAG의 실행 상황을 시각화해줌
  • 워커는 실제로 DAG를 실행하는 역할
  • 스케줄러와 각 DAG의 실행 결과는 별도 DB에 저장됨 (메타 데이터베이스)
    • 기본으로 설치되는 DB는 sqlite이지만 잘 사용하지 않음
    • 실제 프로덕션에서는 MySQL이나 Postgres를 사용해야 함

스크린샷 2023-12-12 오후 4 49 39

서버 한 대 구성

  • 워커의 수는 서버가 갖고 있는 CPU 숫자만큼 가질 수 있음

스크린샷 2023-12-12 오후 4 40 51

  • 서버 한 대로는 부족해짐
    • 워커를 별도의 서버에 셋팅 후 워커가 있는 서버의 수를 늘리는 형태로 용량을 늘림

Airflow 스케일링

  • 우선 한 대로 운영하다가 스케일 업 -> 부족하면 스케일 아웃 (클라우드 추천)

  • 스케일 업 : 더 좋은 사양의 서버 사용
    • 언젠가는 한계에 도달할 것
  • 스케일 아웃 : 서버 추가
    • 관리는 힘들어지지만, 확장이 용이함

스크린샷 2023-12-12 오후 4 43 08

다수 서버 구성

  • 서버들을 워커 전용으로 할당
  • 를 통해 다수의 워커 서버와 통신
  • 스케줄러는 Executor를 통해서 워커에게 태스크 넘김
    • Executor가 무엇이냐에 따라 큐를 쓰기도 하고 안쓰기도 함

스크린샷 2023-12-12 오후 4 45 51


  • Executor 종류
    • Sequential Executor
    • Local Executor
    • Celery Executor
    • Kubernetes Executor
    • CeleryKubernetes Executor
    • Dask Executor

Airflow 개발 장단점

  • 장점
    • 데이터 파이프라인을 세밀하게 제어 가능
    • 다양한 데이터 소스와 데이터 웨어하우스 지원
    • Backfill이 쉬움
  • 단점
    • 배우기 쉽지 않음
    • 다수 서버로 운영하는 순간 비용이 상당히 발생함
    • 상대적으로 개발 환경을 구성하기 쉽지 않음
    • 직접 운영이 쉽지 않음. 클라우드 버전 사용이 선호됨
      • GCP - Cloud Composer
      • AWS - Managed Workflows for Apache Airflow (MWAA)
      • Azure - Data Factory Managed Airflow

DAG

  • Airflow에서 ETL을 부르는 명칭
  • 태스크로 구성됨
    • 3개의 태스크로 구성된다면 Extract, Transform, Load로 구성
  • 태스크: Airflow의 operator로 만들어짐
    • Airflow에서 이미 다양한 종류의 오퍼레이터를 제공함
    • 경우에 맞게 사용 오퍼레이터를 결정하거나, 필요하다면 직접 개발
    • e.g., Redshift writing, Postgres query, S3 Read/Write, Hive query, Spark job, shell script

DAG 구성 예제

스크린샷 2023-12-12 오후 4 57 11

  • 3개의 태스크로 구성된 DAG
  • t1 - t2 - t3의 순으로 일렬로 진행


스크린샷 2023-12-12 오후 4 57 38

  • 3개의 태스크로 구성된 DAG
  • t1이 실행되고, 여기서 t2와 t3로 분기
    • t2, t3가 모두 끝나면 DAG의 실행도 끝