前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、为什么要限流

任务系统都有一个上限,如果超过了这个上限,会对系统造成毁灭性的打击,因此在任何时刻都应该保证系统的并发请求数量不要超过某个阈值,限流就是这一目的

1.1、场见的限流算法

策略 原理描述 优缺点 应用
令牌桶 ⬤ 令牌按照恒定速率入桶,桶满则丢弃多余令牌
⬤ 请求来临时从桶中获取令牌,桶空则等待令牌或者拒绝当前请求
优点:速率可变,能够应对突发流量
缺点:复杂度高
请求速率限流算法
漏桶 ⬤ 以任意速率生成令牌放入桶中
⬤ 桶流出(消费处理流量)令牌的速率是固定的
⬤ 若流入的令牌(突发请求)超过了桶的容量,则被丢弃或者等待,桶的容量不变
优点:速率稳定
缺点:应对突发流量需要等待令牌
请求速率限流算法
计数器 ⬤ 对一个时间窗口设置一个可以访问的量
⬤ 下一个时间窗口清零计数
⬤ 单位时间超出阈值的请求,可以设置不同的策略,等待或者丢弃
优点:实现简单,单个计数周期不会超过配置阈值
缺点:在两端临界点可能出现两倍的流速,流量不均匀,容易瞬间造成流量突刺现象
请求速度限流算法
滑动窗口 ⬤ 将时间窗⼝划分成很多⼩的时间窗⼝,让流量分散到⼩的⼩的时间窗⼝,这样流量限流更平滑。⼩的时间窗⼝越多限流越平滑
优点:相对计数算法(固定时间窗⼝)限流更平滑
缺点:⽐固定时间窗⼝占⽤更多内存(多⼏个bucket对象)
请求速度限流算法

2、基于令牌桶的 Guava-RateLimiter

2.1、实例

acq 1 时并没有任何等待直接预消费了1个令牌;

acq 6 时,由于之前预消费了1个令牌,故而等待了2秒(上一个预消费令牌数1),之后又预消费了6个令牌;

acq 2 时同理,由于之前预消费了6个令牌,故而等待了12秒(上一个预消费令牌数6);

RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)

@Test
public void test_1(){
    // 1、创建一个RateLimiter,指定每秒放0.5个令牌(2秒放1个令牌
    RateLimiter rateLimiter = RateLimiter.create(0.5);

    int[] a = {1, 6, 2};
    for (int i = 0 ; i < a.length; i++){
        log.info("时间戳:{}, 获取 {} 个令牌需要 {}s", System.currentTimeMillis(), a[i], rateLimiter.acquire(a[i]));
    }

    // 1516166482561 acq 1: wait 0.0s
    // 1516166482563 acq 6: wait 1.997664s
    // 1516166484569 acq 2: wait 11.991958s
}

2.2、源码分析

2.1.1、两种模式

Guava有两种限流模式

⬤ 一种为稳定模式 (SmoothBursty: 令牌生成速度恒定,支持缓存一定时间的突发流量,MT底层采用这种方式)

