多模態(tài)特征融合的軸承故障診斷混合深度學(xué)習(xí)框架:時(shí)頻域協(xié)同分析與神經(jīng)ODE動(dòng)態(tài)建模
算法流程
開始
│
├─ 初始化設(shè)置(隨機(jī)種子、目錄創(chuàng)建)
├─ 自定義層定義(SEBlock1, NeuralODEBlock1, EnhancedGatedAttention1等)
├─ 混合模型構(gòu)建(build_hybrid_model)
│ ├─ 輸入層
│ ├─ 共享卷積層
│ ├─ 三條并行路徑:
│ │ ├─ 時(shí)域路徑(卷積+注意力+神經(jīng)ODE)
│ │ ├─ 頻域路徑(傅里葉神經(jīng)算子+注意力)
│ │ └─ LSTM路徑(LSTM+SE塊)
│ ├─ 多路徑特征融合(注意力加權(quán))
│ └─ 輸出層(全連接+分類)
├─ 數(shù)據(jù)加載與預(yù)處理(load_surf_dataset)
│ ├─ 讀取.mat/.csv文件
│ ├─ 信號(hào)長(zhǎng)度標(biāo)準(zhǔn)化
│ └─ 標(biāo)簽編碼
├─ 模型編譯與訓(xùn)練
│ ├─ 自定義F1評(píng)估指標(biāo)
│ ├─ 類別權(quán)重平衡
│ └─ 模型訓(xùn)練
├─ 性能評(píng)估與可視化
│ ├─ 訓(xùn)練指標(biāo)曲線
│ ├─ 測(cè)試集評(píng)估
│ ├─ 噪聲魯棒性測(cè)試
│ ├─ 特征空間可視化(UMAP/t-SNE)
│ ├─ 混淆矩陣/ROC曲線
│ └─ 層激活可視化
└─ 高級(jí)信號(hào)分析
├─ 時(shí)域/頻域分析
├─ 時(shí)頻分析(STFT/小波)
└─ 特征分布可視化
結(jié)束模型架構(gòu)設(shè)計(jì):構(gòu)建三路徑混合模型
時(shí)域路徑使用卷積神經(jīng)網(wǎng)絡(luò)提取局部特征,結(jié)合神經(jīng)ODE模擬連續(xù)動(dòng)態(tài)系統(tǒng)
頻域路徑采用傅里葉神經(jīng)算子進(jìn)行頻域特征提取
LSTM路徑捕獲時(shí)序依賴關(guān)系,結(jié)合SE注意力機(jī)制增強(qiáng)關(guān)鍵特征
多路徑特征通過(guò)注意力加權(quán)融合,最后通過(guò)全連接層分類
數(shù)據(jù)處理流程:
從.mat/.csv文件加載軸承振動(dòng)信號(hào)
統(tǒng)一信號(hào)長(zhǎng)度為1024個(gè)采樣點(diǎn)(不足則填充)
對(duì)故障類別標(biāo)簽進(jìn)行編碼和one-hot轉(zhuǎn)換
添加通道維度適配卷積網(wǎng)絡(luò)輸入
模型訓(xùn)練策略:
使用Adam優(yōu)化器(學(xué)習(xí)率1e-4)和類別加權(quán)交叉熵?fù)p失
引入自定義F1分?jǐn)?shù)作為評(píng)估指標(biāo)
采用5:1的訓(xùn)練測(cè)試比例劃分?jǐn)?shù)據(jù)集
執(zhí)行50個(gè)訓(xùn)練周期(batch size=256)
性能評(píng)估方法:
在原始測(cè)試集和不同信噪比的加噪測(cè)試集上評(píng)估
計(jì)算準(zhǔn)確率、精確率、召回率、F1值和AUC
通過(guò)混淆矩陣分析各類別分類效果
使用t-SNE/UMAP可視化高維特征空間
可視化分析技術(shù):
訓(xùn)練過(guò)程指標(biāo)曲線繪制
層激活熱力圖和注意力權(quán)重可視化
時(shí)域波形、頻譜圖、時(shí)頻分析展示
特征分布箱線圖和小提琴圖
模型決策邊界投影(PCA/UMAP)
魯棒性測(cè)試方案:
添加5-20dB高斯白噪聲模擬實(shí)際工況
在不同噪聲水平下評(píng)估模型性能衰減
對(duì)比噪聲信號(hào)與原始信號(hào)的頻譜特征
算法在信號(hào)分析中的應(yīng)用
應(yīng)用領(lǐng)域 | 具體應(yīng)用方式 | 技術(shù)價(jià)值 |
特征提取 | 三路徑架構(gòu)分別捕獲時(shí)域瞬態(tài)特征、頻域共振特征和時(shí)序依賴特征 | 克服單一模型特征提取局限性,提升故障特征表征能力 |
噪聲魯棒性 | 高斯噪聲注入測(cè)試(5-20dB SNR)評(píng)估模型抗干擾能力 | 驗(yàn)證模型在實(shí)際工業(yè)噪聲環(huán)境下的適用性 |
故障模式分離 | t-SNE/UMAP可視化展示不同故障類別在高維特征空間的聚類效果 | 直觀呈現(xiàn)模型對(duì)故障模式的區(qū)分能力 |
動(dòng)態(tài)過(guò)程建模 | 神經(jīng)ODE塊模擬故障演化過(guò)程,通過(guò)歐拉積分實(shí)現(xiàn)連續(xù)狀態(tài)演化 | 捕捉故障發(fā)展的時(shí)間動(dòng)態(tài)特性,增強(qiáng)模型物理可解釋性 |
注意力機(jī)制 | SE塊實(shí)現(xiàn)通道級(jí)特征選擇,門控注意力實(shí)現(xiàn)時(shí)間維度特征增強(qiáng) | 自動(dòng)聚焦故障相關(guān)特征,抑制背景噪聲干擾 |
頻域分析 | 傅里葉神經(jīng)算子學(xué)習(xí)頻域表示,結(jié)合包絡(luò)譜分析提取故障特征頻率 | 增強(qiáng)對(duì)軸承周期性沖擊特征的捕獲能力 |
時(shí)頻分析 | STFT和小波變換可視化展示故障信號(hào)的時(shí)頻能量分布 | 輔助分析故障信號(hào)的瞬態(tài)沖擊特性和調(diào)制現(xiàn)象 |
決策解釋性 | 可視化卷積核響應(yīng)、注意力權(quán)重和特征重要性 | 增強(qiáng)模型透明度,輔助故障機(jī)理分析 |
實(shí)時(shí)監(jiān)測(cè) | 輕量化架構(gòu)設(shè)計(jì)(如1D卷積替代全連接)降低計(jì)算復(fù)雜度 | 滿足工業(yè)現(xiàn)場(chǎng)實(shí)時(shí)監(jiān)測(cè)需求 |
小樣本學(xué)習(xí) | 遷移學(xué)習(xí)技術(shù)將預(yù)訓(xùn)練模型適配到新故障類型 | 解決實(shí)際工業(yè)場(chǎng)景標(biāo)注數(shù)據(jù)稀缺問(wèn)題 |
# 導(dǎo)入必要的庫(kù)
import os # 提供操作系統(tǒng)相關(guān)功能
import numpy as np # 科學(xué)計(jì)算庫(kù)
import tensorflow as tf # 深度學(xué)習(xí)框架
from tensorflow.keras.layers import ( # Keras層組件
Input, Conv1D, BatchNormalization, Dense, Dropout, LSTM, Concatenate, Add,
LayerNormalization, Activation, GlobalAveragePooling1D, Multiply, Reshape, Lambda)
from tensorflow.keras.models import Model # 模型構(gòu)建
from tensorflow.keras.utils import to_categorical # 類別編碼
from sklearn.preprocessing import LabelEncoder # 標(biāo)簽編碼
from sklearn.utils import class_weight # 類別權(quán)重計(jì)算
import matplotlib.pyplot as plt # 繪圖庫(kù)
import scipy.io # MATLAB文件處理
import pandas as pd # 數(shù)據(jù)處理
import pickle # 數(shù)據(jù)序列化
from scipy.signal import stft, hilbert # 信號(hào)處理
import seaborn as sns # 統(tǒng)計(jì)可視化
import umap # 降維可視化
from sklearn.manifold import TSNE # t-SNE降維
from sklearn.metrics import ( # 評(píng)估指標(biāo)
accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,
confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc)
# 設(shè)置隨機(jī)種子確保結(jié)果可重現(xiàn)
seed_value = 1234
np.random.seed(seed_value)
tf.manual_seed(seed_value)
# 創(chuàng)建報(bào)告目錄用于保存結(jié)果
os.makedirs("Report", exist_ok=True)
# 定義信號(hào)維度參數(shù)
n_u, n_y = 2, 2 # 輸入輸出通道數(shù)
seq_len = 1024 # 信號(hào)長(zhǎng)度
input_shape = (seq_len, 1) # 輸入形狀
num_classes = 3 # 故障類別數(shù)
### 自定義神經(jīng)網(wǎng)絡(luò)層實(shí)現(xiàn) ###
class SEBlock1(Layer):
"""擠壓激勵(lì)注意力模塊 (Squeeze-and-Excitation Block)"""
def __init__(self, reduction_ratio=8, **kwargs):
"""
初始化SE塊
:param reduction_ratio: 通道壓縮比例
"""
super().__init__(**kwargs)
self.reduction_ratio = reduction_ratio
def build(self, input_shape):
"""構(gòu)建層權(quán)重"""
channels = input_shape[-1] # 獲取輸入通道數(shù)
# 使用1x1卷積替代全連接層加速計(jì)算
self.conv1 = Conv1D(channels//self.reduction_ratio, 1, activation='relu') # 降維卷積
self.conv2 = Conv1D(channels, 1, activation='sigmoid') # 重建卷積
super().build(input_shape)
def call(self, inputs):
"""前向傳播邏輯"""
# 全局平均池化替代全連接層
se = tf.reduce_mean(inputs, axis=1, keepdims=True) # 空間維度壓縮
se = self.conv1(se) # 通道壓縮
se = self.conv2(se) # 通道重建
return Multiply()([inputs, se]) # 通道加權(quán)
def compute_output_shape(self, input_shape):
"""輸出形狀計(jì)算"""
return input_shape
class NeuralODEBlock1(Layer):
"""神經(jīng)常微分方程塊 (Neural Ordinary Differential Equations Block)"""
def __init__(self, units, time_steps=10, **kwargs):
"""
初始化神經(jīng)ODE塊
:param units: 隱藏單元數(shù)
:param time_steps: 時(shí)間步長(zhǎng)
"""
super().__init__(**kwargs)
self.units = units
self.time_steps = time_steps
self.dense1 = Dense(units, activation='tanh') # 非線性變換層
self.dense2 = Dense(units) # 導(dǎo)數(shù)計(jì)算層
def call(self, x):
"""前向傳播實(shí)現(xiàn)歐拉積分"""
outputs = [] # 存儲(chǔ)各時(shí)間步輸出
h = x # 初始狀態(tài)
# 通過(guò)歐拉方法模擬ODE
for t in range(self.time_steps):
dx = self.dense2(self.dense1(h)) # 計(jì)算導(dǎo)數(shù)
h = h + dx # 狀態(tài)更新
outputs.append(h) # 保存當(dāng)前狀態(tài)
# 拼接所有時(shí)間步輸出
stacked = tf.concat(outputs, axis=1)
return stacked
def compute_output_shape(self, input_shape):
"""輸出形狀計(jì)算"""
return (input_shape[0], self.time_steps, self.units)
class EnhancedGatedAttention1(Layer):
"""增強(qiáng)門控注意力機(jī)制 (Enhanced Gated Attention)"""
def __init__(self, d_model, **kwargs):
"""
初始化注意力層
:param d_model: 特征維度
"""
super().__init__(**kwargs)
self.d_model = d_model
# 使用較少注意力頭加速計(jì)算
self.mha = tf.keras.layers.MultiHeadAttention(
num_heads=2, key_dim=d_model//2) # 多頭注意力
# 使用1D卷積替代全連接加速門控計(jì)算
self.gate = Conv1D(1, kernel_size=1, activation='sigmoid') # 門控生成
# 使用批歸一化替代層歸一化加速訓(xùn)練
self.norm = BatchNormalization() # 歸一化層
def call(self, x):
"""前向傳播邏輯"""
attn = self.mha(x, x) # 自注意力計(jì)算
gate = self.gate(x) # 門控信號(hào)生成
# 殘差連接+門控注意力
return self.norm(x + attn * gate) # 特征更新
class VanillaSelfAttention(Layer):
"""標(biāo)準(zhǔn)自注意力機(jī)制 (Vanilla Self-Attention)"""
def __init__(self, d_model, **kwargs):
"""初始化標(biāo)準(zhǔn)注意力層"""
super().__init__(**kwargs)
self.d_model = d_model
# 使用與增強(qiáng)門控注意力相同的配置
self.mha = tf.keras.layers.MultiHeadAttention(
num_heads=2, key_dim=d_model//2)
self.norm = BatchNormalization()
def call(self, x):
"""前向傳播邏輯"""
attn = self.mha(x, x) # 自注意力計(jì)算
return self.norm(x + attn) # 殘差連接
class FourierNeuralOperator1(Layer):
"""傅里葉神經(jīng)算子 (Fourier Neural Operator)"""
def __init__(self, modes, filters, **kwargs):
"""
初始化FNO層
:param modes: 保留的傅里葉模式數(shù)
:param filters: 濾波器數(shù)量
"""
super().__init__(**kwargs)
self.modes = modes
self.filters = filters
# 使用更窄的全連接層加速計(jì)算
self.fft_dense = Dense(filters//4, activation='gelu') # 傅里葉域變換
self.ifft_dense = Dense(filters//2) # 逆傅里葉域變換
def call(self, x):
"""前向傳播邏輯"""
# 1. 快速傅里葉變換
x_fft = tf.signal.fft(tf.cast(x, tf.complex64))
x_fft = tf.math.real(x_fft[..., :self.modes]) # 保留主要模式
# 2. 傅里葉域特征變換
x_fft = self.fft_dense(x_fft)
x_fft = self.ifft_dense(x_fft)
# 3. 填充并逆變換回時(shí)域
x_fft = tf.pad(x_fft, [[0,0],[0,0],[0,tf.shape(x)[1]-self.modes]])
return tf.math.real(tf.signal.ifft(tf.cast(x_fft, tf.complex64)))
### 混合模型構(gòu)建函數(shù) ###
def build_hybrid_model(input_shape, num_classes):
"""
構(gòu)建三路徑混合故障診斷模型
:param input_shape: 輸入形狀 (序列長(zhǎng)度, 通道數(shù))
:param num_classes: 分類類別數(shù)
:return: 構(gòu)建好的Keras模型
"""
# 1. 輸入層
inputs = Input(shape=input_shape)
# ===== 共享預(yù)處理層 =====
shared_conv = Conv1D(32, 3, padding='same', name='shared_conv')(inputs)
# ===== 路徑1: 時(shí)域特征提取 =====
# 1.1 基礎(chǔ)特征提取
x_time = Conv1D(64, 3, padding='same', name='time_conv1')(shared_conv)
x_time = EnhancedGatedAttention1(d_model=64, name='time_attn1')(x_time)
# 1.2 中間特征保存用于跨路徑連接
time_mid = Conv1D(64, 3, padding='same', name='time_mid')(x_time)
# 1.3 神經(jīng)ODE時(shí)間演化
x_time = GlobalAveragePooling1D(name='time_gap1')(x_time)
x_time = Reshape((1, 64))(x_time)
x_time = NeuralODEBlock1(units=64, time_steps=50, name='time_ode')(x_time)
x_time = GlobalAveragePooling1D(name='time_gap2')(x_time)
# ===== 路徑2: 頻域特征提取 =====
# 2.1 傅里葉神經(jīng)算子
x_freq = FourierNeuralOperator1(modes=16, filters=64, name='fno')(shared_conv)
# 2.2 跨路徑連接時(shí)域特征
x_freq = Concatenate(axis=-1, name='freq_concat')([x_freq, time_mid])
x_freq = Conv1D(64, 1, name='freq_merge')(x_freq)
# 2.3 頻域注意力增強(qiáng)
x_freq = EnhancedGatedAttention1(d_model=64, name='freq_attn')(x_freq)
x_freq = GlobalAveragePooling1D(name='freq_gap')(x_freq)
# ===== 路徑3: LSTM時(shí)序建模 =====
# 3.1 LSTM時(shí)序特征提取
x_lstm = LSTM(64, return_sequences=True, name='lstm1')(shared_conv)
# 3.2 時(shí)域特征適配
time_mid_adjusted = Conv1D(64, 1, name='time_mid_adjust')(time_mid)
# 3.3 頻域特征適配
x_freq_expanded = Lambda(
lambda x: tf.repeat(x[0], tf.shape(x[1])[1], axis=1),
name='freq_expand')([x_freq[:, tf.newaxis, :], x_lstm])
# 3.4 多源特征融合
x_lstm = Concatenate(axis=-1, name='lstm_concat')([
x_lstm, time_mid_adjusted, x_freq_expanded])
x_lstm = Conv1D(64, 1, name='lstm_merge')(x_lstm)
# 3.5 通道注意力增強(qiáng)
x_lstm = SEBlock1(name='lstm_se')(x_lstm)
x_lstm = GlobalAveragePooling1D(name='lstm_gap')(x_lstm)
# ===== 多路徑特征融合 =====
# 4.1 特征拼接
fused = Concatenate(name='final_concat')([x_time, x_freq, x_lstm])
# 4.2 注意力加權(quán)融合
attention_units = fused.shape[-1] # 自動(dòng)獲取特征維度
attention_weights = Dense(attention_units, activation='softmax')(fused)
fused = Multiply(name='attention_scale')([fused, attention_weights])
# ===== 輸出層 =====
# 5.1 全連接層
out = Dense(128, activation='gelu', name='dense1')(fused)
out = Dropout(0.5, name='dropout1')(out)
# 5.2 分類輸出層
out = Dense(num_classes, activation='softmax', name='output')(out)
return Model(inputs, out)
### 數(shù)據(jù)加載函數(shù) ###
def load_surf_dataset(folder, seq_len=1024):
"""
加載SURF軸承故障數(shù)據(jù)集
:param folder: 數(shù)據(jù)文件夾路徑
:param seq_len: 信號(hào)長(zhǎng)度
:return: 信號(hào)數(shù)組和標(biāo)簽數(shù)組
"""
X = [] # 存儲(chǔ)信號(hào)
y = [] # 存儲(chǔ)標(biāo)簽
class_names = sorted(os.listdir(folder)) # 獲取故障類別
# 遍歷每個(gè)故障類別
for label in class_names:
class_path = os.path.join(folder, label)
# 遍歷類別文件夾中的文件
for fname in os.listdir(class_path):
file_path = os.path.join(class_path, fname)
# 處理MATLAB數(shù)據(jù)文件
if fname.endswith('.mat'):
mat = scipy.io.loadmat(file_path)
for key in mat:
if not key.startswith("__"): # 跳過(guò)系統(tǒng)變量
signal = mat[key].squeeze()
break
# 處理CSV數(shù)據(jù)文件
elif fname.endswith('.csv'):
df = pd.read_csv(file_path, header=None)
signal = df.values.squeeze()
else:
continue
# 信號(hào)長(zhǎng)度標(biāo)準(zhǔn)化
if len(signal) >= seq_len:
signal = signal[:seq_len] # 截?cái)? else:
# 填充不足部分
signal = np.pad(signal, (0, seq_len - len(signal)))
X.append(signal)
y.append(label)
return np.array(X), np.array(y)
### 自定義評(píng)估指標(biāo) ###
def f1_score(y_true, y_pred):
"""自定義F1分?jǐn)?shù)計(jì)算函數(shù)"""
y_pred = tf.round(y_pred) # 預(yù)測(cè)概率轉(zhuǎn)類別
# 計(jì)算真陽(yáng)性、假陽(yáng)性、假陰性
tp = tf.reduce_sum(tf.cast(y_true * y_pred, 'float'), axis=0)
fp = tf.reduce_sum(tf.cast((1 - y_true) * y_pred, 'float'), axis=0)
fn = tf.reduce_sum(tf.cast(y_true * (1 - y_pred), 'float'), axis=0)
# 計(jì)算精確率和召回率
precision = tp / (tp + fp + tf.keras.backend.epsilon())
recall = tp / (tp + fn + tf.keras.backend.epsilon())
# 計(jì)算F1分?jǐn)?shù)
f1 = 2 * precision * recall / (precision + recall + tf.keras.backend.epsilon())
return tf.reduce_mean(f1) # 返回宏平均F1
### 主執(zhí)行流程 ###
if __name__ == "__main__":
# === 數(shù)據(jù)準(zhǔn)備 ===
# 加載訓(xùn)練集和測(cè)試集
X_train, y_train = load_surf_dataset("Veriseti_Surf/train", seq_len=seq_len)
X_test, y_test = load_surf_dataset("Veriseti_Surf/test", seq_len=seq_len)
# 添加通道維度 (N, seq_len) -> (N, seq_len, 1)
X_train = X_train[..., np.newaxis]
X_test = X_test[..., np.newaxis]
# 標(biāo)簽編碼和one-hot轉(zhuǎn)換
encoder = LabelEncoder()
y_train_enc = encoder.fit_transform(y_train)
y_test_enc = encoder.transform(y_test)
y_train_cat = to_categorical(y_train_enc, num_classes=num_classes)
y_test_cat = to_categorical(y_test_enc, num_classes=num_classes)
# === 模型構(gòu)建與編譯 ===
model = build_hybrid_model(input_shape=input_shape, num_classes=num_classes)
# 計(jì)算類別權(quán)重處理不平衡數(shù)據(jù)
class_weights = class_weight.compute_class_weight(
class_weight='balanced',
classes=np.unique(y_train_enc),
y=y_train_enc
)
class_weights_dict = dict(enumerate(class_weights))
# 模型編譯
model.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
loss='categorical_crossentropy',
metrics=[
'accuracy',
tf.keras.metrics.Precision(name='precision'),
tf.keras.metrics.Recall(name='recall'),
f1_score # 使用自定義F1指標(biāo)
]
)
# 打印模型結(jié)構(gòu)
model.summary()
# === 模型訓(xùn)練 ===
history = model.fit(
X_train, y_train_cat,
validation_data=(X_test, y_test_cat),
epochs=50,
batch_size=256,
class_weight=class_weights_dict # 類別權(quán)重
)
# === 訓(xùn)練結(jié)果分析 ===
# 提取訓(xùn)練指標(biāo)
train_acc = history.history['accuracy']
train_prec = history.history['precision']
train_rec = history.history['recall']
train_f1 = history.history['f1_score']
train_loss = history.history['loss']
# 提取驗(yàn)證指標(biāo)
val_acc = history.history['val_accuracy']
val_prec = history.history['val_precision']
val_rec = history.history['val_recall']
val_f1 = history.history['val_f1_score']
val_loss = history.history['val_loss']
# 保存指標(biāo)結(jié)果
with open("ablation/ab1_acc.pkl", "wb") as f:
pickle.dump(train_acc, f)
with open("ablation/ab1_prec.pkl", "wb") as f:
pickle.dump(train_prec, f)
with open("ablation/ab1_rec.pkl", "wb") as f:
pickle.dump(train_rec, f)
with open("ablation/ab1_f1.pkl", "wb") as f:
pickle.dump(train_f1, f)
# === 模型可視化分析 ===
# 1. 訓(xùn)練過(guò)程指標(biāo)曲線
plt.figure(figsize=(10, 6))
epochs = range(1, len(train_acc)+1)
plt.plot(epochs, train_acc, label='Accuracy')
plt.plot(epochs, train_prec, label='Precision')
plt.plot(epochs, train_rec, label='Recall')
plt.plot(epochs, train_f1, label='F1-Score')
plt.title('Training Metrics Over Epochs')
plt.xlabel('Epochs')
plt.ylabel('Score')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig("Report/training_metrics.pdf")
plt.show()
# 2. 特征空間可視化 (UMAP)
feature_extractor = Model(inputs=model.input, outputs=model.layers[-3].output)
features = feature_extractor.predict(X_test)
reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, metric='euclidean', random_state=42)
embedding = reducer.fit_transform(features)
plt.figure(figsize=(8, 6))
scatter = plt.scatter(embedding[:, 0], embedding[:, 1], c=y_test_enc, cmap='Set1', alpha=0.8)
handles, _ = scatter.legend_elements()
plt.legend(handles=handles, labels=encoder.classes_.tolist())
plt.title("UMAP Visualization of Feature Space")
plt.xlabel("UMAP 1")
plt.ylabel("UMAP 2")
plt.grid(True)
plt.tight_layout()
plt.savefig("Report/feature_space_umap.pdf")
plt.show()
# 3. 混淆矩陣
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
cm = confusion_matrix(y_test_enc, y_pred_classes)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=encoder.classes_)
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.savefig("Report/confusion_matrix.pdf")
plt.show()
# 4. ROC曲線
y_test_onehot = to_categorical(y_test_enc, num_classes=num_classes)
plt.figure(figsize=(8, 6))
for i in range(num_classes):
fpr, tpr, _ = roc_curve(y_test_onehot[:, i], y_pred[:, i])
roc_auc = auc(fpr, tpr)
plt.plot(fpr, tpr, label=f'{encoder.classes_[i]} (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve for Each Class')
plt.legend()
plt.grid(True)
plt.savefig("Report/roc_curve.pdf")
plt.show()
# === 噪聲魯棒性測(cè)試 ===
def add_gaussian_noise(signal, snr_db):
"""添加高斯白噪聲"""
signal_power = np.mean(signal ** 2)
snr_linear = 10 ** (snr_db / 10)
noise_power = signal_power / snr_linear
noise = np.random.normal(0, np.sqrt(noise_power), signal.shape)
return signal + noise
# 測(cè)試不同信噪比下的性能
snr_levels = [5, 10, 15, 20]
X_test_noisy = {}
# 生成加噪測(cè)試集
for snr in snr_levels:
noisy_signals = []
for signal in X_test.squeeze():
noisy_signal = add_gaussian_noise(signal, snr)
noisy_signals.append(noisy_signal)
X_test_noisy[snr] = np.array(noisy_signals)[..., np.newaxis]
# 評(píng)估各信噪比下的模型性能
snr_results = {}
for snr in [np.inf] + snr_levels: # 包含無(wú)噪聲情況
if snr == np.inf:
X_input = X_test
snr_label = "∞"
else:
X_input = X_test_noisy[snr]
snr_label = f"{snr} dB"
# 模型預(yù)測(cè)
y_pred = model.predict(X_input)
y_pred_cls = np.argmax(y_pred, axis=1)
# 計(jì)算評(píng)估指標(biāo)
acc = accuracy_score(y_test_enc, y_pred_cls)
prec = precision_score(y_test_enc, y_pred_cls, average='macro', zero_division=0)
rec = recall_score(y_test_enc, y_pred_cls, average='macro', zero_division=0)
f1 = f1_score(y_test_enc, y_pred_cls, average='macro', zero_division=0)
snr_results[snr_label] = {'acc': acc, 'prec': prec, 'rec': rec, 'f1': f1}
# 可視化噪聲魯棒性結(jié)果
snr_labels = list(snr_results.keys())
acc_list = [snr_results[snr]['acc'] for snr in snr_labels]
prec_list = [snr_results[snr]['prec'] for snr in snr_labels]
rec_list = [snr_results[snr]['rec'] for snr in snr_labels]
f1_list = [snr_results[snr]['f1'] for snr in snr_labels]
plt.figure(figsize=(10, 6))
plt.plot(snr_labels, acc_list, marker='o', label='Accuracy')
plt.plot(snr_labels, prec_list, marker='s', label='Precision')
plt.plot(snr_labels, rec_list, marker='^', label='Recall')
plt.plot(snr_labels, f1_list, marker='d', label='F1-Score')
plt.title('Model Performance under Varying SNR Levels')
plt.xlabel('SNR (dB)')
plt.ylabel('Score')
plt.ylim(0.0, 1.05)
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.savefig("Report/noise_robustness.pdf")
plt.show()
# === 信號(hào)分析 ===
# 1. 時(shí)域分析
def plot_time_domain(signal, title="Time-Domain Signal", sampling_rate=1000):
time = np.linspace(0, len(signal) / sampling_rate, len(signal))
plt.figure(figsize=(10, 3))
plt.plot(time, signal)
plt.title(title)
plt.xlabel("Time (s)")
plt.ylabel("Amplitude")
plt.grid(True)
plt.tight_layout()
plt.savefig(f"Report/{title.replace(' ', '_')}.pdf")
plt.show()
# 2. 頻域分析
def plot_fft(signal, sampling_rate=1000, title="Frequency Spectrum"):
N = len(signal)
freq = np.fft.fftfreq(N, d=1/sampling_rate)
fft_vals = np.fft.fft(signal)
plt.figure(figsize=(10, 3))
plt.plot(freq[:N//2], np.abs(fft_vals)[:N//2])
plt.title(title)
plt.xlabel("Frequency (Hz)")
plt.ylabel("Amplitude")
plt.grid(True)
plt.tight_layout()
plt.savefig(f"Report/{title.replace(' ', '_')}.pdf")
plt.show()
# 應(yīng)用分析函數(shù)
sample_signal = X_test[0].squeeze()
plot_time_domain(sample_signal, "Normal Bearing Signal")
plot_fft(sample_signal, "Normal Bearing Spectrum")
# === 模型解釋性分析 ===
# 1. 注意力權(quán)重可視化
attention_layer = model.get_layer('time_attn1')
attention_extractor = Model(inputs=model.input, outputs=attention_layer.output)
attention_weights = attention_extractor.predict(X_test[:1]).squeeze()
plt.figure(figsize=(12, 4))
plt.plot(sample_signal, label='Input Signal')
plt.plot(attention_weights * np.max(sample_signal), alpha=0.7, label='Attention Weights')
plt.title('Attention Weights over Time Domain Signal')
plt.xlabel('Time step')
plt.legend()
plt.savefig("Report/attention_weights.pdf")
plt.show()
# 2. 神經(jīng)ODE狀態(tài)演化可視化
ode_extractor = Model(inputs=model.input, outputs=model.get_layer('time_ode').output)
ode_outputs = ode_extractor.predict(X_test[:1]).squeeze()
plt.figure(figsize=(12, 6))
for i in range(5): # 可視化5個(gè)特征
plt.plot(ode_outputs[:, i], label=f'Feature {i+1}')
plt.xlabel('Neural ODE Time Steps')
plt.ylabel('Feature value')
plt.title('Neural ODE Feature Evolution Over Time')
plt.legend()
plt.savefig("Report/neural_ode_evolution.pdf")
plt.show()本文轉(zhuǎn)載自??????高斯的手稿??

















