-
[Airflow] - 6. Connect Postgresql개발/Airflow 2022. 7. 19. 15:14
지금까지 Psql 관련하여 Volume 설정 및 간단한 쿼리문을 알아보았습니다.
이제 부터는 Airflow를 통해 Postgresql에 DB에 반영하기 위해 Connect 및 DB생성 및 Data Insert 까지 하는 과정을 공유드리려고 합니다.
금일 보여드릴 부분은 DB Cluster 관련된 부분은 반영하지 않습니다.
1. Airflow Setting ( docker-compose.yaml )
docker-compose.yaml
docker-compose.yaml 을 열어 아래 부분을 편집하여 줍니다.
1. 먼저 연결할 DB를 설정하여 줍니다.
Airflow가 연결된 DB가 SQLite와 같이 Postgresql이 아니라면 아래와 같이 변경하여 줍니다.
AIRFLOW__CORE__SQL_ALCHEMY_CONN :postgresql+psycopg2://airflow:airflow@postgres/airflow
2. Connect를 하기 위해서는 service(Postgresql)에 Port 번호를 열어 연결이 가능하도록 설정하게 해주어야 합니다.
postgres: image: postgres:13 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - postgres-db-volume:/var/lib/postgresql/data ports: - 5432:5432
2. DBeavor
실제 Database가 잘 연결되었는지 및 Test를 확인하기 위한 용도로 DBeaver 프로그램을 설치하겠습니다. ( 링크 )
자신의 OS에 맞게 설치하여 줍니다.
이제부터는 Postgresql을 연동하여 보겠습니다.
아래 그림과 같이 진행하여 주시면 됩니다.
step 1 step 2 이제부터는 Postgresql 연결 설정 값을 넣어주어야 합니다.
아래와 같이 설정 후에 Username 및 Password에는 Airflow 기본값인 airflow / airflow를 넣어주시면 됩니다.
혹은 별도로 설정하신거라면 알맞은 ID / Pw를 넣어주시면 됩니다.
step 3 step 4 이제 잘 연결 되는지 Test Connection 버튼을 눌러 확인하여 봅니다.
만약 연결이 잘되었다면 아래 그림과 같이 나타나야 합니다.
step 5 step 6 여기까지 완료하였으면, 이제 DB관련 설정을 여기서도 설정이 가능합니다. 편하신대로 사용하여 주시면 됩니다.
3. Airflow Connection
이제 Airflow에서 연결하기 위해서 Localhost Web에 들어가 확인할 것입니다.
CLI 페이지에 들어가 Admin -> Connections -> + 버튼을 눌러 줍니다.
이제 부터 Postgresql 연결할 값들을 지정해 주어야 합니다.
- Connection Id = 원하시는 대로 Naming 하여도 됩니다. 나중에 Operator에 사용
- Connection Type = 사용할 DB Postgres를 선택하여 줍니다.
- Host = Docker-compose.yaml -> service 아래에 있는 postgres라는 값 혹은 host.docker.internal 로 지정하여 주면 됩니다.
- Schema = DB내에 Database를 설정하여 주시면 됩니다.
- Login / Password = Airflow 사용자를 별도로 만들지 않았다면, airflow / airflow로 지정하여 줍니다.
- Port 번호 = Docker-compose.yaml -> service -> Port 번호를 넣어주시면 됩니다.
4. Posgres Operator를 통한 DB 반영
마지막으로 실제 Airflow를 통해 반영하여 보겠습니다.
Postgres Operator 사용하여 아래 코드와 같이 사용하시면 됩니다.
주의할 점은 postgres_conn_id 만 위에서 설정한 Connection Id 값으로 설정하여 주면 됩니다.
from datetime import datetime, timedelta from airflow import DAG from airflow.providers.postgres.operators.postgres import PostgresOperator default_args = { 'owner': 'kimuksung2', 'retries': 5, 'retry_delay': timedelta(minutes=5) } with DAG( dag_id='dag_with_postgres_operator', default_args=default_args, start_date=datetime(2022, 7, 18), schedule_interval='0 0 * * *' ) as dag: #1. Create Table task1 = PostgresOperator( task_id='create_postgres_table', postgres_conn_id='postgres_localhost', sql=""" create table if not exists dag_runs ( dt date, dag_id character varying, primary key (dt, dag_id) ) """ ) #2. Insert Table task2 = PostgresOperator( task_id='insert_into_table', postgres_conn_id='postgres_localhost', sql=""" insert into dag_runs (dt, dag_id) values ('{{ ds }}', '{{ dag.dag_id }}') """ ) #3. Delete Table task3 = PostgresOperator( task_id='delete_data_from_table', postgres_conn_id='postgres_localhost', sql=""" delete from dag_runs where dt = '{{ ds }}' and dag_id = '{{ dag.dag_id }}'; """ ) task1 >> task2 >> task3
'개발 > Airflow' 카테고리의 다른 글
[Airflow] - what is Airflow (0) 2022.07.30 [Airflow] - 7. Operator & Xcoms (0) 2022.07.20 [Airflow] - 5.Postgresql 설치 및 DB setting (0) 2022.07.13 [Airflow&Python] - 4. Template variable(ts/ds/.. ) & datetime & KST 변경 (0) 2022.07.08 [Airflow] - 3. Dags(Python operator) (0) 2022.07.08