前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

一、导入概览

Apache Doris 提供了多种导入和集成数据的方法,您可以使用合适的导入方式从各种源将数据导入到数据库中。Apache Doris 提供的数据导入方式可以分为四类,

导入方式 使用场景 支持格式 导入模式 吞吐量 延迟 资源消耗 适用数据规模 并发建议 关键限制因素
Stream Load 导入本地文件或者应用程序写入 csvjsonparquetorc 同步 100MB/s ~ 1GB/s 秒级 高(占用 BE 写入线程) 单次 10MB~1GB 单节点建议≤10 并发 并行度、文件格式
Broker Load 从对象存储、HDFS 等导入 SQL 异步 TB 级 / 小时 分钟级 中(依赖 Broker 进程) 单任务≥10GB 集群级并发(依赖存储) HDFS / 对象存储读取速度、分区并行度
INSERT INTO VALUES 通过 JDBC 等接口逐条 / 小批量写入 SQL 同步 数千条 / 秒 毫秒级 高(逐行解析开销大) 单批次≤10 万行 建议配合批量参数 SQL 解析开销、事务提交频率
INSERT INTO SELECT 可以导入外部表或者对象存储、HDFS 中的文件 SQL 同步 取决于源表扫描速度 秒级~分钟级 中(依赖源表 IO) 视源表规模而定 单线程(可多任务并行) 外部表查询效率、SQL 执行计划
Routine Load kakfa 实时导入 csvjson 异步 百万条 / 秒 毫秒级~秒级 中(依赖 Kafka 消费能力) 持续流数据 按 Kafka 分区数并发 Kafka 分区数、数据解析效率
MySQL Load 从本地数据导入 CSV 同步 与 Stream Load 接近 秒级 高(需解析 SQL 中小规模数据 同 Stream Load  
Group Commit 高频小批量导入 取决于基础导入方式 - 提升 30%~50% 吞吐量 亚秒级 低(批量聚合提交) 单批次≤1 万行 需配置合理提交间隔  

1、实时写入

应用程序通过 HTTP 或者 JDBC 实时写入数据到 Doris 表中,适用于需要实时分析和查询的场景。

  • 极少量数据(5 分钟一次)时可以使用 JDBC INSERT写入数据。
  • 并发较高或者频次较高(大于 20 并发或者 1 分钟写入多次)时建议打开 [Group Commit],使用 JDBC INSERT 或者 Stream Load 写入数据。
  • 吞吐较高时推荐使用 [Stream Load]通过 HTTP 写入数据。

2、流式同步

通过实时数据流(如 FlinkKafka、事务数据库)将数据实时导入到 Doris 表中,适用于需要实时分析和查询的场景。

  • 可以使用 [Flink Doris Connector]将 Flink 的实时数据流写入到 Doris 表中。
  • 可以使用 [Routine Load] 将 Kafka 的实时数据流写入到 Doris 表中。
    • Routine Load 方式下,Doris 会调度任务将 Kafka 中的数据拉取并写入 Doris 中,目前支持 csvjson 格式的数据。
    • Kafka Connector 方式下,由 Kafka 将数据写入到 Doris 中,支持 avrojsoncsvprotobuf 格式的数据。
  • 可以使用 [Flink CDC]或 [Datax] 将事务数据库的 CDC 数据流写入到 Doris 中。

3、批量导入

将数据从外部存储系统(如对象存储、HDFS、本地文件、NAS)批量加载到 Doris 表中,适用于非实时数据导入的需求。

  • 可以使用 [Broker Load]将对象存储和 HDFS 中的文件写入到 Doris 中。
  • 可以使用 [INSERT INTO SELECT]将对象存储、HDFS 和 NAS 中的文件同步写入到 Doris 中,配合 [JOB]可以异步写入。
  • 可以使用 [Stream Load] 将本地文件写入 Doris 中。

4、外部数据源集成

通过与外部数据源(如 HiveJDBCIceberg 等)的集成,实现对外部数据的查询和部分数据导入到 Doris 表中。

  • 可以创建 [Catalog] 将外部数据源中的数据同步写入到 Doris 中,配合 [JOB] 可以异步写入。
  • 可以使用 [X2Doris]将其他 AP 系统的数据迁移到 Doris 中。

三、数据源

1、本地文件

  • Stream LoadStream Load 是通过 HTTP 协议将本地文件或数据流导入到 Doris 中。Stream Load 是一个同步导入方式,执行导入后返回导入结果,可以通过请求的返回判断导入是否成功。支持导入 CSVJSONParquetORC 格式的数据

  • streamloaderStreamloader 工具是一款用于将数据导入 Doris 数据库的专用客户端工具,底层基于 Stream Load 实现,可以提供多文件,多并发导入的功能,降低大数据量导入的耗时。

  • MySQL LoadDoris 兼容 MySQL 协议,可以使用 MySQL 标准的 [LOAD DATA]语法导入本地文件。MySQL Load 是一种同步导入方式,执行导入后即返回导入结果,主要适用于导入客户端本地 CSV 文件。

1)使用 Stream Load 导入

第 1 步:准备数据:创建 CSV 文件 streamload_example.csv,内容如下:

1,Emily,25
2,Benjamin,35
3,Olivia,28
4,Alexander,60
5,Ava,17
6,William,69
7,Sophia,32
8,James,64
9,Emma,37
10,Liam,64

第 2 步:在库中创建表:在 Doris 中创建表,语法如下:

