-
[Airflow] - 6. Connect Postgresql개발/Airflow 2022. 7. 19. 15:14
지금까지 Psql 관련하여 Volume 설정 및 간단한 쿼리문을 알아보았습니다.
이제 부터는 Airflow를 통해 Postgresql에 DB에 반영하기 위해 Connect 및 DB생성 및 Data Insert 까지 하는 과정을 공유드리려고 합니다.
금일 보여드릴 부분은 DB Cluster 관련된 부분은 반영하지 않습니다.
1. Airflow Setting ( docker-compose.yaml )
docker-compose.yaml
docker-compose.yaml 을 열어 아래 부분을 편집하여 줍니다.
1. 먼저 연결할 DB를 설정하여 줍니다.
Airflow가 연결된 DB가 SQLite와 같이 Postgresql이 아니라면 아래와 같이 변경하여 줍니다.
AIRFLOW__CORE__SQL_ALCHEMY_CONN :postgresql+psycopg2://airflow:airflow@postgres/airflow
2. Connect를 하기 위해서는 service(Postgresql)에 Port 번호를 열어 연결이 가능하도록 설정하게 해주어야 합니다.
postgres: image: postgres:13 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - postgres-db-volume:/var/lib/postgresql/data ports: - 5432:5432
2. DBeavor
실제 Database가 잘 연결되었는지 및 Test를 확인하기 위한 용도로 DBeaver 프로그램을 설치하겠습니다. ( 링크 )
자신의 OS에 맞게 설치하여 줍니다.
이제부터는 Postgresql을 연동하여 보겠습니다.
아래 그림과 같이 진행하여 주시면 됩니다.
이제부터는 Postgresql 연결 설정 값을 넣어주어야 합니다.
아래와 같이 설정 후에 Username 및 Password에는 Airflow 기본값인 airflow / airflow를 넣어주시면 됩니다.
혹은 별도로 설정하신거라면 알맞은 ID / Pw를 넣어주시면 됩니다.
이제 잘 연결 되는지 Test Connection 버튼을 눌러 확인하여 봅니다.
만약 연결이 잘되었다면 아래 그림과 같이 나타나야 합니다.
여기까지 완료하였으면, 이제 DB관련 설정을 여기서도 설정이 가능합니다. 편하신대로 사용하여 주시면 됩니다.
3. Airflow Connection
이제 Airflow에서 연결하기 위해서 Localhost Web에 들어가 확인할 것입니다.
CLI 페이지에 들어가 Admin -> Connections -> + 버튼을 눌러 줍니다.
이제 부터 Postgresql 연결할 값들을 지정해 주어야 합니다.
- Connection Id = 원하시는 대로 Naming 하여도 됩니다. 나중에 Operator에 사용
- Connection Type = 사용할 DB Postgres를 선택하여 줍니다.
- Host = Docker-compose.yaml -> service 아래에 있는 postgres라는 값 혹은 host.docker.internal 로 지정하여 주면 됩니다.
- Schema = DB내에 Database를 설정하여 주시면 됩니다.
- Login / Password = Airflow 사용자를 별도로 만들지 않았다면, airflow / airflow로 지정하여 줍니다.
- Port 번호 = Docker-compose.yaml -> service -> Port 번호를 넣어주시면 됩니다.
4. Posgres Operator를 통한 DB 반영
마지막으로 실제 Airflow를 통해 반영하여 보겠습니다.
Postgres Operator 사용하여 아래 코드와 같이 사용하시면 됩니다.
주의할 점은 postgres_conn_id 만 위에서 설정한 Connection Id 값으로 설정하여 주면 됩니다.
from datetime import datetime, timedelta from airflow import DAG from airflow.providers.postgres.operators.postgres import PostgresOperator default_args = { 'owner': 'kimuksung2', 'retries': 5, 'retry_delay': timedelta(minutes=5) } with DAG( dag_id='dag_with_postgres_operator', default_args=default_args, start_date=datetime(2022, 7, 18), schedule_interval='0 0 * * *' ) as dag: #1. Create Table task1 = PostgresOperator( task_id='create_postgres_table', postgres_conn_id='postgres_localhost', sql=""" create table if not exists dag_runs ( dt date, dag_id character varying, primary key (dt, dag_id) ) """ ) #2. Insert Table task2 = PostgresOperator( task_id='insert_into_table', postgres_conn_id='postgres_localhost', sql=""" insert into dag_runs (dt, dag_id) values ('{{ ds }}', '{{ dag.dag_id }}') """ ) #3. Delete Table task3 = PostgresOperator( task_id='delete_data_from_table', postgres_conn_id='postgres_localhost', sql=""" delete from dag_runs where dt = '{{ ds }}' and dag_id = '{{ dag.dag_id }}'; """ ) task1 >> task2 >> task3
'개발 > Airflow' 카테고리의 다른 글
[Airflow] - what is Airflow (0) 2022.07.30 [Airflow] - 7. Operator & Xcoms (0) 2022.07.20 [Airflow] - 5.Postgresql 설치 및 DB setting (0) 2022.07.13 [Airflow&Python] - 4. Template variable(ts/ds/.. ) & datetime & KST 변경 (0) 2022.07.08 [Airflow] - 3. Dags(Python operator) (0) 2022.07.08