前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、引入

1、控制流量:限流算法在分布式领域是一个经常被提起的话题,当系统的处理能力有限时,如何阻止 计划外的请求继续对系统施压,这是一个需要重视的问题。老钱在这里用 “断尾求生” 形 容限流背后的思想,当然还有很多成语也表达了类似的意思,如弃卒保车、壮士断腕等等

2、控制用户行为:除了控制流量,限流还有一个应用目的是用于控制用户行为,避免垃圾请求。比如在 UGC 社区,用户的发帖、回复、点赞等行为都要严格受控,一般要严格限定某行为在规定 时间内允许的次数,超过了次数那就是非法行为。对非法行为,业务必须规定适当的惩处策 略。

2、滑动窗口:zset实现

限流中的滑动窗口可以简单理解为,设定的单位时间就是一个窗口,窗口可以分割多个更小的时间单元,随着时间的推移,窗口会向右移动。比如一个接口一分钟限制调用 1000次,1分钟就可以理解为一个窗口

设计思路 : 每一个行为到来时,都维护一次时间窗口。将时间窗口外的记录全部清理掉,只保留窗口内的记录。

方案:想想 zset 数据结构的 score 值,是不是可以 通过 score 来圈出这个时间窗口来。而且我们只需要保留这个时间窗口,窗口之外的数据都 可以砍掉。

缺点:如果这 个量很大,比如限定 60s 内操作不得超过 100w 次这样的参数,它是不适合做这样的限流 的,因为会消耗大量的存储空间。

1、key:用一个 zset 结构记录用户的行为历史,每一个行为都会作为 zset 中的一个 key 保存下来,同一个用户同一种行为用一个 zset 记录

2、score:当前时间戳

3、valuezset 集合中只有 score 值非常重要,value 值没有特别的意义,只需要保证它是唯一的就可以了,用 uuid 会 比较浪费空间,那就改用毫秒时间戳吧。

4、milliseconds:过期时间,为节省内存,我们只需要保留时间窗口内的行为记录,同时如果用户是冷用户,滑动时间窗口内的行为是空记录,那么这个 zset 就可以从内存中移除,不再占用空间。

image-20210507141302926

2.1、代码实现

2.1.1、基本代码

@Slf4j
public class SimpleRateLimiter {

    private Jedis jedis;

    public SimpleRateLimiter(Jedis jedis) {
        this.jedis = jedis;
    }

    /**
     *
     * @param userId 用户Id
     * @param actionKey 动作
     * @param period 过期时间 单位(秒)
     * @param maxCount 最大次数
     * @return 返回是否允许访问
     * 1、获取key = userId + "#" + actionKey;
     * 2、
     */
    public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
        // 1、获取key = userId + "#" + actionKey;
        String key = userId + "#" + actionKey;
        long now = System.currentTimeMillis();
        Pipeline pipe = jedis.pipelined();
        //开启事务
        pipe.multi();
        //第二个参数是score,第三个参数是value
        pipe.zadd(key, now, "" + now);
        //画图帮助理解
        pipe.zremrangeByScore(key, 0, now - period * 1000);
        //当前窗口的元素个数
        Response<Long> count = pipe.zcard(key);
        //一定要设置过期时间,可能大部分用户是冷用户,因为要维护period时间内的记录,所以key过期时间要稍微比period大
        pipe.expire(key, period + 1);
        //执行事务
        pipe.exec();
        try {
            pipe.close();
        } catch (IOException e) {
            log.info("SimpleRateLimiter#isActionAllowed pipe.close() error", e);
        }
      
         //通过统计滑动窗口内的行为数量与阈值 `max_count `进行比较就可以得出当前的行为是 允许。用代码表示如下:
        return count.get() <= maxCount;
    }

}

2.1.1、实例测试