CREATE TABLE testdb.test_streamload(
    user_id            BIGINT       NOT NULL COMMENT "用户 ID",
    name               VARCHAR(20)           COMMENT "用户姓名",
    age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

第 3 步:使用 Stream Load 导入数据,使用 curl 提交 Stream Load 导入作业:

curl --location-trusted -u <doris_user>:<doris_password> \
    -H "column_separator:," \
    -H "columns:user_id,name,age" \
    -T streamload_example.csv \
    -XPUT http://<fe_ip>:<fe_http_port>/api/testdb/test_streamload/_stream_load

Stream Load 是一种同步导入方式,导入结果会直接返回给用户。

{
    "TxnId": 3,
    "Label": "123",
    "Comment": "",
    "TwoPhaseCommit": "false",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 10,
    "NumberLoadedRows": 10,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 118,
    "LoadTimeMs": 173,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 70,
    "ReadDataTimeMs": 2,
    "WriteDataTimeMs": 48,
    "CommitAndPublishTimeMs": 52
}

第 4 步:检查导入数据

select count(*) from testdb.test_streamload;
+----------+
| count(*) |
+----------+
|       10 |
+----------+

2)使用 Streamloader 工具导入

第 1 步:准备数据,创建 csv 文件 streamloader_example.csv 文件。具体内容如下

1,Emily,25
2,Benjamin,35
3,Olivia,28
4,Alexander,60
5,Ava,17
6,William,69
7,Sophia,32
8,James,64
9,Emma,37
10,Liam,64

第 2 步:在库中创建表,在 Doris 中创建被导入的表,具体语法如下:

CREATE TABLE testdb.test_streamloader(
    user_id            BIGINT       NOT NULL COMMENT "用户 ID",
    name               VARCHAR(20)           COMMENT "用户姓名",
    age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

第 3 步:使用 stream loader 工具导入数据

doris-streamloader --source_file="streamloader_example.csv" --url="http://localhost:8330" --header="column_separator:," --db="testdb" --table="test_streamloader"

这是一种同步导入方式,导入结果会直接返回给用户:

Load Result: {
        "Status": "Success",
        "TotalRows": 10,
        "FailLoadRows": 0,
        "LoadedRows": 10,
        "FilteredRows": 0,
        "UnselectedRows": 0,
        "LoadBytes": 118,
        "LoadTimeMs": 623,
        "LoadFiles": [
                "streamloader_example.csv"
        ]
}

第 4 步:检查导入数据

select count(*) from testdb.test_streamloader;
+----------+
| count(*) |
+----------+
|       10 |
+----------+

3)使用 MySQL Load 从本地数据导入

第 1 步:准备数据,创建名为 client_local.csv 的文件,样例数据如下:

1,10
2,20
3,30
4,40
5,50
6,60

第 2 步:在库中创建表,在执行 LOAD DATA 命令前,需要先链接 mysql 客户端。

mysql --local-infile  -h <fe_ip> -P <fe_query_port> -u root -D testdb

执行 MySQL Load,在连接时需要使用指定参数选项:

  1. 在链接 mysql 客户端时,必须使用 --local-infile 选项,否则可能会报错。
  2. 通过 JDBC 链接,需要在 URL 中指定配置 allowLoadLocalInfile=true

Doris 中创建以下表:

CREATE TABLE testdb.t1 (
    pk     INT, 
    v1     INT SUM
) AGGREGATE KEY (pk) 
DISTRIBUTED BY hash (pk);

第 3 步:使用 Mysql Load 导入数据,链接 MySQL Client 后,创建导入作业,命令如下:

LOAD DATA LOCAL
INFILE 'client_local.csv'
INTO TABLE testdb.t1
COLUMNS TERMINATED BY ','
LINES TERMINATED BY '\n';

第 4 步:检查导入数据,MySQL Load 是一种同步的导入方式,导入后结果会在命令行中返回给用户。如果导入执行失败,会展示具体的报错信息。,如下是导入成功的结果显示,会返回导入的行数:

Query OK, 6 row affected (0.17 sec)
Records: 6  Deleted: 0  Skipped: 0  Warnings: 0

2、Kafka

在大多数情况下,可以直接选择 Routine Load 进行数据导入,无需集成外部组件即可消费 Kafka 数据。当需要加载 AvroProtobuf 格式的数据,或通过 Debezium 采集的上游数据库数据时,可以使用 Doris Kafka Connector

  • 使用 Routine Load 消费 Kafka 数据Doris 通过 Routine Load 持续消费 Kafka Topic 中的数据。提交 Routine Load 作业后,Doris 会实时生成导入任务,消费 Kafka 集群中指定 Topic 的消息。Routine Load 支持CSVJSON 格式,具备 Exactly-Once 语义,确保数据不丢失且不重复。。

  • Doris Kafka Connector 消费 Kafka 数据Doris Kafka Connector 是将 Kafka 数据流导入 Doris 数据库的工具。用户可通过 Kafka Connect 插件轻松导入多种序列化格式(如 JSONAvroProtobuf),并支持解析 Debezium 组件的数据格式(本质就是监听并消费而已)

1)使用Routine Load 消费 Kafka 数据

a、单表导入

第 1 步:准备数据,在 Kafka 中,样本数据如下:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-routine-load-csv --from-beginning
1,Emily,25

第 2 步:在库中创建表,在 Doris 中创建被导入的表,具体语法如下:

