ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] - 6. Connect Postgresql
    개발/Airflow 2022. 7. 19. 15:14

    지금까지 Psql 관련하여 Volume 설정 및 간단한 쿼리문을 알아보았습니다.

     

    이제 부터는 Airflow를 통해 Postgresql에 DB에 반영하기 위해 Connect 및 DB생성 및 Data Insert 까지 하는 과정을 공유드리려고 합니다.

     

    금일 보여드릴 부분은 DB Cluster 관련된 부분은 반영하지 않습니다.

     

    1. Airflow Setting ( docker-compose.yaml )


    docker-compose.yaml

     

    docker-compose.yaml 을 열어 아래 부분을 편집하여 줍니다.

    1. 먼저 연결할 DB를 설정하여 줍니다. 

    Airflow가 연결된 DB가 SQLite와 같이 Postgresql이 아니라면 아래와 같이 변경하여 줍니다.

    AIRFLOW__CORE__SQL_ALCHEMY_CONN :postgresql+psycopg2://airflow:airflow@postgres/airflow

    2. Connect를 하기 위해서는 service(Postgresql)에 Port 번호를 열어 연결이 가능하도록 설정하게 해주어야 합니다.

    postgres:
        image: postgres:13
        environment:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
        volumes:
          - postgres-db-volume:/var/lib/postgresql/data
        ports:
          - 5432:5432

    2. DBeavor


    실제 Database가 잘 연결되었는지 및 Test를 확인하기 위한 용도로 DBeaver 프로그램을 설치하겠습니다. ( 링크 )

    자신의 OS에 맞게 설치하여 줍니다.

     

    이제부터는 Postgresql을 연동하여 보겠습니다.

    아래 그림과 같이 진행하여 주시면 됩니다.

     

    step 1
    step 2

     

    이제부터는 Postgresql 연결 설정 값을 넣어주어야 합니다.

    아래와 같이 설정 후에 Username 및 Password에는 Airflow 기본값인 airflow / airflow를 넣어주시면 됩니다.

    혹은 별도로 설정하신거라면 알맞은 ID / Pw를 넣어주시면 됩니다.

     

    step 3
    step 4

     

    이제 잘 연결 되는지 Test Connection 버튼을 눌러 확인하여 봅니다.

    만약 연결이 잘되었다면 아래 그림과 같이 나타나야 합니다.

     

    step 5
    step 6

    여기까지 완료하였으면, 이제 DB관련 설정을 여기서도 설정이 가능합니다. 편하신대로 사용하여 주시면 됩니다.

     

    3. Airflow Connection


    이제 Airflow에서 연결하기 위해서 Localhost Web에 들어가 확인할 것입니다.

    CLI 페이지에 들어가 Admin -> Connections -> + 버튼을 눌러 줍니다.

     

    이제 부터 Postgresql 연결할 값들을 지정해 주어야 합니다.

    • Connection Id = 원하시는 대로 Naming 하여도 됩니다. 나중에 Operator에 사용
    • Connection Type = 사용할 DB Postgres를 선택하여 줍니다.
    • Host = Docker-compose.yaml -> service 아래에 있는 postgres라는 값 혹은 host.docker.internal 로 지정하여 주면 됩니다.
    • Schema = DB내에 Database를 설정하여 주시면 됩니다.
    • Login / Password = Airflow 사용자를 별도로 만들지 않았다면, airflow / airflow로 지정하여 줍니다.
    • Port 번호 = Docker-compose.yaml -> service -> Port 번호를 넣어주시면 됩니다.

     

    4. Posgres Operator를 통한 DB 반영


    마지막으로 실제 Airflow를 통해 반영하여 보겠습니다.

     

    Postgres Operator 사용하여 아래 코드와 같이 사용하시면 됩니다.

    주의할 점은 postgres_conn_id 만 위에서 설정한 Connection Id 값으로 설정하여 주면 됩니다.

    from datetime import datetime, timedelta
    
    from airflow import DAG
    from airflow.providers.postgres.operators.postgres import PostgresOperator
    
    default_args = {
        'owner': 'kimuksung2',
        'retries': 5,
        'retry_delay': timedelta(minutes=5)
    }
    
    with DAG(
        dag_id='dag_with_postgres_operator',
        default_args=default_args,
        start_date=datetime(2022, 7, 18),
        schedule_interval='0 0 * * *'
    ) as dag:
    	#1. Create Table
        task1 = PostgresOperator(
            task_id='create_postgres_table',
            postgres_conn_id='postgres_localhost',
            sql="""
                create table if not exists dag_runs (
                    dt date,
                    dag_id character varying,
                    primary key (dt, dag_id)
                )
            """
        )
    	#2. Insert Table
        task2 = PostgresOperator(
            task_id='insert_into_table',
            postgres_conn_id='postgres_localhost',
            sql="""
                insert into dag_runs (dt, dag_id) values ('{{ ds }}', '{{ dag.dag_id }}')
            """
        )
    	#3. Delete Table
        task3 = PostgresOperator(
            task_id='delete_data_from_table',
            postgres_conn_id='postgres_localhost',
            sql="""
                delete from dag_runs where dt = '{{ ds }}' and dag_id = '{{ dag.dag_id }}';
            """
        )
        task1 >> task2 >> task3

     

    댓글

Designed by Tistory.