⬤ 一种为渐进模式 (SmoothWarmingUp: 令牌生成速度缓慢提升直到维持在一个稳定值,

原理:根据配置计算出限流器的 QPS,每次请求的时候检查是否有,缓存有⾜够的令牌,如果有,则直接返回通过;如果没有,则计算出需要等待的时间,如果等待时间⼩于超时时间,则等待,否则拒绝

2.1.2、核心思想

RateLimiter 核心思想有

1、响应本次请求后,动态计算下次可用服务的时间, 如果下次请求在可用时间之前,则需要等待SmoothRateLimiter 类中 nextFreeTicketMicros 表示下次可用服务的时间,例如,如果我们配置 QPS 为1,本次请求处理完成之后,下次请求的时间需要在1s中之后

2、RateLimiter 的子类 SmoothBursty 支持处理突发请求流量,例如,我们设置 QPS为1,在 10s中之内没有请求,那么令牌桶中就有10个空闲令牌,如果下次请求是acquire(20),那么就不需要等待20s了,因为令牌桶中已经有10个令牌了,SmoothRateLimiterstoredPermits 就是用来存储当前令牌桶中空闲的令牌数

3、SmoothWarmingUp 提出一种 “热身模式” 和 “冷区期” ,具体详情再说吧,先慢慢俩

2.1.3、SmoothRateLimiter 重要属性解析

@GwtIncompatible
abstract class SmoothRateLimiter extends RateLimiter {
  //当前持有的令牌数
  double storedPermits;
  
  //令牌数上限
  double maxPermits;
  
  //生成一个令牌需要的时间
  double stableIntervalMicros;
  
  //下次请求能获取令牌的时间,是个时间戳
  private long nextFreeTicketMicros;

2.1.4、SmoothBursty 具体实现

2.1.4.1、create

根据指定的 QPS 数创建 RateLimiter,底层调用如下:

SmoothBurstymaxBurstSeconds 构造函数参数,主要用于计算 maxPermitsmaxPermits = maxBurstSeconds * permitsPerSecond

//根据指定的 QPS 数创建  RateLimiter
public static RateLimiter create(double permitsPerSecond) {
  return create(RateLimiter.SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
}

@VisibleForTesting
static RateLimiter create(RateLimiter.SleepingStopwatch stopwatch, double permitsPerSecond) {
  // 1、创建 SmoothBursty 限流器
  RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0D /* maxBurstSeconds */);
  
  // 2、设置限流速率
  rateLimiter.setRate(permitsPerSecond);
  return rateLimiter;
}

再看 setRate 方法,最终会调用 doSetRate, doSetRate是一个抽象方法,

image-20220415204834107

final void doSetRate(double permitsPerSecond, long nowMicros) {
  this.resync(nowMicros);
  //生成一个令牌需要的时间 
  double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
  this.stableIntervalMicros = stableIntervalMicros;
  this.doSetRate(permitsPerSecond, stableIntervalMicros);
}

image-20220415204932057

实现如下

void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  //设置最大令牌数(每秒最大生成令牌数)  this.maxBurstSeconds = 1 * 每秒生成令牌数
  this.maxPermits = this.maxBurstSeconds * permitsPerSecond;
  if (oldMaxPermits == 1.0D / 0.0) {
    
    this.storedPermits = this.maxPermits;
  } else {
    this.storedPermits = oldMaxPermits == 0.0D ? 0.0D : this.storedPermits * this.maxPermits / oldMaxPermits;
  }

}

2.1.4.2、acquire

acquire(int permits)RateLimiter 中获取x个令牌,该方法会一直阻塞,直到获取令牌,主要做了如下3件事

public double acquire(int permits) {
  // 1、获取当前请求需要等待的时间(惰性计算)
  long microsToWait = this.reserve(permits);

  // 2、seleep.microToWait (时间窗口)
  this.stopwatch.sleepMicrosUninterruptibly(microsToWait);

  // 3、返回 microToWait 对应的秒级时间
  return 1.0D * (double)microsToWait / (double)TimeUnit.SECONDS.toMicros(1L);
}


final long reserve(int permits) {
  //检查参数是否ok
  checkPermits(permits);
  synchronized(this.mutex()) {
    //计算需要等待的诗句
    return this.reserveAndGetWaitLength(permits, this.stopwatch.readMicros());
  }
}

2.1.4.3、核心接口 reserveAndGetWaitLength

该方法需要返回等待的时间,是 RateLimiter 的核心接口

RateLimiter 支持突发流量的本质是,将当前需要的令牌数量 requiredPermits 拆分成 storedPermitsToSpend(持有令牌中可用的数量)和 freshPermits(需要预支的令牌数量);分别计算需要等待的时间,然后更新 nextFreeTicketMicros 下次获取令牌的时间

什么意思呢?举个例子吧:

当前 RateLimiter 持有 4 个令牌,当前请求需要 6 个令牌;则 6 个令牌中 4 个是可以从持有的令牌中直接获取,而另外两个需要预支的令牌则需要单独计算时间;

伪代码:getReqWaitTime(6) = getWaitTime(4) + getFreshWait(6 - 4)

SmoothBursty 模式:

getWaitTime(4) 是可以直接获取的,即 time = 0

getFreshWait(6 - 4) 则等于 freshPermits * stableIntervalMicros (预支令牌数 * 生成一个令牌需要的时间)

@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  //1.根据当前时间和预计下一秒时间判断有无新令牌产生,有则更新持有令牌数storedPermits 和 下次请求时间nextFreeTicketMicros
  resync(nowMicros); 
  long returnValue = nextFreeTicketMicros;

  //2.以下两句,根据请求需要的令牌数requiredPermits和storedPermits当前持有的令牌数storedPermits分别计算 持有令牌中可用的数量storedPermitsToSpend和需要预支的令牌数量freshPermits
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits); 
  double freshPermits = requiredPermits - storedPermitsToSpend;

  //3.分别计算storedPermitsToSpend和freshPermits的等待时间
  long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
    + (long) (freshPermits * stableIntervalMicros);

  try {
    //4.更新nextFreeTicketMicros
    this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros); 
  } catch (ArithmeticException e) {
    this.nextFreeTicketMicros = Long.MAX_VALUE;
  }

  //5.更新storedPermits 
  this.storedPermits -= storedPermitsToSpend; 

  return returnValue;
}

image-20220526164937252

2.1.5、SmoothWarmingUp 具体实现

WarmingUpRateLimiter 的另一种实例不同于 SmoothBursty ,它存在一个 “热身” 的概念。

即:如果当前系统处于 “ 冷却期”( 即一段时间没有获取令牌,即:当前持有的令牌数量大于某个阈值),则下一次获取令牌需要等待的时间比 SmoothBursty 模式下的线性时间要大,并且逐步下降到一个稳定的数值。

大致原理:将 storedPermits 分成两个区间值:[0, thresholdPermits) 和 [thresholdPermits, maxPermits]。当请求进来时,如果当前系统处于 “cold” 的冷却期状态,从 [thresholdPermits, maxPermits] 区间去拿令牌,所需要等待的时间会长于从区间 [0, thresholdPermits) 拿相同令牌所需要等待的时间。当请求增多,storedPermits 减少到 thresholdPermits 以下时,此时拿令牌所需要等待的时间趋于稳定。这也就是所谓 “热身” 的过程。

反应到代码上,和 SmoothBursty 的不同有两点

create 方法不同;该方法指定了 “热身” 模型需要的关键参数

acquire 底层的 storedPermitsToWaitTime;由于 1 的缘故,获取当前令牌中可用令牌 storedPermitsToSpend 的等待时间,需要依据热身模型来计算