CREATE TABLE testdb.test_routineload_tbl(
    user_id            BIGINT       NOT NULL COMMENT "user id",
    name               VARCHAR(20)           COMMENT "name",
    age                INT                   COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

第 3 步:创建 Routine Load job 导入数据至单表,在 Doris 中,使用 CREATE ROUTINE LOAD 命令创建导入作业:

CREATE ROUTINE LOAD testdb.example_routine_load_csv ON test_routineload_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(user_id, name, age)
FROM KAFKA(
    "kafka_broker_list" = "192.168.88.62:9092",
    "kafka_topic" = "test-routine-load-csv",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

第 4 步:检查导入数据

select * from test_routineload_tbl;
+-----------+----------------+------+
| user_id   | name           | age  |
+-----------+----------------+------+
|  1        | Emily          | 25   |
+-----------+----------------+------+

b、多表导入

对于需要同时导入多张表的场景,Kafka中的数据必须包含表名信息,格式为:table_name|data。例如,导入 CSV 数据时,格式应为:table_name|val1,val2,val3。请注意,表名必须与 Doris 中的表名完全一致,否则导入将失败,并且不支持后面介绍的 column_mapping 配置。

第 1 步:准备数据,在 Kafka 中,样本数据如下:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-multi-table-load --from-beginning
test_multi_table_load1|1,Emily,25
test_multi_table_load2|2,Benjamin,35

第 2 步:在库中创建表。在 Doris 中创建被导入的表,具体语法如下:

表 1:

CREATE TABLE test_multi_table_load1(
    user_id            BIGINT       NOT NULL COMMENT "用户 ID",
    name               VARCHAR(20)           COMMENT "用户姓名",
    age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

表 2:

CREATE TABLE test_multi_table_load2(
    user_id            BIGINT       NOT NULL COMMENT "用户 ID",
    name               VARCHAR(20)           COMMENT "用户姓名",
    age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

第 3 步:创建 Routine Load job 导入数据至多表,在 Doris 中,使用 CREATE ROUTINE LOAD 命令创建导入作业:

CREATE ROUTINE LOAD example_multi_table_load
COLUMNS TERMINATED BY ","
FROM KAFKA(
    "kafka_broker_list" = "192.168.88.62:9092",
    "kafka_topic" = "test-multi-table-load",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

第 4 步:检查导入数据

select * from test_multi_table_load1;
+------+----------------+------+
| id   | name           | age  |
+------+----------------+------+
|  1   | Emily          | 25   |
+------+----------------+------+

select * from test_multi_table_load2;
+------+----------------+------+
| id   | name           | age  |
+------+----------------+------+
|  2   | Benjamin       | 35   |
+------+----------------+------+

2)使用 Doris Kafka Connector 消费 Kafka 数据

Doris Kafka Connector 是将 Kafka 数据流导入 Doris 数据库的工具。用户可通过 Kafka Connect 插件轻松导入多种序列化格式(如 JSONAvroProtobuf),并支持解析 Debezium 组件的数据格式。

a、消费普通数据

1、导入数据样本:在 Kafka 中,样本数据如下:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-data-topic --from-beginning
{"user_id":1,"name":"Emily","age":25}
{"user_id":2,"name":"Benjamin","age":35}
{"user_id":3,"name":"Olivia","age":28}
{"user_id":4,"name":"Alexander","age":60}
{"user_id":5,"name":"Ava","age":17}
{"user_id":6,"name":"William","age":69}
{"user_id":7,"name":"Sophia","age":32}
{"user_id":8,"name":"James","age":64}
{"user_id":9,"name":"Emma","age":37}
{"user_id":10,"name":"Liam","age":64}

2、创建需要导入的表:在 Doris 中创建被导入的表,具体语法如下:

CREATE TABLE test_db.test_kafka_connector_tbl(
    user_id            BIGINT       NOT NULL COMMENT "user id",
    name               VARCHAR(20)           COMMENT "name",
    age                INT                   COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 12;

3、创建导入任务:在部署 Kafka Connect 的机器上,通过 curl 命令提交如下导入任务:

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name":"test-doris-sink-cluster",
  "config":{
    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
    "tasks.max":"10",
    "topics":"test-data-topic",
    "doris.topic2table.map": "test-data-topic:test_kafka_connector_tbl",
    "buffer.count.records":"10000",
    "buffer.flush.time":"120",
    "buffer.size.bytes":"5000000",
    "doris.urls":"10.10.10.1",
    "doris.user":"root",
    "doris.password":"",
    "doris.http.port":"8030",
    "doris.query.port":"9030",
    "doris.database":"test_db",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
  }
}'

b、消费 Debezium 组件采集的数据

1、MySQL 数据库中有如下表:

CREATE TABLE test.test_user (
  user_id int NOT NULL ,
  name varchar(20),
  age int,
  PRIMARY KEY (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

insert into test.test_user values(1,'zhangsan',20);
insert into test.test_user values(2,'lisi',21);
insert into test.test_user values(3,'wangwu',22);

2、在 Doris 创建被导入的表:

CREATE TABLE test_db.test_user(
    user_id            BIGINT       NOT NULL COMMENT "user id",
    name               VARCHAR(20)           COMMENT "name",
    age                INT                   COMMENT "age"
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 12;

3、部署 Debezium connector for MySQL 组件

4、创建 doris-kafka-connector 导入任务:假设通过 Debezium 采集到的 MySQL 表数据在 mysql_debezium.test.test_user Topic 中:

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name":"test-debezium-doris-sink",
  "config":{
    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
    "tasks.max":"10",
    "topics":"mysql_debezium.test.test_user",
    "doris.topic2table.map": "mysql_debezium.test.test_user:test_user",
    "buffer.count.records":"10000",
    "buffer.flush.time":"120",
    "buffer.size.bytes":"5000000",
    "doris.urls":"10.10.10.1",
    "doris.user":"root",
    "doris.password":"",
    "doris.http.port":"8030",
    "doris.query.port":"9030",
    "doris.database":"test_db",
    "converter.mode":"debezium_ingestion",
    "enable.delete":"true",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter"
  }
}

第 1 步:创建表

CREATE TABLE `students` (
  `id` INT NULL, 
  `name` VARCHAR(256) NULL,
  `age` INT NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)      
COMMENT 'OLAP' 
DISTRIBUTED BY HASH(`id`) BUCKETS 1  
PROPERTIES (                                                         
"replication_allocation" = "tag.location.default: 1"
); 

第 2 步:使用 Flink 导入数据,运行 bin/sql-client.sh 打开 FlinkSQL 的控制台

CREATE TABLE student_sink (
    id INT,
    name STRING,
    age INT
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '10.16.10.6:28737',
      'table.identifier' = 'test.students',
      'username' = 'root',
      'password' = '',
      'sink.label-prefix' = 'doris_label'
);

INSERT INTO student_sink values(1,'zhangsan',123)

第 3 步:检查导入数据

select * from test.students;                                                                                                                        
+------+----------+------+      
| id   | name     | age  |    
+------+----------+------+                                                                                                                             
|  1   | zhangsan |  123 |   
+------+----------+------+    

三、导入方式

1、Stream Load

Stream Load 支持通过 HTTP 协议将本地文件或数据流导入到 Doris 中。

  • Stream Load 是一个同步导入方式,执行导入后返回导入结果,可以通过请求的返回判断导入是否成功。
  • 一般来说,可以使用 Stream Load 导入 10GB 以下的文件,如文件过大,建议切分后使用 Stream Load 进行导入。
  • Stream Load 可以保证一批导入任务的原子性,要么全部导入成功,要么全部导入失败。

1)基本原理

  1. ClientFE 提交 Stream Load 导入作业请求
    • 通过 HTTP 协议发起导入作业给 FE 节点
  2. FE 会轮询选择一台 BE 作为 Coordinator 节点,负责导入作业调度,然后返回给 Client 一个 HTTP 重定向
    • 解析请求并验证表结构
    • 从可用 BE 列表中轮询选择 BE3 作为 Coordinator
    • 返回重定向响应:
  3. Client 连接 Coordinator BE 节点,提交导入请求
  4. Coordinator BE 会分发数据给相应 BE 节点,导入完成后会返回导入结果给 Client
    • BE3 发送数据,BE3 作为 Coordinator 协调其他 BE 节点(如 BE1BE2)存储副本数据
    • BE、BE2、BE3 接收数据后,按分桶规则写入对应 Tablet,并将副本同步到其他节点。
  5. Client 也可以直接通过指定 BE 节点作为 Coordinator,直接分发导入作业

image-20250624163913275

2)使用场景

  • 工具支持:相比于直接使用 curl 的单并发导入,更推荐使用专用导入工具 Doris Streamloader。该工具是一款用于将数据导入 Doris 数据库的专用客户端工具,可以提供多并发导入的功能,降低大数据量导入的耗时

  • 支持能力:Stream Load 支持从本地或远程通过 HTTP 的方式导入 CSV、JSON、Parquet 与 ORC 格式的数据。在导入 CSV 文件时,需要明确区分空值(null)与空字符串:

    • 空值(null):使用 \N 表示。例如 a,\N,b 表示中间列的值为 null

    • 空字符串:当两个分隔符之间没有任何字符时表示空字符串。例如 a,,b 中,两个逗号之间没有字符,表示中间列的值为空字符串。

2、Broker Load

Broker Load 通过 MySQL API 发起,Doris 会根据 LOAD 语句中的信息,主动从数据源拉取数据。Broker Load 是一个异步导入方式,需要通过 SHOW LOAD 语句查看导入进度和导入结果。

1)基本原理

1、用户在提交导入任务后,FE 会生成对应的 Plan 并根据目前 BE 的个数和文件的大小,将 Plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。

2、BE 在执行的过程中会从 Broker 拉取数据,在对数据转换之后将数据导入系统。

3、所有 BE 均完成导入,由 FE 最终决定导入是否成功。

brocker进程:从下图中可以看到,BE 会依赖 Broker 进程来读取相应远程存储系统的数据。之所以引入 Broker 进程,主要是用来针对不同的远程存储系统,用户可以按照 Broker 进程的标准开发其相应的 Broker 进程。由于 broker 进程和 BE 进程的分离,也确保了两个进程的错误隔离,提升 BE 的稳定性。

当前 BE 内置了对 HDFSS3 两个 Broker 的支持,所以如果从 HDFSS3 中导入数据,则不需要额外启动 Broker 进程。如果有自己定制的 Broker 实现,则需要部署相应的 Broker 进程。

image-20250624165009063

2)支持场景

Broker Load 适合源数据存储在远程存储系统,比如对象存储或 HDFS,且数据量比较大的场景

3、Routine Load

Doris 可以通过 Routine Load 导入方式持续消费 Kafka Topic 中的数据。在提交 Routine Load 作业后,Doris 会持续运行该导入作业,实时生成导入任务不断消费 Kafka 集群中指定 Topic 的消息。

Routine Load 是一个流式导入作业,支持 Exactly-Once 语义,保证数据不丢不重。

1)基本原理

Routine Load 会持续消费 Kafka Topic 中的数据,写入 Doris 中。

Doris 中,创建 Routine Load 作业后会生成一个常驻的导入作业,包括若干个导入任务:

  • 导入作业(load job):一个 Routine Load Job 是一个常驻的导入作业,会持续不断地消费数据源中的数据。
  • 导入任务(load task):一个导入作业会被拆解成若干个导入任务进行实际消费,每个任务都是一个独立的事务。

Routine Load 的导入具体流程如下图所示:

  1. ClientFE 提交创建 Routine Load 作业请求FE 通过 Routine Load Manager 生成一个常驻的导入作业(Routine Load Job)。
  2. FE 通过 Job SchedulerRoutine Load Job 拆分成若干个 Routine Load Task,由 Task Scheduler 进行调度,下发到 BE 节点。
  3. BE 上,一个 Routine Load Task 导入完成后向 FE 提交事务,并更新 Job 的元数据。
  4. 一个 Routine Load Task 提交后,会继续生成新的 Task,或对超时的 Task 进行重试。
  5. 新生成的 Routine Load TaskTask Scheduler 继续调度,不断循环。

image-20250624165631566

2)使用场景

