前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

一、为什么要限流

任务系统都有一个上限,如果超过了这个上限,会对系统造成毁灭性的打击,因此在任何时刻都应该保证系统的并发请求数量不要超过某个阈值,限流就是这一目的

策略 原理描述 优缺点 应用 使用场景
令牌桶 ⬤ 令牌按照恒定速率入桶,桶满则丢弃多余令牌
⬤ 请求来临时从桶中获取令牌,桶空则等待令牌或者拒绝当前请求
优点:速率可变,能够应对突发流量
缺点:复杂度高
请求速率限流算法  
漏桶 ⬤ 以任意速率生成令牌放入桶中
⬤ 桶流出(消费处理流量)令牌的速率是固定的
⬤ 若流入的令牌(突发请求)超过了桶的容量,则被丢弃或者等待,桶的容量不变
优点:速率稳定
缺点:应对突发流量需要等待令牌
请求速率限流算法  
计数器 ⬤ 对一个时间窗口设置一个可以访问的量
⬤ 下一个时间窗口清零计数
⬤ 单位时间超出阈值的请求,可以设置不同的策略,等待或者丢弃
优点:实现简单,单个计数周期不会超过配置阈值
缺点:在两端临界点可能出现两倍的流速,流量不均匀,容易瞬间造成流量突刺现象
请求速度限流算法  
滑动窗口 ⬤ 将时间窗⼝划分成很多⼩的时间窗⼝,让流量分散到⼩的⼩的时间窗⼝,这样流量限流更平滑。⼩的时间窗⼝越多限流越平滑
优点:相对计数算法(固定时间窗⼝)限流更平滑
缺点:⽐固定时间窗⼝占⽤更多内存(多⼏个bucket对象)
请求速度限流算法  

二、令牌桶限流

image-20240914130840659

1、 Guava-RateLimiter

1)简单样例

acq 1 时并没有任何等待直接预消费了1个令牌 (首次来不用等);

acq 6 时,由于之前预消费了 1 个令牌,故而等待了 2 秒(上一个预消费令牌数1),之后又预消费了 6 个令牌;

acq 2 时同理,由于之前预消费了 6 个令牌,故而等待了 12 秒(上一个预消费令牌数6);

RateLimiter 通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)

@Test
public void test_1(){
    // 1、创建一个RateLimiter,指定每秒放0.5个令牌(2秒放1个令牌)
    RateLimiter rateLimiter = RateLimiter.create(0.5);

    int[] a = {1, 6, 2};
    for (int i = 0 ; i < a.length; i++){
        log.info("时间戳:{}, 获取 {} 个令牌需要 {}s", 
                 System.currentTimeMillis(), 
                 a[i], 
                 rateLimiter.acquire(a[i]));
    }

    // 1516166482561 acq 1: wait 0.0s
    // 1516166482563 acq 6: wait 1.997664s
    // 1516166484569 acq 2: wait 11.991958s
}

问题1:首次来为什么不用等?

答案:这实际上不完全准确。在 Guava 的 RateLimiter 实现中,首次请求许可时,通常也会立即得到许可,但这并不意味着没有速率限制。首次请求的“立即得到”是因为内部实现会预分配一些许可,或者更准确地说,是因为内部状态(如桶中的水量)允许立即发放一个许可。但随后的请求将会受到你设定的速率的限制。 这里的“立即”实际上是相对于内部状态而言的,并非完全无条件的“首次请求总是立即成功”。内部机制会确保长期的平均速率符合你设定的值。

虽然 RateLimiter 在首次请求时可能会显得像是“不用等”,但这主要是因为内部机制允许快速启动,而非没有速率限制。随着请求的继续,速率限制的效果将逐渐显现

2)简介

a、两种模式

Guava 有两种限流模式

⬤ 一种为稳定   突发 模式 SmoothBursty:令牌生成速度恒定,支持缓存一定时间的突发流量

⬤ 一种为渐进模式 SmoothWarmingUp :令牌生成速度缓慢提升直到维持在一个稳定值,

原理:根据配置计算出限流器的 QPS,每次请求的时候检查是否有缓存有⾜够的令牌,如果有,则直接返回通过;如果没有,则计算出需要等待的时间,如果等待时间⼩于超时时间,则等待,否则拒绝

b、核心思想

RateLimiter 核心思想有

1、响应本次请求后,动态计算下次可用服务的时间, 如果下次请求在可用时间之前,则需要等待SmoothRateLimiter 类中 nextFreeTicketMicros 表示下次可用服务的时间,例如,如果我们配置 QPS 为1,本次请求处理完成之后,下次请求的时间需要在 1s 中之后

2、 SmoothBursty 支持处理突发请求流量,例如,我们设置 QPS 1,在 10s中之内没有请求,那么令牌桶中就有 10个空闲令牌,如果下次请求是 acquire(20),那么就不需要等待 20s 了,因为令牌桶中已经有 10 个令牌了,SmoothRateLimiterstoredPermits 就是用来存储当前令牌桶中空闲的令牌数

3、SmoothWarmingUp 提出一种 “热身模式” 和 “冷却期” ,具体详情再说吧,先慢慢俩

3)源码-抽象父类- SmoothRateLimiter

@GwtIncompatible
abstract class SmoothRateLimiter extends RateLimiter {
  //当前持有的令牌数
  double storedPermits;
  
  //令牌数上限
  double maxPermits;
  
  //生成一个令牌需要的时间
  double stableIntervalMicros;
  
  //下次请求能获取令牌的时间,是个时间戳
  private long nextFreeTicketMicros;

4)源码-子类- SmoothBursty

a、创建令牌: create

根据指定的 QPS 数创建 RateLimiter,底层调用如下:

参数:maxBurstSeconds

定义突发流量持续时间:maxBurstSeconds 定义了令牌桶能够容忍的突发流量持续时间的上限(以秒为单位)。这意味着,在这个时间段内,系统可以允许请求的速率暂时超过设定的平均速率,从而应对突发的流量高峰。

计算最大令牌数: maxBurstSeconds 与每秒允许通过的令牌数permitsPerSecond相乘,得到令牌桶能够存储的最大令牌数(maxPermits)。这个最大令牌数限定了在突发流量期间,系统能够额外处理的请求数量。

⬤ 平滑突发流量:通过设定 maxBurstSecondsSmoothBursty 能够在保证系统稳定性的同时,允许一定程度的突发流量。当流量超过平均速率时,系统会从令牌桶中消耗额外的令牌来处理这些请求,直到令牌耗尽或突发流量结束。这种方式有助于平滑突发流量,避免系统因瞬间过载而崩溃。

⬤ 应对实际应用场景:在实际应用中,maxBurstSeconds 的设置需要根据具体业务场景和系统负载能力进行调整。例如,在电商平台的秒杀活动中,可以适当提高 maxBurstSeconds 的值,以应对用户短时间内的大量请求;而在对实时性要求较高的系统中,可能需要降低 maxBurstSeconds 的值,以保证系统的快速响应能力。