@Test
public void test_2() throws InterruptedException {
  //预热模式,设置预热时间和QPS,即在正式acquire前,限流器已经持有 5 * 4 = 20个令牌
  RateLimiter rateLimiter = RateLimiter.create(5, 4000, TimeUnit.MILLISECONDS);
  for (int i = 1; i < 50; i++) {
    System.out.println(System.currentTimeMillis() + " acq " + i + ": wait " + rateLimiter.acquire() + "s");
    if (i == 15) {
      Thread.sleep(2000);
      System.out.println(System.currentTimeMillis() + " acq " + 15 + ": wait " + rateLimiter.acquire() + "s");
    }
  }
}

1552395652026 acq 1: wait 0.0s
1552395652028 acq 2: wait 0.578357s
1552395652612 acq 3: wait 0.533835s
1552395653151 acq 4: wait 0.495191s
1552395653649 acq 5: wait 0.457239s
1552395654110 acq 6: wait 0.41631s
1552395654528 acq 7: wait 0.377524s
1552395654912 acq 8: wait 0.334018s
1552395655248 acq 9: wait 0.298249s
1552395655550 acq 10: wait 0.256165s
1552395655808 acq 11: wait 0.217752s
1552395656028 acq 12: wait 0.197672s
1552395656231 acq 13: wait 0.19451s
1552395656429 acq 14: wait 0.196465s
1552395656630 acq 15: wait 0.195714s
1552395658834 acq 15: wait 0.0s
1552395658834 acq 16: wait 0.34158s
1552395659180 acq 17: wait 0.296628s
1552395659482 acq 18: wait 0.256914s
1552395659744 acq 19: wait 0.216517s
1552395659965 acq 20: wait 0.195077s
1552395660164 acq 21: wait 0.195953s
1552395660365 acq 22: wait 0.195196s
1552395660564 acq 23: wait 0.196015s
1552395660764 acq 24: wait 0.195972s

acq 1 时并没有任何等待直接预消费了 1 个令牌

acq 2~11 时,由于当前系统处于冷却期,因此开始等待的时间较长,并且逐步下降到一个稳定值 acq 12~15 时,等待时间趋于稳定的 0.2 秒,即 1/QPS

acq 15 同时,sleep2 秒,即在当前基础上,又新增 5*2 个令牌;将系统过渡到冷却期

acq 15~结束,重复 acq 2~15 的过程。

2.1.5.1、SmoothWarmingUp & 预热模型

SmoothWarmingUpSmoothRateLimiter 的子类,它相对于 SmoothRateLimiter 多了几个属性:

static final class SmoothWarmingUp extends SmoothRateLimiter {
  //预热时间
  private final long warmupPeriodMicros;

  //预热区斜率
  private double slope;

  //区分冷区期和稳定器的阈值
  private double thresholdPermits;

  //code inteval/ stable inteval 的固定数值 硬编码写死的是 3
  private double coldFactor;

SmoothRateLimiter 类的注释文档中有对预热模型的详细解释

image-20220418173609666

横坐标:是当前令牌桶中的令牌 storedPermits,前面说过 SmoothWarmingUpstoredPermits 分为两个区间:[0, thresholdPermits) 和 [thresholdPermits, maxPermits]。

纵坐标:请求的间隔时间,stableInterval 就是 1 / QPS,例如设置的 QPS 为 5,则 stableInterval 就是 200ms,coldInterval = stableInterval * coldFactor,这里的 coldFactor 硬编码写死的是 3。

当系统请求增多,图像会像左移动,直到 storedPermits 为 0。等待一段时间后,随着令牌的生成当系统进入 cold 阶段时,图像会向右移,直到 storedPermits 等于 maxPermits

2.1.5.2、create

create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)

根据指定的 ` QPS ` 和预热期来创建 RateLimiter,在这段预热时间内,RateLimiter 每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。

@VisibleForTesting
static RateLimiter create(
  SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit,
  double coldFactor) {
  //1.创建SmoothWarmingUp限流器
  RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);

  //2.设置限流速率
  rateLimiter.setRate(permitsPerSecond);
  return rateLimiter;
}


public final void setRate(double permitsPerSecond) {
  Preconditions.checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), "rate must be positive");
  synchronized(this.mutex()) {
    this.doSetRate(permitsPerSecond, this.stopwatch.readMicros());
  }
}

public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
  checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
  return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit,
                3.0);
}

SmoothWarmingUp(
  SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
  super(stopwatch);

  //1.设置预热时间
  this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); 

  //3.设置coldFactor为3
  this.coldFactor = coldFactor;
}

final void doSetRate(double permitsPerSecond, long nowMicros) {
  this.resync(nowMicros);
  // 如果QPS为5 则 stableIntervalMicros  = 1s/5 = 200ms
  double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
  this.stableIntervalMicros = stableIntervalMicros;
  this.doSetRate(permitsPerSecond, stableIntervalMicros);
}


@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = maxPermits;

  //1.设置冷却期等待时间数值 coldIntervalMicros
  double coldIntervalMicros = stableIntervalMicros * coldFactor; 

  //2.设置冷却期的阈值,thresholdPermits 等于预热期产生令牌数的 一半 (4000000/200000 = 20 , 20 * 0.5 = 10)
  thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;

  //3.设置持有令牌的最大值,为thresholdPermits的2倍 为20
  maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);

  //4.设置预热区的斜率;纵坐标之差/横坐标之差 
  slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);

  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    storedPermits = 0.0;
  } else {
    storedPermits = (oldMaxPermits == 0.0)
      ? maxPermits 
      : storedPermits * maxPermits / oldMaxPermits;
  }
}

