大数据Doris之_5_数据更新
前言
Github:https://github.com/HealerJean
一、数据更新
1、数据更新概述
数据更新是指对具有相同
key的数据记录中的value列进行修改。对于不同的数据模型,数据更新的处理方式有所不同:
- 主键(
Unique)模型:- 存储方式:
Doris支持两种存储方式:Merge-on-Read(MoR)和Merge-on-Write(MoW)。MoR优化了写入性能,而MoW则提供了更好的分析性能 - 更新方式:支持使用
UPDATE语句进行少量数据更新,也支持通过StreamLoad、BrokerLoad、RoutineLoad和InsertInto等导入方式进行批量更新。 - 更新语义:所有导入操作遵循 “
UPSERT” 语义,即记录不存在则插入,存在则更新。 - 更新范围:支持整行更新和部分列更新,默认为整行更新。
- 存储方式:
- 聚合(
Aggregate)模型:在聚合模型中,数据更新是一种特殊用法。- 更新实现条件:当聚合函数设置为
REPLACE或REPLACE_IF_NOT_NULL时,可实现数据更新。 - 更新方式限制:仅支持基于导入方式的更新,不支持使用
UPDATE语句。 - 部分列更新:通过设置聚合函数为
REPLACE_IF_NOT_NULL,可实现部分列更新的能力。
- 更新实现条件:当聚合函数设置为
2、不同模型/实现的更新能力对比
Unique Key MoW |
Unique Key MoR |
Aggregate Key |
|
|---|---|---|---|
| 导入速度 | 导入过程中进行数据去重,小批量实时写入相比 MoR 约有 10% - 20% 的性能损失,大批量导入(例如千万级/亿级数据)相比 MoR 约有 30% - 50% 的性能损失 |
与 Duplicate Key 接近 |
与 Duplicate Key 接近 |
| 查询速度 | 与 Duplicate Key 接近 |
需要在查询期间进行去重,查询耗时约为 MoW 的 3-10 倍 |
如果聚合函数为 REPLACE/REPLACE_IF_NOT_NULL,查询速度与 MoR 接近 |
| 谓词下推 | 支持 | 不支持 | 不支持 |
| 资源消耗 | 导入资源消耗:相比 Duplicate Key/Unique Key MoR,约额外消耗 10% - 30% 的 CPU |
导入资源消耗:与 Duplicate Key 相近,无额外资源消耗。 |
与 Unique Key MoR 相同 |
查询资源消耗:与 Duplicate Key 接近,无额外资源消耗。 |
查询资源消耗:相比 Duplicate Key/Unique Key MoW,查询时额外消耗更多的 CPU 和内存。 |
与 Unique Key MoR 相同 |
|
Compaction 资源消耗:相比 Duplicate Key,消耗更多内存和 CPU,具体取决于数据特征和数据量。 |
Compaction 资源消耗:相比 Duplicate Key,需更多内存和 CPU,具体数值取决于数据特征和数据量。 |
与 Unique Key MoR 相同 |
3、功能支持对比
Unique Key MoW |
Unique Key MoR |
Aggregate Key |
|
|---|---|---|---|
UPDATE |
支持 | 支持 | 不支持 |
DELETE |
支持 | 支持 | 不支持(key 列支持) |
sequence 列 |
支持 | 支持 | 不支持 |
delete_sign 删除标记 |
支持 | 支持 | 不支持 |
| 部分列更新 | 支持 | 不支持 | 支持 (但无法更新 null 值) |
| 倒排索引 | 支持 | 不支持(Key 列上建立倒排索引) |
不支持(Key 列上建立倒排索引) |
二、主键模型的 Update 更新
1、适用场景
- 小范围数据更新:适用于更新少量数据的场景,例如修复某些记录中的错误字段,或更新某些字段的状态(如订单状态更新等)。
ETL(Extract(提取)、Transform(转换)、Load(加载))批量加工部分字段:适用于大批量更新某个字段,常见于ETL加工场景。注意:大范围数据更新仅适合低频调用。
2、基本原理
1、利用查询引擎自身的 where 过滤逻辑,从待更新表中筛选出需要被更新的行。
2、再利用 Unique 模型自带的 Value 列新数据替换旧数据的逻辑,将待更新的行变更后,再重新插入到表中,从而实现行级别更新。
1)同步
Update 语法在 Doris 中是一个同步语法,即 Update 语句执行成功,更新操作也就完成了,数据是可见的。
2)性能
Update语句的性能和待更新的行数以及查询条件的检索效率密切相关。
- 待更新的行数:待更新的行数越多,
Update语句的速度就会越慢。- 对于小范围更新,
Doris支持的频率与INSERT INTO语句类似 - 对于大范围更新,由于单个
update执行的时间较长,仅适用于低频调用。
- 对于小范围更新,
- 查询条件的检索效率:
Update实现原理是先将满足查询条件的行做读取处理,所以如果查询条件的检索效率高,则Update的速度也会快。- 条件列最好能命中索引或者分区分桶裁剪,这样 就不需要全表扫描,可以快速定位到需要更新的行,从而提升更新效率。
- 强烈不推荐条件列中包含
value列。
3、使用示例
假设在金融风控场景中,存在如下结构的交易明细表:
CREATE TABLE transaction_details (
transaction_id BIGINT NOT NULL, -- 唯一交易编号
user_id BIGINT NOT NULL, -- 用户编号
transaction_date DATE NOT NULL, -- 交易日期
transaction_time DATETIME NOT NULL, -- 交易时间
transaction_amount DECIMAL(18, 2), -- 交易金额
transaction_device STRING, -- 交易设备
transaction_region STRING, -- 交易地区
average_daily_amount DECIMAL(18, 2), -- 最近 3 个月日均交易金额
recent_transaction_count INT, -- 最近 7 天交易次数
has_dispute_history BOOLEAN, -- 是否有拒付记录
risk_level STRING -- 风险等级
)
UNIQUE KEY(transaction_id)
DISTRIBUTED BY HASH(transaction_id) BUCKETS 16
PROPERTIES (
"replication_num" = "3", -- 副本数量,默认 3
"enable_unique_key_merge_on_write" = "true" -- 启用 MOW 模式,支持合并更新
);
按照如下风控规则来更新每日所有交易记录的风险等级:
- 有拒付记录,风险为
high。 - 在高风险地区,风险为
high。 - 交易金额异常(超过日均
5倍),风险为high。 - 最近 7 天交易频繁:
- 交易次数 >
50,风险为 high。 - b. 交易次数在 20-50 之间,风险为 medium。
- 交易次数 >
- 非工作时间交易(凌晨 2 点到 4 点),风险为 medium。
- 默认风险为
low。
UPDATE transaction_details
SET risk_level = CASE
-- 有拒付记录或在高风险地区的交易
WHEN has_dispute_history = TRUE THEN 'high'
WHEN transaction_region IN ('high_risk_region1', 'high_risk_region2') THEN 'high'
-- 突然异常交易金额
WHEN transaction_amount > 5 * average_daily_amount THEN 'high'
-- 最近 7 天交易频率很高
WHEN recent_transaction_count > 50 THEN 'high'
WHEN recent_transaction_count BETWEEN 20 AND 50 THEN 'medium'
-- 非工作时间交易
WHEN HOUR(transaction_time) BETWEEN 2 AND 4 THEN 'medium'
-- 默认风险
ELSE 'low'
END
WHERE transaction_date = '2024-11-24';
三、主键模型的导入更新
1、整行更新
使用
Doris支持的StreamLoad、BrokerLoad、RoutineLoad、InsertInto等导入方式,向主键模型(Unique模型)导入数据时,Doris主键模型的导入是一种“upsert”模式。基于导入,对已有记录的更新,
- 如果没有相应主键的数据行,则插入新数据;
- 如果有相应主键的数据行,则进行更新
| 配置项 | 状态 | 是否默认 | 部分列更新时的行为 |
|---|---|---|---|
enable_unique_key_partial_update |
false |
默认 | 整行 UPSERT:未指定列被设为默认值,覆盖原记录。 |
enable_unique_key_partial_update |
true |
部分列更新:仅修改指定列,其他列保留原值,但受 enable_insert_strict 限制。 |
|
enable_insert_strict |
true |
默认 | 严格模式:部分列更新时仅允许修改已存在的主键,禁止插入新主键。 |
enable_insert_strict |
false |
非严格模式:部分列更新时,若主键不存在则插入新记录(类似 UPSERT)。 |
2、部分列更新
Doris在主键模型的导入更新中,提供了直接插入或更新部分列数据的功能,不需要先读取整行数据,从而大幅提升更新效率。
1)使用限制
- 2.0 版本仅在
UniqueKey的Merge-on-Write实现中支持部分列更新能力。 - 从 2.0.2 版本开始,支持使用
INSERTINTO进行部分列更新。 - 不支持在有同步物化视图的表上进行部分列更新。
- 不支持在进行
SchemaChange的表上进行部分列更新。
2)适用场景
- 实时动态列更新:需要在表中实时高频更新某些字段值
- 例如用户标签表中有一些关于用户最新行为信息的字段需要实时更新,以便广告/推荐系统能够据此进行实时分析和决策。
- 将多张源表拼接成一张大宽表。
- 数据修正。
3)使用示例
假设 Doris 中存在一张订单表 order_tbl,其中订单 id 是 Key 列,订单状态和订单金额是 Value 列。数据状态如下:此时,用户点击付款后,Doris 系统需要将订单 id 为 ‘1’ 的订单状态变更为 ‘待发货’。
| 订单 id | 订单金额 | 订单状态 |
|---|---|---|
| 1 | 100 | 待付款 |
a、StreamLoad / BrokerLoad / RoutineLoad
准备如下 csv 文件:
1,待发货
在导入时添加如下 header:
partial_columns:true
同时在 columns 中指定要导入的列(必须包含所有key 列,否则无法更新)。下面是一个 Stream Load 的例子:
curl --location-trusted -u root: -H "partial_columns:true" -H "column_separator:," -H "columns:order_id,order_status" -T /tmp/update.csv http://127.0.0.1:8030/api/db1/order_tbl/_stream_load
b、INSERT INTO
在所有数据模型中,INSERT INTO 给定部分列时默认行为是整行写入。为了防止误用,在 Merge-on-Write 实现中,INSERT INTO 默认仍然保持整行 UPSERT 的语义。如果需要开启部分列更新的语义,需要设置如下 session variable:
SET enable_unique_key_partial_update=true;
INSERT INTO order_tbl (order_id, order_status) VALUES (1, '待发货');
c、Flink Connector
如果使用 Flink Connector,需要添加如下配置:
'sink.properties.partial_columns' = 'true',
同时在 sink.properties.column 中指定要导入的列(必须包含所有 key 列,否则无法更新)。
更新后结果如下:
+----------+--------------+--------------+
| order_id | order_amount | order_status |
+----------+--------------+--------------+
| 1 | 100 | 待发货 |
+----------+--------------+--------------+
1 row in set (0.01 sec)
3、注意事项
由于
Merge-on-Write实现需要在数据写入时进行整行数据的补齐,以保证最优的查询性能,因此使用Merge-on-Write实现进行部分列更新会导致部分导入性能下降。
写入性能优化建议:
- 配置王道:使用配备
NVMe的SSD,或者极速SSD云盘。因为补齐数据时会大量读取历史数据,产生较高的读IOPS以及读吞吐。 - 开启行存:能够大大减少补齐数据时产生的
IOPS,导入性能提升明显。用户可以在建表时通过如下property来开启行存:
"store_row_column" = "true"
四、主键模型的更新并发控制
1、概览
Doris 采用多版本并发控制机制(MVCC - Multi-Version Concurrency Control)来管理并发更新。
每次数据写入操作均会分配一个写入事务,该事务确保数据写入的原子性(即写入操作要么完全成功,要么完全失败)。在写入事务提交时,系统会为其分配一个版本号。当用户使用 Unique Key 模型并多次导入数据时,如果存在重复主键,Doris 会根据版本号确定覆盖顺序:版本号较高的数据会覆盖版本号较低的数据。
2、UPDATE 并发控制(悲观锁)
问题:默认情况下,并不允许同一时间对同一张表并发进行多个 UPDATE 操作。主要原因是,Doris 目前支持的是行更新,这意味着,即使用户声明的是 SET v2 = 1,实际上,其他所有的 Value 列也会被覆盖一遍(尽管值没有变化)。 这就会存在一个问题,如果同时有两个 UPDATE 操作对同一行进行更新,那么其行为可能是不确定的,也就是可能存在脏数据。
解决::UPDATE 语句与通过导入实现更新在底层机制上存在较大差异。UPDATE 操作涉及两个步骤: 从数据库中读取待更新的数据,以及写入更新后的数据。默认情况下,UPDATE 语句通过表级锁提供了 Serializable 隔离级别的事务能力,高并发场景下性能较差,即多个 UPDATE 操作只能串行执行。用户也可以通过调整配置绕过这一限制
场景:去除控制,用户自己把握并发,但在实际应用中,如果用户自己可以保证即使并发更新,也不会同时对同一行进行操作的话,就可以手动打开并发限制。通过修改 FE 配置 enable_concurrent_update,当该配置值设置为 true 时,更新命令将不再提供事务保证。
3、Sequence 列(乐观锁)
问题: sequence 列目前只支持 Unique 模型,当通过多线程并发同步数据到 Doris 时,不同线程的数据可能会乱序到达。这种情况下,可能出现旧数据因较晚到达而错误覆盖新数据的情况。
解决:为了解决这个问题,Doris 支持了 sequence 列,通过用户在导入时指定 sequence 列,相同 key 列下,按照 sequence 列的值进行替换,较大值可以替换较小值,反之则无法替换。该方法将顺序的确定交给了用户,由用户控制替换顺序。
实现:在实现层面,Doris 增加了一个隐藏列DORIS_SEQUENCE_COL ,该列的类型由用户在建表时指定,在导入时确定该列具体值,并依据该值决定相同 Key 列下,哪一行生效。
1)启用 sequence column 支持
Doris的UniqueKey模型通过SequenceColumn(版本列) 解决数据更新冲突问题,确保多版本数据按正确顺序合并。
| 配置方式 | 适用场景 | 导入灵活性 |
|---|---|---|
sequence_col |
指定表中的某个列作为版本号列,更大的行会覆盖较小值 | 直接映射数据源列 |
sequence_type |
需动态生成顺序值(如按导入批次) | 通过 ORDER BY 或表达式指定 |
2)function_column.sequence_col
a、介绍
-
作用:指定表中的某个列作为版本号列,更大的行会覆盖较小值。
-
数据类型限制:
-
支持
BIGINT(存储自增数值或时间戳)、DATETIME(存储精确时间)、DATE(存储日期)。 -
类型需在建表时显式声明,而非通过
sequence_type指定。
-
b、使用示例
1. 创建支持 sequence col 的表
-- 创建 unique 模型的 test_table 数据表,并指定 sequence 列映射到表中的 modify_date 列。
CREATE TABLE test.test_table
(
user_id bigint,
date date,
group_id bigint,
modify_date date,
keyword VARCHAR(128)
)
UNIQUE KEY(user_id, date, group_id)
DISTRIBUTED BY HASH (user_id) BUCKETS 32
PROPERTIES(
"function_column.sequence_col" = 'modify_date',
"replication_num" = "1",
"in_memory" = "false"
);
MySQL> desc test_table;
+-------------+--------------+------+-------+---------+---------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-------+---------+---------+
| user_id | BIGINT | No | true | NULL | |
| date | DATE | No | true | NULL | |
| group_id | BIGINT | No | true | NULL | |
| modify_date | DATE | No | false | NULL | REPLACE |
| keyword | VARCHAR(128) | No | false | NULL | REPLACE |
+-------------+--------------+------+-------+---------+---------+
2、导入数据
1 2020-02-22 1 2020-02-21 a
1 2020-02-22 1 2020-02-22 b
1 2020-02-22 1 2020-03-05 c
1 2020-02-22 1 2020-02-26 d
1 2020-02-22 1 2020-02-23 e
1 2020-02-22 1 2020-02-24 b
3、结果:只保留了 modify_date = 2020-03-05 的数据
MySQL> select * from test_table;
+---------+------------+----------+-------------+---------+
| user_id | date | group_id | modify_date | keyword |
+---------+------------+----------+-------------+---------+
| 1 | 2020-02-22 | 1 | 2020-03-05 | c |
+---------+------------+----------+-------------+---------+
4、验证:结果中 __DORIS_SEQUENCE_COL__ 的值与 modify_date 列一致
SET show_hidden_columns=true;
SELECT user_id, item_id, ts, __DORIS_SEQUENCE_COL__ FROM user_log;
3)function_column.sequence_type
a、介绍
场景:无现成顺序列,需动态生成顺序值(如按数据到达顺序)。
限制:若同时配置 sequence_col 和 sequence_type,sequence_col 优先级更高
b、使用示例
1、建表
-- 1、建表
CREATE TABLE user_actions (
user_id INT,
action_time DATETIME,
action_type VARCHAR(20)
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id)
PROPERTIES (
"function_column.sequence_type" = "Date" -- 隐式创建隐藏顺序列
);
-- 2、数据导入
CREATE ROUTINE LOAD db.job ON user_actions
COLUMNS(user_id, action_time, action_type)
ORDER BY action_time -- 动态指定顺序值来源
FROM KAFKA(
"kafka_broker_list" = "broker1:9092",
"kafka_topic" = "user_actions"
);
-- 3、效果:隐藏列 __DORIS_SEQUENCE_COL__ 的值由 action_time 填充,相同 user_id 下按时间覆盖
五、聚合模型的导入更新
1、整行更新
使用
Doris支持的StreamLoad,Broker Load,RoutineLoad,InsertInto等导入方式,往聚合模型(Agg模型)中进行数据导入时,都会将新的值与旧的聚合值,根据列的聚合函数产出新的聚合值,这个值可能是插入时产出,也可能是异步Compaction时产出,但是用户查询时,都会得到一样的返回值。
2、聚合模型的部分列更新
Aggregate表主要在预聚合场景使用而非数据更新的场景使用,但也可以通过将聚合函数设置为REPLACE_IF_NOT_NULL来实现部分列更新效果。
建表:将需要进行列更新的字段对应的聚合函数设置为REPLACE_IF_NOT_NULL
CREATE TABLE order_tbl (
order_id int(11) NULL,
order_amount int(11) REPLACE_IF_NOT_NULL NULL,
order_status varchar(100) REPLACE_IF_NOT_NULL NULL
) ENGINE=OLAP
AGGREGATE KEY(order_id)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(order_id) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
数据写入:无论是 Stream Load、Broker Load、Routine Load 还是INSERT INTO, 直接写入要更新的字段的数据即可
示例:与前面例子相同,对应的 Stream Load 命令为(不需要额外的 header):
$ cat update.csv
1,To be shipped
curl --location-trusted -u root: -H "column_separator:," -H "columns:order_id,order_status" -T /tmp/update.csv http://127.0.0.1:8030/api/db1/order_tbl/_stream_load
对应的 INSERT INTO 语句为(不需要额外设置 session variable):
INSERT INTO order_tbl (order_id, order_status) values (1,'待发货');
3、注意事项
Aggregate Key 模型在写入过程中不做任何额外处理,所以写入性能不受影响,与普通的数据导入相同。但是在查询时进行聚合的代价较大,典型的聚合查询性能相比 Unique Key 模型的 Merge-on-Write 实现会有 5-10 倍的下降。


