When developing management systems for small and medium enterprises, we often encounter scenarios where users need to import thousands or even tens of thousands of records, or export large reports. If not handled properly, the system will experience request timeouts and frozen pages. Today, I'll share how we solved this problem in our real projects.
The Problem: Synchronous Processing Dilemma
Imagine you're developing an enterprise management system where users need to bulk import product data:
// ❌ Traditional synchronous approach
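@Post('import')
@UseInterceptors(FileInterceptor('file')) // Multer parses the multipart "file" field
async importProducts(@UploadedFile() file: Express.Multer.File) {
const workbook = new ExcelJS.Workbook();
await workbook.xlsx.load(file.buffer); // Parse the uploaded Excel buffer
const sheet = workbook.getWorksheet(1);
const rows = sheet.getRows(2, sheet.rowCount - 1) ?? []; // Data rows below the header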
// Process all data directly in the request
for (const row of rows) {
await this.productService.create({...}); // Insert row by row
}
return { success: true, count: rows.length };
}
Problems with this approach:
- HTTP Timeout: Processing 5,000 records may take 2-3 minutes, long enough for the browser or a reverse proxy to time out
- Poor UX: The page freezes and users cannot see any progress
- Server Pressure: A single request holds server resources for too long
- No Traceability: After a failure, it is unclear which records succeeded and which failed
Solution: Asynchronous Task Queue
Our solution is to introduce an asynchronous task queue built on Bull and Redis:
Architecture Design
User uploads file
↓
API uploads file to OSS private bucket
↓
API creates task record
↓
Task enters Redis queue (API returns task ID immediately)
↓
Worker process handles asynchronously
↓
Results written to database + error file uploaded to OSS
↓
User queries progress and results via task ID
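The flow above implies a small state machine for each task record. A minimal sketch, assuming the four statuses used later in this article (PENDING, PROCESSING, SUCCESS, FAILED); the transition table and the `canTransition` helper are illustrative and not taken from the original codebase:

```typescript
// Statuses mirror the TaskStatus values used in this article.
type TaskStatus = 'PENDING' | 'PROCESSING' | 'SUCCESS' | 'FAILED';

// Hypothetical transition table: which statuses may follow a given status.
const allowedTransitions: Record<TaskStatus, TaskStatus[]> = {
  PENDING: ['PROCESSING', 'FAILED'], // picked up by a worker, or failed before starting
  PROCESSING: ['SUCCESS', 'FAILED'], // finished (possibly with row errors) or crashed
  SUCCESS: [], // terminal
  FAILED: [], // terminal
};

// Returns true when moving from one status to another is allowed.
function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return allowedTransitions[from].indexOf(to) !== -1;
}
```

A guard like this could sit inside the task service's update method, so that a late or duplicate worker cannot move a finished task back to PROCESSING.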
Core Implementation
1. Task Creation (Responds Within a Second)
// ✅ Asynchronous approach
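@Post('import')
@UseInterceptors(FileInterceptor('file')) // Multer parses the multipart "file" field
async importProducts(
@UploadedFile() file: Express.Multer.File,
@Req() req, // Assumes an auth guard has populated req.user
) {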
// 1. Upload file to OSS private bucket
const fileUrl = await this.uploadService.uploadPrivate(
file.buffer,
'imports/products'
);
// 2. Create task record
const task = await this.taskService.create({
taskNo: this.generateTaskNo(), // T[timestamp][random]
taskType: TaskType.IMPORT,
businessType: 'PRODUCT_IMPORT',
status: TaskStatus.PENDING,
fileUrl,
userId: req.user.id,
expiredAt: addDays(new Date(), 7), // Auto-cleanup after 7 days
});
// 3. Add to queue (non-blocking)
await this.importQueue.add('import', {
taskId: task.id,
fileUrl,
});
// 4. Return task ID immediately
return {
taskId: task.id,
taskNo: task.taskNo,
message: 'Import task created, processing in background'
};
}
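The controller above relies on an injected `importQueue`, which the article does not show being wired up. A minimal wiring sketch, assuming `@nestjs/bull` and a local Redis instance; the controller and module names, the host and port, and the `ProductImportJob` interface are placeholders, while the `import-export` queue name matches the processor shown in the next step:

```typescript
import { Controller, Module } from '@nestjs/common';
import { BullModule, InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

// Payload shape assumed from the queue.add() call above (taskId type is a guess).
interface ProductImportJob {
  taskId: string;
  fileUrl: string;
}

@Controller('products')
export class ProductImportController {
  constructor(
    // The queue name must match the name passed to @Processor().
    @InjectQueue('import-export')
    private readonly importQueue: Queue<ProductImportJob>,
  ) {}
}

@Module({
  imports: [
    // Redis connection shared by all queues (placeholder host and port).
    BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } }),
    // Registers the queue so it can be injected and processed.
    BullModule.registerQueue({ name: 'import-export' }),
  ],
  controllers: [ProductImportController],
})
export class ImportExportModule {}
```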
2. Worker Asynchronous Processing
@Processor('import-export')
export class ImportExportProcessor {
@Process({ name: 'import', concurrency: 2 }) // Concurrency set to 2
async handleImport(job: Job) {
const { taskId, fileUrl } = job.data;
try {
// Update task status to "PROCESSING"
await this.taskService.update(taskId, {
status: TaskStatus.PROCESSING,
startedAt: new Date(),
});
// 1. Download file
const fileBuffer = await this.downloadFile(fileUrl);
// 2. Parse Excel
const workbook = new ExcelJS.Workbook();
await workbook.xlsx.load(fileBuffer);
const sheet = workbook.getWorksheet(1);
const rows = sheet.getRows(2, sheet.rowCount - 1) ?? []; // Skip header row
// 3. Batch process data
let successCount = 0;
let failCount = 0;
const errors = [];
for (const [index, row] of rows.entries()) {
try {
await this.productService.create({
name: row.getCell(1).value,
price: row.getCell(2).value,
// ...
});
successCount++;
} catch (error) {
failCount++;
errors.push({
row: index + 2, // +2: spreadsheet rows are 1-based and row 1 is the header
data: row.values,
error: error instanceof Error ? error.message : String(error),
});
}
// Periodic progress update, kept outside the try block so that a failed
// progress write is not recorded as a row error
if ((index + 1) % 100 === 0) {
await this.taskService.updateProgress(taskId, {
totalCount: rows.length,
successCount,
failCount,
});
}
}
// 4. Generate error report (if errors exist)
let errorFileUrl = null;
if (errors.length > 0) {
errorFileUrl = await this.generateErrorReport(errors);
}
// 5. Update task to completed status
await this.taskService.update(taskId, {
status: TaskStatus.SUCCESS,
finishedAt: new Date(),
totalCount: rows.length,
successCount,
failCount,
errorFileUrl,
});
} catch (error) {
// 6. Handle failure
await this.taskService.update(taskId, {
status: TaskStatus.FAILED,
finishedAt: new Date(),
errorMessage: error.message,
});
throw error;
}
}
@OnQueueActive()
onActive(job: Job) {
console.log(`Task started: ${job.id}`);
}
@OnQueueCompleted()
onCompleted(job: Job) {
console.log(`Task completed: ${job.id}`);
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
console.error(`Task failed: ${job.id}`, error);
}
}
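The row-to-product mapping inside the loop is a natural place for validation, because any error thrown there ends up in the error report. One way to keep that logic testable is a pure function that converts raw cell values and throws a readable message on bad input. A sketch under assumptions: the `ProductInput` shape, the `parseProductRow` name, and the validation rules are hypothetical, and only the two columns shown above (name, price) are covered.

```typescript
// Hypothetical target shape; the real product entity likely has more fields.
interface ProductInput {
  name: string;
  price: number;
}

// Converts raw cell values (column 1 = name, column 2 = price) into a typed
// object, throwing a descriptive error when a value is missing or malformed.
function parseProductRow(cells: unknown[]): ProductInput {
  const rawName = cells[0];
  const rawPrice = cells[1];

  const name = typeof rawName === 'string' ? rawName.trim() : '';
  if (name.length === 0) {
    throw new Error('Name is required');
  }

  // Excel may deliver a price as a number or as text such as "19.90".
  let price = NaN;
  if (typeof rawPrice === 'number') {
    price = rawPrice;
  } else if (typeof rawPrice === 'string' && rawPrice.trim() !== '') {
    price = Number(rawPrice);
  }
  if (!isFinite(price) || price < 0) {
    throw new Error('Invalid price: ' + String(rawPrice));
  }

  return { name, price };
}
```

In the worker, this could be called as `parseProductRow([row.getCell(1).value, row.getCell(2).value])` before the result is passed to `productService.create`.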
3. Task Status Query
@Get('tasks/:taskId')
async getTaskStatus(@Param('taskId') taskId: string) {
const task = await this.taskService.findOne(taskId);
return {
taskNo: task.taskNo,
status: task.status, // PENDING / PROCESSING / SUCCESS / FAILED
progress: {
totalCount: task.totalCount,
successCount: task.successCount,
failCount: task.failCount,
percentage: task.totalCount > 0
? Math.floor((task.successCount + task.failCount) / task.totalCount * 100)
: 0
},
errorFileUrl: task.errorFileUrl, // Error details file
errorMessage: task.errorMessage, // Set when the whole task fails
startedAt: task.startedAt,
finishedAt: task.finishedAt,
};
}
Frontend Progress Polling
// Frontend code example
async function uploadAndTrack(file: File) {
// 1. Upload file and create task
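const formData = new FormData();
formData.append('file', file); // Field name must match the one the server expects
const { taskId, taskNo } = await api.post('/import', formData);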
message.info(`Task ${taskNo} created, processing...`);
// 2. Poll task status
const interval = setInterval(async () => {
const task = await api.get(`/tasks/${taskId}`);
// Update progress bar
setProgress(task.progress.percentage);
if (task.status === 'SUCCESS') {
clearInterval(interval);
message.success(`Import completed! ${task.progress.successCount} records succeeded`);
// Prompt to download error report if failures exist
if (task.progress.failCount > 0) {
Modal.confirm({
title: `${task.progress.failCount} records failed to import`,
content: 'Download error details?',
onOk: () => window.open(task.errorFileUrl),
});
}
} else if (task.status === 'FAILED') {
clearInterval(interval);
message.error('Import failed: ' + task.errorMessage);
}
}, 2000); // Query every 2 seconds
}
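With `setInterval` and an async callback, a new request can fire before the previous one has returned, and the loop never gives up on its own. A sketch of an alternative that waits for each response before scheduling the next request and stops after a maximum number of attempts; `pollTask`, `fetchTask`, and the option names are hypothetical, and the status values follow the endpoint above:

```typescript
type TaskStatus = 'PENDING' | 'PROCESSING' | 'SUCCESS' | 'FAILED';

interface TaskSnapshot {
  status: TaskStatus;
  progress: { percentage: number };
}

interface PollOptions {
  intervalMs: number; // delay between two consecutive requests
  maxAttempts: number; // give up after this many requests
  onProgress?: (percentage: number) => void;
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Polls until the task reaches a terminal status, one request at a time.
async function pollTask(
  fetchTask: () => Promise<TaskSnapshot>,
  options: PollOptions,
): Promise<TaskSnapshot> {
  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    const task = await fetchTask();
    if (options.onProgress) {
      options.onProgress(task.progress.percentage);
    }
    if (task.status === 'SUCCESS' || task.status === 'FAILED') {
      return task;
    }
    if (attempt < options.maxAttempts) {
      await sleep(options.intervalMs);
    }
  }
  throw new Error('Task did not finish after ' + options.maxAttempts + ' polls');
}
```

A call such as `pollTask(() => api.get('/tasks/' + taskId), { intervalMs: 2000, maxAttempts: 300, onProgress: setProgress })` would keep the 2-second cadence of the example above while capping the total wait at about ten minutes.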
Key Technical Points
1. Task Number Generation
To help users track tasks easily, we generate human-friendly task numbers:
generateTaskNo(): string {
// T + Timestamp(Base36) + 4-byte random
const timestamp = Date.now().toString(36).toUpperCase();
const random = crypto.randomBytes(4).toString('hex').toUpperCase();
return `T${timestamp}${random}`;
}
// Example output: TM5K2J9XAA1B2C3D4 (T + 8 base-36 timestamp characters + 8 hex characters)
2. File Lifecycle Management
@Cron(CronExpression.EVERY_DAY_AT_2AM)
async cleanupExpiredTasks() {
// Cleanup expired tasks (default 7 days)
const expiredTasks = await this.taskRepository.find({
where: {
expiredAt: LessThan(new Date()),
status: In([TaskStatus.SUCCESS, TaskStatus.FAILED]),
},
});
for (const task of expiredTasks) {
// Delete OSS files
if (task.fileUrl) {
await this.uploadService.deleteFile(task.fileUrl);
}
if (task.errorFileUrl) {
await this.uploadService.deleteFile(task.errorFileUrl);
}
// Delete task record
await this.taskRepository.remove(task);
}
console.log(`Cleaned up ${expiredTasks.length} expired tasks`);
}
3. Error Feedback Mechanism
When some rows fail to import, the worker generates a detailed error report:
async generateErrorReport(errors: ErrorItem[]): Promise<string> {
const workbook = new ExcelJS.Workbook();
const sheet = workbook.addWorksheet('Import Errors');
// Add header
sheet.columns = [
{ header: 'Row', key: 'row', width: 10 },
{ header: 'Original Data', key: 'data', width: 50 },
{ header: 'Error Reason', key: 'error', width: 30 },
];
// Add error data
errors.forEach(err => {
sheet.addRow({
row: err.row,
data: JSON.stringify(err.data),
error: err.error,
});
});
// Upload to OSS
const buffer = await workbook.xlsx.writeBuffer();
return await this.uploadService.uploadPrivate(
buffer,
'errors/import-errors.xlsx'
);
}
Performance Optimization Tips
1. Batch Insert
// ❌ Insert one by one (slow)
for (const row of rows) {
await this.productRepository.insert(row);
}
// ✅ Batch insert (can be around 10x faster, depending on the database and row size)
const batchSize = 500;
for (let i = 0; i < rows.length; i += batchSize) {
const batch = rows.slice(i, i + batchSize);
await this.productRepository.insert(batch);
}
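Batching trades away the per-row error reporting used in the worker: if one row in a batch of 500 violates a constraint, the whole `insert` call rejects. One possible compromise is to retry a failed batch row by row, so that only the offending rows end up in the error report. A sketch, assuming hypothetical `insertMany` and `insertOne` callbacks that wrap the repository calls, and assuming a rejected batch insert leaves no rows behind:

```typescript
interface RowError {
  row: number; // spreadsheet row number, assuming a header on row 1
  error: string;
}

interface BatchResult {
  successCount: number;
  errors: RowError[];
}

async function insertInBatches<T>(
  rows: T[],
  batchSize: number,
  insertMany: (batch: T[]) => Promise<unknown>,
  insertOne: (row: T) => Promise<unknown>,
): Promise<BatchResult> {
  const result: BatchResult = { successCount: 0, errors: [] };

  for (let start = 0; start < rows.length; start += batchSize) {
    const batch = rows.slice(start, start + batchSize);
    try {
      // Fast path: one statement for the whole batch.
      await insertMany(batch);
      result.successCount += batch.length;
    } catch (_batchError) {
      // Slow path: isolate the failing rows within this batch only.
      for (let i = 0; i < batch.length; i++) {
        try {
          await insertOne(batch[i]);
          result.successCount++;
        } catch (err) {
          result.errors.push({
            row: start + i + 2, // +1 for 1-based numbering, +1 for the header row
            error: err instanceof Error ? err.message : String(err),
          });
        }
      }
    }
  }
  return result;
}
```

The slow path only runs for batches that contain a bad row, so a mostly clean file still gets the benefit of batching.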
2. Concurrency Control
@Process({ name: 'import', concurrency: 2 }) // Limit concurrency
// Reason:
// - Too high: Database connection pool exhausted
// - Too low: Resource waste
// - Recommended: Based on DB connection pool size (typically 2-4)
3. Memory Optimization
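// ✅ Stream processing for large files (read()/load() would parse the whole workbook into memory)
// tempFilePath: hypothetical local copy of the upload; a readable stream also works
const reader = new ExcelJS.stream.xlsx.WorkbookReader(tempFilePath, {});
for await (const worksheet of reader) {
for await (const row of worksheet) { /* validate the row, add it to the current batch */ }
}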
Solution Comparison
| Dimension | Synchronous | Asynchronous Queue |
|---|---|---|
| Response Speed | Slow (2-3 min) | Fast (<1 sec) |
| User Experience | Poor (frozen page) | Good (real-time progress) |
| Error Handling | Opaque (unclear which rows succeeded) | Partial failures traceable per row |
| Server Pressure | High (blocking requests) | Low (async processing) |
| Scalability | Poor | Good (horizontal Worker scaling) |
Summary
For large-scale import/export in enterprise management systems, asynchronous task queues are the best practice:
- Great UX: Sub-second response, real-time progress, non-blocking
- Strong Traceability: Task numbers, detailed logs, error reports
- Controlled Performance: Concurrency limits, batch operations, scheduled cleanup
- Good Scalability: Horizontal Worker node expansion
We've applied this solution in multiple enterprise projects with excellent results. If your system has similar requirements, feel free to refer to this architecture.
About the Author: ekent, Technical Lead at ek Studio, specializing in enterprise management system development and architecture design.