[DEV] 10주차. 데이터 파이프라인과 Airflow(1)
1. 데이터 파이프라인
- 데이터 흐름
- 서비스나 써드파티를 통해 생기는 데이터
- 데이터 인프라
- 데이터 분석 (지표 정의, 시각화)
- 데이터 과학 적용 (사용자 경험 개선)
더
- 데이터 웨어하우스 구성 예
ETL
- Extract, Transform, Load
- Airflow에서는 DAG라고 부름
- Graph 형태
- Loop가 없는 것이 일반적!
- 데이터 소스에 있는 데이터들이 데이터 웨어하우스에 테이블 형태로 적재됨
ELT
- ETL
- 데이터를 데이터 웨어하우스 외부에서 내부로 가져오는 프로세스
- 보통 데이터 엔지니어들이 수행
- 분석가 등의 요청에 의해 수행
- ELT
- 데이터 웨어하우스 내부 데이터를 조작해서 (더 추상화되고 요약된) 새로운 데이터를 만드는 프로세스
- 보통 데이터 분석가들이 많이 수행
- 데이터 레이크 위에서 작업이 벌어지기도 함
- 전용 기술들이 있으며, dbt가 가장 유명
DL vs. DW
- 데이터 레이크
- 구조화 데이터 + 비구조화 데이터
- 보존 기한이 없는 모든 데이터를 원래 형태대로 보존하는 스토리지에 가까움
- 보통은 데이터 웨어하우스보다 몇 배는 더 큰 스토리지
- 데이터 웨어하우스
- 보존 기한이 있는 구조화된 데이터를 저장하고 처리하는 스토리지
- 보통 BI 툴들은 데이터 웨어하우스를 백엔드로 사용함
- 보통 관계형 데이터베이스로 비구조화된 데이터 저장에는 무리가 있음
- 데이터 레이크에 있는 데이터를 가공해서 의미있는 것만 데이터 웨어하우스에 로딩하는 형태로 많이 사용
Data Lake & ELT
-
[Data Lake -> Data Transfoms -> Data Warehouse] 이 과정이 ELT
-
이 때 다양한 데이터 파이프라인의 스케줄러와 관리 툴이 필요 -> 그것이 Airflow!
Data Pipeline
- 데이터를 소스로부터 목적지로 복사하는 작업
- 보통 코딩(Python / Scala) 혹은 SQL을 통해 이루어짐
- 대부분의 경우 목적지는 데이터 웨어하우스가 됨
- 데이터 소스의 예
- Cilck stream, call data, ads performance data, transactions, sensor data, metadata, …
- More concrete examples: production databases, log files, API, stream data (Kafka topic)
- 데이터 목적지의 예
- Data warehouse, 캐시 시스템(Redis, Memcache), Production databases, NoSQL, S3, …
Raw Data ETL Jobs
-
보통 데이터 엔지니어가 수행
- 외부와 내부 데이터 소스에서 데이터를 읽어다가 (많은 경우 API를 통하게 됨)
- 적당한 데이터 포맷 변환 후 (데이터의 크기가 커지면 Spark 등이 필요해짐)
- 데이터 웨어하우스 로드
Summary/Report Jobs
- DW(혹은 DL)로부터 데이터를 읽어서 다시 DW에 쓰는 ETL
- Raw Data를 읽어서 일종의 리포트 형태나 summary 형태의 테이블을 다시 만드는 용도
- 조금 더 사용하기 쉽게 정제
-
특수한 형태로는 AB 테스트 결과를 분석하는 데이터 파이프라인도 존재
- 요약 테이블의 경우 SQL만으로 만들고, 이는 데이터 분석가가 하는 경우가 대부분
- 데이터 엔지니어 관점에서는 어떻게 데이터 분석가들이 편하게 할 수 있는 환경을 만들어주느냐가 관건
- Analytics Engineer (dbt)
Production Data Jobs
- DW로부터 데이터를 읽어 다른 스토리지 (많은 경우 프로덕션 환경)로 쓰는 ETL
- 요약 정보가 프로덕션 환경에서 성능 이유로 필요한 경우
- 혹은 머신러닝 모델에서 필요한 피쳐들을 미리 계산해두는 경우
- 이 경우 흔한 타겟 스토리지
- Cassandra/HBase/DynamoDB와 같은 NoSQL
- MySQL과 같은 RDB (OLTP)
- Redis/Memcache와 같은 캐시
- ElasticSearch와 같은 검색 엔진
- ex)
SELECT c.courseid, COUNT(DISTINCT cr.studentid) "수강생수",
COUNT(DISTINCT cr.reviewid) "리뷰수",
AVG(cr,rating) "별점"
FROM course c
LEFT JOIN course_review cr ON c.courseid = cr.courseid
GROUP BY 1;
DW -> MySQL
2. Data Pipeline 만들 때
현실
- 많은 이유로 실패
- 버그
- 데이터 소스 상의 이슈
- What if data sources are not available or change its data format
- 데이터 파이프라인 간의 의존도에 대한 이해도 부족
- 데이터 파이프라인의 수가 늘어나면 유지보수 비용이 기하급수적으로 늘어남
- 데이터 소스 간의 의존도가 생기면서 더 복잡해짐
- 한 채널의 정보가 업데이트되지 않으면 관련 다른 모든 정보들이 갱신되지 않음
- 관리해야 할 테이블들이 늘어남
- 데이터 소스 간의 의존도가 생기면서 더 복잡해짐
Best Practice 1 - Full Refresh
- data source에서 DW로 복사해올 때 데이터가 작을 경우 가능하면 매번 통채로 복사해서 테이블을 만들기 (Full Refresh)
- 과거 데이터가 잘못된 것이 있는 경우에도 매번 다시 읽어오기 때문에 문제가 없고, 문제가 생겨도 해결 방법이 간단해짐
- optimize 불필요
- Incremental update만이 가능하다면, 대상 데이터 소스가 갖춰야할 몇 가지 조건이 있음
- 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요
- created
- modified
- deleted
- 데이터 소스가 API라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야 함
- 효율성은 높지만, 연산이 조금 어려워짐
- 데이터의 양이 많을 경우 이 방법으로 할 수 밖에 없게 될 것
- 데이터를 full refresh할 때 걸리는 시간을 고려했을 때, 뒷 단에 기다리는 작업들이 요구하는 시간을 생각해서 결정
- 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요
Best Practice 2 - 멱등성
- 멱등성 (Idempotency)를 보장하는 것이 중요
- 멱등성이란
- 동일한 입력 데이터로 데이터 파이프라인을 다수 실행해도 최종 테이블의 내용이 달라지지 않아야 함
- data source 단의 정보와 DW 단의 정보가 동일해야 함
- 중복 데이터가 생기지 않는 등!
- 중요한 포인트는 critical point들이 모두 one atomic action으로 실행되어야 한다는 점
- 문제가 있는 경우 데이터 정합성이 깨지지 않는 형태로 깔끔하게 실패 해야 한다는 것!
- SQL의 transaction이 꼭 필요한 기술
- 동일한 입력 데이터로 데이터 파이프라인을 다수 실행해도 최종 테이블의 내용이 달라지지 않아야 함
Best Practice 3 - Backfill
- 실패한 데이터 파이프라인의 재실행이 쉬워야 함
- Full Refresh는 간단함
- Incremental update의 경우 조금 복잡
- 특정 날짜에서 실패한 것을 그냥 넘어가면 해당 날짜 데이터가 비어있게 됨
- 소스에서 변화가 발생할 때 그것을 다시 copy해와야 함
- 과거 데이터를 다시 채우는 과정(Backfill)이 쉬워야 함
- Airflow는 이 부분(특히 Backfill)에 강점을 갖고 있음
Best Practice 4 - 문서화
- 데이터 파이프라인의 입력과 출력을 명확히 하고 문서화
- 비즈니스 오너 명시: 누가 이 데이터를 요청했는지를 기록으로 남길 것!
- 이것이 나중에 데이터 카탈로그로 들어가서 데이터 디스커버리에 사용 가능함
- 데이터 리니지가 중요해짐 -> 이것을 이해하지 못하면 온갖 종류의 사고 발생
Best Practice 5 - 데이터 정리
- 주기적으로 쓸모없는 데이터들을 삭제
- 사용하지 않는 테이블들과 데이터 파이프라인을 적극적(주기적)으로 삭제!
- 데이터 웨어하우스에는 필요한 데이터들만 남기고, 과거 데이터는 데이터 레이크나 스토리지로 이동
Best Practice 6 - 사고 리프트
- 데이터 파이프라인의 실패는 시간 문제이지 막을 수 있는 것이 아님
- 데이터 파이프라인 사고 발생시 마다 사고 리포트(post-mortem) 작성하기
- 목적: 동일한 혹은 아주 비슷한 사고가 또 발생하는 것을 막기 위함
- 사고 원인을 이해하고 이를 방지하기 위한 액션 아이템들의 실행이 중요해짐
- 기술 부채의 정도를 이야기해주는 지표
Best Practice 7 - 입출력 체크
- 중요한 데이터 파이프라인의 입력과 출력 체크하기
- 간단하게 입력 레코드 수와 출력 레코드 수가 몇 개인지 체크하는 것부터 시작
- summary 테이블을 만들어내고, Primary key가 존재한다면 Primary key uniqueness가 보장되는지 체크하는 것이 필요함
- 중복 레코드 체크
- 등등 데이터 대상 유닛 테스트
3. ETL 작성
-
Redshift, Google Colab 이용
- Extract
- 데이터를 데이터 소스에서 읽어내는 과정. 보통 API 호출
- Transform
- 필요하다면 그 원본 데이터의 포맷을 원하는 형태로 변형시키는 과정. 굳이 변환할 필요는 없음
- Load
- 최종적으로 데이터 웨어하우스에 테이블로 적재하는 과정
- 실습 ETL 개요
- 웹 상(S3)에 있는 CSV 파일을 Redshift에 있는 테이블로 복사
- Python으로 코랩에서 작성
Redshift 테이블 생성
CREATE TABLE bokyung.name_gender(
name varchar(32) primary key,
gender varchar(8)
)
name
field는 unique 해야 함- DW는 primary key uniqueness를 보장하지 않기 때문에 데이터 엔지니어가 잘 관리해야 함!
데이터 다운로드
함수 작성
extract(url)
: URL 읽어서 데이터 returntransform(data)
: 데이터를 리스트로 변환하여 returnload(list)
: list를 Redshift 테이블로 로드
- 3개의 함수를 각각 별개의 태스크로 구성할 수도 있고, 하나의 태스크 안에서 3개의 함수를 모두 호출하게 구성도 가능
python 코드
%load_ext sql
실행 후 %%sql
을 붙여 SQL문 실행
- Redshift 테이블 생성
DROP TABLE IF EXISTS leebk1124.name_gender; CREATE TABLE leebk1124.name_gender ( name varchar(32) primary key, gender varchar(8) );
- Redshift connection 함수 작성
import psycopg2
def get_Redshift_connection():
host = 'learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com'
redshift_user = 'leebk1124'
redshift_pass = 'Leebk1124!1'
port = 5439
dbname = 'dev'
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True)
return conn.cursor()
- ETL 함수 정의
import requests
def extract(url):
f = requests.get(url)
return (f.text)
def transform(text):
lines = text.strip().split("\n")[1:]
records = []
for l in lines:
(name, gender) = l.split(",")
records.append([name, gender])
return records
def load(records):
# BEGIN, END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
schema = "leebk1124"
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;") # Full Refresh
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
cur.execute("COMMIT;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
- 함수 실행
link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
data = extract(link)
lines = transform(data)
load(lines)
4. Airflow
- 파이썬으로 작성된 데이터 파이프라인(ETL) 프레임워크
- Airbnb에서 시작한 아파치 오픈소스 프로젝트
- 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임워크
- 데이터 파이프라인 스케줄링 지원
- 정해진 시간에 ETL 실행
- 한 ETL 실행이 끝나면 다음 ETL 실행
- 웹 UI를 제공하기도 함
- 데이터 파이프라인(ETL)을 쉽게 만들 수 있도록 해줌
- 다양한 데이터 소스와 데이터 웨어하우스를 쉽게 통합해주는 모듈 제공
- 데이터 파이프라인 관리 관련 다양한 기능 제공 (특히 Backfill)
- Airflow에서는 데이터 파이프라인을 DAG(Directed Acycli Graph)라고 부름
- 하나의 DAG는 하나 이상의 태스크로 구성됨
- Airflow 버전 선택 방법: 큰 회사에서 사용하는 버전이 무엇인지 확인
- 2020년 12월에 Airflow 2.0 릴리스됨
- https://cloud.google.com/composer/docs/concepts/versioning/composer-versions
- 글로벌 클라우드 중 구글 클라우드가 Airflow를 가장 먼저 지원함
- Cloud Composer라고 이름 붙임 - Airflow라고 생각하면 됨!
- 23년 12월 기준 2.6.3 버전이 가장 안정적
- 웹 UI
5. Airflow 구성
컴포넌트
- 웹 서버 (Web Server)
- 스케줄러 (Scheduler)
- 워커 (Worker)
- 메타 데이터베이스
- 기본: sqlite
- 큐 (다수 서버 구성인 경우에만 사용)
- 이 경우 Executor가 달라짐
Airflow 구성
- 스케줄러는 DAG들을 워커들에게 배정하는 역할
- 정확하게는 DAG 안에 있는 태스크들을 스케줄링
- 웹 UI는 스케줄러와 DAG의 실행 상황을 시각화해줌
- 워커는 실제로 DAG를 실행하는 역할
- 스케줄러와 각 DAG의 실행 결과는 별도 DB에 저장됨 (메타 데이터베이스)
- 기본으로 설치되는 DB는 sqlite이지만 잘 사용하지 않음
- 실제 프로덕션에서는 MySQL이나 Postgres를 사용해야 함
서버 한 대 구성
- 워커의 수는 서버가 갖고 있는 CPU 숫자만큼 가질 수 있음
- 서버 한 대로는 부족해짐
- 워커를 별도의 서버에 셋팅 후 워커가 있는 서버의 수를 늘리는 형태로 용량을 늘림
Airflow 스케일링
-
우선 한 대로 운영하다가 스케일 업 -> 부족하면 스케일 아웃 (클라우드 추천)
- 스케일 업 : 더 좋은 사양의 서버 사용
- 언젠가는 한계에 도달할 것
- 스케일 아웃 : 서버 추가
- 관리는 힘들어지지만, 확장이 용이함
다수 서버 구성
- 서버들을 워커 전용으로 할당
- 큐를 통해 다수의 워커 서버와 통신
- 스케줄러는 Executor를 통해서 워커에게 태스크 넘김
- Executor가 무엇이냐에 따라 큐를 쓰기도 하고 안쓰기도 함
- Executor 종류
- Sequential Executor
- Local Executor
- Celery Executor
- Kubernetes Executor
- CeleryKubernetes Executor
- Dask Executor
Airflow 개발 장단점
- 장점
- 데이터 파이프라인을 세밀하게 제어 가능
- 다양한 데이터 소스와 데이터 웨어하우스 지원
- Backfill이 쉬움
- 단점
- 배우기 쉽지 않음
- 다수 서버로 운영하는 순간 비용이 상당히 발생함
- 상대적으로 개발 환경을 구성하기 쉽지 않음
- 직접 운영이 쉽지 않음. 클라우드 버전 사용이 선호됨
- GCP -
Cloud Composer
- AWS -
Managed Workflows for Apache Airflow (MWAA)
- Azure -
Data Factory Managed Airflow
- GCP -
DAG
- Airflow에서 ETL을 부르는 명칭
- 태스크로 구성됨
- 3개의 태스크로 구성된다면 Extract, Transform, Load로 구성
- 태스크: Airflow의 operator로 만들어짐
- Airflow에서 이미 다양한 종류의 오퍼레이터를 제공함
- 경우에 맞게 사용 오퍼레이터를 결정하거나, 필요하다면 직접 개발
- e.g., Redshift writing, Postgres query, S3 Read/Write, Hive query, Spark job, shell script
DAG 구성 예제
- 3개의 태스크로 구성된 DAG
- t1 - t2 - t3의 순으로 일렬로 진행
- 3개의 태스크로 구성된 DAG
- t1이 실행되고, 여기서 t2와 t3로 분기
- t2, t3가 모두 끝나면 DAG의 실행도 끝