From 5213ae0f8cd7ed8e96497e6bc8adc0bbf8790171 Mon Sep 17 00:00:00 2001 From: Nick Date: Sat, 19 Apr 2025 19:26:11 +0300 Subject: [PATCH 1/3] create incremental upload task --- .../copy_api_tables/upload_api_marketing.py | 11 +- .../copy_table_to_dwh_incremental.py | 106 ++++++++++++++++++ 2 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 plugins/api_utils/copy_table_to_dwh_incremental.py diff --git a/dags/stg/copy_api_tables/upload_api_marketing.py b/dags/stg/copy_api_tables/upload_api_marketing.py index 7c9fba5..5e38a6a 100644 --- a/dags/stg/copy_api_tables/upload_api_marketing.py +++ b/dags/stg/copy_api_tables/upload_api_marketing.py @@ -4,7 +4,7 @@ from airflow.decorators import task from airflow.models import Variable -from plugins.api_utils import send_telegram_message, copy_table_to_dwh +from plugins.api_utils import send_telegram_message, copy_table_to_dwh, copy_table_to_dwh_incremental # декорированные функции send_telegram_message = task( @@ -17,6 +17,15 @@ trigger_rule="one_done", retries=0 )(copy_table_to_dwh) +copy_table_to_dwh_incremental = task( + task_id="copy_table_to_dwh", + params= { + "from_ts": datetime.now - timedelta(days=1), + "to_ts": datetime.now + }, + trigger_rule="one_done", retries=0 +)(copy_table_to_dwh_incremental) + with DAG( dag_id="upload_api_marketing", diff --git a/plugins/api_utils/copy_table_to_dwh_incremental.py b/plugins/api_utils/copy_table_to_dwh_incremental.py new file mode 100644 index 0000000..9c50371 --- /dev/null +++ b/plugins/api_utils/copy_table_to_dwh_incremental.py @@ -0,0 +1,106 @@ +import json +import logging +from typing import Set, Tuple + +import pandas as pd +import sqlalchemy as sa +from sqlalchemy import create_engine, text +from airflow.models import Connection + +MAX_ROWS_PER_REQUEST = 10_000 + +API_DB_DSN = ( + Connection.get_connection_from_secrets("postgres_api") + .get_uri() + .replace("postgres://", "postgresql://") + .replace("?__extra__=%7B%7D", "") +) + +DWH_DB_DSN = ( + Connection.get_connection_from_secrets("postgres_dwh") + .get_uri() + .replace("postgres://", "postgresql://") + .replace("?__extra__=%7B%7D", "") +) + +def copy_table_to_dwh_incremental(from_schema, from_table, to_schema, to_table, params: dict): + logging.info( + f"Копирование таблицы {from_schema}.{from_table} в {to_schema}.{to_table}" + ) + + from_ts = params["from_ts"] + to_ts = params["to_ts"] + + api = create_engine(API_DB_DSN) + dwh = create_engine(DWH_DB_DSN) + + with api.connect() as api_conn: + api_cols_all = api_conn.execute( + sa.text( + "SELECT column_name, data_type, ordinal_position " + "FROM information_schema.columns " + f"WHERE table_schema = '{from_schema}' AND table_name = '{from_table}';" + ) + ) + + bad_api_cols = set() + api_cols = set() + id_column = None + for name, dtype, pos in api_cols_all: + if pos == 1: + id_column = str(name) + if dtype == "json": + bad_api_cols.add((str(name), dtype)) + api_cols.add(str(name)) + logging.info(f"Колонки в источнике: {api_cols}") + logging.info(f"Колонки с особенностями: {bad_api_cols}") + + if "id" in api_cols: + id_column = "id" + elif "create_ts" in api_cols: + id_column = "create_ts" + elif id_column is None: + raise AttributeError("Не найдена колонка для сортировки") + logging.info(f"Колонка для сортировки: {id_column}") + + data_length = api_conn.execute( + sa.text(f'SELECT COUNT(*) FROM "{from_schema}"."{from_table}";') + ).scalar() + logging.info(f"Количество строк: {data_length}") + + with dwh.connect() as dwh_conn: + dwh_cols = dwh_conn.execute( + sa.text( + "SELECT column_name " + "FROM information_schema.columns " + f"WHERE table_schema = '{to_schema}' AND table_name = '{to_table}';" + ) + ) + dwh_cols = set(str(i[0]) for i in dwh_cols) + logging.info(f"Колонки в таргете: {dwh_cols}") + + + to_copy_cols = api_cols & dwh_cols + logging.info(f"Колонки для копирования: {to_copy_cols}") + + with api.connect() as api_conn, dwh.connect() as dwh_conn: + to_copy_cols = '", "'.join(to_copy_cols) + for i in range(0, data_length, MAX_ROWS_PER_REQUEST): + data = pd.read_sql_query( + f'SELECT "{to_copy_cols}" ' + f'FROM "{from_schema}"."{from_table}" ' + f'WHERE create_ts BETWEEN {from_ts} AND {to_ts}' + f"ORDER BY {id_column} " + f"LIMIT {MAX_ROWS_PER_REQUEST} OFFSET {i}", + api_conn, + id_column, + ) + for col, dtype in bad_api_cols: + # Делаем ручное приведение типов, где не сработало иное + # Заменяем JSON на строку + if dtype == "json": + data[col] = data[col].apply( + lambda x: json.dumps(x, ensure_ascii=False) + ) + data.to_sql(to_table, dwh, schema=to_schema, if_exists="append") + logging.info("%d of %d rows copied", i + len(data), data_length) From a2a0824fe0674abe6ac3a6b2bc4e671e4ff94a7c Mon Sep 17 00:00:00 2001 From: Nick Date: Sat, 19 Apr 2025 19:40:01 +0300 Subject: [PATCH 2/3] incremental dag --- .../upload_api_maketing_incremental.py | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 dags/stg/copy_api_tables/upload_api_maketing_incremental.py diff --git a/dags/stg/copy_api_tables/upload_api_maketing_incremental.py b/dags/stg/copy_api_tables/upload_api_maketing_incremental.py new file mode 100644 index 0000000..16fa49d --- /dev/null +++ b/dags/stg/copy_api_tables/upload_api_maketing_incremental.py @@ -0,0 +1,56 @@ + +from datetime import datetime, timedelta +from airflow import DAG +from airflow.datasets import Dataset +from airflow.decorators import task +from airflow.models import Variable + +from plugins.api_utils import send_telegram_message, copy_table_to_dwh, copy_table_to_dwh_incremental + +# декорированные функции +send_telegram_message = task( + task_id="send_telegram_message", + trigger_rule="one_failed" +)(send_telegram_message) + +copy_table_to_dwh_incremental = task( + task_id="copy_table_to_dwh", + params= { + "from_ts": datetime.now - timedelta(days=1), + "to_ts": datetime.now + }, + trigger_rule="one_done", retries=0 +)(copy_table_to_dwh_incremental) + + +with DAG( + dag_id="upload_api_marketing_incremental", + start_date=datetime(2024, 1, 1), + schedule="@daily", + catchup=False, + tags=["dwh", "stg", "marketing"], + default_args={ + "owner": "zipperman1", + "retries": 5, + "retry_delay": timedelta(minutes=3), + "retry_exponential_backoff": True, + }, +): + tables = "actions_info_incremental", "user_incremental" + tg_task = send_telegram_message(int(Variable.get("TG_CHAT_DWH"))) + prev = None + for table in tables: + curr = copy_table_to_dwh.override( + task_id=f"copy-{table}", + outlets=[Dataset(f"STG_MARKETING.{table}")], + )( + "api_marketing", + table, + "STG_MARKETING", + table, + ) + # Выставляем копирование по порядку + if prev: + prev >> curr + prev = curr + prev >> tg_task \ No newline at end of file From 61fb8eb4132aa87fd99f199e975d1b30677babd6 Mon Sep 17 00:00:00 2001 From: Nick Date: Sat, 19 Apr 2025 19:43:09 +0300 Subject: [PATCH 3/3] lint --- .../upload_api_maketing_incremental.py | 8 ++++---- dags/stg/copy_api_tables/upload_api_marketing.py | 11 +---------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/dags/stg/copy_api_tables/upload_api_maketing_incremental.py b/dags/stg/copy_api_tables/upload_api_maketing_incremental.py index 16fa49d..6e6c462 100644 --- a/dags/stg/copy_api_tables/upload_api_maketing_incremental.py +++ b/dags/stg/copy_api_tables/upload_api_maketing_incremental.py @@ -5,7 +5,7 @@ from airflow.decorators import task from airflow.models import Variable -from plugins.api_utils import send_telegram_message, copy_table_to_dwh, copy_table_to_dwh_incremental +from plugins.api_utils import send_telegram_message, copy_table_to_dwh_incremental # декорированные функции send_telegram_message = task( @@ -14,7 +14,7 @@ )(send_telegram_message) copy_table_to_dwh_incremental = task( - task_id="copy_table_to_dwh", + task_id="copy_table_to_dwh_incremental", params= { "from_ts": datetime.now - timedelta(days=1), "to_ts": datetime.now @@ -40,8 +40,8 @@ tg_task = send_telegram_message(int(Variable.get("TG_CHAT_DWH"))) prev = None for table in tables: - curr = copy_table_to_dwh.override( - task_id=f"copy-{table}", + curr = copy_table_to_dwh_incremental.override( + task_id=f"copy-{table}-incremental", outlets=[Dataset(f"STG_MARKETING.{table}")], )( "api_marketing", diff --git a/dags/stg/copy_api_tables/upload_api_marketing.py b/dags/stg/copy_api_tables/upload_api_marketing.py index 5e38a6a..7c9fba5 100644 --- a/dags/stg/copy_api_tables/upload_api_marketing.py +++ b/dags/stg/copy_api_tables/upload_api_marketing.py @@ -4,7 +4,7 @@ from airflow.decorators import task from airflow.models import Variable -from plugins.api_utils import send_telegram_message, copy_table_to_dwh, copy_table_to_dwh_incremental +from plugins.api_utils import send_telegram_message, copy_table_to_dwh # декорированные функции send_telegram_message = task( @@ -17,15 +17,6 @@ trigger_rule="one_done", retries=0 )(copy_table_to_dwh) -copy_table_to_dwh_incremental = task( - task_id="copy_table_to_dwh", - params= { - "from_ts": datetime.now - timedelta(days=1), - "to_ts": datetime.now - }, - trigger_rule="one_done", retries=0 -)(copy_table_to_dwh_incremental) - with DAG( dag_id="upload_api_marketing",