Architektura „pub-sub” dla DAG-ów Airflow z Datasets

Piotr Sieminski · 2024

Architektura „pub-sub” dla DAG-ów Airflow z Datasets

Jest wiele sposobów na tworzenie zależności między DAG-ami Airflow. Mój ulubiony, jak dotąd, to funkcja dataset. Nie ukrywam — bo jest tak prosta!

Przypomina tematy publish-subscribe ze świata inżynierii oprogramowania. Czy pasuje do Ciebie, zależy w dużej mierze od wyborów architektonicznych przy pisaniu DAG-ów. Jeśli wolisz zależności „jak należy” albo potrzebujesz większej personalizacji — tu jest dokumentacja, która Cię poprowadzi.

Wymagania wstępne:

  1. Wdrożenie Airflow z zależnymi DAG-ami.
  2. Podstawowe skrypty w Pythonie (kto by pomyślał, co?).

Przegląd kroków

Mamy dwa (lub więcej) DAG-i — pierwszy nazwijmy publisherem (DAG, który po zakończeniu uruchamia inne) i resztę subscriberami (DAG-i uruchamiane przez publishera).

  1. [Publisher i Subscriber] Importuj Dataset z airflow.datasets.
  2. [Publisher] Importuj DummyOperator.
  3. [Publisher] Napisz prosty task, który publikuje Dataset.
  4. [Subscriber] Ustaw parametr schedule, aby czytał Dataset z powyższego kroku.

Co to jest dataset?

Airflow zaprojektował tę funkcję dokładnie tak, jak sugeruje nazwa: aktualizacja/harmonogramowanie (na podstawie) zestawu danych (dokumentacja Airflow). Możesz używać jej ściśle w ten sposób, ale ja wolę wykorzystywać ją do tworzenia własnych „pub-sub” tematów w wdrożeniu Airflow.

Najprościej myśleć o dataset jako o wiadomości lub fladze publikowanej przez temat (Twój DAG). Jest publikowana wewnętrznie w Airflow, więc nie trzeba nic dodatkowego konfigurować. Po publikacji wszyscy subskrybenci zostaną uruchomieni, a wiadomość / flaga zostanie skonsumowana.

A co z niezawodnością — wielokrotne odczyty, pominięci konsumenci… skoro to klaster DAG-ów, a nie streaming IoT w czasie rzeczywistym, szczerze mówiąc, nie przejmuję się tym zbytnio, żeby nie over-engineerować. Jeśli w Twoim wdrożeniu są krytyczne DAG-i, od których zależą ludzkie życia — nie czytaj dalej i sprawdź, czy „normalna” zależność między DAG-ami w Airflow (jak tutaj) lepiej Ci pasuje 😉.

Którego operatora użyć?

DummyOperator! Tyle. Nic wymyślnego, nic skomplikowanego, brak sztywnych wymagań co do parametrów 🙅 Task publikujący dataset możesz dodać gdzie chcesz w DAG-u — pamiętając, że po jego wykonaniu uruchomią się wszystkie zależności. Oto jak:

    notify_<someone>_<some_job>_finished = DummyOperator(
task_id="notify_<someone>_<some_job>_finished",
outlets=[Dataset("<some_job>_finished")],
)

Jak subskrybować dataset?

Użyję kodu bezpośrednio ze strony dokumentacji Airflow, bo wiemy, że każdy deklaruje DAG-i inaczej. Oto przykład z jedną modyfikacją — i to wszystko, co trzeba zrobić, aby subskrybować dataset:

 import datetime
############### additional import required here
from airflow.datasets import Dataset
############### end of additional import

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
############### modifying here
schedule=[Dataset("<some_job>_finished")],
############### end of modification
):
EmptyOperator(task_id="task")

Jak widać, wystarczy zmienić schedule DAG-a (i oczywiście zaimportować obiekt Dataset z airflow).

Kontakt

Dzięki za lekturę. Podoba Ci się to, co czytasz, ale brakuje czasu lub kompetencji, żeby ogarnąć analytics engineering? Sprawdź moje dane kontaktowe.


„Pub-Sub” architecture for your Airflow DAGs with Datasets został pierwotnie opublikowany w Lortech Solutions Blog na Medium, gdzie rozmowa trwa dalej dzięki podświetleniom i odpowiedziom czytelników.

Zobacz też na Medium →
Observability on-prem dla zadań w tle z OpenTelemetry i SigNoz
Konrad Bartecki · cze 2026

Observability on-prem dla zadań w tle z OpenTelemetry i SigNoz

Jak sprawnie uruchomiliśmy analitykę AI?
Joachim Hodana · mar 2026

Jak sprawnie uruchomiliśmy analitykę AI?

Jak konsulting ułatwi Ci życie?
Piotr Sieminski · lut 2026

Jak konsulting ułatwi Ci życie?