Promise的并发控制

编程入门 行业动态 更新时间:2024-10-27 01:22:33

<a href=https://www.elefans.com/category/jswz/34/1771451.html style=Promise的并发控制"/>

Promise的并发控制

一、场景

        给你一个有200个URL的数组,通过这些URL来发送请求,要求并发请求数不能超过五个。

        这是一道很常考的面试题,接下来让我们来学习一下Promise并发控制 

二、普通并发池的实现

        主要思路就是,判断当前队列是否满,满则等待,有空闲则补齐。

        利用 Promise.race 方法,可以判断一个Promise数组中 “谁最先完成” ,从而让等待中的函数开始运行。

/**Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量* @param taskList 任务列表* @param max 最大并发数量* @param oneFinishCallback 每个完成的回调,参数是当前完成的个数和执行结果,可以用来制作进度条* @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果*/
export const promisePool = <T>(taskList: task<T>[], limit: number) => {return new Promise<T[]>(async (resolve, reject) => {try {const length = taskList.length/**当前并发池 */const pool: Promise<T>[] = []/**结果数组 */const res = new Array<T>(length)/**完成的数量 */let count = 0for (let i = 0; i < length; i++) {const task = taskList[i]();//promise结束的回调const handler = (info: T) => {pool.splice(pool.indexOf(task), 1) //任务执行完就删除res[i] = info //不能使用res.push,否则不能保证结果顺序count++if (count === length) {resolve(res)}}task.then((data) => {handler(data)console.log(`第${i}个任务完成,结果为`, data);}, (err) => {handler(err)console.log(`第${i}个任务失败,原因为`, err);})pool.push(task)//如果到达了并发限制,就等到池子中任意一个结束if (pool.length >= limit) {await Promise.race(pool)}}} catch (error) {console.error('并发池出错', error);reject(error)}})
}

测试用例:

/**创造一个1s后得到结果的Promise */const getTask = () => {return async () => {await new Promise((resolve) => setTimeout(resolve, 1000))return new Date()}}//测试用例:
const testIt = async () => {const list = new Array(20).fill(0).map(() => getTask())const res = await promisePool(list, 5)console.log('res', res);
}
testIt()

打印结果:(观察控制台,可以发现是五个五个出现的)

三、让并发池可中断

        好,现在来了个新要求,用户点击了取消按钮后,你需要中断继续往并发池添加任务。 (常见场景:分片上传时,用户点击取消上传按钮)

        问题的关键核心就是,如何从外部 让内部的循环终止。 其实也很简单,设置一个变量,初始为false,当用户点击取消按钮时,变量变为true。在for循环中检测这个变量的值,为true就退出循环

        但是我们不能将这个变量设置为全局变量!否则如果有多处需要使用这个并发池,一处中断,全部遭殃。 在这里,我们就可以利用面向对象的思想,把这个变量作为对象内部的值,每个实例之间独立。“你终止你的,关我什么事? ” 

/**Promise并发池 - 可终止 - 每次都创建一个实例,避免另一个池子的取消导致这个池子的取消 */
export class PromisePoolStatic<T, Err>{/**是否取消。在循环中若发现这个变成了true,就会中断 */private isStop = false/**运行静态Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量* @param taskList 任务列表* @param max 最大并发数量* @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果*/run = async (taskList: task<T>[], max: number) => {return new Promise<Array<T | Err>>(async (resolve, reject) => {type resType = T | Errtry {this.isStop = false //开始的时候设为falseconst length = taskList.lengthconst pool: Promise<resType>[] = []//并发池 let count = 0//当前结束了几个const res = new Array<resType>(length)for (let i = 0; i < length; i++) {let task = taskList[i]();if (this.isStop) return reject('并发池终止')//成功和失败都要执行的函数const handler = (_res: resType) => {pool.splice(pool.indexOf(task), 1) //每当并发池跑完一个任务,从并发池删除个任务res[i] = _res //放入结果数组count++if (count === length) {return resolve(res)}}task.then((data) => {handler(data)console.log(`第${i}个任务完成,结果为`, data);}, (err) => {handler(err)console.log(`第${i}个任务失败,原因为`, err);})pool.push(task);if (pool.length === max) {//利用Promise.race方法来获得并发池中某任务完成的信号,当有任务完成才让程序继续执行,让循环把并发池塞满await Promise.race(pool)}}} catch (error) {console.error('promise并发池出错', error);reject(error)}})}/**停止并发池运行 */stop = () => {this.isStop = true}
}

测试用例:

/**可终止的并发池测试用例 */
const promisePoolStaticTest = () => {const list = new Array(18).fill(0).map(() => getTask())const pool = new PromisePoolStatic()pool.run(list, 3).catch((err) => {console.log('可终止的并发池测试用例出错 -- ', err)})//18个任务,每个花费1s完成,并发数量为3,共需要6s完成//我们在第三秒的时候中断setTimeout(() => pool.stop(), 3000)
}
promisePoolStaticTest()

结果如下:

        可以看到第九个任务结束之后,并发池没有进入新的任务了。 但是为什么已经终止了,还有Promise完成的回调打印出来? 因为执行终止函数时,并发池内仍有三个函数在运行,而正在运行的Promise无法终止,所以只能阻止新任务进入并发池。  (虽然无法终止Promise,但是可以终止Promise完成后的操作,这里不阐述)

四、动态并发池

        现在前面完成的操作,都是已经确定好了任务列表,才进行并发控制。如果我们需要动态添加任务的效果,如果队列没满就运行,队满则挂起等待,应该怎么做呢? (常见场景:全局axios请求并发控制

        主要思路: 队未满则直接运行,队满则加入等待队列。任务完成后,检查等待队列是否有任务


type resolve<T> = (value?: T | PromiseLike<T>) => void
type reject = (reason?: any) => void
/**装着任务和它的resolve与reject函数 */
type taskWithCallbacks<T> = { task: task<T>; resolve: resolve<T>; reject: reject }/**动态并发池 */
export class PromisePoolDynamic<T> {/**最大并发数量 */private limit: number;/**当前正在跑的数量 */private runningCount: number;/**等待队列 */private queue: Array<taskWithCallbacks<T>>;/**动态并发池 - 构造函数* @param maxConcurrency 最大并发数量*/constructor(maxConcurrency: number) {this.limit = maxConcurrency;this.runningCount = 0;this.queue = [];}/**添加任务* @param task 任务,() => Promise<T>* @returns 结果*/addTask(task: task<T>) {//返回一个新的Promise实例,在任务完成前,会一直是pending状态return new Promise<T>((resolve, reject) => {const taskWithCallbacks = { task, resolve, reject } as taskWithCallbacks<T>;if (this.runningCount < this.limit) {//并发数量没满则运行console.log('任务添加:当前并发数', this.runningCount, '并发数量未满,直接运行');this.runTask(taskWithCallbacks);} else {//并发数量满则加入等待队列console.log('任务添加:当前并发数', this.runningCount, '并发数量满,挂起等待');this.queue.push(taskWithCallbacks);}});}/**运行任务* @param taskWithCallback 带有resolve和reject的任务*/private runTask(taskWithCallback: taskWithCallbacks<T>) {this.runningCount++;//当前并发数++taskWithCallback.task()//从对象中取出任务执行.then(result => {this.runningCount--;taskWithCallback.resolve(result);console.log('任务完成', result, '当前并发数', this.runningCount);this.checkQueue();}).catch(error => {this.runningCount--;taskWithCallback.reject(error);this.checkQueue();});}/**运行完成后,检查队列,看看是否有在等待的,有就取出第一个来运行 */private checkQueue() {if (this.queue.length > 0 && this.runningCount < this.limit) {const nextTask = this.queue.shift()!;console.log('并发池出现空位,取出等待队列的任务', nextTask);this.runTask(nextTask);}}
}

测试用例:

/**动态并发池的测试用例 */
const promisePoolDynamicTest = () => {const promisePoolDynamic = new PromisePoolDynamic(3) //一个最大并发3的动态并发池//最大并发3,我一次性添加7个任务promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())
}
promisePoolDynamicTest()

测试结果:

五、结语

        关于并发池就到这里了。除了利用Promise.race,其实还可以递归等方式,不过Promise.race是最简单也是最容易理解的。

        如果代码中有哪里出现的不对,欢迎指出

更多推荐

Promise的并发控制

本文发布于:2023-11-16 23:09:23,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1634408.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:Promise

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!