2.1.5.3、storedPermitsToWaitTime

前面说到,SmoothWarmingUpSmoothBursty 的一个重要区别就在于 “获取当前令牌中可用令牌的等待时间”storedPermitsToWaitTime 方法, 而 “获取预支令牌的等待时间” 和之前一致。**

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {

  //1.获取当前持有令牌数和阈值的差值availablePermitsAboveThreshold
  double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
  long micros = 0;

  //2.如果availablePermitsAboveThreshold>0,即当前持有令牌数>阈值,即到达冷区期;计算等待时间
  if (availablePermitsAboveThreshold > 0.0) {

    //3.计算WARM UP PERIOD部分计算的方法,这部分是一个梯形,梯形的面积计算公式是 “(上底 + 下底) * 高 / 2”
    double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
    micros = (long) (permitsAboveThresholdToTake * (permitsToTime(availablePermitsAboveThreshold)
                                                    + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)) / 2.0);

    //4.剩余的令牌从 stable部分拿
    permitsToTake -= permitsAboveThresholdToTake;
  }
  //5.stable 部分令牌获取花费的时间
  micros += (stableIntervalMicros * permitsToTake);
  return micros;
}

举个例子:创建限流器时 create(5, 4000, TimeUnit.MILLISECONDS);预热了 20 个令牌

场景1:当前持有 20 个令牌,请求一个令牌;需要等待的时间为

image-20220418182916589

场景 2:当前持有 18 个令牌,请求 1 个令牌;需要等待的时间为:

image-20220418183026895

场景3:当前持有 20 个令牌,一次性请求 11 个令牌;需要等待的时间为:

image-20220418183113660

场景4:当前持有 10 个令牌,一次性请求 1 个令牌;需要等待的时间为:

image-20220418183133770

2.1.5.4、小结

1、当 storedPermits - thresholdPermits = availablePermitsAboveThreshold > 0 (冷却期)且 permitsToTake < availablePermitsAboveThreshold 时,等待时间是 WARM UP PERIOD 中的一个梯形面积;permitsToTake 是持有令牌中可用的数量

2、当 storedPermits - thresholdPermits = availablePermitsAboveThreshold > 0 (冷却期)且 permitsToTake > availablePermitsAboveThreshold 时,等待时间是 1+ (permitsToTake - availablePermitsAboveThreshold)* stable;即梯形 + 矩形的面积

3、当 storedPermits - thresholdPermits = availablePermitsAboveThreshold < 0 时(稳定期),等待时间是 permitsToTake * stable;即矩形的面积

4、当 storedPermits 等于 0 后,系统创建新的令牌后,获取等待时间的顺序为 4->2->1;即前文说的当系统请求增多,图像会像左移动,直到 storedPermits0。等待一段时间后,随着令牌的生成当系统进入 cold 阶段时,图像会向右移,直到 storedPermits 等于 maxPermits。是一个动态调整的过程。

2.3、RateLimiter 流程总结

总结一下 SmoothWarmingUpSmoothBursty 的创建和使用令牌的过程:

1、SmoothWarmingUpSmoothBursty 的最大区别就在于,“获取已持有令牌中可用令牌的等待时间” 不同,SmoothBursty 是直接返回的,SmoothWarmingUp 则是基于 “热身模型” 和 “冷却期”(即一段时间没有获取令牌,衡量指标:当前持有的令牌数量大于某个阈值)的机制进行动态调整(冷却期按照梯形区域返回,否则按照矩形区域返回)