Routine Load 支持从Kafka 集群中消费数据。

4、Insert Into Select

INSERT INTO 支持将 Doris 查询的结果导入到另一个表中。INSERT INTO 是一个同步导入方式,执行导入后返回导入结果。可以通过请求的返回判断导入是否成功。INSERT INTO 可以保证导入任务的原子性,要么全部导入成功,要么全部导入失败。

1)基本原理

1、在使用 INSERT INTO 时,需要通过 MySQL 协议发起导入作业给 FE 节点

2、FE 会生成执行计划,执行计划中前部是查询相关的算子,最后一个是 OlapTableSink 算子,用于将查询结果写到目标表中。

3、执行计划会被发送给 BE 节点执行,Doris 会选定一个节点作为 Coordinator 节点,Coordinator 节点负责接受数据并分发数据到其他节点上。

2)使用场景

1、用户希望将已经在 Doris 表中的数据进行 ETL 转换并导入到一个新的 Doris 表中,此时适合使用 INSERT INTO SELECT 语法。

2、与 Multi-Catalog 外部表机制进行结合,如通过 Multi-Catalog 映射 MySQL 或者 Hive 系统中的表,然后通过 INSERT INTO SELECT 语法将外部表中的数据导入到 Doris 表中存储。

3、通过 Table Value Function(TVF)功能,可以直接将对象存储或 HDFS 上的文件作为 Table 进行查询,并且支持自动的列类型推断。然后,通过 INSERT INTO SELECT 语法将外部表中的数据导入到 Doris 表中存储。

