rxjs 速率限制(每秒请求数)和并发

编程入门 行业动态 更新时间:2024-10-26 16:21:37
本文介绍了rxjs 速率限制(每秒请求数)和并发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我想弄清楚如何在 rxjs 中编写速率限制器.用于访问大多数 api(twitter、facebook 等)如果开箱即用的方法不支持,我认为可以编写调度程序.例如 highland.js 有 ratelimit.我不想丢弃任何物品,例如窗口、样品等.

I am trying to figure out how to write a rate limiter in rxjs. Used to access most apis (twitter, facebook, etc) If not supported by out of the box methods, i would assume a scheduler could be written. For instance highland.js has ratelimit. I don't want to drop any items like with window, sample, etc.

var source = Rx.Observable.create(function (observer) { // queue of requests _.each(requests, function(r) { observer.onNext(r); }); observer.onCompleted(); // Any cleanup logic might go here return function () { console.log('disposed'); } }) // what goes here, if built in (e.g. 2 requests per 2 seconds or 15 request per 15 minutes) // SHOULD ONLY RUN var subscription = source.subscribe( function (x) { console.log('onNext: %s', x); }, function (e) { console.log('onError: %s', e); }, function () { console.log('onCompleted'); });

编辑 1:想着这样的事情,使用令牌桶算法,还是很粗糙但是...

EDIT 1: Thinking about something like this, using the token bucket algorithm, still really rough but...

Rx.Observable.prototype.tokenBucket = function(options, scheduler) { function time() { return new Date().getTime(); } var BUCKET = { capacity: options.capacity || Infinity, left: options.capacity, last: time(), tokensPerInterval: options.tokensPerInterval, interval: options.interval }; //var BUCKET = _.merge(defaultOptions, options); console.log(BUCKET); var source = this, scheduler = scheduler || (scheduler = Rx.Scheduler.timeout); return Rx.Observable.create(function(observer) { var d1 = source.subscribe(function(mainValue) { return throttle(mainValue); }); function throttle(x, tokens) { if (BUCKET.capacity === Infinity) { return observer.onNext(x); } // return x; // the number of tokens to add every S milliseconds = (r*S)/1000. var self = BUCKET; var now = time(); var deltaMS = Math.max(now - self.last, 0); self.last = now; var dripAmount = deltaMS * (self.tokensPerInterval / self.interval); self.left = Math.min(self.left + dripAmount, self.capacity); if (self.left < 1) { var interval = Math.ceil((1 - self.left) * self.interval); scheduler.scheduleWithRelative(interval, function (s, i) { return throttle(x); }); } else { self.left -= tokens || 1; console.log('calling'); return observer.onNext(x); } } return function() { d1.dispose(); console.log('disposed tokenBucket'); }; }); }; var start = moment(); var source = Rx.Observable.range(1, 20) .tokenBucket({capacity: 2, tokensPerInterval: 2, interval: 2000}) var subscription = source.subscribe( function (x) { console.log('onNext: %s', x); addToDom(x); }, function (e) { console.log('onError: %s', e); }, function () { console.log('onCompleted'); }); function addToDom(x) { var ul = document.getElementById('c'); var li = document.createElement('li'); li.innerHTML = x + ' - ' + moment().diff(start, 'seconds') + 's ago'; ul.appendChild(li); }

<script src="cdnjs.cloudflare/ajax/libs/moment.js/2.10.3/moment.min.js"></script> <script src="cdnjs.cloudflare/ajax/libs/rxjs/2.5.3/rx.all.js"></script> <ul id="c"></ul>

推荐答案

我在我的个人项目中遇到了一个非常相似的问题,并决定发布一个可重用的解决方案作为 npm 包 www.npmjs/package/rx-op-lossless-throttle

I've faced a very similar issue in my personal project and decided to publish a reusable solution as a npm package www.npmjs/package/rx-op-lossless-throttle

不同于www.g9labs/2016/03/21/lossless-rate-limiting-with-rxjs/ 它不会强制延迟每个事件.

Unlike www.g9labs/2016/03/21/lossless-rate-limiting-with-rxjs/ it doesn't force the delay on every single event.

更多推荐

rxjs 速率限制(每秒请求数)和并发

本文发布于:2023-11-06 09:43:04,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1563342.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:速率   rxjs

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!