2、预支令牌的等待时间算法一致,waitTime = 预支令牌数量 * 生成一个令牌需要的时间(1/QPSSmoothWarmingUp 为系统提供一种冷启动的可能,例如:某系统底层使用缓存中间件,假如没有 “热身”,突发流量很可能造成缓存击穿等问题;WarmingUp 让系统应对突发流量有一个 “渐进准备资源” 的过程,Rhino 使用的令牌桶的平滑限流,即 WarmingUp 模式

image-20220526164957928

2.3.1、RateLimter阻塞方式和非阻塞方式获取令牌

⬤ 非阻塞式:尝试获取许可,如果获取到许可,则执行代码块1,如果没有获取到就认为被限流,则执行代码块2;

⬤ 阻塞式:尝试获取许可,获取到则执行代码块1,没有获取到,则阻塞,等待获取到许可后,执行代码块1,注意不会执行代码块2;

if (获得许可) {
    执行代码块 1
} else {
    执行代码块 2
}

2.3.1.1、非阻塞方式


@Test
public void rateLimiterTest() {
  RateLimiter rateLimiter = RateLimiter.create(5);
  for (int i = 0; i < 10; i++) {
    //非阻塞方式获取令牌,立即返回成功或者失败
    if (rateLimiter.tryAcquire()) {
      System.out.println(LocalTime.now()+"拿到令牌通过");
    }
    else {
      System.out.println(LocalTime.now()+"未拿到令牌不通过");
    }
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  
}




// 尝试获取1个许可,如果获取到,则返回true,否则返回false
boolean tryAcquire();

// 尝试获取多个许可,如果获取到,则返回true,否则返回false
boolean tryAcquire(int permits)
// 示例tryAcquire(3)

// 在timeout时间内,尝试获取1个许可,如果获取到,则返回true,否则返回false
tryAcquire(Duration timeout);
// 示例:tryAcquire(Duration.ofSeconds(3))
tryAcquire(long timeout, TimeUnit unit);
// 示例:tryAcquire(3, TimeUnit.SECONDS);

// 在timeout时间内,尝试获取多个许可,如果获取到,则返回true,否则返回false
tryAcquire(int permits, Duration timeout)
tryAcquire(int permits, long timeout, TimeUnit unit)

2.3.1.2、阻塞方式

调用的是 ` acquire`

@Test
public void testAcquire() {
  RateLimiter rateLimiter = RateLimiter.create(2);

  for (int i = 0; i < 10; i++) {
    double sleep = rateLimiter.acquire();
    System.out.println("now: " + LocalTime.now() + "  sleep: " + sleep);
  }
}

2.3.2、问题

2.3.1.1、单机限流

Guava 解决的问题,一般都是单机的,以限流为例,使用 guava 限流,只做到了单机限流,但是我们的服务一般都会由多台机器(集群),虽然我们可以通过计算单机和集群的比例,来设置限流数量,但是有几个问题:

1、机器增减时,要保证总流量保持不变,就需要修改每一台机器的流量限制,这个不是很方便;

2、Guava 的限流器,并不是公平的,至于什么是公平和非公平,可以参考

2.3.1.2、集群限流

guavaRateLimiter,限流是通过获取“许可”来控制流量的,只不过是单机管理自己的许可。

问题1:如果将所有机器的“许可”汇集到一个地方,所有机器都从这个地方获取许可,不就可以实现集群限流吗?

答案:可以使用redis来保存所有机器的“许可”。这样做,可以实现集群限流,但不能保证单机的流量限制,其实对于现在的微服务来说,请求被平均分给所有机器,是服务平台的问题,所以可以不用关心这个问题。

3、计数器算法

计数器算法是在一定的时间间隔里,记录请求次数,当请求次数超过该时间限制时,就把计数器清零,然后重新计算。当请求次数超过间隔内的最大次数时,拒绝访问。

缺点:这种方法实现简单,但有一个明显的缺点,那就是在两个间隔之间,如果有密集的请求。则会导致单位时间内的实际请求超过阈值。

例如:一个接口每分钟允许访问100次。实现方式如下:

1、设置一个计数器 count ,接收一个请求就将计数器加一,同时记录当前时间。

2、判断当前时间和上次统计时间是否为同一分钟。

如果是,则判断 count 是否超过阈值,如果超过阈值,则返回限流拒绝。

如果不是,则吧 ` count` 重置为1,判断是否超过阈值。

如下图所示,该计数器算法要求每分钟请求的阈值不超过100个。

image-20220526171037025

3.1、代码实现

3.1.1、自定义注解

@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface EntryTimes {

    /** 方法允许进入的次数 */
    int value() default 1;
   
     /**  可以访问的前缀 url前缀  */
    String prefix() default "";
}

3.1.2、Aspect:切面拦截

/**
 * 控制每个用户访问Controller方法的次数
 * Created by fengchuanbo on 2017/5/25.
 */
@Aspect
@Component
public class MethodEntryTimesLimitInterceptor {

  private static final String METHOD_CAN_ENTRY_TIMES_KEY = "method:entry:times:";

  @Resource
  private StringRedisTemplate stringRedisTemplate;

  /**
     * 需要有 EntryTimes 标注,并且第一个参数需要是 AuthUser 才可以
     */
  @Around("@annotation(com.duodian.admore.zhaobutong.annotation.EntryTimes)")
  public Object aroundAdvice(ProceedingJoinPoint pjp) throws Throwable {
    HttpServletRequest request = ((ServletRequestAttributes) 
                                  RequestContextHolder.getRequestAttributes()).getRequest();
    String token = request.getParameter("token");
    String aes = AesUtils.LoginDecrypt(token);
    Long userId = Long.valueOf(aes.split("#")[0]);
    MethodSignature signature = (MethodSignature) pjp.getSignature();
    Method method = signature.getMethod();
    EntryTimes annotation = method.getAnnotation(EntryTimes.class);
    int times = annotation.value();
    String key = METHOD_CAN_ENTRY_TIMES_KEY + ":" + annotation.prefix() + ":" +  userId;
    // 没有整个方法使用一个redis链接,是为了方式方法执行时间过长一直占用redis链接。
    Long increment = getEntryTimes(key);
    Object retVal;
    try {
      // 放在try里面,才能执行finally
      if (increment > times){
        // 设置十秒钟超时,防止finally在系统崩溃或重启时没有执行造成用户不能操作。
        expireKey(key,10);
        return Response.of(Code.ACTION_FREQUENT);
      }
      //调用核心逻辑
      retVal = pjp.proceed();
    }finally {
      deleteKey(key);
    }
    return retVal;
  }

  private Long getEntryTimes(String key){
    return stringRedisTemplate.opsForValue().increment(key,1);
  }

  private void deleteKey(String key){
    stringRedisTemplate.delete(key);
  }

  private void expireKey(String key, int seconds){
    stringRedisTemplate.boundValueOps(key).expire(seconds, TimeUnit.SECONDS);
  }
}

4、滑动时间窗口算法

4.1:zset实现

限流中的滑动窗口可以简单理解为,设定的单位时间就是一个窗口,窗口可以分割多个更小的时间单元,随着时间的推移,窗口会向右移动。比如一个接口一分钟限制调用 1000次,1分钟就可以理解为一个窗口

设计思路 : 每一个行为到来时,都维护一次时间窗口。将时间窗口外的记录全部清理掉,只保留窗口内的记录。

方案:想想 zset 数据结构的 score 值,是不是可以 通过 score 来圈出这个时间窗口来。而且我们只需要保留这个时间窗口,窗口之外的数据都 可以砍掉。

缺点:如果这 个量很大,比如限定 60s 内操作不得超过 100w 次这样的参数,它是不适合做这样的限流 的,因为会消耗大量的存储空间。

1、key:用一个 zset 结构记录用户的行为历史,每一个行为都会作为 zset 中的一个 key 保存下来,同一个用户同一种行为用一个 zset 记录

2、score:当前时间戳

3、valuezset 集合中只有 score 值非常重要,value 值没有特别的意义,只需要保证它是唯一的就可以了,用 uuid 会 比较浪费空间,那就改用毫秒时间戳吧。

4、milliseconds:过期时间,为节省内存,我们只需要保留时间窗口内的行为记录,同时如果用户是冷用户,滑动时间窗口内的行为是空记录,那么这个 zset 就可以从内存中移除,不再占用空间。

image-20210507141302926

4.2、代码实现

4.2.1、基本代码

@Slf4j
public class SimpleRateLimiter {

    private Jedis jedis;

    public SimpleRateLimiter(Jedis jedis) {
        this.jedis = jedis;
    }

    /**
     *
     * @param userId 用户Id
     * @param actionKey 动作
     * @param period 过期时间 单位(秒)
     * @param maxCount 最大次数
     * @return 返回是否允许访问
     * 1、获取key = userId + "#" + actionKey;
     * 2、
     */
    public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
        // 1、获取key = userId + "#" + actionKey;
        String key = userId + "#" + actionKey;
        long now = System.currentTimeMillis();
        Pipeline pipe = jedis.pipelined();
        //开启事务
        pipe.multi();
        //第二个参数是 score,第三个参数是value
        pipe.zadd(key, now, "" + now);
        ////删除 now -period* 1000 毫秒之前的数据(也就是说保留periods秒之内的数据)
        // 看清楚了这是直接删除了
        pipe.zremrangeByScore(key, 0, now - period * 1000);
        //当前窗口的元素个数
        Response<Long> count = pipe.zcard(key);
        //一定要设置过期时间,可能大部分用户是冷用户,因为要维护period时间内的记录,所以key过期时间要稍微比period大
        pipe.expire(key, period + 1);
        //执行事务
        pipe.exec();
        try {
            pipe.close();
        } catch (IOException e) {
            log.info("SimpleRateLimiter#isActionAllowed pipe.close() error", e);
        }
      
         //通过统计滑动窗口内的行为数量与阈值 `max_count `进行比较就可以得出当前的行为是 允许。用代码表示如下:
        return count.get() <= maxCount;
    }

}

4.2.2、实例测试

@Test
public void test() {
  Jedis jedis = new Jedis("127.0.0.1");
  SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
  for (int i = 0; i < 20; i++) {
    log.info("限流状态:[{}]", limiter.isActionAllowed("QYD", "reply", 60, 5));
  }

  try {
    Thread.sleep(10000);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

5、漏斗限流算法

1、漏洞的容量是有限的,如果将漏嘴堵住,然后一直往里面灌水,它就会变满,直至再也装不进去。

2、如果将漏嘴放开,水就会往下流,流走一部分之后,就又可以继续往里面灌水。

3、如果漏嘴流水的速率大于灌水的速率,那么漏斗永远都装不满。

4、如果漏嘴流水速率小于灌水的速率,那么一旦漏斗满了,灌水就需要暂停并等待漏斗腾空。

一般来说,流出速度是固定的,即不管你请求有多少,速率有多快,我反正就这么个速度处理。当然,特殊情况下,需要加快速度处理,也可以动态调整流出速率

重点:两个速率:

流入速率:即实际的用户请求速率或压力测试的速率,

流出速率:即服务端处理速率。

image-20210507151345443

5.1、单机代码实现

5.1.1、漏斗 FunnelDTO

@Data
public class FunnelDTO {
  /** 容量 */
  Integer capacity;
  /** 剩余容量 */
  Integer leftQuota;
  /** 流出速率 */
  Float leakingRate;
  /** 计算起始时间 */
  Long startTime;


  public FunnelDTO(int capacity, float leakingRate) {
    this.capacity = capacity;
    this.leftQuota = capacity;
    this.leakingRate = leakingRate;
    this.startTime = System.currentTimeMillis();
  }


  /**
     * 1、计算时间间隔
     * 2、计算时间间隔内流出的数量
     *  2.1、如果获取到的小于0, 说明间隔时间太长(没有往漏斗里放数据,漏斗空了),整数数字过大溢出了,则初始化数据
     * 2.2、如果获取到的流出量小于1,则说明腾出空间太小(最小单位是1),则直接返回,不执行
     * 3、
     */
  public void makeSpace() {
    Long nowTime = System.currentTimeMillis();
    // 1、计算时间间隔
    Long deltaTs = nowTime - startTime;

    // 2、计算时间间隔内流出的数量
    Integer deltaQuota = (int) (deltaTs * leakingRate);
    // 2.1、如果获取到的小于0, 说明间隔时间太长(没有往漏斗里放数据,漏斗空了),整数数字过大溢出了,则初始化数据
    if (deltaQuota < 0) {
      this.leftQuota = capacity;
      this.startTime = nowTime;
      return;
    }
    // 2.2、如果获取到的流出量小于1,则说明腾出空间太小(最小单位是1),则直接返回,不执行
    if (deltaQuota < 1) {
      return;
    }

    // 3、重新计算
    // 当前时间 = nowTime
    // 剩余容量 = 当前剩余容量 + 流出速率 * 间隔时间(如果超出了总容量,剩余容量 = 总容量)
    this.leftQuota += deltaQuota;
    this.startTime = nowTime;
    if (this.leftQuota > this.capacity) {
      this.leftQuota = this.capacity;
    }
  }

  /**
     * 判断是否能加入交易
     * 1、漏斗重新计算剩余容量和当前时间
     * 2、
     * @param quota 定额
     * @return
     */
  public boolean watering(int quota) {
    // 1、漏斗重新计算剩余容量和当前时间
    makeSpace();

    // 2、如果剩余容量大于当前要流入漏斗的量,则执行成功,返回true
    if (this.leftQuota >= quota) {
      this.leftQuota -= quota;
      return true;
    }
    return false;
  }

}

5.1.2、FunnelRateLimiter

public class FunnelRateLimiter {


  private Map<String, FunnelDTO> funnels ;
  public FunnelRateLimiter(Map funnels) {
    this.funnels = funnels;
  }


  /**
     * 限流方法
     * @param userId
     * @param actionKey
     * @param capacity
     * @param leakingRate
     * @return
     * 1、根据用户Id和动作获取对应的漏斗
     * 2、往漏斗中放入定额数据,看是否能放下
     */
  public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
    // 1、根据用户Id和动作获取对应的漏斗
    String key = String.format("%s:%s", userId, actionKey);
    FunnelDTO funnel = funnels.get(key);
    if (funnel == null) {
      funnel = new FunnelDTO(capacity, leakingRate);
      funnels.put(key, funnel);
    }

    //2、往漏斗中放入定额数据
    return funnel.watering(1);
  }


}

5.2、分布式的漏斗算法

问题1:上面是单机的,那么分布式用Redis如何实现呢?

答案:观察上面的 Funnel 对象的几个字段,我们发现可以将 Funnel 对象的内容按字段存储到一 个 hash 结构中,灌水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。

问题2:用hash的话,可以保证是原子性操作吗?

答案:我们无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里 运算,再回填到 hash 结构,这三个过程无法原子化

问题3:那如何保证hash是原子性操作呢?

答案:进行适当的加锁控制 + LUA脚本。

问题4:加锁失败了怎么办?

答案:一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。如果重试的话,就会导致性能下降。如果放弃的话,就会影响用户体验。同时,代码 复杂度也跟着升高很多。

问题5:有更好的选择吗?

答案:Redis-Cell 救星来了!

5.2.1、Redis-Cell

Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并 提供了原子的限流指令。有了这个模块,限流问题就非常简单了。

该模块只有 1 条指令 cl.throttle,它的参数和返回值都略显复杂,接下来让我们来看看这 个指令具体该如何使用。

5.2.1.1:举例

1、允许「用户老钱回复行为」的频率为每 60s 最多 30 次(漏水速 率)

2、漏斗的初始容量为 15,也就是说一开始可以连续回复 15 个帖子,然后才开始受漏水 速率的影响。

cl.throttle [key] [capacity] [operation_times] [time_span] [quota]
cl.throttle laoqian:replay 15 30 60 1

15 漏斗容量 [capacity] 
30 operation_times / 60 time_span 每60s最多30次(漏水速率)
1  quota 可选参数,默认是1 指的是每次行为占用一个空间

5.2.1.2、总结

在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的重试时间都计算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想阻塞线程,也可以异步定时任务来重试

cl.throttle laoqian:reply 15 30 60 1
1) (integer) 0  # 0表示允许,1表示拒绝
2) (integer) 15 # 漏斗容量capacity
3) (integer) 14 # 漏斗剩余空间left_quota
4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2  # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)

