1. 구글 시트 -> Redshift

구현 절차

  • 시트 API 활성화, 구글 서비스 어카운트 생성 -> 그 내용을 JSON 파일로 다운로드
  • 어카운트에서 생성해준 이메일을 조작하고 싶은 시트에 공유
  • Airflow DAG 쪽에서 해당 JSON 파일로 인증하고 시트를 조작

구글 서비스 어카운트 생성

스크린샷 2024-01-05 오전 10 37 39


  • 구글 서비스 어카운트 생성 (JSON)
  • 이 JSON 파일의 내용을 google_sheet_access_token이라는 이름의 Variable로 등록
  • 이 JSON 파일에 이메일 주소가 존재
    • 이 주소를 읽고 싶은 구글스프레드시트 파일에 공유
    • iam.gserviceaccount.com 으로 끝남

스크린샷 2024-01-05 오전 10 44 42 서비스 계정 생성

스크린샷 2024-01-05 오전 10 44 01 계정에 들어가서 키 - JSON 만들기 선택 -> 다운로드됨


스크린샷 2024-01-05 오전 10 48 40

Redshift로 복사할 구글스프레드시트


스크린샷 2024-01-06 오전 6 54 16

생성된 이메일 주소를 편집자 권한으로 공유

S3 Connection 설정

스크린샷 2024-01-05 오전 10 51 39

Airflow Connection에 S3 Connection 생성


DAG 구성

  • 지정된 시트의 내용을 csv 파일로 local directory에 copy
  • copy된 csv 파일을 s3에 업로드
  • s3ToRedshift operator로 s3 -> Redshift에 copy

Redshift table 생성

create table leebk1124.spreadsheet_copy_testing(
    col1 int,
    col2 int,
    col3 int,
    col4 int
);

DAG 실행

docker exec -it learn-airflow-airflow-scheduler-1 sh

(airflow) airflow dags test Gsheet_to_Redshift 2023-10-10

스크린샷 2024-01-06 오전 7 25 59

2. Redshift SELECT -> 구글 시트

  • SQL_to_Sheet.py
from airflow import DAG
from airflow.operators.python import PythonOperator

from plugins import gsheet
from datetime import datetime

def update_gsheet(**context):
    sql = context["params"]["sql"]
    sheetfilename = context["params"]["sheetfilename"]
    sheetgid = context["params"]["sheetgid"]

    gsheet.update_sheet(sheetfilename, sheetgid, sql, "redshift_dev_db")


with DAG(
    dag_id = 'SQL_to_Sheet',
    start_date = datetime(2022,6,18),
    catchup=False,
    tags=['example'],
    schedule = '@once'
) as dag:

    sheet_update = PythonOperator(
        dag=dag,
        task_id='update_sql_to_sheet1',
        python_callable=update_gsheet,
        params = {
            "sql": "SELECT * FROM analytics.nps_summary ORDER BY date",
            "sheetfilename": "spreadsheet-copy-testing",
            "sheetgid": "RedshiftToSheet"
        }
    )

DAG 실행

docker exec -it learn-airflow-airflow-scheduler-1 sh

(airflow) airflow dags test SQL_to_Sheet 2023-10-10


스크린샷 2024-01-06 오전 7 52 03

스크린샷 2024-01-06 오전 7 52 23

원래 1월, 9월만 있던 시트에 나머지 날짜들이 채워졌음!

3. Airflow API와 모니터링

  • Airflow의 health check
  • Airflow API로 외부에서 Airflow 조작

Airflow API 활성화

  • airflow 실행 -> 로그인 할 수 있는 사용자
    • default: airflow - airflow
    • 외부로 open하기엔 위험한 조합
  • airflow 전용으로 사용할 수 있는 User id를 따로 만들어 사용하는 것이 좋은 방법
  • airflow는 admin
    • 더 낮은 user level 정도로 만드는 것이 좋을 듯
  • Airflow API, Web UI 자체는 가능하면 VPN 뒤에 숨겨놓아서 사용자를 거르는 것이 좋은 방법
    • API를 노출시키더라도 같은 네트워크에 있는 내부 서비스들만 access할 수 있도록


  • airflow.cfg의 api 섹션에서 auth_backend의 값을 변경
[api]
auth_backend = airflow.api.authbackend.basic_auth
  • docker-compose.yaml에는 이미 설정이 되어있음 (environments)
    • __ : airflow.cfg를 overriding
    • airflow _ _ 섹션 _ _ 키
  • auth_backend: 한 개 설정
    • auth_backends: 여러 개 설정
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
  • 아래 명령으로 확인
docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backends
# airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session


  • Airflow Web UI에서 새로운 사용자 추가 (API 사용자)
    • Security -> List Users -> +
    • 이후 화면에서 새 사용자 정보 추가 (monitor:MonitorUser1)

스크린샷 2024-01-06 오후 1 19 24

스크린샷 2024-01-06 오후 1 20 54

Health API 호출

  • /health API 호출
curl -X GET --user "monitor:MonitorUser1" http://localhost:8080/health
  • 정상 경우 응답
{
    "metadatabase": {
        "status": "healthy"
    }, 
    "scheduler": {
        "latest_scheduler_heartbeat": "2024-01-06T04:21:36.677781+00:00", 
        "status": "healthy"
    }
}

API 사용 예

특정 DAG를 API로 Trigger하기

curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d '{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
{
  "conf": {},
  "dag_id": "HelloWorld",
  "dag_run_id": "manual__2023-05-24T00:00:00+00:00",
  "data_interval_end": "2023-05-23T02:00:00+00:00",
  "data_interval_start": "2023-05-22T02:00:00+00:00",
  "end_date": null,
  "execution_date": "2023-05-24T00:00:00+00:00",
  "external_trigger": true,
  "last_scheduling_decision": null,
  "logical_date": "2023-05-24T00:00:00+00:00",
  "note": null,
  "run_type": "manual",
  "start_date": null,
  "state": "queued"
}

모든 DAG 리스트하기

curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags
{
[
    {
      "dag_id": "UpdateSymbol_v2",
      "default_view": "grid",
      "description": null,
      "file_token": "Ii9vcHQvYWlyZmxvdy9kYWdzL1VwZGF0ZVN5bWJvbF92Mi5weSI.RXEMihoVheaOe6hGmdMQjG7Foe8",
      "fileloc": "/opt/airflow/dags/UpdateSymbol_v2.py",
      "has_import_errors": false,
      "has_task_concurrency_limits": false,
      "is_active": true,
      "is_paused": true,
      "is_subdag": false,
      "last_expired": null,
      "last_parsed_time": "2024-01-06T04:33:07.388271+00:00",
      "last_pickled": null,
      "max_active_runs": 16,
      "max_active_tasks": 16,
      "next_dagrun": "2024-01-04T10:00:00+00:00",
      "next_dagrun_create_after": "2024-01-05T10:00:00+00:00",
      "next_dagrun_data_interval_end": "2024-01-05T10:00:00+00:00",
      "next_dagrun_data_interval_start": "2024-01-04T10:00:00+00:00",
      "owners": [
        "airflow"
      ],
      "pickle_id": null,
      "root_dag_id": null,
      "schedule_interval": {
        "__type": "CronExpression",
        "value": "0 10 * * *"
      },
      "scheduler_lock": null,
      "tags": [
        {
          "name": "API"
        }
      ],
      "timetable_description": "At 10:00"
    },
    {
      "dag_id": "Weather_to_Redshift",
      "default_view": "grid",
      "description": null,
      "file_token": "Ii9vcHQvYWlyZmxvdy9kYWdzL1dlYXRoZXJfdG9fUmVkc2hpZnQucHki.ir4j8J8KwcnS287Z55oTVEYP7Cg",
      "fileloc": "/opt/airflow/dags/Weather_to_Redshift.py",
      "has_import_errors": false,
      "has_task_concurrency_limits": false,
      "is_active": true,
      "is_paused": true,
      "is_subdag": false,
      "last_expired": null,
      "last_parsed_time": "2024-01-06T04:33:06.944457+00:00",
      "last_pickled": null,
      "max_active_runs": 1,
      "max_active_tasks": 16,
      "next_dagrun": "2024-01-05T02:00:00+00:00",
      "next_dagrun_create_after": "2024-01-06T02:00:00+00:00",
      "next_dagrun_data_interval_end": "2024-01-06T02:00:00+00:00",
      "next_dagrun_data_interval_start": "2024-01-05T02:00:00+00:00",
      "owners": [
        "airflow"
      ],
      "pickle_id": null,
      "root_dag_id": null,
      "schedule_interval": {
        "__type": "CronExpression",
        "value": "0 2 * * *"
      },
      "scheduler_lock": null,
      "tags": [],
      "timetable_description": "At 02:00"
    }
  ],
  "total_entries": 71
}

DAG들의 속성을 모두 출력해줌

Dags 리스트를 읽고 활성화되어 있는 Dag만 찾기

import requests
from requests.auth import HTTPBasicAuth

url = "http://localhost:8080/api/v1/dags"

dags = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow"))

for dag in dags.json()['dags']:
    paused = dag['is_paused']
    if not paused:
        print(dag['dag_id']) 

모든 Variable 리스트하기

curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables
{
  "total_entries": 2,
  "variables": [
    {
      "description": "",
      "key": "slack_url",
      "value": "***"
    },
    {
      "description": "",
      "key": "google_sheet_access_token",
      "value": "***"
    }
  ]
}

모든 Config 리스트하기

curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
{
  "detail": "Your Airflow administrator chose not to expose the configuration, most likely for security reasons.",
  "status": 403,
  "title": "Forbidden",
  "type": "https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/PermissionDenied"
}
  • admin 계정으로 접속해도 기본적으로 막혀있음
    • airflow.cfg에 이것을 풀어줄 수 있는 key가 있음!
    • docker-compose.yaml에서 이 key를 override 해주어야 함


1) airflow.cfg의 expose_config 값 True로 변경!
스크린샷 2024-01-07 오전 3 21 18


2) docker-compose.yml에서 airflow-webserver 서비스에 environment 추가

airflow-webserver:
    ...
    environment:
      <<: *airflow-common-env
      AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'true'
    ...

스크린샷 2024-01-07 오전 3 43 07

Variables/Connections Import/Export

airflow variables export variables.json
airflow variables import variables.json

airflow connections export variables.json
airflow connectinos import variables.json
  • DB에 기록이 되어서 Web UI에 보이는 variables, connections에 대해서만 동작함
  • 환경변수로 등록된 variables, connections은 이 명령을 통해 알 수 없음