-
[Airflow] - 7. Operator & Xcoms개발/Airflow 2022. 7. 20. 16:43
현재까지 Docker 위에 Airflow + Postgres를 이용하여 연동시켜 DB(Postgres) 적용시켜 보았습니다.
이제는 더 나아가 업비트 Api를 호출하여 비트코인 Postgres에 데이터를 수집하려고 합니다.
각 Task는 1. Table 생성 2. Call Api 3. Store DB 순으로 생각하여 만들었습니다.
Airflow Operator는 Python Operator와 Postgres Operator를 두개를 사용하였습니다.
1. What is Xcoms?
들어가기 앞서, Xcoms에 대해 먼저 설명하려고 합니다.
a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines.
즉, Task 간에 통신을 도와주는 개념입니다. key / task_id / dag_id 를 통해 식별이 가능합니다.
Api를 호출하여 얻은 결과 값을 Xcoms을 통해 전달하여 DB에 반영하도록 도와줍니다.
기본 문법은 아래 Jinja Template을 이용하여 사용하면 됩니다.
# Pulls the return_value XCOM from "pushing_task" value = task_instance.xcom_pull(task_ids='pushing_task') SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
2. Python Operator & Postgres Opertaor
이제부터는 위에서 말한 구현 내용을 보여드리려고 합니다.
아래 코드는 매시간 별로 비트코인 데이터를 수집한 코드입니다.
그림과 같이 이상 없이 잘 돌아가는 것을 볼 수 있습니다.
결과 코드 내용에 대해 전달드리면, 매 시간 마다 Scheduling을 하여 데이터를 수집하였습니다.
bitcoin_extract 함수는 Upbit 쪽으로 Api를 날려 Data를 Return 받습니다.
ts parameter는 Airflow template refernce 에서 나오는 내용이니 참고하시면 편합니다.
여기서 주의할 점은 Postgres Operator에서는 postgres_conn_id 를 설정을 미리 해주셔야 한다는 점입니다. 참고
from datetime import datetime, timedelta import logging , requests , json from airflow import DAG from airflow.providers.postgres.operators.postgres import PostgresOperator from airflow.operators.python_operator import PythonOperator # created by kimuksung # 목적 : Upbit bitcoin data collect # Python -> Call Api -> DB(postgres) 반영 default_args = { 'owner': 'kimuksung2', 'retries': 5, 'retry_delay': timedelta(minutes=5) } def bitcoin_extract(ts): time = ts.split('+')[0] minute = "1" market='KRW-BTC' to = time+'Z' count = "1" url = "https://api.upbit.com/v1/candles/minutes/"+minute+"?market="+market+"&to="+to+"&count="+count logging.info(url) headers = {"Accept": "application/json"} response = requests.get(url, headers=headers) json_response = json.loads(response.text) return json_response with DAG( dag_id='Upbit_api', default_args=default_args, start_date=datetime(2022, 7, 19 , 8 , 1 , 00), schedule_interval='0 * * * *' #every hour ) as dag: # task1 - Create Coin Table create_coin_table = PostgresOperator( task_id='create_table_coin', postgres_conn_id='postgres_xcom_test', sql=""" create table if not exists bitcoin ( name text, time_utc timestamp, time_ktc timestamp, opening_price double precision, high_price double precision, low_price double precision, trade_price double precision, primary key (time_ktc, trade_price) ) """ ) # task2 - Call Api Data extract_bitcoin = PythonOperator( task_id = 'upbit_bitcoin_api_extract', python_callable = bitcoin_extract ) # task3 - Coin data Store DB insert_bitcoin = PostgresOperator( task_id='insert_bitcoin', postgres_conn_id='postgres_xcom_test', sql=""" insert into bitcoin (name , time_utc , time_ktc , opening_price , high_price , low_price , trade_price ) values ( '{{task_instance.xcom_pull(task_ids='upbit_bitcoin_api_extract')[0]['market']}}', '{{task_instance.xcom_pull(task_ids='upbit_bitcoin_api_extract')[0]['candle_date_time_utc']}}', '{{task_instance.xcom_pull(task_ids='upbit_bitcoin_api_extract')[0]['candle_date_time_kst']}}', {{task_instance.xcom_pull(task_ids='upbit_bitcoin_api_extract')[0]['opening_price']}}, {{task_instance.xcom_pull(task_ids='upbit_bitcoin_api_extract')[0]['high_price']}}, {{task_instance.xcom_pull(task_ids='upbit_bitcoin_api_extract')[0]['low_price']}}, {{task_instance.xcom_pull(task_ids='upbit_bitcoin_api_extract')[0]['trade_price']}} ) """ ) create_coin_table >> extract_bitcoin >> insert_bitcoin
'개발 > Airflow' 카테고리의 다른 글
[Airflow] - Commands (0) 2022.08.03 [Airflow] - what is Airflow (0) 2022.07.30 [Airflow] - 6. Connect Postgresql (0) 2022.07.19 [Airflow] - 5.Postgresql 설치 및 DB setting (0) 2022.07.13 [Airflow&Python] - 4. Template variable(ts/ds/.. ) & datetime & KST 변경 (0) 2022.07.08