6、流量控制应用场景

6.1、应用场景

6.1.1、服务端接口限制被调用的速度

解决方案:可以使用 Guava 限流器突发模式,配合非阻塞方法 tryAcquire 限制被调用速度,无特殊情况下tryRequire超时时间建议设置为0(能获取到就通过,否则就拒绝调)

补充说明:突发模式的突发时间 (maxBrushSeconds) 配置可以根据历史流量的波动情况确定,多数场景中设置1-5s即可,突发事件不建议设置为 0,这样可能会 导致一些不符合逾期的欲绝行为,另外,不建议在服务端中接口中使用限流的 acquirttryAcquire 方法做阻塞等待,这样会增加请求延迟给调用方带来不好的体验。

6.1.2、客户端限制发出请求的速度

解决方案:可以使用 Guava 限流器突发模式,配合非阻塞方法 tryAcquire限制发出请求的速度

补充说明:无特殊情况下,突发模式的突发时间 (maxBrushSeconds) 可以设置为0(我是请求方,还突发啥呢),配合 acquirt 的流量整形效果来让客户端发出匀速请求,另外客户端如果存在失败重试机制,那么可能需要考虑一下重试间隔会由于限流器的阻塞而被拉长的情况

6.1.3、秒杀或者大促场景保证不被打垮

