세로형
Recent Posts
Recent Comments
Link
12-04 00:53
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
Archives
Today
Total
관리 메뉴

꿈 많은 사람의 이야기

Apache 에어플로우(Airflow) 예제(example) - slack으로 알람, 메세지 받기 본문

Data Engineering 및 Infra

Apache 에어플로우(Airflow) 예제(example) - slack으로 알람, 메세지 받기

이수진의 블로그 2022. 3. 21. 08:37
반응형
728x170

포스팅 개요

본 포스팅은 Apache Airflow(에어플로우)에 대해서 정리하는 Airflow 시리즈 포스팅입니다.

Airflow 포스팅에서는 아래와 같은 순서로 Airflow에 대해서 정리해보려고 합니다.

 

  1. Airflow란 무엇인가? Airflow 설치 방법과 간단한 예제 ( https://lsjsj92.tistory.com/631 )
  2. Airflow branch(분기) 예제 및 airflow 파이썬(Python) operator 활용 간단 예제 ( https://lsjsj92.tistory.com/632 )
  3. Airflow를 활용한 머신러닝 ( Machine Learning ) 예제 ( https://lsjsj92.tistory.com/633 )
  4. Airflow slack 메세지로 알람 받기 예제 ( 본 포스팅 )

위와 같이 총 4가지의 글을 작성할 예정입니다. 본 글은 Airflow 시리즈 마지막 글인 네 번째 포스팅으로 Airflow와 slack sdk을 활용하여  Machine Learning 결과를 slack 메세지로 전달 받는 airflow 예제(example) 코드를 살펴보도록 하겠습니다.

 

해당 포스팅을 작성하면서 참고한 글은 아래와 같습니다.

본 포스팅 시리즈에서 사용되는 코드는 아래 github repo에서 확인하실 수 있습니다.

 

GitHub - lsjsj92/airflow_tutorial: python airflow tutorial and example

python airflow tutorial and example. Contribute to lsjsj92/airflow_tutorial development by creating an account on GitHub.

github.com


포스팅 본문

업무를 하다보면 배치성으로 돌아가는 코드의 결과 또는 상태에 대한 알람, machine learning 또는 deep learning 훈련 결과에 대한 알람 등 다양하게 alarm을 받을 필요성이 있습니다. airflow는 배치 스케줄로 돌아가는 code task를 관리할 수 있기 때문에 이러한 알람 기능을 같이 사용한다면 편리하게 모니터링 할 수 있을겁니다. 포스팅 개요에서도 언급하였듯이 이번 포스팅은 airflow와 slack sdk를 활용하여 머신러닝(Machine Learning) 예제(example) 결과를 slack message(alarm)를 받는 airflow 예제를 살펴봅니다.

 

본 포스팅에서 진행한 저의 Airflow와 Python 환경은 다음과 같습니다.

  • Airflow 설치된 경로 : /Users/user_name/airflow
  • Python 버전 : Python3.8
  • Airflow 버전 : 2.2.3
  • OS : Mac Pro

Slack APP install 및 bot token 부여 받기

slack으로 알람을 받으려면 가장 먼저 slack app을 설치하고 bot을 사용할 수 있는 token을 부여 받아야 합니다. 이를 위해 slack api 사이트 (https://api.slack.com/)로 들어가서 아래 사진과 같은 흐름으로 app과 token을 부여 받습니다.

 

일단, slack workspace는 만들었다고 가정한 뒤 진행합니다. work space 만드는 것은 3분 정도면 간단하게 만들 수 있기 때문에 빠르게 만드시면 됩니다!

 

해당 사이트로 workspace를 달고 들어가면 create a slack app이라는 알람이 나옵니다. 여기서 app name을 지정해주고 workspace를 정한 뒤 app을 생성해줍니다.

 

그 다음 slack api 사이트 아래에 Oauth & Permissions라는 탭이 있는데요. 여기로 들어갑니다. 그러면 아래 사진과 같은 화면이 나오는 것을 확인할 수 있습니다.

 

해당 메뉴에서 아래로 내리면 Scopes 섹션이 있습니다. 여기서 Bot Token Scopes 부분에서 Add an OAuth Scope를 클릭하여 아래 2개의 OAuth Scope를 추가해줍니다.

  • chat:write
  • im:write

 

위의 OAuth가 추가 되었으면 다시 해당 메뉴의 최상단 위로 올라가서 install to workspace 버튼을 클릭해줍니다!

이때 오른쪽 사진과 같이 허용할 것인지 팝업이 나오는데요. 허용 버튼을 눌러주면 됩니다.

그러면 아래와 같이 OAuth tokens for your workspace 섹션에 Bot user OAuth Token 이름 아래 토큰 번호가 발급된 것을 확인할 수 있습니다. 

이렇게 토큰 번호를 발급 받았으면 Python 환경에서 이 토큰 번호를 활용해 Slack SDK와 연동하여 slack에 알람 또는 메세지를 보낼 수 있습니다.

 

Python에서 Slack SDK를 이용한 Alarm 테스트

이제 이렇게 발급 받은 Bot Token을 이용해 파이썬 환경에서 테스트 해보겠습니다.

가장먼저 slack sdk 라이브러리를 설치해줘야 합니다. 아래와 같은 명령어로 사용하시는 파이썬 환경에 라이브러리를 설치해줍니다. 

 

pip install slack_sdk

이제 Python 코드를 통해 slack에 message가 가는지 확인해보겠습니다. slack에 메세지를 보내는 파이썬 코드는 다음과 같습니다.

from slack_sdk import WebClient
client = WebClient(token='your-token')
client.chat_postMessage(channel='#your-channel', text="test msg!")

 

반응형

 

그러면 위와 같이 slack_sdk.errors.SlackAPIError : the requenst to the Slack API failed 라는 에러 메세지가 나오면서 the server responded with error : not_in_channel이라는 메세지가 나옵니다. 즉, 발급 받은 token의 bot이 제가 사용하려는 채널에 포함되어 있지 않다는 것입니다. 이를 위해서 아래와 같이 slack bot을 channel에 추가해줍니다.

 

이렇게 제가 원하는 채널 ( 저는 alert_msg 채널명입니다. )에 bot을 추가 해준 뒤 다시 한 번 파이썬 환경에서 slack api로 메세지를 전달해보겠습니다. 

 

그러면 위 사진과 같이 메세지가 전달된 것을 확인할 수 있습니다!

여기까지 되시면 Python을 활용해 slack message를 전달할 수 있는 준비가 완료됩니다.

 

Airflow와 Slack SDK를 활용한 머신러닝 결과 알람 받기 예제 코드

이제 Airflow 환경에서 DAG를 실행했을 때 그 결과에 대한 메세지를 slack으로 message를 전달하려고 합니다. 일종의 알람(alarm)이라고 생각하시면 될 것 같습니다. 여기서 사용한 airflow dag는 이전 포스팅에서 살펴본 타이타닉 머신러닝 dag를 사용합니다. 즉, airflow 환경에서 관리하는 machine learning dag process가 실행 되었을 때 잘 동작이 되었는지 slack으로 메세지가 와서 알람을 받도록 합니다. 

 

여기서 코드 설명은 slack api와 관련된 설명을 위주로 진행합니다. airflow 환경에서 titanic machine learning을 동작하는 dag 코드가 궁금하신 분들은 포스팅 개요에 올려둔 저의 이전 포스팅을 참고해주시면 감사하겠습니다.

 

또한, 본 포스팅에서 사용한 전체 코드는 포스팅 개요에 올려둔 저의 github에 모든 코드를 올려두었으니 참고 부탁드립니다.

 

가장 먼저 저의 전체 코드는 다음과 같은 구조로 되어 있습니다.

 

빨간색 밑줄이 이번 포스팅에서 사용되는 메인 코드들입니다. 그 중 보셔야 할 부분이 slack sdk와 관련된 코드는 utils 아래에 slack_alert.py로 따로 빼내어 저장해둔 것입니다. 각 코드 설명은 아래와 같습니다.

 

slack_alert.py

from slack_sdk import WebClient
from datetime import datetime

class SlackAlert:
    def __init__(self, channel, token):
        self.channel = channel
        self.client = WebClient(token=token)

    def success_msg(self, msg):
        text = f"""
            date : {datetime.today().strftime('%Y-%m-%d')}
            alert : 
                Success! 
                    task id : {msg.get('task_instance').task_id}, 
                    dag id : {msg.get('task_instance').dag_id}, 
                    log url : {msg.get('task_instance').log_url}
            """
        self.client.chat_postMessage(channel=self.channel, text=text)

    def fail_msg(self, msg):
        text = f"""
            date : {datetime.today().strftime('%Y-%m-%d')}  
            alert : 
                Fail! 
                    task id : {msg.get('task_instance').task_id}, 
                    dag id : {msg.get('task_instance').dag_id}, 
                    log url : {msg.get('task_instance').log_url}
        """
        self.client.chat_postMessage(channel=self.channel, text=text)

slack_alert.py 파일은 위처럼 하나의 class로 이루어져 있고 success_msg와 fail_msg 함수를 가지고 있습니다. 함수 이름에 쓰여있다시피 success_msg는 성공 했을 때, fail_msg는 실패 했을 때 slack으로 메세지가 나가도록 합니다. 

 

airflow slack dag code(example_slack_dag.py)

이제 airflow dag 코드를 살펴보겠습니다. dag 코드는 2개의 파트로 나누어서 설명합니다. 

아래 코드는 그 중 첫 번째 파트로써 DAG를 정의하는 부분입니다. 

from MLproject.titanic import *
from utils.slack_alert import SlackAlert

titanic = TitanicMain()
slack = SlackAlert("#your-channel", "your-token")

def print_result(**kwargs):
    r = kwargs["task_instance"].xcom_pull(key='result_msg')
    print("message : ", r)

default_args = {
    'owner': 'soojin',
    'depends_on_past': False,
    'email': ['lsjsj92@naver.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
}

dag_args = dict(
    dag_id="tutorial-slack-ml-op",
    default_args=default_args,
    description='tutorial DAG ml with slack',
    schedule_interval=timedelta(minutes=50),
    start_date=datetime(2022, 2, 1),
    tags=['example-sj'],
    on_success_callback=slack.success_msg,
    on_failure_callback=slack.fail_msg
)

가장 먼저 필요한 라이브러리를 호출해줍니다. 파일 전체 구조를 보시면 아시겠지만 SlackAlert과 titanic 코드는 따로 정의되어 있기 때문에 해당 클래스를 import 해줍니다.

 

위 코드에서 보셔야 할 것이 dag_args에서 on_success_callback과 on_failure_callback 인자가 있습니다. 이 parameter에 위에서 정의한 slack success message, fail message 함수를 넣어줍니다. 

 

다음 파트는 airflow DAG를 정의하고 task relationship을 구성하는 부분입니다.

with DAG( **dag_args ) as dag:
    start = BashOperator(
        task_id='start',
        bash_command='echo "start!"',
    )

    prepro_task = PythonOperator(
        task_id='preprocessing',
        python_callable=titanic.prepro_data,
        op_kwargs={'f_name': "train"}
    )
    
    modeling_task = PythonOperator(
        task_id='modeling',
        python_callable=titanic.run_modeling,
        op_kwargs={'n_estimator': 100, 'flag' : True}
    )

    msg = PythonOperator(
        task_id='msg',
        python_callable=print_result
    )

    complete = BashOperator(
        task_id='complete_bash',
        bash_command='echo "complete~!"',
    )

    start >> prepro_task >> modeling_task >> msg >> complete

이는 Python Operator를 사용해 DAG 안의 task를 정의하며 이때 titanic machine learning에 사용되는 각종 함수들을 python_callable에 넣어줍니다. 

 

마지막에는 각 DAG task끼리의 순서를 어떻게 진행할 것인지 relationship을 구성해줍니다.

 

Airflow DAG 실행 결과

위 airflow DAG를 airflow Scheduler에 등록시키고 실행시켜 봅니다. web ui를 보기 전 command line 명령어 airflow dags list 를 통해 dag가 등록된 것을 확인합니다. 

저는 dag_id를 tutorial-slcak-ml-op라고 지정했기 때문에 dag_id에 그렇게 나온 것을 확인할 수 있습니다.

또한, airflow web server를 실행시켜 web ui에서도 해당 dag를 확인할 수 있습니다.

300x250

 

이제 위 코드를 실행시켜봅니다.

여기서 먼저, slack_alert.py를 다음과 같이 수정해서 알람 메세지를 살펴보겠습니다.

 

slack_alert.py 임시 수정 코드

from slack_sdk import WebClient
from datetime import datetime

class SlackAlert:
    def __init__(self, channel, token):
        self.channel = channel
        self.client = WebClient(token=token)

    def success_msg(self, msg):
        text = f"""
            date : {datetime.today().strftime('%Y-%m-%d')}
            alert : Success! {msg}
            """
        self.client.chat_postMessage(channel=self.channel, text=text)

    def fail_msg(self, msg):
        text = f"""
            date : {datetime.today().strftime('%Y-%m-%d')}  
            alert : Fail! {msg}
        """
        self.client.chat_postMessage(channel=self.channel, text=text)

이렇게 수정 후 DAG를 실행시킵니다. 실행하면 타이타닉 머신러닝 프로세스가 동작이 하나하나 되는 것을 확인할 수 있습니다. 그리고 해당 DAG가 잘 마무리 되면 아래와 같이 알람이 오게 됩니다.

위 알람을 보시면 alert : Success! 까지 나오는데 뒤에 {msg} 부분이 알아보기 힘들게 나온 것을 확인할 수 있습니다. 사실 이 msg 자리에는 airflow의 context 값들이 넘어가게 됩니다. 그렇기 때문에 context에 있는 각종 값들이 위와 같이 다 나온 것을 확인할 수 있습니다.

 

그렇다면 우리가 필요한 정보만 뽑아서 확인하는 것이 좋겠죠? 따라서 slack_alert.py 코드를 맨 처음에 소개 해드렸던 코드 방법대로 다시 구성합니다. 이때 context에 있는 task_instatnce key에서 task_id와 dag_id, log_url을 가져오도록 합니다.

 

맨 처음 소개드렸던 slack_alert.py 코드로 수정한 후 다시 DAG를 실행시키면 아래와 같이 slack message가 오는 것을 확인할 수 있습니다.

 

log url을 들어가면 위 사진처럼 log 정보도 확인할 수 있습니다.

즉, 이 메세지는 airflow DAG 환경에서 titanic machine learning 코드가 잘 수행되어 성공 했기 때문에 slack으로 성공 했다는 메세지를 날짜와 task_id, dag_id, log_url 정보와 함께 제공해주는 것입니다.

 


마무리

본 포스팅은 지난 포스팅의 에어플로우 환경에서 타이타닉 머신러닝 코드를 동작시킨 코드에다가 slack sdk를 활용해 DAG의 성공 또는 실패 시 slack으로 메세지를 보내는 예제를 살펴보았습니다. 

본 포스팅을 마지막으로 airflow 기초 시리즈 포스팅을 마무리 합니다.

부디 도움이 되시길 바라겠습니다.

감사합니다.

 

반응형
그리드형
Comments