//根据指定的 QPS 数创建  RateLimiter
public static RateLimiter create(double permitsPerSecond) {
  return create(RateLimiter.SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
}

@VisibleForTesting
static RateLimiter create(RateLimiter.SleepingStopwatch stopwatch, double permitsPerSecond) {
  // 1、创建 SmoothBursty 限流器
  RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
  
  // 2、设置限流速率
  rateLimiter.setRate(permitsPerSecond);
  return rateLimiter;
}

再看 setRate 方法,最终会调用 doSetRate, doSetRate是一个抽象方法,

image-20220415204834107


final void doSetRate(double permitsPerSecond, long nowMicros) {
  this.resync(nowMicros);
  
  //生成一个令牌需要的时间 
  double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
  this.stableIntervalMicros = stableIntervalMicros;
  this.doSetRate(permitsPerSecond, stableIntervalMicros);
}

image-20220415204932057

实现如下

void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  //设置最大令牌数  this.maxBurstSeconds = 1 * 每秒生成令牌数
  this.maxPermits = this.maxBurstSeconds * permitsPerSecond;
  
  if (oldMaxPermits == 1.0D / 0.0) {
    this.storedPermits = this.maxPermits;
  } else {
    this.storedPermits = oldMaxPermits == 0.0D ? 0.0D : this.storedPermits * this.maxPermits / oldMaxPermits;
  }

}

b、获取令牌: acquire

acquire(int permits)RateLimiter 中获取 x 个令牌,该方法会一直阻塞,直到获取令牌,主要做了如下3件事

public double acquire(int permits) {
  // 1、获取当前请求需要等待的时间(惰性计算)
  long microsToWait = this.reserve(permits);

  // 2、seleep.microToWait (时间窗口)
  this.stopwatch.sleepMicrosUninterruptibly(microsToWait);

  // 3、返回 microToWait 对应的秒级时间
  return 1.0D * (double)microsToWait / (double)TimeUnit.SECONDS.toMicros(1L);
}


final long reserve(int permits) {
  //检查参数是否ok
  checkPermits(permits);
  synchronized(this.mutex()) {
    //计算需要等待的诗句
    return this.reserveAndGetWaitLength(permits, this.stopwatch.readMicros());
  }
}

C、核心接口-内部: reserveAndGetWaitLength

该方法需要返回等待的时间,是 RateLimiter 的核心接口

RateLimiter 支持突发流量的本质是,将当前需要的令牌数量 requiredPermits 拆分成 storedPermitsToSpend(持有令牌中可用的数量)和 freshPermits(需要预支的令牌数量);分别计算需要等待的时间,然后更新 nextFreeTicketMicros 下次获取令牌的时间

什么意思呢?举个例子吧:

当前 RateLimiter 持有 4 个令牌,当前请求需要 6 个令牌;则 6 个令牌中 4 个是可以从持有的令牌中直接获取,而另外两个需要预支的令牌则需要单独计算时间;

伪代码:getReqWaitTime(6) = getWaitTime(4) + getFreshWait(6 - 4)

SmoothBursty 模式:

getWaitTime(4) 是可以直接获取的,即 time = 0

getFreshWait(6 - 4) 则等于 freshPermits * stableIntervalMicros (预支令牌数 * 生成一个令牌需要的时间)

@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  //1.根据当前时间和预计下一秒时间判断有无新令牌产生,有则更新持有令牌数storedPermits 和下次请求时间nextFreeTicketMicros
  resync(nowMicros); 
  long returnValue = nextFreeTicketMicros;

  //2.以下两句,根据请求需要的令牌数requiredPermits和storedPermits当前持有的令牌数storedPermits分别计算 持有令牌中可用的数量storedPermitsToSpend和需要预支的令牌数量freshPermits
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits); 
  double freshPermits = requiredPermits - storedPermitsToSpend;

  //3.分别计算storedPermitsToSpend和freshPermits的等待时间
  long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) 
    							+ (long) (freshPermits * stableIntervalMicros);

  try {
    //4.更新 nextFreeTicketMicros 下次请求能获取令牌时间: 本次等待时间 + 上次需要等待时间
    this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros); 
  } catch (ArithmeticException e) {
    this.nextFreeTicketMicros = Long.MAX_VALUE;
  }

  //5.更新storedPermits 
  this.storedPermits -= storedPermitsToSpend; 

  return returnValue;
}

image-20220526164937252

5)源码-子类:SmoothWarmingUp

WarmingUpRateLimiter 的另一种实例不同于 SmoothBursty ,它存在一个 “热身” 的概念。

即:如果当前系统处于 “ 冷却期”( 即一段时间没有获取令牌,即:当前持有的令牌数量大于某个阈值),则下一次获取令牌需要等待的时间比 SmoothBursty 模式下的线性时间要大,并且逐步下降到一个稳定的数值。

大致原理:将 storedPermits 分成两个区间值:[0, thresholdPermits) 和 [thresholdPermits, maxPermits]。当请求进来时,如果当前系统处于 cold” 的冷却期状态,从 [thresholdPermits, maxPermits] 区间去拿令牌,所需要等待的时间会长于从区间 [0, thresholdPermits) 拿相同令牌所需要等待的时间。当请求增多,storedPermits 减少到 thresholdPermits 以下时,此时拿令牌所需要等待的时间趋于稳定。这也就是所谓 “热身” 的过程。

反应到代码上,和 SmoothBursty 的不同有两点

create 方法不同;该方法指定了 “热身” 模型需要的关键参数

acquire 获取当前令牌中可用令牌 storedPermitsToSpend 的等待时间,需要依据热身模型来计算

@Test
public void test_2() throws InterruptedException {
  //预热模式,设置预热时间和 QPS,即在正式acquire前,限流器已经持有 5 * 4 = 20个令牌
  RateLimiter rateLimiter = RateLimiter.create(5, 4000, TimeUnit.MILLISECONDS);
  for (int i = 1; i < 50; i++) {
    System.out.println(System.currentTimeMillis() +" acq " + i + ": wait " + rateLimiter.acquire() + "s");
    if (i == 15) {
      Thread.sleep(2000);
      System.out.println(System.currentTimeMillis() +" acq " + 15 + ": wait " + rateLimiter.acquire() + "s");
    }
  }
}

1552395652026 acq 1: wait 0.0s
1552395652028 acq 2: wait 0.578357s
1552395652612 acq 3: wait 0.533835s
1552395653151 acq 4: wait 0.495191s
1552395653649 acq 5: wait 0.457239s
1552395654110 acq 6: wait 0.41631s
1552395654528 acq 7: wait 0.377524s
1552395654912 acq 8: wait 0.334018s
1552395655248 acq 9: wait 0.298249s
1552395655550 acq 10: wait 0.256165s
1552395655808 acq 11: wait 0.217752s
1552395656028 acq 12: wait 0.197672s
1552395656231 acq 13: wait 0.19451s
1552395656429 acq 14: wait 0.196465s
1552395656630 acq 15: wait 0.195714s
1552395658834 acq 15: wait 0.0s
1552395658834 acq 16: wait 0.34158s
1552395659180 acq 17: wait 0.296628s
1552395659482 acq 18: wait 0.256914s
1552395659744 acq 19: wait 0.216517s
1552395659965 acq 20: wait 0.195077s
1552395660164 acq 21: wait 0.195953s
1552395660365 acq 22: wait 0.195196s
1552395660564 acq 23: wait 0.196015s
1552395660764 acq 24: wait 0.195972s

