ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] - 3. Dags(Python operator)
    개발/Airflow 2022. 7. 8. 17:36

     

    Dag 파일을 만들어보고 WorkflowPythonoperator로 구현하는 예시를 보여드릴 예정입니다.

    틀린 내용이 있을 수 있으니 상세한 부분 Document를 참고하시는 것을 추천 드립니다.

     

    1. Dag 생성


    Dag file을 생성하기 위해서는 일단 Ubuntu의 Dags 폴더 File을 들어갑니다.

    왜 Dag 폴더를 이용하는 가는 실제 Docker Container가 실행되면 dags / logs / plugins 폴더를 Mount 하여 가져갑니다.

    ( 이부분은 Docker Container를 직접 접속하셔서 /opt/airflow/dags 에 들어가시면 확인 할 수 있습니다. )

    후에 Dag를 만들 Test.py 파일을 만듭니다.

    $ cd /dags
    $ sudo vi test.py

    이제부터는 어떤 방식으로 작업해야 할지에 대해 설명 드리겠습니다.

    How Dag Create?

    1. Define Default_args

    • owner(만든 사람), start_date(언제부터 시작하게 할것인지) 값을 설정하여 줍니다. 

    2. Create Dag Object

    • Dag_id, Schdule_interval 값을 설정하여 줍니다.

    3. Create Task

    • Operator를 이용하여 Task 생성하여 줍니다.

    4. Connect Task

    • '<<' , '>>' 를 사용하여 Task를 연결하여 줍니다.

    2. Dag Code


    Parameter가 무슨 의미를 하는지 살펴보겠습니다.

    airflow module을 통하여 DagPythonoperator를 부를 예정입니다.

    실제 Scheduler에게 어떤 시간에 동작하게 할 것인지 필요함으로 datetime module도 부를 예정입니다.

    추가적으로 Webserver에서 Log를 찍으면서 올바른 결과가 사용되지는도 볼 예정입니다.

     

    코드를 먼저 보는 편이 이해하기에 쉬우리라 생각되어 코드를 먼저 보여드리겠습니다.

    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime
    import logging
    
    #dag option init
    dag = DAG(
            dag_id='uk_dag_test_1', #id option
            schedule_interval='*/60 * * * *', #(매 10분마다 실행) / schedule_interval='@daily' (time 주기)
            start_date = datetime(2022,7,8), #time start / 금일부터 실행하고 싶다면 금일 전날짜로 set
            tags=['uksung_test']
            )
    
    #Python
    def test(**context):
        print('test function start')
        parameter=context["params"]["parameter1"]
        logging.info(parameter)
        return parameter
    
    def test_parameter_trnasform(**context):
        transform_data = context['task_instance'].xcom_pull(task_ids='kimuksung_test')
        logging.info(transform_data)
        return True
    
    #DAG Task
    test_job = PythonOperator(
        #task id setting
    	task_id = 'kimuksung_test',
    	#python_callable param points to the function you want to run 
    	python_callable = test,
    	#dag param points to the DAG that this task is a part of
    	dag = dag,  
        #parameter setting
        params = {'parameter1':'pubg'},
        #parameter on/off
        provide_context=True
        )
    
    test_transform = PythonOperator(
        #task id setting
    	task_id = 'kimuksung_transform',
    	#python_callable param points to the function you want to run 
    	python_callable = test_parameter_trnasform,
    	#dag param points to the DAG that this task is a part of
    	dag = dag,
        #parameter on/off
        provide_context=True
        )
    
    test_job >> test_transform

    이제 Dag parameter option을 살펴보겠습니다.

    1. dag_id = Webserver에서 보여지는 Dag_id

    2. tags = Webserver에서 보여지는 Tag

    3. start_time = Scheduler가 해당 Dag를 언제부터 실행시킬 것인지

    4. scheduler_interval = scheduler가 얼마만큼의 Time 주기마다 Dag를 실행할 것인지

     

    다음으로 PythonOperator를 살펴보겠습니다.

    1. task_id = Dag 내에서 실행될 Task_id 입니다.

    2. python_callble = Task가 실행될 때 호출하는 Function 

    3. dag = 위에서 설정한 Dag 설정값을 호출

    4. provide_context = Parameter 전달 여부 ( True / False )

    5. params = Parameter value

     

    제일 아래에 있는 test_job >> test_transform 는 Task가 실행될 순서를 보여줍니다.

     

    3. Webserver-Dags


    위처럼 만들었다면 이제 들어가서 확인을 해 볼 차례입니다.

    Webserver에 들어가서 현재까지 내용들을 살펴보겠습니다.

    아래 그림과 같이 현재 Dags가 만들어진 것을 볼 수 있습니다.

    http://localhost:8080/home

    Default

    후에 왼쪽 버튼을 눌러 활성화를 시켜주시면 Dags내에서 scheduler에 맞춰 한시간 마다 돌아갑니다.

    실행 전
    실행 완료

     

    4. Webserver Graph / Log / xcom


    4-1) Graph

    실행은 완료하였으니 내부적으로 제대로 동작하는지 확인하여 봅니다.

    Graph를 살펴보면 코드에서 구현한 바와 같이 kimuksung_test 실행 후 kimuksung_transform으로 실행된다는 것을 알 수 있습니다. (이런 흐름도를 파악할 수 있는게 장점이죠 )

     

    4-2) Log

    다음 Log를 확인하여보겠습니다.

    아래 빨간색 줄을 확인 해보시면 값이 제대로 나온것을 알 수 있습니다.

    kimuksung_test
    kimuksung_transform

    4-3) Xcom ( output )

    마지막으로 결과값이 제대로 나오는지 확인하여 보겠습니다.

    Xcom을 통하여 다른 함수의 결과값을 가지갈 수 있는데 이 부분은 Task : kimuksung_transform 을 참고하시면 됩니다.

    댓글

Designed by Tistory.