Apache Airflow: referência¶
Orquestrador de workflows para pipelines de dados. Permite definir, agendar e monitorar DAGs (Directed Acyclic Graphs) de tarefas.
O que é¶
Apache Airflow é uma plataforma para criar, agendar e monitorar workflows programaticamente. Criado pelo Airbnb em 2014, hoje é um projeto Apache.
Que problemas resolve¶
- Agendamento de pipelines: executar tarefas em horários específicos ou intervalos
- Dependências entre tarefas: garantir que task B só execute após task A
- Retry automático: reexecutar tarefas que falharam
- Monitoramento: visualizar status, logs e histórico de execuções
- Alertas: notificar falhas via email, Slack, etc.
- Backfill: executar pipelines para datas passadas
Conceitos principais¶
DAG (Directed Acyclic Graph)¶
Define o pipeline completo: quais tarefas executar e em que ordem.
from airflow import DAG
from datetime import datetime
with DAG(
dag_id="etl_vendas",
start_date=datetime(2025, 1, 1),
schedule="@daily", # ou "0 2 * * *" (cron)
catchup=False,
) as dag:
# tasks aqui
Parâmetros importantes:
| Parâmetro | Descrição |
|---|---|
dag_id |
Identificador único |
start_date |
Quando começar a agendar |
schedule |
Frequência de execução |
catchup |
Se True, executa runs passados |
default_args |
Args padrão para todas as tasks |
Task¶
Unidade de trabalho dentro de uma DAG.
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
extract = BashOperator(
task_id="extract",
bash_command="python extract.py",
)
transform = PythonOperator(
task_id="transform",
python_callable=transform_data,
)
Operator¶
Tipo de task. Airflow vem com muitos built-in e providers adicionais.
# Python
from airflow.operators.python import PythonOperator
# Bash
from airflow.operators.bash import BashOperator
# SQL
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
# AWS
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
# GCP
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
# Docker
from airflow.providers.docker.operators.docker import DockerOperator
Sensor¶
Task que espera por uma condição antes de prosseguir.
from airflow.sensors.filesystem import FileSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# Esperar arquivo local
wait_file = FileSensor(
task_id="wait_file",
filepath="/data/input.csv",
poke_interval=60, # verifica a cada 60s
timeout=3600, # timeout após 1h
)
# Esperar arquivo no S3
wait_s3 = S3KeySensor(
task_id="wait_s3",
bucket_name="meu-bucket",
bucket_key="data/{{ ds }}/file.parquet",
)
Dependências¶
Definir ordem de execução:
# Sequencial
extract >> transform >> load
# Múltiplas dependências
[extract_a, extract_b] >> transform >> [load_dwh, load_lake]
# Método set_downstream/set_upstream
extract.set_downstream(transform)
transform.set_upstream(extract) # equivalente
TaskFlow API¶
Sintaxe moderna usando decorators (Airflow 2.0+):
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="etl_taskflow",
start_date=datetime(2025, 1, 1),
schedule="@daily",
catchup=False,
)
def etl_pipeline():
@task
def extract():
return {"data": [1, 2, 3, 4, 5]}
@task
def transform(raw_data):
return [x * 2 for x in raw_data["data"]]
@task
def load(processed_data):
print(f"Carregando {len(processed_data)} registros")
# Dependências implícitas pelo fluxo de dados
raw = extract()
processed = transform(raw)
load(processed)
etl_pipeline()
XCom¶
Comunicação entre tasks. Usado automaticamente pelo TaskFlow API.
# Push manual
def push_data(**context):
context["ti"].xcom_push(key="result", value={"count": 100})
# Pull manual
def pull_data(**context):
result = context["ti"].xcom_pull(task_ids="push_task", key="result")
print(result) # {"count": 100}
XCom é para dados pequenos (metadados, contagens). Para dados grandes, usar storage externo (S3, GCS).
Templating (Jinja)¶
Variáveis dinâmicas em parâmetros:
BashOperator(
task_id="process",
bash_command="python process.py --date {{ ds }} --env {{ var.value.environment }}",
)
Variáveis disponíveis:
| Variável | Descrição | Exemplo |
|---|---|---|
{{ ds }} |
Data de execução (YYYY-MM-DD) | 2025-01-15 |
{{ ds_nodash }} |
Data sem traços | 20250115 |
{{ execution_date }} |
Datetime completo | 2025-01-15T00:00:00 |
{{ prev_ds }} |
Data anterior | 2025-01-14 |
{{ next_ds }} |
Próxima data | 2025-01-16 |
{{ var.value.key }} |
Variável do Airflow | - |
{{ conn.conn_id }} |
Conexão | - |
Connections¶
Credenciais armazenadas no Airflow (via UI ou CLI):
# Criar via CLI
airflow connections add 'postgres_dwh' \
--conn-type 'postgres' \
--conn-host 'localhost' \
--conn-schema 'dwh' \
--conn-login 'user' \
--conn-password 'pass' \
--conn-port 5432
Usar em operators:
from airflow.providers.postgres.operators.postgres import PostgresOperator
PostgresOperator(
task_id="create_table",
postgres_conn_id="postgres_dwh", # referencia a connection
sql="CREATE TABLE IF NOT EXISTS ...",
)
Variables¶
Configurações globais:
from airflow.models import Variable
# Ler
env = Variable.get("environment")
config = Variable.get("config", deserialize_json=True)
# Em templates Jinja: var.value.environment ou var.json.config.key
Hooks¶
Camada de abstração para conexões:
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
def query_postgres():
hook = PostgresHook(postgres_conn_id="postgres_dwh")
df = hook.get_pandas_df("SELECT * FROM tabela")
return df
def upload_to_s3():
hook = S3Hook(aws_conn_id="aws_default")
hook.load_file(
filename="/tmp/data.parquet",
key="data/file.parquet",
bucket_name="meu-bucket",
)
Branching¶
Execução condicional:
from airflow.operators.python import BranchPythonOperator
def choose_branch(**context):
if context["ds_nodash"] == "20250101":
return "task_new_year"
return "task_normal"
branch = BranchPythonOperator(
task_id="branch",
python_callable=choose_branch,
)
branch >> [task_new_year, task_normal]
Trigger Rules¶
Quando uma task deve executar:
from airflow.utils.trigger_rule import TriggerRule
task = PythonOperator(
task_id="always_run",
python_callable=cleanup,
trigger_rule=TriggerRule.ALL_DONE, # executa mesmo se upstream falhou
)
| Regra | Descrição |
|---|---|
all_success |
Todas upstream ok (padrão) |
all_failed |
Todas upstream falharam |
all_done |
Todas upstream terminaram |
one_success |
Pelo menos uma ok |
one_failed |
Pelo menos uma falhou |
none_failed |
Nenhuma falhou (pode ter skipped) |
SubDAGs e TaskGroups¶
Organizar tasks relacionadas:
from airflow.utils.task_group import TaskGroup
with DAG(...) as dag:
with TaskGroup("extract") as extract_group:
extract_orders = PythonOperator(...)
extract_customers = PythonOperator(...)
with TaskGroup("transform") as transform_group:
transform_orders = PythonOperator(...)
transform_customers = PythonOperator(...)
extract_group >> transform_group
Dynamic Tasks¶
Criar tasks dinamicamente:
# Mapeamento dinâmico (Airflow 2.3+)
@task
def get_files():
return ["file1.csv", "file2.csv", "file3.csv"]
@task
def process_file(filename):
print(f"Processando {filename}")
@dag(...)
def dynamic_pipeline():
files = get_files()
process_file.expand(filename=files) # cria 3 tasks
Pools¶
Limitar execuções paralelas:
# Criar pool via UI ou CLI
# Nome: "database_pool", Slots: 2
task = PythonOperator(
task_id="query_db",
python_callable=query,
pool="database_pool", # máximo 2 tasks simultâneas neste pool
)
Retry e Timeout¶
from datetime import timedelta
task = PythonOperator(
task_id="flaky_task",
python_callable=might_fail,
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True,
max_retry_delay=timedelta(hours=1),
execution_timeout=timedelta(hours=2),
)
Callbacks¶
Executar código em eventos:
def on_failure(context):
# Enviar alerta
send_slack_alert(f"Task {context['task_instance'].task_id} falhou")
def on_success(context):
print("Sucesso!")
task = PythonOperator(
task_id="task_with_callbacks",
python_callable=do_something,
on_failure_callback=on_failure,
on_success_callback=on_success,
)
Estrutura de projeto¶
airflow/
├── dags/
│ ├── etl_vendas.py
│ ├── etl_clientes.py
│ └── utils/
│ └── helpers.py
├── plugins/
│ └── custom_operators.py
├── include/
│ └── sql/
│ └── queries.sql
├── tests/
│ └── test_dags.py
├── docker-compose.yaml
└── requirements.txt
Testes¶
import pytest
from airflow.models import DagBag
@pytest.fixture
def dag_bag():
return DagBag(include_examples=False)
def test_dag_loaded(dag_bag):
assert "etl_vendas" in dag_bag.dags
assert dag_bag.import_errors == {}
def test_dag_structure(dag_bag):
dag = dag_bag.dags["etl_vendas"]
assert len(dag.tasks) == 3
assert "extract" in [t.task_id for t in dag.tasks]
CLI útil¶
# Listar DAGs
airflow dags list
# Testar task
airflow tasks test dag_id task_id 2025-01-01
# Executar DAG
airflow dags trigger dag_id
# Backfill
airflow dags backfill dag_id --start-date 2025-01-01 --end-date 2025-01-31
# Pausar/despausar
airflow dags pause dag_id
airflow dags unpause dag_id
# Ver logs
airflow tasks logs dag_id task_id 2025-01-01
Docker Compose (desenvolvimento)¶
version: '3'
services:
postgres:
image: postgres:15
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
airflow-webserver:
image: apache/airflow:2.8.0
depends_on:
- postgres
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
volumes:
- ./dags:/opt/airflow/dags
ports:
- "8080:8080"
command: webserver
airflow-scheduler:
image: apache/airflow:2.8.0
depends_on:
- postgres
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
volumes:
- ./dags:/opt/airflow/dags
command: scheduler
Executors¶
| Executor | Uso |
|---|---|
SequentialExecutor |
Desenvolvimento (1 task por vez) |
LocalExecutor |
Produção pequena (multiprocess) |
CeleryExecutor |
Produção distribuída (workers) |
KubernetesExecutor |
Kubernetes (pod por task) |
Managed services¶
- AWS MWAA: Amazon Managed Workflows for Apache Airflow
- GCP Cloud Composer: Airflow gerenciado no Google Cloud
- Astronomer: Plataforma Airflow gerenciada