如何在 NestJS 中使用 Node.js 流高效處理大文件流式傳輸與數(shù)據(jù)處理
本文將學(xué)習(xí)如何在 NestJS 服務(wù)器上高效、可靠地處理大文件,結(jié)合 Node.js 流、S3 存儲(chǔ)桶以及 CSV 轉(zhuǎn) JSON 的實(shí)際示例。閱讀本文后,你將不再擔(dān)心因大文件導(dǎo)致服務(wù)器崩潰的問(wèn)題。
前置要求
為充分理解本文內(nèi)容,你需要具備以下基礎(chǔ)知識(shí):
- HTTP 下載與上傳的基本原理
- 使用 Multer 處理文件上傳的經(jīng)驗(yàn)
- AWS S3 SDK 的基本使用
- NestJS 架構(gòu)的基本理解
項(xiàng)目初始化
首先,創(chuàng)建一個(gè) NestJS 項(xiàng)目:
nest new stream-app
cd stream-app接著,生成所需的模塊與控制器文件:
nest g module files \
&& nest g controller files \
&& nest g service files \
&& nest g controller files/csv \
&& nest g service files/csv \
&& nest g controller files/s3 \
&& nest g service files/s3安裝項(xiàng)目所需依賴:
npm install multer csv-parser mime-types @aws-sdk/client-s3 @nestjs/config
npm install -D @types/multer @types/mime-types其中:
multer:用于處理文件上傳csv-parser:用于將 CSV 轉(zhuǎn)換為 JSONmime-types:用于設(shè)置正確的文件 Content-Type@aws-sdk/client-s3:用于上傳文件至 S3 兼容存儲(chǔ)(如 DigitalOcean Spaces)@nestjs/config:用于讀取環(huán)境變量
隨后,在 app.module.ts 中導(dǎo)入 ConfigModule:
import { Module } from"@nestjs/common";
import { AppController } from"./app.controller";
import { AppService } from"./app.service";
import { FilesModule } from"./files/files.module";
import { ConfigModule } from"@nestjs/config";
@Module({
imports: [ConfigModule.forRoot({ isGlobal: true }), FilesModule],
controllers: [AppController],
providers: [AppService],
})
exportclassAppModule {}最后,在項(xiàng)目根目錄下創(chuàng)建名為 storage 的文件夾,并放入一個(gè)至少 100MB 的大文件,例如:
stream-app/storage/large-report.pdfNestJS 中的基礎(chǔ)流式傳輸
向用戶發(fā)送大文件的錯(cuò)誤方式是使用 readFileSync()。該方法會(huì)將整個(gè)文件加載到內(nèi)存中,一次性發(fā)送,對(duì)大文件或高并發(fā)應(yīng)用極不實(shí)用。
// 錯(cuò)誤示例 —— 切勿使用
@Get('download-bad')
getFileBad(@Res() res: Response) {
const filePath = join(process.cwd(), 'storage', 'large-report.pdf');
const fileBuffer = readFileSync(filePath); // 將整個(gè)文件加載進(jìn)內(nèi)存
res.setHeader('Content-Type', 'application/pdf');
res.setHeader('Content-Disposition', 'attachment; filename="report.pdf"');
return res.send(fileBuffer); // 一次性發(fā)送全部緩沖數(shù)據(jù)
}幸運(yùn)的是,Node.js 提供了流(Stream)機(jī)制,可以高效、漸進(jìn)、非阻塞地處理數(shù)據(jù)。通過(guò) createReadStream(),文件將以 64KB 的默認(rèn)塊大小逐步讀取。
更新 files.controller.ts:
import {
Controller,
Get,
Query,
Res,
HttpException,
HttpStatus,
Post,
UploadedFile,
UseInterceptors,
ConsoleLogger,
} from"@nestjs/common";
import { Response } from"express";
import { extname, join } from"path";
import { createReadStream, statSync } from"fs";
import { StreamableFile } from"@nestjs/common";
import * as mime from"mime-types";
import { FilesService } from"./files.service";
import { FileInterceptor } from"@nestjs/platform-express";
import { diskStorage } from"multer";
@Controller("files")
exportclassFilesController {
constructor(private readonly filesService: FilesService) {}
@Get("download")
getFile(@Res({ passthrough: true }) res: Response) {
const filePath = join(process.cwd(), "storage", "large-report.pdf");
const fileStream = createReadStream(filePath);
res.set({
"Content-Type": "application/pdf",
"Content-Disposition": 'attachment; filename="report.pdf"',
});
returnnewStreamableFile(fileStream);
}
}在上述代碼中,@Res({ passthrough: true }) 告訴 NestJS 允許我們自定義響應(yīng)頭,同時(shí)仍由框架負(fù)責(zé)發(fā)送響應(yīng)數(shù)據(jù),無(wú)需手動(dòng)調(diào)用 res.send()。
我們?cè)O(shè)置的響應(yīng)頭包括:
Content-Type:告知瀏覽器文件類型Content-Disposition:告知瀏覽器文件名及應(yīng)觸發(fā)下載
StreamableFile(fileStream) 將原始流包裝成 NestJS 可識(shí)別的響應(yīng)對(duì)象,適用于 Express 和 Fastify。若需切換至 Fastify,僅需修改 main.ts 并安裝適配器即可。
優(yōu)化文件下載功能
上述示例雖可運(yùn)行,但在生產(chǎn)環(huán)境中還需增強(qiáng)錯(cuò)誤處理、輸入驗(yàn)證、正確設(shè)置響應(yīng)頭及復(fù)用邏輯。
更新 files.service.ts:
import {
Injectable,
StreamableFile,
NotFoundException,
BadRequestException,
} from"@nestjs/common";
import { join } from"path";
import { createReadStream, existsSync } from"fs";
import { ReadStream } from"fs";
@Injectable()
exportclassFilesService {
getFileStream(fileName: string): { stream: ReadStream; path: string } {
try {
// 基礎(chǔ)文件名驗(yàn)證
if (!fileName || typeof fileName !== "string") {
thrownewBadRequestException("無(wú)效的文件名");
}
// 防止目錄遍歷攻擊
if (
fileName.includes("..") ||
fileName.includes("/") ||
fileName.includes("\\")
) {
thrownewBadRequestException(
"無(wú)效文件名:包含路徑遍歷字符"
);
}
const filePath = join(process.cwd(), "storage", fileName);
if (!existsSync(filePath)) {
thrownewNotFoundException(`文件 '${fileName}' 未找到`);
}
const stream = createReadStream(filePath);
return { stream, path: filePath };
} catch (error) {
if (
error instanceofNotFoundException ||
error instanceofBadRequestException
) {
throw error;
}
thrownewBadRequestException(
`獲取文件流失敗 ${fileName}: ${error.message}`
);
}
}
}上述代碼中:
- 驗(yàn)證文件名非空且為字符串,防止崩潰
- 攔截路徑遍歷攻擊,確保僅能訪問(wèn)
storage目錄內(nèi)的文件 - 使用 NestJS 異常機(jī)制進(jìn)行統(tǒng)一錯(cuò)誤處理
existsSync()用于檢查指定路徑是否存在,存在返回true,否則返回false。
隨后更新 files.controller.ts,添加以下端點(diǎn):
@Get('improved-download')
downloadFile(@Query('name') name: string, @Res({ passthrough: true }) res: Response) {
if (!name) {
thrownewHttpException('文件名是必需的', HttpStatus.BAD_REQUEST);
}
const { stream, path } = this.filesService.getFileStream(name);
const fileSize = statSync(path).size;
const fileExtension = extname(path);
const contentType = mime.lookup(fileExtension) || 'application/octet-stream';
res.set({
'Content-Type': contentType,
'Content-Disposition': `attachment; filename="${name}"`,
'Content-Length': fileSize.toString(),
'Cache-Control': 'no-cache, no-store, must-revalidate',
});
returnnewStreamableFile(stream);
}在該端點(diǎn)中:
- 使用查詢參數(shù)
name動(dòng)態(tài)選擇文件 - 調(diào)用
getFileStream(name)獲取流與路徑 - 使用
statSync()獲取文件大小,用于瀏覽器顯示下載進(jìn)度 - 通過(guò)
mime-types庫(kù)自動(dòng)映射文件擴(kuò)展名到正確的 MIME 類型(如application/pdf) - 設(shè)置
Cache-Control防止瀏覽器緩存過(guò)期文件
下載文件時(shí),瀏覽器可能緩存響應(yīng),導(dǎo)致用戶獲取舊版本。通過(guò)設(shè)置
Cache-Control可避免此問(wèn)題。
上傳大文件
接下來(lái),我們將學(xué)習(xí)如何通過(guò)流式方式上傳文件至磁盤和 S3 存儲(chǔ)桶。
上傳至磁盤
在 FilesController 中添加以下上傳路由:
@Post('upload')
@UseInterceptors(
FileInterceptor('file', {
storage: diskStorage({
destination: './uploads',
filename: (req, file, callback) => {
const uniqueName = Date.now() + extname(file.originalname);
callback(null, uniqueName);
},
}),
limits: {
fileSize: 500 * 1024 * 1024, // 500MB
},
}),
)
handleUpload(@UploadedFile() file: Express.Multer.File) {
return {
message: '文件上傳成功',
filename: file.filename,
size: file.size,
};
}@UseInterceptors 是 NestJS 的裝飾器,用于為路由綁定攔截器。此處使用 FileInterceptor,它是對(duì) Multer 的封裝,自動(dòng)從請(qǐng)求中提取文件并解析。
diskStorage 將文件分塊寫入磁盤,而非加載至內(nèi)存。filename 函數(shù)用于生成唯一文件名。
通過(guò) @UploadedFile() 裝飾器,可獲取文件對(duì)象,包含 filename、originalname、mimetype、size、path 等信息。由于使用了 diskStorage,file.buffer 將為 undefined。
上傳至 S3
本例中,我們將先通過(guò) diskStorage 上傳至本地,再將文件流直接推送至 S3 存儲(chǔ)桶。
本例使用 DigitalOcean Spaces,其完全兼容 S3 協(xié)議,使用相同的 AWS SDK,僅需替換端點(diǎn)與 CDN 地址。
更新 s3.service.ts:
import { Injectable } from"@nestjs/common";
import { ConfigService } from"@nestjs/config";
import { S3Client, PutObjectCommand } from"@aws-sdk/client-s3";
import { Readable } from"stream";
import * as path from"path";
@Injectable()
exportclassS3Service {
privates3: S3Client;
privatereadonlybucketName: string;
privatereadonlyendpoint: string;
privatereadonlyregion: string;
privatereadonlycdnUrl: string;
constructor(private readonly configService: ConfigService) {
this.bucketName = this.configService.getOrThrow<string>(
"DIGITAL_OCEAN_SPACE_BUCKET_NAME"
);
this.endpoint = this.configService.getOrThrow<string>(
"DIGITAL_OCEAN_SPACE_ENDPOINT"
);
this.region = this.configService.getOrThrow<string>(
"DIGITAL_OCEAN_SPACE_REGION"
);
this.cdnUrl = this.configService.getOrThrow<string>(
"DIGITAL_OCEAN_SPACE_CDN_URL"
);
const accessKeyId = this.configService.getOrThrow<string>(
"DIGITAL_OCEAN_SPACE_ACCESS_KEY_ID"
);
const secretAccessKey = this.configService.getOrThrow<string>(
"DIGITAL_OCEAN_SPACE_SECRET_KEY"
);
this.s3 = newS3Client({
endpoint: this.endpoint,
forcePathStyle: false,
region: this.region,
credentials: {
accessKeyId,
secretAccessKey,
},
});
}
asyncuploadImageStream(payload: {
location: string;
file: {
stream: Readable;
filename: string;
mimetype: string;
size: number;
};
}): Promise<{ path: string; key: string }> {
const { location, file } = payload;
const uid = Date.now().toString();
const extension = path.extname(file.filename);
const key = `${location}/${uid}${extension}`;
const command = newPutObjectCommand({
Bucket: this.bucketName,
Key: key,
Body: file.stream,
ContentLength: file.size,
});
try {
awaitthis.s3.send(command);
return {
path: `${this.cdnUrl}/${key}`,
key,
};
} catch (error) {
console.error("上傳文件流失敗:", error);
thrownewError("文件上傳失敗");
}
}
}在 uploadImageStream() 方法中:
- 生成唯一文件鍵(key)
- 使用 AWS SDK v3 創(chuàng)建上傳命令,將可讀流作為
Body - 在
try-catch中執(zhí)行上傳并返回路徑與鍵
隨后更新 s3.controller.ts:
import {
Controller,
Post,
UploadedFile,
UseInterceptors,
BadRequestException,
} from"@nestjs/common";
import { FileInterceptor } from"@nestjs/platform-express";
import { diskStorage } from"multer";
import * as fs from"fs";
import * as path from"path";
import { S3Service } from"./s3.service";
@Controller("s3")
exportclassS3Controller {
constructor(private readonly s3Service: S3Service) {}
@Post("upload")
@UseInterceptors(
FileInterceptor("file", {
storage: diskStorage({
destination: "./uploads",
filename: (req, file, cb) => {
cb(null, `${Date.now()}-${file.originalname}`);
},
}),
limits: { fileSize: 200 * 1024 * 1024 },
})
)
asyncuploadToS3(@UploadedFile() file: Express.Multer.File) {
if (!file) {
thrownewBadRequestException("未上傳文件");
}
const location = "uploads";
const filePath = file.path;
const readStream = fs.createReadStream(filePath);
const { size } = fs.statSync(filePath);
try {
const uploadResult = awaitthis.s3Service.uploadImageStream({
location,
file: {
stream: readStream,
filename: file.originalname,
mimetype: file.mimetype,
size,
},
});
return {
message: "文件已上傳至 S3",
...uploadResult,
};
} catch (error) {
thrownewError(`文件上傳失敗: ${error.message}`);
} finally {
// 清理臨時(shí)文件
if (file.path && fs.existsSync(file.path)) {
fs.unlinkSync(file.path);
}
}
}
}在 uploadToS3 中:
- 將文件流與元數(shù)據(jù)傳遞給
uploadImageStream() - 成功后返回 S3 路徑與鍵
- 最終通過(guò)
fs.unlinkSync()刪除本地臨時(shí)文件
處理大文件:CSV 轉(zhuǎn) JSON 示例
更新 csv.service.ts:
import { Injectable, BadRequestException } from"@nestjs/common";
import * as csv from"csv-parser";
import { Readable } from"stream";
exportinterfaceCsvRow {
[key: string]: string;
}
exportinterfaceCsvProcessingResult {
totalRows: number;
data: CsvRow[];
}
@Injectable()
exportclassCsvService {
asyncprocessCsvStream(fileStream: Readable): Promise<CsvProcessingResult> {
returnnewPromise((resolve, reject) => {
constresults: CsvRow[] = [];
// 創(chuàng)建 CSV 解析流
const csvStream = csv();
// 錯(cuò)誤處理
csvStream.on("error", (error) => {
reject(newBadRequestException(`CSV 解析失敗: ${error.message}`));
});
// 處理完成
csvStream.on("end", () => {
resolve({
totalRows: results.length,
data: results,
});
});
// 流式處理
fileStream.pipe(csvStream).on("data", (data: CsvRow) => {
results.push(data);
// 對(duì)于超大文件,建議替換為數(shù)據(jù)庫(kù)寫入邏輯:
// this.databaseService.insertRow(data);
// 或批量累積后批量插入以提升性能
});
});
}
}在 processCsvStream() 中:
- 創(chuàng)建 Promise 處理異步流
- 使用
csv-parser創(chuàng)建轉(zhuǎn)換流,將 CSV 數(shù)據(jù)逐行轉(zhuǎn)為 JSON 對(duì)象 - 通過(guò)
fileStream.pipe(csvStream)將原始數(shù)據(jù)輸入解析器 - 每解析一行,觸發(fā)
data事件,將結(jié)果存入數(shù)組 - 出錯(cuò)時(shí)拒絕 Promise,完成時(shí)返回結(jié)果
更新 csv.controller.ts:
import {
Controller,
Post,
UploadedFile,
UseInterceptors,
BadRequestException,
} from"@nestjs/common";
import { FileInterceptor } from"@nestjs/platform-express";
import { diskStorage } from"multer";
import * as fs from"fs";
import { CsvService } from"./csv.service";
@Controller("csv")
exportclassCsvController {
constructor(private readonly csvService: CsvService) {}
@Post("upload")
@UseInterceptors(
FileInterceptor("file", {
storage: diskStorage({
destination: "./uploads",
filename: (req, file, cb) => {
cb(null, `${Date.now()}-${file.originalname}`);
},
}),
limits: { fileSize: 50 * 1024 * 1024 }, // 50MB 限制
})
)
asynchandleCsvUpload(@UploadedFile() file: Express.Multer.File) {
if (!file) {
thrownewBadRequestException("未上傳文件");
}
// 創(chuàng)建文件讀取流(真正流式處理)
const fileStream = fs.createReadStream(file.path);
try {
// 使用服務(wù)流式處理 CSV
const result = awaitthis.csvService.processCsvStream(fileStream);
return {
message: "CSV 處理成功",
filename: file.originalname,
...result,
};
} catch (error) {
thrownewBadRequestException(`CSV 處理失敗: ${error.message}`);
} finally {
// 清理臨時(shí)文件
if (file.path && fs.existsSync(file.path)) {
fs.unlinkSync(file.path);
}
}
}
}最后,確認(rèn) files.module.ts 中的控制器與提供者配置正確:
import { Module } from"@nestjs/common";
import { FilesController } from"./files.controller";
import { FilesService } from"./files.service";
import { CsvController } from"./csv/csv.controller";
import { S3Controller } from"./s3/s3.controller";
import { S3Service } from"./s3/s3.service";
import { CsvService } from"./csv/csv.service";
@Module({
controllers: [FilesController, CsvController, S3Controller],
providers: [FilesService, S3Service, CsvService],
})
exportclassFilesModule {}總結(jié)
本文詳細(xì)介紹了在 NestJS 中如何通過(guò) Node.js 流實(shí)現(xiàn):
- 大文件下載
- 文件上傳至磁盤與 S3
- CSV 文件流式轉(zhuǎn)換為 JSON
你已掌握正確的實(shí)踐方式、常見陷阱及其原因。下一步可考慮:
- 將 CSV 解析結(jié)果直接寫入數(shù)據(jù)庫(kù)
- 為 S3 上傳添加重試機(jī)制
原文鏈接:https://www.telerik.com/blogs/how-stream-large-files-handle-data-efficiently-nodejs-streams-nestjs作者:Christian Nwamba




























