前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

一、位图的简单计算

1、java

1)普通-增&删&查

package com.hlj.util.Z005BitSet;

import lombok.extern.slf4j.Slf4j;

/**
 * @author zhangyujin
 * @date 2022/6/30  18:02.
 */
@Slf4j
public class StatusEnums {
  
    private final static long INIT = 1L << 0;
    private final static long CHSI_VERIFY = 1L << 1;
    private final static long GRADUATE = 1L << 2;

  
    static {
       
        System.out.println(Long.toBinaryString(INIT));
       
        System.out.println(Long.toBinaryString(CHSI_VERIFY));
        System.out.println(Long.toBinaryString(GRADUATE));
        System.out.println();
    }


    /**
     * 是否包含状态
     *
     * @param holder   当前状态
     * @param position 标志位状态
     * @return
     */
    public static boolean hasStatus(long holder, long position) {
        return (holder & position) == position;
    }

    /**
     * 添加状态
     *
     * @param holder   当前状态
     * @param position 添加状态标志位
     * @return 新状态
     */
    public static long addStatus(long holder, long position) {
        if (hasStatus(holder, position)) {
            return holder;
        }
        return holder | position;
    }

    /**
     * 移除状态
     *
     * @param holder   当前状态
     * @param position 移除状态标志位
     * @return 新状态
     */
    public static long removeStatus(long holder, long position) {
        if (!hasStatus(holder, position)) {
            return holder;
        }
        //异或移除
        return holder ^ position;
    }

  
  
    public static void main(String[] args) {
        long holder = 7L;
        System.out.println(Long.toBinaryString(holder));
        //0111

        System.out.println(StatusEnums.hasStatus(holder, INIT));
        // true -> 0001 在 0111 中
        System.out.println(StatusEnums.hasStatus(holder, CHSI_VERIFY));
        // true -> 0010 在 0111 中
        System.out.println(StatusEnums.hasStatus(holder, GRADUATE));
        // tuue -> 0100 在 0111 中

        holder = StatusEnums.removeStatus(holder, INIT);
        holder = StatusEnums.removeStatus(holder, CHSI_VERIFY);
        holder = StatusEnums.removeStatus(holder, GRADUATE);
        System.out.println(Long.toBinaryString(holder));
        //0000
        System.out.println(StatusEnums.hasStatus(holder, INIT));
        // false -> 0001 不在 0111 中
        System.out.println(StatusEnums.hasStatus(holder, CHSI_VERIFY));
        // false -> 0010 不在 0111 中
        System.out.println(StatusEnums.hasStatus(holder, GRADUATE));
        // false -> 0100 不在 0111 中
    }
}

a、场景-模拟签到

package com.hlj.util.Z005BitSet;

import java.util.*;

/**
 * @author zhangyujin
 * @date 2022/6/30  18:15.
 */
public class Sign {

    //int最多可以容纳32位,0b表示需jdk1.8+支持,左边第一位永远为0,即正整数,无日期含义;右边31位表示日期,从左到右1-31天
    private final static int monthSignRecord = 0b01111111111111111111111111111000;
    private final static int DAY28 = 0x7ffffff8;
    private final static int DAY29 = 0x7ffffffc;
    private final static int DAY30 = 0x7ffffffe;
    private final static int DAY31 = 0x7fffffff;
    private final static int MONTH_MODEL = 0x40000000;
    private final static Calendar CALENDAR = Calendar.getInstance(Locale.CHINA);

    /**
     * 签到总数
     *
     * @param holder 签到记录
     * @return
     */
    public static int signCount(int holder) {
        return Integer.bitCount(holder);
    }

    /**
     * 获取指定月是否全签到
     *
     * @param holder 签到记录
     * @return
     */
    public static boolean isSignFullInFixedMonth(int holder, int month) {
        CALENDAR.set(Calendar.MONTH, month - 1);
        int days = CALENDAR.getActualMaximum(Calendar.DATE);

        switch (days) {
            case 28:
                return (holder & DAY28) == DAY28;
            case 29:
                return (holder & DAY29) == DAY29;
            case 30:
                return (holder & DAY30) == DAY30;
            case 31:
                return (holder & DAY31) == DAY31;
        }
        throw new IllegalArgumentException("month is illegal");
    }

    /**
     * 获取指定月漏签次数
     *
     * @param holder 签到记录
     * @return
     */
    public static int unSignCountInFixedMonth(int holder, int month) {
        // java 月份从0开始算,因此传入的月份需要减去1
        CALENDAR.set(Calendar.MONTH, month - 1);
        return CALENDAR.getActualMaximum(Calendar.DATE) - Integer.bitCount(holder);
    }

    /**
     * 指定天是否签到
     *
     * @param holder   当前签到标识
     * @param position 签到天标识位
     * @return
     */
    public static boolean isSignInFixedDay(int holder, int position) {
        return (holder & position) == position;
    }

    /**
     * 按照天签到
     *
     * @param holder   当前签到标识
     * @param position 签到天标识位
     * @return 新签到标识
     */
    public static int signInFixedDay(int holder, int position) {
        if (isSignInFixedDay(holder, position)) {
            return holder;
        }
        return holder | position;
    }

    /**
     * 批量签到,适合补签场景
     *
     * @param holder 当前签到标识
     * @param days   批量签到序号
     * @return 新签到标识
     */
    public static int batchSign(int holder, List<Integer> days) {
        Iterator<Integer> it = days.iterator();
        while (it.hasNext()) {
            holder = holder | getPosition(it.next());
        }
        return holder;
    }

    public static int getPosition(int position) {
        //0b01000000000000000000000000000000 == 0x40000000 左边第一位永远为0,即正整数,无日期含义;右边31位表示日期,从右到左1-31天
        return MONTH_MODEL >>> (position - 1);
    }