acq 1 时并没有任何等待直接预消费了 1 个令牌

acq 211 时,由于当前系统处于冷却期,因此开始等待的时间较长,并且逐步下降到一个稳定值 acq 1215 时,等待时间趋于稳定的 0.2 秒,即 1 / QPS

acq 15 同时,sleep 2 秒,即在当前基础上,又新增 5 * 2 个令牌;将系统过渡到冷却期

acq 15~结束,重复 acq 215 的过程。

a、预热模型

SmoothWarmingUpSmoothRateLimiter 的子类,它相对于 SmoothRateLimiter 多了几个属性:

static final class SmoothWarmingUp extends SmoothRateLimiter {
  //预热时间
  private final long warmupPeriodMicros;

  //预热区斜率
  private double slope;

  //区分冷区期和稳定器的阈值
  private double thresholdPermits;

  //code inteval/ stable inteval 的固定数值 硬编码写死的是 3
  private double coldFactor;

横坐标:是当前令牌桶中的令牌 storedPermits,前面说过 SmoothWarmingUpstoredPermits 分为两个区间:[0, thresholdPermits) 和 [thresholdPermits, maxPermits]。

纵坐标:请求的间隔时间(单词请求所需时间),**stableInterval 就是 1 / QPS,例如设置的 QPS5,则 stableInterval 就是 200ms,coldInterval = stableInterval * coldFactor,这里的 coldFactor 硬编码写死的是 3

当系统请求增多,图像会像左移动,直到 storedPermits0。等待一段时间后,随着令牌的生成当系统进入 cold 阶段时,图像会向右移,直到 storedPermits 等于 maxPermits

image-20220418173609666

b、创建令牌 create

create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)

根据指定的 ` QPS ` 和预热期来创建 RateLimiter,在这段预热时间内,RateLimiter 每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。

public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
  checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
  //1、创建 冷却因子默认为3
  return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit, 3.0);
}

static RateLimiter create(
  SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit,double coldFactor) {
 
  //1.创建SmoothWarmingUp限流器
  RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);

  //2.设置限流速率
  rateLimiter.setRate(permitsPerSecond);
  return rateLimiter;
}


public final void setRate(double permitsPerSecond) {
  Preconditions.checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), 
                              "rate must be positive");
  synchronized(this.mutex()) {
    this.doSetRate(permitsPerSecond, this.stopwatch.readMicros());
  }
}

//构造器
SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
  super(stopwatch);

  //1.设置预热时间
  this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); 

  //3.设置coldFactor为3
  this.coldFactor = coldFactor;
}



final void doSetRate(double permitsPerSecond, long nowMicros) {
  this.resync(nowMicros);
  // 单词请求时间:如果QPS为5 则 stableIntervalMicros  = 1s/5 = 200ms
  double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
  this.stableIntervalMicros = stableIntervalMicros;
  this.doSetRate(permitsPerSecond, stableIntervalMicros);
}


@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = maxPermits;

  //1.设置冷却期等待时间数值 coldIntervalMicros = 单词请求时间 * 冷却因子
  double coldIntervalMicros = stableIntervalMicros * coldFactor; 

  //2.设置冷却期的阈值,thresholdPermits 默认等于预热期产生令牌数的 一半 (4000000/200000 = 20 , 20 * 0.5 = 10)
  thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;

  // 3、设置持有令牌的最大值,为 thresholdPermits 的 2 倍 为20
  //   冷却器阈值  + 2 * (预热时间/每个请求预热时间 + 冷却期等待时间(每个请求预热时间 * 3))
  maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);

  //4.设置预热区的斜率;纵坐标之差/横坐标之差 
  slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);

  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
       storedPermits = 0.0;
  } else {
    storedPermits = (oldMaxPermits == 0.0) ? maxPermits : storedPermits * maxPermits / oldMaxPermits;
  }
}

c、核心接口-内部: storedPermitsToWaitTime

前面说到,SmoothWarmingUpSmoothBursty 的一个重要区别就在于 “获取当前令牌中可用令牌的等待时间”storedPermitsToWaitTime 方法, 而 获取预支令牌的等待时间” 和之前一致。

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {

  //1.获取当前持有令牌数和阈值的差值 availablePermitsAboveThreshold
  double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
  long micros = 0;

  //2.如果availablePermitsAboveThreshold > 0,即当前持有令牌数 > 阈值,即到达冷区期;计算等待时间
  if (availablePermitsAboveThreshold > 0.0) {

    //3.计算WARM UP PERIOD部分计算的方法,这部分是一个梯形,梯形的面积计算公式是 “(上底 + 下底) * 高 / 2”
    double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
    micros = (long) (permitsAboveThresholdToTake * (permitsToTime(availablePermitsAboveThreshold)
                                                    + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)) / 2.0);

    //4.剩余的令牌从 stable部分拿
    permitsToTake -= permitsAboveThresholdToTake;
  }
  //5.stable 部分令牌获取花费的时间
  micros += (stableIntervalMicros * permitsToTake);
  return micros;
}

举个例子:创建限流器时 create(5, 4000, TimeUnit.MILLISECONDS);预热了 20 个令牌

场景1:当前持有 20 个令牌,请求一个令牌;需要等待的时间为

image-20220418182916589

场景 2:当前持有 18 个令牌,请求 1 个令牌;需要等待的时间为:

image-20220418183026895

场景3:当前持有 20 个令牌,一次性请求 11 个令牌;需要等待的时间为:

image-20220418183113660

场景4:当前持有 10 个令牌,一次性请求 1 个令牌;需要等待的时间为:

image-20220418183133770

d、小结

1、当 storedPermits - ` thresholdPermits = availablePermitsAboveThreshold > 0 (冷却期)且 permitsToTake < availablePermitsAboveThreshold 时,等待时间是 WARM UP PERIOD 中的一个梯形面积;**permitsToTake` 是持有令牌中可用的数量**

2、当 storedPermits - thresholdPermits = availablePermitsAboveThreshold > 0 (冷却期)且 permitsToTake > availablePermitsAboveThreshold 时,等待时间是 1+ (permitsToTake - availablePermitsAboveThreshold)* stable;即梯形 + 矩形的面积

3、当 storedPermits - thresholdPermits = availablePermitsAboveThreshold < 0 时(稳定期),等待时间是 permitsToTake * stable;即矩形的面积

