[DEV] 12주차. Airflow 고급 기능 (2)
1. 구글 시트 -> Redshift
구현 절차
- 시트 API 활성화, 구글 서비스 어카운트 생성 -> 그 내용을 JSON 파일로 다운로드
- 어카운트에서 생성해준 이메일을 조작하고 싶은 시트에 공유
- Airflow DAG 쪽에서 해당 JSON 파일로 인증하고 시트를 조작
구글 서비스 어카운트 생성
- 구글 클라우드 로그인
- 구글 스프레드시트 API 활성화
- 구글 서비스 어카운트 생성 (JSON)
- 이 JSON 파일의 내용을 google_sheet_access_token이라는 이름의 Variable로 등록
- 이 JSON 파일에 이메일 주소가 존재
- 이 주소를 읽고 싶은 구글스프레드시트 파일에 공유
- iam.gserviceaccount.com 으로 끝남
서비스 계정 생성
계정에 들어가서 키 - JSON 만들기 선택 -> 다운로드됨
Redshift로 복사할 구글스프레드시트
생성된 이메일 주소를 편집자 권한으로 공유
S3 Connection 설정
Airflow Connection에 S3 Connection 생성
DAG 구성
- 지정된 시트의 내용을 csv 파일로 local directory에 copy
- copy된 csv 파일을 s3에 업로드
- s3ToRedshift operator로 s3 -> Redshift에 copy
Redshift table 생성
create table leebk1124.spreadsheet_copy_testing(
col1 int,
col2 int,
col3 int,
col4 int
);
DAG 실행
docker exec -it learn-airflow-airflow-scheduler-1 sh
(airflow) airflow dags test Gsheet_to_Redshift 2023-10-10
2. Redshift SELECT -> 구글 시트
- SQL_to_Sheet.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from plugins import gsheet
from datetime import datetime
def update_gsheet(**context):
sql = context["params"]["sql"]
sheetfilename = context["params"]["sheetfilename"]
sheetgid = context["params"]["sheetgid"]
gsheet.update_sheet(sheetfilename, sheetgid, sql, "redshift_dev_db")
with DAG(
dag_id = 'SQL_to_Sheet',
start_date = datetime(2022,6,18),
catchup=False,
tags=['example'],
schedule = '@once'
) as dag:
sheet_update = PythonOperator(
dag=dag,
task_id='update_sql_to_sheet1',
python_callable=update_gsheet,
params = {
"sql": "SELECT * FROM analytics.nps_summary ORDER BY date",
"sheetfilename": "spreadsheet-copy-testing",
"sheetgid": "RedshiftToSheet"
}
)
DAG 실행
docker exec -it learn-airflow-airflow-scheduler-1 sh
(airflow) airflow dags test SQL_to_Sheet 2023-10-10
원래 1월, 9월만 있던 시트에 나머지 날짜들이 채워졌음!
3. Airflow API와 모니터링
- Airflow의 health check
- Airflow API로 외부에서 Airflow 조작
Airflow API 활성화
- airflow 실행 -> 로그인 할 수 있는 사용자
- default: airflow - airflow
- 외부로 open하기엔 위험한 조합
- airflow 전용으로 사용할 수 있는 User id를 따로 만들어 사용하는 것이 좋은 방법
- airflow는 admin
- 더 낮은 user level 정도로 만드는 것이 좋을 듯
- Airflow API, Web UI 자체는 가능하면 VPN 뒤에 숨겨놓아서 사용자를 거르는 것이 좋은 방법
- API를 노출시키더라도 같은 네트워크에 있는 내부 서비스들만 access할 수 있도록
- airflow.cfg의 api 섹션에서
auth_backend
의 값을 변경
[api]
auth_backend = airflow.api.authbackend.basic_auth
- docker-compose.yaml에는 이미 설정이 되어있음 (environments)
__
: airflow.cfg를 overridingairflow _ _ 섹션 _ _ 키
auth_backend
: 한 개 설정auth_backends
: 여러 개 설정
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
- 아래 명령으로 확인
docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backends
# airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
- Airflow Web UI에서 새로운 사용자 추가 (API 사용자)
- Security -> List Users -> +
- 이후 화면에서 새 사용자 정보 추가 (monitor:MonitorUser1)
Health API 호출
- /health API 호출
curl -X GET --user "monitor:MonitorUser1" http://localhost:8080/health
- 정상 경우 응답
{
"metadatabase": {
"status": "healthy"
},
"scheduler": {
"latest_scheduler_heartbeat": "2024-01-06T04:21:36.677781+00:00",
"status": "healthy"
}
}
API 사용 예
특정 DAG를 API로 Trigger하기
curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d '{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
{
"conf": {},
"dag_id": "HelloWorld",
"dag_run_id": "manual__2023-05-24T00:00:00+00:00",
"data_interval_end": "2023-05-23T02:00:00+00:00",
"data_interval_start": "2023-05-22T02:00:00+00:00",
"end_date": null,
"execution_date": "2023-05-24T00:00:00+00:00",
"external_trigger": true,
"last_scheduling_decision": null,
"logical_date": "2023-05-24T00:00:00+00:00",
"note": null,
"run_type": "manual",
"start_date": null,
"state": "queued"
}
모든 DAG 리스트하기
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags
{
[
{
"dag_id": "UpdateSymbol_v2",
"default_view": "grid",
"description": null,
"file_token": "Ii9vcHQvYWlyZmxvdy9kYWdzL1VwZGF0ZVN5bWJvbF92Mi5weSI.RXEMihoVheaOe6hGmdMQjG7Foe8",
"fileloc": "/opt/airflow/dags/UpdateSymbol_v2.py",
"has_import_errors": false,
"has_task_concurrency_limits": false,
"is_active": true,
"is_paused": true,
"is_subdag": false,
"last_expired": null,
"last_parsed_time": "2024-01-06T04:33:07.388271+00:00",
"last_pickled": null,
"max_active_runs": 16,
"max_active_tasks": 16,
"next_dagrun": "2024-01-04T10:00:00+00:00",
"next_dagrun_create_after": "2024-01-05T10:00:00+00:00",
"next_dagrun_data_interval_end": "2024-01-05T10:00:00+00:00",
"next_dagrun_data_interval_start": "2024-01-04T10:00:00+00:00",
"owners": [
"airflow"
],
"pickle_id": null,
"root_dag_id": null,
"schedule_interval": {
"__type": "CronExpression",
"value": "0 10 * * *"
},
"scheduler_lock": null,
"tags": [
{
"name": "API"
}
],
"timetable_description": "At 10:00"
},
{
"dag_id": "Weather_to_Redshift",
"default_view": "grid",
"description": null,
"file_token": "Ii9vcHQvYWlyZmxvdy9kYWdzL1dlYXRoZXJfdG9fUmVkc2hpZnQucHki.ir4j8J8KwcnS287Z55oTVEYP7Cg",
"fileloc": "/opt/airflow/dags/Weather_to_Redshift.py",
"has_import_errors": false,
"has_task_concurrency_limits": false,
"is_active": true,
"is_paused": true,
"is_subdag": false,
"last_expired": null,
"last_parsed_time": "2024-01-06T04:33:06.944457+00:00",
"last_pickled": null,
"max_active_runs": 1,
"max_active_tasks": 16,
"next_dagrun": "2024-01-05T02:00:00+00:00",
"next_dagrun_create_after": "2024-01-06T02:00:00+00:00",
"next_dagrun_data_interval_end": "2024-01-06T02:00:00+00:00",
"next_dagrun_data_interval_start": "2024-01-05T02:00:00+00:00",
"owners": [
"airflow"
],
"pickle_id": null,
"root_dag_id": null,
"schedule_interval": {
"__type": "CronExpression",
"value": "0 2 * * *"
},
"scheduler_lock": null,
"tags": [],
"timetable_description": "At 02:00"
}
],
"total_entries": 71
}
DAG들의 속성을 모두 출력해줌
Dags 리스트를 읽고 활성화되어 있는 Dag만 찾기
import requests
from requests.auth import HTTPBasicAuth
url = "http://localhost:8080/api/v1/dags"
dags = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow"))
for dag in dags.json()['dags']:
paused = dag['is_paused']
if not paused:
print(dag['dag_id'])
모든 Variable 리스트하기
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables
{
"total_entries": 2,
"variables": [
{
"description": "",
"key": "slack_url",
"value": "***"
},
{
"description": "",
"key": "google_sheet_access_token",
"value": "***"
}
]
}
모든 Config 리스트하기
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
{
"detail": "Your Airflow administrator chose not to expose the configuration, most likely for security reasons.",
"status": 403,
"title": "Forbidden",
"type": "https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/PermissionDenied"
}
- admin 계정으로 접속해도 기본적으로 막혀있음
- airflow.cfg에 이것을 풀어줄 수 있는 key가 있음!
- docker-compose.yaml에서 이 key를 override 해주어야 함
1) airflow.cfg의 expose_config
값 True로 변경!
2) docker-compose.yml에서 airflow-webserver 서비스에 environment 추가
airflow-webserver:
...
environment:
<<: *airflow-common-env
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'true'
...
Variables/Connections Import/Export
airflow variables export variables.json
airflow variables import variables.json
airflow connections export variables.json
airflow connectinos import variables.json
- DB에 기록이 되어서 Web UI에 보이는 variables, connections에 대해서만 동작함
- 환경변수로 등록된 variables, connections은 이 명령을 통해 알 수 없음