大数据_Spark理论篇
前言
Github:https://github.com/HealerJean
一、概念理解
| 组件 | 核心作用 | 关键词 |
|---|---|---|
Hadoop |
提供分布式存储(HDFS) + 资源调度(YARN) |
存储 + 调度(地基) |
Hive |
基于 Hadoop 的SQL 数据仓库工具,用于离线分析 |
SQL 查询 + 离线数仓 |
Spark |
通用内存计算引擎,支持批处理、流处理、机器学习等 | 快 + 通用 + 内存计算 |
Flink |
流批一体的实时计算引擎,强调低延迟与状态一致性 | 实时 + 流批一体 + 高吞吐 |
1、Hadoop:存储 + 调度(地基)
1)HDFS(Hadoop Distributed File System)
HDFS就像超市的货架系统——东西太多,一个人搬不动,那就分开放,每样东西都有备份,丢了也不怕。”
- 就是超大仓库,能把
TB、PB级的数据分散存在成百上千台机器上。 - 比如你有
10TB的日志文件,HDFS自动切成小块,存到不同服务器,不怕一台坏了就丢数据。
2)YARN(Yet Another Resource Negotiator)
YARN就像餐厅经理,谁想用厨房都得排队申请,不能乱抢灶台。”
- 是资源调度员,管
CPU、内存、磁盘这些“灶台资源”。 Spark要跑任务?Flink要启动?Hive要查表?都得跟YARN打招呼:“给我两个灶台、8G 内存!”YARN说:“行,给你用10分钟,别超时啊。”
2、Hive:会 SQL 的老会计(离线数仓工具)
Hive就像一个只会写Excel公式的老师傅,他让你用熟悉的SQL写需求,但他自己不动手,而是叫一群临时工(MapReduce)去翻仓库找数据,所以慢
- 优点:简单、稳定、适合 T+1 报表(比如每天早上出昨天的销售汇总)
- 缺点:延迟高,不适合“现在有多少人在线?”这种问题
Hive不是数据库,也不是存储,它只是一个“翻译官”。- 你写一句
SELECT COUNT(*) FROM orders WHERE dt='2025-12-01'; Hive把这句话翻译成MapReduce/Tez/Spark任务,扔给Hadoop去执行。- 结果可能要等几分钟甚至几小时(因为数据量太大)。
1)Hive 不是表,但是经常被叫做 Hive表
Hive表 = 元数据(schema+location) +HDFS上的文件
**
Hive表”确实不是传统数据库意义上的“表”,但它在逻辑上就是一个“表”——有结构、有名字、能被SQL查询。所以大家习惯说“Hive表 **
-
当你在
Hive里执行:CREATE TABLE orders ( order_id STRING, user_id STRING, amount DOUBLE ) PARTITIONED BY (dt STRING) STORED AS PARQUET LOCATION '/data/warehouse/orders'; # `Hive` 并没有在某台机器上创建一个叫 `orders` 的“表文件”。 -
它只是在
HiveMetastore(通常是MySQL) 里记录了一条元数据:- 表名:
orders - 字段:
order_id, user_id, amount - 分区字段:
dt - 数据格式:
Parquet - 数据位置:
hdfs://namenode/data/warehouse/orders
- 表名:
-
真正的数据文件(比如
/data/warehouse/orders/dt=2025-12-01/000001.parquet)是存在HDFS上的普通文件。
2)MapReduce 为什么慢?——“做完一步就存仓库
MapReduce的设计哲学是“容错优先”:万一机器挂了,可以从磁盘恢复。但代价是性能低。
计算步奏:
Map阶段输出中间结果(比如key=商品ID, value=销量)- 必须写入
HDFS(磁盘) Reduce阶段再从磁盘读这些数据- 如果有多个阶段(比如
JOIN+GROUPBY+ORDER BY),就要多次写磁盘
情景在线:假设你要做一道菜:宫保鸡丁,但厨房规定:
“每完成一个步骤,必须把半成品打包放进冷库(硬盘),下次用的时候再从冷库拿出来。”
流程:
- 切鸡丁 → 打包 → 放冷库 ❄️
- 炸花生米 → 打包 → 放冷库 ❄️
- 炒酱汁 → 打包 → 放冷库 ❄️
- 最后把三样从冷库拿出来,一起炒
👉 问题:来回开冷库门、搬东西,太耗时了!
3、Spark:闪电侠厨师(通用计算引擎)
Spark就像一个记忆力超强的厨师,切完菜不用放回冰箱,直接拿起来炒,省了来回跑的时间,所以出菜飞快!
Spark最大的特点是:把中间数据放在内存里,不像MapReduce那样每一步都写磁盘。- 所以它比
Hive(基于MapReduce)快 10~100 倍。 Spark不光能做SQL查询(Spark SQL),还能做:- 机器学习(
MLlib) - 图计算(
GraphX) - 流处理(
Structured Streaming)
- 机器学习(
- 和
Hive的关系: -
Spark可以直接读Hive的表!只要配置好,spark.sql("SELECT * FROM hive_table")直接跑。 - 很多公司现在用
Spark SQL替代Hive做离线分析,又快又兼容。
1)Spark 和 Hive 关系
读
Hive表”其实是:读Hive定义的元数据 + 读 HDFS 上的实际数据文件。
Hive表不是“真实”的表(像 MySQL 那样),但它是一个“带说明书的文件夹”。Spark能看懂这个说明书,然后自己去文件夹里拿数据,做得更快更好
spark.sql("SELECT * FROM orders")
Spark 会:
- 去
HiveMetastore里查orders表的元数据; - 知道这个表的数据在
HDFS的哪个路径; - 知道字段名、类型、分区、文件格式(
Parquet/ORC等); - 直接用
Spark引擎高效读取这些文件(比Hive自己用MapReduce快得多)。
2)Spark SQL 替代 Hive对吗
替代”的是底层执行引擎和运行时性能,不是
SQL本身,也不是表结构
- 功能上兼容:
SparkSQL支持HiveQL的大部分语法,也能读写Hive表。 - 性能上碾压:
Spark基于内存计算,比Hive默认的MapReduce快几十倍。 - 生态上融合:
Spark可以复用Hive已有的表结构、UDF、分区策略,迁移成本低。
| 维度 | Hive(传统方式) | Spark SQL(替代方案) |
|---|---|---|
| SQL 语法 | HiveQL(类 SQL) |
Spark SQL(兼容 HiveQL + 标准 SQL) |
| 执行引擎 | MapReduce(默认)或 Tez |
Spark(内存计算引擎) |
| 元数据管理 | Hive Metastore(MySQL/PostgreSQL) |
复用同一个 Hive Metastore |
| 表结构/分区/存储格式 | ORC/Parquet/TextFile 等 |
完全兼容,直接读写 |
| 查询速度 | 慢(分钟级) | 快(秒级) |
| 资源调度 | YARN(通过 MapReduce) |
YARN(通过 Spark on YARN) |
假设公司有一张 Hive 表:
CREATE TABLE user_behavior (
user_id STRING,
event STRING,
ts BIGINT
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET
LOCATION '/data/user_behavior';
a、老方式(Hive + MapReduce)
# 在 Hive CLI 里执行
SELECT dt, COUNT(*) FROM user_behavior WHERE event='click' GROUP BY dt;
- Hive 把 SQL 翻译成 MapReduce 任务
- 提交到 YARN
- 跑了 8 分钟,中间多次读写 HDFS
b、新方式(Spark SQL)
# 在 PySpark 里执行
spark.sql("""
SELECT dt, COUNT(*)
FROM user_behavior
WHERE event='click'
GROUP BY dt
""").show()
Spark连接 同一个HiveMetastore- 读取 同一张表(路径、格式、分区都一样)
- 但用
Spark引擎执行 - 跑了 20 秒
c、为什么不直接叫“用 Spark 执行 Hive 查询
其实很多人也这么说!但在工程实践中,“替代”这个词更强调 技术栈的演进和责任转移:
- 以前:
Hive是主力,所有离线分析都走Hive CLI/Beeline - 现在:
Spark是主力,Hive只剩下Metastore(元数据服务)这个“遗产”
d、有了 Spark 就不用 Hive 了吗?
答案:Hive 的元数据管理(表结构、分区)仍是行业标准,Spark 经常“借”Hive 的元数据用
3)Spark 比 Hive(MapReduce)快 10~100 倍
Spark比Hive(MapReduce)快 10~100 倍。Spark的设计哲学是“性能优先 + 容错兼顾”:通过lineage(血缘)机制,即使某步失败,也能从源头重算,不一定非要存中间结果
| 步骤 | MapReduce | Spark |
|---|---|---|
| 1. 读数据 | 从 HDFS 读 | 从 HDFS 读 |
| 2. 过滤(filter) | Map 输出 → 写磁盘 | 内存中过滤(不写盘) |
| 3. 分组聚合(groupByKey) | Reduce 读上一步磁盘数据 → 计算 → 再写磁盘 | 内存中直接聚合 |
| 4. 排序(sortBy) | 再启动一轮 MapReduce → 又读又写磁盘 | 内存中排序(或溢写部分到磁盘) |
| 总磁盘 I/O | 3次以上读写 | 0~1次(仅输入/输出) |
计算步奏:
Spark把RDD(弹性分布式数据集)或DataFrame的中间结果缓存在内存- 多个操作(如 filter → map → groupBy → join)形成一个
DAG(有向无环图) Spark优化执行计划,尽量在内存中流水线式处理- 只有在内存不够
or需要持久化时,才写磁盘
情景在线:Spark 厨师说:“我记忆力好,所有半成品都放操作台上(内存),不用进冷库!”
流程:
- 切鸡丁 → 放操作台
- 炸花生米 → 放操作台
- 炒酱汁 → 放操作台
- 直接混合翻炒 → 出锅!
👉 全程不进冷库,速度飞起!
4、Flink:24小时盯单的实时高手(流批一体引擎)
Flink就像一个装了雷达的智能锅,水一开就自动调火,订单一来就立刻处理,根本不用等‘攒够一批再做’
Flink 的核心理念:一切皆流(Everything is a stream)。
-
它能做到 毫秒级延迟 + 精确一次(exactly-once)语义,金融、电商风控特别需要这个。
- 批处理 = 有界的流(比如一天的数据)
- 流处理 = 无界的流(比如每秒进来的订单)
典型场景:
- 实时大屏:当前在线用户数、每秒交易额
- 风控:检测异常登录(同一账号1分钟内从北京跳到纽约?)
- 实时推荐:你刚搜“手机”,首页马上推手机壳
1)Flink 和 Hive/Hadoop 的关系:
Flink可以从Kafka读实时数据,处理完写入HDFS或Hive表(供后续离线分析用)。Flink也能读Hive的元数据(通过HiveCatalog),知道“orders表长啥样”。Flink Hive Catalog:Flink的一个插件,能让Flink连接 Hive Metastore,直接读这些“说明书”