4、当 storedPermits 等于 0 后,系统创建新的令牌后,获取等待时间的顺序为 4 - > 2-> 1;即前文说的当系统请求增多,图像会像左移动,直到 storedPermits0。等待一段时间后,随着令牌的生成当系统进入 cold 阶段时,图像会向右移,直到 storedPermits 等于 maxPermits。是一个动态调整的过程。

6)RateLimiter 总结

a、2 种模式创建和使用令牌过程区别

总结一下 SmoothWarmingUpSmoothBursty 的创建和使用令牌的过程:

1、SmoothWarmingUpSmoothBursty 的最大区别就在于,“获取已持有令牌中可用令牌的等待时间” 不同,SmoothBursty 是直接返回的,SmoothWarmingUp 则是基于 “热身模型” 和 “冷却期”(即一段时间没有获取令牌,衡量指标:当前持有的令牌数量大于某个阈值)的机制进行动态调整(冷却期按照梯形区域返回,否则按照矩形区域返回)

2、预支令牌的等待时间算法一致,waitTime = 预支令牌数量 * 生成一个令牌需要的时间(1/QPSSmoothWarmingUp 为系统提供一种冷启动的可能,

例如:某系统底层使用缓存中间件,假如没有 “热身”,突发流量很可能造成缓存击穿等问题;WarmingUp 让系统应对突发流量有一个 “渐进准备资源” 的过程,Rhino 使用的令牌桶的平滑限流,即 ` WarmingUp` 模式

image-20220526164957928

b、阻塞方式和非阻塞方式获取令牌

⬤ 非阻塞式:尝试获取许可,如果获取到许可,则执行代码块1,如果没有获取到就认为被限流,则执行代码块2;

⬤ 阻塞式:尝试获取许可,获取到则执行代码块1,没有获取到,则阻塞,等待获取到许可后,执行代码块1,注意不会执行代码块2;

if (获得许可) {
    执行代码块 1
} else {
    执行代码块 2
}
1、非阻塞方式 tryAcquire

@Test
public void rateLimiterTest() {
  RateLimiter rateLimiter = RateLimiter.create(5);
  for (int i = 0; i < 10; i++) {
    //非阻塞方式获取令牌,立即返回成功或者失败
    if (rateLimiter.tryAcquire()) {
        System.out.println(LocalTime.now()+"拿到令牌通过");
    }
    else {
      System.out.println(LocalTime.now()+"未拿到令牌不通过");
    }
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  
}


// 尝试获取1个许可,如果获取到,则返回true,否则返回false
boolean tryAcquire();

// 尝试获取多个许可,如果获取到,则返回true,否则返回false
boolean tryAcquire(int permits)
// 示例tryAcquire(3)

// 在timeout时间内,尝试获取1个许可,如果获取到,则返回true,否则返回false
tryAcquire(Duration timeout);
// 示例:tryAcquire(Duration.ofSeconds(3))
tryAcquire(long timeout, TimeUnit unit);
// 示例:tryAcquire(3, TimeUnit.SECONDS);

// 在timeout时间内,尝试获取多个许可,如果获取到,则返回true,否则返回false
tryAcquire(int permits, Duration timeout)
tryAcquire(int permits, long timeout, TimeUnit unit)
2、阻塞方式 acquire
@Test
public void testAcquire() {
  RateLimiter rateLimiter = RateLimiter.create(2);

  for (int i = 0; i < 10; i++) {
    double sleep = rateLimiter.acquire();
    System.out.println("now: " + LocalTime.now() + "  sleep: " + sleep);
  }
}

C、问题

1、单机限流

Guava 解决的问题,一般都是单机的,以限流为例,使用 guava 限流,只做到了单机限流,但是我们的服务一般都会由多台机器(集群),虽然我们可以通过计算单机和集群的比例,来设置限流数量,但是有几个问题:

1、机器增减时,要保证总流量保持不变,就需要修改每一台机器的流量限制,这个不是很方便;

2、Guava 的限流器,并不是公平的,至于什么是公平和非公平,可以参考

2、集群限流

guavaRateLimiter,限流是通过获取“许可”来控制流量的,只不过是单机管理自己的许可。

问题 1:如果将所有机器的“许可”汇集到一个地方,所有机器都从这个地方获取许可,不就可以实现集群限流吗?

答案:可以使用 redis 来保存所有机器的“许可”。这样做,可以实现集群限流,但不能保证单机的流量限制,其实对于现在的微服务来说,请求被平均分给所有机器,是服务平台的问题,所以可以不用关心这个问题。

二、计数器算法

计数器算法是在一定的时间间隔里,记录请求次数,当请求次数超过该时间限制时,就把计数器清零,然后重新计算。当请求次数超过间隔内的最大次数时,拒绝访问。

缺点:这种方法实现简单,但有一个明显的缺点,那就是在两个间隔之间,如果有密集的请求。则会导致单位时间内的实际请求超过阈值。

例如:一个接口每分钟允许访问100次。实现方式如下:

1、设置一个计数器 count ,接收一个请求就将计数器加一,同时记录当前时间。

2、判断当前时间和上次统计时间是否为同一分钟。

如果是,则判断 count 是否超过阈值,如果超过阈值,则返回限流拒绝。

如果不是,则吧 ` count` 重置为1,判断是否超过阈值。

如下图所示,该计数器算法要求每分钟请求的阈值不超过100个。

image-20220526171037025

1、代码实现

1)自定义注解

@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface EntryTimes {

    /** 方法允许进入的次数 */
    int value() default 1;
   
     /**  可以访问的前缀 url前缀  */
    String prefix() default "";
}

2)Aspect:切面拦截

/**
 * 控制每个用户访问Controller方法的次数
 * Created by fengchuanbo on 2017/5/25.
 */
@Aspect
@Component
public class MethodEntryTimesLimitInterceptor {

  private static final String METHOD_CAN_ENTRY_TIMES_KEY = "method:entry:times:";

  @Resource
  private StringRedisTemplate stringRedisTemplate;

  /**
     * 需要有 EntryTimes 标注,并且第一个参数需要是 AuthUser 才可以
     */
  @Around("@annotation(com.duodian.admore.zhaobutong.annotation.EntryTimes)")
  public Object aroundAdvice(ProceedingJoinPoint pjp) throws Throwable {
    HttpServletRequest request = ((ServletRequestAttributes) 
                                  RequestContextHolder.getRequestAttributes()).getRequest();
    String token = request.getParameter("token");
    String aes = AesUtils.LoginDecrypt(token);
    Long userId = Long.valueOf(aes.split("#")[0]);
    MethodSignature signature = (MethodSignature) pjp.getSignature();
    Method method = signature.getMethod();
    EntryTimes annotation = method.getAnnotation(EntryTimes.class);
    int times = annotation.value();
    String key = METHOD_CAN_ENTRY_TIMES_KEY + ":" + annotation.prefix() + ":" +  userId;
    // 没有整个方法使用一个redis链接,是为了方式方法执行时间过长一直占用redis链接。
    Long increment = getEntryTimes(key);
    Object retVal;
    try {
      // 放在try里面,才能执行finally
      if (increment > times){
        // 设置十秒钟超时,防止finally在系统崩溃或重启时没有执行造成用户不能操作。
        expireKey(key,10);
        return Response.of(Code.ACTION_FREQUENT);
      }
      //调用核心逻辑
      retVal = pjp.proceed();
    }finally {
      deleteKey(key);
    }
    return retVal;
  }

