Skip to Content
Data EngineeringAirflow & Composer REST API 연동
📊 Data Engineering2018년 10월 25일

Airflow & Composer REST API 연동

#airflow#data-engineering#gcp-composer#rest-api

1. GUI 시스템과의 파이프라인 API 연동 배경

빅데이터 통합 플랫폼(Flamingo 등)의 UI나 스케줄러 기능은 백엔드에서 일어나는 잡(Job) 스케줄링을 중앙집중적으로 제어해야 했습니다. 이를 위해 Airflow를 워크플로우 매니저로 두고 외부 어플리케이션(Java/Node.js)에서 REST API를 통해 특정 DAG(Directed Acyclic Graph)를 프로그래밍적으로 제어(Start/Stop/Status check)할 수 있는 방법이 절실했습니다.

사내 온프레미스용 Airflow 오픈소스 외에도, 클라우드 아키텍처를 대비한 Google Cloud Composer 버전 1.x의 API 지원 현황까지 광범위하게 조사하게 되었습니다.

2. REST API 기본 개념 및 구조

당시 Airflow 1.9.x ~ 1.10.x 환경에서는 REST API 지원이 빈약했습니다. Airflow에서 외부 컨트롤을 받기 위해서는 기본적으로 탑재된 experimental API를 사용해야 하며, Cloud Composer의 경우 GKE 클러스터 제어용 API가 별도로 존재합니다.

3. 주요 API 엔드포인트 테스트 및 구현

1. Google Cloud Composer REST API (v1) 조사

Composer 자체의 REST API는 Airflow의 DAG 스케줄링보다는 환경(Environment) 자체의 프로비저닝(생성/삭제/목록 조회)에 더 방점이 찍혀 있었습니다.

EndpointDescription
composer.projects.locations.environments.create신규 환경 생성 (Create a new environment)
composer.projects.locations.environments.delete환경 삭제 (Delete an environment)
composer.projects.locations.environments.get기존 환경 정보 조회 (Get an existing environment)
composer.projects.locations.environments.list환경 목록 조회 (List environments)
composer.projects.locations.environments.patch환경 업데이트 (Update an environment)
composer.projects.locations.operations.delete장기 실행 오퍼레이션 상태 삭제 (진행 취소 아님)
composer.projects.locations.operations.get장기 실행 오퍼레이션 최신 상태 폴링 (Get state)
composer.projects.locations.operations.list오퍼레이션 목록 조회 (List operations)

[예제] 환경 리스트업 (JSON 응답)

javascript
{ "environments": [ { "name": "projects/keen-sight-219504/locations/us-central1/environments/sf-composser-test", "config": { "gkeCluster": "projects/...", "dagGcsPrefix": "gs://us-central1-sf-composser-te-69f490bf-bucket/dags", "nodeCount": 3, "softwareConfig": { "imageVersion": "composer-1.2.0-airflow-1.9.0" }, "nodeConfig": { "..." }, "airflowUri": "https://f41640886e5bee5ae-tp.appspot.com" }, "uuid": "c540c4a3-1c0c-46db-9414-0634823e8b41", "state": "RUNNING" } ] }

⚠️ 한계점: 인프라 제어 기능만 있을 뿐, 정작 외부 솔루션에서 Task를 트리거하거나 상태를 가져오는 쓸만한 워크플로우 제어 API는 부재했습니다.


2. Airflow 기본 실험적(Experimental) API

Airflow 인스턴스의 특정 DAG와 Task 상태를 조회하기 위한 기본 웹서버 API입니다.

EndpointDescription
/api/experimental/dags/<DAG_ID>/tasks/<TASK_ID>Task의 정보를 가져옵니다 (GET)
/api/experimental/dags/<DAG_ID>/dag_runs인자로 받은 DAG를 외부에서 트리거합니다 (POST)
  • 요청 쿼리 예시 URL: https://[AIRFLOW-WEB-HOST]/api/experimental/dags/composer_sample_bq_notify/tasks/bq_recent_questions_query
javascript
/* Task 상세 정보 Json 반환 예시 */ { "adhoc": "False", "bql": "\n SELECT owner_display_name, title, view_count FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 100", "create_disposition": "CREATE_IF_NEEDED", "email_on_failure": "True", "execution_timeout": "None", "retries": "1", "retry_delay": "0:05:00", "task_id": "bq_recent_questions_query", "trigger_rule": "all_success" }

⚠️ 한계점: 단순히 정적 설정값이나 트리거만 지원할 뿐, 특정 시점의 실행 상태(Success, Failed)를 추적하거나 동적으로 매개변수를 유연하게 제어하기에는 너무 부족했습니다.


3. 외부 오픈소스 플러그인 조사 (REST API Plugin)

기본 기능이 워낙 부족하여 커뮤니티에서 개발된 써드파티 플러그인 도입을 검토했습니다.

NameRepository URL비고 (당시 테스트 결과)
Airflow REST API Pluginteamclairvoyant/airflow-rest-api-plugin 내부적으로 Airflow CLI 명령어 결과를 파싱해서 반환하는 꼼수 방식이라 오동작이 많고 데이터로 깊이 활용하기 어려움
Airflow Plugin - APIairflow-plugins/airflow_api_plugin Local/Celery Executor 등 커스텀 Executor 플러그인을 활성화 중이면 아예 동작하지 않는 치명적 결함 존재

4. 조사 결과 및 시사점

Airflow 1.x 시절의 치명적인 단점은 “강력한 파이썬 친화적 구조”에 비해 “외부 의존성을 위한 REST API 생태계”가 처참할 수준이었다는 점입니다. 당시 개발팀 내부적으로 외부 UI 모듈과 붙이기 위해 수많은 플러그인을 리서치했으나, 전부 불안정한 탓에 차라리 우리가 직접 Airflow 메타데이터 DB(PostgreSQL)를 다이렉트로 읽고 쓰는 자체 커스텀 모듈을 개발하는 편이 더 안정적이라는 아픈 결론을 내렸던 프로젝트이기도 합니다. 다행히 Airflow 2.0부터는 안정적이고 풍부한 Stable REST API가 공식 내장되어 이러한 고충이 대부분 사라졌습니다.

Last updated on