Mówi się, że Apache Airflow to CRON na sterydach. Zdobywa uznanie wśród narzędzi do orchestracji ETL’i. Harmonogramowanie, zarządzanie i monitorowanie zadań mu nie straszne. Podstawowym sposobem definiowania zadań są acyklicze grafy skierowane (DAG). Zadania w nich muszą wymieniać się informacjami. We wpisie dowiesz się jak używać Variables i XCom w Apache Airflow.
Środowisko
W przypadku Apache Airflow dobrze sprawdza się wariant z wykorzystaniem Docker’a puckel/docker-airflow. Najczęściej używam wariantu docker-compose-LocalExecutor.yml
.
sudo docker-compose -f docker-compose-LocalExecutor.yml up -d
Na co komu Variables i XCom?
Variables i XCom to zmienne używane w ramach środowiska Apache Airflow.
Variable to rodzaj globalnej zmiennej. Jeśli jakaś wartość używana jest przez wiele DAG’ów (a nie uśmiecha nam się edycja N plików w przypadku jej zmiany), możemy rozważyć wrzucenie jej do Variables.
XCom (od cross-communication) to wiadomości pozwalające na wymianę danych między zadaniami. Definiowane są jako klucz, wartość, timestamp i task/DAG z którego pochodzą. Każdy obiekt który da się pickle'ować
może być wykorzystany jako XCom.
Dajmy na to, że napisałeś/aś aplikację w Apache Spark, która efekty pracy zapisuje w katalogu na HDFS/S3. Ścieżkę trzeba podać jako argument tej aplikacji, a ona generowana jest przez skrypt Python, który napisał Twój kolega z zespołu. Później, kolejne zadania pobierają te dane i robią z nimi kolejne cuda wianki. Wszedzie krąży parametr ze ścieżką. Po to właśnie jest XCom 😁.
UWAGA! Nie używaj XCom’ów do przesyłania dużej ilości danych! Wartości te zapisywane są w bazie używanej przez Apache Airflow. Wrzucanie olbrzymich obiektów Numpy to częsty przypadek nieprawidłowego używania XCom.
Przykład
Zadania DAG są proste:
- Pobrać (a jeśli nie istnieje, to wygenerować) wartość z
Variables
- Na jej podstawie utworzyć kolejną wartość i dodać do
XCom
- Iterować pobraną wartość z
Variables
i ją ponowanie zapisać - Pobrać datę za pomocą
BashOperator
i dodać ją doXCom
- Wyświetlić obie wartości w konsoli na zdalnej maszynie używając
SSHOperator
Przygotowanie Variables i XCom
Obsługa Variables dostępna jest poprzez Variable, a dokładnie metody get
i set
. W przypadku XCom, potrzebny jest context (provide_context=True
), a do zapisania wartości do XCom służy metoda xcom_push
. Poniżej kod pierwszego zadania.
... def setup_var_and_xcom(**kwargs): ti = kwargs['ti'] iterator = int(Variable.get('my_iterator', default_var=0)) ti.xcom_push(key='important_xcom_value', value = f'something_important_{iterator}') iterator += 1 Variable.set('my_iterator', iterator) t1 = PythonOperator( task_id='setup_var_and_xcom', python_callable=setup_var_and_xcom, provide_context=True, dag=dag ) ...
Pobranie wyniku BashOperator do XCom
Według dokumentacji, argument xcom_push
w operatorze umożliwia zapisanie do XCom ostatniej linii z stdout
.
... t2 = BashOperator( task_id="get_date", bash_command="date", xcom_push=True, dag=dag ) ...
Wyświetlenie XCom w zdalnej konsoli
W tym przypadku wykorzystamy SSHOperator
w połączeniu z Jinja Templating. Wstrzykniemy dodane wcześniej do XCom wartości. Jedna wartość jest pobierana po kluczu, druga po id zadania.
... t3 = SSHOperator( ssh_conn_id='ssh_default', task_id='ssh_xcom_check', command='echo "{{ ti.xcom_pull(key="important_xcom_value") }} {{ ti.xcom_pull(task_ids="get_date") }}"', dag=dag) ...
Efekty
Wszystkie przekazywane wartości zostały prawidłowo wyświetlone w ostatnim zadaniu.
[2020-11-22 18:22:59,048] {{logging_mixin.py:112}} INFO - [2020-11-22 18:22:59,047] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.6p1) [2020-11-22 18:22:59,065] {{logging_mixin.py:112}} INFO - [2020-11-22 18:22:59,065] {{transport.py:1819}} INFO - Authentication (password) successful! [2020-11-22 18:22:59,065] {{ssh_operator.py:109}} INFO - Running command: echo "something_important_6 Sun 22 Nov 2020 06:22:56 PM UTC" [2020-11-22 18:22:59,767] {{ssh_operator.py:143}} INFO - something_important_6 Sun 22 Nov 2020 06:22:56 PM UTC [2020-11-22 18:22:59,780] {{taskinstance.py:1048}} INFO - Marking task as SUCCESS.dag_id=var_xcom_example, task_id=ssh_xcom_check, execution_date=20201122T182252, start_date=20201122T182258, end_date=20201122T182259 [2020-11-22 18:23:08,905] {{logging_mixin.py:112}} INFO - [2020-11-22 18:23:08,905] {{local_task_job.py:103}} INFO - Task exited with return code 0
Utworzone XCom’y widoczne są w UI Apache Airflow
To samo tyczy się Variables.
Kod
from airflow import DAG from airflow.contrib.operators.ssh_operator import SSHOperator from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta from airflow.operators.python_operator import PythonOperator from airflow.models import Variable default_args = { "owner": "Maciej Szymczyk", "depends_on_past": False, "start_date": datetime(2020, 9, 27), "email": ["maciej@wiadrodanych.pl"], "email_on_failure": False, "email_on_retry": False, "retries": 0, "retry_delay": timedelta(minutes=1) } dag = DAG("var_xcom_example", default_args=default_args, schedule_interval='@daily', catchup=False) def setup_var_and_xcom(**kwargs): ti = kwargs['ti'] iterator = int(Variable.get('my_iterator', default_var=0)) ti.xcom_push(key='important_xcom_value', value = f'something_important_{iterator}') iterator += 1 Variable.set('my_iterator', iterator) t1 = PythonOperator( task_id='setup_var_and_xcom', python_callable=setup_var_and_xcom, provide_context=True, dag=dag ) t2 = BashOperator( task_id="get_date", bash_command="date", xcom_push=True, dag=dag ) t3 = SSHOperator( ssh_conn_id='ssh_default', task_id='ssh_xcom_check', command='echo "{{ ti.xcom_pull(key="important_xcom_value") }} {{ ti.xcom_pull(task_ids="get_date") }}"', dag=dag) t1 >> t2 >> t3
Wnioski
I w ten oto sposób można utworzyć prosty DAG w Apache Airflow wykorzystując XCom i Variables. W przyszłości postaram się dać jakiś bardziej życiowy przykład np. z wykorzystaniem Apache Spark.