  private Long getEntryTimes(String key){
    return stringRedisTemplate.opsForValue().increment(key,1);
  }

  private void deleteKey(String key){
    stringRedisTemplate.delete(key);
  }

  private void expireKey(String key, int seconds){
    stringRedisTemplate.boundValueOps(key).expire(seconds, TimeUnit.SECONDS);
  }
}

四、滑动时间窗口算法

1、zset实现

限流中的滑动窗口可以简单理解为,设定的单位时间就是一个窗口,窗口可以分割多个更小的时间单元,随着时间的推移,窗口会向右移动。比如一个接口一分钟限制调用 1000次, 1 分钟就可以理解为一个窗口

设计思路 : 每一个行为到来时,都维护一次时间窗口。将时间窗口外的记录全部清理掉,只保留窗口内的记录。

方案:想想 zset 数据结构的 score 值,是不是可以 通过 score 来圈出这个时间窗口来。而且我们只需要保留这个时间窗口,窗口之外的数据都 可以砍掉。

缺点:如果这 个量很大,比如限定 60s 内操作不得超过 100w 次这样的参数,它是不适合做这样的限流 的,因为会消耗大量的存储空间。

1、key:用一个 zset 结构记录用户的行为历史,每一个行为都会作为 zset 中的一个 key 保存下来,同一个用户同一种行为用一个 zset 记录

2、score:当前时间戳

3、valuezset 集合中只有 score 值非常重要,value 值没有特别的意义,只需要保证它是唯一的就可以了,用 uuid 会 比较浪费空间,那就改用毫秒时间戳吧。

4、milliseconds:过期时间,为节省内存,我们只需要保留时间窗口内的行为记录,同时如果用户是冷用户,滑动时间窗口内的行为是空记录,那么这个 zset 就可以从内存中移除,不再占用空间。

image-20210507141302926

2、代码实现

1)基本代码

@Slf4j
public class SimpleRateLimiter {

    private Jedis jedis;

    public SimpleRateLimiter(Jedis jedis) {
        this.jedis = jedis;
    }

    /**
     *
     * @param userId 用户Id
     * @param actionKey 动作
     * @param period 过期时间 单位(秒)
     * @param maxCount 最大次数
     * @return 返回是否允许访问
     * 1、获取key = userId + "#" + actionKey;
     * 2、
     */
    public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
        // 1、获取key = userId + "#" + actionKey;
        String key = userId + "#" + actionKey;
        long now = System.currentTimeMillis();
        Pipeline pipe = jedis.pipelined();
        //开启事务
        pipe.multi();
      
        //第二个参数是 score,第三个参数是value
        pipe.zadd(key, now, "" + now);
      
        ////删除 now -period* 1000 毫秒之前的数据(也就是说保留periods秒之内的数据)
        // 看清楚了这是直接删除了
        pipe.zremrangeByScore(key, 0, now - period * 1000);
      
        //当前窗口的元素个数
        Response<Long> count = pipe.zcard(key);
      
        //一定要设置过期时间,可能大部分用户是冷用户,因为要维护period时间内的记录,所以key过期时间要稍微比period大
        pipe.expire(key, period + 1);
      
        //执行事务
        pipe.exec();
        try {
            pipe.close();
        } catch (IOException e) {
            log.info("SimpleRateLimiter#isActionAllowed pipe.close() error", e);
        }
      
         //通过统计滑动窗口内的行为数量与阈值 `max_count `进行比较就可以得出当前的行为是 允许。用代码表示如下:
        return count.get() <= maxCount;
    }

}

2)实例测试

@Test
public void test() {
  Jedis jedis = new Jedis("127.0.0.1");
  SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
  for (int i = 0; i < 20; i++) {
    log.info("限流状态:[{}]", limiter.isActionAllowed("QYD", "reply", 60, 5));
  }

  try {
    Thread.sleep(10000);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

五、漏斗限流算法

1、漏洞的容量是有限的,如果将漏嘴堵住,然后一直往里面灌水,它就会变满,直至再也装不进去。

2、如果将漏嘴放开,水就会往下流,流走一部分之后,就又可以继续往里面灌水。

3、如果漏嘴流水的速率大于灌水的速率,那么漏斗永远都装不满。

4、如果漏嘴流水速率小于灌水的速率,那么一旦漏斗满了,灌水就需要暂停并等待漏斗腾空。

一般来说,流出速度是固定的,即不管你请求有多少,速率有多快,我反正就这么个速度处理。当然,特殊情况下,需要加快速度处理,也可以动态调整流出速率

重点:两个速率:

流入速率:即实际的用户请求速率或压力测试的速率,

流出速率:即服务端处理速率。

image-20210507151345443

1、单机代码实现

1)漏斗 FunnelDTO

@Data
public class FunnelDTO {
  /** 容量 */
  Integer capacity;
  /** 剩余容量 */
  Integer leftQuota;
  /** 流出速率 */
  Float leakingRate;
  /** 计算起始时间 */
  Long startTime;


  public FunnelDTO(int capacity, float leakingRate) {
    this.capacity = capacity;
    this.leftQuota = capacity;
    this.leakingRate = leakingRate;
    this.startTime = System.currentTimeMillis();
  }


  /**
     * 1、计算时间间隔
     * 2、计算时间间隔内流出的数量
     *  2.1、如果获取到的小于0, 说明间隔时间太长(没有往漏斗里放数据,漏斗空了),整数数字过大溢出了,则初始化数据
     * 2.2、如果获取到的流出量小于1,则说明腾出空间太小(最小单位是1),则直接返回,不执行
     * 3、
     */
  public void makeSpace() {
    Long nowTime = System.currentTimeMillis();
    // 1、计算时间间隔
    Long deltaTs = nowTime - startTime;

    // 2、计算时间间隔内流出的数量
    Integer deltaQuota = (int) (deltaTs * leakingRate);
    // 2.1、如果获取到的小于0, 说明间隔时间太长(没有往漏斗里放数据,漏斗空了),整数数字过大溢出了,则初始化数据
    if (deltaQuota < 0) {
      this.leftQuota = capacity;
      this.startTime = nowTime;
      return;
    }
    // 2.2、如果获取到的流出量小于1,则说明腾出空间太小(最小单位是1),则直接返回,不执行
    if (deltaQuota < 1) {
      return;
    }

    // 3、重新计算
    // 当前时间 = nowTime
    // 剩余容量 = 当前剩余容量 + 流出速率 * 间隔时间(如果超出了总容量,剩余容量 = 总容量)
    this.leftQuota += deltaQuota;
    this.startTime = nowTime;
    if (this.leftQuota > this.capacity) {
      this.leftQuota = this.capacity;
    }
  }

  /**
     * 判断是否能加入交易
     * 1、漏斗重新计算剩余容量和当前时间
     * 2、
     * @param quota 定额
     * @return
     */
  public boolean watering(int quota) {
    // 1、漏斗重新计算剩余容量和当前时间
    makeSpace();

    // 2、如果剩余容量大于当前要流入漏斗的量,则执行成功,返回true
    if (this.leftQuota >= quota) {
      this.leftQuota -= quota;
      return true;
    }
    return false;
  }

}

2)FunnelRateLimiter

