关于任务队列的实现

ErioifpudErioifpud
2 min read

背景

或者叫任务池吧,我也不太确定这个的学名叫啥,反正就是那种可以限制同一时间执行的任务数量的工具函数。

我前两天偷懒让 LLM(Claude)给我生成了一段,他是这样写的:

export const runConcurrency = async <T>(
  tasks: Array<() => Promise<T>>, 
  concurrency: number = 3
): Promise<Array<T | Error>> => {
  const results: Array<T | Error> = [];
  const executing: Promise<void>[] = [];
  let index = 0;

  const enqueue = async (): Promise<void> => {
    if (index >= tasks.length) {
      return;
    }

    const taskIndex = index++;
    const task = tasks[taskIndex];

    try {
      const result = await task();
      results[taskIndex] = result;
    } catch (err) {
      results[taskIndex] = err as Error;
    }

    // 当前任务完成后,如果还有待执行的任务,继续执行下一个
    if (index < tasks.length) {
      const nextPromise = enqueue();
      executing.push(nextPromise);
    }
  }

  // 启动初始的并发任务
  for (let i = 0; i < Math.min(concurrency, tasks.length); i++) {
    executing.push(enqueue());
  }

  // 等待所有任务执行完成
  await Promise.all(executing);

  return results;
}

看起来挺像一回事的,results 放所有执行结果,executing 放执行中/执行过的任务,调用 enqueue 捞出新任务执行,任务完成时再递归调用 enqueue 捞新的,最后用 all 确保所有任务都已经 fulfilled 了。

问题

但他写的代码是有问题的,你能看出来吗?我举个例子:

const results = await runConcurrency([1,2,3,4,5,6].map((i) => {
  return () => new Promise((resolve, reject) => {
    setTimeout(() => {
        console.log(i)
        resolve(i)
    }, 1000)   
  })   
}))
console.log(a)
/*
打印结果:
1
2
3
[1,2,3] 这里是 a
4
5
6
*/

在任务没执行完之前就返回了 results,这个问题在排查的时候还会带有误导性,我一时间就没反应过来,我给按钮加了个打印 results 的事件,完事之后点了一下,发现没问题啊,results 就是 [1,2,3,4,5,6] 啊,但列表就只显示出前 3 项。

后面仔细一想不对,是 results 这个数组,在 runConcurrency 返回的那一刻只有三个元素,但此时 runConcurrency 还没有执行完成,等过了一段时间执行完成后他还会往 results 里塞元素。

回到代码,问题的关键还是在异步任务顺序上,首先 enqueue 只是一个函数定义,不涉及异步操作,可以跳过,看这部分:

// 启动初始的并发任务
  for (let i = 0; i < Math.min(concurrency, tasks.length); i++) {
    executing.push(enqueue());
  }

  // 等待所有任务执行完成
  await Promise.all(executing);

这里执行了 n 次 enqueue,然后都塞进了 executing 数组,所以有 n 个任务进了异步队列,接下来 Promise.all 又创建了一个异步任务,等待 executing 执行。

但此时 executing 里面只有 n 个任务,剩下的任务都还没创建出来,因为 enqueue 还没执行完,没有递归调用自己,就算执行完了,新的任务也会放在 all 后面去执行,所以 await 之后只会有初始的 n 个结果。

改进

在我的逼迫下,LLM 又给了一段代码,这段是没有问题的:

export const runConcurrency = async <T>(
  tasks: Array<() => Promise<T>>, 
  concurrency: number = 3
): Promise<Array<T | Error>> => {
  const results: Array<T | Error> = new Array(tasks.length);
  let currentIndex = 0;

  // 创建工作线程函数
  const worker = async () => {
    while (currentIndex < tasks.length) {
      const taskIndex = currentIndex++;
      const task = tasks[taskIndex];

      try {
        const result = await task();
        results[taskIndex] = result;
      } catch (err) {
        results[taskIndex] = err as Error;
      }
    }
  };

  // 创建指定数量的工作线程
  const workers = Array(Math.min(concurrency, tasks.length))
    .fill(null)
    .map(() => worker());

  // 等待所有工作线程完成
  await Promise.all(workers);

  return results;
}

看起来和原来的结构很像,并且在 concurrency 为 3,9 个任务的场景下,const workers = Array(Math.min(concurrency, tasks.length)) 只执行了 3 次 worker 函数,但是关键在于 while

执行的 3 次 worker,每个的内部都有一个循环去通过 currentIndex 获取任务,因为 JS 执行时是单线程的,所以不会出现竞争,所以 worker 就相当于执行器,有 3 个执行器去自动找任务执行:

  1. 进入 while,拿到 currentIndex 指向的任务,执行

  2. 执行完成重回 while,回到第一步

题外话

我感觉 Claude 比千问聪明多了,至少他不容易顺从,换成千问,我一个反问他就会开始顺从,认为我说的是对的,然后开始胡说八道。

0
Subscribe to my newsletter

Read articles from Erioifpud directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Erioifpud
Erioifpud