偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Apache Airflow 筆記:連接外部數(shù)據(jù)源與 Secrets 管理

開發(fā) 開發(fā)工具
千萬不要把 Airflow 當成 Spark、Hadoop 這樣的計算引擎。重計算任務應交給專業(yè)平臺(如 Google Dataproc、Amazon EMR、Spark/Hadoop),Airflow 的職責是調度和自動化。

在數(shù)據(jù)工程的實際工作中,Apache Airflow 最強大的能力之一就是它的 外部連接(Connections)與 Hooks 機制。Airflow 不僅能與數(shù)據(jù)庫、API、云存儲等外部資源對接,還能觸發(fā)數(shù)據(jù)轉化工具(如 dbt)或機器學習工具(如 TensorFlow)。本篇筆記將系統(tǒng)梳理 Airflow 在外部連接與 Secrets 管理上的關鍵點,并給出實操示例和最佳實踐。

1. 為什么 Airflow 需要連接外部資源?

Airflow 本質是一個 工作流編排器(orchestrator),它并不適合承擔重計算任務,而是負責調度與協(xié)調。外部連接的意義主要體現(xiàn)在:

數(shù)據(jù)抽取:從數(shù)據(jù)庫提取數(shù)據(jù),進入數(shù)據(jù)倉庫、數(shù)據(jù)湖或數(shù)據(jù)集市。

調用 API:獲取數(shù)據(jù)或觸發(fā)模型訓練。

發(fā)送通知:通過 Slack、Teams、郵件等通知團隊任務執(zhí)行結果。

?? 注意:千萬不要把 Airflow 當成 Spark、Hadoop 這樣的計算引擎。重計算任務應交給專業(yè)平臺(如 Google Dataproc、Amazon EMR、Spark/Hadoop),Airflow 的職責是調度和自動化。

2. Airflow 中的連接存儲位置

Airflow 的連接信息(憑證、參數(shù)等)可以存儲在以下位置:

1)Metadata 數(shù)據(jù)庫

? 可以在 Airflow UI → Admin → Connections 添加。

? 適合快速入門,但安全性一般,且不易擴展。

? 密碼等敏感信息會被 Fernet key 加密存儲。

2)環(huán)境變量(Environment Variables)

? 命名規(guī)則:

    連接:AIRFLOW_CONN_<CONN_ID>

    變量:AIRFLOW_VAR_<VAR_NAME>

  ? 示例(MySQL 連接串):

AIRFLOW_CONN_MYSQLDB=mysql://user:password@host:3306/database

   ? 簡單易用,但安全性低,僅適合小型測試環(huán)境。

3)外部 Secrets 管理服務

? 如 AWS Secrets Manager、HashiCorp Vault、GCP Secrets Manager。

? 優(yōu)點:安全性高、支持自動輪換、審計、跨環(huán)境同步。

? 缺點:需要額外配置和成本。

? 在 Airflow 中需配置:

AIRFLOW__SECRETS__BACKEND=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend

?? 對比總結

? 小規(guī)模測試 → 環(huán)境變量 / Metadata DB

? 生產(chǎn)環(huán)境 / 多團隊協(xié)作 → 外部 Secrets 管理服務

3. 如何創(chuàng)建連接?

3.1 通過 Airflow UI

進入 Admin → Connections,點擊 “+” 新增:

Connection Id:連接標識(如 slack_id、postgres_default

Connection Type:下拉選擇(HTTP、FTP、AWS、GCP 等)

描述與相關字段(根據(jù)類型動態(tài)變化)

保存后即可在 DAG 中通過 BaseHook.get_connection("conn_id") 調用。

3.2 通過 CLI

Airflow CLI 提供了 JSON、URI 等多種方式添加連接:

airflow connections add 'my_connection_db' \
  --conn-json '{
      "conn_type": "HTTP",
      "login": "admin",
      "password": "password",
      "host": "example-host",
      "port": 3306,
      "schema": "my-schema"
  }'

也可以導出/遷移:

airflow connections export connections.json
airflow connections export /tmp/connections --file-format yaml

4. 測試連接

4.1 UI/CLI 內置測試

? 需在 airflow.cfg 中開啟:

AIRFLOW__CORE__TEST_CONNECTION = Enabled

? 在 UI 或 CLI 創(chuàng)建/編輯連接時,可點擊 Test Connection。

?? 但此功能 不支持 環(huán)境變量和外部 Secrets。

4.2 通過 Dummy DAG 測試

對于環(huán)境變量或 Secrets,可編寫一個簡單的 DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import os

def test_connection():
    conn_string = os.getenv("AIRFLOW_CONN_MYDB")
    print(f"Connection string: {conn_string}")

dag = DAG(
    'test_env_var_connection',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
)

PythonOperator(
    task_id='test_connection',
    python_callable=test_connection,
    dag=dag,
)

運行后檢查日志,確認連接是否正常。

