Apache Airflow 筆記:連接外部數(shù)據(jù)源與 Secrets 管理
在數(shù)據(jù)工程的實際工作中,Apache Airflow 最強大的能力之一就是它的 外部連接(Connections)與 Hooks 機制。Airflow 不僅能與數(shù)據(jù)庫、API、云存儲等外部資源對接,還能觸發(fā)數(shù)據(jù)轉化工具(如 dbt)或機器學習工具(如 TensorFlow)。本篇筆記將系統(tǒng)梳理 Airflow 在外部連接與 Secrets 管理上的關鍵點,并給出實操示例和最佳實踐。
1. 為什么 Airflow 需要連接外部資源?
Airflow 本質是一個 工作流編排器(orchestrator),它并不適合承擔重計算任務,而是負責調度與協(xié)調。外部連接的意義主要體現(xiàn)在:
? 數(shù)據(jù)抽取:從數(shù)據(jù)庫提取數(shù)據(jù),進入數(shù)據(jù)倉庫、數(shù)據(jù)湖或數(shù)據(jù)集市。
? 調用 API:獲取數(shù)據(jù)或觸發(fā)模型訓練。
? 發(fā)送通知:通過 Slack、Teams、郵件等通知團隊任務執(zhí)行結果。
?? 注意:千萬不要把 Airflow 當成 Spark、Hadoop 這樣的計算引擎。重計算任務應交給專業(yè)平臺(如 Google Dataproc、Amazon EMR、Spark/Hadoop),Airflow 的職責是調度和自動化。
2. Airflow 中的連接存儲位置
Airflow 的連接信息(憑證、參數(shù)等)可以存儲在以下位置:
1)Metadata 數(shù)據(jù)庫
? 可以在 Airflow UI → Admin → Connections 添加。
? 適合快速入門,但安全性一般,且不易擴展。
? 密碼等敏感信息會被 Fernet key 加密存儲。
2)環(huán)境變量(Environment Variables)
? 命名規(guī)則:
連接:AIRFLOW_CONN_<CONN_ID>
變量:AIRFLOW_VAR_<VAR_NAME>
? 示例(MySQL 連接串):
AIRFLOW_CONN_MYSQLDB=mysql://user:password@host:3306/database? 簡單易用,但安全性低,僅適合小型測試環(huán)境。
3)外部 Secrets 管理服務
? 如 AWS Secrets Manager、HashiCorp Vault、GCP Secrets Manager。
? 優(yōu)點:安全性高、支持自動輪換、審計、跨環(huán)境同步。
? 缺點:需要額外配置和成本。
? 在 Airflow 中需配置:
AIRFLOW__SECRETS__BACKEND=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend?? 對比總結:
? 小規(guī)模測試 → 環(huán)境變量 / Metadata DB
? 生產(chǎn)環(huán)境 / 多團隊協(xié)作 → 外部 Secrets 管理服務
3. 如何創(chuàng)建連接?
3.1 通過 Airflow UI
進入 Admin → Connections,點擊 “+” 新增:
? Connection Id:連接標識(如 slack_id、postgres_default)
? Connection Type:下拉選擇(HTTP、FTP、AWS、GCP 等)
? 描述與相關字段(根據(jù)類型動態(tài)變化)
保存后即可在 DAG 中通過 BaseHook.get_connection("conn_id") 調用。
3.2 通過 CLI
Airflow CLI 提供了 JSON、URI 等多種方式添加連接:
airflow connections add 'my_connection_db' \
--conn-json '{
"conn_type": "HTTP",
"login": "admin",
"password": "password",
"host": "example-host",
"port": 3306,
"schema": "my-schema"
}'也可以導出/遷移:
airflow connections export connections.json
airflow connections export /tmp/connections --file-format yaml4. 測試連接
4.1 UI/CLI 內置測試
? 需在 airflow.cfg 中開啟:
AIRFLOW__CORE__TEST_CONNECTION = Enabled? 在 UI 或 CLI 創(chuàng)建/編輯連接時,可點擊 Test Connection。
?? 但此功能 不支持 環(huán)境變量和外部 Secrets。
4.2 通過 Dummy DAG 測試
對于環(huán)境變量或 Secrets,可編寫一個簡單的 DAG:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import os
def test_connection():
conn_string = os.getenv("AIRFLOW_CONN_MYDB")
print(f"Connection string: {conn_string}")
dag = DAG(
'test_env_var_connection',
start_date=datetime(2024, 1, 1),
schedule_interval=None,
)
PythonOperator(
task_id='test_connection',
python_callable=test_connection,
dag=dag,
)運行后檢查日志,確認連接是否正常。
5. Secrets 管理的新特性:Secrets Cache
Airflow 2.7 引入了 Secrets Cache,用于緩存從外部 Secrets Manager 獲取的數(shù)據(jù)。
? 解決網(wǎng)絡調用耗時問題(每次取 Secret 可能 >100ms)。
? 在 DAG 解析時緩存,能降低延遲和成本。
? 默認關閉,需在配置中啟用。
適合 DAG 數(shù)量龐大、頻繁取 Secret 的場景。
6. 通知與告警:郵件 & Slack
6.1 郵件通知
配置 airflow.cfg 的 SMTP:
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = EMAIL_ADDRESS
smtp_password = YOUR_APP_PASSWORD
smtp_port = 587
smtp_mail_from = EMAIL_ADDRESS示例 DAG:
from airflow.operators.email import EmailOperator
email = EmailOperator(
task_id='email_alert',
to='smtp_user',
subject='Email Alert',
html_content="<h3>Email Test</h3>",
)6.2 Slack Webhook
1) 在 Slack 注冊 App → 創(chuàng)建 Incoming Webhook → 獲取 URL
https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXX2) 在 Airflow 創(chuàng)建連接:
airflow connections add 'slack' \
--conn-json '{"conn_type":"HTTP","password":"T00000000/B00000000/XXXXXXXXXXXXXXXX"}'3)定義回調函數(shù):
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.hooks.base import BaseHook
def success_callback(context):
slack_webhook_token = BaseHook.get_connection("slack").password
log_url = context["task_instance"].log_url
msg = f"""
? Task succeeded
*Task*: {context['task_instance'].task_id}
*Dag*: {context['task_instance'].dag_id}
<{log_url}|Log URL>
"""
SlackWebhookOperator(
task_id="slack_notify",
http_conn_id="slack",
webhook_token=slack_webhook_token,
message=msg,
username="airflow"
).execute(context=context)當任務成功時,將自動發(fā)送 Slack 消息。
7. 最佳實踐總結
? Airflow 只做調度,不做計算 → 計算任務交給 Spark/Hadoop/云服務。
? 小型環(huán)境 → 環(huán)境變量 / Metadata DB,簡單快捷。
? 生產(chǎn)環(huán)境 → 使用外部 Secrets Manager,保證安全、可擴展性。
? 定期檢查告警有效性 → 避免“垃圾告警”干擾團隊。
? 推薦配置通知 → 最少配置 Slack 或郵件,及時發(fā)現(xiàn)失敗任務。
對比表:
方法 | 優(yōu)點 | 缺點 |
環(huán)境變量 | 簡單易用,快速上手 | 安全性差,UI/CLI 不可見 |
Metadata 數(shù)據(jù)庫 | UI/CLI 可管理,入門快 | 不安全,不適合大規(guī)模生產(chǎn) |
外部 Secrets Manager | 安全、可擴展、可審計 | 成本高,需額外配置 |
8. 總結
本篇筆記梳理了 Airflow 連接外部數(shù)據(jù)源的基本方法與 Secrets 管理實踐。
? 你學會了 如何在 UI/CLI 創(chuàng)建連接,如何 測試連接,以及 Secrets 的三種存儲方式。
? 我們還擴展了 郵件和 Slack 告警示例,為生產(chǎn)化落地提供參考。
?? 在下一步 ETL 管道構建中,這些連接和告警機制將成為 Airflow 高效穩(wěn)定運行的基石。


























