bucket4j限流示例

编程入门 行业动态 更新时间:2024-10-23 05:47:47

bucket4j限流<a href=https://www.elefans.com/category/jswz/34/1770116.html style=示例"/>

bucket4j限流示例

最近处理测试某个业务的性能,发现当kafka消息量特别大的时候需要限制kafka消息消费速度,因为接受消息的处理流程比较多,当消息量特别大的时候,如果kafka消息了消息(kafka是自动完成commit)提交给后台处理,一旦后台线程中断,就会导致有消息遗漏处理。
这里补充一句,为什么没有设置kafka手动提交,因为每个消息的处理流程有差异,时间不一,整体上只要接受到kafka消息,完成基本处理,就提交到线程池中了(有的消息要提交给多个线程池), 无法等没有线程池都处理完毕了才手动向kafka提交确认。

bucket4j

bucket4j是基于令牌桶算法的Java限流库, 主页在。 它主要用在3种场景:
a,限制比较重工作的速率。
b,将限流作为定时器,例如有些场景限制你对服务提供方的调用速度,因此使用限流器作为定时器,定时按照约定速率调用服务提供方。
c,限制对API访问速率。

示例简介

为了说明问题,我们开发一个简单的SpringBoot工程,就一个Rest API, 该接口会根据租户id进行限流。限流速度:每30秒2次。 因为逻辑比较简单就不再说明,直接看代码。

注意:这个示例只是演示如何使用bucket4j,实际工程中,对API调用的限流我们一般在API网关处完成。而不是在具体的API处。,本例子只是为了说明如何限流, 因为我们除了对API限流,也会多其他业务处理限流,例如我遇到的处理消息情况。

RateLimits 生成

public class RateLimits {private final LocalBucket bucket;public RateLimits(String limitsConfiguration) {LocalBucketBuilder builder = Bucket4j.builder();boolean initialized = false;for (String limitSrc : limitsConfiguration.split(",")) {long capacity = Long.parseLong(limitSrc.split(":")[0]);long duration = Long.parseLong(limitSrc.split(":")[1]);builder.addLimit(Bandwidth.simple(capacity, Duration.ofSeconds(duration)));initialized = true;}if (initialized) {bucket = builder.build();} else {throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration);}}public boolean tryConsume() {return bucket.tryConsume(1);}public long getAvailableTokens() {return bucket.getAvailableTokens();}
}

服务层

@Service
@Slf4j
public class RateLimitSvcImpl implements RateLimitSvc {@Value("${rest.limits.tenant.enabled:false}")private boolean perTenantLimitsEnabled;@Value("${rest.limits.tenant.configuration:}")private String perTenantLimitsConfiguration;private ConcurrentMap<String, RateLimits> perTenantLimits = new ConcurrentHashMap<>();@Overridepublic boolean execRateLimit(String tenantId) {if (perTenantLimitsEnabled) {RateLimits rateLimits = perTenantLimitsputeIfAbsent(tenantId, id -> new RateLimits(perTenantLimitsConfiguration));if (!rateLimits.tryConsume()) {log.info("tryConsume false, tenantId={}, leftToken={}", tenantId, rateLimits.getAvailableTokens());return false;} else {log.info("tryConsume true, tenantId={}, leftToken={}", tenantId, rateLimits.getAvailableTokens());return true;}return true;}
}

具体API


@RestController
@RequestMapping
public class MyController {//实际上我们一般不会再controller中直接进行rateLimit, 而是在网关处,根据租户id,用户id,应用id,或者ip进行限流。//本程序只是RateLimit的例子,不建议直接在生产代码中使用@Autowiredprivate RateLimitSvc rateLimitSvc;@ApiOperation(value = "按用户id查询")@ApiImplicitParams({@ApiImplicitParam(name = "tenantId", value = "tenantId", defaultValue = "001", required = true, dataType = "string", paramType = "path"),})@GetMapping(value = "/tenants/{tenantId}", produces = "application/json;charset=UTF-8")public String getDevice(@PathVariable String tenantId) {if (rateLimitSvc.execRateLimit(tenantId)) {//这里实际中应该是调用设备服务查询数据库,本示例为了简化直接new了一个对象Device device = new Device();device.setId("001");device.setName("一号设备");return JSONObject.toJSONString(device, SerializerFeature.WriteMapNullValue);} else {JSONObject json = new JSONObject();json.put("errMsg", "too many requests");return json.toJSONString();}}
}

完整的代码在这里,欢迎fork, 加星。 谢谢!

效果截图

1, swagger访问效果

2, 日志信息

更多推荐

bucket4j限流示例

本文发布于:2024-02-06 03:45:28,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1746015.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:示例   bucket4j

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!