dbt Core x Airflow — sparametryzowany DAG
Wprowadzenie
Potrzeba uruchamiania niestandardowych zadań w produkcji jest dość oczywista… mam nadzieję 😉
- Szybkie wdrożenie poprawki wymagające pilnego przeładowania.
- Pełny refresh przyrostowych pipeline’ów.
- Awaria w pipeline’ie wykonania ETL.
I wiele innych powodów. Nie przedłużajmy — przejdźmy do interesującej części.
Wymagania wstępne
- Oczywiście potrzebujesz wdrożenia Airflow.
- Musisz mieć sposób obsługi poleceń dbt przekazywanych do Airflow — to wykracza poza zakres tego artykułu, więc zakładam, że masz to gotowe i możesz przekazać Airflow polecenia dbt do wykonania.
- Połączenia, autoryzacje itd. — to standard dla każdego zadania produkcyjnego, więc też tego tu nie opisuję i zakładam, że masz to skonfigurowane.
Implementacja
Przekazywanie parametrów
W definicji DAG-a dodaj parametry do wypełnienia. Strukturuję to tak, że każda flaga akceptowana przez dbt (np. --full-refresh) jest osobnym parametrem. Odpowiada to backendowej definicji poleceń dbt Core, która dokleja do polecenia każdą flagę lub zmienną tak samo, jak wpisałbyś to lokalnie na swojej maszynie.
Poniżej przykład parametru params:
params={
"dbt_command": Param(
"run",
type="string",
minLength=1,
description="The dbt command to run",
),
"vars": Param(None, type=["null", "string"]),
"exclude": Param(None, type=["null", "string"]),
"select": Param(None, type=["string"], minLength=1),
"warn_error": Param(False, type="boolean"),
"fail_fast": Param(True, type="boolean"),
"full_refresh": Param(False, type="boolean"),
},Wybrałem najważniejsze flagi i opcje, ale możesz rozszerzyć tę listę według uznania.
💡 Wskazówki:
- Ustaw ostrożnie wartości domyślne, np. flaga --full-refresh domyślnie na False, aby uniknąć przypadkowego pełnego odświeżenia.
- Ustaw parametr Airflow render_template_as_native_obj na True.
- Harmonogram DAG-a powinien być ustawiony na None — celem tego DAG-a jest uruchamianie niestandardowych zadań w produkcji według potrzeb.
Dostęp do parametrów
Aby uzyskać dostęp do parametrów DAG-a, wystarczy przekazać je w op_kwargs do operatora (zakładając PythonOperator dla dbt Core). Przykład poniżej:
dbt_execution_task = PythonOperator(
task_id="run_custom_dbt_command",
python_callable=<your_python_callable>,
provide_context=True,
op_kwargs={
"command": "{{ params.dbt_command }}",
"vars": "{{ params.vars }}",
"exclude": "{{ params.exclude }}",
"select": "{{ params.select }}",
"output_encoding": "utf-8",
"warn_error": "{{ params.warn_error }}",
"fail_fast": "{{ params.fail_fast }}",
"full_refresh": "{{ params.full_refresh }}",
},
)
[Opcjonalnie] Dodatkowe zabezpieczenie przed pełnym odświeżeniem
Część moich kolegów uważa, że wystarczy ustawić flagę --full-refresh na False. Uważam, że gdy DAG mają mniej doświadczeni użytkownicy — to może nie wystarczyć. Dlatego wymyśliłem dodatkowy krok bezpieczeństwa, aby upewnić się, że każde pełne odświeżenie jest zamierzone. Użyj, jeśli chcesz — pomiń, jeśli nie.
Cała idea polega na branch operatorze, który najpierw sprawdza, czy uruchomienie to pełny refresh.
- Jeśli nie — kontynuujemy normalnie.
- Jeśli tak — musisz wyczyścić jedno dodatkowe zadanie w Airflow, aby wykonanie ruszyło.
To dlatego, że ta implementacja stoi w sprzeczności z ogólnym celem Airflow — gdzie powinniśmy używać go jako zautomatyzowanego orkiestratora. Ta dodatkowa funkcja to raczej panel sterowania do ręcznej orkiestracji, stąd potrzebne obejście, bo Airflow nie wspiera tego out of the box.
Najpierw prosta funkcja Python sprawdzająca flagę full refresh:
def check_full_refresh(parameters):
if parameters["full_refresh"]:
return "confirm_the_full_refresh_by_clearing_next_task"
return "clear_this_task_to_continue"
Następnie dodaj branch operator i dwa dodatkowe dummy taski:
check_full_refresh_task = BranchPythonOperator(
task_id="check_full_refresh",
python_callable=check_full_refresh,
op_args=["{{ params }}"],
provide_context=True,
)
continue_after_resolution = EmptyOperator(
task_id="clear_this_task_to_continue"
)
handle_full_refresh_task = EmptyOperator(
task_id="confirm_the_full_refresh_by_clearing_next_task"
)
Na koniec ustaw właściwą kolejność wykonania zadań, aby osiągnąć:
Bezpośrednie wykonanie BEZ pełnego odświeżenia
Lub dodatkowe zabezpieczenie PRZY pełnym odświeżeniu
Konfiguracja wygląda następująco:
(
check_full_refresh_task
>> [
handle_full_refresh_task,
continue_after_resolution,
]
)
continue_after_resolution >> run_dbt
Podsumowanie
Tak za każdym razem, gdy uruchamiane jest pełne odświeżenie, trzeba dodatkowo wyczyścić jedno zadanie, aby upewnić się, że refresh faktycznie się wykona. To nie najpiękniejsza implementacja takiego zabezpieczenia, ale działa w Airflow, jest prosta dla doświadczonych osób, a jednocześnie utrudnia mniej doświadczonym przypadkowe uruchomienie ogromnego full refresh w środowisku produkcyjnym.
Kontakt
Dziękuję za przeczytanie. Podoba Ci się przekazywana wiedza, ale brakuje czasu lub kompetencji, aby uporządkować analytics engineering? Sprawdź moje dane kontaktowe.
Sparametryzowany DAG dbt w Airflow został pierwotnie opublikowany w Lortech Solutions Blog na Medium, gdzie rozmowa trwa dalej dzięki podświetleniom i odpowiedziom czytelników.


