自适应分布式速率限制(distributed rate limiter)


本文是针对xratelimiter的算法说明。

https://github.com/xnnyygn/xratelimiter

首先对rate limiter做一下简单介绍。

rate limiter主要用在限流,比如希望网站的访问在一定的量的时候,对于API有调用量限制,作为云服务商限制访问用户网站的流量等等。

常见的rate limiter算法有token bucket。具体算法这里不作展开。一般实现中,会有一个记录最后添加了token的时间戳,还有一个记录当前token数的变量。由于有两个变量,在使用redis实现集中式的token bucket时无法原子修改。

如果你想要基于token bucket实现多台服务器的限流的话,常见的是写一个redis+lua的方案,lua用来实现原子修改。除此之外,还有一种不常被提起的方案,就是根据服务器台数分割token bucket,个人称之为1/n方案,比如说

全部token bucket:容量60,再填充速率60/1秒,初始token为0

有3台服务器的话,每台的token bucket为:容量20,再填充速率20/1秒,初始token为0

如果你的服务器负载平均,或者你想限制出去的流量的话,1/n方案是一个不错的方案。

除了redis的集中式方案和1/n之外,个人尝试了调查其他的分布式rate limiter的方案,发现这方面资料比较少。少数提到了多个本地rate limiter,用GOSSIP同步消息。具体通信什么,并没有提及。相对有参考价值的,是这篇

https://pdfs.semanticscholar.org/presentation/1b2a/01c2c5980dc48349327e33b575017294130e.pdf

里面提出的FPS(Flow Proportional Share)方案,即根据流量自动分配多个rate limiter的比重。这篇slide里面也提到了使用GOSSIP同步。

经过一段时间分析,按照个人理解整理出了一套自适应的分布式速率限制的方案。核心代码在 StaticMemberDistributedTokenBucketRateLimiter 这个类中。主要内容有

  • concurrent token bucket rate limiter(lock-free)
  • 流量取样(request sampler)
  • 各limiter的比重计算(FPS的伪代码可能有问题,最后没有使用)
  • 两阶段的limiter重新分配(链式比重收集,GOSSIP用来传播结果)

下面分别讲解。

concurrent token bucket rate limiter(lock-free)

在多线程环境下使用token bucket的话,由于有两个变量,所以最简单的方案是取token时加锁。即使是加锁方案,由于是多台,理论上性能也会比redis的集中式方案好。除此之外,个人设计了一个简单的无锁方案。原理是用一个64bit变量同时保存时间戳和当前的token数,然后修改时CAS。具体来说

时间戳 << 22 | tokens数

用前64bit的前41bit保存时间戳,后22位保存tokens数。tokens数最大不超过2^22 – 1 = 2097151 约200万的话没有问题。为什么是前41bit,可以参考twitter的snowflake。另外对于token bucket的场景,可以使用服务启动时间或者创建token bucket的时间作为epoch,各台服务器之间不需要同步时间。

具体代码,参见 ConcurrentTokenBucketRateLimiter

流量取样(request sampler)

FPS方案中使用了流量取样。取样指的是一定概率取样,即

RAND() < RATIO

取样之后要临时保存。由于是连续数据,考虑到存储空间,个人使用ring buffer的方式实现。使用ring buffer的好处是,新增数据的时候,如果超过了数量,会覆盖最早的数据,这样就不用专门处理删除逻辑。其次,对于较早的数据,使用binary search调整起始指针(因为是时序数据)。还有一些小的优化,比如说ring buffer的长度是2的幂,类似2/4/8/16/32/64/128等等,这样在做mod操作的话,直接用

n & (length - 1)

代替。

流量取样的类由于是取样,不是每次是执行。多线程下的处理个人只是加了个锁,估计有无锁的ring buffer,之后有空再考虑优化。

各limiter的比重计算

FPS的伪代码有比重计算,但是个人尝试了多次,感觉伪代码可能有问题,无法得到比较满意的比重调整,所以暂时使用了相对直觉的一种计算——最近一段时间请求的token总数,包括获取失败的。如果单台服务器的token总数增加了,那么在总体中的比例也可能增加。因为是流量取样,这块可能不准确,而且实际中有比重不安定的现象。需要优化。