public class FunnelRateLimiter {


  private Map<String, FunnelDTO> funnels ;
  public FunnelRateLimiter(Map funnels) {
    this.funnels = funnels;
  }


  /**
     * 限流方法
     * @param userId
     * @param actionKey
     * @param capacity
     * @param leakingRate
     * @return
     * 1、根据用户Id和动作获取对应的漏斗
     * 2、往漏斗中放入定额数据,看是否能放下
     */
  public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
    // 1、根据用户Id和动作获取对应的漏斗
    String key = String.format("%s:%s", userId, actionKey);
    FunnelDTO funnel = funnels.get(key);
    if (funnel == null) {
      funnel = new FunnelDTO(capacity, leakingRate);
      funnels.put(key, funnel);
    }

    //2、往漏斗中放入定额数据
    return funnel.watering(1);
  }


}

2、分布式的漏斗算法

问题1:上面是单机的,那么分布式用Redis如何实现呢?

答案:观察上面的 Funnel 对象的几个字段,我们发现可以将 Funnel 对象的内容按字段存储到一 个 hash 结构中,灌水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。

问题2:用hash的话,可以保证是原子性操作吗?

答案:我们无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里 运算,再回填到 hash 结构,这三个过程无法原子化

问题3:那如何保证hash是原子性操作呢?

答案:进行适当的加锁控制 + LUA脚本。

问题4:加锁失败了怎么办?

答案:一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。如果重试的话,就会导致性能下降。如果放弃的话,就会影响用户体验。同时,代码 复杂度也跟着升高很多。

问题5:有更好的选择吗?

答案:Redis-Cell 救星来了!

1)Redis-Cell

Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并 提供了原子的限流指令。有了这个模块,限流问题就非常简单了。

该模块只有 1 条指令 cl.throttle,它的参数和返回值都略显复杂,接下来让我们来看看这 个指令具体该如何使用。

a、举例

1、允许「用户老钱回复行为」的频率为每 60s 最多 30 次(漏水速 率)

2、漏斗的初始容量为 15,也就是说一开始可以连续回复 15 个帖子,然后才开始受漏水 速率的影响。

cl.throttle [key] [capacity] [operation_times] [time_span] [quota]
cl.throttle laoqian:replay 15 30 60 1

15 漏斗容量 [capacity] 
30 operation_times / 60 time_span 每60s最多30次(漏水速率)
1  quota 可选参数,默认是1 指的是每次行为占用一个空间

B、总结

在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的重试时间都计算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想阻塞线程,也可以异步定时任务来重试

cl.throttle laoqian:reply 15 30 60 1
1) (integer) 0  # 0表示允许,1表示拒绝
2) (integer) 15 # 漏斗容量capacity
3) (integer) 14 # 漏斗剩余空间left_quota
4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2  # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)

六、流量控制应用场景

1、应用场景

1)各种限流算法基本使用场景

a、令牌桶算法

  • 电商抢购或热点事件:在电商促销或社交媒体出现热点事件时,用户请求量会急剧增加。令牌桶算法能够允许一定程度的突发流量,同时保证系统不会过载,适合应对这种场景。
  • 网络流量控制:在网络环境中,令牌桶算法可以根据网络带宽和流量负载动态调整发送数据的速率和数量,保证网络的稳定性和可靠性。
  • 服务器资源管理:用于控制服务器资源的访问,如CPU、内存、磁盘和网络等,防止服务器过载和崩溃。
  • 数据库访问控制:动态调整访问数据库的速率和数量,保护数据库不被过度访问。
  • 云计算资源管理:在云计算环境中,根据资源负载和性能要求调整虚拟机、容器等资源的使用速率和数量。

b、漏斗算法

  • 流量整形:漏桶算法以恒定的速率处理请求,使得流量输出更加均匀,适用于需要平滑流量的场景,如保护数据库访问。
  • 防止突发流量:虽然漏桶算法在应对突发流量时可能导致请求失败,但它能有效防止系统因突发流量而崩溃。

c、计数器算法

  • 简单限流需求:对于不需要精确控制流量、仅需简单限流的场景,计数器算法因其实现简单而适用。
  • 注意:然而,计数器算法存在“时间临界点”缺陷,即在时间窗口的临界点附近可能出现流量激增,因此不适用于对流量控制要求较高的场景。

d、滑动窗口算法

  • 需要灵活应对突发流量的场景:滑动窗口限流算法通过将时间窗口划分为多个小窗口,并动态计算当前窗口内的请求数,从而更灵活地应对突发流量。
  • 保护后端服务:防止后端服务因大流量冲击而崩溃,保证服务的稳定性和可靠性。
  • API调用限制:对API调用进行限制,确保公平使用并防止恶意攻击。

2)Guava- RatLimter 2 种模式使用场景

b、SmoothBursty 使用场景

SmoothBursty 限流器主要用于允许系统处理一定程度的突发流量,同时保持整体的平滑限流效果。它通过令牌桶算法来存储和释放令牌,从而控制请求的速率。以下是一些具体的使用场景:

应对流量突发:在电商促销、游戏发布等场景中,用户请求量可能会在短时间内激增。使用 SmoothBursty 限流器可以允许系统在保持整体平稳的情况下,处理部分突发流量,避免系统因请求量过大而崩溃。

高并发系统:在高并发系统中,请求量可能会持续保持在较高水平。SmoothBursty 限流器可以通过调节令牌桶的容量和生成速率,来适应不同的并发需求,从而确保系统的稳定性和可用性。

API 限流:对于公共API 接口,为了防止恶意调用或资源耗尽,通常会设置限流策略。SmoothBursty 限流器可以根据接口的 QPS(每秒查询率)要求,灵活设置令牌桶的容量和生成速率,以实现API的平滑限流。

b、SmoothWarmingUp 使用场景

SmoothWarmingUp: 限流器主要用于需要资源预热或系统逐步达到稳定状态的场景。它通过逐渐增加令牌生成的速率,来平滑过渡到目标速率,从而避免在系统启动时因突发请求而导致的过载。以下是一些具体的使用场景:

⬤ 系统重启后的预热:当系统重启后,很多资源(如数据库连接、缓存等)需要重新加载和初始化。此时,如果立即以最大速率接收请求,可能会导致系统不堪重负。使用 SmoothWarmingUp 限流器可以在系统重启后逐步增加请求处理速率,给系统足够的预热时间。

