偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

如何在 NestJS 中使用 Node.js 流高效處理大文件流式傳輸與數(shù)據(jù)處理

開發(fā) 前端
本文將學(xué)習(xí)如何在 NestJS 服務(wù)器上高效、可靠地處理大文件,結(jié)合 Node.js 流、S3 存儲(chǔ)桶以及 CSV 轉(zhuǎn) JSON 的實(shí)際示例。閱讀本文后,你將不再擔(dān)心因大文件導(dǎo)致服務(wù)器崩潰的問(wèn)題。

本文將學(xué)習(xí)如何在 NestJS 服務(wù)器上高效、可靠地處理大文件,結(jié)合 Node.js 流、S3 存儲(chǔ)桶以及 CSV 轉(zhuǎn) JSON 的實(shí)際示例。閱讀本文后,你將不再擔(dān)心因大文件導(dǎo)致服務(wù)器崩潰的問(wèn)題。

前置要求

為充分理解本文內(nèi)容,你需要具備以下基礎(chǔ)知識(shí):

  • HTTP 下載與上傳的基本原理
  • 使用 Multer 處理文件上傳的經(jīng)驗(yàn)
  • AWS S3 SDK 的基本使用
  • NestJS 架構(gòu)的基本理解

項(xiàng)目初始化

首先,創(chuàng)建一個(gè) NestJS 項(xiàng)目:

nest new stream-app
cd stream-app

接著,生成所需的模塊與控制器文件:

nest g module files \
&& nest g controller files \
&& nest g service files \
&& nest g controller files/csv \
&& nest g service files/csv \
&& nest g controller files/s3 \
&& nest g service files/s3

安裝項(xiàng)目所需依賴:

npm install multer csv-parser mime-types @aws-sdk/client-s3 @nestjs/config
npm install -D @types/multer @types/mime-types

其中:

  • multer:用于處理文件上傳
  • csv-parser:用于將 CSV 轉(zhuǎn)換為 JSON
  • mime-types:用于設(shè)置正確的文件 Content-Type
  • @aws-sdk/client-s3:用于上傳文件至 S3 兼容存儲(chǔ)(如 DigitalOcean Spaces)
  • @nestjs/config:用于讀取環(huán)境變量

隨后,在 app.module.ts 中導(dǎo)入 ConfigModule

import { Module } from"@nestjs/common";
import { AppController } from"./app.controller";
import { AppService } from"./app.service";
import { FilesModule } from"./files/files.module";
import { ConfigModule } from"@nestjs/config";

@Module({
imports: [ConfigModule.forRoot({ isGlobal: true }), FilesModule],
controllers: [AppController],
providers: [AppService],
})
exportclassAppModule {}

最后,在項(xiàng)目根目錄下創(chuàng)建名為 storage 的文件夾,并放入一個(gè)至少 100MB 的大文件,例如:

stream-app/storage/large-report.pdf

NestJS 中的基礎(chǔ)流式傳輸

向用戶發(fā)送大文件的錯(cuò)誤方式是使用 readFileSync()。該方法會(huì)將整個(gè)文件加載到內(nèi)存中,一次性發(fā)送,對(duì)大文件或高并發(fā)應(yīng)用極不實(shí)用。

// 錯(cuò)誤示例 —— 切勿使用
@Get('download-bad')
getFileBad(@Res() res: Response) {
  const filePath = join(process.cwd(), 'storage', 'large-report.pdf');
  const fileBuffer = readFileSync(filePath); // 將整個(gè)文件加載進(jìn)內(nèi)存

  res.setHeader('Content-Type', 'application/pdf');
  res.setHeader('Content-Disposition', 'attachment; filename="report.pdf"');

  return res.send(fileBuffer); // 一次性發(fā)送全部緩沖數(shù)據(jù)
}

幸運(yùn)的是,Node.js 提供了流(Stream)機(jī)制,可以高效、漸進(jìn)、非阻塞地處理數(shù)據(jù)。通過(guò) createReadStream(),文件將以 64KB 的默認(rèn)塊大小逐步讀取。

更新 files.controller.ts

