返回博客列表
后端架构异步处理任务队列

企业系统中的大数据量导入导出:从同步到异步的架构演进

分享我们在企业管理系统中处理 Excel 大批量导入导出的实战经验,从同步阻塞到异步队列的完整解决方案。

作者: ekent·发布于 2026年2月7日

在为中小企业开发管理系统时,我们经常会遇到这样的场景:用户需要导入几千甚至上万条数据,或者导出大量报表。如果处理不当,系统会出现请求超时、页面卡死等问题。今天分享我们在实际项目中如何解决这个问题。

问题场景:同步处理的困境

假设你正在开发一个企业管理系统,用户需要批量导入产品数据:

// ❌ 传统的同步处理方式
@Post('import')
async importProducts(@UploadedFile() file: Express.Multer.File) {
  const workbook = new ExcelJS.Workbook();
  await workbook.xlsx.read(file.buffer);  // 读取 Excel

  const rows = workbook.getWorksheet(1).getRows();

  // 直接在请求中处理所有数据
  for (const row of rows) {
    await this.productService.create({...});  // 逐行插入数据库
  }

  return { success: true, count: rows.length };
}

这种方式的问题

  1. HTTP 超时:处理 5000 条数据可能需要 2-3 分钟,浏览器会超时
  2. 用户体验差:页面卡死,用户不知道进度
  3. 服务器压力大:单个请求占用资源过长
  4. 无法追溯:失败后不知道哪些数据导入成功,哪些失败

解决方案:异步任务队列

我们的解决方案是引入 Bull + Redis 异步任务队列:

架构设计

用户上传文件
    ↓
API 接收并创建任务记录(立即返回任务ID)
    ↓
文件上传到 OSS 私有桶
    ↓
任务进入 Redis 队列
    ↓
Worker 进程异步处理
    ↓
处理结果写入数据库 + 错误文件上传 OSS
    ↓
用户通过任务ID查询进度和结果

核心实现

1. 任务创建(秒级响应)

// ✅ 异步处理方式
@Post('import')
async importProducts(@UploadedFile() file: Express.Multer.File) {
  // 1. 上传文件到 OSS 私有桶
  const fileUrl = await this.uploadService.uploadPrivate(
    file.buffer,
    'imports/products'
  );

  // 2. 创建任务记录
  const task = await this.taskService.create({
    taskNo: this.generateTaskNo(),      // T[timestamp][random]
    taskType: TaskType.IMPORT,
    businessType: 'PRODUCT_IMPORT',
    status: TaskStatus.PENDING,
    fileUrl,
    userId: req.user.id,
    expiredAt: addDays(new Date(), 7),  // 7天后自动清理
  });

  // 3. 加入队列(非阻塞)
  await this.importQueue.add('import', {
    taskId: task.id,
    fileUrl,
  });

  // 4. 立即返回任务ID
  return {
    taskId: task.id,
    taskNo: task.taskNo,
    message: '导入任务已创建,正在后台处理'
  };
}

2. Worker 异步处理

@Processor('import-export')
export class ImportExportProcessor {

  @Process({ name: 'import', concurrency: 2 })  // 并发度为2
  async handleImport(job: Job) {
    const { taskId, fileUrl } = job.data;

    try {
      // 更新任务状态为"处理中"
      await this.taskService.update(taskId, {
        status: TaskStatus.PROCESSING,
        startedAt: new Date(),
      });

      // 1. 下载文件
      const fileBuffer = await this.downloadFile(fileUrl);

      // 2. 解析 Excel
      const workbook = new ExcelJS.Workbook();
      await workbook.xlsx.read(Readable.from(fileBuffer));
      const rows = workbook.getWorksheet(1).getRows(2);  // 跳过表头

      // 3. 批量处理数据
      let successCount = 0;
      let failCount = 0;
      const errors = [];

      for (const [index, row] of rows.entries()) {
        try {
          await this.productService.create({
            name: row.getCell(1).value,
            price: row.getCell(2).value,
            // ...
          });
          successCount++;

          // 实时更新进度
          if (index % 100 === 0) {
            await this.taskService.updateProgress(taskId, {
              totalCount: rows.length,
              successCount,
              failCount,
            });
          }
        } catch (error) {
          failCount++;
          errors.push({
            row: index + 2,
            data: row.values,
            error: error.message,
          });
        }
      }

      // 4. 生成错误报告(如果有错误)
      let errorFileUrl = null;
      if (errors.length > 0) {
        errorFileUrl = await this.generateErrorReport(errors);
      }

      // 5. 更新任务为完成状态
      await this.taskService.update(taskId, {
        status: TaskStatus.SUCCESS,
        finishedAt: new Date(),
        totalCount: rows.length,
        successCount,
        failCount,
        errorFileUrl,
      });

    } catch (error) {
      // 6. 处理失败
      await this.taskService.update(taskId, {
        status: TaskStatus.FAILED,
        finishedAt: new Date(),
        errorMessage: error.message,
      });
      throw error;
    }
  }

  @OnQueueActive()
  onActive(job: Job) {
    console.log(`任务开始: ${job.id}`);
  }

  @OnQueueCompleted()
  onCompleted(job: Job) {
    console.log(`任务完成: ${job.id}`);
  }

  @OnQueueFailed()
  onFailed(job: Job, error: Error) {
    console.error(`任务失败: ${job.id}`, error);
  }
}

3. 任务状态查询

