本文是针对xratelimiter的算法说明。
https://github.com/xnnyygn/xratelimiter
首先对rate limiter做一下简单介绍。
rate limiter主要用在限流,比如希望网站的访问在一定的量的时候,对于API有调用量限制,作为云服务商限制访问用户网站的流量等等。
常见的rate limiter算法有token bucket。具体算法这里不作展开。一般实现中,会有一个记录最后添加了token的时间戳,还有一个记录当前token数的变量。由于有两个变量,在使用redis实现集中式的token bucket时无法原子修改。
如果你想要基于token bucket实现多台服务器的限流的话,常见的是写一个redis+lua的方案,lua用来实现原子修改。除此之外,还有一种不常被提起的方案,就是根据服务器台数分割token bucket,个人称之为1/n方案,比如说
全部token bucket:容量60,再填充速率60/1秒,初始token为0 有3台服务器的话,每台的token bucket为:容量20,再填充速率20/1秒,初始token为0
如果你的服务器负载平均,或者你想限制出去的流量的话,1/n方案是一个不错的方案。
除了redis的集中式方案和1/n之外,个人尝试了调查其他的分布式rate limiter的方案,发现这方面资料比较少。少数提到了多个本地rate limiter,用GOSSIP同步消息。具体通信什么,并没有提及。相对有参考价值的,是这篇
https://pdfs.semanticscholar.org/presentation/1b2a/01c2c5980dc48349327e33b575017294130e.pdf
里面提出的FPS(Flow Proportional Share)方案,即根据流量自动分配多个rate limiter的比重。这篇slide里面也提到了使用GOSSIP同步。
经过一段时间分析,按照个人理解整理出了一套自适应的分布式速率限制的方案。核心代码在 StaticMemberDistributedTokenBucketRateLimiter 这个类中。主要内容有
- concurrent token bucket rate limiter(lock-free)
- 流量取样(request sampler)
- 各limiter的比重计算(FPS的伪代码可能有问题,最后没有使用)
- 两阶段的limiter重新分配(链式比重收集,GOSSIP用来传播结果)
下面分别讲解。
concurrent token bucket rate limiter(lock-free)
在多线程环境下使用token bucket的话,由于有两个变量,所以最简单的方案是取token时加锁。即使是加锁方案,由于是多台,理论上性能也会比redis的集中式方案好。除此之外,个人设计了一个简单的无锁方案。原理是用一个64bit变量同时保存时间戳和当前的token数,然后修改时CAS。具体来说
时间戳 << 22 | tokens数
用前64bit的前41bit保存时间戳,后22位保存tokens数。tokens数最大不超过2^22 – 1 = 2097151 约200万的话没有问题。为什么是前41bit,可以参考twitter的snowflake。另外对于token bucket的场景,可以使用服务启动时间或者创建token bucket的时间作为epoch,各台服务器之间不需要同步时间。
具体代码,参见 ConcurrentTokenBucketRateLimiter
流量取样(request sampler)
FPS方案中使用了流量取样。取样指的是一定概率取样,即
RAND() < RATIO
取样之后要临时保存。由于是连续数据,考虑到存储空间,个人使用ring buffer的方式实现。使用ring buffer的好处是,新增数据的时候,如果超过了数量,会覆盖最早的数据,这样就不用专门处理删除逻辑。其次,对于较早的数据,使用binary search调整起始指针(因为是时序数据)。还有一些小的优化,比如说ring buffer的长度是2的幂,类似2/4/8/16/32/64/128等等,这样在做mod操作的话,直接用
n & (length - 1)
代替。
流量取样的类由于是取样,不是每次是执行。多线程下的处理个人只是加了个锁,估计有无锁的ring buffer,之后有空再考虑优化。
各limiter的比重计算
FPS的伪代码有比重计算,但是个人尝试了多次,感觉伪代码可能有问题,无法得到比较满意的比重调整,所以暂时使用了相对直觉的一种计算——最近一段时间请求的token总数,包括获取失败的。如果单台服务器的token总数增加了,那么在总体中的比例也可能增加。因为是流量取样,这块可能不准确,而且实际中有比重不安定的现象。需要优化。
两阶段的limiter重新分配
最后是相对独立的基于比重的limiter重新分配的算法。为什么是两阶段,原因是比重必须从各台服务器上收集,计算后分配到各台服务器。各台服务器单独计算后应用是否有可能?个人觉得是没有可能的,原因是你无法保证所有服务器合起来的capacity等配置不超过全局capacity等配置。不管limiter的比重计算用了什么算法,总需要一个过程在各台服务器上收集比重。其次,分为两阶段,是为了区分要通讯的数据。在收集完比重之后,计算出来的结果,即每台服务器新的token bucket配置,需要传给所有服务器。个人觉得在这里使用GOSSIP比较合适,虽然是最终一致性,有一定时间内合起来的capacity可能超过全局capacity,但是最终会小于或者等于capacity。如果你在这里,或者连同收集阶段做类似2pc的方案,总会有各种极端条件导致策略会非常复杂。
实际的两阶段limiter重新分配方案
第一阶段:收集阶段 链式收集 A -> B -> C onRefreshTimeout { sendLimiterWeightCollectingRpc( currentConfig.round + 1, {selfEndpoint -> calaculateWeight()}, remainingEndpoints: allEndpoints - selfEndpoint ) } onReceiveLimiterWeightCollectingRpc(round, weightMap, remainingEndpoints) { if(round != currentConfig.round + 1 || !remainingEndpoints.contains(selfEndpoint)) { return } weightMap += (selfEndpoint -> calculateWeight()) if(remainingEndpoints.size == 1) { buildMultiLimiterConfig(round, weightMap) sendMultiLimiterConfigToOtherMembers() return } sendLimiterWeightCollectingRpc( round, weightMap + {selfEndpoint -> calaculateWeight()}, remainingEndpoints - selfEndpoint ) } 第二阶段:传播阶段 X <-> Y onSyncMultiLimiterConfig { if no other member { return } sendMultiLimiterConfigSyncRpc(random endpoint, currentConfig.round) } onReceiveMultiLimiterConfigSyncRpc(endpoint, round) { if(round >= currentConfig.round) { replyMultiLimiterConfigSyncResponse(currentConfig.round) } else { replyMultiLimiterConfigSyncResponse(currentConfig.round, currentConfig) } } onReceiveMultiLimiterConfigSyncResponse(round, config) { if(round > currentConfig.round) { applyConfig(config) } else if(round < currentConfig.round) { replyMultiLimiterConfigSyncResponse(currentConfig.round, currentConfig) } }
收集阶段主要就两个过程,一个是发起,一个是处理。在处理的时候,如果当前节点是最后一个节点,那就表示收集完成,生成新的配置,并传播给其他节点。这样做的好处是更新只有一个节点。
这里采用链式,而不是集中式的原因是集中式需要超过两个过程(发起,RPC和RESPONSE),以及代码会更复杂一些(需要线程安全的收集结果类等)。另外,对于重新分配,集群中的节点必须全部参与才能得到最终结果,如果链式收集中途失败了,这次尝试就直接失败,等待下次收集。
单个节点生成新的配置之后就是GOSSIP的强项——传播数据了。这里就不具体展开了。
BTW,上面的round是类似version一样的单调增加的数据,GOSSIP传播的时候,先判断对方和自己的round谁大,然后使用谁的数据。round会在onRefreshTimeout临时增加。
还有现在的方案是固定成员,如果是动态成员的话,可以使用xgossip的成员增减方案,但是请不要加入失败检测。对于rate limiter来说,排除失败的limiter会引起问题(总capacity超过全局capacity)。
以上就是xratelimiter中自适应分布式rate limiter的算法介绍。如果对算法感兴趣,欢迎直接阅读源代码或者联系我。