在为中小企业开发管理系统时,我们经常会遇到这样的场景:用户需要导入几千甚至上万条数据,或者导出大量报表。如果处理不当,系统会出现请求超时、页面卡死等问题。今天分享我们在实际项目中如何解决这个问题。
问题场景:同步处理的困境
假设你正在开发一个企业管理系统,用户需要批量导入产品数据:
// ❌ 传统的同步处理方式
@Post('import')
async importProducts(@UploadedFile() file: Express.Multer.File) {
const workbook = new ExcelJS.Workbook();
await workbook.xlsx.read(file.buffer); // 读取 Excel
const rows = workbook.getWorksheet(1).getRows();
// 直接在请求中处理所有数据
for (const row of rows) {
await this.productService.create({...}); // 逐行插入数据库
}
return { success: true, count: rows.length };
}
这种方式的问题:
- HTTP 超时:处理 5000 条数据可能需要 2-3 分钟,浏览器会超时
- 用户体验差:页面卡死,用户不知道进度
- 服务器压力大:单个请求占用资源过长
- 无法追溯:失败后不知道哪些数据导入成功,哪些失败
解决方案:异步任务队列
我们的解决方案是引入 Bull + Redis 异步任务队列:
架构设计
用户上传文件
↓
API 接收并创建任务记录(立即返回任务ID)
↓
文件上传到 OSS 私有桶
↓
任务进入 Redis 队列
↓
Worker 进程异步处理
↓
处理结果写入数据库 + 错误文件上传 OSS
↓
用户通过任务ID查询进度和结果
核心实现
1. 任务创建(秒级响应)
// ✅ 异步处理方式
@Post('import')
async importProducts(@UploadedFile() file: Express.Multer.File) {
// 1. 上传文件到 OSS 私有桶
const fileUrl = await this.uploadService.uploadPrivate(
file.buffer,
'imports/products'
);
// 2. 创建任务记录
const task = await this.taskService.create({
taskNo: this.generateTaskNo(), // T[timestamp][random]
taskType: TaskType.IMPORT,
businessType: 'PRODUCT_IMPORT',
status: TaskStatus.PENDING,
fileUrl,
userId: req.user.id,
expiredAt: addDays(new Date(), 7), // 7天后自动清理
});
// 3. 加入队列(非阻塞)
await this.importQueue.add('import', {
taskId: task.id,
fileUrl,
});
// 4. 立即返回任务ID
return {
taskId: task.id,
taskNo: task.taskNo,
message: '导入任务已创建,正在后台处理'
};
}
2. Worker 异步处理
@Processor('import-export')
export class ImportExportProcessor {
@Process({ name: 'import', concurrency: 2 }) // 并发度为2
async handleImport(job: Job) {
const { taskId, fileUrl } = job.data;
try {
// 更新任务状态为"处理中"
await this.taskService.update(taskId, {
status: TaskStatus.PROCESSING,
startedAt: new Date(),
});
// 1. 下载文件
const fileBuffer = await this.downloadFile(fileUrl);
// 2. 解析 Excel
const workbook = new ExcelJS.Workbook();
await workbook.xlsx.read(Readable.from(fileBuffer));
const rows = workbook.getWorksheet(1).getRows(2); // 跳过表头
// 3. 批量处理数据
let successCount = 0;
let failCount = 0;
const errors = [];
for (const [index, row] of rows.entries()) {
try {
await this.productService.create({
name: row.getCell(1).value,
price: row.getCell(2).value,
// ...
});
successCount++;
// 实时更新进度
if (index % 100 === 0) {
await this.taskService.updateProgress(taskId, {
totalCount: rows.length,
successCount,
failCount,
});
}
} catch (error) {
failCount++;
errors.push({
row: index + 2,
data: row.values,
error: error.message,
});
}
}
// 4. 生成错误报告(如果有错误)
let errorFileUrl = null;
if (errors.length > 0) {
errorFileUrl = await this.generateErrorReport(errors);
}
// 5. 更新任务为完成状态
await this.taskService.update(taskId, {
status: TaskStatus.SUCCESS,
finishedAt: new Date(),
totalCount: rows.length,
successCount,
failCount,
errorFileUrl,
});
} catch (error) {
// 6. 处理失败
await this.taskService.update(taskId, {
status: TaskStatus.FAILED,
finishedAt: new Date(),
errorMessage: error.message,
});
throw error;
}
}
@OnQueueActive()
onActive(job: Job) {
console.log(`任务开始: ${job.id}`);
}
@OnQueueCompleted()
onCompleted(job: Job) {
console.log(`任务完成: ${job.id}`);
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
console.error(`任务失败: ${job.id}`, error);
}
}
3. 任务状态查询
@Get('tasks/:taskId')
async getTaskStatus(@Param('taskId') taskId: string) {
const task = await this.taskService.findOne(taskId);
return {
taskNo: task.taskNo,
status: task.status, // PENDING / PROCESSING / SUCCESS / FAILED
progress: {
totalCount: task.totalCount,
successCount: task.successCount,
failCount: task.failCount,
percentage: task.totalCount > 0
? Math.floor((task.successCount + task.failCount) / task.totalCount * 100)
: 0
},
errorFileUrl: task.errorFileUrl, // 错误详情文件
startedAt: task.startedAt,
finishedAt: task.finishedAt,
};
}
前端轮询进度
// 前端代码示例
async function uploadAndTrack(file: File) {
// 1. 上传文件并创建任务
const { taskId, taskNo } = await api.post('/import', { file });
message.info(`任务 ${taskNo} 已创建,正在处理...`);
// 2. 轮询任务状态
const interval = setInterval(async () => {
const task = await api.get(`/tasks/${taskId}`);
// 更新进度条
setProgress(task.progress.percentage);
if (task.status === 'SUCCESS') {
clearInterval(interval);
message.success(`导入完成!成功 ${task.progress.successCount} 条`);
// 如果有失败数据,提示下载错误报告
if (task.progress.failCount > 0) {
Modal.confirm({
title: `有 ${task.progress.failCount} 条数据导入失败`,
content: '是否下载错误详情?',
onOk: () => window.open(task.errorFileUrl),
});
}
} else if (task.status === 'FAILED') {
clearInterval(interval);
message.error('导入失败:' + task.errorMessage);
}
}, 2000); // 每2秒查询一次
}
关键技术点
1. 任务编号生成
为了让用户方便追溯,我们生成人性化的任务编号:
generateTaskNo(): string {
// T + 时间戳(Base36) + 4字节随机数
const timestamp = Date.now().toString(36).toUpperCase();
const random = crypto.randomBytes(4).toString('hex').toUpperCase();
return `T${timestamp}${random}`;
}
// 示例输出: T1ABCDE12345678
2. 文件生命周期管理
@Cron(CronExpression.EVERY_DAY_AT_2AM)
async cleanupExpiredTasks() {
// 清理过期任务(默认7天)
const expiredTasks = await this.taskRepository.find({
where: {
expiredAt: LessThan(new Date()),
status: In([TaskStatus.SUCCESS, TaskStatus.FAILED]),
},
});
for (const task of expiredTasks) {
// 删除 OSS 文件
if (task.fileUrl) {
await this.uploadService.deleteFile(task.fileUrl);
}
if (task.errorFileUrl) {
await this.uploadService.deleteFile(task.errorFileUrl);
}
// 删除任务记录
await this.taskRepository.remove(task);
}
console.log(`已清理 ${expiredTasks.length} 个过期任务`);
}
3. 错误反馈机制
当导入失败时,生成详细的错误报告:
async generateErrorReport(errors: ErrorItem[]): Promise<string> {
const workbook = new ExcelJS.Workbook();
const sheet = workbook.addWorksheet('导入错误');
// 添加表头
sheet.columns = [
{ header: '行号', key: 'row', width: 10 },
{ header: '原始数据', key: 'data', width: 50 },
{ header: '错误原因', key: 'error', width: 30 },
];
// 添加错误数据
errors.forEach(err => {
sheet.addRow({
row: err.row,
data: JSON.stringify(err.data),
error: err.error,
});
});
// 上传到 OSS
const buffer = await workbook.xlsx.writeBuffer();
return await this.uploadService.uploadPrivate(
buffer,
'errors/import-errors.xlsx'
);
}
性能优化建议
1. 批量插入
// ❌ 逐条插入(慢)
for (const row of rows) {
await this.productRepository.insert(row);
}
// ✅ 批量插入(快10倍以上)
const batchSize = 500;
for (let i = 0; i < rows.length; i += batchSize) {
const batch = rows.slice(i, i + batchSize);
await this.productRepository.insert(batch);
}
2. 并发控制
@Process({ name: 'import', concurrency: 2 }) // 限制并发度
// 原因:
// - 并发度太高:数据库连接数耗尽
// - 并发度太低:资源浪费
// - 建议:根据数据库连接池大小设置(通常2-4)
3. 内存优化
// ✅ 流式处理大文件
const stream = Readable.from(fileBuffer);
await workbook.xlsx.read(stream); // 边读边处理,不占用大内存
方案对比
| 维度 | 同步处理 | 异步队列 |
|---|---|---|
| 响应速度 | 慢(2-3分钟) | 快(<1秒) |
| 用户体验 | 差(页面卡死) | 好(实时进度) |
| 错误处理 | 全部失败 | 部分失败可追溯 |
| 服务器压力 | 高(阻塞请求) | 低(异步处理) |
| 可扩展性 | 差 | 好(可横向扩展 Worker) |
总结
对于企业管理系统的大数据量导入导出,异步任务队列 是最佳实践:
- 用户体验好:秒级响应,实时进度,不阻塞页面
- 可追溯性强:任务编号、详细日志、错误报告
- 性能可控:并发度限制、批量操作、定时清理
- 扩展性好:可以横向增加 Worker 节点
这套方案我们在多个企业项目中应用,效果显著。如果你的系统也有类似需求,可以参考这个架构。
关于作者:ekent,ek Studio 技术负责人,专注于企业管理系统开发和架构设计。