JavaScript原生實戰(zhàn)手冊 · 異步重試機(jī)制:網(wǎng)絡(luò)請求的可靠性保障
在現(xiàn)代Web應(yīng)用中,網(wǎng)絡(luò)請求無處不在:調(diào)用API獲取數(shù)據(jù)、上傳文件、發(fā)送表單、實時通信等。但網(wǎng)絡(luò)環(huán)境往往不可預(yù)測:服務(wù)器臨時過載、網(wǎng)絡(luò)連接不穩(wěn)定、CDN節(jié)點故障、第三方服務(wù)限流等問題時有發(fā)生。一個偶然的網(wǎng)絡(luò)錯誤就可能讓整個功能失效,影響用戶體驗。今天我們就來打造一個智能的異步重試機(jī)制,讓應(yīng)用在各種不穩(wěn)定環(huán)境中都能穩(wěn)定運(yùn)行。
生活中的重試機(jī)制場景
場景一:在線支付系統(tǒng)
想象你在開發(fā)一個電商網(wǎng)站的支付功能:
用戶點擊支付按鈕 → 調(diào)用支付接口
↓
網(wǎng)絡(luò)超時/服務(wù)器繁忙 → 支付失敗
↓
用戶看到錯誤提示 → 用戶體驗糟糕
可能的結(jié)果:用戶放棄購買、訂單丟失、收入損失如果有智能重試機(jī)制:
用戶點擊支付按鈕 → 調(diào)用支付接口
↓
第一次失敗 → 等待1秒后自動重試
第二次失敗 → 等待2秒后自動重試
第三次成功 → 支付完成,用戶無感知場景二:大文件上傳
在文件管理系統(tǒng)中上傳大文件:
上傳進(jìn)度:[████████████████████████████████████████] 95%
↓
網(wǎng)絡(luò)中斷 → 上傳失敗 → 用戶需要重新上傳整個文件
vs
上傳進(jìn)度:[████████████████████████████████████████] 95%
↓
網(wǎng)絡(luò)中斷 → 自動重試 → 斷點續(xù)傳 → 上傳完成場景三:微服務(wù)架構(gòu)中的服務(wù)調(diào)用
在微服務(wù)系統(tǒng)中,服務(wù)之間頻繁調(diào)用:
用戶服務(wù) → 調(diào)用訂單服務(wù) → 調(diào)用庫存服務(wù) → 調(diào)用支付服務(wù)
任何一個環(huán)節(jié)的臨時故障都可能導(dǎo)致整個流程失敗
需要智能重試來提高系統(tǒng)的容錯能力傳統(tǒng)處理方式的痛點
痛點一:簡單粗暴的重試
// 原始的重試方式:固定間隔,沒有策略
asyncfunction simpleRetry(fn, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
returnawait fn();
} catch (error) {
if (i === maxRetries - 1) throw error;
awaitnewPromise(resolve => setTimeout(resolve, 1000)); // 固定等待1秒
}
}
}這種方式的問題:
- 所有錯誤都重試,包括不應(yīng)該重試的(如400錯誤)
- 固定間隔容易造成"驚群效應(yīng)"
- 沒有考慮服務(wù)器負(fù)載情況
- 缺乏靈活性和可配置性
痛點二:沒有指數(shù)退避
// 問題:大量客戶端同時重試,加重服務(wù)器負(fù)擔(dān)
async function badRetry() {
// 100個客戶端在同一時間重試
// 第1秒: 100個請求同時發(fā)出
// 第2秒: 100個請求同時重試
// 第3秒: 100個請求同時重試
// 服務(wù)器壓力巨大!
}痛點三:錯誤類型不區(qū)分
// 問題:所有錯誤一視同仁
try {
const response = await fetch('/api/data');
if (!response.ok) {
thrownewError('Request failed');
}
} catch (error) {
// 不管是404(不存在)還是503(服務(wù)不可用)都重試
// 404重試是浪費(fèi),503才需要重試
}痛點四:缺乏監(jiān)控和反饋
// 問題:重試過程黑盒化
async function fetchData() {
try {
return await retryRequest();
} catch (error) {
// 用戶不知道重試了幾次、為什么失敗
console.log('請求失敗'); // 信息太少
}
}我們的智能重試機(jī)制
現(xiàn)在讓我們來實現(xiàn)一個功能完備的重試管理器:
class RetryManager {
constructor(options = {}) {
this.options = {
maxAttempts: 3, // 最大重試次數(shù)
baseDelay: 1000, // 基礎(chǔ)延遲時間(毫秒)
maxDelay: 30000, // 最大延遲時間
backoffFactor: 2, // 退避因子(指數(shù)退避)
jitter: true, // 是否添加隨機(jī)抖動
retryCondition: (error) =>true, // 重試條件判斷函數(shù)
onRetry: (attempt, error, delay) => {}, // 重試回調(diào)
onSuccess: (result, attempts) => {}, // 成功回調(diào)
onFailure: (error, attempts) => {}, // 最終失敗回調(diào)
timeout: 0, // 單次請求超時時間(0表示不限制)
abortSignal: null, // 取消信號
...options
};
// 統(tǒng)計信息
this.stats = {
totalAttempts: 0,
totalSuccesses: 0,
totalFailures: 0,
totalRetries: 0,
averageAttempts: 0
};
}
// 執(zhí)行帶重試的異步函數(shù)
async execute(asyncFunction, ...args) {
let lastError;
let attempts = 0;
const startTime = Date.now();
for (let attempt = 1; attempt <= this.options.maxAttempts; attempt++) {
attempts = attempt;
this.stats.totalAttempts++;
try {
// 檢查是否被取消
this.checkAbortSignal();
// 執(zhí)行函數(shù)(可能帶超時)
const result = awaitthis.executeWithTimeout(asyncFunction, ...args);
// 成功統(tǒng)計
this.stats.totalSuccesses++;
this.updateAverageAttempts();
// 成功回調(diào)
this.options.onSuccess(result, attempts);
// 記錄成功日志
if (attempt > 1) {
console.log(`? 重試成功: 第${attempt}次嘗試成功,總耗時${Date.now() - startTime}ms`);
}
return result;
} catch (error) {
lastError = error;
// 檢查是否應(yīng)該重試此錯誤
if (!this.shouldRetry(error, attempt)) {
break;
}
// 如果不是最后一次嘗試,則等待后重試
if (attempt < this.options.maxAttempts) {
const delay = this.calculateDelay(attempt);
// 重試回調(diào)
this.options.onRetry(attempt, error, delay);
console.log(`?? 第${attempt}次嘗試失敗: ${error.message},${delay}ms后重試`);
// 等待指定時間
awaitthis.sleep(delay);
this.stats.totalRetries++;
}
}
}
// 所有重試都失敗了
this.stats.totalFailures++;
this.updateAverageAttempts();
// 失敗回調(diào)
this.options.onFailure(lastError, attempts);
console.error(`? 重試失敗: ${attempts}次嘗試后仍然失敗,總耗時${Date.now() - startTime}ms`);
throw lastError;
}
// 計算延遲時間(指數(shù)退避 + 隨機(jī)抖動)
calculateDelay(attempt) {
// 指數(shù)退避: baseDelay * backoffFactor^(attempt-1)
const exponentialDelay = this.options.baseDelay * Math.pow(this.options.backoffFactor, attempt - 1);
// 限制最大延遲
const cappedDelay = Math.min(exponentialDelay, this.options.maxDelay);
// 添加隨機(jī)抖動,避免驚群效應(yīng)
if (this.options.jitter) {
// 在延遲時間的±25%范圍內(nèi)添加隨機(jī)抖動
const jitterRange = cappedDelay * 0.25;
const jitter = (Math.random() - 0.5) * 2 * jitterRange;
returnMath.max(0, Math.round(cappedDelay + jitter));
}
return cappedDelay;
}
// 判斷是否應(yīng)該重試
shouldRetry(error, attempt) {
// 檢查是否被取消
if (this.isAborted()) {
returnfalse;
}
// 已經(jīng)是最后一次嘗試
if (attempt >= this.options.maxAttempts) {
returnfalse;
}
// 使用自定義重試條件判斷
returnthis.options.retryCondition(error, attempt);
}
// 帶超時的函數(shù)執(zhí)行
async executeWithTimeout(asyncFunction, ...args) {
if (this.options.timeout <= 0) {
return asyncFunction(...args);
}
const timeoutPromise = newPromise((_, reject) => {
setTimeout(() => {
reject(newError(`操作超時: 超過${this.options.timeout}ms`));
}, this.options.timeout);
});
returnPromise.race([
asyncFunction(...args),
timeoutPromise
]);
}
// 檢查取消信號
checkAbortSignal() {
if (this.options.abortSignal && this.options.abortSignal.aborted) {
thrownewError('操作已取消');
}
}
// 檢查是否被取消
isAborted() {
returnthis.options.abortSignal && this.options.abortSignal.aborted;
}
// 等待指定時間
sleep(ms) {
returnnewPromise(resolve => setTimeout(resolve, ms));
}
// 更新平均嘗試次數(shù)
updateAverageAttempts() {
const totalCompleted = this.stats.totalSuccesses + this.stats.totalFailures;
if (totalCompleted > 0) {
this.stats.averageAttempts = (this.stats.totalAttempts / totalCompleted).toFixed(2);
}
}
// 獲取統(tǒng)計信息
getStats() {
return {
...this.stats,
successRate: this.stats.totalSuccesses + this.stats.totalFailures > 0 ?
(this.stats.totalSuccesses / (this.stats.totalSuccesses + this.stats.totalFailures) * 100).toFixed(2) + '%' :
'0%'
};
}
// 重置統(tǒng)計信息
resetStats() {
this.stats = {
totalAttempts: 0,
totalSuccesses: 0,
totalFailures: 0,
totalRetries: 0,
averageAttempts: 0
};
}
// 靜態(tài)方法:HTTP請求重試
staticasync retryFetch(url, options = {}, retryOptions = {}) {
const retryManager = new RetryManager({
retryCondition: (error) => {
// 網(wǎng)絡(luò)錯誤總是重試
if (error.name === 'TypeError' && error.message.includes('fetch')) {
returntrue;
}
// HTTP狀態(tài)碼判斷
if (error.status) {
// 5xx服務(wù)器錯誤 - 重試
if (error.status >= 500) returntrue;
// 429限流錯誤 - 重試
if (error.status === 429) returntrue;
// 408請求超時 - 重試
if (error.status === 408) returntrue;
// 4xx客戶端錯誤 - 不重試
if (error.status >= 400 && error.status < 500) returnfalse;
}
returntrue; // 其他情況默認(rèn)重試
},
...retryOptions
});
return retryManager.execute(async () => {
const response = await fetch(url, options);
if (!response.ok) {
const error = newError(`HTTP ${response.status}: ${response.statusText}`);
error.status = response.status;
error.response = response;
throw error;
}
return response;
});
}
}
// 專門的數(shù)據(jù)庫重試管理器
class DatabaseRetryManager extends RetryManager {
constructor(options = {}) {
super({
maxAttempts: 5,
baseDelay: 500,
maxDelay: 5000,
retryCondition: (error) => {
// 數(shù)據(jù)庫相關(guān)的可重試錯誤
const retryableErrors = [
'ConnectionError',
'TimeoutError',
'DeadlockError',
'LockWaitTimeoutError',
'ConnectionLostError'
];
return retryableErrors.some(errorType =>
error.name.includes(errorType) || error.message.includes(errorType)
);
},
onRetry: (attempt, error, delay) => {
console.log(`??? 數(shù)據(jù)庫操作重試: 第${attempt}次失敗(${error.message}),${delay}ms后重試`);
},
...options
});
}
}
// 專門的API重試管理器
class APIRetryManager extends RetryManager {
constructor(options = {}) {
super({
maxAttempts: 3,
baseDelay: 1000,
maxDelay: 10000,
retryCondition: (error) => {
// 不重試客戶端錯誤(4xx),除了429限流
if (error.status >= 400 && error.status < 500 && error.status !== 429) {
returnfalse;
}
// 其他情況都重試
returntrue;
},
onRetry: (attempt, error, delay) => {
if (error.status === 429) {
console.log(`?? API限流重試: 第${attempt}次觸發(fā)限流,${delay}ms后重試`);
} else {
console.log(`?? API請求重試: 第${attempt}次失敗(${error.message}),${delay}ms后重試`);
}
},
...options
});
}
}
// 文件上傳重試管理器
class UploadRetryManager extends RetryManager {
constructor(options = {}) {
super({
maxAttempts: 5,
baseDelay: 2000,
maxDelay: 30000,
timeout: 60000, // 60秒超時
retryCondition: (error) => {
// 網(wǎng)絡(luò)錯誤和服務(wù)器錯誤都重試
if (error.name === 'TypeError') returntrue;
if (error.status >= 500) returntrue;
if (error.status === 408 || error.status === 429) returntrue;
// 超時錯誤重試
if (error.message.includes('超時') || error.message.includes('timeout')) {
returntrue;
}
returnfalse;
},
onRetry: (attempt, error, delay) => {
console.log(`?? 文件上傳重試: 第${attempt}次失敗,${delay}ms后重試上傳`);
},
...options
});
}
}基礎(chǔ)功能展示
讓我們看看這個重試管理器的基本使用:
// 創(chuàng)建基礎(chǔ)重試管理器
const retryManager = new RetryManager({
maxAttempts: 3,
baseDelay: 1000,
backoffFactor: 2,
jitter: true,
onRetry: (attempt, error, delay) => {
console.log(`重試中: 第${attempt}次失敗,${delay}ms后重試`);
}
});
// 重試不可靠的網(wǎng)絡(luò)請求
asyncfunction unreliableNetworkCall() {
// 模擬70%的失敗率
if (Math.random() < 0.7) {
thrownewError('網(wǎng)絡(luò)連接超時');
}
return { data: '請求成功的數(shù)據(jù)' };
}
try {
const result = await retryManager.execute(unreliableNetworkCall);
console.log('請求成功:', result);
} catch (error) {
console.error('所有重試失敗:', error.message);
}
// 使用靜態(tài)方法快速重試HTTP請求
asyncfunction fetchUserData(userId) {
try {
const response = await RetryManager.retryFetch(`/api/users/${userId}`, {
method: 'GET',
headers: { 'Authorization': 'Bearer token123' }
}, {
maxAttempts: 5,
baseDelay: 2000
});
returnawait response.json();
} catch (error) {
console.error('用戶數(shù)據(jù)獲取失敗:', error.message);
throw error;
}
}
// 查看重試統(tǒng)計
console.log('重試統(tǒng)計:', retryManager.getStats());實際項目應(yīng)用示例
1. 健壯的HTTP客戶端
class RobustHttpClient {
constructor(options = {}) {
this.baseURL = options.baseURL || '';
this.defaultHeaders = options.headers || {};
this.timeout = options.timeout || 30000;
// 為不同類型的請求配置不同的重試策略
this.retryManagers = {
// GET請求:讀操作,可以多次重試
GET: new APIRetryManager({
maxAttempts: 5,
baseDelay: 1000,
maxDelay: 10000
}),
// POST請求:寫操作,需要謹(jǐn)慎重試
POST: new APIRetryManager({
maxAttempts: 3,
baseDelay: 2000,
retryCondition: (error) => {
// 只重試網(wǎng)絡(luò)錯誤和5xx服務(wù)器錯誤
if (error.name === 'TypeError') returntrue;
if (error.status >= 500) returntrue;
returnfalse;
}
}),
// PUT/DELETE請求:冪等操作,可以重試
PUT: new APIRetryManager({
maxAttempts: 4,
baseDelay: 1500
}),
DELETE: new APIRetryManager({
maxAttempts: 4,
baseDelay: 1500
})
};
// 請求攔截器
this.requestInterceptors = [];
this.responseInterceptors = [];
// 統(tǒng)計信息
this.stats = {
totalRequests: 0,
successfulRequests: 0,
failedRequests: 0,
totalRetries: 0
};
}
// 添加請求攔截器
addRequestInterceptor(interceptor) {
this.requestInterceptors.push(interceptor);
}
// 添加響應(yīng)攔截器
addResponseInterceptor(interceptor) {
this.responseInterceptors.push(interceptor);
}
// 通用請求方法
async request(config) {
// 應(yīng)用請求攔截器
let processedConfig = { ...config };
for (const interceptor ofthis.requestInterceptors) {
processedConfig = await interceptor(processedConfig);
}
// 構(gòu)建完整的請求配置
const requestConfig = this.buildRequestConfig(processedConfig);
// 選擇重試管理器
const retryManager = this.retryManagers[requestConfig.method.toUpperCase()] ||
this.retryManagers.GET;
this.stats.totalRequests++;
try {
const response = await retryManager.execute(async () => {
returnthis.executeRequest(requestConfig);
});
// 應(yīng)用響應(yīng)攔截器
let processedResponse = response;
for (const interceptor ofthis.responseInterceptors) {
processedResponse = await interceptor(processedResponse);
}
this.stats.successfulRequests++;
this.stats.totalRetries += (response._retryCount || 0);
return processedResponse;
} catch (error) {
this.stats.failedRequests++;
this.stats.totalRetries += (error._retryCount || 0);
throw error;
}
}
// 執(zhí)行實際的HTTP請求
async executeRequest(config) {
const url = config.url.startsWith('http') ? config.url : this.baseURL + config.url;
// 添加取消信號支持
const controller = new AbortController();
if (config.timeout) {
setTimeout(() => controller.abort(), config.timeout);
}
const response = await fetch(url, {
method: config.method,
headers: config.headers,
body: config.body,
signal: controller.signal,
...config.fetchOptions
});
// 檢查響應(yīng)狀態(tài)
if (!response.ok) {
const error = newError(`HTTP ${response.status}: ${response.statusText}`);
error.status = response.status;
error.response = response;
throw error;
}
// 解析響應(yīng)數(shù)據(jù)
const contentType = response.headers.get('content-type');
let data;
if (contentType && contentType.includes('application/json')) {
data = await response.json();
} elseif (contentType && contentType.includes('text/')) {
data = await response.text();
} else {
data = await response.blob();
}
return {
data,
status: response.status,
statusText: response.statusText,
headers: Object.fromEntries(response.headers.entries()),
config
};
}
// 構(gòu)建請求配置
buildRequestConfig(config) {
return {
method: config.method || 'GET',
url: config.url,
headers: {
'Content-Type': 'application/json',
...this.defaultHeaders,
...config.headers
},
body: config.data ? JSON.stringify(config.data) : config.body,
timeout: config.timeout || this.timeout,
fetchOptions: config.fetchOptions || {}
};
}
// 便捷方法
asyncget(url, config = {}) {
returnthis.request({ method: 'GET', url, ...config });
}
async post(url, data, config = {}) {
returnthis.request({ method: 'POST', url, data, ...config });
}
async put(url, data, config = {}) {
returnthis.request({ method: 'PUT', url, data, ...config });
}
asyncdelete(url, config = {}) {
returnthis.request({ method: 'DELETE', url, ...config });
}
// 上傳文件
async upload(url, file, options = {}) {
const uploadRetryManager = new UploadRetryManager({
onRetry: (attempt, error, delay) => {
if (options.onRetry) {
options.onRetry(attempt, error, delay);
}
}
});
return uploadRetryManager.execute(async () => {
const formData = new FormData();
formData.append(options.fieldName || 'file', file);
// 添加額外字段
if (options.fields) {
Object.entries(options.fields).forEach(([key, value]) => {
formData.append(key, value);
});
}
returnthis.executeRequest({
method: 'POST',
url,
body: formData,
headers: {
// 不設(shè)置Content-Type,讓瀏覽器自動設(shè)置multipart/form-data
...this.defaultHeaders,
...options.headers
},
timeout: options.timeout || 60000
});
});
}
// 批量請求(并發(fā)控制)
async batchRequest(requests, options = {}) {
const { concurrency = 5, failFast = false } = options;
const results = [];
// 分批處理請求
for (let i = 0; i < requests.length; i += concurrency) {
const batch = requests.slice(i, i + concurrency);
const batchPromises = batch.map(async (requestConfig, index) => {
try {
const result = awaitthis.request(requestConfig);
return { index: i + index, success: true, data: result };
} catch (error) {
if (failFast) {
throw error;
}
return { index: i + index, success: false, error };
}
});
const batchResults = awaitPromise.all(batchPromises);
results.push(...batchResults);
}
return results.sort((a, b) => a.index - b.index);
}
// 獲取統(tǒng)計信息
getStats() {
return {
...this.stats,
successRate: this.stats.totalRequests > 0 ?
(this.stats.successfulRequests / this.stats.totalRequests * 100).toFixed(2) + '%' :
'0%',
averageRetries: this.stats.totalRequests > 0 ?
(this.stats.totalRetries / this.stats.totalRequests).toFixed(2) :
'0'
};
}
// 健康檢查
async healthCheck(endpoint = '/health') {
try {
const response = awaitthis.get(endpoint, { timeout: 5000 });
return {
healthy: true,
status: response.status,
data: response.data,
timestamp: newDate().toISOString()
};
} catch (error) {
return {
healthy: false,
error: error.message,
timestamp: newDate().toISOString()
};
}
}
}
// 使用示例
const httpClient = new RobustHttpClient({
baseURL: 'https://api.example.com',
headers: {
'Authorization': 'Bearer your-token-here'
},
timeout: 30000
});
// 添加請求攔截器(自動添加時間戳)
httpClient.addRequestInterceptor(async (config) => {
console.log(`?? 發(fā)送請求: ${config.method} ${config.url}`);
config.headers['X-Request-Time'] = Date.now();
return config;
});
// 添加響應(yīng)攔截器(記錄響應(yīng)時間)
httpClient.addResponseInterceptor(async (response) => {
const requestTime = response.config.headers['X-Request-Time'];
const responseTime = Date.now() - requestTime;
console.log(`? 請求完成: ${response.config.method} ${response.config.url} (${responseTime}ms)`);
return response;
});
// 使用示例
asyncfunction demonstrateHttpClient() {
try {
// GET請求
const userData = await httpClient.get('/users/123');
console.log('用戶數(shù)據(jù):', userData.data);
// POST請求
const newUser = await httpClient.post('/users', {
name: '張三',
email: 'zhangsan@example.com'
});
console.log('創(chuàng)建用戶:', newUser.data);
// 文件上傳
const fileInput = document.getElementById('file-input');
if (fileInput && fileInput.files[0]) {
const uploadResult = await httpClient.upload('/upload', fileInput.files[0], {
fields: { description: '用戶頭像' },
onRetry: (attempt, error, delay) => {
console.log(`上傳重試: 第${attempt}次失敗,${delay}ms后重試`);
}
});
console.log('上傳成功:', uploadResult.data);
}
// 批量請求
const batchRequests = [
{ method: 'GET', url: '/users/1' },
{ method: 'GET', url: '/users/2' },
{ method: 'GET', url: '/users/3' }
];
const batchResults = await httpClient.batchRequest(batchRequests, {
concurrency: 2
});
console.log('批量請求結(jié)果:', batchResults);
// 健康檢查
const health = await httpClient.healthCheck();
console.log('服務(wù)健康狀態(tài):', health);
// 查看統(tǒng)計信息
console.log('HTTP客戶端統(tǒng)計:', httpClient.getStats());
} catch (error) {
console.error('請求失敗:', error.message);
}
}
// 定期健康檢查
setInterval(async () => {
const health = await httpClient.healthCheck();
if (!health.healthy) {
console.warn('?? 服務(wù)不健康:', health.error);
}
}, 60000); // 每分鐘檢查一次2. 分布式任務(wù)處理系統(tǒng)
class DistributedTaskProcessor {
constructor(options = {}) {
this.options = {
maxConcurrency: options.maxConcurrency || 5,
taskTimeout: options.taskTimeout || 30000,
retryDelay: options.retryDelay || 1000,
maxRetries: options.maxRetries || 3,
...options
};
// 任務(wù)隊列
this.taskQueue = [];
this.runningTasks = newMap();
this.completedTasks = [];
this.failedTasks = [];
// 重試管理器
this.retryManager = new RetryManager({
maxAttempts: this.options.maxRetries,
baseDelay: this.options.retryDelay,
timeout: this.options.taskTimeout,
onRetry: (attempt, error, delay) => {
console.log(`?? 任務(wù)重試: 第${attempt}次失敗(${error.message}),${delay}ms后重試`);
}
});
// 任務(wù)統(tǒng)計
this.stats = {
totalTasks: 0,
completedTasks: 0,
failedTasks: 0,
averageExecutionTime: 0
};
this.isProcessing = false;
}
// 添加任務(wù)
addTask(task) {
const taskId = this.generateTaskId();
const taskWrapper = {
id: taskId,
task: task.fn,
data: task.data || {},
priority: task.priority || 0,
createdAt: newDate(),
retryCount: 0,
maxRetries: task.maxRetries || this.options.maxRetries,
timeout: task.timeout || this.options.taskTimeout,
onProgress: task.onProgress,
onComplete: task.onComplete,
onError: task.onError
};
this.taskQueue.push(taskWrapper);
this.stats.totalTasks++;
// 按優(yōu)先級排序
this.taskQueue.sort((a, b) => b.priority - a.priority);
console.log(`? 任務(wù)已添加: ${taskId} (優(yōu)先級: ${taskWrapper.priority})`);
// 如果沒在處理,開始處理
if (!this.isProcessing) {
this.processQueue();
}
return taskId;
}
// 批量添加任務(wù)
addTasks(tasks) {
return tasks.map(task =>this.addTask(task));
}
// 處理任務(wù)隊列
async processQueue() {
if (this.isProcessing) return;
this.isProcessing = true;
console.log('?? 開始處理任務(wù)隊列');
while (this.taskQueue.length > 0 || this.runningTasks.size > 0) {
// 啟動新任務(wù)(在并發(fā)限制內(nèi))
while (this.taskQueue.length > 0 &&
this.runningTasks.size < this.options.maxConcurrency) {
const task = this.taskQueue.shift();
this.executeTask(task);
}
// 等待一小段時間再檢查
awaitthis.sleep(100);
}
this.isProcessing = false;
console.log('? 任務(wù)隊列處理完成');
// 輸出最終統(tǒng)計
this.printStats();
}
// 執(zhí)行單個任務(wù)
async executeTask(taskWrapper) {
const { id, task, data, timeout, onProgress, onComplete, onError } = taskWrapper;
this.runningTasks.set(id, taskWrapper);
console.log(`?? 開始執(zhí)行任務(wù): ${id}`);
const startTime = Date.now();
try {
// 使用重試管理器執(zhí)行任務(wù)
const result = awaitthis.retryManager.execute(async () => {
// 創(chuàng)建任務(wù)執(zhí)行上下文
const taskContext = {
id,
data,
progress: (percent, message) => {
if (onProgress) {
onProgress(percent, message);
}
console.log(`?? 任務(wù)進(jìn)度 ${id}: ${percent}% - ${message}`);
},
isCancelled: () =>false// 可以擴(kuò)展為支持任務(wù)取消
};
returnawait task(taskContext);
});
// 任務(wù)完成
const executionTime = Date.now() - startTime;
this.onTaskComplete(taskWrapper, result, executionTime);
if (onComplete) {
onComplete(result);
}
} catch (error) {
// 任務(wù)失敗
const executionTime = Date.now() - startTime;
this.onTaskFailed(taskWrapper, error, executionTime);
if (onError) {
onError(error);
}
} finally {
this.runningTasks.delete(id);
}
}
// 任務(wù)完成處理
onTaskComplete(taskWrapper, result, executionTime) {
const completedTask = {
...taskWrapper,
result,
executionTime,
completedAt: newDate(),
status: 'completed'
};
this.completedTasks.push(completedTask);
this.stats.completedTasks++;
// 更新平均執(zhí)行時間
this.updateAverageExecutionTime(executionTime);
console.log(`? 任務(wù)完成: ${taskWrapper.id} (耗時: ${executionTime}ms)`);
}
// 任務(wù)失敗處理
onTaskFailed(taskWrapper, error, executionTime) {
const failedTask = {
...taskWrapper,
error: error.message,
executionTime,
failedAt: newDate(),
status: 'failed'
};
this.failedTasks.push(failedTask);
this.stats.failedTasks++;
console.error(`? 任務(wù)失敗: ${taskWrapper.id} (耗時: ${executionTime}ms) - ${error.message}`);
}
// 獲取任務(wù)狀態(tài)
getTaskStatus(taskId) {
// 檢查正在運(yùn)行的任務(wù)
if (this.runningTasks.has(taskId)) {
return {
status: 'running',
task: this.runningTasks.get(taskId)
};
}
// 檢查已完成的任務(wù)
const completed = this.completedTasks.find(t => t.id === taskId);
if (completed) {
return { status: 'completed', task: completed };
}
// 檢查失敗的任務(wù)
const failed = this.failedTasks.find(t => t.id === taskId);
if (failed) {
return { status: 'failed', task: failed };
}
// 檢查隊列中的任務(wù)
const queued = this.taskQueue.find(t => t.id === taskId);
if (queued) {
return { status: 'queued', task: queued };
}
return { status: 'not_found' };
}
// 取消任務(wù)
cancelTask(taskId) {
// 從隊列中移除
const queueIndex = this.taskQueue.findIndex(t => t.id === taskId);
if (queueIndex !== -1) {
this.taskQueue.splice(queueIndex, 1);
console.log(`?? 任務(wù)已取消: ${taskId} (從隊列中移除)`);
returntrue;
}
// 正在運(yùn)行的任務(wù)標(biāo)記為取消(需要任務(wù)本身支持檢查取消狀態(tài))
if (this.runningTasks.has(taskId)) {
const task = this.runningTasks.get(taskId);
task.cancelled = true;
console.log(`?? 任務(wù)已標(biāo)記取消: ${taskId} (正在運(yùn)行,等待任務(wù)檢查取消狀態(tài))`);
returntrue;
}
returnfalse;
}
// 暫停任務(wù)處理
pause() {
this.isProcessing = false;
console.log('?? 任務(wù)處理已暫停');
}
// 恢復(fù)任務(wù)處理
resume() {
if (!this.isProcessing) {
console.log('?? 任務(wù)處理已恢復(fù)');
this.processQueue();
}
}
// 獲取統(tǒng)計信息
getStats() {
return {
...this.stats,
queuedTasks: this.taskQueue.length,
runningTasks: this.runningTasks.size,
successRate: this.stats.totalTasks > 0 ?
(this.stats.completedTasks / this.stats.totalTasks * 100).toFixed(2) + '%' :
'0%'
};
}
// 輸出統(tǒng)計信息
printStats() {
const stats = this.getStats();
console.log('\n?? 任務(wù)處理統(tǒng)計:');
console.log(`總?cè)蝿?wù)數(shù): ${stats.totalTasks}`);
console.log(`已完成: ${stats.completedTasks}`);
console.log(`失敗: ${stats.failedTasks}`);
console.log(`成功率: ${stats.successRate}`);
console.log(`平均執(zhí)行時間: ${stats.averageExecutionTime}ms`);
}
// 工具方法
generateTaskId() {
return`task_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
sleep(ms) {
returnnewPromise(resolve => setTimeout(resolve, ms));
}
updateAverageExecutionTime(executionTime) {
const totalCompleted = this.stats.completedTasks;
if (totalCompleted === 1) {
this.stats.averageExecutionTime = executionTime;
} else {
this.stats.averageExecutionTime = Math.round(
(this.stats.averageExecutionTime * (totalCompleted - 1) + executionTime) / totalCompleted
);
}
}
}
// 使用示例
const taskProcessor = new DistributedTaskProcessor({
maxConcurrency: 3,
taskTimeout: 10000,
maxRetries: 2
});
// 定義一些示例任務(wù)
const tasks = [
{
fn: async (context) => {
context.progress(0, '開始處理圖片');
// 模擬圖片處理
for (let i = 0; i <= 100; i += 10) {
context.progress(i, `處理進(jìn)度 ${i}%`);
awaitnewPromise(resolve => setTimeout(resolve, 100));
// 模擬隨機(jī)失敗
if (i === 50 && Math.random() < 0.3) {
thrownewError('圖片處理失敗');
}
}
return { processed: true, size: '1024x768' };
},
data: { imageId: 'img_001', format: 'jpg' },
priority: 1,
onComplete: (result) => {
console.log('??? 圖片處理完成:', result);
},
onError: (error) => {
console.error('??? 圖片處理失敗:', error.message);
}
},
{
fn: async (context) => {
context.progress(0, '開始數(shù)據(jù)同步');
// 模擬數(shù)據(jù)同步
const steps = ['連接數(shù)據(jù)庫', '讀取數(shù)據(jù)', '轉(zhuǎn)換格式', '寫入目標(biāo)', '驗證結(jié)果'];
for (let i = 0; i < steps.length; i++) {
context.progress((i / steps.length) * 100, steps[i]);
awaitnewPromise(resolve => setTimeout(resolve, 200));
// 模擬偶發(fā)錯誤
if (i === 2 && Math.random() < 0.2) {
thrownewError('數(shù)據(jù)轉(zhuǎn)換失敗');
}
}
return { syncedRecords: 1500, duration: '2.1s' };
},
data: { source: 'database_a', target: 'database_b' },
priority: 2,
onComplete: (result) => {
console.log('?? 數(shù)據(jù)同步完成:', result);
}
},
{
fn: async (context) => {
context.progress(0, '開始文件上傳');
// 模擬文件上傳
for (let i = 0; i <= 100; i += 5) {
context.progress(i, `上傳進(jìn)度 ${i}%`);
awaitnewPromise(resolve => setTimeout(resolve, 50));
}
return { uploaded: true, url: 'https://cdn.example.com/file123.pdf' };
},
data: { fileName: 'report.pdf', size: '2.5MB' },
priority: 0,
onComplete: (result) => {
console.log('?? 文件上傳完成:', result);
}
}
];
// 添加任務(wù)到處理器
console.log('添加任務(wù)到處理隊列...');
const taskIds = taskProcessor.addTasks(tasks);
// 監(jiān)控任務(wù)狀態(tài)
setTimeout(() => {
taskIds.forEach(id => {
const status = taskProcessor.getTaskStatus(id);
console.log(`任務(wù)狀態(tài) ${id}: ${status.status}`);
});
}, 3000);
// 5秒后輸出最終統(tǒng)計
setTimeout(() => {
console.log('最終統(tǒng)計信息:', taskProcessor.getStats());
}, 8000);性能優(yōu)化和最佳實踐
1. 智能退避策略
class AdaptiveRetryManager extends RetryManager {
constructor(options = {}) {
super(options);
this.successHistory = [];
this.errorPatterns = newMap();
}
// 自適應(yīng)延遲計算
calculateDelay(attempt, error) {
// 基礎(chǔ)指數(shù)退避
let delay = super.calculateDelay(attempt);
// 根據(jù)錯誤類型調(diào)整
if (error && error.status === 429) {
// 限流錯誤:增加延遲
delay *= 2;
} elseif (error && error.status >= 500) {
// 服務(wù)器錯誤:根據(jù)歷史成功率調(diào)整
const recentSuccessRate = this.getRecentSuccessRate();
if (recentSuccessRate < 0.5) {
delay *= 1.5; // 成功率低時增加延遲
}
}
return delay;
}
// 獲取最近的成功率
getRecentSuccessRate() {
const recentResults = this.successHistory.slice(-10);
if (recentResults.length === 0) return1;
const successes = recentResults.filter(result => result).length;
return successes / recentResults.length;
}
// 記錄執(zhí)行結(jié)果
recordResult(success, error = null) {
this.successHistory.push(success);
if (this.successHistory.length > 100) {
this.successHistory.shift();
}
if (!success && error) {
const pattern = this.getErrorPattern(error);
const count = this.errorPatterns.get(pattern) || 0;
this.errorPatterns.set(pattern, count + 1);
}
}
getErrorPattern(error) {
return`${error.name}:${error.status || 'unknown'}`;
}
}2. 斷路器模式
class CircuitBreakerRetryManager extends RetryManager {
constructor(options = {}) {
super(options);
this.circuitBreaker = {
state: 'CLOSED', // CLOSED, OPEN, HALF_OPEN
failureCount: 0,
failureThreshold: options.failureThreshold || 5,
timeoutDuration: options.timeoutDuration || 60000,
nextAttempt: 0,
successCount: 0
};
}
async execute(asyncFunction, ...args) {
// 檢查斷路器狀態(tài)
if (this.circuitBreaker.state === 'OPEN') {
if (Date.now() < this.circuitBreaker.nextAttempt) {
thrownewError('斷路器開啟狀態(tài),請稍后重試');
} else {
this.circuitBreaker.state = 'HALF_OPEN';
this.circuitBreaker.successCount = 0;
}
}
try {
const result = awaitsuper.execute(asyncFunction, ...args);
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
onSuccess() {
this.circuitBreaker.failureCount = 0;
if (this.circuitBreaker.state === 'HALF_OPEN') {
this.circuitBreaker.successCount++;
if (this.circuitBreaker.successCount >= 2) {
this.circuitBreaker.state = 'CLOSED';
console.log('?? 斷路器已關(guān)閉,恢復(fù)正常');
}
}
}
onFailure() {
this.circuitBreaker.failureCount++;
if (this.circuitBreaker.state === 'HALF_OPEN') {
this.openCircuit();
} elseif (this.circuitBreaker.failureCount >= this.circuitBreaker.failureThreshold) {
this.openCircuit();
}
}
openCircuit() {
this.circuitBreaker.state = 'OPEN';
this.circuitBreaker.nextAttempt = Date.now() + this.circuitBreaker.timeoutDuration;
console.log(`? 斷路器已開啟,${this.circuitBreaker.timeoutDuration / 1000}秒后重試`);
}
getCircuitState() {
return {
state: this.circuitBreaker.state,
failureCount: this.circuitBreaker.failureCount,
nextAttempt: this.circuitBreaker.nextAttempt
};
}
}3. 并發(fā)控制
class ConcurrentRetryManager {
constructor(maxConcurrency = 5) {
this.maxConcurrency = maxConcurrency;
this.running = 0;
this.queue = [];
}
async execute(retryManager, asyncFunction, ...args) {
returnnewPromise((resolve, reject) => {
this.queue.push({
retryManager,
asyncFunction,
args,
resolve,
reject
});
this.processQueue();
});
}
async processQueue() {
if (this.running >= this.maxConcurrency || this.queue.length === 0) {
return;
}
this.running++;
const task = this.queue.shift();
try {
const result = await task.retryManager.execute(task.asyncFunction, ...task.args);
task.resolve(result);
} catch (error) {
task.reject(error);
} finally {
this.running--;
this.processQueue();
}
}
}總結(jié)
通過我們自制的異步重試機(jī)制,我們實現(xiàn)了:
核心優(yōu)勢:
- ? 智能重試:指數(shù)退避+隨機(jī)抖動,避免驚群效應(yīng)
- ? 靈活配置:可配置的重試條件、次數(shù)、延遲策略
- ? 錯誤區(qū)分:智能判斷哪些錯誤需要重試,哪些不需要
- ? 詳細(xì)監(jiān)控:完整的重試過程監(jiān)控和統(tǒng)計
強(qiáng)大功能:
- ? 指數(shù)退避算法:避免給服務(wù)器造成過大壓力
- ? 隨機(jī)抖動:防止多客戶端同時重試
- ? 超時控制:防止單個請求占用過長時間
- ? 取消支持:支持中途取消重試操作
- ? 統(tǒng)計分析:詳細(xì)的成功率和性能統(tǒng)計
實際應(yīng)用場景:
- ? HTTP客戶端:健壯的網(wǎng)絡(luò)請求處理
- ? 任務(wù)處理:分布式任務(wù)的可靠執(zhí)行
- ? 文件上傳:大文件上傳的斷點續(xù)傳
- ? 數(shù)據(jù)庫操作:數(shù)據(jù)庫連接的容錯處理
高級特性:
- ? 斷路器模式:服務(wù)降級和快速失敗
- ? 自適應(yīng)策略:根據(jù)歷史情況調(diào)整重試參數(shù)
- ? 并發(fā)控制:避免過多并發(fā)請求
- ? 批量處理:高效處理大量異步任務(wù)
這個重試機(jī)制不僅解決了網(wǎng)絡(luò)不穩(wěn)定環(huán)境下的可靠性問題,更重要的是提供了企業(yè)級應(yīng)用所需的監(jiān)控、統(tǒng)計和控制能力。無論是簡單的API調(diào)用還是復(fù)雜的分布式任務(wù)處理,都能提供穩(wěn)定可靠的重試保障。
掌握了這個工具,你的應(yīng)用就能在各種不穩(wěn)定的網(wǎng)絡(luò)環(huán)境中穩(wěn)如磐石,為用戶提供始終可靠的服務(wù)體驗!




























