大数据Doris之_4_数据导入
前言
Github:https://github.com/HealerJean
一、导入概览
ApacheDoris提供了多种导入和集成数据的方法,您可以使用合适的导入方式从各种源将数据导入到数据库中。ApacheDoris提供的数据导入方式可以分为四类,
| 导入方式 | 使用场景 | 支持格式 | 导入模式 | 吞吐量 | 延迟 | 资源消耗 | 适用数据规模 | 并发建议 | 关键限制因素 |
|---|---|---|---|---|---|---|---|---|---|
Stream Load |
导入本地文件或者应用程序写入 | csv、json、parquet、orc |
同步 | 100MB/s ~ 1GB/s | 秒级 | 高(占用 BE 写入线程) | 单次 10MB~1GB | 单节点建议≤10 并发 | 并行度、文件格式 |
Broker Load |
从对象存储、HDFS 等导入 |
SQL |
异步 | TB 级 / 小时 | 分钟级 | 中(依赖 Broker 进程) | 单任务≥10GB | 集群级并发(依赖存储) | HDFS / 对象存储读取速度、分区并行度 |
INSERT INTO VALUES |
通过 JDBC 等接口逐条 / 小批量写入 |
SQL |
同步 | 数千条 / 秒 | 毫秒级 | 高(逐行解析开销大) | 单批次≤10 万行 | 建议配合批量参数 | SQL 解析开销、事务提交频率 |
INSERT INTO SELECT |
可以导入外部表或者对象存储、HDFS 中的文件 |
SQL |
同步 | 取决于源表扫描速度 | 秒级~分钟级 | 中(依赖源表 IO) | 视源表规模而定 | 单线程(可多任务并行) | 外部表查询效率、SQL 执行计划 |
Routine Load |
从 kakfa 实时导入 |
csv、json |
异步 | 百万条 / 秒 | 毫秒级~秒级 | 中(依赖 Kafka 消费能力) |
持续流数据 | 按 Kafka 分区数并发 | Kafka 分区数、数据解析效率 |
MySQL Load |
从本地数据导入 | CSV |
同步 | 与 Stream Load 接近 | 秒级 | 高(需解析 SQL) |
中小规模数据 | 同 Stream Load | |
Group Commit |
高频小批量导入 | 取决于基础导入方式 | - | 提升 30%~50% 吞吐量 | 亚秒级 | 低(批量聚合提交) | 单批次≤1 万行 | 需配置合理提交间隔 |
1、实时写入:
应用程序通过
HTTP或者JDBC实时写入数据到Doris表中,适用于需要实时分析和查询的场景。
- 极少量数据(5 分钟一次)时可以使用
JDBC INSERT写入数据。 - 并发较高或者频次较高(大于
20并发或者 1 分钟写入多次)时建议打开 [Group Commit],使用JDBCINSERT或者StreamLoad写入数据。 - 吞吐较高时推荐使用 [
Stream Load]通过HTTP写入数据。
2、流式同步
通过实时数据流(如
Flink、Kafka、事务数据库)将数据实时导入到Doris表中,适用于需要实时分析和查询的场景。
- 可以使用 [
FlinkDorisConnector]将Flink的实时数据流写入到Doris表中。 - 可以使用 [
Routine Load] 将Kafka的实时数据流写入到Doris表中。Routine Load方式下,Doris会调度任务将Kafka中的数据拉取并写入Doris中,目前支持csv和json格式的数据。KafkaConnector方式下,由Kafka将数据写入到Doris中,支持avro、json、csv、protobuf格式的数据。
- 可以使用 [
FlinkCDC]或 [Datax] 将事务数据库的CDC数据流写入到Doris中。
3、批量导入
将数据从外部存储系统(如对象存储、
HDFS、本地文件、NAS)批量加载到Doris表中,适用于非实时数据导入的需求。
- 可以使用 [
Broker Load]将对象存储和HDFS中的文件写入到 Doris 中。 - 可以使用 [
INSERT INTO SELECT]将对象存储、HDFS和 NAS 中的文件同步写入到Doris中,配合 [JOB]可以异步写入。 - 可以使用 [
Stream Load] 将本地文件写入Doris中。
4、外部数据源集成:
通过与外部数据源(如
Hive、JDBC、Iceberg等)的集成,实现对外部数据的查询和部分数据导入到Doris表中。
- 可以创建 [
Catalog] 将外部数据源中的数据同步写入到Doris中,配合 [JOB] 可以异步写入。 - 可以使用 [
X2Doris]将其他 AP 系统的数据迁移到Doris中。
三、数据源
1、本地文件
-
StreamLoad:StreamLoad是通过HTTP协议将本地文件或数据流导入到Doris中。StreamLoad是一个同步导入方式,执行导入后返回导入结果,可以通过请求的返回判断导入是否成功。支持导入CSV、JSON、Parquet与ORC格式的数据 -
streamloader:Streamloader工具是一款用于将数据导入Doris数据库的专用客户端工具,底层基于StreamLoad实现,可以提供多文件,多并发导入的功能,降低大数据量导入的耗时。 -
MySQLLoad:Doris兼容MySQL协议,可以使用MySQL标准的 [LOAD DATA]语法导入本地文件。MySQLLoad是一种同步导入方式,执行导入后即返回导入结果,主要适用于导入客户端本地CSV文件。
1)使用 Stream Load 导入
第 1 步:准备数据:创建 CSV 文件 streamload_example.csv,内容如下:
1,Emily,25
2,Benjamin,35
3,Olivia,28
4,Alexander,60
5,Ava,17
6,William,69
7,Sophia,32
8,James,64
9,Emma,37
10,Liam,64
第 2 步:在库中创建表:在 Doris 中创建表,语法如下:
CREATE TABLE testdb.test_streamload(
user_id BIGINT NOT NULL COMMENT "用户 ID",
name VARCHAR(20) COMMENT "用户姓名",
age INT COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
第 3 步:使用 Stream Load 导入数据,使用 curl 提交 Stream Load 导入作业:
curl --location-trusted -u <doris_user>:<doris_password> \
-H "column_separator:," \
-H "columns:user_id,name,age" \
-T streamload_example.csv \
-XPUT http://<fe_ip>:<fe_http_port>/api/testdb/test_streamload/_stream_load
Stream Load 是一种同步导入方式,导入结果会直接返回给用户。
{
"TxnId": 3,
"Label": "123",
"Comment": "",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 10,
"NumberLoadedRows": 10,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 118,
"LoadTimeMs": 173,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 70,
"ReadDataTimeMs": 2,
"WriteDataTimeMs": 48,
"CommitAndPublishTimeMs": 52
}
第 4 步:检查导入数据
select count(*) from testdb.test_streamload;
+----------+
| count(*) |
+----------+
| 10 |
+----------+
2)使用 Streamloader 工具导入
第 1 步:准备数据,创建 csv 文件 streamloader_example.csv 文件。具体内容如下
1,Emily,25
2,Benjamin,35
3,Olivia,28
4,Alexander,60
5,Ava,17
6,William,69
7,Sophia,32
8,James,64
9,Emma,37
10,Liam,64
第 2 步:在库中创建表,在 Doris 中创建被导入的表,具体语法如下:
CREATE TABLE testdb.test_streamloader(
user_id BIGINT NOT NULL COMMENT "用户 ID",
name VARCHAR(20) COMMENT "用户姓名",
age INT COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
第 3 步:使用 stream loader 工具导入数据
doris-streamloader --source_file="streamloader_example.csv" --url="http://localhost:8330" --header="column_separator:," --db="testdb" --table="test_streamloader"
这是一种同步导入方式,导入结果会直接返回给用户:
Load Result: {
"Status": "Success",
"TotalRows": 10,
"FailLoadRows": 0,
"LoadedRows": 10,
"FilteredRows": 0,
"UnselectedRows": 0,
"LoadBytes": 118,
"LoadTimeMs": 623,
"LoadFiles": [
"streamloader_example.csv"
]
}
第 4 步:检查导入数据
select count(*) from testdb.test_streamloader;
+----------+
| count(*) |
+----------+
| 10 |
+----------+
3)使用 MySQL Load 从本地数据导入
第 1 步:准备数据,创建名为 client_local.csv 的文件,样例数据如下:
1,10
2,20
3,30
4,40
5,50
6,60
第 2 步:在库中创建表,在执行 LOAD DATA 命令前,需要先链接 mysql 客户端。
mysql --local-infile -h <fe_ip> -P <fe_query_port> -u root -D testdb
执行 MySQL Load,在连接时需要使用指定参数选项:
- 在链接
mysql客户端时,必须使用--local-infile选项,否则可能会报错。 - 通过
JDBC链接,需要在 URL 中指定配置allowLoadLocalInfile=true
在 Doris 中创建以下表:
CREATE TABLE testdb.t1 (
pk INT,
v1 INT SUM
) AGGREGATE KEY (pk)
DISTRIBUTED BY hash (pk);
第 3 步:使用 Mysql Load 导入数据,链接 MySQL Client 后,创建导入作业,命令如下:
LOAD DATA LOCAL
INFILE 'client_local.csv'
INTO TABLE testdb.t1
COLUMNS TERMINATED BY ','
LINES TERMINATED BY '\n';
第 4 步:检查导入数据,MySQL Load 是一种同步的导入方式,导入后结果会在命令行中返回给用户。如果导入执行失败,会展示具体的报错信息。,如下是导入成功的结果显示,会返回导入的行数:
Query OK, 6 row affected (0.17 sec)
Records: 6 Deleted: 0 Skipped: 0 Warnings: 0
2、Kafka
在大多数情况下,可以直接选择
RoutineLoad进行数据导入,无需集成外部组件即可消费Kafka数据。当需要加载Avro、Protobuf格式的数据,或通过Debezium采集的上游数据库数据时,可以使用DorisKafkaConnector。
-
使用
RoutineLoad消费Kafka数据:Doris通过Routine Load持续消费KafkaTopic中的数据。提交RoutineLoad作业后,Doris会实时生成导入任务,消费Kafka集群中指定Topic的消息。RoutineLoad支持CSV和JSON格式,具备Exactly-Once语义,确保数据不丢失且不重复。。 -
DorisKafkaConnector消费Kafka数据:DorisKafkaConnector是将Kafka数据流导入Doris数据库的工具。用户可通过KafkaConnect插件轻松导入多种序列化格式(如JSON、Avro、Protobuf),并支持解析Debezium组件的数据格式(本质就是监听并消费而已)
1)使用Routine Load 消费 Kafka 数据
a、单表导入
第 1 步:准备数据,在 Kafka 中,样本数据如下:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-routine-load-csv --from-beginning
1,Emily,25
第 2 步:在库中创建表,在 Doris 中创建被导入的表,具体语法如下:
CREATE TABLE testdb.test_routineload_tbl(
user_id BIGINT NOT NULL COMMENT "user id",
name VARCHAR(20) COMMENT "name",
age INT COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
第 3 步:创建 Routine Load job 导入数据至单表,在 Doris 中,使用 CREATE ROUTINE LOAD 命令创建导入作业:
CREATE ROUTINE LOAD testdb.example_routine_load_csv ON test_routineload_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(user_id, name, age)
FROM KAFKA(
"kafka_broker_list" = "192.168.88.62:9092",
"kafka_topic" = "test-routine-load-csv",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
第 4 步:检查导入数据
select * from test_routineload_tbl;
+-----------+----------------+------+
| user_id | name | age |
+-----------+----------------+------+
| 1 | Emily | 25 |
+-----------+----------------+------+
b、多表导入
对于需要同时导入多张表的场景,
Kafka中的数据必须包含表名信息,格式为:table_name|data。例如,导入CSV数据时,格式应为:table_name|val1,val2,val3。请注意,表名必须与Doris中的表名完全一致,否则导入将失败,并且不支持后面介绍的column_mapping配置。
第 1 步:准备数据,在 Kafka 中,样本数据如下:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-multi-table-load --from-beginning
test_multi_table_load1|1,Emily,25
test_multi_table_load2|2,Benjamin,35
第 2 步:在库中创建表。在 Doris 中创建被导入的表,具体语法如下:
表 1:
CREATE TABLE test_multi_table_load1(
user_id BIGINT NOT NULL COMMENT "用户 ID",
name VARCHAR(20) COMMENT "用户姓名",
age INT COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
表 2:
CREATE TABLE test_multi_table_load2(
user_id BIGINT NOT NULL COMMENT "用户 ID",
name VARCHAR(20) COMMENT "用户姓名",
age INT COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
第 3 步:创建 Routine Load job 导入数据至多表,在 Doris 中,使用 CREATE ROUTINE LOAD 命令创建导入作业:
CREATE ROUTINE LOAD example_multi_table_load
COLUMNS TERMINATED BY ","
FROM KAFKA(
"kafka_broker_list" = "192.168.88.62:9092",
"kafka_topic" = "test-multi-table-load",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
第 4 步:检查导入数据
select * from test_multi_table_load1;
+------+----------------+------+
| id | name | age |
+------+----------------+------+
| 1 | Emily | 25 |
+------+----------------+------+
select * from test_multi_table_load2;
+------+----------------+------+
| id | name | age |
+------+----------------+------+
| 2 | Benjamin | 35 |
+------+----------------+------+
2)使用 Doris Kafka Connector 消费 Kafka 数据
DorisKafkaConnector是将Kafka数据流导入Doris数据库的工具。用户可通过KafkaConnect插件轻松导入多种序列化格式(如JSON、Avro、Protobuf),并支持解析Debezium组件的数据格式。
a、消费普通数据
1、导入数据样本:在 Kafka 中,样本数据如下:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-data-topic --from-beginning
{"user_id":1,"name":"Emily","age":25}
{"user_id":2,"name":"Benjamin","age":35}
{"user_id":3,"name":"Olivia","age":28}
{"user_id":4,"name":"Alexander","age":60}
{"user_id":5,"name":"Ava","age":17}
{"user_id":6,"name":"William","age":69}
{"user_id":7,"name":"Sophia","age":32}
{"user_id":8,"name":"James","age":64}
{"user_id":9,"name":"Emma","age":37}
{"user_id":10,"name":"Liam","age":64}
2、创建需要导入的表:在 Doris 中创建被导入的表,具体语法如下:
CREATE TABLE test_db.test_kafka_connector_tbl(
user_id BIGINT NOT NULL COMMENT "user id",
name VARCHAR(20) COMMENT "name",
age INT COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 12;
3、创建导入任务:在部署 Kafka Connect 的机器上,通过 curl 命令提交如下导入任务:
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"test-doris-sink-cluster",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"tasks.max":"10",
"topics":"test-data-topic",
"doris.topic2table.map": "test-data-topic:test_kafka_connector_tbl",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"10.10.10.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test_db",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter"
}
}'
b、消费 Debezium 组件采集的数据
1、MySQL 数据库中有如下表:
CREATE TABLE test.test_user (
user_id int NOT NULL ,
name varchar(20),
age int,
PRIMARY KEY (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
insert into test.test_user values(1,'zhangsan',20);
insert into test.test_user values(2,'lisi',21);
insert into test.test_user values(3,'wangwu',22);
2、在 Doris 创建被导入的表:
CREATE TABLE test_db.test_user(
user_id BIGINT NOT NULL COMMENT "user id",
name VARCHAR(20) COMMENT "name",
age INT COMMENT "age"
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 12;
3、部署 Debezium connector for MySQL 组件
4、创建 doris-kafka-connector 导入任务:假设通过 Debezium 采集到的 MySQL 表数据在 mysql_debezium.test.test_user Topic 中:
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"test-debezium-doris-sink",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"tasks.max":"10",
"topics":"mysql_debezium.test.test_user",
"doris.topic2table.map": "mysql_debezium.test.test_user:test_user",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"10.10.10.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test_db",
"converter.mode":"debezium_ingestion",
"enable.delete":"true",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}
3、Flink
第 1 步:创建表
CREATE TABLE `students` (
`id` INT NULL,
`name` VARCHAR(256) NULL,
`age` INT NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
第 2 步:使用 Flink 导入数据,运行 bin/sql-client.sh 打开 FlinkSQL 的控制台
CREATE TABLE student_sink (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '10.16.10.6:28737',
'table.identifier' = 'test.students',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label'
);
INSERT INTO student_sink values(1,'zhangsan',123)
第 3 步:检查导入数据
select * from test.students;
+------+----------+------+
| id | name | age |
+------+----------+------+
| 1 | zhangsan | 123 |
+------+----------+------+
三、导入方式
1、Stream Load
StreamLoad支持通过HTTP协议将本地文件或数据流导入到Doris中。
StreamLoad是一个同步导入方式,执行导入后返回导入结果,可以通过请求的返回判断导入是否成功。- 一般来说,可以使用
StreamLoad导入10GB以下的文件,如文件过大,建议切分后使用StreamLoad进行导入。StreamLoad可以保证一批导入任务的原子性,要么全部导入成功,要么全部导入失败。
1)基本原理
Client向FE提交StreamLoad导入作业请求- 通过
HTTP协议发起导入作业给FE节点
- 通过
FE会轮询选择一台BE作为Coordinator节点,负责导入作业调度,然后返回给Client一个HTTP重定向- 解析请求并验证表结构
- 从可用
BE列表中轮询选择BE3作为Coordinator; - 返回重定向响应:
Client连接CoordinatorBE节点,提交导入请求CoordinatorBE会分发数据给相应BE节点,导入完成后会返回导入结果给Client- 向
BE3发送数据,BE3作为Coordinator协调其他BE节点(如BE1、BE2)存储副本数据 BE、BE2、BE3接收数据后,按分桶规则写入对应Tablet,并将副本同步到其他节点。
- 向
Client也可以直接通过指定BE节点作为Coordinator,直接分发导入作业

2)使用场景
-
工具支持:相比于直接使用
curl的单并发导入,更推荐使用专用导入工具DorisStreamloader。该工具是一款用于将数据导入Doris数据库的专用客户端工具,可以提供多并发导入的功能,降低大数据量导入的耗时 -
支持能力:
Stream Load支持从本地或远程通过 HTTP 的方式导入 CSV、JSON、Parquet 与 ORC 格式的数据。在导入 CSV 文件时,需要明确区分空值(null)与空字符串:-
空值(
null):使用\N表示。例如a,\N,b表示中间列的值为null。 -
空字符串:当两个分隔符之间没有任何字符时表示空字符串。例如
a,,b中,两个逗号之间没有字符,表示中间列的值为空字符串。
-
2、Broker Load
BrokerLoad通过MySQLAPI发起,Doris会根据LOAD语句中的信息,主动从数据源拉取数据。BrokerLoad是一个异步导入方式,需要通过SHOWLOAD语句查看导入进度和导入结果。
1)基本原理
1、用户在提交导入任务后,FE 会生成对应的 Plan 并根据目前 BE 的个数和文件的大小,将 Plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。
2、BE 在执行的过程中会从 Broker 拉取数据,在对数据转换之后将数据导入系统。
3、所有 BE 均完成导入,由 FE 最终决定导入是否成功。
brocker进程:从下图中可以看到,BE 会依赖 Broker 进程来读取相应远程存储系统的数据。之所以引入 Broker 进程,主要是用来针对不同的远程存储系统,用户可以按照 Broker 进程的标准开发其相应的 Broker 进程。由于 broker 进程和 BE 进程的分离,也确保了两个进程的错误隔离,提升 BE 的稳定性。
当前 BE 内置了对 HDFS 和 S3 两个 Broker 的支持,所以如果从 HDFS 和 S3 中导入数据,则不需要额外启动 Broker 进程。如果有自己定制的 Broker 实现,则需要部署相应的 Broker 进程。

2)支持场景
Broker Load 适合源数据存储在远程存储系统,比如对象存储或 HDFS,且数据量比较大的场景
3、Routine Load
Doris可以通过RoutineLoad导入方式持续消费KafkaTopic中的数据。在提交RoutineLoad作业后,Doris会持续运行该导入作业,实时生成导入任务不断消费Kafka集群中指定 Topic 的消息。
RoutineLoad是一个流式导入作业,支持Exactly-Once语义,保证数据不丢不重。
1)基本原理
RoutineLoad会持续消费KafkaTopic中的数据,写入Doris中。
在 Doris 中,创建 Routine Load 作业后会生成一个常驻的导入作业,包括若干个导入任务:
- 导入作业(
loadjob):一个RoutineLoadJob是一个常驻的导入作业,会持续不断地消费数据源中的数据。 - 导入任务(
loadtask):一个导入作业会被拆解成若干个导入任务进行实际消费,每个任务都是一个独立的事务。
Routine Load 的导入具体流程如下图所示:
Client向FE提交创建RoutineLoad作业请求FE通过RoutineLoadManager生成一个常驻的导入作业(RoutineLoadJob)。FE通过JobScheduler将RoutineLoadJob拆分成若干个RoutineLoadTask,由TaskScheduler进行调度,下发到BE节点。- 在
BE上,一个RoutineLoadTask导入完成后向FE提交事务,并更新Job的元数据。 - 一个
RoutineLoadTask提交后,会继续生成新的Task,或对超时的Task进行重试。 - 新生成的
RoutineLoadTask由TaskScheduler继续调度,不断循环。

2)使用场景
Routine Load 支持从Kafka 集群中消费数据。
4、Insert Into Select
INSERTINTO支持将Doris查询的结果导入到另一个表中。INSERTINTO是一个同步导入方式,执行导入后返回导入结果。可以通过请求的返回判断导入是否成功。INSERTINTO可以保证导入任务的原子性,要么全部导入成功,要么全部导入失败。
1)基本原理
1、在使用 INSERT INTO 时,需要通过 MySQL 协议发起导入作业给 FE 节点
2、FE 会生成执行计划,执行计划中前部是查询相关的算子,最后一个是 OlapTableSink 算子,用于将查询结果写到目标表中。
3、执行计划会被发送给 BE 节点执行,Doris 会选定一个节点作为 Coordinator 节点,Coordinator 节点负责接受数据并分发数据到其他节点上。
2)使用场景
1、用户希望将已经在 Doris 表中的数据进行 ETL 转换并导入到一个新的 Doris 表中,此时适合使用 INSERT INTO SELECT 语法。
2、与 Multi-Catalog 外部表机制进行结合,如通过 Multi-Catalog 映射 MySQL 或者 Hive 系统中的表,然后通过 INSERT INTO SELECT 语法将外部表中的数据导入到 Doris 表中存储。
3、通过 Table Value Function(TVF)功能,可以直接将对象存储或 HDFS 上的文件作为 Table 进行查询,并且支持自动的列类型推断。然后,通过 INSERT INTO SELECT 语法将外部表中的数据导入到 Doris 表中存储。
5、Insert Into Values
INSERTINTO支持将Doris查询的结果导入到另一个表中。INSERTINTO是一个同步导入方式,执行导入后返回导入结果。可以通过请求的返回判断导入是否成功。INSERTINTO可以保证导入任务的原子性,要么全部导入成功,要么全部导入失败。
1)基本原理
在使用 INSERT INTO VALUES 时,需要通过 MySQL 协议发起导入作业给 FE 节点,FE 会生成执行计划,执行计划中前部是查询相关的算子,最后一个是 OlapTableSink 算子,用于将查询结果写到目标表中。执行计划会被发送给 BE 节点执行,Doris 会选定一个节点做为 Coordinator 节点,Coordinator 节点负责接受数据并分发数据到其他节点上。
2)使用场景
1、用户希望仅导入几条假数据,验证一下 Doris 系统的功能。此时适合使用 INSERT INTO VALUES 的语法,语法和 MySQL 一样。
2、并发的 INSERT INTO VALUES 的性能会受到 commit 阶段的瓶颈限制。导入数据量较大时,可以打开 [group commit]达到更高的性能。
-
适用打开的场景
-
高并发小批量写入场景:如果有大量的小批量数据需要并发写入 Doris,打开
group commit可以将多个小批量导入在后台合并成一个大的事务提交,能显著提升写入性能。例如,在一些实时数据采集和写入的场景中,数据以较小的批次不断流入,开启group commit可以有效减少事务提交的次数,提高整体写入效率。 -
对数据可见性要求不高的场景:在异步模式下,
Doris会根据负载和表的group_commit_interval属性异步提交数据,提交之后数据才可见。如果业务对数据的实时可见性要求不高,允许有一定的延迟,那么打开group commit的异步模式可以在不影响业务的情况下提升性能。
-
-
不适用打开的场景
- 对数据可见性要求严格的场景:如果在导入完成后要求数据立即可见,那么打开
group commit可能不太合适,因为异步模式下数据有延迟才能可见,同步模式下导入耗时至少是表属性group_commit_interval设置的时间。例如一些实时查询和分析的场景,需要最新的数据及时可用,就可能不适合开启group commit。 - 单次导入数据量较大的场景:虽然
Doris在单次导入数据量较大时会自动切换为sync_mode以防止WAL占用较大磁盘空间,但这种情况下group commit的优势可能不明显,甚至可能因为额外的合并操作带来一些性能开销。
- 对数据可见性要求严格的场景:如果在导入完成后要求数据立即可见,那么打开
6、MySQL Load
Doris兼容MySQL协议,可以使用MySQL标准的 [LOADDATA](语法导入本地文件。MySQLLoad是一种同步导入方式,执行导入后即返回导入结果。可以通过LOADDATA语句的返回结果判断导入是否成功。一般来说,可以使用MySQLLoad导入10GB以下的文件,如果文件过大,建议将文件进行切分后使用MySQLLoad进行导入。MySQLLoad可以保证一批导入任务的原子性,要么全部导入成功,要么全部导入失败。
1)基本原理
MySQLLoad与StreamLoad功能相似,都是导入本地文件到Doris集群中。因此MySQLLoad的实现复用了StreamLoad的基本导入能力。
下图展示了 MySQL Load 的主要流程:
- 用户向
FE提交LOADDATA请求,FE完成解析工作,并将请求封装成StreamLoad; FE会选择一个BE节点发送StreamLoad请求;- 发送请求的同时,
FE会异步且流式的从MySQL客户端读取本地文件数据,并实时的发送到StreamLoad的HTTP请求中; MySQL客户端数据传输完毕,FE等待StreamLoad完成,并展示导入成功或者失败的信息给客户端。
2)使用场景
MySQL Load 主要适用于导入客户端本地 CSV 文件,或通过程序导入数据流中的数据。
四、导入实战
1、导入高可用性
Doris在数据导入过程中提供了多种机制来确保高可用性。本文将详细介绍Doris的默认导入行为以及为提高导入可用性而提供的额外选项,特别是最小写入副本数功能。
1)多数派写入
默认情况下,Doris 采用多数派写入策略来确保数据的可靠性和一致性:
- 当成功写入的副本数超过总副本数的一半时,导入被视为成功。
- 例如,对于三副本的表,至少需要两个副本写入成功才算导入成功。
工作原理:
- 数据分发:导入任务首先将数据分发到所有相关的
BE节点。 - 并行写入:各个
BE节点并行处理数据写入操作。 - 写入确认:每个
BE节点在完成数据写入后,会向FE发送确认信息。 - 多数派判断:
FE统计成功写入的副本数,当达到多数派时,认为导入成功。 - 事务提交:
FE提交导入事务,使数据对外可见。 - 异步复制:对于未成功写入的副本,系统会在后台异步进行数据复制,以确保最终所有副本的数据一致性。
多数派写入策略是 Doris 在数据可靠性和系统可用性之间的一个平衡。对于有特殊需求的场景,Doris 提供了最小写入副本数等其他选项来进一步提高系统的灵活性。
2)最小写入副本数
多数派写入策略在保证数据可靠性的同时,也可能在某些场景下影响系统的可用性。例如,在两副本的情况下,必须两个副本都写入成功才能完成导入,这意味着在导入过程中不允许任何一个副本不可用。
为了解决上述问题并提高导入的可用性,Doris 提供了最小写入副本数(选项。
1)功能说明:最小写入副本数允许用户指定导入数据时需要成功写入的最少副本数。当成功写入的副本数大于或等于这个值时,导入即视为成功。
-
使用场景
-
在部分节点不可用时,仍需要保证数据能够成功导入。
-
对数据导入速度有较高要求,愿意在一定程度上牺牲一致性来换取更高的可用性。
-
2)配置方法
1、单表配置
a. 创建表时设置:
CREATE TABLE example_table
(
id INT,
name STRING
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES
(
'replication_num' = '3',
'min_load_replica_num' = '2'
);
b. 修改现有表:
ALTER TABLE example_table
SET ( 'min_load_replica_num' = '2' );
2、全局配置
配置:通过 FE 配置项 min_load_replica_num 设置。
- 有效值:大于 0
- 默认值:-1(表示不开启全局最小写入副本数)
优先级:表属性 > 全局配置 > 默认多数派规则
3)其他
除了最小写入副本数选项,Doris 还采用了以下机制来提高导入的可用性:
- 导入重试:自动重试因临时故障导致的失败导入任务。
- 负载均衡:将导入任务分散到不同的
BE节点,避免单点压力过大。 - 事务机制:确保数据的一致性,失败时自动回滚。
2、高并发导入优化
在高频小批量写入场景下,传统的导入方式存在以下问题:
- 每个导入都会创建一个独立的事务,都需要经过
FE解析SQL和生成执行计划,影响整体性能- 每个导入都会生成一个新的版本,导致版本数快速增长,增加了后台
compaction的压力
为了解决这些问题,Doris 引入了 Group Commit 机制。Group Commit 不是一种新的导入方式,而是对现有导入方式的优化扩展,通过将多个小批量导入在后台合并成一个大的事务提交,显著提升了高并发小批量写入的性能。同时,Group Commit 与 PreparedStatement 结合使用可以获得更高的性能提升。主要针对:
INSERT INTO tbl VALUES(...)语句StreamLoad导入
1)Group Commit 模式
GroupCommit写入有三种模式,分别是:
| 模式 | 核心机制 | 优点 | 缺点 | 典型适用场景 |
|---|---|---|---|---|
关闭模式(off_mode) |
不启用 Group Commit,每个导入单独提交事务,数据写入后立即提交。 |
实现简单,无事务合并开销。 | 并发写入时事务提交频繁,性能较低;资源利用率差。 | 小数据量测试、单条数据导入(极低频场景)。 |
同步模式(sync_mode) |
将多个导入合并到同一事务中,根据 group_commit_interval 触发提交,提交后数据可见。 |
数据强一致性,提交后立即可见;Group Commit 减少事务开销,提升并发写入性能。 |
写入延迟受事务合并周期影响(如设置过长可能增加延迟)。 | 实时分析、报表查询(要求数据立即可见);高并发写入且对一致性要求高的场景(如交易数据)。 |
异步模式(async_mode) |
先写入 WAL(预写日志)并立即返回,后台异步合并事务并提交,提交后数据可见;大数据量自动切换为 sync_mode。 |
写入延迟极低(返回时间仅受 WAL 写入影响);适合高频小批量写入,提升吞吐量。 |
数据提交前存在丢失风险(如节点故障未提交的 WAL 可能丢失);WAL 占用磁盘空间需监控。 |
日志采集、埋点数据写入(允许短暂数据不可见);对写入延迟敏感的场景(如用户行为日志)。 |
2)自动提交条件
当满足时间间隔 (默认为
10秒) 或数据量 (默认为64MB) 其中一个条件时,会自动提交数据。这两个参数需要配合使用,建议根据实际场景进行调优。
a、修改提交间隔:
默认提交间隔为 10 秒,用户可以通过修改表的配置调整:
# 修改提交间隔为 2 秒
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");
参数调整建议:建议根据业务对数据可见性延迟的容忍度来设置,如果系统压力大,可以适当增加间隔。
- 较短的间隔 (如 2 秒):
- 优点:数据可见性延迟更低,适合对实时性要求较高的场景
- 缺点:提交次数增多,版本数增长更快,后台
compaction压力更大
- 较长的间隔 (如
30秒):- 优点:提交批次更大,版本数增长更慢,系统开销更小
- 缺点:数据可见性延迟更高
b、修改提交数据量:
GroupCommit的默认提交数据量为 64 MB,用户可以通过修改表的配置调整:
# 修改提交数据量为 128MB
ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");
参数调整建议:建议根据系统内存资源和数据可靠性要求来权衡。如果内存充足且追求更高吞吐,可以适当增加到 128MB 或更大。
- 较小的阈值 (如
32MB):- 优点:内存占用更少,适合资源受限的环境
- 缺点:提交批次较小,吞吐量可能受限
- 较大的阈值 (如
256MB):- 优点:批量提交效率更高,系统吞吐量更大
- 缺点:占用更多内存
3)使用限制
a、Group Commit 限制条件
INSERT INTO VALUES语句在以下情况下会退化为非GroupCommit方式:- 使用事务写入 (
Begin; INSERT INTO VALUES; COMMIT) - 指定
Label(INSERT INTO dt WITH LABEL {label} VALUES) VALUES中包含表达式 (INSERT INTO dt VALUES (1 + 100))- 列更新写入
- 表不支持轻量级模式更改
- 使用事务写入 (
Stream Load在以下情况下会退化为非GroupCommit方式:- 使用两阶段提交
- 指定
Label(-H "label:my_label") - 列更新写入
- 表不支持轻量级模式更改
b、Unique 模型
GroupCommit不保证提交顺序,建议使用Sequence列来保证数据一致性。
c、WAL 限制
async_mode写入会将数据写入WAL,成功后删除,失败时通过WAL恢复。WAL文件是单副本存储的,如果对应磁盘损坏或文件误删可能导致数据丢失。- 下线
BE节点时,使用DECOMMISSION命令以防数据丢失。 async_mode在以下情况下切换为sync_mode- 导入数据量过大(超过
WAL单目录80%空间) - 不知道数据量的
chunkedstreamload - 磁盘可用空间不足
- 导入数据量过大(超过
- 重量级
SchemaChange时,拒绝GroupCommit写入,客户端需重试。
4)Group Commit 性能
机器配置
- 1 台 FE:阿里云 8 核 CPU、16GB 内存、1 块 100GB ESSD PL1 云磁盘
- 3 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘
- 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘
- 测试版本为 Doris-2.1.5
a、Stream Load
1)数据集
httplogs数据集,总共31GB、2.47亿条
2)测试工具:[doris-streamloader]
3)测试方法:对比 非 group_commit 和 group_commit 的 async_mode 模式下,设置不同的单并发数据量和并发数,导入 247249096 行数据
4)测试结果
- 单并发数据量:指每个并发连接在单次数据导入操作中处理的数据量
| 导入方式 | 单并发数据量 | 并发数 | 耗时 (秒) | 导入速率 (行/秒) | 导入吞吐 (MB/秒) |
|---|---|---|---|---|---|
| group_commit | 10 KB | 10 | 3306 | 74,787 | 9.8 |
| group_commit | 10 KB | 30 | 3264 | 75,750 | 10.0 |
| group_commit | 100 KB | 10 | 424 | 582,447 | 76.7 |
| group_commit | 100 KB | 30 | 366 | 675,543 | 89.0 |
| group_commit | 500 KB | 10 | 187 | 1,318,661 | 173.7 |
| group_commit | 500 KB | 30 | 183 | 1,351,087 | 178.0 |
| group_commit | 1 MB | 10 | 178 | 1,385,148 | 182.5 |
| group_commit | 1 MB | 30 | 178 | 1,385,148 | 182.5 |
| group_commit | 10 MB | 10 | 177 | 1,396,887 | 184.0 |
| 非 group_commit | 1 MB | 10 | 2824 | 87,536 | 11.5 |
| 非 group_commit | 10 MB | 10 | 450 | 549,442 | 68.9 |
| 非 group_commit | 10 MB | 30 | 177 | 1,396,887 | 184.0 |
在上面的group_commit测试中,BE 的 CPU 使用率在 10-40% 之间。
可以看出,group_commit 模式在小数据量并发导入的场景下,能有效的提升导入性能,同时减少版本数,降低系统合并数据的压力。
优先使用 group_commit 模式:
- 适用于高频小批量导入(如
10 KB-1 MB),可大幅减少事务开销,提升吞吐量。 - 配置
group_commit_interval参数,平衡实时性和性能(例如,设置为50ms可在50ms内合并多个导入)。
非 group_commit 的适用场景:
- 仅建议在超大批量导入(如单批次
10 MB以上)且高并发(如30并发以上)时使用,但需注意:- 数据可见性可能因事务单独提交而延迟;
- 需监控
WAL日志大小,避免磁盘占用过高。
优化数据量和并发数:
- 若导入性能不足,可尝试增大单并发数据量(如从
10 KB提升至100 KB),或适当增加并发数(但需结合系统资源评估)。 - 避免极端小数据量导入(如 <
10 KB),因事务开销占比过高,性能会大幅下降。
b、JDBC
机器配置
- 1 台 FE:阿里云 8 核 CPU、16GB 内存、1 块 100GB ESSD PL1 云磁盘
- 1 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘
- 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘
- 测试版本为 Doris-2.1.5
- 关闭打印 parpared 语句的 audit log 以提高性能
数据集:tpch sf10 lineitem 表数据集,30 个文件,总共约 22 GB,1.8 亿行
测试工具:[DataX]
测试方法:通过 txtfilereader 向 mysqlwriter 写入数据,配置不同并发数和单个 INSERT 的行数
单个 insert 的行数 |
并发数 | 导入速率 (行/秒) | 导入吞吐 (MB/秒) |
|---|---|---|---|
| 100 | 10 | 107,172 | 11.47 |
| 100 | 20 | 140,317 | 14.79 |
| 100 | 30 | 142,882 | 15.28 |
在上面的测试中,FE 的 CPU 使用率在 60-70% 左右,BE 的 CPU 使用率在 10-20% 左右。 |
c、Insert into sync 模式小批量数据
- 1 台 FE:阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘
- 5 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘。
- 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘
- 测试版本为 Doris-2.1.5
- 设置测试前的 init 语句,
set group_commit = async_mode以及set enable_nereids_planner=false。 - 开启 jdbc 的 prepared statement,完整的 url 为
jdbc:mysql://127.0.0.1:9030?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=50&sessionVariables=group_commit=async_mode,enable_nereids_planner=false。 - 设置导入类型为
preparedupdatestatement。 - 设置导入语句。
- 设置每次需要导入的值,注意,导入的值与导入值的类型要一一匹配。
测试方法:通过 Jmeter 向Doris写数据。每个并发每次通过 insert into 写入 1 行数据。
测试结果
- 数据单位为行每秒。
- 以下测试分为 30,100,500 并发。
30 并发 sync 模式 5 个 BE3 副本性能测试
| Group commit interval | 10ms | 20ms | 50ms | 100ms |
|---|---|---|---|---|
| 321.5 | 307.3 | 285.8 | 224.3 |
100 并发 sync 模式性能测试
| Group commit interval | 10ms | 20ms | 50ms | 100ms |
|---|---|---|---|---|
| 1175.2 | 1108.7 | 1016.3 | 704.5 |
500 并发 sync 模式性能测试
| Group commit interval | 10ms | 20ms | 50ms | 100ms |
|---|---|---|---|---|
| 3289.8 | 3686.7 | 3280.7 | 2609.2 |
d、Insert into sync 模式大批量数据
机器配置
- 1 台 FE:阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘
- 5 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘。注:测试中分别用了 1 台,3 台,5 台 BE 进行测试。
- 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘
- 测试版本为 Doris-2.1.5
测试方法:通过 Jmeter 向Doris写数据。每个并发每次通过 insert into 写入 1000 行数据。
测试结果
- 数据单位为行每秒。
- 以下测试分为 30,100,500 并发。
30 并发 sync 模式性能测试
| Group commit interval | 10ms | 20ms | 50ms | 100ms |
|---|---|---|---|---|
| 92.2K | 85.9K | 84K | 83.2K |
100 并发 sync 模式性能测试
| Group commit interval | 10ms | 20ms | 50ms | 100ms |
|---|---|---|---|---|
| 70.4K | 70.5K | 73.2K | 69.4K |
500 并发 sync 模式性能测试
| Group commit interval | 10ms | 20ms | 50ms | 100ms |
|---|---|---|---|---|
| 46.3K | 47.7K | 47.4K | 46.5K |
五、导入实践
1、表模型选择
建议优先考虑使用明细模型, 明细模型在数据导入和查询性能方面相比其他模型都具有优势
2、分区分桶配置
建议一个
tablet的大小在1-10G范围内。过小的tablet可能导致聚合效果不佳,增加元数据管理压力;过大的tablet不利于副本迁移、补齐。详
3、Random 分桶
在使用
Random分桶时,可以通过设置load_to_single_tablet为true来启用单分片导入模式。这种模式在大规模数据导入过程中,能够提升数据导入的并发度和吞吐量,减少写放大问题。
4、攒批导入
客户端攒批:建议将数据在客户端进行攒批(数MB到数 GB 大小)后再进行导入,高频小导入会频繁做 compaction,导致严重的写放大问题。
服务端攒批:对于高并发小数据量导入,建议打开[Group Commit],在服务端实现攒批导入。
5、分区导入
每次导入建议只导入少量分区的数据。过多的分区同时导入会增加内存占用,并可能导致性能问题。
Doris每个tablet在内存中有一个活跃的Memtable,每个Memtable达到一定大小时才会下刷到磁盘。为了避免进程OOM,当活跃的Memtable占用内存过高时,会提前触发Memtable下刷,导致产生大量小文件,同时会影响导入的性能。
6、大规模数据分批导入
需要导入的文件数较多、数据量很大时,建议分批进行导入,避免导入出错后重试代价太大,同时减少对系统资源的冲击。对
BrokerLoad每批次导入的数据量建议不超过100G。对于本地的大数据量文件,可以使用Doris提供的 ·streamloader`工具进行导入,该工具会自动进行分批导入。
7、Broker Load 导入并发数
压缩文件/Parquet/ORC文件:建议将文件分割成多个小文件进行导入,以实现多并发导入。非压缩的
CSV和JSON文件:Doris内部会自动切分文件并并发导入。
8、Stream load 并发导入
Streamload单BE上的并发数建议不超过128(由BE的webserver_num_workers参数控制)。过高的并发数可能导致webserver线程数不够用,影响导入性能。特别是当单个BE的并发数超过512(doris_max_remote_scanner_thread_pool_thread_num参数)时,可能会导致BE进程卡住。