⬤ 冷启动服务:对于长时间未接收请求的服务,如 API 接口、Web 服务等,重新启动后可能需要逐渐适应请求量。SmoothWarmingUp 限流器可以在此期间逐渐增加服务处理请求的速率,从而避免服务因瞬间流量过大而崩溃。

⬤ 资源懒加载:在一些系统中,某些资源(如文件、图片等)可能采用懒加载方式,即首次请求时才加载。此时,如果大量请求同时到达,可能会导致资源加载缓慢或失败。使用 SmoothWarmingUp 限流器可以平滑处理这些请求,给资源加载足够的时间。

2)实际使用场景

a、服务端接口限制被调用的速度

解决方案:可以使用 Guava 限流器突发模式,配合非阻塞方法 tryAcquire 限制被调用速度,无特殊情况下 tryRequire 超时时间建议设置为 0(能获取到就通过,否则就拒绝调)

补充说明:突发模式的突发时间 (maxBrushSeconds) 配置可以根据历史流量的波动情况确定,多数场景中设置 1 - 5s 即可,突发事件不建议设置为 0,这样可能会 导致一些不符合逾期的欲绝行为,另外,不建议在服务端中接口中使用限流的 acquirttryAcquire 方法做阻塞等待,这样会增加请求延迟给调用方带来不好的体验。

b、客户端限制发出请求的速度

解决方案:可以使用 Guava 限流器突发模式,配合非阻塞方法 tryAcquire 限制发出请求的速度

补充说明:无特殊情况下,突发模式的突发时间 (maxBrushSeconds) 可以设置为 0(我是请求方,还突发啥呢),配合 acquirt 的流量整形效果来让客户端发出匀速请求,另外客户端如果存在失败重试机制,那么可能需要考虑一下重试间隔会由于限流器的阻塞而被拉长的情况

c、秒杀或者大促场景保证不被打垮

解决方案:可以使用 Guava 限流器预热模式,配合非阻塞方法 tryAcquire 限制被调用速度,建议设置一个合理的超时时间

补充说明:预热时间的长度可以根据业务场景特点配置,限流阈值在压测的时候确定,这里建议使用 tryAcquire 进行可超时阻塞的目的是为了保证 “ 先到先得 “的公平性

2、限流问题

1)超过限流阈值

a、使用限流器后实际流量超过阈值

解释:这种情况只能是出现在突发模式情况下 ,突发模式限流器会请求存量令牌和新令牌,新令牌的生成速度等于限流速度,而超额部分的情况来自于存量令牌,在实际流量超过阈值不多的情况下,令牌桶中的数量很难被耗尽

建议:一方面我们要充分理解限流原理,帮助我建立正确的限流效果预期,另一方面我们应该尽量使用合理的配置来实现限流需求,限流阈值一般情况大于接口正常情况下的流量,突发时间流量应参考历史最大流量和平均流量的差异来决定

2)未超过阈值

a、在客户端设置匀速调用场景,服务端使用了限流器后发现实际流量无法达到阈值上限

解释:这种情况是非常偶然的,实际是由于限流配置不当导致的,假设客户端以 50QPS 的速度发出情况,即每 20ms一个,服务端的限流器配置为 30QPS,且不支持突发流量(突发模式设置突发时间为 0,或者使用了预热模式) 这种情况下服务端会严格按照 30ms一个的速度接收请求,因此,客户端在 20ms之后发出第二个请求时,服务端尚未满足 30ms的时间间隔,就出现了每 2个请求就有一个被拒绝的情况

建议:在遇到这种问题的时候,首先考虑限流阈值是否合理,其次,如果在服务端使用突发模式限流,尽量不要把突发时间设置为0,如果使用预热模式限流,应该参考服务容量,配置一足够大的限流阈值

3)参数配置怎么合理

a、SmoothBurstymaxBurstSeconds 突发时间配置成多少合理?

举例说明:假设有一个电商平台的秒杀活动,该活动的平均每秒请求量(QPS)为100,即每秒允许通过100个请求。然而,考虑到活动开始时用户可能会集中涌入,导致请求量激增,我们可以适当配置maxBurstSeconds来应对这种突发情况。

  • 场景一:如果系统资源充足,且业务对延迟的容忍度较高,可以将maxBurstSeconds设置得稍大一些,比如5秒。这样,令牌桶最多可以存储500个令牌(如果每秒生成 100 个令牌),从而允许在突发流量期间处理额外的请求。
  • 场景二:如果系统资源有限,或者业务对延迟非常敏感,那么可能需要将maxBurstSeconds设置得小一些,比如1秒或更少。这样可以减少突发流量对系统性能的影响,但也可能导致部分请求在高峰期间被限流

b、SmoothWarmingUpwarmupPeriod 预热时间设置多少合理?

举例说明:假设我们有一个在线视频直播平台,该平台在每天凌晨进行服务器重启以进行日常维护和更新。重启后,系统需要加载各种资源(如数据库连接、流媒体服务器配置、缓存数据等)以准备接收用户请求。由于用户通常在早上逐渐上线并开始观看直播,因此系统需要在这段时间内平稳地过渡到高负载状态。

推断过程:系统重启并加载所有资源大约需要 5分钟,用户流量在重启后的前 10 分钟内逐渐增加。我们需要确保在warmupPeriod内,系统能够逐步适应并处理增加的请求量,同时尽量减少对用户体验的影响。我们可以将warmupPeriod设置为15分钟。这样,系统就有足够的时间来加载资源、预热并准备处理逐渐增加的用户请求

七、限流使用

1、限流工具类

1)自定义限流工具类

1、RateLimiterAspect

package com.healerjean.proj.aspect;

import com.google.common.util.concurrent.RateLimiter;
import com.healerjean.proj.util.limit.RateLimiterPropertyUtil;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * RateLimiterAspect
 *
 * @author zhangyujin
 * @date 2022/9/8  21:44.
 */
@Component
@Aspect
public class RateLimiterAspect {
    /**
     * log
     */
    private static final Logger log = LoggerFactory.getLogger(RateLimiterAspect.class);

    /**
     *
     */
    private RateLimiterAspect() {
    }


    /**
     * @param pjp
     * @return
     * @throws Throwable
     */
    @Around("execution(* com.healerjean.proj.controller.*Controller.*(..))")
    public Object logAround(ProceedingJoinPoint pjp) throws Throwable {
        Object[] args = pjp.getArgs();
        Class<?> clazz = pjp.getTarget().getClass();
        log.info("开始拦截:" + clazz.getName());

        try {
            this.limit(clazz.getName());
        } catch (Exception var5) {
            log.error(var5.getMessage(), var5);
        }

        return pjp.proceed(args);
    }

    /**
     *
     * @param className
     */
    public void limit(String className) {
        RateLimiter rateLimiter = RateLimiterPropertyUtil.getMap().get(className);
        if (null != rateLimiter) {
            log.info("开始获取令牌:" + className + " rate:" + rateLimiter.getRate());
            double waitTime = rateLimiter.acquire();
            log.info(className + ":获取成功 waitTime :" + waitTime + "ms");
        }
    }
}