    public static void main(String[] args) {
        int month = 5;
        System.out.printf("签到记录:【%s】 %n", Integer.toBinaryString(monthSignRecord));
        // 签到记录:【1111111111111111111111111111000】

        System.out.printf("签到总数:【%d】 %n", signCount(monthSignRecord));
        // 签到总数:【28】

        System.out.printf("%d月漏签总数 %d %n", month, unSignCountInFixedMonth(monthSignRecord, month));
        // 5月漏签总数 3

        System.out.printf("全月是否全部签到 %b %n", isSignFullInFixedMonth(monthSignRecord, month));
        // 全月是否全部签到 false

        System.out.printf("签到前【%s】%n", Integer.toBinaryString(monthSignRecord));
        // 签到前【1111111111111111111111111111000】

        int signTemp = signInFixedDay(monthSignRecord, getPosition(30));
        signTemp = signInFixedDay(signTemp, getPosition(31));
        System.out.printf("签到后【%s】%n", Integer.toBinaryString(signTemp));
        // 签到后【1111111111111111111111111111011】

        System.out.printf("全月是否全部签到 %b %n", isSignFullInFixedMonth(signTemp, month));
        // 全月是否全部签到 false

        System.out.printf("批量签到前【%s】%n", Integer.toBinaryString(monthSignRecord));
        // 批量签到前【1111111111111111111111111111000】

        int signBatch = batchSign(monthSignRecord, Arrays.asList(29, 30, 31));
        System.out.printf("批量签到后【%s】%n", Integer.toBinaryString(signBatch));
        // 批量签到后【1111111111111111111111111111111】
        
        System.out.printf("全月是否全部签到 %b %n", isSignFullInFixedMonth(signBatch, month));
        // 全月是否全部签到 true
    }
}

2、数据库

1)场景用例1:

查询所有支持快递物流的商家(物流方式包括 快递 上门 地铁)

a、传统的方式

1、数据库使用逗号分隔字符串,通过 find_in_set 进行查询

2、快递:1、快递+上门:3、快递 + 上门 + 地铁 : 7,通过 select * from ^ in (1, 3, 7 ……)

image-20220630183936985

b、位图方式

image-20220630184234705

存储方式

左边第一位永远为 0,可以使用 varchar() 存储

CREATE TABLE `test`
(
    `id`          bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
    `status`      varchar(32)     NOT NULL COMMENT '产品状态 字典表 productstatus',
    `update_user` bigint unsigned DEFAULT '0' COMMENT '更新人',
    PRIMARY KEY (`id`) USING BTREE
) COMMENT ='测试表';
insert into test
(`status`,
 `create_name`,
 `update_user`)
values (1, 2, 3, '0101', 4, 5, 7, 9);

select LPAD(t.status & 0110, 4, '0') as '与运算' from test t;
# 0100
select LPAD(t.status | 0110, 4, '0') as '或运算' from test t;
# 0111
select LPAD(t.status ^ 0110, 4, '0') as '异或运算' from test t;
# 0011

3、redis

位图不是特殊的数据结构,它的内容其实就是普通的字符串,也就是 byte 数组。我们 可以使用普通的 get/set 直接获取和设置整个位图的内容,也可以使用位图操作 getbit/setbit 等将 byte 数组看成「位数组」来处理

在我们平时开发过程中,会有一些 bool 型数据需要存取,比如用户一年的签到记录, 签了是 1,没签是 0,要记录 365 天。如果使用普通的 key/value,每个用户要记录 365 个,当用户上亿的时候,需要的存储空间是惊人的。

为了解决这个问题,Redis 提供了位图数据结构,这样每天的签到记录只占据一个位, 365 天就是 365 个位,46 个字节 (46*8bit = 368 > 365一个稍长一点的字符串) 就可以完全容纳下,这就大大 节约了存储空间。

image-20210429152710443

1)基本使用

Redis 的位数组是自动扩展,如果设置了某个偏移位置超出了现有的内容范围,就会自动将位数组进行零扩充。

案例:接下来我们使用位操作将字符串设置为 hello (不是直接使用 set 指令),首先我们需要得到 helloASCII

ASCII 二进制
h 01101000
e 01100101
l 01101100
l 01101100
o 01101111

image-20210429153118381

接下来我们使用 redis-cli 设置第一个字符,也就是位数组的前 8 位,我们只需要设置值为 1 的位,如上图所示,

h 字符只有 1/2/4 位需要设置

e 字符只有 9/10/13/15 位需要设置。

l字符只有17182021

⬤ ………………

127.0.0.1:6379> setbit s 1 1 
(integer) 0
127.0.0.1:6379> setbit s 2 1 
(integer) 0
127.0.0.1:6379> setbit s 4 1 
(integer) 0
127.0.0.1:6379> get s
"h"


127.0.0.1:6379> setbit s 9 1 
(integer) 0
127.0.0.1:6379> setbit s 10 1
(integer) 0
127.0.0.1:6379> setbit s 13 1
(integer) 0
127.0.0.1:6379> setbit s 15 1
(integer) 0
127.0.0.1:6379> get s
"he"

a、零存零取

「零存」就是使用 setbit 对位值进行逐个设置,

127.0.0.1:6379> setbit w 1 1 
(integer) 0
127.0.0.1:6379> setbit w 2 1 
(integer) 0
127.0.0.1:6379> setbit w 4 1
(integer) 0


127.0.0.1:6379> getbit w 1 
(integer) 1
127.0.0.1:6379> getbit w 2
(integer) 1 
127.0.0.1:6379> getbit w 4 
(integer) 1 
127.0.0.1:6379> getbit w 5 
(integer) 0

b、整存零取

「整存」就是使用字符串一次性填充所有位数组,覆盖掉旧值。

127.0.0.1:6379> set w h # 整存 (integer) 0


