개발/Airflow
-
[Airflow] - Concept개발/Airflow 2022. 8. 3. 15:40
Airflow Default Concept 1. Operator An Operator is conceptually a template for a predefined Task, that you can just define declaratively inside your DAG For a list of all core operators(built-in) BashOperator - executes a bash command PythonOperator - calls an arbitrary Python function EmailOperator - sends an email with DAG("my-dag") as dag: ping = SimpleHttpOperator(endpoint="http://example.co..
-
[Airflow] - what is Airflow개발/Airflow 2022. 7. 30. 00:56
Airflow Airflow is a platform to programmatically author, schedule and monitor workflows. Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Airflow is not a data streaming(data load like image,youtube ) solution. Principles 1. Dynamic Airflow pipelines are configu..
-
[Airflow] - 7. Operator & Xcoms개발/Airflow 2022. 7. 20. 16:43
현재까지 Docker 위에 Airflow + Postgres를 이용하여 연동시켜 DB(Postgres) 적용시켜 보았습니다. 이제는 더 나아가 업비트 Api를 호출하여 비트코인 Postgres에 데이터를 수집하려고 합니다. 각 Task는 1. Table 생성 2. Call Api 3. Store DB 순으로 생각하여 만들었습니다. Airflow Operator는 Python Operator와 Postgres Operator를 두개를 사용하였습니다. 1. What is Xcoms? 들어가기 앞서, Xcoms에 대해 먼저 설명하려고 합니다. a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and m..
-
[Airflow] - 6. Connect Postgresql개발/Airflow 2022. 7. 19. 15:14
지금까지 Psql 관련하여 Volume 설정 및 간단한 쿼리문을 알아보았습니다. 이제 부터는 Airflow를 통해 Postgresql에 DB에 반영하기 위해 Connect 및 DB생성 및 Data Insert 까지 하는 과정을 공유드리려고 합니다. 금일 보여드릴 부분은 DB Cluster 관련된 부분은 반영하지 않습니다. 1. Airflow Setting ( docker-compose.yaml ) docker-compose.yaml docker-compose.yaml 을 열어 아래 부분을 편집하여 줍니다. 1. 먼저 연결할 DB를 설정하여 줍니다. Airflow가 연결된 DB가 SQLite와 같이 Postgresql이 아니라면 아래와 같이 변경하여 줍니다. AIRFLOW__CORE__SQL_ALCHEMY..
-
[Airflow] - 5.Postgresql 설치 및 DB setting개발/Airflow 2022. 7. 13. 17:07
Airflow와 연동하여 사용할 DB = postgresql(psql)에 알아보도록 하겠습니다. docker-compose 내에 있는 파일을 통해서 Build를 하는 것이 아닌 Postgres Image를 통해 빌드하는 내용입니다. Postgresql을 이제부터 psql 이라 칭하여 부르겠습니다. 들어가기 앞써, 설치 및 실제 Docker위에서 실행하는 방법부터 단계적으로 알아보려고 합니다. 1. Psql Install & Docker run 아래와 같은 명령어를 치게 된면 최근의 Psql 이미지를 가져와 실행시켜 줍니다. $ docker run -p 5432:5432 --name postgres -e POSTGRES_PASSWORD=password -d postgres Unable to find ima..
-
[Airflow&Python] - 4. Template variable(ts/ds/.. ) & datetime & KST 변경개발/Airflow 2022. 7. 8. 17:54
저번 시간에 이어 Task에 실제 실행되는 시간 및 Airflow는 기본적으로 UST를 지원해줌으로써, KST로 설정 혹은 KST로 값을 변경하는 부분에 대해서 소개해드리려고 합니다. 1. Template Variable https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html 를 참고하면 도움이 됩니다. 각 시간 별로 API를 호출하여 사용하려다 보니 실제 실행될 시간을 인자로 넘겨받기 위해 찾다가 찾은 내용입니다. Airflow 자체적으로 Variable을 전달해주는 것으로 보입니다. 1. {{ ds }} = The DAG run’s logical date as YYYY-MM-DD 2. {{ ts }} = The DAG run'..
-
[Airflow] - 3. Dags(Python operator)개발/Airflow 2022. 7. 8. 17:36
Dag 파일을 만들어보고 Workflow를 Pythonoperator로 구현하는 예시를 보여드릴 예정입니다. 틀린 내용이 있을 수 있으니 상세한 부분 Document를 참고하시는 것을 추천 드립니다. 1. Dag 생성 Dag file을 생성하기 위해서는 일단 Ubuntu의 Dags 폴더 File을 들어갑니다. 왜 Dag 폴더를 이용하는 가는 실제 Docker Container가 실행되면 dags / logs / plugins 폴더를 Mount 하여 가져갑니다. ( 이부분은 Docker Container를 직접 접속하셔서 /opt/airflow/dags 에 들어가시면 확인 할 수 있습니다. ) 후에 Dag를 만들 Test.py 파일을 만듭니다. $ cd /dags $ sudo vi test.py 이제부터는..