Apache 에어플로우(Airflow) 시작하기 - Airflow란?, Airflow 설치 및 기본 예제
포스팅 개요
본 포스팅은 Apache Airflow(에어플로우)에 대해서 정리하는 Airflow 시리즈 포스팅입니다.
Airflow 포스팅에서는 아래와 같은 순서로 Airflow에 대해서 정리해보려고 합니다.
- Airflow란 무엇인가? Airflow 설치 방법과 간단한 예제 ( 본 포스팅 )
- Airflow Dag task relationship branch(분기) 예제 및 airflow 파이썬(Python) operator 활용 간단 예제 (https://lsjsj92.tistory.com/632)
- Airflow를 활용한 머신러닝 ( Machine Learning ) 예제 (https://lsjsj92.tistory.com/633)
- Airflow slack 알람 받기 예제 (https://lsjsj92.tistory.com/634)
위와 같이 총 4가지의 글을 작성할 예정이며 본 글은 Airflow 첫 번째 포스팅 Airflow란 무엇인가? Airflow 설치 방법과 간단한 예제에 대해서 알아봅니다.
해당 포스팅을 작성하면서 참고한 글은 아래와 같습니다.
- https://github.com/apache/airflow
- https://airflow.apache.org/docs/apache-airflow/stable/installation/index.html
- https://airflow.apache.org/docs/apache-airflow/stable/start/local.html
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html
본 포스팅 시리즈에서 사용되는 코드는 아래 github repo에서 확인하실 수 있습니다.
포스팅 본문
포스팅 개요에서 언급하였듯이 본 포스팅은 Airflow 첫 번째 글 시리즈로써 Airflow에 대한 개괄적인 내용을 작성합니다.
Apache 에어플로우(Airflow)란?
Apache 에어플로우(Airflow)란 무엇일까요? airflow란 AirBnB에서 만든 workflow management tool입니다. workflow는 일련의 작업의 흐름이라고 말할 수 있습니다. 예를 들어서 ETL 같은 경우는 데이터를 Extractaction -> Transformation -> Loading 하는 작업의 흐름이 있는데요. 이런 workflow를 관리하는 툴이 바로 airflow입니다. 여기서 관리라는 것은 워크플로우(workflow)를 작성, 스케줄링, 모니터링 하는 작업을 말할 수 있습니다. 이러한 airflow는 요즘 핫한 mlops에서도 많이 사용되며 데이터 엔지니어( Data Engineer ) 영역에서도 많이 사용되는 SW입니다.
Airflow는 특징이 되는 컴포넌트들이 있으며 각 component들 간의 아키텍처는 다음과 같습니다.
Airflow는 크게 아래 4가지의 구성요소로 이루어져 있습니다.
- Webserver
- Scheduler
- Executor
- Workers
또한, 아래와 같은 개념을 이용해 workflow를 작성합니다.
- DAG(Directed Acyclic Graph)
Airflow Webserver
Airflow의 웹 서버는 Airflow의 로그를 보여주거나 스케쥴러(Scheduler)에 의해 생선된 DAG 목록, Task 상태 등을 시각화해서 보여줍니다. 즉, UI를 통해 사용자에게 시각적으로 정보를 제공해주는 요소라고 볼 수 있습니다.
Airflow Scheduler
Airflow 스케쥴러는 airflow로 할당된 work들을 스케쥴링 해주는 component입니다. Scheduled된 workflow들의 triggering과 실행하기 위해서 executor에게 task를 제공해주는 역할을 수행합니다.
Airflow Executor
Airflow Executor는 실행중인 task를 handling하는 component입니다. default 설치시에는 scheduler에 있는 모든 것들을 다 실행시키지만, production 수준에서의 executor는 worker에게 task를 push 합니다.
Airflow Worker
Airflow worker는 실제 task를 실행하는 주체자라고 보면 됩니다.
Airflow Database
Airflow database는 airflow에 있는 DAG, Task 등의 metadata를 저장하고 관리합니다.
DAG(Directed Acyclic Graph)
DAG는 비순환 그래프로써 순환하는 싸이클이 없는 그래프입니다. 즉, 노드와 노드가 단방향으로 연결되어 있어 그 노드로 향하게 되면 돌아오지 않는다는 특성을 가지고 있습니다. Airflow에서는 이러한 DAG를 이용해 Workflow를 구성하여 어떤 순서로 task를 실행시킬 것인지 dependency를 어떻게 표현할 것인지 등을 설정합니다.
Airflow 설치 및 기본 셋팅, 실행
Airflow 설치는 Python의 pip install을 활용하면 간단하게 설치할 수 있습니다. 본 설치 과정은 Airflow 공식 문서에 있는 것을 그대로 따라해서 설치한 과정이니 더 자세한 것과 궁금한 것은 공식 문서를 참고하시면 될 것 같습니다.
1. 경로 및 환경 변수 설정
Airflow를 설치하기에 앞서 가장 먼저 경로와 환경 변수 설정을 셋팅하고 가야합니다. Airflow 공식 문서에도 나와 있듯이 Airflow는 기본 경로를 '~/airflow'로 권장하고 있습니다. 즉, 사용자 계정 path를 선호하고 있습니다.
또한, Airflow 버전을 어떠한 것으로 설치할 것인지, Python 버전은 무엇인지 등을 셋팅하고 Airflow를 설치합니다.
export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.2.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
저는 다음과 같은 환경과 Airflow 버전을 설치하였습니다. 해당 환경은 추후 Airflow 포스팅에서도 동일하게 사용되는 환경입니다.
- Airflow 설치된 경로 : /Users/user_name/airflow
- Python 버전 : Python3.8
- Airflow 버전 : 2.2.3
여기까지 되었으면 이제 airflow 설치를 진행합니다. 공식 문서에서도 나와 있는 것 같이 아래와 같은 명령어를 이용해 설치합니다.
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
별 문제가 없으면 마지막에 Successfully라는 문구가 나오면서 airflow 설치가 됩니다.
2. Airflow 실행 전 셋팅
Airflow를 본격적으로 실행하기 전 몇 가지 셋팅이 필요합니다. 가장 먼저 DB 초기화가 필요합니다. DB는 Airflow의 DAG와 Task 등을 관리하기 때문에 셋팅이 필요합니다.
airflow db init
여기까지 진행되었으면 airflow가 설치된 경로 ( 저 같은 경우는 /Users/user_name/airflow )에 구성된 파일 및 디렉토리를 살펴보면 다음과 같은 결과를 확인할 수 있습니다.
- airflow.cfg
- airflow의 환경설정 파일
- airflow.db
- DB 관련된 정보를 담고 있음
- logs
- airflow의 각종 로그를 관리
- dags
- (위 사진에는 나와 있지 않은데 초기에는 있을겁니다.)
- airflow에서 dag를 관리하는 디렉토리
다음으로 airflow를 관리할 사용자 계정을 하나 만들어줍니다.
airflow users create \
--username admin \
--firstname soojin \
--lastname lee \
--role Admin \
--email lsjsj92@naver.com
이떄 마지막 비밀번호를 입력하라는 것이 나오니 비밀번호를 잊지 않을 수 있도록 설정해줍니다!
3. Airflow web server 실행
여기까지 진행하였으면 기본적인 셋팅은 끝났습니다. 저는 local에서 작업하는 것이기 때문에 이 정도의 셋팅까지 진행하였습니다.
이제 airflow webserver를 실행시켜서 airflow web ui를 확인할 수 있습니다.
airflow webserver --port 8080
위 명령어를 입력하면 8080 포트로 airflow web server가 실행되게 됩니다. 웹 브라우저를 이용해 접속하게 되면 아래 사진과 같이 로그인 창이 나오게 됩니다. 이때 방금전 셋팅한 계정으로 로그인해줍니다.
이렇게 로그인하게 되면 아래와 같이 airflow web ui를 확인할 수 있습니다!
이때 DAGs 아래에 다양한 airflow example dags들이 있는 것을 확인할 수 있습니다.
저는 앞으로 사용할 때 해당 example들이 보기 불편하기 때문에 example을 보이지 않도록 설정하려고 합니다.
만약, airflow example DAGs를 보기 싫으신 분들은 airflow.cfg 파일을 열어서 load_examples를 False로 설정하면 됩니다.
해당 파일의 값을 변경하고 web ui를 다시 보면 아마 없어지지 않았을 겁니다.
이때 airflow.db를 삭제하고 다시 airflow db init 명령어로 db를 초기화 해줍니다.
db를 초기화 해주었기 때문에 사용자도 다시 만들어야 할 수 있습니다. 필요하다면 사용자를 위처럼 다시 만들어줍니다.
잘 진행하고 다시 airflow web server를 켜보면 아래와 같이 example이 없어진 것을 확인할 수 있습니다.
Airflow basic example 살펴보기
Airflow 설치까지 마쳤으면 기본적인 Airflow 예제를 살펴봅니다. 해당 예제 또한 공식문서에 있는 코드를 그대로 적용하였으며 개요에 언급한 제 github 코드에서도 확인하실 수 있습니다. Airflow는 DAG를 관리하는 디렉토리를 지정해 관리합니다. 그 디렉토리는 airflow.cfg 파일 안에 설정되어 있습니다.
airflow.cfg 파일을 보면 위와 같이 dags_folder라는 설정 값이 있습니다. 해당 경로에 있는 dag 정보를 이용해서 airflow는 dags를 표시하고 관리합니다. 저는 위 설정 값을 바꾸지 않고 그대로 사용하겠습니다.
따라서 dags 디렉토리로 들어간 다음 파일 하나를 생성해줍니다. airflow dag는 Python 코드를 기반으로 작성되기 때문에 파일 이름에 .py가 있다는 것을 확인해주세요!
해당 코드에서 체크해야 할 중요 포인트는 다음과 같습니다.
Airflow Operator
- Airflow는 Operator를 이용해 Task를 정의
- Python Operator, Bash Operator, Dummy Operator, Branch python operator 등 다양한 operator가 존재
- 각 Operator마다 수행하는 주체, 목적 등이 다름
Task 연결 ( task relationship )
- 각 task는 ">>", "<<", "[ ]"를 이용하여 dag 그래프를 그릴 수 있음
- first_task.set_downstream(second_task), third_task.set_upstream(second_task)와 같은 형태로도 가능
- 예시 ) t1 >> [t2, t3]
args 설정 및 DAG 객체 생성
- default_args
- DAG를 구성할 때 필요한 default argument를 셋팅
- DAG 객체 생성
- DAG 이름과 schedule interval 등을 셋팅
- 실제 dag를 수행하는 작업을 명시
- operator, task 연결 등을 이 안에서 수행
위 처럼 DAG를 구성한 후 이제 airflow scheduler를 실행시켜줍니다. 스케쥴러는 airflow 작업들을 스케쥴링 해주는 component이기 때문에 이를 실행시켜줘야 각종 관리를 할 수 있습니다.
위처럼 airflow scheduler 명령어를 이용해 스케쥴러를 실행시켜주면 airflow web server ui에서 방금 작성한 tutorial dag를 확인할 수 있습니다. 아까 airflow example dag를 전부 제거 했기 때문에 tutorial만 보이는 것을 확인할 수 있습니다.
이렇게 생성된 dag를 airflow web ui를 통해서 다양하게 정보를 확인할 수 있습니다.
- dag 그래프 확인
- airflow DAG 실행 확인
- airflow DAG tree 구조 확인 및 실행 날짜, 실행 상태 확인
해당 web ui를 통해서 dag를 실행시킬 수 있습니다. 동영상 재생 버튼처럼 생긴 화살표 방향을 클릭하면 dag가 실행됩니다.
이 뿐만 아니라 설정해둔 interval 등을 이용해 특정 배치 주기로 실행시킬 수도 있습니다.
아래 사진은 모든 task가 다 실행되고 success가 된 상태입니다.
또한, 이렇게 실행된 dag와 task들에 대해서 로그(log)를 확인할 수 있습니다.
로그는 airflow web ui에서도 확인가능하고 airflow dir path에 있는 logs 디렉토리 경로에서도 확인할 수 있습니다.
가장 먼저 task에 대한 log를 확인하려면 성공적으로 마무리 된 task를 하나 클릭하면 아래와 같은 화면을 확인할 수 있을겁니다.
여기서 instance detail과 log 등의 메뉴가 있는데요. log를 클릭해보겠습니다.
log를 클릭하면 아래 사진과 같이 해당 task의 log를 확인할 수 있습니다.
task 수행하면서 생성된 log를 쉽게 확인할 수 있습니다.
또한, task에 있는 instance detail을 살펴보면 해당 task instance의 상세 정보를 확인할 수 있습니다.
이렇게 상세 정보를 확인할 수 있으며 오른쪽 사진은 airflow example code에 작성된 코드의 결과인데 그것 또한 instance detail에서 확인할 수 있습니다.
그 외에도 airflow 경로에 있는 logs directory에서도 로그를 확인할 수 있습니다.
Airflow command 명령어 살펴보기
Airflow에는 각종 command line 명령어를 제공해줍니다. 기본적인 명령어를 살펴보고 글을 마무리하려고 합니다.
가장 먼저 확인해볼 명령어는 다음과 같습니다.
airflow tasks list {dag id}
airflow tasks list {dag id} --tree
airflow dags list
airflow의 dag 안에는 여러 task로 구성되어져 있습니다. airflow tasks list <dag id>는 해당 dag에 있는 task들의 리스트를 확인해주는 명령어입니다.
또한, 여기서 --tree 옵션을 추가해주면 tree 형태로 task를 확인할 수 있습니다.
airflow에 등록된 dag list를 확인하려면 airflow dags list 명령어를 입력하면 됩니다. 해당 명령어를 이용해 현재 등록된 dag들의 리스트들을 확인할 수 있습니다.
다음으로는 command line으로 dag를 test 형태로 실행해보는 명령어입니다. 명령어 형태는 다음과 같습니다.
airflow dags test {dag_id} {date}
airflow dags test tutorial 20220203와 같이 명령어를 입력하면 해당 dags를 테스트 해볼 수 있습니다.
해당 명령어를 입력하면 dag가 실행되는 log를 커맨드라인에서 확인할 수 있습니다.
또한, airflow web ui에서도 각종 로그 정보를 확인할 수 있습니다.
특히, command line에서 실행시킨 것은 backfill이라는 run type과 backfilljob이라는 job type으로 구분된 것을 확인할 수 있습니다.
이렇게 airflow는 어떻게 실행했는지에 따른 type 또한 구분할 수 있도록 web ui를 제공해줍니다.
포스팅 마무리
본 포스팅에서는 apache airflow에 대해서 간략하게 알아보았습니다. 또한, airflow를 설치하는 방법과 기본적인 예제를 살펴보았습니다.
다음 포스팅에서는 task sequence에 대한 간략한 예제와 Python operator를 이용한 airflow python example을 작성해보겠습니다.
긴 글 읽어주셔서 감사합니다.
저에게 연락을 주시고 싶으신 것이 있으시다면
- Linkedin : https://www.linkedin.com/in/lsjsj92/
- github : https://github.com/lsjsj92
- 블로그 댓글 또는 방명록
으로 연락주시면 감사하겠습니다.