127.0.0.1:6379> getbit w 1
(integer) 1
127.0.0.1:6379> getbit w 2 
(integer) 1 
127.0.0.1:6379> getbit w 4 
(integer) 1 
127.0.0.1:6379> getbit w 5
(integer) 0

c、不可打印字符,显示16进制

如果对应位的字节是不可打印字符,redis-cli 会显示该字符的 16 进制形式。

127.0.0.1:6379> setbit x 0 1 
(integer) 0
127.0.0.1:6379> setbit x 1 1
(integer) 0
127.0.0.1:6379> get x 
"\xc0"

2)统计和查找

Redis 提供了位图统计指令 bitcount 和位图查找指令 bitpos

bitcount 用来统计指定位置范围内 1 的个数,

bitpos 用来查找指定范围内出现的第一个 01

startend 参数是字节索引(第几个字符),也就是说指定的位范围必须是 8 的倍数, 而不能任意指定

​ 这很奇怪,我表示不是很能理解 Antirez 为什么要这样设计。因为这个设 计,我们无法直接计算某个月内用户签到了多少天,而必须要将这个月所覆盖的字节内容全 部取出来 (getrange 可以取出字符串的子串) 然后在内存里进行统计,这个非常繁琐。

1、通过 bitcount 统计用户一共签到了多少天,

2、通过 bitpos 指令查找用户从 哪一天开始第一次签到。

如果指定了范围参数[start, end],就可以统计在某个时间范围内用户 签到了多少天,用户自某天以后的哪天开始签到

127.0.0.1:6379> set w hello
OK
127.0.0.1:6379> bitcount w     # 总共有
(integer) 21
127.0.0.1:6379> bitcount w 0 0  # 第一个字符中 1 的位数,h ->  3个1
(integer) 3
127.0.0.1:6379> bitcount w 0 1  # 前两个字符中 1 的位数  he -> 7个1
(integer) 7


127.0.0.1:6379> bitpos w 0 # 第一个bit 0 位 
(integer) 0
127.0.0.1:6379> bitpos w 1 # 第一个bit 1 位 
(integer) 1

127.0.0.1:6379> bitpos w 1 1 1  # 从第二个字符算起(1-1),第一个 1 位
(integer) 9
127.0.0.1:6379> bitpos w 1 2 2  # 从第三个字符算起(2-2),第一个 1 位
(integer) 17

3)魔术指令 bitfield

前文我们设置 ( setbit ) 和获取 ( getbit ) 指定位的值都是单个位的,如果要一次操作多个位,就必须使用管道来处理。

不过 Redis3.2 版本以后新增了一个功能强大的指令,有了这条指令,不用管道也可以一次进行多个位的操作。

bitfield 有三个子指令,分别是 get / set / incrby,它们都可以对指定位片段进行读写,但是最多只能处理 64 个连续的位,如果 超过 64 位,就得使用多个子指令,bitfield 可以一次执行多个子指令

image-20210429173845400

⬤ 有符号数是指获取的位数组中第一个位是符号位,剩下的才是值,如果第一位是 1,那就是负数。(有符号数最多可以获取 64 位)

⬤ 无符号数表示非负数,没有符号位,获取的位数组全部都是值。无符号数只能获取 63 位(因为首位表示正数/负数)

127.0.0.1:6379> set w hello
OK
127.0.0.1:6379> bitfield w get u4 0 # 从第一个位开始取 4 个位,结果是无符号数 (u)
(integer) 6
0110 -> 6


127.0.0.1:6379> bitfield w get u3 2 # 从第三个位开始取 3 个位,结果是无符号数 (u) 
(integer) 5
101 -> 5

127.0.0.1:6379> bitfield w get i4 0 # 从第一个位开始取 4 个位,结果是有符号数 (i) 
1) (integer) 6
0110 -> 0开头表示正数 -> 6


127.0.0.1:6379> bitfield w get i3 2  # 从第三个位开始取 3 个位,结果是有符号数 (i)
1) (integer) -3

101  -> 1开头表示负数  -> 减1 -> 100 取反 011 -> 3 -> 加负号 -3

a、一次执行多个指令

127.0.0.1:6379> bitfield w get u4 0 get u3 2 get i4 0 get i3 2 
1) (integer) 6
2) (integer) 5
3) (integer) 6
4) (integer) -3

b、set:修改字符

然后我们使用 set 子指令将第二个字符 e 改成 a,a 的 ASCII 码是 97。

127.0.0.1:6379> bitfield w set u8 8 97  #从第9个位开始,将接下来的 8 个位用无符号数 97 替换 
1) (integer) 101
127.0.0.1:6379> get w
"hallo"

c、incrby:自增

再看第三个子指令 incrby,它用来对指定范围的位进行自增操作。既然提到自增,就有可能出现溢出,bitfield 指令提供了溢出策略子指令 overflow,用户可以选择溢出行为。

⬤ 如果增加了正数,会出现上溢

⬤ 如果增加的是负数,就会出现下溢出。

1、折返 (wrap)

Redis 默认的处理是折返。如果出现了溢出,就将溢出的符号位丢掉。

◯ 如果是 8 位无符号数 255, 加 1 后就会溢出,会全部变零。

◯ 如果是 8 位有符号数 127,加 1 后就会溢出变成 -128

127.0.0.1:6379> set w hello
OK
127.0.0.1:6379> bitfield w incrby u4 2 1  # 从第三个位开始,对接下来的 4 位无符号数 +1
1) (integer) 11
127.0.0.1:6379> bitfield w incrby u4 2 1 
1) (integer) 12
127.0.0.1:6379> bitfield w incrby u4 2 1 
1) (integer) 13
127.0.0.1:6379> bitfield w incrby u4 2 1
1) (integer) 14
127.0.0.1:6379> bitfield w incrby u4 2 1
1) (integer) 15

127.0.0.1:6379> bitfield w incrby u4 2 1 # 溢出折返了 
1) (integer) 0

