개발/Airflow

[Airflow] - 3. Dags(Python operator)

Dortmoot 2022. 7. 8. 17:36

 

Dag 파일을 만들어보고 WorkflowPythonoperator로 구현하는 예시를 보여드릴 예정입니다.

틀린 내용이 있을 수 있으니 상세한 부분 Document를 참고하시는 것을 추천 드립니다.

 

1. Dag 생성


Dag file을 생성하기 위해서는 일단 Ubuntu의 Dags 폴더 File을 들어갑니다.

왜 Dag 폴더를 이용하는 가는 실제 Docker Container가 실행되면 dags / logs / plugins 폴더를 Mount 하여 가져갑니다.

( 이부분은 Docker Container를 직접 접속하셔서 /opt/airflow/dags 에 들어가시면 확인 할 수 있습니다. )

후에 Dag를 만들 Test.py 파일을 만듭니다.

$ cd /dags
$ sudo vi test.py

이제부터는 어떤 방식으로 작업해야 할지에 대해 설명 드리겠습니다.

How Dag Create?

1. Define Default_args

  • owner(만든 사람), start_date(언제부터 시작하게 할것인지) 값을 설정하여 줍니다. 

2. Create Dag Object

  • Dag_id, Schdule_interval 값을 설정하여 줍니다.

3. Create Task

  • Operator를 이용하여 Task 생성하여 줍니다.

4. Connect Task

  • '<<' , '>>' 를 사용하여 Task를 연결하여 줍니다.

2. Dag Code


Parameter가 무슨 의미를 하는지 살펴보겠습니다.

airflow module을 통하여 DagPythonoperator를 부를 예정입니다.

실제 Scheduler에게 어떤 시간에 동작하게 할 것인지 필요함으로 datetime module도 부를 예정입니다.

추가적으로 Webserver에서 Log를 찍으면서 올바른 결과가 사용되지는도 볼 예정입니다.

 

코드를 먼저 보는 편이 이해하기에 쉬우리라 생각되어 코드를 먼저 보여드리겠습니다.

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import logging

#dag option init
dag = DAG(
        dag_id='uk_dag_test_1', #id option
        schedule_interval='*/60 * * * *', #(매 10분마다 실행) / schedule_interval='@daily' (time 주기)
        start_date = datetime(2022,7,8), #time start / 금일부터 실행하고 싶다면 금일 전날짜로 set
        tags=['uksung_test']
        )

#Python
def test(**context):
    print('test function start')
    parameter=context["params"]["parameter1"]
    logging.info(parameter)
    return parameter

def test_parameter_trnasform(**context):
    transform_data = context['task_instance'].xcom_pull(task_ids='kimuksung_test')
    logging.info(transform_data)
    return True

#DAG Task
test_job = PythonOperator(
    #task id setting
	task_id = 'kimuksung_test',
	#python_callable param points to the function you want to run 
	python_callable = test,
	#dag param points to the DAG that this task is a part of
	dag = dag,  
    #parameter setting
    params = {'parameter1':'pubg'},
    #parameter on/off
    provide_context=True
    )

test_transform = PythonOperator(
    #task id setting
	task_id = 'kimuksung_transform',
	#python_callable param points to the function you want to run 
	python_callable = test_parameter_trnasform,
	#dag param points to the DAG that this task is a part of
	dag = dag,
    #parameter on/off
    provide_context=True
    )

test_job >> test_transform

이제 Dag parameter option을 살펴보겠습니다.

1. dag_id = Webserver에서 보여지는 Dag_id

2. tags = Webserver에서 보여지는 Tag

3. start_time = Scheduler가 해당 Dag를 언제부터 실행시킬 것인지

4. scheduler_interval = scheduler가 얼마만큼의 Time 주기마다 Dag를 실행할 것인지

 

다음으로 PythonOperator를 살펴보겠습니다.

1. task_id = Dag 내에서 실행될 Task_id 입니다.

2. python_callble = Task가 실행될 때 호출하는 Function 

3. dag = 위에서 설정한 Dag 설정값을 호출

4. provide_context = Parameter 전달 여부 ( True / False )

5. params = Parameter value

 

제일 아래에 있는 test_job >> test_transform 는 Task가 실행될 순서를 보여줍니다.

 

3. Webserver-Dags


위처럼 만들었다면 이제 들어가서 확인을 해 볼 차례입니다.

Webserver에 들어가서 현재까지 내용들을 살펴보겠습니다.

아래 그림과 같이 현재 Dags가 만들어진 것을 볼 수 있습니다.

http://localhost:8080/home

Default

후에 왼쪽 버튼을 눌러 활성화를 시켜주시면 Dags내에서 scheduler에 맞춰 한시간 마다 돌아갑니다.

실행 전
실행 완료

 

4. Webserver Graph / Log / xcom


4-1) Graph

실행은 완료하였으니 내부적으로 제대로 동작하는지 확인하여 봅니다.

Graph를 살펴보면 코드에서 구현한 바와 같이 kimuksung_test 실행 후 kimuksung_transform으로 실행된다는 것을 알 수 있습니다. (이런 흐름도를 파악할 수 있는게 장점이죠 )

 

4-2) Log

다음 Log를 확인하여보겠습니다.

아래 빨간색 줄을 확인 해보시면 값이 제대로 나온것을 알 수 있습니다.

kimuksung_test
kimuksung_transform

4-3) Xcom ( output )

마지막으로 결과값이 제대로 나오는지 확인하여 보겠습니다.

Xcom을 통하여 다른 함수의 결과값을 가지갈 수 있는데 이 부분은 Task : kimuksung_transform 을 참고하시면 됩니다.