解决方案:可以使用 Guava 限流器预热模式,配合非阻塞方法 tryAcquire 限制被调用速度,建议设置一个合理的超时时间

补充说明:预热时间的长度可以根据业务场景特点配置,限流阈值在压测的时候确定,这里建议使用tryAcquire 进行可超时阻塞的目的是为了保证 “ 先到先得 “的公平性

6.2、问题

6.2.1、使用限流器后实际流量超过阈值

解释:这种情况只能是出现在突发模式情况下 ,突发模式限流器会请求存量令牌和新令牌,新令牌的生成速度等于限流速度,而超额部分的情况来自于存量令牌,在实际流量超过阈值不多的情况下,令牌桶中的数量很难被耗尽

建议:一方面我们要充分理解限流原理,帮助我建立正确的限流效果预期,另一方面我们应该尽量使用合理的配置来实现限流需求,限流阈值一般情况大于接口正常情况下的流量,突发时间流量应参考历史最大流量和平均流量的差异来决定

6.2.2、在客户端设置匀速调用场景,服务端使用了限流器后发现实际流量无法达到阈值上限

解释:这种情况是非常偶然的,实际是由于限流配置不当导致的,假设客户端以 50QPS 的速度发出情况,即每20ms一个,服务端的限流器配置为 30QPS,且不支持突发流量(突发模式设置突发时间为0,或者使用了预热模式) 这种情况下服务端会严格按照 30ms一个的速度接收请求,因此,客户端在 20ms之后发出第二个请求时,服务端尚未满足 30ms的时间间隔,就出现了每2个请求就有一个被拒绝的情况