import {
Controller,
Get,
Query,
Res,
HttpException,
HttpStatus,
Post,
UploadedFile,
UseInterceptors,
ConsoleLogger,
} from"@nestjs/common";
import { Response } from"express";
import { extname, join } from"path";
import { createReadStream, statSync } from"fs";
import { StreamableFile } from"@nestjs/common";
import * as mime from"mime-types";
import { FilesService } from"./files.service";
import { FileInterceptor } from"@nestjs/platform-express";
import { diskStorage } from"multer";

@Controller("files")
exportclassFilesController {
constructor(private readonly filesService: FilesService) {}

@Get("download")
getFile(@Res({ passthrough: true }) res: Response) {
    const filePath = join(process.cwd(), "storage", "large-report.pdf");
    const fileStream = createReadStream(filePath);

    res.set({
      "Content-Type": "application/pdf",
      "Content-Disposition": 'attachment; filename="report.pdf"',
    });

    returnnewStreamableFile(fileStream);
  }
}

在上述代碼中,@Res({ passthrough: true }) 告訴 NestJS 允許我們自定義響應(yīng)頭,同時(shí)仍由框架負(fù)責(zé)發(fā)送響應(yīng)數(shù)據(jù),無(wú)需手動(dòng)調(diào)用 res.send()。

我們?cè)O(shè)置的響應(yīng)頭包括:

  • Content-Type:告知瀏覽器文件類型
  • Content-Disposition:告知瀏覽器文件名及應(yīng)觸發(fā)下載

StreamableFile(fileStream) 將原始流包裝成 NestJS 可識(shí)別的響應(yīng)對(duì)象,適用于 Express 和 Fastify。若需切換至 Fastify,僅需修改 main.ts 并安裝適配器即可。

優(yōu)化文件下載功能

上述示例雖可運(yùn)行,但在生產(chǎn)環(huán)境中還需增強(qiáng)錯(cuò)誤處理、輸入驗(yàn)證、正確設(shè)置響應(yīng)頭及復(fù)用邏輯。

更新 files.service.ts

import {
Injectable,
StreamableFile,
NotFoundException,
BadRequestException,
} from"@nestjs/common";
import { join } from"path";
import { createReadStream, existsSync } from"fs";
import { ReadStream } from"fs";

@Injectable()
exportclassFilesService {
getFileStream(fileName: string): { stream: ReadStream; path: string } {
    try {
      // 基礎(chǔ)文件名驗(yàn)證
      if (!fileName || typeof fileName !== "string") {
        thrownewBadRequestException("無(wú)效的文件名");
      }

      // 防止目錄遍歷攻擊
      if (
        fileName.includes("..") ||
        fileName.includes("/") ||
        fileName.includes("\\")
      ) {
        thrownewBadRequestException(
          "無(wú)效文件名:包含路徑遍歷字符"
        );
      }

      const filePath = join(process.cwd(), "storage", fileName);

      if (!existsSync(filePath)) {
        thrownewNotFoundException(`文件 '${fileName}' 未找到`);
      }

      const stream = createReadStream(filePath);
      return { stream, path: filePath };
    } catch (error) {
      if (
        error instanceofNotFoundException ||
        error instanceofBadRequestException
      ) {
        throw error;
      }
      thrownewBadRequestException(
        `獲取文件流失敗 ${fileName}: ${error.message}`
      );
    }
  }
}

上述代碼中:

  • 驗(yàn)證文件名非空且為字符串,防止崩潰
  • 攔截路徑遍歷攻擊,確保僅能訪問(wèn) storage 目錄內(nèi)的文件
  • 使用 NestJS 異常機(jī)制進(jìn)行統(tǒng)一錯(cuò)誤處理

existsSync() 用于檢查指定路徑是否存在,存在返回 true,否則返回 false。

隨后更新 files.controller.ts,添加以下端點(diǎn):

@Get('improved-download')
downloadFile(@Query('name') name: string, @Res({ passthrough: true }) res: Response) {
if (!name) {
    thrownewHttpException('文件名是必需的', HttpStatus.BAD_REQUEST);
  }

const { stream, path } = this.filesService.getFileStream(name);
const fileSize = statSync(path).size;
const fileExtension = extname(path);
const contentType = mime.lookup(fileExtension) || 'application/octet-stream';

  res.set({
    'Content-Type': contentType,
    'Content-Disposition': `attachment; filename="${name}"`,
    'Content-Length': fileSize.toString(),
    'Cache-Control': 'no-cache, no-store, must-revalidate',
  });

returnnewStreamableFile(stream);
}