5、Insert Into Values

INSERT INTO 支持将 Doris 查询的结果导入到另一个表中。INSERT INTO 是一个同步导入方式,执行导入后返回导入结果。可以通过请求的返回判断导入是否成功。INSERT INTO 可以保证导入任务的原子性,要么全部导入成功,要么全部导入失败。

1)基本原理

在使用 INSERT INTO VALUES 时,需要通过 MySQL 协议发起导入作业给 FE 节点,FE 会生成执行计划,执行计划中前部是查询相关的算子,最后一个是 OlapTableSink 算子,用于将查询结果写到目标表中。执行计划会被发送给 BE 节点执行,Doris 会选定一个节点做为 Coordinator 节点,Coordinator 节点负责接受数据并分发数据到其他节点上。

2)使用场景

1、用户希望仅导入几条假数据,验证一下 Doris 系统的功能。此时适合使用 INSERT INTO VALUES 的语法,语法和 MySQL 一样。

2、并发的 INSERT INTO VALUES 的性能会受到 commit 阶段的瓶颈限制。导入数据量较大时,可以打开 [group commit]达到更高的性能。

  • 适用打开的场景

    • 高并发小批量写入场景:如果有大量的小批量数据需要并发写入 Doris,打开group commit可以将多个小批量导入在后台合并成一个大的事务提交,能显著提升写入性能。例如,在一些实时数据采集和写入的场景中,数据以较小的批次不断流入,开启group commit可以有效减少事务提交的次数,提高整体写入效率。

    • 对数据可见性要求不高的场景:在异步模式下,Doris 会根据负载和表的 group_commit_interval 属性异步提交数据,提交之后数据才可见。如果业务对数据的实时可见性要求不高,允许有一定的延迟,那么打开group commit的异步模式可以在不影响业务的情况下提升性能。

  • 不适用打开的场景

    • 对数据可见性要求严格的场景:如果在导入完成后要求数据立即可见,那么打开group commit可能不太合适,因为异步模式下数据有延迟才能可见,同步模式下导入耗时至少是表属性group_commit_interval设置的时间。例如一些实时查询和分析的场景,需要最新的数据及时可用,就可能不适合开启group commit
    • 单次导入数据量较大的场景:虽然 Doris 在单次导入数据量较大时会自动切换为sync_mode以防止 WAL 占用较大磁盘空间,但这种情况下group commit的优势可能不明显,甚至可能因为额外的合并操作带来一些性能开销。

6、MySQL Load

Doris 兼容 MySQL 协议,可以使用 MySQL 标准的 [LOAD DATA](语法导入本地文件。MySQL Load 是一种同步导入方式,执行导入后即返回导入结果。可以通过 LOAD DATA 语句的返回结果判断导入是否成功。一般来说,可以使用 MySQL Load 导入 10GB 以下的文件,如果文件过大,建议将文件进行切分后使用 MySQL Load 进行导入。MySQL Load 可以保证一批导入任务的原子性,要么全部导入成功,要么全部导入失败。

1)基本原理

