세로형
Recent Posts
Recent Comments
Link
04-19 00:01
«   2024/04   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
Archives
Today
Total
관리 메뉴

꿈 많은 사람의 이야기

Apache 에어플로우(Airflow) - DAG branch(분기) 예제(example) 및 Python operator, XCom에 대해서 본문

Data Engineering 및 Infra

Apache 에어플로우(Airflow) - DAG branch(분기) 예제(example) 및 Python operator, XCom에 대해서

이수진의 블로그 2022. 2. 21. 08:43

포스팅 개요

본 포스팅은 Apache Airflow(에어플로우)에 대해서 정리하는 Airflow 시리즈 포스팅입니다.

Airflow 포스팅에서는 아래와 같은 순서로 Airflow에 대해서 정리해보려고 합니다.

 

  1. Airflow란 무엇인가? Airflow 설치 방법과 간단한 예제 ( https://lsjsj92.tistory.com/631 )
  2. Airflow Dag task relationship branch(분기) 예제 및 airflow 파이썬(Python) operator 활용 간단 예제 ( 본 포스팅 )
  3. Airflow를 활용한 머신러닝 ( Machine Learning ) 예제 (https://lsjsj92.tistory.com/633)
  4. Airflow slack 메세지로 알람 받기 예제 (https://lsjsj92.tistory.com/634)

위와 같이 총 4가지의 글을 작성할 예정이며 본 글은 Airflow 두 번째 포스팅 Airflow dag 순서 간단한 예제와 파이썬(Python) operator를 활용한 airflow 예제에 대해서 살펴보도록 하겠습니다. 또한, Python operator를 활용하면서 airflow XCom을 활용하기도 하는데요. airflow의 XCom이 무엇인지 어떻게 사용하는지 살펴보겠습니다.

 

해당 포스팅을 작성하면서 참고한 글은 아래와 같습니다.

본 포스팅 시리즈에서 사용되는 코드는 아래 github repo에서 확인하실 수 있습니다.

 

GitHub - lsjsj92/airflow_tutorial: python airflow tutorial and example

python airflow tutorial and example. Contribute to lsjsj92/airflow_tutorial development by creating an account on GitHub.

github.com


포스팅 본문

포스팅 개요에서도 언급하였듯이 이번 포스팅은 airflow 예제(example) 중 분기(branch)하는 방법과 Python operator를 사용하는 airflow dag 예제, 그리고 airflow xcom에 대해서 살펴봅니다.

 

본 포스팅에서 진행한 저의 Airflow와 Python 환경은 다음과 같습니다.

 

  • Airflow 설치된 경로 : /Users/user_name/airflow
  • Python 버전 : Python3.8
  • Airflow 버전 : 2.2.3
  • OS : Mac Pro

 


Airflow Dag task branch(분기) 예제

지난 포스팅에서도 언급하였지만 Airflow는 DAG(Directed Acyclic Graph)로 workflow를 구성합니다. DAG란 비순환 그래프로써 노드와 노드가 단방향으로 연결되어 있습니다. 따라서 Airflow에선 DAG로 workflow를 구성하여 어떤 순서로 task를 실행시킬 것인지 정할 수 있습니다. 

 

여기서 task 연결(task relationship)은 다음과 같은 기호를 활용해 연결할 수 있습니다.

  • >>, <<, [ ] 를 이용하여 dag 그래프를 그릴 수 있음
  • set_downstream 또는 set_upstream을 이용할 수도 있음
  •  

이러한 dag의 workflow에 있는 또 다른 특징 중 하나가 branch라는 것이 있습니다. branch는 다양한 경로 중 어떤 조건에 따라 특정 경로로 이동하게 하며 나머지 경로로는 가지 않도록 하는 일종의 분기점을 두게 해주는 역할을 제공해줍니다.

이렇게 branch로 가게 하는 것은 airflow operator에서 branch operator를 활용하여 정할 수 있는데요. 본 포스팅에서는 BranchPythonOperator를 활용하겠습니다.

 

아래는 BranchPythonOperator를 활용한 분기 예제 코드입니다. 해당 전체 코드는 포스팅 개요에서 언급한 제 github에도 올려져 있으니 참고하시면 되겠습니다.

 

BranchPythonOperator를 이용한 airflow branch 예제 코드

from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG

from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule


def random_branch_path():
    # 필요 라이브러리는 여기서 import
    # https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#writing-a-dag 참고
    from random import randint

    return "path1" if randint(1, 2) == 1 else "my_name_en"

with DAG( **dag_args ) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BranchPythonOperator(
        task_id='branch',
        python_callable=random_branch_path,
    )
    
    t3 = BashOperator(
        task_id='my_name_ko',
        depends_on_past=False,
        bash_command='echo "안녕하세요."',
    )

    t4 = BashOperator(
        task_id='my_name_en',
        depends_on_past=False,
        bash_command='echo "Hi"',
    )

    complete = BashOperator(
        task_id='complete',
        depends_on_past=False,
        bash_command='echo "complete~!"',
        trigger_rule=TriggerRule.NONE_FAILED
    )

    dummy_1 = DummyOperator(task_id="path1")


    t1 >> t2 >> dummy_1 >> t3 >> complete
    t1 >> t2 >> t4 >> complete
반응형

핵심 코드 설명

1. 필요 라이브러리 import

airflow에서 python을 활용해 branch를 사용하려면 먼저 airflow.operator.python에 있는 BranchPythonOperator를 import해야 합니다.

from airflow.operators.python import BranchPythonOperator

이렇게 import한 Operator를 활용해서 task를 구성하고 task 끼리의 순서를 정하면 됩니다.

 

2. BranchPythonOperator 구성

다른 Operator와 마찬가지로 BranchPythonOperator 또한 task 형태로 구성하면 됩니다. 아래 코드가 BranchPythonOperator task 객체를 만드는 코드입니다.

def random_branch_path():
    # 필요 라이브러리는 여기서 import
    # https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#writing-a-dag 참고
    from random import randint

    return "path1" if randint(1, 2) == 1 else "my_name_en"

t2 = BranchPythonOperator(
    task_id='branch',
    python_callable=random_branch_path,
)

이떄 python_callable이라는 인자에다가 파이썬 함수를 적용해주면 됩니다. 제가 사용한 Python 함수는 random_branch_path 이름을 가진 함수로 1과 2중 하나의 숫자를 랜덤으로 뽑아 1이면 path1로 2이면 path2로 가도록 설정합니다.

 

이때 path1 또는 path2로 간다고 제가 언급하였는데요. 이 path1과 path2는 airflow operator의 task_id에 맡는 해당 path로 가도록 되게 되어있습니다. 즉, task_id가 path1 또는 path2인 task로 이동한다는 것입니다. ( 변수 명이 아닌, task_id 입니다! )

 

3. task relationship 설정

이제 Operator로 만든 task들 간의 관계를 설정합니다. 즉, 순서를 어떻게 설정할 지 셋팅합니다.

 

t1 >> t2 >> dummy_1 >> t3 >> complete
t1 >> t2 >> t4 >> complete

이때 t2가 Branch가 일어나는 task입니다. 여기서 특이하게 relationship을 2개로 나누었습니다.

즉, t2가 어떤 상태이냐에 따라서 dummy_1으로 가거나 t4로 가게 됩니다. 이때 dummay_1의 task_id가 path1이고 t4의 task_id가 my_name_en 이라는 id를 가지고 있습니다. 즉, branch에서 python_callable로 불러온 함수에서 결과 값이 1이 나오면 dummay_1으로 가게 되는 것이고 2가 나오면 t4로 가게 됩니다.

 

branch 결과

airflow scheduler를 실행시키고 해당 dag를 실행시키면 아래와 같은 결과를 확인할 수 있습니다.

위 사진을 보면 path가 2개로 나뉘어져 있는 것을 확인할 수 있으며 2번째 task에 branch task가 있어 분기가 되는 것을 확인할 수 있습니다. 제가 실행하였을 때는 위의 경로로 실행되어서 success가 윗 경로로 된 것을 확인할 수 있습니다.

아래 경로는 분홍색 테두리로 쳐져 있는데 이는 skip되었다는 것입니다. 즉, 위의 경로로 이동하였기 때문에 아래 경로는 skip 된 것을 확인할 수 있습니다.

 

이렇게 BranchOperator를 활용하면 dag 경로에 branch를 놓아 분기 경로를 설정 할 수 있습니다.


Airflow 파이썬(Python) operator 간단 예제

airflow에서는 Operator를 이용해 task를 정의합니다. 이러한 Operator는 아래와 같은 다양한 operator들이 있습니다.

  • bash operator
  • python operator
  • dummy operator
  • email operator
  • jdbc operator

등등 다양한 operator들이 있습니다. 자세한 operator는 airflow 공식문서를 참고하시면 되겠습니다.

이 중 Python operator를 활용하면 Python 코드를 활용해 task를 정의할 수 있습니다. 따라서 이번 섹션은 Python operator를 이용해서 간단한 airflow python 예제를 소개해드리려고 합니다. 

 

코드를 시작하기에 앞서 본 코드에서는 airflow xcom을 이용해 DAG task 끼리 값을 주고 받습니다. 그렇다면 Airflow에서 XCOM은 무엇인지 먼저 알아보고 진행하겠습니다.


Airflow XCom(cross-communications)이란?

airflow의 task는 독립적으로 수행됩니다. 그렇기에 서로 어떤 통신할 수 있는 방법이 없습니다. 예를 들어 A task에서 나온 결과를 B task에게 전달할 수 없는 것이죠. 하지만 task를 구성하다보면 이전 작업의 결과를 전달하거나 등의 작업의 필요성이 나오게 됩니다. 이런 부분을 해결하기 위해서 Airflow에서는 XCom(cross-communications)을 이용해 task끼리 적은양의 메세지를 주고 받을 수 있습니다.

 

여기서 XCom은 적은양의 메세지를 주고 받는다고 했는데요. 이는 task 간의 간단한 통신을 위한 용도이여서 pandas dataframe과 같은 large value에는 적합하지 않습니다. 즉, airflow 공식문서에 따르면 small amounts of data를 위해 designed 되었다고 합니다.

 

이러한 XCom의 특징을 정리하자면 아래와 같습니다.

  • 적은 양의 데이터를 통신
  • task_id, dag_id, key로 구분
  • xcom_push와 xcom_pull을 이용해 storage에 push, pull 되어 짐

Airflow XCom의 이러한 특징을 알아두고 다음 예제를 보시면 될 것 같습니다.


아래는 PythonOperator를 활용한 예제 코드입니다. 해당 전체 코드는 branch 코드와 마찬가지로 포스팅 개요에서 언급한 제 github에도 올려져 있으니 참고하시면 되겠습니다.

 

Python operator를 활용한 airflow dag 구성 예제 코드

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

def random_branch_path():
    from random import randint

    return "cal_a_id" if randint(1, 2) == 1 else "cal_m_id"

def calc_add(x, y, **kwargs):
    result = x + y
    print("x + y : ", result)
    kwargs['task_instance'].xcom_push(key='calc_result', value=result)
    return "calc add"

def calc_mul(x, y, **kwargs):
    result = x * y
    print("x * y : ", result)
    kwargs['task_instance'].xcom_push(key='calc_result', value=result)
    return "calc mul"

def print_result(**kwargs):
    r = kwargs["task_instance"].xcom_pull(key='calc_result')
    print("message : ", r)
    print("*"*100)
    print(kwargs)

def end_seq():
    print("end")

with DAG( **dag_args ) as dag:
    start = BashOperator(
        task_id='start',
        bash_command='echo "start!"',
    )

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=random_branch_path,
    )
    
    calc_add = PythonOperator(
        task_id='cal_a_id',
        python_callable=calc_add,
        op_kwargs={'x': 10, 'y':4}
    )

    calc_mul = PythonOperator(
        task_id='cal_m_id',
        python_callable=calc_mul,
        op_kwargs={'x': 10, 'y':4}
    )

    msg = PythonOperator(
        task_id='msg',
        python_callable=print_result,
        trigger_rule=TriggerRule.NONE_FAILED
    )

    complete_py = PythonOperator(
        task_id='complete_py',
        python_callable=end_seq,
        trigger_rule=TriggerRule.NONE_FAILED
    )

    complete = BashOperator(
        task_id='complete_bash',
        depends_on_past=False,
        bash_command='echo "complete~!"',
        trigger_rule=TriggerRule.NONE_FAILED
    )


    start >> branch >> calc_add >> msg >> complete_py >> complete
    start >> branch >> calc_mul >> msg >> complete

핵심 코드 설명

1. 필요 라이브러리 import

Python operator를 사용하기 위해 필요 라이브러리를 import 합니다. 여기서 PythonOperator는 airflow.operators.python_operator 안에 있다는 것을 참고해주시길 바랍니다.

 

from airflow.operators.python_operator import PythonOperator

 

2. Python task 구성을 위한 함수 셋팅

본 코드에서는 파이썬 사용자 정의 함수 5개를 구현하였습니다. 어려운 함수가 아닌 간단한 함수이니 딱 보시면 아실 것이라 생각됩니다.

def random_branch_path():
    from random import randint

    return "cal_a_id" if randint(1, 2) == 1 else "cal_m_id"

def calc_add(x, y, **kwargs):
    result = x + y
    print("x + y : ", result)
    kwargs['task_instance'].xcom_push(key='calc_result', value=result)
    return "calc add"

def calc_mul(x, y, **kwargs):
    result = x * y
    print("x * y : ", result)
    kwargs['task_instance'].xcom_push(key='calc_result', value=result)
    return "calc mul"

def print_result(**kwargs):
    r = kwargs["task_instance"].xcom_pull(key='calc_result')
    print("message : ", r)
    print("*"*100)
    print(kwargs)

def end_seq():
    print("end")
  • random_brach_path
    • 위에서 소개한 branch 용도를 위한 함수
  • calc_add, calc_mul
    • x, y 2개의 인자를 받아서 더하거나 곱하는 함수
    • 더한 결과 값을 xcom_push를 이용해 key 값을 calc_result로 설정하고 결과를 저장
  • print_result
    • 결과를 출력하는 함수
    • xcom_pull을 이용해 결과 값을 가져옴
  • end_seq
    • seqence가 끝났다는 end를 출력하는 함수

함수 내용은 어려운 것이 없을거라 생각됩니다. 여기서 보셔야 할 것은 xcom을 이용해 값을 저장하고 값을 가지고 온다는 것입니다. 위 airflow xcom에서 설명한 것과 같이 xcom은 pull과 push를 이용해 값을 가져오거나 저장할 수 있습니다. 이때 key를 이용해서 값을 저장하고 key를 이용해서 값을 가져옵니다.

 

3. DAG task 구성

이제 함수를 정의했으니 Operator를 이용해 dag task를 구성해봅니다. 여기서는 총 7개의 task를 구성하였습니다.

start = BashOperator(
    task_id='start',
    bash_command='echo "start!"',
)

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=random_branch_path,
)

calc_add = PythonOperator(
    task_id='cal_a_id',
    python_callable=calc_add,
    op_kwargs={'x': 10, 'y':4}
)

calc_mul = PythonOperator(
    task_id='cal_m_id',
    python_callable=calc_mul,
    op_kwargs={'x': 10, 'y':4}
)

msg = PythonOperator(
    task_id='msg',
    python_callable=print_result,
    trigger_rule=TriggerRule.NONE_FAILED
)

complete_py = PythonOperator(
    task_id='complete_py',
    python_callable=end_seq,
    trigger_rule=TriggerRule.NONE_FAILED
)

complete = BashOperator(
    task_id='complete_bash',
    depends_on_past=False,
    bash_command='echo "complete~!"',
    trigger_rule=TriggerRule.NONE_FAILED
)

여기서 분기를 하는 예제는 위에서 소개하였으니 설명은 생략하고 핵심이 되는 Python Operator에 대해서만 설명을 하겠습니다.

  • calc_add
    • task_id : cal_a_id
    • python_callable은 위에서 정의한 calc_add 함수를 사용
    • op_kwargs를 이용해 calc_add에서 사용되는 변수의 값을 할당함
  • calc_mul
    • task_id : cal_m_id
    • python_callable에 위에서 정의한 calc_mul 함수를 사용
    • op_kwargs를 이용해 calc_mul에서 사용되는 변수의 값을 할당함

Python Operator를 구성할 때 python_callable에 call을 할 함수 등을 넣어줍니다. 또한, op_kwargs를 이용해서 값을 넘겨주는 것을 확인하시면 되겠습니다. 위 task operator가 이해되셨으면 나머지 operator도 충분히 이해가 되실 것 같습니다.

 

4. DAG task relationship 구성

이제 task끼리 관계를 구성합니다. 여기서 branch가 있으니 branch 예에서 보여준 것과 마찬가지로 relationship을 2개 설정합니다.

 

start >> branch >> calc_add >> msg >> complete_py >> complete
start >> branch >> calc_mul >> msg >> complete

branch에서 1이 나오면 cal_a_id를 return하고 2가 나오면 cal_m_id가 return됩니다. 즉, 1이 나오게 되면 calc_add 함수가 실행되는 경로로 이동이 되고 2가 나오면 calc_mul 함수가 실행되는 경로로 이동이 됩니다.

 

실행 결과

이제 위 dag 예제를 스케쥴러에 할당해 실행시킨 뒤 결과를 확인해보겠습니다.

그 전에 먼저 command line에서 airflow 명령어로 dag과 task를 살펴봅니다. 명령어를 입력하면 지난 포스팅과 본 포스팅에서 사용한 dag들이 나오는 것을 확인할 수 있습니다. airflow dags list를 통해 확인할 수 있습니다.

 

또한, 방금 만든 Python operator dag에 대해서 task를 살펴본 결과는 아래와 같습니다.

airflow tasks list tutorial-python-op --tree

이제 airflow dag 실행 결과와 log 등을 airflow web server를 실행시켜 web ui에서 확인해보겠습니다.

airflow web server를 실행시키고 들어가면 아래와 같이 추가된 DAG들을 확인할 수 있습니다.

300x250

이중에서 방금 작성한 Python operator를 활용한 dag example를 실행시키고 그 결과를 확인해보겠습니다.

python operator 예제를 확인시키면 branch에 따라 분기가 일어나고 분기에 따라 실행되는 task가 다릅니다. 

제가 실행시켰을 때는 calc_add가 실행되어서 위쪽 경로가 실행된 것을 확인할 수 있었습니다.

 

또한, tree 구조를 통해 어떻게 실행이 되는지, 언제 실행했고 그 결과는 어떠했는지 아래 사진과 같이 확인할 수도 있습니다.

 

마지막으로 dag task 실행 로그를 확인해보겠습니다.

코드를 보면 중간중간 print 문으로 출력하는 부분이 있는데요. 해당 print까지 잘 출력되었는지 확인해보겠습니다. 

 

아래 사진은 calc_add 함수가 실행되는 task의 로그입니다.

위 사진 빨간색 줄에서 확인할 수 있듯이 print 한 부분이 logger INFO로 출력된 것을 확인할 수 있습니다.

그 외에도 해당 task에서 수행된 각종 로그 정보를 전부 확인할 수 있습니다.

 

아래 사진은 print_result 함수를 호출하는 msg task입니다. 해당 msg task에서는 결과값을 XCom에서 pull 하여 변수에 넣고 그 변수를 출력합니다. 출력할 때 message : {결과값} 형태로 나오도록 구성했었습니다.

아래 빨간색 줄에서 나온 것처럼 결과 값이 XCom pull을 통해 나와서 출력된 것을 확인할 수 있습니다. 

이렇게 airflow에서는 dag 정보를 클릭하여 각종 task들의 상태, 로그 정보 등을 web ui로 쉽게 확인할 수 있습니다. 


마무리

본 포스팅에서는 Airflow task 관계(relationship) 중 branch(분기)하는 방법에 대해서 알아보았고 그때 Python Branch Operator를 활용했습니다. 또한, PythonOperator를 활용해 airflow dag task 구성을 Python으로 구성하고 이때 Python 함수를 호출하고 그 결과값을 XCom을 이용해 통신하여 마지막에 출력하는 예제를 살펴보았습니다. 

다음 글에서는 airflow의 Python Operator를 이용해 머신러닝( Machine Learning )을 실행하는 예제를 살펴보겠습니다.

긴 글 읽어주셔서 감사드리며 도움이 되시길 바랍니다.

반응형
그리드형
Comments