@Test
public void test() {
  Jedis jedis = new Jedis("127.0.0.1");
  SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
  for (int i = 0; i < 20; i++) {
    log.info("限流状态:[{}]", limiter.isActionAllowed("QYD", "reply", 60, 5));
  }

  try {
    Thread.sleep(10000);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

3、计数器方法:string incr

3.1、代码实现

3.1.1、自定义注解

@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface EntryTimes {

    /** 方法允许进入的次数 */
    int value() default 1;
   
     /**  可以访问的前缀 url前缀  */
    String prefix() default "";
}


3.1.1、Aspect:切面拦截


/**
 * 控制每个用户访问Controller方法的次数
 * Created by fengchuanbo on 2017/5/25.
 */
@Aspect
@Component
public class MethodEntryTimesLimitInterceptor {

  private static final String METHOD_CAN_ENTRY_TIMES_KEY = "method:entry:times:";

  @Resource
  private StringRedisTemplate stringRedisTemplate;

  /**
     * 需要有 EntryTimes 标注,并且第一个参数需要是 AuthUser 才可以
     */
  @Around("@annotation(com.duodian.admore.zhaobutong.annotation.EntryTimes)")
  public Object aroundAdvice(ProceedingJoinPoint pjp) throws Throwable {
    HttpServletRequest request = ((ServletRequestAttributes) 
                                  RequestContextHolder.getRequestAttributes()).getRequest();
    String token = request.getParameter("token");
    String aes = AesUtils.LoginDecrypt(token);
    Long userId = Long.valueOf(aes.split("#")[0]);
    MethodSignature signature = (MethodSignature) pjp.getSignature();
    Method method = signature.getMethod();
    EntryTimes annotation = method.getAnnotation(EntryTimes.class);
    int times = annotation.value();
    String key = METHOD_CAN_ENTRY_TIMES_KEY + ":" + annotation.prefix() + ":" +  userId;
    // 没有整个方法使用一个redis链接,是为了方式方法执行时间过长一直占用redis链接。
    Long increment = getEntryTimes(key);
    Object retVal;
    try {
      // 放在try里面,才能执行finally
      if (increment > times){
        // 设置十秒钟超时,防止finally在系统崩溃或重启时没有执行造成用户不能操作。
        expireKey(key,10);
        return Response.of(Code.ACTION_FREQUENT);
      }
      //调用核心逻辑
      retVal = pjp.proceed();
    }finally {
      deleteKey(key);
    }
    return retVal;
  }

  private Long getEntryTimes(String key){
    return stringRedisTemplate.opsForValue().increment(key,1);
  }

  private void deleteKey(String key){
    stringRedisTemplate.delete(key);
  }

  private void expireKey(String key, int seconds){
    stringRedisTemplate.boundValueOps(key).expire(seconds, TimeUnit.SECONDS);
  }
}

4、漏斗限流

1、漏洞的容量是有限的,如果将漏嘴堵住,然后一直往里面灌水,它就会变满,直至再也装不进去。

2、如果将漏嘴放开,水就会往下流,流走一部分之后,就又可以继续往里面灌水。

3、如果漏嘴流水的速率大于灌水的速率,那么漏斗永远都装不满。

4、如果漏嘴流水速率小于灌水的速率,那么一旦漏斗满了,灌水就需要暂停并等待漏斗腾空。

一般来说,流出速度是固定的,即不管你请求有多少,速率有多快,我反正就这么个速度处理。当然,特殊情况下,需要加快速度处理,也可以动态调整流出速率

重点:两个速率:

流入速率:即实际的用户请求速率或压力测试的速率,

流出速率:即服务端处理速率。

image-20210507151345443

4.1、单机代码实现

4.1.1、漏斗 FunnelDTO

@Data
public class FunnelDTO {
  /** 容量 */
  Integer capacity;
  /** 剩余容量 */
  Integer leftQuota;
  /** 流出速率 */
  Float leakingRate;
  /** 计算起始时间 */
  Long startTime;


  public FunnelDTO(int capacity, float leakingRate) {
    this.capacity = capacity;
    this.leftQuota = capacity;
    this.leakingRate = leakingRate;
    this.startTime = System.currentTimeMillis();
  }


  /**
     * 1、计算时间间隔
     * 2、计算时间间隔内流出的数量
     *  2.1、如果获取到的小于0, 说明间隔时间太长(没有往漏斗里放数据,漏斗空了),整数数字过大溢出了,则初始化数据
     * 2.2、如果获取到的流出量小于1,则说明腾出空间太小(最小单位是1),则直接返回,不执行
     * 3、
     */
  public void makeSpace() {
    Long nowTime = System.currentTimeMillis();
    // 1、计算时间间隔
    Long deltaTs = nowTime - startTime;

    // 2、计算时间间隔内流出的数量
    Integer deltaQuota = (int) (deltaTs * leakingRate);
    // 2.1、如果获取到的小于0, 说明间隔时间太长(没有往漏斗里放数据,漏斗空了),整数数字过大溢出了,则初始化数据
    if (deltaQuota < 0) {
      this.leftQuota = capacity;
      this.startTime = nowTime;
      return;
    }
    // 2.2、如果获取到的流出量小于1,则说明腾出空间太小(最小单位是1),则直接返回,不执行
    if (deltaQuota < 1) {
      return;
    }

    // 3、重新计算
    // 当前时间 = nowTime
    // 剩余容量 = 当前剩余容量 + 流出速率 * 间隔时间(如果超出了总容量,剩余容量 = 总容量)
    this.leftQuota += deltaQuota;
    this.startTime = nowTime;
    if (this.leftQuota > this.capacity) {
      this.leftQuota = this.capacity;
    }
  }

  /**
     * 判断是否能加入交易
     * 1、漏斗重新计算剩余容量和当前时间
     * 2、
     * @param quota 定额
     * @return
     */
  public boolean watering(int quota) {
    // 1、漏斗重新计算剩余容量和当前时间
    makeSpace();

    // 2、如果剩余容量大于当前要流入漏斗的量,则执行成功,返回true
    if (this.leftQuota >= quota) {
      this.leftQuota -= quota;
      return true;
    }
    return false;
  }

}

4.1.2、FunnelRateLimiter

public class FunnelRateLimiter {


  private Map<String, FunnelDTO> funnels ;
  public FunnelRateLimiter(Map funnels) {
    this.funnels = funnels;
  }


  /**
     * 限流方法
     * @param userId
     * @param actionKey
     * @param capacity
     * @param leakingRate
     * @return
     * 1、根据用户Id和动作获取对应的漏斗
     * 2、往漏斗中放入定额数据,看是否能放下
     */
  public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
    // 1、根据用户Id和动作获取对应的漏斗
    String key = String.format("%s:%s", userId, actionKey);
    FunnelDTO funnel = funnels.get(key);
    if (funnel == null) {
      funnel = new FunnelDTO(capacity, leakingRate);
      funnels.put(key, funnel);
    }

    //2、往漏斗中放入定额数据
    return funnel.watering(1);
  }


}

4.2、分布式的漏斗算法

问题1:上面是单机的,那么分布式用Redis如何实现呢?

答案:观察上面的 Funnel 对象的几个字段,我们发现可以将 Funnel 对象的内容按字段存储到一 个 hash 结构中,灌水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。

问题2:用hash的话,可以保证是原子性操作吗?

答案:我们无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里 运算,再回填到 hash 结构,这三个过程无法原子化

问题3:那如何保证hash是原子性操作呢?

答案:进行适当的加锁控制 + LUA脚本。

问题4:加锁失败了怎么办?

答案:一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。如果重试的话,就会导致性能下降。如果放弃的话,就会影响用户体验。同时,代码 复杂度也跟着升高很多。

问题5:有更好的选择吗?

答案:Redis-Cell 救星来了!

4.2.1、Redis-Cell

Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并 提供了原子的限流指令。有了这个模块,限流问题就非常简单了。

该模块只有 1 条指令 cl.throttle,它的参数和返回值都略显复杂,接下来让我们来看看这 个指令具体该如何使用。

4.2.1.1:举例

1、允许「用户老钱回复行为」的频率为每 60s 最多 30 次(漏水速 率)

2、漏斗的初始容量为 15,也就是说一开始可以连续回复 15 个帖子,然后才开始受漏水 速率的影响。

cl.throttle [key] [capacity] [operation_times] [time_span] [quota]
cl.throttle laoqian:replay 15 30 60 1

15 漏斗容量 [capacity] 
30 operation_times / 60 time_span 每60s最多30次(漏水速率)
1  quota 可选参数,默认是1 指的是每次行为占用一个空间

4.2.1.2、总结

在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的重试时间都计算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想阻塞线程,也可以异步定时任务来重试

cl.throttle laoqian:reply 15 30 60 1
1) (integer) 0  # 0表示允许,1表示拒绝
2) (integer) 15 # 漏斗容量capacity
3) (integer) 14 # 漏斗剩余空间left_quota
4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2  # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)

5、令牌桶

如果令牌的数量超过里桶的限制的话,令牌就会溢出,这时候就直接舍弃多余的令牌。

每个请求过来必须拿到桶里面拿到了令牌才允许请求(拿令牌的速度是不限制的,这就意味着如果瞬间有大量的流量请求进来,可以短时间内拿到大量的令牌),拿不到令牌的话直接拒绝。 令牌桶算法我们可以通过Google开源的guava包创建一个令牌桶算法的限流器。

令牌桶和漏桶不同点:令牌桶新增了一个匀速生产令牌的中间人以恒定的速度往桶里面放令牌,上面的漏桶,流入速率根本不控制,用户请求压力直接达到漏桶来(令牌桶这个匀速流入速率和mq对于mysql请求量的控制很像)。

image-20210507164157301

ContactAuthor