2)Flink 订阅 Kafka 和普通应用订阅
不是所有 Kafka 消费都需要 Flink,但一旦涉及“状态 + 精确计算 + 高可靠”,Flink 几乎是唯一选择。
| 对比项 | 普通应用 | Flink |
|---|---|---|
| 定位 | 消息消费者 | 流处理引擎 |
| 关注点 | “拿到消息就行” | “如何正确、高效、可靠地处理无限数据流” |
| 适合场景 | 简单通知、异步解耦 | 实时数仓、风控、监控、CEP(复杂事件处理) |
5、实时流处理推荐 Flink 而不是 Spark
Flink 是“真·流”,
Spark是“微批模拟流” —— 这是根本区别。
- 你的业务要多“实时”?
- 你的计算有多“有状态”?
1)差异1:执行模型不同
| 对比项 | Flink |
Spark Structured Streaming |
|---|---|---|
| 处理方式 | 数据一到达就立即处理(逐条或小窗口连续处理) | 把流数据按时间切片(如每 1 秒),当作一个个小“批”来处理 |
| 延迟 | 毫秒级(通常 10~100ms) | 至少一个批次间隔(默认 100ms 起,通常设为 1~10 秒) |
| 类比 | 快递员一有包裹就立刻送到你手上,你马上拆开处理。 | 快递员每 5 分钟来一次,把这 5 分钟内所有包裹一起给你,你再统一拆开处理。 |
2)差异2:状态机制不同
状态”就是程序记住过去数据的能力
| 项目 | Flink |
Spark |
|---|---|---|
| 记笔记能力 | 有专业笔记本 + 云同步 + 自动备份 | 自己手写 + 容易丢 + 写多了手酸(OOM) |
| 能记多少? | 几 TB 都行(比如全公司用户30天行为) |
记太多就崩溃,适合简单任务 |
| 机器挂了怎么办? | 自动从备份恢复,无缝继续 | 可能丢数据、重启慢、甚至起不来 |
| 适合干啥? | 复杂实时计算(如风控、用户画像、实时推荐) | 简单过滤、转换、小窗口统计 |
| 大状态场景 | 支持(如用户 30 天行为窗口) | 风险高(状态大时 GC 频繁、任务失败) |
| 状态 vs 数据 | 状态是核心抽象,只存必要的聚合/上下文 | 状态是执行副产品,常被迫缓存原始数据或中间结果 |
a、Flink:自带超级笔记本(而且能扩容)
1)Flink 自己就有一套专门记笔记的系统,叫“状态后端”。
- 小本本可以存在内存里(快),也可以存在硬盘上(比如
RocksDB,稳,还能存超大笔记)。 - 即使你的“笔记”有 几
TB大(比如记录百万用户30天行为),Flink也能扛住。
2)而且它还会自动备份笔记(通过 Checkpoint),就算机器宕机,重启后也能从备份恢复,不丢数据、不错乱。
b、Spark:靠自己手写 + 容易忘事 / 笔记本炸了
Spark做流处理时,状态是存在每个计算节点(Executor)的内存或本地磁盘里的,但是状态不是为“长期存储”设计的。- 就像每个工人自己拿个小本本记事,没有统一管理。
- 如果状态太大(比如要记几百万用户的行为),小本本就写满了 → 内存爆了(
OOM),溢出到磁盘是为了避免OOM崩溃,但速度会暴跌
- 虽然
Spark也用“预写日志(WAL)”做备份,但效率低、扩展差。- 一旦任务失败,恢复慢,还可能因为
GC(垃圾回收)频繁卡死。
- 一旦任务失败,恢复慢,还可能因为
3)差异3:容错机制不同
不丢数据、不重复创立、挂了能自动恢复
| 场景 | Flink | Spark |
|---|---|---|
| 第 8 秒挂了 | 从第 0 秒 Checkpoint 恢复,重放 0~8 秒数据,状态一致 |
从 WAL 重放最近一个微批(比如 5~10 秒),但状态可能丢失或不一致 |
| 数据丢失风险 | 几乎没有(只要 source 可重放) |
如果 Executor 永久丢失,状态可能无法恢复 |
| 重复处理 | 有,但结果正确(exactly-once) | 有,且可能因状态丢失导致结果错误 |
a、Flink 的容错机制:分布式快照(Checkpoint)
-
核心思想:定期给整个系统“拍快照”
-
就像打游戏时自动存档:角色血量、背包、地图进度全保存。挂了就从存档点重来,不丢进度。
-
Flink会周期性地(比如每10秒)触发一次Checkpoint。 -
这个快照包含:
- 所有算子的当前状态(比如窗口聚合值、用户会话)
-
数据源的读取位置(比如
Kafka的offset) -
举例: 比如你正在统计“每分钟订单数”,
Checkpoint会记录:-
当前窗口累计了多少单
-
Kafka读到了哪条消息
-
-
-
恢复过程:
- 如果某个节点挂了,
Flink会:- 停止所有任务
- 从最近一次成功的
Checkpoint恢复整个作业 - 重置数据源到对应位置(比如
Kafka回滚到那个offset) - 重新开始计算 → 结果完全一致,精确一次(
exactly-once)
- 如果某个节点挂了,
-
优点:
-
全局一致:所有算子的状态是“同一时刻”的快照
-
高效:支持增量
Checkpoint、RocksDB异步快照 -
真正
exactly-once:配合Kafka等支持事务的 source/sink -
状态再大也能恢复(因为状态后端可扩展)
-
b、Spark 流 的容错机制:WAL + 幂等写
-
Spark流处理的容错依赖两个东西:-
WAL(Write-Ahead Log,预写日志)-
数据从
Kafka/Pulsar等读进来后,先写一份日志到磁盘(通常是HDFS或本地磁盘) -
即使
Executor挂了,Driver可以从WAL里重放这批数据
-
- 幂等写(
Idempotent Sink) -
Spark本身不保证exactly-once,除非你用的输出系统支持“幂等”(比如写数据库时用主键去重) - 否则可能重复写(
at-least-once)
-
-
恢复过程的问题:
- 如果任务失败,
Spark会:- 从
WAL里读出未处理完的微批次数据 - 重新跑一遍这个批次
- 从
- 但状态恢复很弱:
- 状态存在
Executor内存/本地磁盘 - 如果
Executor永久丢失(比如机器宕机),状态就没了! - 虽然可以重建状态,但大状态重建慢、易失败
- 状态存在
- 如果任务失败,
4)使用场景对比
| 对比项 | Flink |
Spark Structured Streaming |
|---|---|---|
| 场景 | 实时风控、高频交易等对延迟敏感的场景 | 准实时报表、日志聚合等场景完全够用 |
| 实时反欺诈 | 毫秒响应 + CEP 模式匹配 |
微批延迟高,CEP 需手写 |
| 5s大屏监控 | 能做,但“杀鸡用牛刀 | 完全够用,开发更快 |
| 7天用户行为 | 状态高效管理 | 可能因状态过大失败 |
| 日志清洗入湖 | 可以,但生态不如 Spark |
与 Delta Lake/Hive 无缝集成 |
一、Spark 核心概念入门
1、Spark 是干啥的?—— 定位与价值
Apache Spark是一个开源的、基于内存的、通用的大数据(大规模数据)
它不是数据库,也不是存储系统,而是一个计算引擎——专门用来**快速处理海量数据
- 内存计算:比
Hadoop快100倍 - 统一
API:支持批处理、流处理、ML、图计算 - 多语言支持:
Scala、Java、Python、R DAG执行引擎:优化复杂任务执行流程
| 功能 | 说明 | 实际场景举例 |
|---|---|---|
| 批处理(Batch) | 处理历史全量数据 | 每天凌晨统计昨日用户活跃数 |
| 流处理(Streaming) | 处理实时数据流 | 实时监控交易异常、大屏展示在线人数 |
| 交互式查询 | 快速响应 SQL 查询 | 数据分析师用 SQL 探查数据 |
| 机器学习(ML) | 分布式训练模型 | 用户画像、推荐系统、风控模型 |
| 图计算 | 分析关系网络 | 社交关系挖掘、反欺诈关联分析 |
2、Spark 的核心思想
| 关键词 | 说明 |
|---|---|
| 内存计算 | 中间数据尽量放内存,避免磁盘 I/O |
DAG 调度 |
多步骤任务编排成有向无环图,优化执行顺序 |
懒执行(Lazy Evaluation) |
只有遇到 count(), show(), write() 等 action 才真正计算 |
容错靠血缘(Lineage) |
不存中间结果,失败时从源头重算 |
| 统一引擎 | 批、流、ML、图计算一套 API 搞定 |
3、Spark 协作架构:两层执行角色 + 一层外部资源协调者
Spark作业的执行引擎由Driver和Executor构成,但依赖Cluster Manager(如 YARN)来分配资源和启动进程。因此,在集群环境中,我们通常说Spark涉及三个关键角色:两层执行角色 + 一层外部资源协调者
- 一个
Spark应用 = 1 个Driver+N个Executors-
Executors会长期驻留(不像MapReduce每次新建),减少启动开销 -
数据缓存在
Executor内存中,供后续Task复用
-
- 每运行一个
spark-submit命令(或在代码中创建一个SparkContext/SparkSession),就会启动一个独立的Spark应用。- 是
Spark调度和资源管理的最小单位。 - 在
YARN上,它对应一个YARNApplication(可在yarn application -list中看到)。 - 拥有唯一的
Application ID(如application_1712345678901_0012)。
- 是
| 角色 | 职责 | 是否属于 Spark 应用 |
|---|---|---|
Driver |
控制中心:解析、调度、收集结果 | 是 |
Executor |
执行单元:跑 Task、缓存数据 |
是 |
1)工作流程图
| 角色 | 职责 | 类比 |
|---|---|---|
Driver |
解析逻辑、构建 DAG、调度 Task、收集结果 |
导演的对讲机:用来跟现场沟通 |
Worker Node |
集群中的工作机器 | 拍摄现场 |
Executor |
Worker 上运行的进程 执行 Task • 缓存 RDD/DataFrame 数据 |
演员 + 场务:既表演(计算),又保管道具(缓存数据) |
Task |
最小执行单元,每个分区对应一个 Task | 一个镜头的拍摄任务 |
Cluster Manager |
管理集群资源(CPU、内存) 常见:YARN(Hadoo 生态)、K8s、Spark Standalone |
制片主任:分配摄影棚、灯光、群演 |
+-------------------+ +---------------------+ +------------------+
| Client | | Cluster Manager | | Worker Nodes |
| (spark-submit) |<-------->| (YARN / Mesos / K8s)|<-------->| |
+-------------------+ +---------------------+ +------------------+
| | |
| | |
v v v
+-------------------+ +---------------------+ +------------------+
| Driver | | | | Executor |
| (Runs on client) | | | | (runs tasks) |
+-------------------+ +---------------------+ +------------------+
|
| Communicates directly with Executors
v
+----------------------------------------------------+
| Executors |
| |
| +------------------+ +------------------+ |
| | Executor | | Executor | ... |
| | - Runs Tasks | | | | - Runs Tasks | |
| | - Caches Data | | | | - Caches Data | |
| +------------------+ +------------------+ |
+----------------------------------------------------+
[Your Laptop] ← Client(不在集群)
│
│ spark-submit
↓
[YARN ResourceManager] ← Cluster Manager(集群 Master 节点)
│
├─ Launches → [Driver in YARN Container] ← 在某个 Worker Node 上
│
└─ Launches → [Executor in YARN Container] × N
↑
[Worker Node 1, 2, ..., N] ← 集群计算节点
2)执行流程
-
提交作业:
Client:- 用户通过
spark-submit命令提交应用程序。命令指定了应用主类、应用参数以及部署模式(client或cluster)等信息。
- 用户通过
Cluster Manager分配资源:Cluster Manager:负责集群资源的管理和分配。它可以是Apache Mesos、Hadoop YARN或者Kubernetes等。在接收到任务请求后,Cluster Manager根据资源情况为应用程序分配合适的资源。
- 启动
Driver和Executors
- 在
client模式下:Driver直接在客户端机器上启动,并连接到Cluster Manager来请求资源。Executors:由Cluster Manager在Worker Nodes上启动,并且会主动连接到Driver。
- 在
cluster模式下:Cluster Manager首先在集群中选择一个节点来启动Driver。- 其他
Worker Nodes上会启动Executors,这些Executors也会与Driver进行通信。
-
执行任务
-
**
Driver**负责将用户程序转换成多个Task,并调度这些Task到各个Executor上执行。 -
Executor接收来自Driver的Task,在其所属的Worker Node上执行,并将执行结果返回给Driver。
-
-
数据处理和通信
-
Executor不仅执行计算任务,还可能需要从外部存储系统读取数据或将中间结果写回存储系统。 -
Driver和Executor之间进行直接的通信以传输任务指令和结果数据。
-
3)job、Stage、Task 关系
- 一个
Application(应用)包含多个Job; - 一个
Job包含多个Stage; - 一个
Stage包含多个Task; - 每个
Task在一个Executor上运行。 Executor是物理资源容器,负责执行 Task。
Application
│
├── Job 0 (triggered by count())
│ ├── Stage 0 (no shuffle)
│ │ ├── Task 0 → runs on Executor 1
│ │ ├── Task 1 → runs on Executor 2
│ │ └── ...
│ └── Stage 1 (after shuffle)
│ ├── Task 0 → runs on Executor 3
│ ├── Task 1 → runs on Executor 1
│ └── ...
│
├── Job 1 (triggered by save())
│ ├── Stage 0
│ └── Stage 1
│
└── ... (more Jobs)
a、Application
- 启动一次
spark-submit就是一个 Application; - 整个生命周期内可以提交多个 Job;
- 有唯一的
Driver进程(协调者)和若干Executor进程(执行者)。
b、Job(作业)
-
触发条件:遇到一个 Action 操作(如
count(),collect(),saveAsTable()); -
每个
Action会触发一个 Job; -
Job是 逻辑上的工作单元。df1 = spark.sql("SELECT * FROM t1 WHERE id > 100") df1.count() # ← 触发 Job 0 df2 = df1.groupBy("name").sum() df2.write.save(...) # ← 触发 Job 1
c、Stage(阶段)
关键规则:
Shuffle是Stage的分界线!
Spark根据Shuffle边界(宽依赖)将Job切分成多个Stage;- 没有
Shuffle的操作(如map,filter)属于同一个Stage(窄依赖); - 有
Shuffle的操作(如join,groupBy)会切断Stage,形成新Stage。
- 没有
SELECT dept, COUNT(*)
FROM employees
JOIN departments ON employees.dept_id = departments.id
GROUP BY dept;
执行计划会被切成:
Stage 0:读取employees表 +departments表(可能广播小表);Stage 1:Shuffle + Join + GroupBy(需要重分区);
d、 Task(任务)
Stage被拆成多个Task,每个Task处理一个 数据分区(Partition);Task数量 = 该Stage的 分区数;-
所有
Task并行执行。 -
示例:
-
如果
Stage有1000个分区 → 就有1000个Task; -
每个
Task处理 ~256MB数据(由 AQE 或spark.sql.shuffle.partitions控制)。
-
4、Spark 五大核心模块
| 模块 | 全称 | 主要用途 |
|---|---|---|
Spark Core |
核心计算引擎 | 通用数据处理 |
Spark SQL |
结构化数据处理 | ETL、报表 |
Structured Streaming |
流处理引擎 | 分析 |
MLlib |
机器学习库 | 预测、分类 |
GraphX |
图计算框架 | 社交网络分析 |
1)相互关系
所有高级模块都依赖
Spark Core,但彼此独立,可单独使用。
+------------------+
| Spark Core | ← RDD, Task Scheduler, Shuffle
+--------+---------+
|
+--------------------+--------------------+
| | |
+-------v------+ +--------v--------+ +-------v--------+
| Spark SQL | | Structured | | MLlib |
| (DataFrame) | | Streaming | | (ML Pipelines) |
+--------------+ +-----------------+ +----------------+
|
+-------v------+
| GraphX | ← (Scala only)
+--------------+
2)如何选择
| 你要做什么? | 用哪个模块? |
|---|---|
写 SQL / 做 ETL / 查报表 |
Spark SQL |
| 处理 Kafka / 实时数据 | Structured Streaming |
| 训练机器学习模型 | MLlib |
| 分析社交网络 | GraphX(Scala)或 GraphFrames(Python) |
| 需要极致控制(如自定义分区) | Spark Core (RDD)(最后选择 |
三、Spark 参数配置
1、配置参考
| 配置项 | 动态版本 | 静态版本 |
|---|---|---|
| 资源获取 | 先申请 50 个,不够再加。排队快,但爬坡需要时间。 | 一次性申请 200 个,排队可能久,但一旦开始就全速跑。 |
| 稳定性 | 受集群整体负载影响(可能想扩扩不出来) | 极高。资源独占,不受其他任务干扰。 |
| 核心参数 | dynamicAllocation |
--num-executors 200 |
| 适用场景 | 流量波动大、共享集群 | 流量稳定、专属队列、定时任务 |
| 推荐度 | ⭐⭐⭐ (适合灵活场景) | ⭐⭐⭐⭐⭐ (对于核心大任务) |
1)动态版本
#!/bin/bash
# ==============================
# Spark 作业资源配置(根据集群规模和数据量调整)
# 适用场景:生产环境大规模数据 ETL / 报表 / Join 计算
# 核心优化:动态资源 + AQE 自适应 + 小文件合并 + 数据倾斜处理
# ==============================
# ==============================
# 提交 Spark 作业到 YARN 集群
# ==============================
spark-submit --master yarn \
--deploy-mode cluster \ # Driver 在集群中运行(非本地),适合生产
--name "demo_task" \ # 作业名称(YARN 页面可识别,便于监控排查)
# Driver 资源:Spark 驱动器进程,负责任务调度、生成执行计划、收集结果、管理广播变量
# 配置原则:避免内存不足(OOM),不要过度分配造成资源浪费
--driver-cores 4 \ # Driver 使用的 CPU 核心数(生产通用值:2~4核,调度密集型可上调)
--driver-memory 16g \ # Driver 堆内存;需容纳广播表、collect() 结果集,避免 OOM
# Executor 资源:每个执行器的资源(建议单个 executor 不超过 32GB,避免 GC 停顿过长)
--executor-cores 4 \ # 单个Executor的CPU核数,推荐2-5核平衡并行度与调度效率
--executor-memory 24g \ # 单个Executor内存,YARN会自动追加内存overhead
# Executor 资源:每个执行器的资源(建议单个 executor 不超过 32GB,避免 GC 停顿过长)
--executor-cores 4 \ # 单个 Executor 占用 CPU 核数;推荐 2~5 核,提升任务并行效率
--executor-memory 24g \ # 单个 Executor 堆内存;YARN 会自动追加 spark.executor.memoryOverhead 内存
# ==============================
# 1、动态资源分配:按需伸缩 Executor 数量,避免资源闲置/排队
# 前提条件:YARN 集群已部署 External Shuffle Service(ESS)
# ==============================
--conf spark.dynamicAllocation.enabled=true \ # 开启动态资源分配(生产必开)
--conf spark.dynamicAllocation.minExecutors=50 \ # 作业最小保留 Executor 数(避免频繁缩容导致卡顿)
--conf spark.dynamicAllocation.maxExecutors=200 \ # 作业最大可用 Executor 数(根据集群总资源上限调整)
--conf spark.dynamicAllocation.initialExecutors=50 \ # 作业启动时初始化 Executor 数
--conf spark.shuffle.service.enabled=true \ # 开启外部 Shuffle 服务(动态分配必须依赖)
--conf spark.dynamicAllocation.shuffleTracking.enabled=true \ # 开启 Shuffle 追踪,精准释放空闲 Executor
# ==============================
# 2、Shuffle 与全局并行度优化
# 并行度原则:初始分区数 ≈ 总CPU核数 × 1~3 倍,保证任务充分并行
# ==============================
--conf spark.sql.shuffle.partitions=2000 \ # Spark SQL 默认 Shuffle 分区数(join/agg 时生效)
--conf spark.default.parallelism=2000 \ # Spark RDD 算子默认并行度(非 SQL 场景生效)
# ==============================
# 3、AQE 自适应查询执行(Spark 3.x 核心优化,自动解决数据倾斜/小分区)
# 作用:运行时动态调整分区数、优化 Join 策略、处理数据倾斜
# ==============================
--conf spark.sql.adaptive.enabled=true \ # 开启 AQE 自适应执行(生产必开)
--conf spark.sql.adaptive.coalescePartitions.enabled=true \ # 自动合并过小分区,降低任务调度开销
--conf spark.sql.adaptive.skewJoin.enabled=true \ # 自动检测并优化 Join 数据倾斜
--conf spark.sql.adaptive.skewJoin.enhanced.enabled=true \ # 增强版倾斜 Join 优化(支持更复杂场景)
--conf spark.sql.adaptive.forceOptimizeSkewedJoin=true \ # 强制优化倾斜 Join,不依赖自动检测
--conf spark.sql.adaptive.skewPartition.enabled=true \ # 开启倾斜分区自动拆分
--conf spark.sql.adaptive.skewPartition.thresholdInBytes=268435456 \ # 倾斜分区判定阈值:256MB
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \ # AQE 目标分区大小:256MB(推荐128MB~512MB)
--conf spark.sql.autoBroadcastJoinThreshold=209175200 \ # 自动广播表阈值:≈200MB;小表自动广播,避免 Shuffle
# ==============================
# 4、Hive 分区表写入优化(核心解决:动态分区失败、HDFS 小文件过多)
# ==============================
--conf spark.sql.hive.mergeFiles=true \ # 写入 Hive 表时自动合并小文件(减少 NameNode 压力)
--conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \ # 动态分区模式:非严格,允许全动态分区(无静态分区列
--conf spark.hadoop.hive.exec.dynamic.partition=true \ # 开启动态分区写入(写入分区表必开)
--conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \ # 全局最大允许创建动态分区数(防失控)
--conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \ # 单节点最大动态分区数
--conf spark.hadoop.hive.merge.mapfiles=true \ # 合并 Map 任务产生的小文件
--conf spark.hadoop.hive.merge.mapredfiles=true \ # 合并 Reduce 任务产生的小文件
# ==============================
# 其他功能开关(按需启用)
# ==============================
--conf spark.sql.crossJoin.enabled=true \ # 允许笛卡尔积(默认禁用,仅在确实需要时开启)
# ==============================
# 📌 应用入口
# ==============================
template.py $1 $2 # 传入两个参数给 PySpark 脚本(如 日期、业务标识)
2)静态版本
#!/bin/bash
# ==============================
# 🚀 Spark 作业资源配置(静态固定资源版)
# 适用场景:生产环境固定流量/定时调度/稳定负载作业
# 资源模式:固定 200 个 Executor,不自动扩缩容
# ==============================
# ==============================
# 提交 Spark 作业到 YARN 集群
# ==============================
spark-submit --master yarn \
--deploy-mode cluster \ # Driver 在集群中运行(生产标准模式),断连不中断
--name "demo_task" \ # 作业名称(YARN 页面可识别,便于监控排查)
# Driver 资源:负责任务调度、生成执行计划、收集结果、管理广播变量
--driver-cores 4 \ # Driver CPU 核数(调度密集型推荐 2~4 核)
--driver-memory 16g \ # Driver 堆内存,容纳广播表/collect 结果,避免 OOM
# 静态 Executor 配置:固定资源,无动态扩缩容
--num-executors 200 \ # 【静态核心参数】固定启动 200 个 Executor
--executor-cores 4 \ # 单个 Executor CPU 核数,推荐 2~5 核,提升并行效率
--executor-memory 24g \ # 单个 Executor 堆内存,YARN 自动追加内存 overhead,避免 Full GC
# ==============================
# 1、Shuffle 与全局并行度优化(静态资源专用)
# 并行度计算:200 Executor × 4 核 = 800 总核数
# 初始分区 = 总核数 × 2.5 倍,保证并行度充足,配合 AQE 自动调整
# ==============================
--conf spark.sql.shuffle.partitions=2000 \ # Spark SQL 默认 Shuffle 分区数(join/agg 生效)
--conf spark.default.parallelism=2000 \ # Spark RDD 算子默认并行度(非 SQL 场景生效)
# ==============================
# 2、AQE 自适应查询执行(静态资源必开,自动处理倾斜/小分区)
# 作用:运行时动态合并小分区、优化 Join 策略、解决数据倾斜
# ==============================
--conf spark.sql.adaptive.enabled=true \ # 开启 AQE 自适应执行(生产必开)
--conf spark.sql.adaptive.coalescePartitions.enabled=true \ # 自动合并过小分区,降低任务调度开销
--conf spark.sql.adaptive.skewJoin.enabled=true \ # 自动检测并优化 Join 数据倾斜
--conf spark.sql.adaptive.skewJoin.enhanced.enabled=true \ # 增强版倾斜 Join 优化,适配复杂业务场景
--conf spark.sql.adaptive.forceOptimizeSkewedJoin=true \ # 强制优化倾斜 Join,不依赖自动检测
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \ # AQE 目标分区大小:256MB(推荐 128~512MB)
--conf spark.sql.autoBroadcastJoinThreshold=209175200 \ # 自动广播表阈值:≈200MB,小表广播避免 Shuffle
# ==============================
# 3、Hive 分区表写入优化(解决小文件+动态分区写入失败)
# ==============================
--conf spark.sql.hive.mergeFiles=true \ # 写入 Hive 时自动合并小文件,减轻 NameNode 压力
--conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \ # 非严格动态分区(允许全动态分区,无静态分区)
--conf spark.hadoop.hive.exec.dynamic.partition=true \ # 开启动态分区写入(写入分区表必开)
--conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \ # 全局最大允许创建动态分区数(防止异常创建大量分区)
--conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \ # 单节点最大动态分区数
--conf spark.hadoop.hive.merge.mapfiles=true \ # 合并 Map 任务产生的小文件
--conf spark.hadoop.hive.merge.mapredfiles=true \ # 合并 Reduce 任务产生的小文件
# ==============================
# 其他功能开关(按需启用,谨慎使用)
# ==============================
--conf spark.sql.crossJoin.enabled=true \ # 允许笛卡尔积(默认禁用;仅业务需要时开启,避免性能雪崩)
# ==============================
# 应用入口
# ==============================
template.py $1 $2 # 向 PySpark 脚本传递 2 个运行参数(例:统计日期、业务标识)
2、基础资源配置(核心)
# Driver 资源:Spark 驱动器进程,负责任务调度、生成执行计划、收集结果、管理广播变量
# 配置原则:避免内存不足(OOM),不要过度分配造成资源浪费
--driver-cores 4 \ # Driver 使用的 CPU 核心数(生产通用值:2~4核,调度密集型可上调)
--driver-memory 16g \ # Driver 堆内存;需容纳广播表、collect() 结果集,避免 OOM
# Executor 资源:每个执行器的资源(建议单个 executor 不超过 32GB,避免 GC 停顿过长)
--executor-cores 4 \ # 单个Executor的CPU核数,推荐2-5核平衡并行度与调度效率
--executor-memory 24g \ # 单个Executor内存,YARN会自动追加内存overhead
# Executor 资源:每个执行器的资源(建议单个 executor 不超过 32GB,避免 GC 停顿过长)
--executor-cores 4 \ # 单个 Executor 占用 CPU 核数;推荐 2~5 核,提升任务并行效率
--executor-memory 24g \ # 单个 Executor 堆内存;YARN 会自动追加 spark.executor.memoryOverhead 内存
单作业资源规格:
- Driver:4 核 16GB 内存(控制进程)
- 单 Executor:4 核 24GB 内存(计算进程)
- 总 Executor 数量:200 个
总资源占用:
- CPU:200 × 4 = 800 核
- 内存:200 × 24 = 4800 GB
生产环境风险提示
- 单个
Spark作业建议不占用集群总资源的 30%~50%,防止阻塞其他重要任务 200个Executor属于大规模作业,需确保集群队列有足够资源- 静态资源作业会长期占用资源,不释放,不适合共享集群高峰时段
1)Driver 资源配置
--driver-cores 4 \ # Driver 使用的 CPU 核心数(生产通用值:2~4核,调度密集型可上调)
--driver-memory 16g \ # Driver 堆内存;需容纳广播表、collect() 结果集,避免 OOM
Driver 是 Spark 作业的“大脑”:
- 负责解析逻辑、生成执行计划(
DAG) - 协调所有
Executor的任务调度 - 接收最终结果(比如你用了
collect()或show())
DRIVER_CORES=4:给 Driver 分配 4 个 CPU 核。
- → 实际上
Driver大部分时间在等待,1~2 核通常足够; - 设为
4是为了应对复杂广播或大量小任务调度。
DRIVER_MEM=16g:Driver 内存 16GB。
- 关键限制:如果你用
broadcast(df)广播一个大表,或者collect()拉回大量数据,总大小不能超过 16GB,否则会 OOM。
2)Executor资源配置
单个
Executor内存 16~32GB 最佳,核数 2~5。
# Executor 资源:每个执行器的资源(建议单个 executor 不超过 32GB,避免 GC 停顿过长)
EXECUTOR_CORES=4 # 每个 Executor 的 CPU 核数(推荐 2~5,平衡并行度与调度开销)
EXECUTOR_MEM=24g # 每个 Executor 的内存(YARN 实际申请 = 此值 + overhead,默认约 10%+384MB)
Executor是真正干活的“工人”,运行Task(如 map、filter、shuffle read)。EXECUTOR_CORES=4:- 表示每个
Executor可以 并行运行 4 个Task。 - 推荐值:
2~5。- 太小 → 并行度低;
- 太大 → 单个
JVM管理太多线程,GC 压力大。
- 表示每个
EXECUTOR_MEM = 24g:- 每个
Executor的堆内存为24GB。 YARN实际申请的内存 = 24g + overhead
- 每个
3、启用动态分配
# ==============================
# 1、动态资源分配:按需伸缩 Executor 数量,避免资源闲置/排队
# 前提条件:YARN 集群已部署 External Shuffle Service(ESS)
# ==============================
--conf spark.dynamicAllocation.enabled=true \ # 开启动态资源分配(生产必开)
--conf spark.dynamicAllocation.minExecutors=50 \ # 作业最小保留 Executor 数(避免频繁缩容导致卡顿)
--conf spark.dynamicAllocation.maxExecutors=200 \ # 作业最大可用 Executor 数(根据集群总资源上限调整)
--conf spark.dynamicAllocation.initialExecutors=50 \ # 作业启动时初始化 Executor 数
--conf spark.shuffle.service.enabled=true \ # 开启外部 Shuffle 服务(动态分配必须依赖)
--conf spark.dynamicAllocation.shuffleTracking.enabled=true \ # 开启 Shuffle 追踪,精准释放空闲 Executor
- 动态分配(
Dynamic Allocation) 是生产环境强烈推荐的功能:- 作业开始时只申请
MIN_EXECUTORS(如 50 个) - 随着数据量增加,自动扩容到最多
MAX_EXECUTORS(如 200 个) - 空闲时自动释放资源
- 作业开始时只申请
- 好处:
- 避免“申请 400 个但只用 100 个”的资源浪费
- 避免因一次性申请太多而长时间排队
- 更弹性,适合数据量波动的场景(如每天日志量不同)
4、AQE 自动处理
# ==============================
# 3、AQE 自适应查询执行(Spark 3.x 核心优化,自动解决数据倾斜/小分区)
# 作用:运行时动态调整分区数、优化 Join 策略、处理数据倾斜
# ==============================
--conf spark.sql.adaptive.enabled=true \ # 开启 AQE 自适应执行(生产必开)
--conf spark.sql.adaptive.coalescePartitions.enabled=true \ # 自动合并过小分区,降低任务调度开销
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \ # AQE 目标分区大小:256MB(推荐128MB~512MB)
--conf spark.sql.autoBroadcastJoinThreshold=209175200 \ # 自动广播表阈值:≈200MB;小表自动广播,避免 Shuffle
# join倾斜
--conf spark.sql.adaptive.skewJoin.enabled=true \ # 自动检测并优化 Join 数据倾斜
--conf spark.sql.adaptive.skewJoin.enhanced.enabled=true \ # 增强版倾斜 Join 优化(支持更复杂场景)
--conf spark.sql.adaptive.forceOptimizeSkewedJoin=true \ # 强制优化倾斜 Join,不依赖自动检测
# 聚合/去重 倾斜
--conf spark.sql.adaptive.skewPartition.enabled=true \ # 开启倾斜分区自动拆分
--conf spark.sql.adaptive.skewPartition.thresholdInBytes=268435456 \ # 倾斜分区判定阈值:256MB
1)Join 倾斜优化
Spark会自动在Execution(含Shuffle) 和Storage(缓存) 之间动态共享内存。
- 作用:当
join操作中某些key导致分区过大时,AQE会自动将这些key拆分处理(如广播小表部分、加盐等)。 - 适用操作:
join,left join,inner join等。 - 无需知道热点 key,全自动!
# ==============================
# 🔧 Join 倾斜优化(AQE 自动处理)
# ==============================
--conf spark.sql.adaptive.skewJoin.enabled=true \ # 自动处理 Join 数据倾斜
--conf spark.sql.adaptive.skewJoin.enhanced.enabled=true \ # 增强版倾斜 Join 优化(支持更复杂场景)
--conf spark.sql.adaptive.forceOptimizeSkewedJoin=true \ # 强制优化倾斜 Join,不依赖自动检测
参数解释:
spark.sql.adaptive.skewJoin.enabled=true:- 当执行
JOIN(包括INNER JOIN,LEFT JOIN,RIGHT JOIN等)时,若AQE检测到某些key对应的Shuffle分区过大(如热点用户 ID),会自动将这些倾斜key单独拆分处理:- 若小表侧可广播 → 自动广播该 key 的匹配数据
- 否则 → 自动对大表侧的倾斜
key加盐打散(salting),启动多个Task并行 join
- 当执行
适用场景:
- 大表
JOIN小表,但小表中某些 key 被大量匹配(多对多) - 匿名用户(如
user_id = "0")导致join任务卡死 - 不知道具体热点 key,希望全自动处理
版本要求:Spark ≥ 3.0(skewJoin 自 Spark 3.0 引入)
使用建议:
- 所有 Spark 3.x 生产作业都应开启此配置
- 无需修改 SQL 或 DataFrame 代码
- 若倾斜极严重(单 key > 50%),可配合侵入式方案使用
3)聚合/去重倾斜
- 作用:对任意
Shuffle后的大分区(如groupBy,distinct,repartition)自动切分成多个子任务并行处理。 - 本质:自动实现“两阶段聚合”,但不用写一行额外代码。
# ==============================
# 自动处理通用 Shuffle 分区倾斜(Spark 3.4+ 新增)
# ==============================
--conf spark.sql.adaptive.skewPartition.enabled=true \ # 开启倾斜分区自动拆分
# 开启“倾斜分区自动切分”功能。
# 当任意 Shuffle 阶段(如 groupBy、distinct、repartition)出现超大分区时,
# AQE 会自动将该分区拆成多个子任务并行处理,避免单 Task 成为瓶颈。
--conf spark.sql.adaptive.skewPartition.thresholdInBytes=268435456 \
# 设置“倾斜分区”的判定阈值:256MB(268435456 字节)。
# 任何 Shuffle 分区大小超过此值,就会被 AQE 视为倾斜并自动切分。
# 建议范围:100MB ~ 512MB,根据集群内存和数据量调整。
参数解释:
-
spark.sql.adaptive.skewPartition.enabled=true:开启“倾斜分区自动切分”功能。-
在 任意
Shuffle Read阶段(不限于join),若某个分区大小超过thresholdInBytes,AQE会自动将该大分区切分为多个子分区,并启动多个Task并行处理同一个原始分区。 -
典型受益操作:
-
df.groupBy("key").agg(...)(聚合倾斜) -
df.distinct()或countDistinct()(去重倾斜) -
df.repartition("key")(重分区后倾斜)
-
-
-
skewPartition.thresholdInBytes=268435456:设定“大分区”阈值为 256MB(可根据集群调整,通常 100~512MB)
适用场景:
groupBy某个字段时,个别key出现频率极高(如日志中的默认值)distinct操作因重复值分布不均导致 Task 不均衡- 任何非 join 类 Shuffle 操作出现长尾 Task
版本要求:Spark ≥ 3.4(skewPartition 是 Spark 3.4 新增功能!)
注意:在 Spark 3.0 ~ 3.3 中,此参数无效,需手动实现“两阶段聚合”。
使用建议:
- 如果你已升级到 Spark 3.4+,强烈建议开启
- 可与
skewJoin同时启用,形成完整倾斜防护体系 - 对极端倾斜仍可能不足,需结合采样 + 侵入式处理
5、Hive 写入配置
# ==============================
# Hive 分区表写入优化(解决小文件+动态分区写入失败)
# ==============================
--conf spark.sql.hive.mergeFiles=true \ # 写入 Hive 时自动合并小文件,减轻 NameNode 压力
--conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \ # 非严格动态分区(允许全动态分区,无静态分区)
--conf spark.hadoop.hive.exec.dynamic.partition=true \ # 开启动态分区写入(写入分区表必开)
--conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \ # 全局最大允许创建动态分区数(防止异常创建大量分区)
--conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \ # 单节点最大动态分区数
--conf spark.hadoop.hive.merge.mapfiles=true \ # 合并 Map 任务产生的小文件
--conf spark.hadoop.hive.merge.mapredfiles=true \ # 合并 Reduce 任务产生的小文件
- 允许写入大量动态分区(如按天/用户ID分区)
- 合并小文件,减少
HDFS小文件问题
6)建议
1)根据集群规模调整 MAX_EXECUTORS
可通过
yarn node -list查看集群总资源
| 集群总 CPU 核数 | 推荐 MAX_EXECUTORS |
|---|---|
| 200 核 | 40~60 |
| 500 核 | 100~150 |
| 1000+ 核 | 200~400 |
2)监控实际分区大小
- 在
Spark UI的SQL tab中查看Shuffle后的分区大小 - 如果多数分区 «
100MB→ 可适当降低advisoryPartitionSizeInBytes - 如果出现个别分区 » 1GB → AQE 的 skewJoin 会自动拆分(确保已开启)
四、数据倾斜
1、如何快速识别倾斜
1)Spark UI
- 进入
Stages页面 - 查看
Task Duration分布图- 如果是“长尾”或“单点突出”,就是倾斜
- 点进具体
Task,看Input Size/Shuffle Read Size
2)采样分析热点 Key
经验:如果 top 1 key 的 count > 总量的 10%,基本会倾斜。
# 找出 top 100 高频 key
df.groupBy("key").count().orderBy(desc("count")).show(100)
2、非侵入式-参数调优
不改业务逻辑,通过 Spark 参数/配置自动缓解
1)AQE 自适应优化:解决Join 数据倾斜
# ==============================
# 🔧 Join 倾斜优化(AQE 自动处理)
# ==============================
--conf spark.sql.adaptive.skewJoin.enabled=true \ # 自动处理 Join 数据倾斜
--conf spark.sql.adaptive.skewJoin.enhanced.enabled=true \ # 增强版倾斜 Join 优化(支持更复杂场景)
--conf spark.sql.adaptive.forceOptimizeSkewedJoin=true \ # 强制优化倾斜 Join,不依赖自动检测
2)AQE 自适应优化:聚合/去重倾斜
# ==============================
# 自动处理通用 Shuffle 分区倾斜(Spark 3.4+ 新增)
# ==============================
--conf spark.sql.adaptive.skewPartition.enabled=true \ # 开启倾斜分区自动拆分
# 开启“倾斜分区自动切分”功能。
# 当任意 Shuffle 阶段(如 groupBy、distinct、repartition)出现超大分区时,
# AQE 会自动将该分区拆成多个子任务并行处理,避免单 Task 成为瓶颈。
--conf spark.sql.adaptive.skewPartition.thresholdInBytes=268435456 \
# 设置“倾斜分区”的判定阈值:256MB(268435456 字节)。
# 任何 Shuffle 分区大小超过此值,就会被 AQE 视为倾斜并自动切分。
# 建议范围:100MB ~ 512MB,根据集群内存和数据量调整。
3)调整 Shuffle 分区数(治标不治本)
--conf spark.sql.shuffle.partitions=5000 # 增大分区数,稀释热点
- 参数设置
- 太小 → 每个分区太大 →
Task处理慢、OOM风险高、并行度低 - 太大 → 每个分区太小 → 调度开销大、产生大量小文件
- 太小 → 每个分区太大 →
- 适用场景:倾斜不严重,或热点 key 分布较“散”
- 风险:可能产生更多小文件,调度开销变大
3、侵入式-改逻辑
修改业务逻辑,从数据/算法层面解决
1)预处理热点 Key
- 优点:精准打击,效果最好
-
缺点:代码复杂,需维护 hot key 列表
- 思路:把倾斜的 key 单独拎出来,用不同策略处理
- 步骤:
- 采样找出 hot keys(如 top 100)
- 将原数据拆成两份:
normal_data:非热点 keyhot_data:热点 key
- 分别处理:
normal_data→ 正常join/grouphot_data→ 单独聚合 or 广播处理
- 合并结果
hot_keys = [row.key for row in df.groupBy("key").count().orderBy(desc("count")).limit(100).collect()]
normal_df = df.filter(~col("key").isin(hot_keys))
hot_df = df.filter(col("key").isin(hot_keys))
# normal 走常规流程
normal_result = normal_df.groupBy("key").sum("value")
# hot key 单独处理(比如用 collect + local agg)
hot_local = hot_df.collect()
# ... 本地聚合后转回 DataFrame
final_result = normal_result.union(hot_result)
2)启用 Salting(加盐)—— 半自动方案
虽然需要改代码,但属于通用技巧,常归为“非侵入式思维”。
- 本质:把一个大
key拆成N个小key,分散到不同分区 - 适用:已知热点 key 列表(可提前采样)
from pyspark.sql.functions import *
# 假设 key="user_id" 有倾斜
SALT_NUM = 10
# 步骤1:给大表(big_df)的热点 key 加盐
big_df_salted = big_df.withColumn(
"salted_key",
when(col("user_id").isin(hot_keys),
concat(col("user_id"), lit("_"), (rand() * SALT_NUM).cast("int"))
).otherwise(col("user_id"))
)
# 步骤2:给小表膨胀(每个 hot key 复制成 SALT_NUM 份)
small_df_exploded = small_df.withColumn(
"salted_key",
explode(array([concat(col("user_id"), lit("_"), lit(i)) for i in range(SALT_NUM)]))
).union(
small_df.filter(~col("user_id").isin(hot_keys)).withColumn("salted_key", col("user_id"))
)
# 步骤3:用 salted_key join
result = big_df_salted.join(small_df_exploded, "salted_key")
原理:
-
假设:
-
big_df:10 亿条用户行为日志,其中user_id = "0"(匿名用户)占了 3 亿条 -
small_df:10 万条用户画像表,包含user_id = "0"的一条记录# 直接 join 会怎样? result = big_df.join(small_df, "user_id")
-
-
问题:
- 所有
user_id = "0"的 3 亿条日志 → 被Shuffle到 同一个分区 - 小表中
user_id = "0"的那条记录 → 也被广播/发送到这个分区 - 结果:一个 Task 要处理 3 亿 × 1 = 3 亿行 join → 卡死 or OOM
- 所有
-
解法:只对热点 key “加盐打散”
- 把 3 亿条
user_id=0拆成10份,每份 3000 万,分别和小表的user_id=0做join - 步骤1:给大表(big_df)的热点 key 加盐
- 让原本聚集在
user_id=0的数据,分散到 10 个不同的 key 上。
- 让原本聚集在
- 步骤2:把小表(small_df)的热点 key “膨胀”成 10 份
- 把 3 亿条
3) 两阶段聚合(适用于 groupBy 倾斜)
- 思想:先局部聚合,再全局聚合
- 实现:
第一阶段:加随机前缀,局部聚合
df1 = df.withColumn("prefix", (rand() * 10).cast("int")) \
.withColumn("new_key", concat(col("prefix"), lit("_"), col("key"))) \
.groupBy("new_key").sum("value")
# 第二阶段:去掉前缀,全局聚合
df2 = df1.withColumn("key", split(col("new_key"), "_")[1]) \
.groupBy("key").sum("sum(value)")
- 原理
- 如果直接
groupBy("key").sum("value"):- 所有
X的记录会被Shuffle到 同一个分区 → 一个Task处理 1 亿条数据 → OOM 或卡死!
- 所有
- 给每条记录加一个随机前缀(
0~9)- 效果:原本
1亿条X被打散到10个新 key:0_X,1_X, …,9_X - 每个新
key平均约 1000 万条(理想情况下)
- 效果:原本
- 按
new_key局部聚合- 现在有
10个Task分别处理0_X~9_X - 每个
Task只处理 ~1000 万条,压力大大降低!
- 现在有
- 如果直接
4、业务层规避
- 例子1:用户 ID 为 0 表示“匿名用户” → 改为随机 UUID
- 例子2:日志中
ip="0.0.0.0"太多 → 预处理过滤或打标 - 核心:从源头减少极端值
五、FQA
1、关于 Task 数量
Task的数量 = 分区(Partition)的数量。- 对于一个
DataFrame或RDD,其分区数决定了后续操作中并行执行的 Task 数量。 - 因此,你不能直接指定“
SQL查询拆分成多少个 Task”,但你可以间接控制分区数,从而影响 Task 数量。
2、如何控制 Task 数量?
1)设置 shuffle 分区数(最常用)
Spark SQL在进行 shuffle 操作(如 join、groupBy、distinct 等)时,默认会使用spark.sql.shuffle.partitions参数来决定 shuffle 后的分区数(即 Task 数)。
--conf spark.sql.shuffle_partitions=50
2)查看实际 Task 数量
你可以在
Spark UI中查看每个Job的Stage和Task数量,也可以通过代码获取:
// 查看当前 DataFrame 的分区数(= Task 数)
println(df.rdd.getNumPartitions)
3、SQL 是不是被“拆分”了
是的,但不是“拆成多个 SQL,而是只有一个 SQL 被分布式执行。
- 一个 逻辑
SQL查询 → 被Catalyst优化器转换为 物理执行计划(Physical Plan); - 物理计划由多个
Stage组成; - 每个
Stage包含多个Task(每个 Task 处理一个数据分区); - 所有
Task并行运行在不同 Executor 上,但它们执行的是同一个作业的不同部分。
4、SQL 本身会出现在 Executor 日志里吗
Spark不会自动把原始SQL写入每个 Executor 的日志;Driver会记录SQL(在Driver的stderr或Spark History Server中);Executor日志主要记录:Task执行过程;- 序列化/反序列化错误;
- OOM、GC、网络异常;
- UDF(自定义函数)抛出的异常。
5、监控需要看什么
- 不要期望在
Executor日志里看到完整 SQL——那是 Driver 的职责; Executor 日志的核心价值是:告诉你“执行时发生了什么错误”;- 结合
Stages页面 + Executors 页面 + stderr,才能精准定位问题。
| 目标 | 看哪里? | 关注什么? |
|---|---|---|
| 查 SQL 是否提交成功 | Driver 日志 | SQL 语句、Job ID |
| 查 Task 为什么失败 | Executor stderr | 异常堆栈、OOM、FetchFailed |
| 查 UDF 逻辑错误 | Executor stdout/stderr | print 输出、AttributeError |
| 查 Python 环境问题 | Executor stderr | ModuleNotFoundError |
| 查数据倾斜或热点 | Executors 页面 + stderr | 某个 Executor 失败频繁 |


