关于任务队列的实现
背景
或者叫任务池吧,我也不太确定这个的学名叫啥,反正就是那种可以限制同一时间执行的任务数量的工具函数。
我前两天偷懒让 LLM(Claude)给我生成了一段,他是这样写的:
export const runConcurrency = async <T>(
tasks: Array<() => Promise<T>>,
concurrency: number = 3
): Promise<Array<T | Error>> => {
const results: Array<T | Error> = [];
const executing: Promise<void>[] = [];
let index = 0;
const enqueue = async (): Promise<void> => {
if (index >= tasks.length) {
return;
}
const taskIndex = index++;
const task = tasks[taskIndex];
try {
const result = await task();
results[taskIndex] = result;
} catch (err) {
results[taskIndex] = err as Error;
}
// 当前任务完成后,如果还有待执行的任务,继续执行下一个
if (index < tasks.length) {
const nextPromise = enqueue();
executing.push(nextPromise);
}
}
// 启动初始的并发任务
for (let i = 0; i < Math.min(concurrency, tasks.length); i++) {
executing.push(enqueue());
}
// 等待所有任务执行完成
await Promise.all(executing);
return results;
}
看起来挺像一回事的,results
放所有执行结果,executing
放执行中/执行过的任务,调用 enqueue
捞出新任务执行,任务完成时再递归调用 enqueue
捞新的,最后用 all
确保所有任务都已经 fulfilled
了。
问题
但他写的代码是有问题的,你能看出来吗?我举个例子:
const results = await runConcurrency([1,2,3,4,5,6].map((i) => {
return () => new Promise((resolve, reject) => {
setTimeout(() => {
console.log(i)
resolve(i)
}, 1000)
})
}))
console.log(a)
/*
打印结果:
1
2
3
[1,2,3] 这里是 a
4
5
6
*/
在任务没执行完之前就返回了 results
,这个问题在排查的时候还会带有误导性,我一时间就没反应过来,我给按钮加了个打印 results
的事件,完事之后点了一下,发现没问题啊,results
就是 [1,2,3,4,5,6]
啊,但列表就只显示出前 3 项。
后面仔细一想不对,是 results
这个数组,在 runConcurrency
返回的那一刻只有三个元素,但此时 runConcurrency
还没有执行完成,等过了一段时间执行完成后他还会往 results
里塞元素。
回到代码,问题的关键还是在异步任务顺序上,首先 enqueue 只是一个函数定义,不涉及异步操作,可以跳过,看这部分:
// 启动初始的并发任务
for (let i = 0; i < Math.min(concurrency, tasks.length); i++) {
executing.push(enqueue());
}
// 等待所有任务执行完成
await Promise.all(executing);
这里执行了 n 次 enqueue
,然后都塞进了 executing
数组,所以有 n 个任务进了异步队列,接下来 Promise.all
又创建了一个异步任务,等待 executing
执行。
但此时 executing
里面只有 n 个任务,剩下的任务都还没创建出来,因为 enqueue
还没执行完,没有递归调用自己,就算执行完了,新的任务也会放在 all
后面去执行,所以 await
之后只会有初始的 n 个结果。
改进
在我的逼迫下,LLM 又给了一段代码,这段是没有问题的:
export const runConcurrency = async <T>(
tasks: Array<() => Promise<T>>,
concurrency: number = 3
): Promise<Array<T | Error>> => {
const results: Array<T | Error> = new Array(tasks.length);
let currentIndex = 0;
// 创建工作线程函数
const worker = async () => {
while (currentIndex < tasks.length) {
const taskIndex = currentIndex++;
const task = tasks[taskIndex];
try {
const result = await task();
results[taskIndex] = result;
} catch (err) {
results[taskIndex] = err as Error;
}
}
};
// 创建指定数量的工作线程
const workers = Array(Math.min(concurrency, tasks.length))
.fill(null)
.map(() => worker());
// 等待所有工作线程完成
await Promise.all(workers);
return results;
}
看起来和原来的结构很像,并且在 concurrency
为 3,9 个任务的场景下,const workers = Array(Math.min(concurrency, tasks.length))
只执行了 3 次 worker
函数,但是关键在于 while
。
执行的 3 次 worker
,每个的内部都有一个循环去通过 currentIndex
获取任务,因为 JS 执行时是单线程的,所以不会出现竞争,所以 worker
就相当于执行器,有 3 个执行器去自动找任务执行:
进入
while
,拿到 currentIndex 指向的任务,执行执行完成重回
while
,回到第一步
题外话
我感觉 Claude 比千问聪明多了,至少他不容易顺从,换成千问,我一个反问他就会开始顺从,认为我说的是对的,然后开始胡说八道。
Subscribe to my newsletter
Read articles from Erioifpud directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by