
Deploying a Highly Available Lakehouse Architecture on Kubernetes

Following the deployment plan in this guide, you can build a stable, efficient, and scalable highly available lakehouse architecture that meets the needs of a modern data platform.

I. Architecture Overview

This guide walks through deploying a complete, highly available lakehouse architecture on Kubernetes. The architecture integrates the following core components:

  • CentOS 8.5: the base operating system environment
  • Kafka 3.0: distributed message queue for data ingestion and transport
  • Flink 1.18: real-time stream processing engine for data transformation and computation
  • Paimon 0.7: open data lake storage format providing a unified lake storage layer
  • Doris 2.1.6: MPP analytical database serving as the high-performance data warehouse

Architecture design principles:

  • High availability: every component runs with multiple replicas to avoid single points of failure
  • Elastic scaling: Kubernetes autoscaling adjusts resources dynamically with load
  • Data consistency: transactions and replica synchronization keep data consistent
  • Unified metadata: Hive Metastore serves as the central metadata store
  • Resource isolation: Kubernetes namespaces provide multi-tenant resource isolation

II. Environment Preparation

1. Kubernetes Cluster Requirements

(1) Kubernetes version: 1.23+

(2) Node count: at least 5 nodes (3 masters, 2 workers)

(3) Node specifications:

  • Master nodes: 4 CPU cores, 8 GB RAM, 50 GB storage
  • Worker nodes: 8 CPU cores, 32 GB RAM, 200 GB storage

(4) Network plugin: Calico or Flannel

(5) Storage plugin: dynamic volume provisioning support (e.g., NFS, CephFS, or cloud storage); see the pre-flight check below
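
Before continuing, it is worth confirming that the cluster actually meets these requirements. A minimal pre-flight check, assuming kubectl is already configured against the target cluster:

# Check Kubernetes version and node readiness
kubectl version
kubectl get nodes -o wide

# Check that a StorageClass and the network plugin pods are present
kubectl get storageclass
kubectl get pods -n kube-system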

2. Base Software Installation

Install the base software on all nodes:

# Update the system
sudo dnf update -y

# Install required tools
sudo dnf install -y git wget curl vim net-tools

# Install Docker (container runtime)
sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
sudo dnf install -y docker-ce docker-ce-cli containerd.io
sudo systemctl enable --now docker

# Configure a Docker registry mirror
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://mirror.ccs.tencentyun.com"],
"exec-opts": ["native.cgroupdriver=systemd"],
"log-driver": "json-file",
"log-opts": {
    "max-size": "100m"
  },
"storage-driver": "overlay2"
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker

3. Kubernetes Cluster Initialization

Run the following on the master node:

# Initialize the Kubernetes cluster
sudo kubeadm init --pod-network-cidr=10.244.0.0/16 --control-plane-endpoint=k8s-master

# Configure kubectl
mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config

# Install the network plugin
kubectl apply -f https://raw.githubusercontent.com/flannel-io/flannel/master/Documentation/kube-flannel.yml

# Allow pods to schedule onto master nodes (test environments only)
kubectl taint nodes --all node-role.kubernetes.io/control-plane-
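
Worker nodes join the cluster with the `kubeadm join` command printed at the end of `kubeadm init`. If that output was lost, a fresh join command can be generated on the master (standard kubeadm workflow, shown here as a sketch):

# On the master: print a join command with a fresh bootstrap token
kubeadm token create --print-join-command

# On each worker: run the printed command, e.g.
# sudo kubeadm join k8s-master:6443 --token <token> --discovery-token-ca-cert-hash sha256:<hash>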

4. Helm Package Manager Installation

# Download and install Helm
wget https://get.helm.sh/helm-v3.10.0-linux-amd64.tar.gz
tar -zxvf helm-v3.10.0-linux-amd64.tar.gz
sudo mv linux-amd64/helm /usr/local/bin/helm

5. Create Namespaces

kubectl create namespace lakehouse
kubectl create namespace monitoring

III. Storage System Deployment

1. NFS Storage Deployment

# Install the NFS client on all nodes
sudo dnf install -y nfs-utils

# Set up the NFS server (on a dedicated node, or use cloud storage instead)
sudo mkdir -p /data/nfs
sudo chmod -R 777 /data/nfs
echo "/data/nfs *(rw,sync,no_root_squash)" | sudo tee -a /etc/exports
sudo exportfs -a
sudo systemctl enable --now nfs-server

# Create the StorageClass (note: kubernetes.io/no-provisioner means PVs must be
# created manually; for the StatefulSet volumeClaimTemplates used later, a
# dynamic NFS provisioner such as nfs-subdir-external-provisioner is more practical)
cat <<EOF | kubectl apply -f -
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: nfs-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer
EOF

# Create the PV and PVC
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfs-pv
spec:
  storageClassName: nfs-storage
  capacity:
    storage: 100Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: <nfs-server-ip>
    path: "/data/nfs"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: nfs-pvc
  namespace: lakehouse
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: nfs-storage
  resources:
    requests:
      storage: 100Gi
EOF
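
After applying, confirm that the claim binds to the static volume (replace <nfs-server-ip> first; the PV only binds once its storageClassName matches the claim):

# The PVC should report STATUS=Bound once it matches the PV
kubectl get pv nfs-pv
kubectl get pvc nfs-pvc -n lakehouse

# Verify the export is visible from the worker nodes
showmount -e <nfs-server-ip>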

2. HDFS Deployment (Optional)

If HDFS will serve as Paimon's underlying storage:

# Deploy HDFS with Helm
helm repo add apache https://apache.github.io/hadoop-charts
helm install hdfs apache/hadoop --namespace lakehouse \
  --set core.service.type=NodePort \
  --set hdfs.nameNode.replicas=3 \
  --set hdfs.dataNode.replicas=3 \
  --set persistence.enabled=true \
  --set persistence.storageClass=nfs-storage \
  --set persistence.size=100Gi

IV. Message Queue: Kafka Deployment

1. ZooKeeper Deployment

# Create the ZooKeeper Service and StatefulSet
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-service
  namespace: lakehouse
  labels:
    app: zookeeper
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  clusterIP: None
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
  namespace: lakehouse
spec:
  serviceName: zookeeper-service
  replicas: 3
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
      - name: zookeeper
        image: confluentinc/cp-zookeeper:7.3.0
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: follower
        - containerPort: 3888
          name: leader
        env:
        - name: ZOOKEEPER_CLIENT_PORT
          value: "2181"
        - name: ZOOKEEPER_TICK_TIME
          value: "2000"
        - name: ZOOKEEPER_INIT_LIMIT
          value: "5"
        - name: ZOOKEEPER_SYNC_LIMIT
          value: "2"
        - name: ZOOKEEPER_SERVERS
          value: "zookeeper-0.zookeeper-service:2888:3888;zookeeper-1.zookeeper-service:2888:3888;zookeeper-2.zookeeper-service:2888:3888"
        volumeMounts:
        - name: data
          mountPath: /var/lib/zookeeper/data
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: nfs-storage
      resources:
        requests:
          storage: 10Gi
EOF
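
Once the three replicas are running, the ensemble state can be checked with ZooKeeper's four-letter commands. A quick sketch, assuming nc is available in the image (on cp-zookeeper these commands may also need whitelisting via the ZOOKEEPER_4LW_COMMANDS_WHITELIST environment variable):

# Each pod should report itself as leader or follower
for i in 0 1 2; do
  kubectl exec zookeeper-$i -n lakehouse -- bash -c "echo srvr | nc localhost 2181" | grep Mode
done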

2. Kafka Cluster Deployment

# Create the Kafka Service and StatefulSet
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: lakehouse
  labels:
    app: kafka
spec:
  ports:
  - name: kafka
    port: 9092
    protocol: TCP
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: lakehouse
spec:
  serviceName: kafka-service
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: confluentinc/cp-kafka:7.3.0
        ports:
        - containerPort: 9092
          name: kafka
        # KAFKA_BROKER_ID must be an integer and each broker must advertise only
        # its own address, so both are derived from the pod ordinal at startup
        # (a pod name such as "kafka-0" is not a valid broker id).
        command:
        - sh
        - -c
        - |
          export KAFKA_BROKER_ID=\${HOSTNAME##*-}
          export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://\${HOSTNAME}.kafka-service:9092
          exec /etc/confluent/docker/run
        env:
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: "zookeeper-0.zookeeper-service:2181,zookeeper-1.zookeeper-service:2181,zookeeper-2.zookeeper-service:2181"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
          value: "2"
        - name: KAFKA_DEFAULT_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_MIN_IN_SYNC_REPLICAS
          value: "2"
        volumeMounts:
        - name: data
          mountPath: /var/lib/kafka/data
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: nfs-storage
      resources:
        requests:
          storage: 100Gi
EOF
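
After the brokers start, confirm that all three registered and that the cluster answers API requests:

# List broker API versions; three broker entries should appear
kubectl exec kafka-0 -n lakehouse -- \
  kafka-broker-api-versions --bootstrap-server kafka-service:9092 | grep 9092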

3. Kafka Topic Creation

# Start a disposable Kafka client pod
kubectl run kafka-client --rm -ti --image confluentinc/cp-kafka:7.3.0 --namespace lakehouse -- bash

# Inside the pod, create the topics
kafka-topics --create --topic orders --bootstrap-server kafka-service:9092 --partitions 3 --replication-factor 3
kafka-topics --create --topic payments --bootstrap-server kafka-service:9092 --partitions 3 --replication-factor 3
kafka-topics --create --topic inventory --bootstrap-server kafka-service:9092 --partitions 3 --replication-factor 3
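
Verify the topics, replica placement, and in-sync replicas from the same client pod:

# Describe the topics; each partition should list 3 replicas and a full ISR
kafka-topics --describe --topic orders --bootstrap-server kafka-service:9092
kafka-topics --describe --topic payments --bootstrap-server kafka-service:9092
kafka-topics --describe --topic inventory --bootstrap-server kafka-service:9092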

V. Real-Time Computing: Flink Deployment

1. Flink Cluster Configuration

# Create the Flink configuration ConfigMap
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: lakehouse
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    jobmanager.rpc.port: 6123
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 4096m
    taskmanager.numberOfTaskSlots: 4
    parallelism.default: 3
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.cluster-id: flink-cluster
    high-availability.storageDir: file:///flink/ha/
    rest.port: 8081
    kubernetes.rest-service.exposed.type: NodePort
    kubernetes.jobmanager.cpu: 1.0
    kubernetes.taskmanager.cpu: 2.0
    kubernetes.container.image: flink:1.18.0-scala_2.12-java11
    kubernetes.namespace: lakehouse
    state.backend: rocksdb
    state.checkpoints.dir: file:///flink/checkpoints
    state.savepoints.dir: file:///flink/savepoints
    execution.checkpointing.interval: 30s
    execution.checkpointing.timeout: 10min
    execution.checkpointing.min-pause: 5s
    execution.checkpointing.max-concurrent-checkpoints: 1
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    execution.checkpointing.unaligned: true
    execution.checkpointing.alignment-timeout: 30s
    execution.checkpointing.tolerable-failed-checkpoints: 3
    execution.checkpointing.mode: EXACTLY_ONCE
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10s
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9999
    metrics.reporter.prom.interval: 30 SECONDS
  log4j-console.properties: |
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
EOF

2. Flink JobManager Deployment

# Create the Flink JobManager Service and Deployment
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: lakehouse
spec:
  type: NodePort
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
    targetPort: 8081
    nodePort: 30081
  - name: prom
    port: 9999
  selector:
    app: flink
    component: jobmanager
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: lakehouse
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.18.0-scala_2.12-java11
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
        - containerPort: 6124
        - containerPort: 8081
        - containerPort: 9999
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: flink-storage
          mountPath: /flink
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-storage
        persistentVolumeClaim:
          claimName: nfs-pvc
EOF

3. Flink TaskManager Deployment

# Create the Flink TaskManager Deployment
cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: lakehouse
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.18.0-scala_2.12-java11
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
        - containerPort: 9999
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: flink-storage
          mountPath: /flink
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-storage
        persistentVolumeClaim:
          claimName: nfs-pvc
EOF
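
With the JobManager and TaskManagers up, the cluster can be smoke-tested through the REST API exposed on NodePort 30081 (replace <node-ip> with any node address):

# All three TaskManagers should be Running
kubectl get pods -n lakehouse -l app=flink

# Query the REST API: expect "taskmanagers": 3 and 12 total slots (3 x 4)
curl http://<node-ip>:30081/overview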

4. Flink SQL Job Submission

# Create the Flink SQL job
cat <<EOF | kubectl apply -f -
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-sql-job
  namespace: lakehouse
spec:
  template:
    spec:
      containers:
      - name: flink-job
        image: flink:1.18.0-scala_2.12-java11
        command: ["/bin/sh", "-c"]
        args:
          - |
            # Note: -ynm/-yD are YARN-only flags; against a standalone session
            # use -D dynamic properties instead. The ENV_* variables consumed by
            # the SQL script can also simply be set on the JobManager and
            # TaskManager Deployments. StreamSQLJob.jar stands in for a job
            # that reads and executes a SQL script.
            ./bin/flink run \
            -d \
            -m flink-jobmanager:8081 \
            -p 3 \
            -Dpipeline.name=LakehouseJob \
            -Dcontainerized.master.env.ENV_KAFKA_BOOTSTRAP="kafka-service:9092" \
            -Dcontainerized.taskmanager.env.ENV_KAFKA_BOOTSTRAP="kafka-service:9092" \
            -Dcontainerized.master.env.ENV_PAIMON_WAREHOUSE="/flink/paimon" \
            -Dcontainerized.taskmanager.env.ENV_PAIMON_WAREHOUSE="/flink/paimon" \
            ./examples/sql/StreamSQLJob.jar \
            --sql-file /flink/sql/lakehouse.sql
        volumeMounts:
        - name: flink-sql
          mountPath: /flink/sql
        - name: flink-storage
          mountPath: /flink
      restartPolicy: OnFailure
      volumes:
      - name: flink-sql
        configMap:
          name: flink-sql-config
      - name: flink-storage
        persistentVolumeClaim:
          claimName: nfs-pvc
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-sql-config
  namespace: lakehouse
data:
  lakehouse.sql: |
    -- Create the Kafka source table
    CREATE TABLE orders (
      order_id STRING,
      customer_id STRING,
      product_id STRING,
      quantity INT,
      unit_price DECIMAL(10,2),
      order_time TIMESTAMP(3),
      WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'orders',
      'properties.bootstrap.servers' = '\${ENV_KAFKA_BOOTSTRAP}',
      'properties.group.id' = 'flink-consumer',
      'format' = 'json',
      'scan.startup.mode' = 'latest-offset'
    );

    -- Create the Paimon table
    CREATE TABLE orders_paimon (
      order_id STRING,
      customer_id STRING,
      product_id STRING,
      quantity INT,
      unit_price DECIMAL(10,2),
      order_time TIMESTAMP(3),
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'warehouse' = 'file://\${ENV_PAIMON_WAREHOUSE}',
      'table' = 'orders',
      'file.format' = 'orc',
      'write.buffer-size' = '256MB',
      'write.buffer-spillable' = 'true',
      'write-only' = 'false',
      'merge-engine' = 'deduplicate',
      'changelog-producer' = 'input',
      'full-compaction.delta-commits' = '5'
    );

    -- Transform and write the data
    INSERT INTO orders_paimon
    SELECT 
      order_id,
      customer_id,
      product_id,
      quantity,
      unit_price,
      order_time
    FROM orders;
EOF
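
The submitted job can be checked through the submitter Job's logs and the Flink REST API:

# Inspect the submission log
kubectl logs job/flink-sql-job -n lakehouse

# List jobs via the REST API; the new job should be in RUNNING state
curl http://<node-ip>:30081/jobs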

VI. Data Lake: Paimon Integration

1. Paimon Configuration

# Create the Paimon configuration ConfigMap
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: paimon-config
  namespace: lakehouse
data:
  catalog.properties: |
    metastore=hive
    uri=thrift://hive-metastore:9083
    warehouse=/flink/paimon
  table.properties: |
    file.format=orc
    write.buffer-size=256MB
    write.buffer-spillable=true
    write-only=false
    merge-engine=deduplicate
    changelog-producer=input
    full-compaction.delta-commits=5
    snapshot.num-retain.min=10
    snapshot.num-retain.max=100
    snapshot.time-retain=7d
    bucket=4
EOF

2. Hive Metastore Deployment

# Deploy the Hive Metastore
cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hive-metastore
  namespace: lakehouse
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hive-metastore
  template:
    metadata:
      labels:
        app: hive-metastore
    spec:
      containers:
      - name: metastore
        image: apache/hive:3.1.3
        env:
        - name: SERVICE_NAME
          value: metastore
        - name: DB_DRIVER
          value: org.postgresql.Driver
        - name: DB_URL
          value: jdbc:postgresql://postgres:5432/metastore
        - name: DB_USER
          value: hive
        - name: DB_PASSWORD
          value: hive
        ports:
        - containerPort: 9083
        volumeMounts:
        - name: hive-config
          mountPath: /opt/hive/conf
      volumes:
      - name: hive-config
        configMap:
          name: hive-config
---
apiVersion: v1
kind: Service
metadata:
  name: hive-metastore
  namespace: lakehouse
spec:
  ports:
  - port: 9083
    targetPort: 9083
  selector:
    app: hive-metastore
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: hive-config
  namespace: lakehouse
data:
  hive-site.xml: |
    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
      <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:postgresql://postgres:5432/metastore</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>org.postgresql.Driver</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>hive</value>
      </property>
      <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/flink/paimon</value>
      </property>
      <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hive-metastore:9083</value>
      </property>
      <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
      </property>
    </configuration>
EOF

3. PostgreSQL Deployment (Hive Metastore Backend)

# Deploy PostgreSQL
cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
  namespace: lakehouse
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
      - name: postgres
        image: postgres:13
        env:
        - name: POSTGRES_DB
          value: metastore
        - name: POSTGRES_USER
          value: hive
        - name: POSTGRES_PASSWORD
          value: hive
        ports:
        - containerPort: 5432
        volumeMounts:
        - name: postgres-storage
          mountPath: /var/lib/postgresql/data
      volumes:
      - name: postgres-storage
        persistentVolumeClaim:
          claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: postgres
  namespace: lakehouse
spec:
  ports:
  - port: 5432
  selector:
    app: postgres
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-pvc
  namespace: lakehouse
spec:
  accessModes:
    - ReadWriteOnce
  storageClassName: nfs-storage
  resources:
    requests:
      storage: 20Gi
EOF
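
With PostgreSQL up, the metastore schema must be initialized once before the metastore can serve requests. The apache/hive image ships schematool for this; the step below assumes the PostgreSQL JDBC driver is available on the image's classpath:

# Initialize the Hive Metastore schema in PostgreSQL (run once)
kubectl exec -it deploy/hive-metastore -n lakehouse -- \
  /opt/hive/bin/schematool -dbType postgres -initSchema

# The metastore should then answer on its Thrift port
kubectl get svc hive-metastore -n lakehouse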

VII. Data Warehouse: Doris Deployment

1. Doris FE Deployment

# Create the Doris FE configuration
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: doris-fe-config
  namespace: lakehouse
data:
  fe.conf: |
    meta_dir=/opt/doris/doris-meta
    http_port=8030
    rpc_port=9020
    query_port=9030
    edit_log_port=9010
    mysql_service_nic_enabled=true
    enable_fqdn_mode=false
    priority_networks=10.244.0.0/16
    cluster_name=doris-cluster
    metadata_failure_recovery=true
    heartbeat_timeout_second=15
    elect_num=3
    ignore_meta_check=false
    master_ip=10.244.0.10
    master_port=9010
    quorum_peer_limit=5
    txn_rollback_limit=100
    max_consumer_num_per_group=3
    async_checkpoint_task_thread_num=10
    metadata_checkpoint_interval_second=60
    edit_log_roll_num=50000
    sys_log_dir=/opt/doris/log
    sys_log_roll_mode=SIZE-MAX-AGE
    sys_log_roll_num_mb=1024
    sys_log_roll_interval=DAY
    sys_log_verbose_modules=*
    log_level=INFO
    audit_log_dir=/opt/doris/log/audit
    audit_log_modules=slow_query,query
    audit_log_roll_num_mb=1024
    audit_log_roll_interval=DAY
    qe_slow_log_ms=5000
EOF

# Create the Doris FE Service and StatefulSet
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
  name: doris-fe-service
  namespace: lakehouse
  labels:
    app: doris-fe
spec:
  ports:
  - name: http
    port: 8030
    targetPort: 8030
  - name: rpc
    port: 9020
    targetPort: 9020
  - name: query
    port: 9030
    targetPort: 9030
  - name: edit-log
    port: 9010
    targetPort: 9010
  clusterIP: None
  selector:
    app: doris-fe
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: doris-fe
  namespace: lakehouse
spec:
  serviceName: doris-fe-service
  replicas: 3
  selector:
    matchLabels:
      app: doris-fe
  template:
    metadata:
      labels:
        app: doris-fe
    spec:
      containers:
      - name: fe
        image: apache/doris:2.1.6-fe-x86_64
        ports:
        - containerPort: 8030
        - containerPort: 9020
        - containerPort: 9030
        - containerPort: 9010
        env:
        - name: FE_ROLE
          value: "observer"
        volumeMounts:
        - name: doris-fe-config
          mountPath: /opt/doris/conf/fe.conf
          subPath: fe.conf
        - name: doris-fe-meta
          mountPath: /opt/doris/doris-meta
        - name: doris-fe-log
          mountPath: /opt/doris/log
        readinessProbe:
          exec:
            command:
            - /bin/bash
            - -c
            - "mysql -h127.0.0.1 -P9030 -uroot -e 'SELECT 1'"
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 3
      volumes:
      - name: doris-fe-config
        configMap:
          name: doris-fe-config
  volumeClaimTemplates:
  - metadata:
      name: doris-fe-meta
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: nfs-storage
      resources:
        requests:
          storage: 10Gi
  - metadata:
      name: doris-fe-log
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: nfs-storage
      resources:
        requests:
          storage: 5Gi
EOF
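
Doris FEs do not discover each other automatically: after doris-fe-0 is up, the other two must be registered against it through its MySQL port before they can join the election group. A sketch of the standard ALTER SYSTEM step, shown with service DNS names (with enable_fqdn_mode=false as configured above, the FE expects IP addresses instead; use ADD OBSERVER for read-only replicas):

# Register the remaining FEs with the first FE (edit_log_port = 9010)
kubectl exec -it doris-fe-0 -n lakehouse -- mysql -h127.0.0.1 -P9030 -uroot -e "
ALTER SYSTEM ADD FOLLOWER 'doris-fe-1.doris-fe-service:9010';
ALTER SYSTEM ADD FOLLOWER 'doris-fe-2.doris-fe-service:9010';
SHOW FRONTENDS;
"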

2. Doris BE Deployment

# Create the Doris BE configuration
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: doris-be-config
  namespace: lakehouse
data:
  be.conf: |
    be_port=9060
    webserver_port=8040
    heartbeat_service_port=9050
    brpc_port=8060
    storage_root_path=/opt/doris/storage
    priority_networks=10.244.0.0/16
    be_socket_timeout_second=120
    be_thrift_client_timeout_second=60
    sys_log_dir=/opt/doris/log
    sys_log_roll_mode=SIZE-MAX-AGE
    sys_log_roll_num_mb=1024
    sys_log_roll_interval=DAY
    sys_log_verbose_modules=*
    log_level=INFO
    max_sys_log_file_num=100
    max_audit_log_file_num=100
EOF

# Create the Doris BE Service and StatefulSet
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
  name: doris-be-service
  namespace: lakehouse
  labels:
    app: doris-be
spec:
  ports:
  - name: be
    port: 9060
    targetPort: 9060
  - name: web
    port: 8040
    targetPort: 8040
  - name: heartbeat
    port: 9050
    targetPort: 9050
  - name: brpc
    port: 8060
    targetPort: 8060
  clusterIP: None
  selector:
    app: doris-be
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: doris-be
  namespace: lakehouse
spec:
  serviceName: doris-be-service
  replicas: 3
  selector:
    matchLabels:
      app: doris-be
  template:
    metadata:
      labels:
        app: doris-be
    spec:
      containers:
      - name: be
        image: apache/doris:2.1.6-be-x86_64
        ports:
        - containerPort: 9060
        - containerPort: 8040
        - containerPort: 9050
        - containerPort: 8060
        env:
        - name: FE_ADDR
          value: "doris-fe-service:9030"
        volumeMounts:
        - name: doris-be-config
          mountPath: /opt/doris/conf/be.conf
          subPath: be.conf
        - name: doris-be-storage
          mountPath: /opt/doris/storage
        - name: doris-be-log
          mountPath: /opt/doris/log
        readinessProbe:
          exec:
            command:
            - /bin/bash
            - -c
            - "curl -s http://localhost:8040/api/health"
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 3
      volumes:
      - name: doris-be-config
        configMap:
          name: doris-be-config
  volumeClaimTemplates:
  - metadata:
      name: doris-be-storage
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: nfs-storage
      resources:
        requests:
          storage: 100Gi
  - metadata:
      name: doris-be-log
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: nfs-storage
      resources:
        requests:
          storage: 5Gi
EOF
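
Like the FEs, the BEs must be registered with the cluster before they receive tablets. If the image does not self-register using the FE_ADDR variable, add them manually (the same FQDN/IP caveat as for the FEs applies):

# Register the three BEs (heartbeat_service_port = 9050)
kubectl exec -it doris-fe-0 -n lakehouse -- mysql -h127.0.0.1 -P9030 -uroot -e "
ALTER SYSTEM ADD BACKEND 'doris-be-0.doris-be-service:9050';
ALTER SYSTEM ADD BACKEND 'doris-be-1.doris-be-service:9050';
ALTER SYSTEM ADD BACKEND 'doris-be-2.doris-be-service:9050';
SHOW BACKENDS;
"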

3. Doris Initialization

# Create the Doris initialization Job
cat <<EOF | kubectl apply -f -
apiVersion: batch/v1
kind: Job
metadata:
  name: doris-init
  namespace: lakehouse
spec:
  template:
    spec:
      containers:
      - name: doris-init
        image: mysql:8.0
        command: ["/bin/sh", "-c"]
        args:
          - |
            until mysql -hdoris-fe-service -P9030 -uroot -e "SELECT 1"; do
              echo "Waiting for Doris FE to be ready..."
              sleep 5
            done
            
            # Create the database and tables.
            # (Doris DDL has no MySQL-style PRIMARY KEY clause; the UNIQUE KEY
            # model provides upsert semantics. total_amount is a plain column
            # computed at load time rather than a generated column.)
            mysql -hdoris-fe-service -P9030 -uroot -e "
            CREATE DATABASE IF NOT EXISTS lakehouse;
            USE lakehouse;
            
            -- Orders table
            CREATE TABLE IF NOT EXISTS orders (
              order_id VARCHAR(100),
              customer_id VARCHAR(100),
              product_id VARCHAR(100),
              quantity INT,
              unit_price DECIMAL(10,2),
              order_time DATETIME,
              total_amount DECIMAL(12,2)
            ) UNIQUE KEY(order_id)
            DISTRIBUTED BY HASH(order_id) BUCKETS 10
            PROPERTIES (
              \"replication_num\" = \"3\",
              \"storage_medium\" = \"SSD\"
            );
            
            -- Customers table
            CREATE TABLE IF NOT EXISTS customers (
              customer_id VARCHAR(100),
              customer_name VARCHAR(100),
              email VARCHAR(100),
              registration_date DATETIME
            ) UNIQUE KEY(customer_id)
            DISTRIBUTED BY HASH(customer_id) BUCKETS 5
            PROPERTIES (
              \"replication_num\" = \"3\",
              \"storage_medium\" = \"SSD\"
            );
            
            -- Products table
            CREATE TABLE IF NOT EXISTS products (
              product_id VARCHAR(100),
              product_name VARCHAR(200),
              category VARCHAR(100),
              price DECIMAL(10,2)
            ) UNIQUE KEY(product_id)
            DISTRIBUTED BY HASH(product_id) BUCKETS 5
            PROPERTIES (
              \"replication_num\" = \"3\",
              \"storage_medium\" = \"SSD\"
            );
            "
      restartPolicy: OnFailure
EOF

VIII. High Availability Configuration

1. Kubernetes High Availability Configuration

# Create PodDisruptionBudgets for the stateful components
cat <<EOF | kubectl apply -f -
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
  namespace: lakehouse
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: kafka
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: zookeeper-pdb
  namespace: lakehouse
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: zookeeper
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: doris-fe-pdb
  namespace: lakehouse
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: doris-fe
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: doris-be-pdb
  namespace: lakehouse
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: doris-be
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: flink-taskmanager-pdb
  namespace: lakehouse
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
EOF

2. Flink High Availability Configuration

Flink high availability was already enabled earlier through the following settings (note that high-availability.storageDir points at the shared NFS volume mounted at /flink):

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: flink-cluster
high-availability.storageDir: file:///flink/ha/
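
The Kubernetes HA services store leader election data in ConfigMaps, so the service account used by the Flink pods needs RBAC permission on ConfigMaps. A minimal sketch, assuming the pods run under the default service account in the lakehouse namespace:

cat <<EOF | kubectl apply -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha
  namespace: lakehouse
rules:
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha
  namespace: lakehouse
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-ha
subjects:
- kind: ServiceAccount
  name: default
  namespace: lakehouse
EOF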

3. Doris High Availability Configuration

Doris high availability is achieved through:

  • FE high availability: three FE nodes (one elected master plus followers/observers)
  • BE high availability: three BE nodes with a data replication factor of 3
  • Metadata high availability: FE metadata is replicated within the FE group via its edit log (PostgreSQL backs only the Hive Metastore, not Doris)

4. Kafka High Availability Configuration

Kafka high availability is achieved through:

  • Broker high availability: three broker nodes
  • Topic replication: every topic has a replication factor of 3
  • ISR configuration: min.insync.replicas=2

IX. Monitoring and Alerting

1. Prometheus Deployment

# Deploy Prometheus
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/kube-prometheus-stack \
  --namespace monitoring \
  --set prometheus.prometheusSpec.serviceMonitorSelectorNilUsesHelmValues=false \
  --set grafana.adminPassword=admin123 \
  --set grafana.service.type=NodePort \
  --set grafana.service.nodePort=30080

2. Monitoring Configuration

# Create the Kafka metrics configuration (JMX exporter rules)
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-metrics
  namespace: lakehouse
data:
  kafka-metrics.yml: |
    lowercaseOutputName: true
    rules:
    - pattern: kafka.server<type=(.+), name=(.+)PerSec\w*, ><>Value
      name: kafka_server_$1_$2
      type: GAUGE
    - pattern: kafka.server<type=(.+), name=(.+), (.+)=(.+)><>Value
      name: kafka_server_$1_$2
      labels:
        $3: "$4"
      type: GAUGE
EOF
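
These rules are in the Prometheus JMX exporter format; Kafka itself does not expose /metrics on port 9092, so the brokers must load the exporter as a Java agent and the ServiceMonitor below should scrape the agent's port instead. A sketch of the broker-side wiring, assuming the agent jar has been added to the image or a mounted volume (the jar path and port 9404 are illustrative):

# Hypothetical env entry on the kafka container to load the JMX exporter
env:
- name: KAFKA_OPTS
  value: "-javaagent:/opt/jmx_prometheus_javaagent.jar=9404:/etc/kafka-metrics/kafka-metrics.yml"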

# Create the Kafka ServiceMonitor
cat <<EOF | kubectl apply -f -
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: kafka
  namespace: lakehouse
  labels:
    app: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  endpoints:
  - port: kafka
    interval: 30s
    path: /metrics
    scheme: http
    metricRelabelings:
    - sourceLabels: [__name__]
      regex: 'kafka_server_(.+)'
      targetLabel: __name__
      replacement: 'kafka_$1'
EOF

# Create the Flink ServiceMonitor
cat <<EOF | kubectl apply -f -
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: flink
  namespace: lakehouse
  labels:
    app: flink
spec:
  selector:
    matchLabels:
      app: flink
  endpoints:
  - port: prom
    interval: 30s
    path: /metrics
    scheme: http
EOF

# Create the Doris ServiceMonitor
cat <<EOF | kubectl apply -f -
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: doris
  namespace: lakehouse
  labels:
    app: doris
spec:
  selector:
    matchLabels:
      app: doris
  endpoints:
  - port: web
    interval: 30s
    path: /metrics
    scheme: http
EOF

3. Alerting Rules

# Create the alerting rules
cat <<EOF | kubectl apply -f -
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: lakehouse-alerts
  namespace: monitoring
spec:
  groups:
  - name: lakehouse.rules
    rules:
    - alert: KafkaDown
      expr: up{job="kafka"} == 0
      for: 5m
      labels:
        severity: critical
      annotations:
        summary: "Kafka instance down"
        description: "Kafka instance {{ \$labels.instance }} has been down for more than 5 minutes"
    
    - alert: FlinkJobManagerDown
      expr: up{job="flink", component="jobmanager"} == 0
      for: 2m
      labels:
        severity: critical
      annotations:
        summary: "Flink JobManager down"
        description: "Flink JobManager {{ \$labels.instance }} has been down for more than 2 minutes"
    
    - alert: DorisFeDown
      expr: up{job="doris", component="fe"} == 0
      for: 2m
      labels:
        severity: critical
      annotations:
        summary: "Doris FE down"
        description: "Doris FE {{ \$labels.instance }} has been down for more than 2 minutes"
    
    - alert: DorisBeDown
      expr: up{job="doris", component="be"} == 0
      for: 2m
      labels:
        severity: critical
      annotations:
        summary: "Doris BE down"
        description: "Doris BE {{ \$labels.instance }} has been down for more than 2 minutes"
    
    - alert: HighKafkaConsumerLag
      expr: kafka_consumergroup_lag > 10000
      for: 10m
      labels:
        severity: warning
      annotations:
        summary: "High Kafka consumer lag"
        description: "Kafka consumer group {{ \$labels.consumergroup }} has lag of {{ \$value }} messages"
EOF

X. Data Flow Validation

1. Simulating a Data Producer

# Create a Kafka data producer Job
cat <<EOF | kubectl apply -f -
apiVersion: batch/v1
kind: Job
metadata:
  name: kafka-producer
  namespace: lakehouse
spec:
  template:
    spec:
      containers:
      - name: producer
        image: confluentinc/cp-kafka:7.3.0
        command: ["/bin/sh", "-c"]
        args:
          - |
            # 生成訂單數(shù)據(jù)
            for i in {1..1000}; do
              order_id="order-$(date +%s%N | cut -c1-13)-$i"
              customer_id="customer-$(($RANDOM % 100))"
              product_id="product-$(($RANDOM % 50))"
              quantity=$(($RANDOM % 10 + 1))
              unit_price=$(echo "scale=2; $RANDOM/100 + 10" | bc)
              order_time=$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")
              
              echo "{\"order_id\":\"$order_id\",\"customer_id\":\"$customer_id\",\"product_id\":\"$product_id\",\"quantity\":$quantity,\"unit_price\":$unit_price,\"order_time\":\"$order_time\"}" | \
              kafka-console-producer --broker-list kafka-service:9092 --topic orders
              
              sleep 0.1
            done
      restartPolicy: Never
EOF
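
Consumption can be verified directly with a disposable consumer pod:

# Read a few records back from the topic
kubectl run kafka-consumer --rm -ti --image confluentinc/cp-kafka:7.3.0 -n lakehouse -- \
  kafka-console-consumer --bootstrap-server kafka-service:9092 \
  --topic orders --from-beginning --max-messages 5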

2. Data Validation

# Inspect the data in Paimon (the TaskManager is a Deployment, so exec by resource name)
kubectl exec -it deploy/flink-taskmanager -n lakehouse -- /bin/bash

# Inside the Flink TaskManager, submit the validation job
./bin/flink run \
  -d \
  -m flink-jobmanager:8081 \
  -p 1 \
  -Dpipeline.name=PaimonValidation \
  ./examples/sql/StreamSQLJob.jar \
  --sql-file /flink/sql/validate.sql

# Create the validation SQL ConfigMap
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-validate-config
  namespace: lakehouse
data:
  validate.sql: |
    -- Query the data in the Paimon table
    CREATE TABLE paimon_orders (
      order_id STRING,
      customer_id STRING,
      product_id STRING,
      quantity INT,
      unit_price DECIMAL(10,2),
      order_time TIMESTAMP(3),
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'warehouse' = 'file:///flink/paimon',
      'table' = 'orders',
      'scan.snapshot-id' = 'latest'
    );

    -- Print the query results
    SELECT * FROM paimon_orders LIMIT 10;
EOF

3. Doris Data Synchronization

# Create the Doris data synchronization Job
cat <<EOF | kubectl apply -f -
apiVersion: batch/v1
kind: Job
metadata:
  name: doris-sync
  namespace: lakehouse
spec:
  template:
    spec:
      containers:
      - name: sync
        image: mysql:8.0
        command: ["/bin/sh", "-c"]
        args:
          - |
            # Wait for Paimon data to be generated
            sleep 60
            
            # Read the Paimon data files and load them into Doris
            mysql -hdoris-fe-service -P9030 -uroot -e "
            USE lakehouse;
            
            -- Create a staging table
            CREATE TABLE temp_orders LIKE orders;
            
            -- Broker-load the Paimon data files into Doris
            LOAD LABEL lakehouse.orders_paimon
            (
              DATA INFILE('/flink/paimon/orders/data/*.orc')
              INTO TABLE temp_orders
              FORMAT AS 'orc'
              (order_id, customer_id, product_id, quantity, unit_price, order_time)
              SET (total_amount = quantity * unit_price)
            )
            WITH BROKER 'broker_name'
            PROPERTIES (
              'timeout' = '3600',
              'max_filter_ratio' = '0.1'
            );
            
            -- Merge into the main table: with the UNIQUE KEY model a plain
            -- INSERT upserts on duplicate keys (Doris has no ON DUPLICATE KEY UPDATE)
            INSERT INTO orders
            SELECT * FROM temp_orders;
            
            -- Drop the staging table
            DROP TABLE temp_orders;
            "
      restartPolicy: OnFailure
EOF
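
As an alternative to file-level loading, Doris 2.1 can attach Paimon as an external catalog and query it (or INSERT ... SELECT from it) directly. A sketch, assuming the BE pods mount the same /flink/paimon NFS path; the catalog name paimon_fs is arbitrary:

# Create a Paimon filesystem catalog in Doris and browse it
kubectl exec -it doris-fe-0 -n lakehouse -- mysql -h127.0.0.1 -P9030 -uroot -e "
CREATE CATALOG IF NOT EXISTS paimon_fs PROPERTIES (
  'type' = 'paimon',
  'paimon.catalog.type' = 'filesystem',
  'warehouse' = 'file:///flink/paimon'
);
SHOW DATABASES FROM paimon_fs;
"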

4. Data Validation

# Validate the data in Doris
kubectl exec -it doris-fe-0 -n lakehouse -- /bin/bash

# Inside the Doris FE pod, run
mysql -h127.0.0.1 -P9030 -uroot -e "
USE lakehouse;
SELECT COUNT(*) FROM orders;
SELECT * FROM orders ORDER BY order_time DESC LIMIT 10;
"

XI. Performance Tuning Recommendations

1. Kafka Performance Tuning

# Kafka tuning (container env entries)
env:
- name: KAFKA_HEAP_OPTS
  value: "-Xmx4G -Xms4G"
- name: KAFKA_JVM_PERFORMANCE_OPTS
  value: "-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
- name: KAFKA_LOG_RETENTION_HOURS
  value: "168"
- name: KAFKA_LOG_SEGMENT_BYTES
  value: "1073741824"
- name: KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS
  value: "300000"
- name: KAFKA_NUM_PARTITIONS
  value: "3"
- name: KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR
  value: "4"
- name: KAFKA_SOCKET_SEND_BUFFER_BYTES
  value: "1024000"
- name: KAFKA_SOCKET_RECEIVE_BUFFER_BYTES
  value: "1024000"
- name: KAFKA_SOCKET_REQUEST_MAX_BYTES
  value: "104857600"
- name: KAFKA_LOG_FLUSH_INTERVAL_MESSAGES
  value: "10000"
- name: KAFKA_LOG_FLUSH_INTERVAL_MS
  value: "1000"

2. Flink Performance Tuning

# Flink tuning (container env entries)
env:
- name: FLINK_JM_HEAP
  value: "4096m"
- name: FLINK_TM_HEAP
  value: "8192m"
- name: FLINK_TM_OFFHEAP
  value: "2048m"
- name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
  value: "8"
- name: PARALLELISM_DEFAULT
  value: "12"
- name: REST_FLAMEGRAPH_ENABLED
  value: "true"

3. Doris Performance Tuning

-- Doris table tuning
ALTER TABLE orders SET ("replication_num" = "3", "storage_medium" = "SSD", "storage_cooldown_time" = "2025-01-01 00:00:00");
ALTER TABLE customers SET ("replication_num" = "3", "storage_medium" = "SSD", "storage_cooldown_time" = "2025-01-01 00:00:00");
ALTER TABLE products SET ("replication_num" = "3", "storage_medium" = "SSD", "storage_cooldown_time" = "2025-01-01 00:00:00");

-- Create a materialized view
CREATE MATERIALIZED VIEW mv_customer_orders
DISTRIBUTED BY HASH(customer_id) BUCKETS 10
AS
SELECT
  customer_id,
  COUNT(*) AS order_count,
  SUM(total_amount) AS total_spent,
  AVG(total_amount) AS avg_order_value,
  MAX(order_time) AS last_order_date
FROM orders
GROUP BY customer_id;

-- Create a rollup
ALTER TABLE orders ADD ROLLUP orders_rollup (customer_id, product_id, order_time, total_amount);

4. Paimon Performance Tuning

-- Paimon table tuning
ALTER TABLE orders_paimon SET (
  'write.buffer-size' = '512MB',
  'write.buffer-spillable' = 'true',
  'write-only' = 'false',
  'merge-engine' = 'deduplicate',
  'changelog-producer' = 'input',
  'full-compaction.delta-commits' = '10',
  'bucket' = '8',
  'file.format' = 'orc',
  'orc.compress' = 'zlib',
  'orc.compress.size' = '262144',
  'orc.stripe.size' = '67108864',
  'orc.row.index.stride' = '10000'
);

XII. Failure Recovery Drills

1. Simulating a Kafka Node Failure

# Simulate a Kafka node failure
kubectl scale statefulsets kafka --replicas=2 -n lakehouse

# Watch the Kafka pods
kubectl get pods -l app=kafka -n lakehouse -w

# Restore the node
kubectl scale statefulsets kafka --replicas=3 -n lakehouse
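
While a broker is down, producers and consumers should keep working as long as the ISR stays above min.insync.replicas; the impact can be observed directly:

# List partitions whose ISR is smaller than the replica set
kubectl exec kafka-0 -n lakehouse -- \
  kafka-topics --describe --under-replicated-partitions --bootstrap-server kafka-service:9092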

2. Simulating a Flink JobManager Failure

# Simulate a Flink JobManager failure (the JobManager is a Deployment, so
# delete its pod by label rather than by a fixed name)
kubectl delete pod -l app=flink,component=jobmanager -n lakehouse

# Watch the JobManager pod get recreated
kubectl get pods -l app=flink,component=jobmanager -n lakehouse -w

# Check that the TaskManagers reconnect to the new leader
kubectl logs deploy/flink-taskmanager -n lakehouse | grep "JobManager"

3. Simulating a Doris FE Failure

# Simulate a Doris FE failure
kubectl delete pod doris-fe-0 -n lakehouse

# Watch the Doris FE pods
kubectl get pods -l app=doris-fe -n lakehouse -w

# Check the FE role change
kubectl exec -it doris-fe-1 -n lakehouse -- mysql -h127.0.0.1 -P9030 -uroot -e "SHOW FRONTENDS;"

4. Data Consistency Validation

# Create the data consistency check Job
cat <<EOF | kubectl apply -f -
apiVersion: batch/v1
kind: Job
metadata:
  name: data-consistency-check
  namespace: lakehouse
spec:
  template:
    spec:
      containers:
      - name: checker
        image: mysql:8.0
        command: ["/bin/sh", "-c"]
        args:
          - |
            # 檢查Paimon和Doris中的數(shù)據(jù)一致性
            paimon_count=$(mysql -hdoris-fe-service -P9030 -uroot -e "SELECT COUNT(*) FROM lakehouse.orders;" -s -N)
            doris_count=$(mysql -hdoris-fe-service -P9030 -uroot -e "SELECT COUNT(*) FROM lakehouse.orders;" -s -N)
            
            if [ "$paimon_count" -eq "$doris_count" ]; then
              echo "Data consistency check passed: Paimon count = $paimon_count, Doris count = $doris_count"
            else
              echo "Data inconsistency detected: Paimon count = $paimon_count, Doris count = $doris_count"
              exit 1
            fi
      restartPolicy: OnFailure
EOF

XIII. Summary

This guide has walked through deploying a complete, highly available lakehouse architecture on Kubernetes, built around the following components:

  • Kafka 3.0: distributed message queue delivering high-throughput data ingestion and transport
  • Flink 1.18: powerful real-time stream processing with exactly-once semantics
  • Paimon 0.7: open data lake storage format providing a unified lake storage layer
  • Doris 2.1.6: high-performance MPP analytical database for fast queries
  • Kubernetes: container orchestration platform providing resource management, autoscaling, and high availability

1. Architecture Advantages

  • High availability: every component runs with multiple replicas to avoid single points of failure
  • Elastic scaling: Kubernetes autoscaling adjusts resources dynamically with load
  • Data consistency: transactions and replica synchronization keep data consistent
  • Unified metadata: Hive Metastore serves as the central metadata store
  • Resource isolation: Kubernetes namespaces provide multi-tenant resource isolation

2. Key Configuration Points

(1) Kafka:

  • 3-node broker cluster
  • Topic replication factor of 3
  • min.insync.replicas set to 2

(2) Flink:

  • JobManager high availability
  • Checkpointing
  • RocksDB state backend

(3) Paimon:

  • ORC file format
  • Merge engine configuration
  • Incremental snapshot management

(4) Doris:

  • 3-node FE cluster (one master, two standbys)
  • 3-node BE cluster
  • Data replication factor of 3

3. Operations Recommendations

  • Monitoring: deploy Prometheus and Grafana for full observability
  • Alerting: configure alert rules for key metrics
  • Backups: back up metadata and critical configuration regularly
  • Drills: run failure recovery drills on a regular schedule
  • Tuning: keep tuning performance parameters against real workloads

Following the deployment plan in this guide, you can build a stable, efficient, and scalable highly available lakehouse architecture that meets the needs of a modern data platform.