建议:在遇到这种问题的时候,首先考虑限流阈值是否合理,其次,如果在服务端使用突发模式限流,尽量不要把突发时间设置为0,如果使用预热模式限流,应该参考服务容量,配置一足够大的限流阈值

7、限流工具类

7.1、RateLimiterAspect

package com.healerjean.proj.aspect;

import com.google.common.util.concurrent.RateLimiter;
import com.healerjean.proj.util.limit.RateLimiterPropertyUtil;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * RateLimiterAspect
 *
 * @author zhangyujin
 * @date 2022/9/8  21:44.
 */
@Component
@Aspect
public class RateLimiterAspect {
    /**
     * log
     */
    private static final Logger log = LoggerFactory.getLogger(RateLimiterAspect.class);

    /**
     *
     */
    private RateLimiterAspect() {
    }


    /**
     * @param pjp
     * @return
     * @throws Throwable
     */
    @Around("execution(* com.healerjean.proj.controller.*Controller.*(..))")
    public Object logAround(ProceedingJoinPoint pjp) throws Throwable {
        Object[] args = pjp.getArgs();
        Class<?> clazz = pjp.getTarget().getClass();
        log.info("开始拦截:" + clazz.getName());

        try {
            this.limit(clazz.getName());
        } catch (Exception var5) {
            log.error(var5.getMessage(), var5);
        }

        return pjp.proceed(args);
    }

    /**
     *
     * @param className
     */
    public void limit(String className) {
        RateLimiter rateLimiter = RateLimiterPropertyUtil.getMap().get(className);
        if (null != rateLimiter) {
            log.info("开始获取令牌:" + className + " rate:" + rateLimiter.getRate());
            double waitTime = rateLimiter.acquire();
            log.info(className + ":获取成功 waitTime :" + waitTime + "ms");
        }
    }
}

7.2、RateLimiterPropertyUtil

package com.healerjean.proj.util.limit;

import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * RateLimiterPropertyUtil
 *
 * @author zhangyujin
 * @date 2022/9/8  21:37.
 */
@Service
@Slf4j
public class RateLimiterPropertyUtil {

    /**
     * MAP_RATE
     */
    private static ConcurrentHashMap<String, RateLimiter> MAP_RATE = new ConcurrentHashMap();
    /**
     * props
     */
    private static Properties PROPS;

    private RateLimiterPropertyUtil() {
    }

    @PostConstruct
    public void init(){
        load();
    }

    private static synchronized void load() {
        log.info("开始加载限流文件内容.......");
        PROPS = new Properties();
        InputStream in = null;
        try {
            in = RateLimiterPropertyUtil.class.getResourceAsStream("/rateLimiter.properties");
            PROPS.load(in);
            Set<Map.Entry<Object, Object>> set = PROPS.entrySet();

            for (Map.Entry<Object, Object> objectObjectEntry : set) {
                Map.Entry<Object, Object> entry = objectObjectEntry;
                String key = String.valueOf(entry.getKey()).trim();
                String value = String.valueOf(entry.getValue());
                log.info("限流策略:key:{}, value:{}", key, value);
                if (StringUtils.hasText(key) && StringUtils.hasText(value)) {
                    RateLimiter rateLimiter = RateLimiter.create(Double.valueOf(value));
                    MAP_RATE.put(key, rateLimiter);
                }
            }
        } catch (FileNotFoundException var17) {
            log.error("jdbc.properties文件未找到");
        } catch (IOException var18) {
            log.error("出现IOException");
        } finally {
            try {
                if (null != in) {
                    in.close();
                }
            } catch (IOException var16) {
                log.error("jdbc.properties文件流关闭出现异常");
            }
        }

        log.info("properties文件内容:" + PROPS);
    }

    public static ConcurrentHashMap<String, RateLimiter> getMap() {
        return MAP_RATE;
    }


}

7.3、rateLimiter.properties

建议使用配置中心

com.healerjean.proj.controller.RateLimitController=1

ContactAuthor