ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow&Python] - 4. Template variable(ts/ds/.. ) & datetime & KST 변경
    개발/Airflow 2022. 7. 8. 17:54

     

    저번 시간에 이어 Task에 실제 실행되는 시간 및 Airflow는 기본적으로 UST를 지원해줌으로써, KST로 설정 혹은 KST로 값을 변경하는 부분에 대해서 소개해드리려고 합니다.

     

    1. Template Variable


    https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html 를 참고하면 도움이 됩니다.

    각 시간 별로 API를 호출하여 사용하려다 보니 실제 실행될 시간을 인자로 넘겨받기 위해 찾다가 찾은 내용입니다.

    Airflow 자체적으로 Variable을 전달해주는 것으로 보입니다.

     

    1. {{ ds }} = The DAG run’s logical date as YYYY-MM-DD

    2. {{ ts }} = The DAG run's logical time as 2018-01-01T00:00:00+00:00.

    3. {{ params }} = A reference to the user-defined params dictionary which can be overridden by the dictionary passed through trigger_dag -c if you enabled dag_run_conf_overrides_params in airflow.cfg.

    4. {{ execution_date }} = the execution date (logical date)

    등등이 있으니 필요하신 부분을 가져다가 사용하시면 됩니다.

    def time_setting(ts):
        logging.info(ts)
        return ts

     

    2. "JSON serializer for objects not serializable by default json code"


    위와 같이 Xcom을 통해서 Parameter를 전달하다가 에러가 발생하는 경우가 있습니다.

    Airflow는 stream 전송을 위해서 pickle이나 Json과 같이 Data value 를 serialization 하게 처리를 해주어야 한다고 합니다.

    특히, Datetime과 같은 경우는 있는 그대로 처리가 불가함으로써, 아래와 같이 실행시켜주어야 합니다.

    $ from datetime import date
    $ date(2022, 7, 8).isoformat()
    '2022-07-08'
    $ datetime.strptime(ts[:19], "%Y-%m-%dT%H:%M:%S").strftime("%d/%m/%y")

    3. Datetime


    기존 전달 받은 시간을 KST로 변경하거나 특정 시간대를 추가하고 싶은 경우입니다.

    2-1) Datetime with isoformat ( https://docs.python.org/ko/3/library/datetime.html )

    def time_setting(ts):
        #UST time to KST
        ts = datetime.strptime(ts[:19], "%Y-%m-%dT%H:%M:%S")
        ts = ts + timedelta(hours=9)
        ts = ts.isoformat()
    
        return ts

    2-2) Datetime with strftime ( https://docs.python.org/ko/3/library/datetime.html )

    def ts_format(ts, output_format):
        return datetime.strptime(ts[:19], "%Y-%m-%dT%H:%M:%S").strftime(output_format)

     

    4. KST 변경


    pendulum module을 이용하여 설정하여 주면 됩니다.

    import pendulum
    KST = pendulum.timezone("Asia/Seoul")
    datetime(2022,7,8,12,1,0,tzinfo=KST)

    '개발 > Airflow' 카테고리의 다른 글

    [Airflow] - 6. Connect Postgresql  (0) 2022.07.19
    [Airflow] - 5.Postgresql 설치 및 DB setting  (0) 2022.07.13
    [Airflow] - 3. Dags(Python operator)  (0) 2022.07.08
    [Airflow] Why Apache Airflow?  (0) 2022.07.06
    [Airflow] 2. Vscode로 Dags 관리  (0) 2022.07.03

    댓글

Designed by Tistory.