MySQL LoadStream Load 功能相似,都是导入本地文件到 Doris 集群中。因此 MySQL Load 的实现复用了 Stream Load 的基本导入能力。

下图展示了 MySQL Load 的主要流程:

  1. 用户向 FE 提交 LOAD DATA 请求,FE 完成解析工作,并将请求封装成 Stream Load
  2. FE 会选择一个 BE 节点发送 Stream Load 请求;
  3. 发送请求的同时,FE 会异步且流式的从 MySQL 客户端读取本地文件数据,并实时的发送到 Stream LoadHTTP 请求中;
  4. MySQL 客户端数据传输完毕,FE 等待 Stream Load 完成,并展示导入成功或者失败的信息给客户端。

2)使用场景

MySQL Load 主要适用于导入客户端本地 CSV 文件,或通过程序导入数据流中的数据。

四、导入实战

1、导入高可用性

Doris 在数据导入过程中提供了多种机制来确保高可用性。本文将详细介绍 Doris 的默认导入行为以及为提高导入可用性而提供的额外选项,特别是最小写入副本数功能。

1)多数派写入

默认情况下,Doris 采用多数派写入策略来确保数据的可靠性和一致性:

  • 当成功写入的副本数超过总副本数的一半时,导入被视为成功。
  • 例如,对于三副本的表,至少需要两个副本写入成功才算导入成功。

工作原理:

  1. 数据分发:导入任务首先将数据分发到所有相关的 BE 节点。
  2. 并行写入:各个 BE 节点并行处理数据写入操作。
  3. 写入确认:每个 BE 节点在完成数据写入后,会向 FE 发送确认信息。
  4. 多数派判断:FE 统计成功写入的副本数,当达到多数派时,认为导入成功。
  5. 事务提交:FE 提交导入事务,使数据对外可见。
  6. 异步复制:对于未成功写入的副本,系统会在后台异步进行数据复制,以确保最终所有副本的数据一致性。

多数派写入策略是 Doris 在数据可靠性和系统可用性之间的一个平衡。对于有特殊需求的场景,Doris 提供了最小写入副本数等其他选项来进一步提高系统的灵活性。

2)最小写入副本数

多数派写入策略在保证数据可靠性的同时,也可能在某些场景下影响系统的可用性。例如,在两副本的情况下,必须两个副本都写入成功才能完成导入,这意味着在导入过程中不允许任何一个副本不可用。

为了解决上述问题并提高导入的可用性,Doris 提供了最小写入副本数(选项。

1)功能说明:最小写入副本数允许用户指定导入数据时需要成功写入的最少副本数。当成功写入的副本数大于或等于这个值时,导入即视为成功。

  • 使用场景

    • 在部分节点不可用时,仍需要保证数据能够成功导入。

    • 对数据导入速度有较高要求,愿意在一定程度上牺牲一致性来换取更高的可用性。

2)配置方法

1、单表配置

a. 创建表时设置:

CREATE TABLE example_table
(
id INT,
name STRING
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES
(
'replication_num' = '3',
'min_load_replica_num' = '2'
);

b. 修改现有表:

ALTER TABLE example_table
SET ( 'min_load_replica_num' = '2' );

2、全局配置

配置:通过 FE 配置项 min_load_replica_num 设置。

  • 有效值:大于 0
  • 默认值:-1(表示不开启全局最小写入副本数)

优先级:表属性 > 全局配置 > 默认多数派规则

3)其他

除了最小写入副本数选项,Doris 还采用了以下机制来提高导入的可用性:

  1. 导入重试:自动重试因临时故障导致的失败导入任务。
  2. 负载均衡:将导入任务分散到不同的 BE 节点,避免单点压力过大。
  3. 事务机制:确保数据的一致性,失败时自动回滚。

2、高并发导入优化

在高频小批量写入场景下,传统的导入方式存在以下问题:

  • 每个导入都会创建一个独立的事务,都需要经过 FE 解析 SQL 和生成执行计划,影响整体性能
  • 每个导入都会生成一个新的版本,导致版本数快速增长,增加了后台 compaction 的压力

为了解决这些问题,Doris 引入了 Group Commit 机制。Group Commit 不是一种新的导入方式,而是对现有导入方式的优化扩展,通过将多个小批量导入在后台合并成一个大的事务提交,显著提升了高并发小批量写入的性能。同时,Group CommitPreparedStatement 结合使用可以获得更高的性能提升。主要针对:

  • INSERT INTO tbl VALUES(...) 语句
  • Stream Load 导入

1)Group Commit 模式

Group Commit 写入有三种模式,分别是:

模式 核心机制 优点 缺点 典型适用场景
关闭模式(off_mode 不启用 Group Commit,每个导入单独提交事务,数据写入后立即提交。 实现简单,无事务合并开销。 并发写入时事务提交频繁,性能较低;资源利用率差。 小数据量测试、单条数据导入(极低频场景)。
同步模式(sync_mode 将多个导入合并到同一事务中,根据 group_commit_interval 触发提交,提交后数据可见。 数据强一致性,提交后立即可见;Group Commit 减少事务开销,提升并发写入性能。 写入延迟受事务合并周期影响(如设置过长可能增加延迟)。 实时分析、报表查询(要求数据立即可见);高并发写入且对一致性要求高的场景(如交易数据)。
异步模式(async_mode 先写入 WAL(预写日志)并立即返回,后台异步合并事务并提交,提交后数据可见;大数据量自动切换为 sync_mode 写入延迟极低(返回时间仅受 WAL 写入影响);适合高频小批量写入,提升吞吐量。 数据提交前存在丢失风险(如节点故障未提交的 WAL 可能丢失);WAL 占用磁盘空间需监控。 日志采集、埋点数据写入(允许短暂数据不可见);对写入延迟敏感的场景(如用户行为日志)。

2)自动提交条件

当满足时间间隔 (默认为 10 秒) 或数据量 (默认为 64 MB) 其中一个条件时,会自动提交数据。这两个参数需要配合使用,建议根据实际场景进行调优。

a、修改提交间隔:

默认提交间隔为 10 秒,用户可以通过修改表的配置调整:

# 修改提交间隔为 2 
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");

参数调整建议:建议根据业务对数据可见性延迟的容忍度来设置,如果系统压力大,可以适当增加间隔。

  • 较短的间隔 (如 2 秒):
    • 优点:数据可见性延迟更低,适合对实时性要求较高的场景
    • 缺点:提交次数增多,版本数增长更快,后台 compaction 压力更大
  • 较长的间隔 (如 30 秒):
    • 优点:提交批次更大,版本数增长更慢,系统开销更小
    • 缺点:数据可见性延迟更高

b、修改提交数据量

Group Commit 的默认提交数据量为 64 MB,用户可以通过修改表的配置调整:

# 修改提交数据量为 128MB
ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");

参数调整建议:建议根据系统内存资源和数据可靠性要求来权衡。如果内存充足且追求更高吞吐,可以适当增加到 128MB 或更大。

  • 较小的阈值 (如 32MB):
    • 优点:内存占用更少,适合资源受限的环境
    • 缺点:提交批次较小,吞吐量可能受限
  • 较大的阈值 (如 256MB):
    • 优点:批量提交效率更高,系统吞吐量更大
    • 缺点:占用更多内存

3)使用限制

a、Group Commit 限制条件

  • INSERT INTO VALUES 语句在以下情况下会退化为非 Group Commit 方式:
    • 使用事务写入 (Begin; INSERT INTO VALUES; COMMIT)
    • 指定 Label (INSERT INTO dt WITH LABEL {label} VALUES)
    • VALUES 中包含表达式 (INSERT INTO dt VALUES (1 + 100))
    • 列更新写入
    • 表不支持轻量级模式更改
  • Stream Load 在以下情况下会退化为非 Group Commit 方式:
    • 使用两阶段提交
    • 指定 Label (-H "label:my_label")
    • 列更新写入
    • 表不支持轻量级模式更改

b、Unique 模型

  • Group Commit 不保证提交顺序,建议使用 Sequence 列来保证数据一致性。

c、WAL 限制

  • async_mode 写入会将数据写入 WAL,成功后删除,失败时通过 WAL 恢复。
  • WAL 文件是单副本存储的,如果对应磁盘损坏或文件误删可能导致数据丢失。
  • 下线 BE 节点时,使用 DECOMMISSION 命令以防数据丢失。
  • async_mode 在以下情况下切换为 sync_mode
    • 导入数据量过大(超过 WAL 单目录 80% 空间)
    • 不知道数据量的 chunked stream load
    • 磁盘可用空间不足
  • 重量级 Schema Change 时,拒绝 Group Commit 写入,客户端需重试。

4)Group Commit 性能

机器配置

  • 1 台 FE:阿里云 8 核 CPU、16GB 内存、1 块 100GB ESSD PL1 云磁盘
  • 3 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘
  • 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘
  • 测试版本为 Doris-2.1.5

a、Stream Load

1)数据集

  • httplogs 数据集,总共 31GB2.47 亿条

2)测试工具:[doris-streamloader]

3)测试方法:对比 非 group_commitgroup_commit async_mode 模式下,设置不同的单并发数据量和并发数,导入 247249096 行数据

4)测试结果

  • 单并发数据量:指每个并发连接在单次数据导入操作中处理的数据量
导入方式 单并发数据量 并发数 耗时 (秒) 导入速率 (行/秒) 导入吞吐 (MB/秒)
group_commit 10 KB 10 3306 74,787 9.8
group_commit 10 KB 30 3264 75,750 10.0
group_commit 100 KB 10 424 582,447 76.7
group_commit 100 KB 30 366 675,543 89.0
group_commit 500 KB 10 187 1,318,661 173.7
group_commit 500 KB 30 183 1,351,087 178.0
group_commit 1 MB 10 178 1,385,148 182.5
group_commit 1 MB 30 178 1,385,148 182.5
group_commit 10 MB 10 177 1,396,887 184.0
           
非 group_commit 1 MB 10 2824 87,536 11.5
非 group_commit 10 MB 10 450 549,442 68.9
非 group_commit 10 MB 30 177 1,396,887 184.0

在上面的group_commit测试中,BECPU 使用率在 10-40% 之间。

可以看出,group_commit 模式在小数据量并发导入的场景下,能有效的提升导入性能,同时减少版本数,降低系统合并数据的压力。

优先使用 group_commit 模式

  • 适用于高频小批量导入(如 10 KB - 1 MB),可大幅减少事务开销,提升吞吐量。
  • 配置 group_commit_interval 参数,平衡实时性和性能(例如,设置为 50ms 可在 50ms 内合并多个导入)。

group_commit 的适用场景

  • 仅建议在超大批量导入(如单批次 10 MB 以上)且高并发(如 30 并发以上)时使用,但需注意:
    • 数据可见性可能因事务单独提交而延迟;
    • 需监控 WAL 日志大小,避免磁盘占用过高。

优化数据量和并发数

  • 若导入性能不足,可尝试增大单并发数据量(如从 10 KB 提升至 100 KB),或适当增加并发数(但需结合系统资源评估)。
  • 避免极端小数据量导入(如 <10 KB),因事务开销占比过高,性能会大幅下降。