5. Secrets 管理的新特性:Secrets Cache

Airflow 2.7 引入了 Secrets Cache,用于緩存從外部 Secrets Manager 獲取的數(shù)據(jù)。

? 解決網(wǎng)絡調用耗時問題(每次取 Secret 可能 >100ms)。

? 在 DAG 解析時緩存,能降低延遲和成本。

? 默認關閉,需在配置中啟用。

適合 DAG 數(shù)量龐大、頻繁取 Secret 的場景。

6. 通知與告警:郵件 & Slack

6.1 郵件通知

配置 airflow.cfg 的 SMTP:

[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = EMAIL_ADDRESS
smtp_password = YOUR_APP_PASSWORD
smtp_port = 587
smtp_mail_from = EMAIL_ADDRESS

示例 DAG:

from airflow.operators.email import EmailOperator
email = EmailOperator(
    task_id='email_alert',
    to='smtp_user',
    subject='Email Alert',
    html_content="<h3>Email Test</h3>",
)

6.2 Slack Webhook

1) 在 Slack 注冊 App → 創(chuàng)建 Incoming Webhook → 獲取 URL

https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXX

2) 在 Airflow 創(chuàng)建連接:

airflow connections add 'slack' \
  --conn-json '{"conn_type":"HTTP","password":"T00000000/B00000000/XXXXXXXXXXXXXXXX"}'

3)定義回調函數(shù):

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.hooks.base import BaseHook

def success_callback(context):
    slack_webhook_token = BaseHook.get_connection("slack").password
    log_url = context["task_instance"].log_url
    msg = f"""
    ? Task succeeded
    *Task*: {context['task_instance'].task_id}
    *Dag*: {context['task_instance'].dag_id}
    <{log_url}|Log URL>
    """
    SlackWebhookOperator(
        task_id="slack_notify",
        http_conn_id="slack",
        webhook_token=slack_webhook_token,
        message=msg,
        username="airflow"
    ).execute(context=context)

當任務成功時,將自動發(fā)送 Slack 消息。

7. 最佳實踐總結

Airflow 只做調度,不做計算 → 計算任務交給 Spark/Hadoop/云服務。

小型環(huán)境 → 環(huán)境變量 / Metadata DB,簡單快捷。

生產(chǎn)環(huán)境 → 使用外部 Secrets Manager,保證安全、可擴展性。

定期檢查告警有效性 → 避免“垃圾告警”干擾團隊。

推薦配置通知 → 最少配置 Slack 或郵件,及時發(fā)現(xiàn)失敗任務。

對比表:

方法

優(yōu)點

缺點

環(huán)境變量

簡單易用,快速上手

安全性差,UI/CLI 不可見

Metadata 數(shù)據(jù)庫

UI/CLI 可管理,入門快

不安全,不適合大規(guī)模生產(chǎn)

外部 Secrets Manager

安全、可擴展、可審計

成本高,需額外配置

8. 總結

本篇筆記梳理了 Airflow 連接外部數(shù)據(jù)源的基本方法與 Secrets 管理實踐。

? 你學會了 如何在 UI/CLI 創(chuàng)建連接,如何 測試連接,以及 Secrets 的三種存儲方式。

? 我們還擴展了 郵件和 Slack 告警示例,為生產(chǎn)化落地提供參考。

?? 在下一步 ETL 管道構建中,這些連接和告警機制將成為 Airflow 高效穩(wěn)定運行的基石。

責任編輯:武曉燕 來源: 科學隨想錄
相關推薦

2012-06-17 13:04:45

2025-01-09 11:21:25

2009-12-23 09:55:23

ADO.NET數(shù)據(jù)源

2009-07-15 18:13:32

jndi jdbc

2022-08-04 08:00:54

安全管理服務器

2012-09-04 13:48:57

云計算數(shù)據(jù)源Odata API

2010-12-27 09:59:11

ODBC數(shù)據(jù)源

2009-06-15 13:24:46

JBoss數(shù)據(jù)源

2021-10-18 06:54:47

數(shù)據(jù)源數(shù)據(jù)預處理

2009-08-13 11:17:34

什么是ADO.NET

2023-10-31 07:52:53

多數(shù)據(jù)源管理后端

2025-09-26 02:50:00

2014-08-15 09:55:09

SSIS

2023-11-27 09:16:53

Python數(shù)據(jù)源類型

2017-09-04 14:52:51

Tomcat線程數(shù)據(jù)源

2022-01-12 17:39:16

Spring多租戶數(shù)據(jù)

2010-10-26 16:15:33

連接Oracle數(shù)據(jù)庫

2017-06-14 23:42:27

大數(shù)據(jù)數(shù)據(jù)源架構

2009-09-08 11:09:39

LINQ數(shù)據(jù)源

2009-09-15 17:15:33

Linq排序
點贊
收藏

51CTO技術棧公眾號