2、RateLimiterPropertyUtil

package com.healerjean.proj.util.limit;

import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * RateLimiterPropertyUtil
 *
 * @author zhangyujin
 * @date 2022/9/8  21:37.
 */
@Service
@Slf4j
public class RateLimiterPropertyUtil {

    /**
     * MAP_RATE
     */
    private static ConcurrentHashMap<String, RateLimiter> MAP_RATE = new ConcurrentHashMap();
    /**
     * props
     */
    private static Properties PROPS;

    private RateLimiterPropertyUtil() {
    }

    @PostConstruct
    public void init(){
        load();
    }

    private static synchronized void load() {
        log.info("开始加载限流文件内容.......");
        PROPS = new Properties();
        InputStream in = null;
        try {
            in = RateLimiterPropertyUtil.class.getResourceAsStream("/rateLimiter.properties");
            PROPS.load(in);
            Set<Map.Entry<Object, Object>> set = PROPS.entrySet();

            for (Map.Entry<Object, Object> objectObjectEntry : set) {
                Map.Entry<Object, Object> entry = objectObjectEntry;
                String key = String.valueOf(entry.getKey()).trim();
                String value = String.valueOf(entry.getValue());
                log.info("限流策略:key:{}, value:{}", key, value);
                if (StringUtils.hasText(key) && StringUtils.hasText(value)) {
                    RateLimiter rateLimiter = RateLimiter.create(Double.valueOf(value));
                    MAP_RATE.put(key, rateLimiter);
                }
            }
        } catch (FileNotFoundException var17) {
            log.error("jdbc.properties文件未找到");
        } catch (IOException var18) {
            log.error("出现IOException");
        } finally {
            try {
                if (null != in) {
                    in.close();
                }
            } catch (IOException var16) {
                log.error("jdbc.properties文件流关闭出现异常");
            }
        }

        log.info("properties文件内容:" + PROPS);
    }

    public static ConcurrentHashMap<String, RateLimiter> getMap() {
        return MAP_RATE;
    }

}

3、rateLimiter.properties

建议使用配置中心

com.healerjean.proj.controller.RateLimitController=1

2、自适应限流

相对与单机限流、集群限流等传统固定 QPS 限流机制,自适应限流无需设置限流阈值,解决固定 QPS 限流机制所存在的评估阈值难、维护成本高掣肘新技术推广等3大痛点问题,可为业务系统提供更好的防护作用。

1)自适应限流思路&方案选型

业界主流自适应限流方案都是基于并发度限流,核心计算公式分如下两大类,经过对两种方案的特点综合评估和压测对比,方案二(gradient 限流算法)的限流效果更加稳定

方案 核心算法公式 特点 代表产品
方案一:根据低负载延迟与最大 QPS,计算最大并发度 max_concurrency = noload_rt * max_QPS 1、最大并发度计算更精准,波动较小,但上探与下探的区间也较小
2、max_QPS 做为一个参考指标,QPS 过低时会间接影响到最大并发度计算
Dubbo、Brpc、Kartos-go
方案二(gradient 限流算法):根据低负载延迟与当前延迟计算梯度,再根据梯度计算最大并发度 gradient = noload_rt / actural_rt
max_concurrency = max_concurrency * gradient + explor_step
1、最大并发度的波动大,上探区间也就较大
2、最大并发度的计算只与请求延时一个指标有关,受干扰的因素少
Netfix推出的concurrency_limit

a、gradient(梯度)限流算法

自适应限流在每次请求结束后,都会根据 gradient(梯度)限流算法 计算最大并发度,gradient 是计算低负载时的 RTT 与当前RTT 的比值来判断是否出现请求是否拥塞的情况。

术语 解释 计算
noload_rt 低负载时延时(低负载时平均耗时)  
actural_rt 当前请求的耗时  
gradient 计算并发度梯度  
max_concurrency 并发度阈值,当 Provider 当前并发超过并发度阈值,即拒绝请求  
explor_ste 并发度上探步长(默认取值 0.8  

1、noload_rt(低负载时延时)计算:

由于业务处理请求的平均耗时在低负载时也不是一成不变的(例如某个时间段参数密度集中的出现变化),所以 noload_rt 应随着 actural_rt 整体的变化趋势平滑的随之变化,这里使用 指数移动平均算法,对采集每次请求结束后的 actural_rt 去计算 noload_rt

另外真实情况下,个别请求耗时会因特殊原因明显过高(比如突然 gc 卡顿),会影响到 noload_rt 计算,这里采用 zscore 算法 对其进行降噪处理

actural_rt = zscore ( actural_rt, score )   // zscore降噪

noload_rt =  noload_rt * (1 - factor) + actural_rt * factor  //指数移动平均算法
  
  
  
factor指数移动平均系数默认取 2.0 / (1000+1), 即近1000次请求的平均耗时
zscorezscore 算法用于对个别耗时长的请求耗时降噪
scorezscore 算法分数系数默认取 1.0相当于过滤超过TP90以上的耗时精确视为噪音

2、gradient(梯度)计算:

gradient = noload_rt / actural_rt   (gradient 最大值取1最小值取0.5)

3、max_concurrency最大并发度阈值)计算:

gradient 等于1时,说明当前请求没有拥塞;当 gradient 小于1时,说明当前开始拥塞,需要降低 max_concurrency。所以max_concurrency 的更新方式是

max_concurrency =   max_concurrency  *  gradient + gradient == 1 ? explor_step : 0

4、按照应用优先级限流

自适应限流提供应用分级设置,限流时可优先限制低优先级应用,其实现思路是:

根据滑动窗口算法,统计各个应用优先级在一个周期内(1s )的访问量以及总限流次数,跟据总限流次数与各个应用优先级的流量比例,定时调整各个应用优先级拒绝流量的概率,按对应的概率随机拒绝请求。

如下图所场景,总流量 4000 qps,总限流次数 2500 次,则单机预期的流量为 1500 qps,这时若低优先级应用拒绝 2500 次请求,即符合预期。

image-20240914153448807

5、CPU 安全水位与最高水位机制

在较特殊情况下,低负载时请求耗时趋势突然发生较大变化,这时 gradient(梯度)算法可能来不及自适应调整 max_concurrency 而导致误限,这里通过 CPU 安全水位机制,CPU 低于安全水位时不限流,避免在安全范围内出现误限情况

CPU 最高水位:

CPU 超过最高水位时会触发 CPU 过载保护,作为最后的系统过载保护屏障,CPU 过载时会按比例拒绝请求,防止一刀切导致大量限流。

如:1000 qps 时可让 CPU 到达 100%CPU 最高水位设置为 90%,这时会随机拒绝 10% 的流量( 100 qps),保障CPU90% 附近波动

CPU采集频率:500ms 采集一次 CPU 指标

ContactAuthor