在該端點(diǎn)中:

  • 使用查詢參數(shù) name 動(dòng)態(tài)選擇文件
  • 調(diào)用 getFileStream(name) 獲取流與路徑
  • 使用 statSync() 獲取文件大小,用于瀏覽器顯示下載進(jìn)度
  • 通過(guò) mime-types 庫(kù)自動(dòng)映射文件擴(kuò)展名到正確的 MIME 類型(如 application/pdf
  • 設(shè)置 Cache-Control 防止瀏覽器緩存過(guò)期文件

下載文件時(shí),瀏覽器可能緩存響應(yīng),導(dǎo)致用戶獲取舊版本。通過(guò)設(shè)置 Cache-Control 可避免此問(wèn)題。

上傳大文件

接下來(lái),我們將學(xué)習(xí)如何通過(guò)流式方式上傳文件至磁盤和 S3 存儲(chǔ)桶。

上傳至磁盤

在 FilesController 中添加以下上傳路由:

@Post('upload')
@UseInterceptors(
FileInterceptor('file', {
    storage: diskStorage({
      destination: './uploads',
      filename: (req, file, callback) => {
        const uniqueName = Date.now() + extname(file.originalname);
        callback(null, uniqueName);
      },
    }),
    limits: {
      fileSize: 500 * 1024 * 1024, // 500MB
    },
  }),
)
handleUpload(@UploadedFile() file: Express.Multer.File) {
return {
    message: '文件上傳成功',
    filename: file.filename,
    size: file.size,
  };
}

@UseInterceptors 是 NestJS 的裝飾器,用于為路由綁定攔截器。此處使用 FileInterceptor,它是對(duì) Multer 的封裝,自動(dòng)從請(qǐng)求中提取文件并解析。

diskStorage 將文件分塊寫入磁盤,而非加載至內(nèi)存。filename 函數(shù)用于生成唯一文件名。

通過(guò) @UploadedFile() 裝飾器,可獲取文件對(duì)象,包含 filename、originalname、mimetype、size、path 等信息。由于使用了 diskStorage,file.buffer 將為 undefined。

上傳至 S3

本例中,我們將先通過(guò) diskStorage 上傳至本地,再將文件流直接推送至 S3 存儲(chǔ)桶。

本例使用 DigitalOcean Spaces,其完全兼容 S3 協(xié)議,使用相同的 AWS SDK,僅需替換端點(diǎn)與 CDN 地址。

更新 s3.service.ts

import { Injectable } from"@nestjs/common";
import { ConfigService } from"@nestjs/config";
import { S3Client, PutObjectCommand } from"@aws-sdk/client-s3";
import { Readable } from"stream";
import * as path from"path";

@Injectable()
exportclassS3Service {
privates3: S3Client;
privatereadonlybucketName: string;
privatereadonlyendpoint: string;
privatereadonlyregion: string;
privatereadonlycdnUrl: string;

constructor(private readonly configService: ConfigService) {
    this.bucketName = this.configService.getOrThrow<string>(
      "DIGITAL_OCEAN_SPACE_BUCKET_NAME"
    );
    this.endpoint = this.configService.getOrThrow<string>(
      "DIGITAL_OCEAN_SPACE_ENDPOINT"
    );
    this.region = this.configService.getOrThrow<string>(
      "DIGITAL_OCEAN_SPACE_REGION"
    );
    this.cdnUrl = this.configService.getOrThrow<string>(
      "DIGITAL_OCEAN_SPACE_CDN_URL"
    );
    const accessKeyId = this.configService.getOrThrow<string>(
      "DIGITAL_OCEAN_SPACE_ACCESS_KEY_ID"
    );
    const secretAccessKey = this.configService.getOrThrow<string>(
      "DIGITAL_OCEAN_SPACE_SECRET_KEY"
    );
    this.s3 = newS3Client({
      endpoint: this.endpoint,
      forcePathStyle: false,
      region: this.region,
      credentials: {
        accessKeyId,
        secretAccessKey,
      },
    });
  }

asyncuploadImageStream(payload: {
    location: string;
    file: {
      stream: Readable;
      filename: string;
      mimetype: string;
      size: number;
    };
  }): Promise<{ path: string; key: string }> {
    const { location, file } = payload;
    const uid = Date.now().toString();
    const extension = path.extname(file.filename);
    const key = `${location}/${uid}${extension}`;

    const command = newPutObjectCommand({
      Bucket: this.bucketName,
      Key: key,
      Body: file.stream,
      ContentLength: file.size,
    });

    try {
      awaitthis.s3.send(command);
      return {
        path: `${this.cdnUrl}/${key}`,
        key,
      };
    } catch (error) {
      console.error("上傳文件流失敗:", error);
      thrownewError("文件上傳失敗");
    }
  }
}

在 uploadImageStream() 方法中:

  • 生成唯一文件鍵(key)
  • 使用 AWS SDK v3 創(chuàng)建上傳命令,將可讀流作為 Body
  • 在 try-catch 中執(zhí)行上傳并返回路徑與鍵

隨后更新 s3.controller.ts

import {
Controller,
Post,
UploadedFile,
UseInterceptors,
BadRequestException,
} from"@nestjs/common";
import { FileInterceptor } from"@nestjs/platform-express";
import { diskStorage } from"multer";
import * as fs from"fs";
import * as path from"path";
import { S3Service } from"./s3.service";

@Controller("s3")
exportclassS3Controller {
constructor(private readonly s3Service: S3Service) {}

@Post("upload")
@UseInterceptors(
    FileInterceptor("file", {
      storage: diskStorage({
        destination: "./uploads",
        filename: (req, file, cb) => {
          cb(null, `${Date.now()}-${file.originalname}`);
        },
      }),
      limits: { fileSize: 200 * 1024 * 1024 },
    })
  )
asyncuploadToS3(@UploadedFile() file: Express.Multer.File) {
    if (!file) {
      thrownewBadRequestException("未上傳文件");
    }

    const location = "uploads";
    const filePath = file.path;
    const readStream = fs.createReadStream(filePath);
    const { size } = fs.statSync(filePath);

    try {
      const uploadResult = awaitthis.s3Service.uploadImageStream({
        location,
        file: {
          stream: readStream,
          filename: file.originalname,
          mimetype: file.mimetype,
          size,
        },
      });

      return {
        message: "文件已上傳至 S3",
        ...uploadResult,
      };
    } catch (error) {
      thrownewError(`文件上傳失敗: ${error.message}`);
    } finally {
      // 清理臨時(shí)文件
      if (file.path && fs.existsSync(file.path)) {
        fs.unlinkSync(file.path);
      }
    }
  }
}

在 uploadToS3 中:

  • 將文件流與元數(shù)據(jù)傳遞給 uploadImageStream()
  • 成功后返回 S3 路徑與鍵
  • 最終通過(guò) fs.unlinkSync() 刪除本地臨時(shí)文件

處理大文件:CSV 轉(zhuǎn) JSON 示例

更新 csv.service.ts

import { Injectable, BadRequestException } from"@nestjs/common";
import * as csv from"csv-parser";
import { Readable } from"stream";

exportinterfaceCsvRow {
  [key: string]: string;
}

exportinterfaceCsvProcessingResult {
totalRows: number;
data: CsvRow[];
}

@Injectable()
exportclassCsvService {
asyncprocessCsvStream(fileStream: Readable): Promise<CsvProcessingResult> {
    returnnewPromise((resolve, reject) => {
      constresults: CsvRow[] = [];

      // 創(chuàng)建 CSV 解析流
      const csvStream = csv();

      // 錯(cuò)誤處理
      csvStream.on("error", (error) => {
        reject(newBadRequestException(`CSV 解析失敗: ${error.message}`));
      });

      // 處理完成
      csvStream.on("end", () => {
        resolve({
          totalRows: results.length,
          data: results,
        });
      });

      // 流式處理
      fileStream.pipe(csvStream).on("data", (data: CsvRow) => {
        results.push(data);
        // 對(duì)于超大文件,建議替換為數(shù)據(jù)庫(kù)寫入邏輯:
        // this.databaseService.insertRow(data);
        // 或批量累積后批量插入以提升性能
      });
    });
  }
}

在 processCsvStream() 中:

  • 創(chuàng)建 Promise 處理異步流
  • 使用 csv-parser 創(chuàng)建轉(zhuǎn)換流,將 CSV 數(shù)據(jù)逐行轉(zhuǎn)為 JSON 對(duì)象
  • 通過(guò) fileStream.pipe(csvStream) 將原始數(shù)據(jù)輸入解析器
  • 每解析一行,觸發(fā) data 事件,將結(jié)果存入數(shù)組
  • 出錯(cuò)時(shí)拒絕 Promise,完成時(shí)返回結(jié)果

更新 csv.controller.ts

import {
Controller,
Post,
UploadedFile,
UseInterceptors,
BadRequestException,
} from"@nestjs/common";
import { FileInterceptor } from"@nestjs/platform-express";
import { diskStorage } from"multer";
import * as fs from"fs";
import { CsvService } from"./csv.service";

@Controller("csv")
exportclassCsvController {
constructor(private readonly csvService: CsvService) {}

@Post("upload")
@UseInterceptors(
    FileInterceptor("file", {
      storage: diskStorage({
        destination: "./uploads",
        filename: (req, file, cb) => {
          cb(null, `${Date.now()}-${file.originalname}`);
        },
      }),
      limits: { fileSize: 50 * 1024 * 1024 }, // 50MB 限制
    })
  )
asynchandleCsvUpload(@UploadedFile() file: Express.Multer.File) {
    if (!file) {
      thrownewBadRequestException("未上傳文件");
    }

    // 創(chuàng)建文件讀取流(真正流式處理)
    const fileStream = fs.createReadStream(file.path);

    try {
      // 使用服務(wù)流式處理 CSV
      const result = awaitthis.csvService.processCsvStream(fileStream);

      return {
        message: "CSV 處理成功",
        filename: file.originalname,
        ...result,
      };
    } catch (error) {
      thrownewBadRequestException(`CSV 處理失敗: ${error.message}`);
    } finally {
      // 清理臨時(shí)文件
      if (file.path && fs.existsSync(file.path)) {
        fs.unlinkSync(file.path);
      }
    }
  }
}

最后,確認(rèn) files.module.ts 中的控制器與提供者配置正確:

import { Module } from"@nestjs/common";
import { FilesController } from"./files.controller";
import { FilesService } from"./files.service";
import { CsvController } from"./csv/csv.controller";
import { S3Controller } from"./s3/s3.controller";
import { S3Service } from"./s3/s3.service";
import { CsvService } from"./csv/csv.service";

@Module({
controllers: [FilesController, CsvController, S3Controller],
providers: [FilesService, S3Service, CsvService],
})
exportclassFilesModule {}

總結(jié)

本文詳細(xì)介紹了在 NestJS 中如何通過(guò) Node.js 流實(shí)現(xiàn):

  • 大文件下載
  • 文件上傳至磁盤與 S3
  • CSV 文件流式轉(zhuǎn)換為 JSON

你已掌握正確的實(shí)踐方式、常見陷阱及其原因。下一步可考慮:

  • 將 CSV 解析結(jié)果直接寫入數(shù)據(jù)庫(kù)
  • 為 S3 上傳添加重試機(jī)制

原文鏈接:https://www.telerik.com/blogs/how-stream-large-files-handle-data-efficiently-nodejs-streams-nestjs作者:Christian Nwamba

責(zé)任編輯:武曉燕 來(lái)源: 前端小石匠
相關(guān)推薦

2021-07-15 10:15:52

Node.jsJSON前端

2021-05-18 09:01:39

Node.jsJSON文件

2021-07-30 11:20:53

JavaScriptNode.jsWeb Develop

2020-08-05 08:31:51

SSL TLSNode.js

2021-07-03 17:43:03

Node.jsNode變量

2025-02-05 08:13:48

Go語(yǔ)言范式

2024-05-16 16:06:59

JSON數(shù)據(jù)二進(jìn)制

2021-08-20 16:05:28

JavaScript node.js 應(yīng)用安全

2020-10-26 08:34:13

Node.jsCORS前端

2023-10-04 07:35:03

2022-07-25 11:33:48

Python大文件

2021-12-25 22:29:57

Node.js 微任務(wù)處理事件循環(huán)

2023-10-05 12:43:48

數(shù)據(jù)處理

2010-03-10 14:03:41

python處理文本

2021-10-25 09:00:37

Node.jsJS前端

2021-07-26 05:24:59

Node.js SO_RESUEPORLibuv

2023-12-13 09:00:00

2022-11-17 09:52:12

RHEL 9Node.js

2021-09-07 07:53:43

工具

2021-06-02 00:29:08

Node.jsAcceptEmfile
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)