@Get('tasks/:taskId')
async getTaskStatus(@Param('taskId') taskId: string) {
  const task = await this.taskService.findOne(taskId);

  return {
    taskNo: task.taskNo,
    status: task.status,           // PENDING / PROCESSING / SUCCESS / FAILED
    progress: {
      totalCount: task.totalCount,
      successCount: task.successCount,
      failCount: task.failCount,
      percentage: task.totalCount > 0
        ? Math.floor((task.successCount + task.failCount) / task.totalCount * 100)
        : 0
    },
    errorFileUrl: task.errorFileUrl,  // 错误详情文件
    startedAt: task.startedAt,
    finishedAt: task.finishedAt,
  };
}

前端轮询进度

// 前端代码示例
async function uploadAndTrack(file: File) {
  // 1. 上传文件并创建任务
  const { taskId, taskNo } = await api.post('/import', { file });

  message.info(`任务 ${taskNo} 已创建,正在处理...`);

  // 2. 轮询任务状态
  const interval = setInterval(async () => {
    const task = await api.get(`/tasks/${taskId}`);

    // 更新进度条
    setProgress(task.progress.percentage);

    if (task.status === 'SUCCESS') {
      clearInterval(interval);
      message.success(`导入完成!成功 ${task.progress.successCount} 条`);

      // 如果有失败数据,提示下载错误报告
      if (task.progress.failCount > 0) {
        Modal.confirm({
          title: `有 ${task.progress.failCount} 条数据导入失败`,
          content: '是否下载错误详情?',
          onOk: () => window.open(task.errorFileUrl),
        });
      }
    } else if (task.status === 'FAILED') {
      clearInterval(interval);
      message.error('导入失败:' + task.errorMessage);
    }
  }, 2000);  // 每2秒查询一次
}

关键技术点

1. 任务编号生成

为了让用户方便追溯,我们生成人性化的任务编号:

generateTaskNo(): string {
  // T + 时间戳(Base36) + 4字节随机数
  const timestamp = Date.now().toString(36).toUpperCase();
  const random = crypto.randomBytes(4).toString('hex').toUpperCase();
  return `T${timestamp}${random}`;
}

// 示例输出: T1ABCDE12345678

2. 文件生命周期管理

@Cron(CronExpression.EVERY_DAY_AT_2AM)
async cleanupExpiredTasks() {
  // 清理过期任务(默认7天)
  const expiredTasks = await this.taskRepository.find({
    where: {
      expiredAt: LessThan(new Date()),
      status: In([TaskStatus.SUCCESS, TaskStatus.FAILED]),
    },
  });

  for (const task of expiredTasks) {
    // 删除 OSS 文件
    if (task.fileUrl) {
      await this.uploadService.deleteFile(task.fileUrl);
    }
    if (task.errorFileUrl) {
      await this.uploadService.deleteFile(task.errorFileUrl);
    }

    // 删除任务记录
    await this.taskRepository.remove(task);
  }

  console.log(`已清理 ${expiredTasks.length} 个过期任务`);
}

3. 错误反馈机制

当导入失败时,生成详细的错误报告:

async generateErrorReport(errors: ErrorItem[]): Promise<string> {
  const workbook = new ExcelJS.Workbook();
  const sheet = workbook.addWorksheet('导入错误');

  // 添加表头
  sheet.columns = [
    { header: '行号', key: 'row', width: 10 },
    { header: '原始数据', key: 'data', width: 50 },
    { header: '错误原因', key: 'error', width: 30 },
  ];

  // 添加错误数据
  errors.forEach(err => {
    sheet.addRow({
      row: err.row,
      data: JSON.stringify(err.data),
      error: err.error,
    });
  });

  // 上传到 OSS
  const buffer = await workbook.xlsx.writeBuffer();
  return await this.uploadService.uploadPrivate(
    buffer,
    'errors/import-errors.xlsx'
  );
}

性能优化建议

1. 批量插入

// ❌ 逐条插入(慢)
for (const row of rows) {
  await this.productRepository.insert(row);
}

// ✅ 批量插入(快10倍以上)
const batchSize = 500;
for (let i = 0; i < rows.length; i += batchSize) {
  const batch = rows.slice(i, i + batchSize);
  await this.productRepository.insert(batch);
}

2. 并发控制

@Process({ name: 'import', concurrency: 2 })  // 限制并发度

// 原因:
// - 并发度太高:数据库连接数耗尽
// - 并发度太低:资源浪费
// - 建议:根据数据库连接池大小设置(通常2-4)

3. 内存优化

// ✅ 流式处理大文件
const stream = Readable.from(fileBuffer);
await workbook.xlsx.read(stream);  // 边读边处理,不占用大内存

方案对比

维度同步处理异步队列
响应速度慢(2-3分钟)快(<1秒
用户体验差(页面卡死)好(实时进度)
错误处理全部失败部分失败可追溯
服务器压力高(阻塞请求)低(异步处理)
可扩展性好(可横向扩展 Worker)

总结

对于企业管理系统的大数据量导入导出,异步任务队列 是最佳实践:

  1. 用户体验好:秒级响应,实时进度,不阻塞页面
  2. 可追溯性强:任务编号、详细日志、错误报告
  3. 性能可控:并发度限制、批量操作、定时清理
  4. 扩展性好:可以横向增加 Worker 节点

这套方案我们在多个企业项目中应用,效果显著。如果你的系统也有类似需求,可以参考这个架构。


关于作者:ekent,ek Studio 技术负责人,专注于企业管理系统开发和架构设计。