两阶段的limiter重新分配

最后是相对独立的基于比重的limiter重新分配的算法。为什么是两阶段,原因是比重必须从各台服务器上收集,计算后分配到各台服务器。各台服务器单独计算后应用是否有可能?个人觉得是没有可能的,原因是你无法保证所有服务器合起来的capacity等配置不超过全局capacity等配置。不管limiter的比重计算用了什么算法,总需要一个过程在各台服务器上收集比重。其次,分为两阶段,是为了区分要通讯的数据。在收集完比重之后,计算出来的结果,即每台服务器新的token bucket配置,需要传给所有服务器。个人觉得在这里使用GOSSIP比较合适,虽然是最终一致性,有一定时间内合起来的capacity可能超过全局capacity,但是最终会小于或者等于capacity。如果你在这里,或者连同收集阶段做类似2pc的方案,总会有各种极端条件导致策略会非常复杂。

实际的两阶段limiter重新分配方案

第一阶段:收集阶段
链式收集

A -> B -> C

onRefreshTimeout {
  sendLimiterWeightCollectingRpc(
    currentConfig.round + 1, 
    {selfEndpoint -> calaculateWeight()},
    remainingEndpoints: allEndpoints - selfEndpoint
  )
}

onReceiveLimiterWeightCollectingRpc(round, weightMap, remainingEndpoints) {
  if(round != currentConfig.round + 1 || !remainingEndpoints.contains(selfEndpoint)) {
    return  
  }
  weightMap += (selfEndpoint -> calculateWeight())
  if(remainingEndpoints.size == 1) {
    buildMultiLimiterConfig(round, weightMap)
    sendMultiLimiterConfigToOtherMembers()
    return
  }

  sendLimiterWeightCollectingRpc(
    round, 
    weightMap + {selfEndpoint -> calaculateWeight()},
    remainingEndpoints - selfEndpoint
  )
}

第二阶段:传播阶段

X <-> Y

onSyncMultiLimiterConfig {
  if no other member {
    return
  }
  sendMultiLimiterConfigSyncRpc(random endpoint, currentConfig.round)
}

onReceiveMultiLimiterConfigSyncRpc(endpoint, round) {
  if(round >= currentConfig.round) {
    replyMultiLimiterConfigSyncResponse(currentConfig.round)
  } else {
    replyMultiLimiterConfigSyncResponse(currentConfig.round, currentConfig)
  }
}

onReceiveMultiLimiterConfigSyncResponse(round, config) {
  if(round > currentConfig.round) {
    applyConfig(config)
  } else if(round < currentConfig.round) {
    replyMultiLimiterConfigSyncResponse(currentConfig.round, currentConfig)
  }
}

收集阶段主要就两个过程,一个是发起,一个是处理。在处理的时候,如果当前节点是最后一个节点,那就表示收集完成,生成新的配置,并传播给其他节点。这样做的好处是更新只有一个节点。

这里采用链式,而不是集中式的原因是集中式需要超过两个过程(发起,RPC和RESPONSE),以及代码会更复杂一些(需要线程安全的收集结果类等)。另外,对于重新分配,集群中的节点必须全部参与才能得到最终结果,如果链式收集中途失败了,这次尝试就直接失败,等待下次收集。

单个节点生成新的配置之后就是GOSSIP的强项——传播数据了。这里就不具体展开了。

BTW,上面的round是类似version一样的单调增加的数据,GOSSIP传播的时候,先判断对方和自己的round谁大,然后使用谁的数据。round会在onRefreshTimeout临时增加。

还有现在的方案是固定成员,如果是动态成员的话,可以使用xgossip的成员增减方案,但是请不要加入失败检测。对于rate limiter来说,排除失败的limiter会引起问题(总capacity超过全局capacity)。

以上就是xratelimiter中自适应分布式rate limiter的算法介绍。如果对算法感兴趣,欢迎直接阅读源代码或者联系我。