-
[Airflow] - 3. Dags(Python operator)개발/Airflow 2022. 7. 8. 17:36
Dag 파일을 만들어보고 Workflow를 Pythonoperator로 구현하는 예시를 보여드릴 예정입니다.
틀린 내용이 있을 수 있으니 상세한 부분 Document를 참고하시는 것을 추천 드립니다.
1. Dag 생성
Dag file을 생성하기 위해서는 일단 Ubuntu의 Dags 폴더 File을 들어갑니다.
왜 Dag 폴더를 이용하는 가는 실제 Docker Container가 실행되면 dags / logs / plugins 폴더를 Mount 하여 가져갑니다.
( 이부분은 Docker Container를 직접 접속하셔서 /opt/airflow/dags 에 들어가시면 확인 할 수 있습니다. )
후에 Dag를 만들 Test.py 파일을 만듭니다.
$ cd /dags $ sudo vi test.py
이제부터는 어떤 방식으로 작업해야 할지에 대해 설명 드리겠습니다.
How Dag Create?
1. Define Default_args
- owner(만든 사람), start_date(언제부터 시작하게 할것인지) 값을 설정하여 줍니다.
2. Create Dag Object
- Dag_id, Schdule_interval 값을 설정하여 줍니다.
3. Create Task
- Operator를 이용하여 Task 생성하여 줍니다.
4. Connect Task
- '<<' , '>>' 를 사용하여 Task를 연결하여 줍니다.
2. Dag Code
Parameter가 무슨 의미를 하는지 살펴보겠습니다.
airflow module을 통하여 Dag 와 Pythonoperator를 부를 예정입니다.
실제 Scheduler에게 어떤 시간에 동작하게 할 것인지 필요함으로 datetime module도 부를 예정입니다.
추가적으로 Webserver에서 Log를 찍으면서 올바른 결과가 사용되지는도 볼 예정입니다.
코드를 먼저 보는 편이 이해하기에 쉬우리라 생각되어 코드를 먼저 보여드리겠습니다.
from airflow.models import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime import logging #dag option init dag = DAG( dag_id='uk_dag_test_1', #id option schedule_interval='*/60 * * * *', #(매 10분마다 실행) / schedule_interval='@daily' (time 주기) start_date = datetime(2022,7,8), #time start / 금일부터 실행하고 싶다면 금일 전날짜로 set tags=['uksung_test'] ) #Python def test(**context): print('test function start') parameter=context["params"]["parameter1"] logging.info(parameter) return parameter def test_parameter_trnasform(**context): transform_data = context['task_instance'].xcom_pull(task_ids='kimuksung_test') logging.info(transform_data) return True #DAG Task test_job = PythonOperator( #task id setting task_id = 'kimuksung_test', #python_callable param points to the function you want to run python_callable = test, #dag param points to the DAG that this task is a part of dag = dag, #parameter setting params = {'parameter1':'pubg'}, #parameter on/off provide_context=True ) test_transform = PythonOperator( #task id setting task_id = 'kimuksung_transform', #python_callable param points to the function you want to run python_callable = test_parameter_trnasform, #dag param points to the DAG that this task is a part of dag = dag, #parameter on/off provide_context=True ) test_job >> test_transform
이제 Dag parameter option을 살펴보겠습니다.
1. dag_id = Webserver에서 보여지는 Dag_id
2. tags = Webserver에서 보여지는 Tag
3. start_time = Scheduler가 해당 Dag를 언제부터 실행시킬 것인지
4. scheduler_interval = scheduler가 얼마만큼의 Time 주기마다 Dag를 실행할 것인지
다음으로 PythonOperator를 살펴보겠습니다.
1. task_id = Dag 내에서 실행될 Task_id 입니다.
2. python_callble = Task가 실행될 때 호출하는 Function
3. dag = 위에서 설정한 Dag 설정값을 호출
4. provide_context = Parameter 전달 여부 ( True / False )
5. params = Parameter value
제일 아래에 있는 test_job >> test_transform 는 Task가 실행될 순서를 보여줍니다.
3. Webserver-Dags
위처럼 만들었다면 이제 들어가서 확인을 해 볼 차례입니다.
Webserver에 들어가서 현재까지 내용들을 살펴보겠습니다.
아래 그림과 같이 현재 Dags가 만들어진 것을 볼 수 있습니다.
http://localhost:8080/home
Default 후에 왼쪽 버튼을 눌러 활성화를 시켜주시면 Dags내에서 scheduler에 맞춰 한시간 마다 돌아갑니다.
실행 전 실행 완료 4. Webserver Graph / Log / xcom
4-1) Graph
실행은 완료하였으니 내부적으로 제대로 동작하는지 확인하여 봅니다.
Graph를 살펴보면 코드에서 구현한 바와 같이 kimuksung_test 실행 후 kimuksung_transform으로 실행된다는 것을 알 수 있습니다. (이런 흐름도를 파악할 수 있는게 장점이죠 )
4-2) Log
다음 Log를 확인하여보겠습니다.
아래 빨간색 줄을 확인 해보시면 값이 제대로 나온것을 알 수 있습니다.
kimuksung_test kimuksung_transform 4-3) Xcom ( output )
마지막으로 결과값이 제대로 나오는지 확인하여 보겠습니다.
Xcom을 통하여 다른 함수의 결과값을 가지갈 수 있는데 이 부분은 Task : kimuksung_transform 을 참고하시면 됩니다.
'개발 > Airflow' 카테고리의 다른 글
[Airflow] - 5.Postgresql 설치 및 DB setting (0) 2022.07.13 [Airflow&Python] - 4. Template variable(ts/ds/.. ) & datetime & KST 변경 (0) 2022.07.08 [Airflow] Why Apache Airflow? (0) 2022.07.06 [Airflow] 2. Vscode로 Dags 관리 (0) 2022.07.03 [Airflow] - 1. Airflow 설치 (0) 2022.07.03