2、饱和截断 SAT

饱和截断 (sat),超过了范围就停留在最大/最小值。`

127.0.0.1:6379> set w hello
OK
127.0.0.1:6379> bitfield w overflow sat incrby u4 2 1 # 从第三个位开始,对接下来的 4 位无符号数 +1
1) (integer) 11
127.0.0.1:6379> bitfield w overflow sat incrby u4 2 1
1) (integer) 12
127.0.0.1:6379> bitfield w overflow sat incrby u4 2 1
1) (integer) 13
127.0.0.1:6379> bitfield w overflow sat incrby u4 2 1
1) (intege) 14
127.0.0.1:6379> bitfield w overflow sat incrby u4 2 1
1) (integer) 15
127.0.0.1:6379> bitfield w overflow sat incrby u4 2 1 # 保持最大值
1) (integer) 15

3、失败不执行 FAIL

失败 (fail) 报错不执行

127.0.0.1:6379> set w hello
OK
127.0.0.1:6379> bitfield w overflow fail incrby u4 2 1 1) (integer) 11
127.0.0.1:6379> bitfield w overflow fail incrby u4 2 1 1) (integer) 12
127.0.0.1:6379> bitfield w overflow fail incrby u4 2 1 1) (integer) 13
127.0.0.1:6379> bitfield w overflow fail incrby u4 2 1 1) (integer) 14
127.0.0.1:6379> bitfield w overflow fail incrby u4 2 1 1) (integer) 15
127.0.0.1:6379> bitfield w overflow fail incrby u4 2 1 # 不执行
1) (nil)

二、BitMap

1、BitMap 简介

1)优点

1、节约内存:针对海量数据的存储,可以极大的节约存储成本!当需要存储一些很大,且无序,不重复的整数集合,那使用 Bitmap 的存储成本是非常低的。

2、天然去重: 因为bitmap 每个值都只对应唯一的一个位置,不能存储两个值,所以 Bitmap 结构天然适合做去重操作。

3、快速定位: 同样因为其下标的存在,可以快速定位数据!比如想判断数字 ` 99999 是否存在于该 bitmap中,若是使用传统的集合型存储,那就要逐个遍历每个元素进行判断,时间复杂度为 O(N)。而由于采用 Bitmap存储,只要查看对应的下标数的值是0还是1即可,时间复杂度为 O(1)。所以使用 bitmap可以非常方便快速的查询某个数据是否在bitmap`中。

4、二进制判断: 还有因为其类集合的特性,对于一些集合的交并集等操作也可以支持!比如想查询[1,2,3]与[3,4,5] 两个集合的交集,用传统方式取交集就要两层循环遍历。而 Bitmap实现的底层原理,就是把 01110000 00011100 进行与操作就行了。而计算机做与、或、非、异或等等操作是非常快的。

2)缺点

1、类型单一:只能存储正整数而不能是其他的类型;

2、稀疏存储不适合:不适合存储稀疏的集合,比如,一个集合存放了两个数 [1,99999999] ,那用 bitmap 存储的话就很不划算,这也与它本来节约存储的优点也背离了;

3:重复数据无法存储:不适用于存储重复的数据

3)优化

既然 bitmap 的优点如此突出,那应该如何去优化它存在的一些局限呢?

1、 针对存储非正整数的类型,如字符串类型的,可以考虑将字符串类型的数据利用类似 hash的方法,映射成整数的形式来使用bitmap ,但是这个方法会有 hash 冲突的问题,解决这个可以优化 hash 方法,采用多重 hash 来解决,但是根据经验,这个效果都不太好,通常的做法就是针对字符串建立映射表的方式

2、针对 bitmap 的优化最核心的还是对于其存储成本的优化,毕竟大数据领域里面,大多数时候数据都是稀疏数据,而我们又经常需要使用到 bitmap 的特长,比如去重等属性,所以存在一些进一步的优化,比较知名的有WAHEWAHRoaringBitmap等,其中性能最好并且应用最为广泛的当属RoaringBitmap

2、实现方案 - BitSet

缺点1:只能处理 32 2^32 =4294967296

缺点2:不适合存储稀疏的集合,比如,一个集合存放了两个数 [1,99999999] ,那用 它 存储的话就很不划算

package com.hlj.util.Z005BitSet;

import org.junit.Test;

import java.util.Arrays;
import java.util.BitSet;

/**
 * BitSet的应用场景  海量数据去重、排序、压缩存储
 * BitSet的基本操作   and(与)、or(或)、xor(异或)
 * Bitset,也就是位图,由于可以用非常紧凑的格式来表示给定范围的连续数据而经常出现在各种算法设计中。
 * 基本原理是,用1位来表示一个数据是否出现过,0为没有出现过,1表示出现过。使用用的时候既可根据某一个是否为0表示此数是否出现过。
 */
public class D01BitSet {

    /**
     * 给定一个的整数数组,请找到其中缺少的数字。
     */
    private void printMissingNumber(int[] numbers, int count) {
        System.out.println("一共有" + count + "个数据:目前数组为" + Arrays.toString(numbers));
        // 一共有10个数据:目前数组为[1, 2, 3, 4, 6, 9, 8]


        // 一共有多少数字,
        BitSet bitSet = new BitSet(count);
        // 将指定索引处的位设置为 true。
        for (int number : numbers) {
            bitSet.set(number - 1);
        }


        //一共丢失了多少位
        int missingCount = count - numbers.length;
        int lastMissingIndex = 0; // 索引从0开始
        for (int i = 0; i < missingCount; i++) {
            //返回第一个设置为 false 的位的索引,这发生在指定的起始索引或之后的索引上。
            lastMissingIndex = bitSet.nextClearBit(lastMissingIndex);
            System.out.println(++lastMissingIndex);
        }
    }

    @Test
    public void start() {
        // 丢失3个数据
        printMissingNumber(new int[]{1, 2, 3, 4, 6, 9, 8}, 10);

        // 丢失1个数据
        printMissingNumber(new int[]{1, 2, 3, 5}, 5);

    }

}

3、实现方案- RoaringBitmap 压缩

为了解决 BitMap 存储较为稀疏数据时,浪费存储空间较多的问题,我们引入了稀疏位图索引 Roaring BitMapRoarinBitMap 有较高的计算性能及压缩效率。 RoaringBitmap 是一种高效的 Bitmap 压缩算法,目前已被广泛应用在各种语言和各种大数据平台。适合计算超高基维的,常用于去重、标签筛选、时间序列等计算中。

1)简介

a、数据分桶

RoaringBitmap32 位无符号整数按照高 16 位进行分桶,即最多可能有 2^16=65536 个桶,每个桶称为一个 container

⬤ 存储数据时,根据数据的高 16 位找到对应的桶(container),如果找不到则新建一个桶,然后将低 16位放入该桶中。

假设我们有一个 32 位无符号整数 N = 6200(其二进制表示为 00000000 00000000 00011000 10110000

计算高16位

16 位可以通过将整数右移 16 位来得到。右移操作会丢弃整数的低 16 位,并将高 16 位移到低 16 位的位置, high_16_bits = N >> 16, 对于 N = 6200,因为 6200 小于 2^16(即 65536 ),所以它的高 16 位都是 0。因此,high_16_bits 的结果是 0

计算低16位:

16 位可以通过将整数与 0xFFFF(即二进制的 11111111 11111111)进行位与操作来得到。但是,对于 6200 来说,由于它的高 16 位都是0,所以直接取它的值就是它的低 16 位(但这不是一个通用的方法,仅适用于这种情况)。通用的方法是:low_16_bits = N & 0xFFFF; 对于 N = 6200low_16_bits 就是 6200 本身。

b、容器类型

RoaringBitmap 中,容器(Container)是用于存储整数的数据结构,主要有三种类型

  • 数组容器( Array Container用于存储稀疏数据。当 container 中的 Integer 数量小于 4096 时,使用 Short类型的有序数组来存储值。由于 short2 字节,因此 n 个数据占用 2n 字节。当数据量达到 4096 时,会转换为BitmapContainer 。。
  • 位图容器(Bitmap Container用于存储稠密数据。无论 container 中存储了多少个数据,其占用的空间始终是 8KB(因为需要 65536bit 来表示 065535 的所有可能值)。。
  • 运行长度编码容器(Run Container采用行程长度编码Run Length Encoding , RLE )来压缩连续的数据。对于连续出现的数字,只记录初始数字和后续数量,从而节省存储空间。。

数组容器( Array Container

举例:假设我们需要存储一个整数集合,其中包含的元素相对较少且分布不均。例如,集合中只包含少数几个整数,如{1, 1000, 20000}。在这个例子中,我们可以使用 ArrayContainer 来存储这些元素。 ArrayContainer 内部使用一个有序的 Short 数组来存储数据,这样可以方便地进行二分查找,提高查询效率。

位图容器(Bitmap Container

举例:如果我们需要存储一个包含大量整数的集合,且这些整数在较低的 16 位范围内分布较为密集,例如集合中包含大量连续的整数或者整数分布较为均匀。在这种情况下,使用 BitmapContainer 会更高效。BitmapContainer 内部使用一个固定长度的 Long 数组来表示位图,每个 bit 位表示一个可能的整数值是否存在。由于 BitmapContainer 的固定长度为 1024Long(即 65536 个bit),因此无论存储多少数据,其占用的空间都是固定的( 8KB)。

运行长度编码容器(Run Container

举例:当需要存储一系列连续的整数时,RunContainer 会非常有用。例如,集合中包含连续的整数序列{11, 12, 13, 14, 15, 27, 28, 29}。RunContainer 使用行程长度编码( Run Length Encoding , RLE )来压缩这些数据,只记录序列的起始值和长度。在这个例子中,序列会被压缩为两个二元组(11,4)和(27,2 ),表示从 11开始有4个连续递增的值,从27开始有2个连续递增的值。这种压缩方式可以大大减少存储空间,并提高查询效率。

c、高效存储与查询

⬤ 通过将数据分桶并使用不同类型的容器存储,RoaringBitmap 能够高效地存储和查询大量数据。

⬤ 在查询时,首先根据数值的高 16 位找到对应的桶,然后在桶中使用适当的容器进行查找。

16 位作为索引查找具体的数据块,当前索引值为 0 ,低 16 位作为 value 进行存储。

RoaringBitMap在进行数据存储时,会先根据高 16 位找到对应的索引 key (二分查找),低 16 位作为 key 对应的 value,先通过 key 检查对应的 container 容器,如果发现 container 不存在的话,就先创建一个 key 和对应的 container,否则直接将低 16 位存储到对应的 container 中。

image-20240913113840162

**举例说明 **

1、数据准备

假设数据包括:[11, 12, 13, 14, 15, 27, 28, 29, 6200, 6202, 6204, …]

2、分桶与存储

⬤ 以整数 6200 为例,其 16 进制表示为 00001838,高 16 位为0000,低16位为 1838

⬤ 首先,根据高160000 找到对应的桶(假设这是第一个桶)。

⬤ 然后,检查该桶中是否已经存在,如果不存在则创建一个新的 ArrayContainer

⬤ 将低 161838(即十进制中的 6200 )添加到 ArrayContainer 中。

⬤ 对于连续的整数序列(如 1115 ),这些可能会存储在 RunContainer 中,以减少存储空间。

3、查询操作

⬤ 当需要查询某个数值(如 6202 )是否存在时,同样先根据高 16 位找到对应的桶。

⬤ 然后在桶中使用相应的容器进行查找。

⬤ 如果数值存在,则返回结果;如果不存在,则表示该数值不在集合中。

e、问题

1、超过 32 位的二进制怎么处理

⬤ 分片处理:可以将 64 亿数据分成多个较小的子集,每个子集包含一定数量的 32 位无符号整数,每个子集可以单独使用一个RoaringBitmap 实例来管理。这种分片处理的方式可以并行处理数据,提高处理效率,并且更容易在现有硬件上实现,但是如果是Long型且无规律,则没办法,只能用下面这种了

扩展 RoaringBitmap对于 64 位整数,由于 long 型整数通常是 64位的,而传统的 RoaringBitmap 是为 32 位整数设计的,因此不能直接使用 32 位的 RoaringBitmap 来处理 long 型的整数。但可以通过一些扩展或修改来支持 64 位整数。可以将 64 位整数拆分为高 32 位和低 32 位,分别使用两个 RoaringBitmap(或类似的位图数据结构)来管理。在查询或操作时,需要同时考虑这两个部分。再或者可以寻找支持 64 位整数的类似 RoaringBitmap 的数据结构,如 Roaring64NavigableBitmap 等;当处理 long 型的整数时,

2、用布隆过滤器还是压缩位图?

布隆过滤器:其内存占用主要取决于位数m 和哈希函数的数量 k。通过调整这些参数,可以控制布隆过滤器的内存占用和误报率。

RoaringBitmap:其内存占用取决于整数的分布情况和所使用的容器的类型。对于稀疏的整数集合,RoaringBitmap 可能比布隆过滤器更节省内存;但对于稠密的整数集合,情况可能有所不同。

2)使用

@Test
public void test() {
    // 创建两个 RoaringBitmap 实例
    RoaringBitmap bitmap1 = new RoaringBitmap();
    RoaringBitmap bitmap2 = new RoaringBitmap();

    // 向 bitmap1 添加元素
    bitmap1.add(1);
    bitmap1.add(100);
    bitmap1.add(1000);

    // 向 bitmap2 添加元素
    bitmap2.add(100);
    bitmap2.add(200);
    bitmap2.add(1000);


    // 执行交集操作
    RoaringBitmap intersection = RoaringBitmap.and(bitmap1, bitmap2);
    System.out.println("Intersection: " + intersection); // 应该包含 100 和 1000
    // Intersection: {100,1000}

    // 执行差集操作 (bitmap1 - bitmap2)
    RoaringBitmap difference = RoaringBitmap.andNot(bitmap1, bitmap2);
    System.out.println("Difference (bitmap1 - bitmap2): " + difference); // 应该只包含 1
    // Difference (bitmap1 - bitmap2): {1}

    // 检查元素是否存在
    boolean contains100 = bitmap1.contains(100);
    System.out.println("Does bitmap1 contain 100? " + contains100); // 输出 true
    // Does bitmap1 contain 100? true

    // 输出 bitmap1 的内容
    System.out.println("Bitmap1: " + bitmap1);
    // Bitmap1: {1,100,1000}
}

4、实现方案- 自定义压缩 Bitmap

1) ArrayContainer

public class ArrayContainer {


    public byte[] values;

    public ArrayContainer(int initialCapacity) {
        this.values = new byte[initialCapacity];
    }
}

2)CompressedBitmap

public class CompressedBitmap {

    public static final char Y = 1;

    public static final char N = 0;
    public static final int ARRAY_CONTAINER_SIZE = 16;

    ArrayContainer[] containers;


    /**
     * 初始化容量(计算容器数量,当前按照一个容器16进行拆分)
     *
     * @param initialCapacity initialCapacity
     * @return {@link }
     */
    public CompressedBitmap(long initialCapacity) {
        int containersSize = (int) (initialCapacity / ARRAY_CONTAINER_SIZE);
        this.containers = new ArrayContainer[containersSize];
    }

    /**
     * bitMap添加数据
     *
     * @param offset offset
     */
    public void add(long offset) {

        // 1、根据偏移量定位容器所在分片
        int shardIndex = getShardIndex(offset);
        ArrayContainer container = containers[shardIndex];
        if (container == null) {
            container = new ArrayContainer(ARRAY_CONTAINER_SIZE);
        }

        // 2、根据偏移量定位容器内部位置
        int shardOffset = getShardOffset(offset);
        container.values[shardOffset] = Y;
        containers[shardIndex] = container;
    }


    /**
     * 获取偏移量数据
     *
     * @param offset offset
     * @return 获取值(1 || 0)
     */
    public int get(long offset) {
        int shardIndex = getShardIndex(offset);
        ArrayContainer container = containers[shardIndex];
        if (container == null) {
            return N;
        }

        int shardOffset = getShardOffset(offset);
        if (Y == container.values[shardOffset]) {
            return Y;
        }
        return N;
    }

    /**
     * 根据偏移量定位容器所在分片
     *
     * @param offset offset
     * @return {@link int}
     */
    public int getShardIndex(long offset) {
        return (int) ((offset - 1) / ARRAY_CONTAINER_SIZE);
    }

    /**
     * 根据偏移量定位容器内部位置
     *
     * @param offset offset
     * @return {@link int}
     */
    public int getShardOffset(long offset) {
        return (int) (offset % ARRAY_CONTAINER_SIZE);
    }


}

3)使用

@Test
public void test(){
    CompressedBitmap bitmap = new CompressedBitmap(64);
    bitmap.add(63);
    System.out.println(bitmap.get(63));
    // 1
    System.out.println(bitmap.get(64));
    // 0
    System.out.println(bitmap.get(2));
    // 0
}

5、实现方案- Redis 实现压缩

Redis 本身是支持 Bitmap 存储的,但是压缩的 Bitmap 是不支持的。根据上面的原理,同样可以把一个大的 Bitmap 转成一个个小的 Bitmap 来达到压缩的目的。

1)方案实现

a、CompressedBitInfo

@Data
public class CompressedBitInfo {

    /**
     * 真实offset
     */
    private long sourceOffset;

    /**
     * 分桶的编号
     */
    private long bucketIndex;

    /**
     * 桶内的offset
     */
    private long bucketOffset;

    /**
     * key
     */
    private String bitKey;

    /**
     * bitValue
     */
    private Boolean bitValue;

}

b、CompressedBizBO

@Data
public class CompressedBizBO {

    /**
     * key
     */
    private String key;

    /**
     * 过期时间
     */
    private Integer expireSeconds;

    /**
     * 桶大小
     */
    private Long bucketSize;

}

c、CompressedBitUtils

@Slf4j
public class CompressedBitUtils {

    /**
     * 设置压缩位图在offset上的值,并且设置过期时间(秒)
     */
    public static CompressedBitInfo setCompressedBit(CompressedEnum.CompressedInfoEnum compressedInfoEnum, long offset) {
        return setCompressedBit(compressedInfoEnum, offset, true);
    }



    /**
     * 删除压缩位图在offset上的值(相当于设置为false)
     */
    public static CompressedBitInfo remCompressedBit(CompressedEnum.CompressedInfoEnum compressedInfoEnum, long offset) {
        return setCompressedBit(compressedInfoEnum, offset, false);
    }


    /**
     * 设置压缩位图在offset上的值,并且设置过期时间(秒)
     */
    public static CompressedBitInfo setCompressedBit(CompressedEnum.CompressedInfoEnum compressedInfoEnum, long offset, boolean value) {
        FilterCompressedConfiguration filterCompressedConfiguration = SpringUtils.getBean(FilterCompressedConfiguration.class);
        CompressedBizBO compressedBiz = filterCompressedConfiguration.getCompressedBizMap().get(compressedInfoEnum.getCode());

        CompressedBitInfo bitInfo = getBitInfo(compressedInfoEnum, offset);
        String bitKey = bitInfo.getBitKey();
        RedisService redisService = SpringUtils.getBean(RedisService.class);
        redisService.setBit(bitKey, bitInfo.getBucketOffset(), value);
        redisService.expire(bitKey, compressedBiz.getExpireSeconds());

        long sourceOffset = getSourceOffset(compressedInfoEnum, bitInfo.getBucketIndex(), bitInfo.getBucketOffset());
        bitInfo.setSourceOffset(sourceOffset);
        bitInfo.setBitValue(value);

        return bitInfo;
    }
    /**
     * 获取压缩位图在offset上的值
     */
    public static CompressedBitInfo getCompressedBit(CompressedEnum.CompressedInfoEnum compressedInfoEnum, long offset) {
        CompressedBitInfo bitInfo = getBitInfo(compressedInfoEnum, offset);
        String bitKey = bitInfo.getBitKey();
        log.debug("getCompressedBit with key:{}, offset:{}", bitKey, bitInfo.getBucketOffset());
        RedisService redisService = SpringUtils.getBean(RedisService.class);
        Boolean bitValue = redisService.getBit(bitKey, bitInfo.getBucketOffset());
        long sourceOffset = getSourceOffset(compressedInfoEnum, bitInfo.getBucketIndex(), bitInfo.getBucketOffset());
        bitInfo.setSourceOffset(sourceOffset);
        bitInfo.setBitValue(bitValue);
        return bitInfo;
    }

    /**
     * 获取压缩位图每个小桶的子key集合
     */
    public static List<String> getAllBucketKeys(CompressedEnum.CompressedInfoEnum compressedInfoEnum, long maxOffset) {
        List<String> result = Lists.newArrayList();
        FilterCompressedConfiguration filterCompressedConfiguration = SpringUtils.getBean(FilterCompressedConfiguration.class);
        CompressedBizBO compressedBiz = filterCompressedConfiguration.getCompressedBizMap().get(compressedInfoEnum.getCode());

        CompressedBitInfo bitInfo = getBitInfo(compressedInfoEnum, maxOffset);
        for (int i = 0; i <= bitInfo.getBucketIndex(); i++) {
            String bitKey = getBitKey(compressedBiz.getKey(), i);
            result.add(bitKey);
        }
        return result;
    }

    /**
     * 删除所有桶里的的Bitmap
     */
    public static void deleteAllCompressedBit(CompressedEnum.CompressedInfoEnum compressedInfoEnum, long maxOffset) {
        FilterCompressedConfiguration filterCompressedConfiguration = SpringUtils.getBean(FilterCompressedConfiguration.class);
        CompressedBizBO compressedBiz = filterCompressedConfiguration.getCompressedBizMap().get(compressedInfoEnum.getCode());

        CompressedBitInfo bitInfo = getBitInfo(compressedInfoEnum, maxOffset);
        for (int i = 0; i < bitInfo.getBucketIndex(); i++) {
            String bitKey = getBitKey(compressedBiz.getKey(), i);
            RedisService redisService = SpringUtils.getBean(RedisService.class);
            redisService.expire(bitKey, 0);
        }
    }


    /**
     * 将java中的bitmap转换为redis的字节数组
     */
    private static byte[] getByteArray(List<Long> bits) {
        Iterator<Long> iterator = bits.iterator();
        BitSet bitSet = new BitSet();
        while (iterator.hasNext()) {
            long offset = iterator.next();
            bitSet.set((int) offset);
        }
        byte[] targetBitmap = bitSet.toByteArray();
        convertJavaToRedisBitmap(targetBitmap);
        return targetBitmap;
    }

    /**
     * 将java中的字节数组转换为redis的bitmap数据形式
     *
     * @param bytes
     */
    private static void convertJavaToRedisBitmap(byte[] bytes) {
        int len = bytes.length;
        for (int i = 0; i < len; i++) {
            byte b1 = bytes[i];
            if (b1 == 0) {
                continue;
            }
            byte transByte = 0;
            for (byte j = 0; j < 8; j++) {
                transByte |= (b1 & (1 << j)) >> j << (7 - j);
            }
            bytes[i] = transByte;
        }
    }


    /**
     * getBitInfo
     */
    public static CompressedBitInfo getBitInfo(CompressedEnum.CompressedInfoEnum compressedInfoEnum, long sourceOffset) {
        FilterCompressedConfiguration filterCompressedConfiguration = SpringUtils.getBean(FilterCompressedConfiguration.class);
        CompressedBizBO compressedBiz = filterCompressedConfiguration.getCompressedBizMap().get(compressedInfoEnum.getCode());

        CompressedBitInfo bucketInfo = new CompressedBitInfo();
        bucketInfo.setSourceOffset(sourceOffset);

        long bucketSize = compressedBiz.getBucketSize();
        long bucketIndex = sourceOffset / bucketSize;
        bucketInfo.setBucketIndex(bucketIndex);

        long bucketOffset = sourceOffset % bucketSize;
        bucketInfo.setBucketOffset(bucketOffset);

        String bitKey = getBitKey(compressedBiz.getKey(), bucketIndex);
        bucketInfo.setBitKey(bitKey);
        return bucketInfo;
    }


    public static String getBitKey(String key, long bucketIndex) {
        return new StringBuffer(key).append("_").append(bucketIndex).toString();
    }

    /**
     * getSourceOffset
     *
     * @param bucketIndex  bucketIndex
     * @param bucketIndex  bucketIndex
     * @param bucketOffset bucketOffset
     * @return {@link long}
     */
    public static long getSourceOffset(CompressedEnum.CompressedInfoEnum compressedInfoEnum, long bucketIndex, long bucketOffset) {
        FilterCompressedConfiguration filterCompressedConfiguration = SpringUtils.getBean(FilterCompressedConfiguration.class);
        CompressedBizBO compressedBiz = filterCompressedConfiguration.getCompressedBizMap().get(compressedInfoEnum.getCode());
        return bucketIndex * compressedBiz.getBucketSize() + bucketOffset;
    }

}

d、CompressedEnum

public interface CompressedEnum {


    /**
     * CompressedKeyEnum
     */
    @Getter
    @AllArgsConstructor
    enum CompressedInfoEnum implements CompressedEnum {

        DEFAULT("default", "默认"),
        ;

        /**
         * key
         */
        private final String code;


        /**
         * 描述
         */
        private final String desc;
    }
}

e、FilterCompressedConfiguration

@Slf4j
@Data
@Configuration
@ConfigurationProperties("filter")
public class FilterCompressedConfiguration {


    /**
     * 压缩过滤器配置
     */
    private Map<String, CompressedBizBO> compressedBizMap;

}

2)Redis 高效写入

由于直接 setbit 的方式写入数据耗时太长,因此探索了如下方式:一次写入一个字节数组的数据,增加每次的写入量,并减少写入次数,最终达到快速写入的目的。

1、首先,将 bitmap 进行拆分,生成一个个小的 bitmap经过测试小 bitmap 的大小维持在 65536 时比较合理

2、然后,将 Java Bitmap 转换为 Java 字节,再将 Java 字节转换为 Redis 字节,通过 Set key byte的方式写入 Redis 缓存。

问题1:为什么要将 Java 字节转换为 Redis 字节,然后再写入呢?直接将 JavaBitSet 使用 toByteArray() 转换成字节数组,然后写入到 Redis 中不行么?

答案:实际上写入是可以的,但是读取数据的时候,就不能直接使用 Redisgetbit 方法来直接判断指定偏移量 offset 的值了,因为接口性能还需要 getbit 方法来保障。

**问题2:那为什么不能直接写入并读取 **

答案:这是因为 Java 字节和 Redis 字节在存储结构上不相同 ,一个是大端存储,一个是小端存储,,一个 8 位的 byte00000010Java 中表示 2 ,就是说从右向左是第二位为true ;但是 Redis 中是从左向右读,表示偏移量为 6 的为 true,正好相反。

image-20240914112939921

/**
 * 将java中的bitmap转换为redis的字节数组
 *
 */
private byte[] getByteArray(List<Long> bits) {
    Iterator<Long> iterator = bits.iterator();
    BitSet bitSet = new BitSet();
    while (iterator.hasNext()) {
        long offset = iterator.next();
        bitSet.set((int) offset);
    }
    byte[] targetBitmap = bitSet.toByteArray();
    convertJavaToRedisBitmap(targetBitmap);
    return targetBitmap;
}

/**
 * 将java中的字节数组转换为redis的bitmap数据形式
 *
 * @param bytes
 */
private static void convertJavaToRedisBitmap(byte[] bytes) {
    int len = bytes.length;
    for (int i = 0; i < len; i++) {
        byte b1 = bytes[i];
        if (b1 == 0) {
            continue;
        }
        byte transByte = 0;
        for (byte j = 0; j < 8; j++) {
            transByte |= (b1 & (1 << j)) >> j << (7 - j);
        }
        bytes[i] = transByte;
    }
}

3)数据加载策略(42.94亿)

65536 个桶,每个桶 65536offsetID40 亿 大概 500M (2^32/8/1024/1024)左右。

优点:

1、单值小:每个 bitmap 最多有 65536Offsetbit 位),相当于最大 8kb ,不会在缓存中产生较大 key 而导致性能差

2、写入快:每个key 一次性写入到缓存,但是最多可以含有 65536offsetID)42.94 亿群体只需要 65536 次写入

3、查询效率高getbit 支持查询 key 下面每个 offset ,时间复杂度是 O(1)

4、拆分压缩数据:拆分多个小的 bitmap ,相当于是对 bitmap 进行了压缩

ContactAuthor