b、JDBC

机器配置

  • 1 台 FE:阿里云 8 核 CPU、16GB 内存、1 块 100GB ESSD PL1 云磁盘
  • 1 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘
  • 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘
  • 测试版本为 Doris-2.1.5
  • 关闭打印 parpared 语句的 audit log 以提高性能

数据集:tpch sf10 lineitem 表数据集,30 个文件,总共约 22 GB,1.8 亿行

测试工具:[DataX]

测试方法:通过 txtfilereadermysqlwriter 写入数据,配置不同并发数和单个 INSERT 的行数

单个 insert 的行数 并发数 导入速率 (行/秒) 导入吞吐 (MB/秒)
100 10 107,172 11.47
100 20 140,317 14.79
100 30 142,882 15.28
在上面的测试中,FE 的 CPU 使用率在 60-70% 左右,BE 的 CPU 使用率在 10-20% 左右。      

c、Insert into sync 模式小批量数据

  • 1 台 FE:阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘
  • 5 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘。
  • 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘
  • 测试版本为 Doris-2.1.5
  1. 设置测试前的 init 语句,set group_commit = async_mode以及set enable_nereids_planner=false
  2. 开启 jdbc 的 prepared statement,完整的 url 为jdbc:mysql://127.0.0.1:9030?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=50&sessionVariables=group_commit=async_mode,enable_nereids_planner=false
  3. 设置导入类型为 prepared update statement
  4. 设置导入语句。
  5. 设置每次需要导入的值,注意,导入的值与导入值的类型要一一匹配。

测试方法:通过 JmeterDoris写数据。每个并发每次通过 insert into 写入 1 行数据。

测试结果

  • 数据单位为行每秒。
  • 以下测试分为 30,100,500 并发。

30 并发 sync 模式 5 个 BE3 副本性能测试

Group commit interval 10ms 20ms 50ms 100ms
  321.5 307.3 285.8 224.3

100 并发 sync 模式性能测试

Group commit interval 10ms 20ms 50ms 100ms
  1175.2 1108.7 1016.3 704.5

500 并发 sync 模式性能测试

Group commit interval 10ms 20ms 50ms 100ms
  3289.8 3686.7 3280.7 2609.2

d、Insert into sync 模式大批量数据

机器配置

  • 1 台 FE:阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘
  • 5 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘。注:测试中分别用了 1 台,3 台,5 台 BE 进行测试。
  • 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘
  • 测试版本为 Doris-2.1.5

测试方法:通过 JmeterDoris写数据。每个并发每次通过 insert into 写入 1000 行数据。

测试结果

  • 数据单位为行每秒。
  • 以下测试分为 30,100,500 并发。

30 并发 sync 模式性能测试

Group commit interval 10ms 20ms 50ms 100ms
  92.2K 85.9K 84K 83.2K

100 并发 sync 模式性能测试

Group commit interval 10ms 20ms 50ms 100ms
  70.4K 70.5K 73.2K 69.4K

500 并发 sync 模式性能测试

Group commit interval 10ms 20ms 50ms 100ms
  46.3K 47.7K 47.4K 46.5K

五、导入实践

1、表模型选择

建议优先考虑使用明细模型, 明细模型在数据导入和查询性能方面相比其他模型都具有优势

2、分区分桶配置

建议一个 tablet 的大小在 1-10G 范围内。过小的 tablet 可能导致聚合效果不佳,增加元数据管理压力;过大的 tablet 不利于副本迁移、补齐。详

3、Random 分桶

在使用 Random 分桶时,可以通过设置 load_to_single_tablettrue 来启用单分片导入模式。这种模式在大规模数据导入过程中,能够提升数据导入的并发度和吞吐量,减少写放大问题。

4、攒批导入

客户端攒批‌:建议将数据在客户端进行攒批(数MB到数 GB 大小)后再进行导入,高频小导入会频繁做 compaction,导致严重的写放大问题。

服务端攒批:对于高并发小数据量导入,建议打开[Group Commit],在服务端实现攒批导入。

5、分区导入

每次导入建议只导入少量分区的数据。过多的分区同时导入会增加内存占用,并可能导致性能问题。 Doris 每个 tablet 在内存中有一个活跃的 Memtable ,每个 Memtable 达到一定大小时才会下刷到磁盘。为了避免进程 OOM ,当活跃的 Memtable 占用内存过高时,会提前触发 Memtable 下刷,导致产生大量小文件,同时会影响导入的性能。

6、大规模数据分批导入

需要导入的文件数较多、数据量很大时,建议分批进行导入,避免导入出错后重试代价太大,同时减少对系统资源的冲击。对 Broker Load 每批次导入的数据量建议不超过 100G。对于本地的大数据量文件,可以使用 Doris 提供的 ·streamloader`工具进行导入,该工具会自动进行分批导入。

7、Broker Load 导入并发数

压缩文件/Parquet/ORC文件‌:建议将文件分割成多个小文件进行导入,以实现多并发导入。非压缩的 CSVJSON 文件‌:Doris 内部会自动切分文件并并发导入。

8、Stream load 并发导入

Stream loadBE 上的并发数建议不超过 128 (由 BEwebserver_num_workers 参数控制)。过高的并发数可能导致webserver 线程数不够用,影响导入性能。特别是当单个 BE 的并发数超过 512doris_max_remote_scanner_thread_pool_thread_num 参数)时,可能会导致BE进程卡住。

ContactAuthor