前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

一、概念理解

组件 核心作用 关键词
Hadoop 提供分布式存储(HDFS + 资源调度(YARN 存储 + 调度(地基)
Hive 基于 HadoopSQL 数据仓库工具,用于离线分析 SQL 查询 + 离线数仓
Spark 通用内存计算引擎,支持批处理、流处理、机器学习等 快 + 通用 + 内存计算
Flink 流批一体的实时计算引擎,强调低延迟与状态一致性 实时 + 流批一体 + 高吞吐

1、Hadoop:存储 + 调度(地基)

1)HDFSHadoop Distributed File System

HDFS 就像超市的货架系统——东西太多,一个人搬不动,那就分开放,每样东西都有备份,丢了也不怕。”

  • 就是超大仓库,能把 TBPB 级的数据分散存在成百上千台机器上。
  • 比如你有 10TB 的日志文件,HDFS 自动切成小块,存到不同服务器,不怕一台坏了就丢数据。

2)YARNYet Another Resource Negotiator

YARN 就像餐厅经理,谁想用厨房都得排队申请,不能乱抢灶台。”

  • 资源调度员,管 CPU、内存、磁盘这些“灶台资源”。
  • Spark 要跑任务?Flink 要启动?Hive 要查表?都得跟 YARN 打招呼:“给我两个灶台、8G 内存!”
  • YARN 说:“行,给你用10分钟,别超时啊。”

2、Hive:会 SQL 的老会计(离线数仓工具)

Hive 就像一个只会写 Excel 公式的老师傅,他让你用熟悉的 SQL 写需求,但他自己不动手,而是叫一群临时工(MapReduce)去翻仓库找数据,所以慢

  • 优点:简单、稳定、适合 T+1 报表(比如每天早上出昨天的销售汇总)
  • 缺点:延迟高,不适合“现在有多少人在线?”这种问题
  • Hive 不是数据库,也不是存储,它只是一个“翻译官”。
  • 你写一句 SELECT COUNT(*) FROM orders WHERE dt='2025-12-01';
  • Hive 把这句话翻译成 MapReduce / Tez / Spark 任务,扔给 Hadoop 去执行。
  • 结果可能要等几分钟甚至几小时(因为数据量太大)。

1)Hive 不是表,但是经常被叫做 Hive

Hive 表 = 元数据(schema + location) + HDFS 上的文件

**Hive 表”确实不是传统数据库意义上的“表”,但它在逻辑上就是一个“表”——有结构、有名字、能被 SQL 查询。所以大家习惯说“Hive 表 **

  • 当你在 Hive 里执行:

    CREATE TABLE orders (
      order_id STRING,
      user_id STRING,
      amount DOUBLE
    )
    PARTITIONED BY (dt STRING)
    STORED AS PARQUET
    LOCATION '/data/warehouse/orders';
      
    # `Hive` 并没有在某台机器上创建一个叫 `orders` 的“表文件”。
    
  • 它只是在 Hive Metastore(通常是 MySQL 里记录了一条元数据:

    • 表名:orders
    • 字段:order_id, user_id, amount
    • 分区字段:dt
    • 数据格式:Parquet
    • 数据位置:hdfs://namenode/data/warehouse/orders
  • 真正的数据文件(比如 /data/warehouse/orders/dt=2025-12-01/000001.parquet)是存在 HDFS 上的普通文件。

2)MapReduce 为什么慢?——“做完一步就存仓库

MapReduce 的设计哲学是“容错优先”:万一机器挂了,可以从磁盘恢复。但代价是性能低

计算步奏:

  • Map 阶段输出中间结果(比如 key=商品ID, value=销量
  • 必须写入 HDFS(磁盘)
  • Reduce 阶段再从磁盘读这些数据
  • 如果有多个阶段(比如 JOIN + GROUP BY + ORDER BY),就要多次写磁盘

情景在线:假设你要做一道菜:宫保鸡丁,但厨房规定:

“每完成一个步骤,必须把半成品打包放进冷库(硬盘),下次用的时候再从冷库拿出来。”

流程:

  1. 切鸡丁 → 打包 → 放冷库 ❄️
  2. 炸花生米 → 打包 → 放冷库 ❄️
  3. 炒酱汁 → 打包 → 放冷库 ❄️
  4. 最后把三样从冷库拿出来,一起炒

👉 问题:来回开冷库门、搬东西,太耗时了!

3、Spark:闪电侠厨师(通用计算引擎)

Spark 就像一个记忆力超强的厨师,切完菜不用放回冰箱,直接拿起来炒,省了来回跑的时间,所以出菜飞快!

  • Spark 最大的特点是:把中间数据放在内存里,不像 MapReduce 那样每一步都写磁盘。
  • 所以它比 Hive(基于 MapReduce)快 10~100 倍
  • Spark 不光能做 SQL 查询(Spark SQL),还能做:
    • 机器学习(MLlib
    • 图计算(GraphX
    • 流处理(Structured Streaming
  • Hive 的关系
  • Spark 可以直接读 Hive 的表!只要配置好,spark.sql("SELECT * FROM hive_table") 直接跑。

  • 很多公司现在用 Spark SQL 替代 Hive 做离线分析,又快又兼容。

1)SparkHive 关系

Hive 表”其实是:Hive 定义的元数据 + 读 HDFS 上的实际数据文件

  • Hive 表不是“真实”的表(像 MySQL 那样),但它是一个“带说明书的文件夹”
  • Spark 能看懂这个说明书,然后自己去文件夹里拿数据,做得更快更好
spark.sql("SELECT * FROM orders")

Spark 会:

  1. Hive Metastore 里查 orders 表的元数据;
  2. 知道这个表的数据在 HDFS 的哪个路径;
  3. 知道字段名、类型、分区、文件格式(Parquet/ORC 等);
  4. 直接用 Spark 引擎高效读取这些文件(比 Hive 自己用 MapReduce 快得多)。

2)Spark SQL 替代 Hive对吗

替代”的是底层执行引擎和运行时性能,不是 SQL 本身,也不是表结构

  • 功能上兼容Spark SQL 支持 HiveQL 的大部分语法,也能读写 Hive 表。
  • 性能上碾压Spark 基于内存计算,比 Hive 默认的 MapReduce 快几十倍。
  • 生态上融合Spark 可以复用 Hive 已有的表结构、UDF、分区策略,迁移成本低。
维度 Hive(传统方式) Spark SQL(替代方案)
SQL 语法 HiveQL(类 SQL Spark SQL(兼容 HiveQL + 标准 SQL)
执行引擎 MapReduce(默认)或 Tez Spark(内存计算引擎)
元数据管理 Hive MetastoreMySQL/PostgreSQL 复用同一个 Hive Metastore
表结构/分区/存储格式 ORC/Parquet/TextFile 完全兼容,直接读写
查询速度 慢(分钟级) 快(秒级)
资源调度 YARN(通过 MapReduce) YARN(通过 Spark on YARN)

假设公司有一张 Hive 表:

CREATE TABLE user_behavior (
  user_id STRING,
  event STRING,
  ts BIGINT
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET
LOCATION '/data/user_behavior';

a、老方式(Hive + MapReduce

# 在 Hive CLI 里执行
SELECT dt, COUNT(*) FROM user_behavior WHERE event='click' GROUP BY dt;
  • Hive 把 SQL 翻译成 MapReduce 任务
  • 提交到 YARN
  • 跑了 8 分钟,中间多次读写 HDFS

b、新方式(Spark SQL

# 在 PySpark 里执行
spark.sql("""
    SELECT dt, COUNT(*) 
    FROM user_behavior 
    WHERE event='click' 
    GROUP BY dt
""").show()
  • Spark 连接 同一个 Hive Metastore
  • 读取 同一张表(路径、格式、分区都一样)
  • 但用 Spark 引擎执行
  • 跑了 20 秒

c、为什么不直接叫“用 Spark 执行 Hive 查询

其实很多人也这么说!但在工程实践中,“替代”这个词更强调 技术栈的演进和责任转移

  • 以前:Hive 是主力,所有离线分析都走 Hive CLI / Beeline
  • 现在:Spark 是主力Hive 只剩下 Metastore(元数据服务)这个“遗产”

d、有了 Spark 就不用 Hive 了吗?

答案:Hive 的元数据管理(表结构、分区)仍是行业标准,Spark 经常“借”Hive 的元数据用

3)SparkHiveMapReduce)快 10~100 倍

SparkHiveMapReduce)快 10~100 倍Spark 的设计哲学是“性能优先 + 容错兼顾”:通过 lineage(血缘)机制,即使某步失败,也能从源头重算,不一定非要存中间结果

步骤 MapReduce Spark
1. 读数据 从 HDFS 读 从 HDFS 读
2. 过滤(filter) Map 输出 → 写磁盘 内存中过滤(不写盘)
3. 分组聚合(groupByKey) Reduce 读上一步磁盘数据 → 计算 → 再写磁盘 内存中直接聚合
4. 排序(sortBy) 再启动一轮 MapReduce → 又读又写磁盘 内存中排序(或溢写部分到磁盘)
总磁盘 I/O 3次以上读写 0~1次(仅输入/输出)

计算步奏:

  • SparkRDD(弹性分布式数据集)或 DataFrame中间结果缓存在内存
  • 多个操作(如 filter → map → groupBy → join)形成一个 DAG(有向无环图)
  • Spark 优化执行计划,尽量在内存中流水线式处理
  • 只有在内存不够 or 需要持久化时,才写磁盘

情景在线:Spark 厨师说:“我记忆力好,所有半成品都放操作台上(内存),不用进冷库!”

流程:

  1. 切鸡丁 → 放操作台
  2. 炸花生米 → 放操作台
  3. 炒酱汁 → 放操作台
  4. 直接混合翻炒 → 出锅!

👉 全程不进冷库,速度飞起!

4、Flink:24小时盯单的实时高手(流批一体引擎)

Flink 就像一个装了雷达的智能锅,水一开就自动调火,订单一来就立刻处理,根本不用等‘攒够一批再做’

Flink 的核心理念:一切皆流(Everything is a stream)

  • 它能做到 毫秒级延迟 + 精确一次(exactly-once)语义,金融、电商风控特别需要这个。

  • 批处理 = 有界的流(比如一天的数据)
  • 流处理 = 无界的流(比如每秒进来的订单)

典型场景

  • 实时大屏:当前在线用户数、每秒交易额
  • 风控:检测异常登录(同一账号1分钟内从北京跳到纽约?)
  • 实时推荐:你刚搜“手机”,首页马上推手机壳
  • Flink 可以从 Kafka 读实时数据,处理完写入 HDFSHive 表(供后续离线分析用)。
  • Flink 也能读 Hive 的元数据(通过 Hive Catalog),知道“orders 表长啥样”。
    • Flink Hive CatalogFlink 的一个插件,能让 Flink 连接 Hive Metastore,直接读这些“说明书”

image-20251202200540208

不是所有 Kafka 消费都需要 Flink,但一旦涉及“状态 + 精确计算 + 高可靠”,Flink 几乎是唯一选择

对比项 普通应用 Flink
定位 消息消费者 流处理引擎
关注点 “拿到消息就行” “如何正确、高效、可靠地处理无限数据流”
适合场景 简单通知、异步解耦 实时数仓、风控、监控、CEP(复杂事件处理)

Flink 是“真·流”,Spark 是“微批模拟流” —— 这是根本区别。

  • 你的业务要多“实时”?
  • 你的计算有多“有状态”?

1)差异1:执行模型不同

对比项 Flink Spark Structured Streaming
处理方式 数据一到达就立即处理(逐条或小窗口连续处理) 把流数据按时间切片(如每 1 秒),当作一个个小“批”来处理
延迟 毫秒级(通常 10~100ms) 至少一个批次间隔(默认 100ms 起,通常设为 1~10 秒)
类比 快递员一有包裹就立刻送到你手上,你马上拆开处理。 快递员每 5 分钟来一次,把这 5 分钟内所有包裹一起给你,你再统一拆开处理。

2)差异2:状态机制不同

状态”就是程序记住过去数据的能力

项目 Flink Spark
记笔记能力 有专业笔记本 + 云同步 + 自动备份 自己手写 + 容易丢 + 写多了手酸(OOM)
能记多少? TB 都行(比如全公司用户30天行为) 记太多就崩溃,适合简单任务
机器挂了怎么办? 自动从备份恢复,无缝继续 可能丢数据、重启慢、甚至起不来
适合干啥? 复杂实时计算(如风控、用户画像、实时推荐) 简单过滤、转换、小窗口统计
大状态场景 支持(如用户 30 天行为窗口) 风险高(状态大时 GC 频繁、任务失败)
状态 vs 数据 状态是核心抽象,只存必要的聚合/上下文 状态是执行副产品,常被迫缓存原始数据或中间结果

a、Flink:自带超级笔记本(而且能扩容)

1)Flink 自己就有一套专门记笔记的系统,叫“状态后端”。

  • 小本本可以存在内存里(快),也可以存在硬盘上(比如 RocksDB,稳,还能存超大笔记)。
  • 即使你的“笔记”有 TB(比如记录百万用户30天行为),Flink 也能扛住。

2)而且它还会自动备份笔记(通过 Checkpoint),就算机器宕机,重启后也能从备份恢复,不丢数据、不错乱

b、Spark:靠自己手写 + 容易忘事 / 笔记本炸了

  • Spark 做流处理时,状态是存在每个计算节点(Executor)的内存或本地磁盘里的,但是状态不是为“长期存储”设计的
    • 就像每个工人自己拿个小本本记事,没有统一管理
    • 如果状态太大(比如要记几百万用户的行为),小本本就写满了 → 内存爆了(OOM),溢出到磁盘是为了避免 OOM 崩溃,但速度会暴跌
  • 虽然 Spark 也用“预写日志(WAL)”做备份,但效率低、扩展差
    • 一旦任务失败,恢复慢,还可能因为 GC(垃圾回收)频繁卡死。

3)差异3:容错机制不同

不丢数据、不重复创立、挂了能自动恢复

场景 Flink Spark
第 8 秒挂了 从第 0 秒 Checkpoint 恢复,重放 0~8 秒数据,状态一致 WAL 重放最近一个微批(比如 5~10 秒),但状态可能丢失或不一致
数据丢失风险 几乎没有(只要 source 可重放) 如果 Executor 永久丢失,状态可能无法恢复
重复处理 有,但结果正确(exactly-once) 有,且可能因状态丢失导致结果错误
  • 核心思想:定期给整个系统“拍快照”

    • 就像打游戏时自动存档:角色血量、背包、地图进度全保存。挂了就从存档点重来,不丢进度。

    • Flink 会周期性地(比如每 10 秒)触发一次 Checkpoint

    • 这个快照包含:

      • 所有算子的当前状态(比如窗口聚合值、用户会话)
      • 数据源的读取位置(比如 Kafkaoffset

      • 举例: 比如你正在统计“每分钟订单数”,Checkpoint 会记录:

        • 当前窗口累计了多少单

        • Kafka 读到了哪条消息

  • 恢复过程:

    • 如果某个节点挂了,Flink 会:
      1. 停止所有任务
      2. 从最近一次成功的 Checkpoint 恢复整个作业
      3. 重置数据源到对应位置(比如 Kafka 回滚到那个 offset
      4. 重新开始计算 → 结果完全一致,精确一次(exactly-once
  • 优点:

    • 全局一致:所有算子的状态是“同一时刻”的快照

    • 高效:支持增量 CheckpointRocksDB 异步快照

    • 真正 exactly-once:配合 Kafka 等支持事务的 source/sink

    • 状态再大也能恢复(因为状态后端可扩展)

b、Spark 流 的容错机制:WAL + 幂等写

  • Spark 流处理的容错依赖两个东西:

    • WALWrite-Ahead Log,预写日志)

      • 数据从 Kafka/Pulsar 等读进来后,先写一份日志到磁盘(通常是 HDFS 或本地磁盘)

      • 即使 Executor 挂了,Driver 可以从 WAL重放这批数据

    • 幂等写(Idempotent Sink
    • Spark 本身不保证 exactly-once,除非你用的输出系统支持“幂等”(比如写数据库时用主键去重)

    • 否则可能重复写(at-least-once
  • 恢复过程的问题:

    • 如果任务失败,Spark 会:
      1. WAL 里读出未处理完的微批次数据
      2. 重新跑一遍这个批次
    • 但状态恢复很弱
      • 状态存在 Executor 内存/本地磁盘
      • 如果 Executor 永久丢失(比如机器宕机),状态就没了!
      • 虽然可以重建状态,但大状态重建慢、易失败

4)使用场景对比

对比项 Flink Spark Structured Streaming
场景 实时风控、高频交易等对延迟敏感的场景 准实时报表、日志聚合等场景完全够用
实时反欺诈 毫秒响应 + CEP 模式匹配 微批延迟高,CEP 需手写
5s大屏监控 能做,但“杀鸡用牛刀 完全够用,开发更快
7天用户行为 状态高效管理 可能因状态过大失败
日志清洗入湖 可以,但生态不如 Spark 与 Delta Lake/Hive 无缝集成

一、Spark 核心概念入门

1、Spark 是干啥的?—— 定位与价值

Apache Spark 是一个开源的、基于内存的、通用的大数据(大规模数据)

它不是数据库,也不是存储系统,而是一个计算引擎——专门用来**快速处理海量数据

  • 内存计算:比 Hadoop100
  • 统一API:支持批处理、流处理、ML、图计算
  • 多语言支持ScalaJavaPythonR
  • DAG 执行引擎:优化复杂任务执行流程
功能 说明 实际场景举例
批处理(Batch) 处理历史全量数据 每天凌晨统计昨日用户活跃数
流处理(Streaming) 处理实时数据流 实时监控交易异常、大屏展示在线人数
交互式查询 快速响应 SQL 查询 数据分析师用 SQL 探查数据
机器学习(ML) 分布式训练模型 用户画像、推荐系统、风控模型
图计算 分析关系网络 社交关系挖掘、反欺诈关联分析

2、Spark 的核心思想

关键词 说明
内存计算 中间数据尽量放内存,避免磁盘 I/O
DAG 调度 多步骤任务编排成有向无环图,优化执行顺序
懒执行Lazy Evaluation 只有遇到 count(), show(), write() 等 action 才真正计算
容错靠血缘Lineage 不存中间结果,失败时从源头重算
统一引擎 批、流、ML、图计算一套 API 搞定

3、Spark 协作架构:两层执行角色 + 一层外部资源协调者

Spark 作业的执行引擎由 DriverExecutor 构成,但依赖 Cluster Manager(如 YARN)来分配资源和启动进程。因此,在集群环境中,我们通常说 Spark 涉及三个关键角色:两层执行角色 + 一层外部资源协调者

  • 一个 Spark 应用 = 1 个 Driver + NExecutors
    • Executors长期驻留(不像 MapReduce 每次新建),减少启动开销

    • 数据缓存在 Executor 内存中,供后续 Task 复用

  • 每运行一个 spark-submit 命令(或在代码中创建一个 SparkContext / SparkSession),就会启动一个独立的 Spark 应用
    • Spark 调度和资源管理的最小单位
    • YARN 上,它对应一个 YARN Application(可在 yarn application -list 中看到)。
    • 拥有唯一的 Application ID(如 application_1712345678901_0012)。
角色 职责 是否属于 Spark 应用
Driver 控制中心:解析、调度、收集结果
Executor 执行单元:跑 Task、缓存数据

1)工作流程图

角色 职责 类比
Driver 解析逻辑、构建 DAG、调度 Task、收集结果 导演的对讲机:用来跟现场沟通
Worker Node 集群中的工作机器 拍摄现场
Executor Worker 上运行的进程 执行 Task • 缓存 RDD/DataFrame 数据 演员 + 场务:既表演(计算),又保管道具(缓存数据)
Task 最小执行单元,每个分区对应一个 Task 一个镜头的拍摄任务
Cluster Manager 管理集群资源(CPU、内存) 常见:YARNHadoo 生态)、K8sSpark Standalone 制片主任:分配摄影棚、灯光、群演
+-------------------+          +---------------------+          +------------------+
|    Client         |          |   Cluster Manager   |          |    Worker Nodes  |
| (spark-submit)    |<-------->| (YARN / Mesos / K8s)|<-------->|                  |
+-------------------+          +---------------------+          +------------------+
        |                              |                               |
        |                              |                               |
        v                              v                               v
+-------------------+          +---------------------+          +------------------+
|      Driver       |          |                     |          |     Executor     |
| (Runs on client)  |          |                     |          | (runs tasks)     |
+-------------------+          +---------------------+          +------------------+
        |
        | Communicates directly with Executors
        v
+----------------------------------------------------+
|                    Executors                       |
|                                                    |
|  +------------------+  +------------------+        |
|  | Executor         |  | Executor         | ...    |
|  | - Runs Tasks  |  |  |  | - Runs Tasks  |  |
|  | - Caches Data |  |  |  | - Caches Data |  |
|  +------------------+  +------------------+        |
+----------------------------------------------------+



[Your Laptop]                     ← Client(不在集群)
      │
      │ spark-submit
      ↓
[YARN ResourceManager]            ← Cluster Manager(集群 Master 节点)
      │
      ├─ Launches → [Driver in YARN Container]   ← 在某个 Worker Node 上
      │
      └─ Launches → [Executor in YARN Container] × N
                        ↑
                   [Worker Node 1, 2, ..., N]     ← 集群计算节点
                   

2)执行流程

  1. 提交作业:Client

    • 用户通过 spark-submit 命令提交应用程序。命令指定了应用主类、应用参数以及部署模式(clientcluster)等信息。
  2. Cluster Manager 分配资源:
    • Cluster Manager:负责集群资源的管理和分配。它可以是 Apache MesosHadoop YARN 或者 Kubernetes 等。在接收到任务请求后,Cluster Manager 根据资源情况为应用程序分配合适的资源。
  3. 启动 DriverExecutors
  • client 模式下:
    • Driver 直接在客户端机器上启动,并连接到 Cluster Manager 来请求资源。
    • Executors:由 Cluster ManagerWorker Nodes 上启动,并且会主动连接到 Driver
  • cluster 模式下:
    • Cluster Manager 首先在集群中选择一个节点来启动 Driver
    • 其他 Worker Nodes 上会启动 Executors,这些 Executors 也会与 Driver 进行通信。
  1. 执行任务

    • **Driver **负责将用户程序转换成多个 Task,并调度这些 Task 到各个 Executor 上执行。

    • Executor 接收来自 DriverTask,在其所属的 Worker Node 上执行,并将执行结果返回给 Driver

  2. 数据处理和通信

    • Executor 不仅执行计算任务,还可能需要从外部存储系统读取数据或将中间结果写回存储系统。

    • DriverExecutor 之间进行直接的通信以传输任务指令和结果数据。

3)jobStageTask 关系

  • 一个 Application(应用)包含多个 Job
  • 一个 Job 包含多个 Stage
  • 一个 Stage 包含多个 Task
  • 每个 Task 在一个 Executor 上运行。
  • Executor 是物理资源容器,负责执行 Task。
Application
│
├── Job 0 (triggered by count())
│   ├── Stage 0 (no shuffle)
│   │   ├── Task 0 → runs on Executor 1
│   │   ├── Task 1 → runs on Executor 2
│   │   └── ...
│   └── Stage 1 (after shuffle)
│       ├── Task 0 → runs on Executor 3
│       ├── Task 1 → runs on Executor 1
│       └── ...
│
├── Job 1 (triggered by save())
│   ├── Stage 0
│   └── Stage 1
│
└── ... (more Jobs)

a、Application

  • 启动一次 spark-submit 就是一个 Application;
  • 整个生命周期内可以提交多个 Job;
  • 有唯一的 Driver 进程(协调者)和若干 Executor 进程(执行者)。

b、Job(作业)

  • 触发条件:遇到一个 Action 操作(如 count(), collect(), saveAsTable());

  • 每个 Action 会触发一个 Job;

  • Job逻辑上的工作单元

    df1 = spark.sql("SELECT * FROM t1 WHERE id > 100")
    df1.count()        # ← 触发 Job 0
      
    df2 = df1.groupBy("name").sum()
    df2.write.save(...) # ← 触发 Job 1
    

c、Stage(阶段)

关键规则:ShuffleStage 的分界线!

  • Spark 根据 Shuffle 边界(宽依赖)将 Job 切分成多个 Stage
    • 没有 Shuffle 的操作(如 map,filter)属于同一个 Stage(窄依赖);
    • Shuffle 的操作(如 join, groupBy)会切断 Stage,形成新 Stage
SELECT dept, COUNT(*) 
FROM employees 
JOIN departments ON employees.dept_id = departments.id 
GROUP BY dept;

执行计划会被切成:

  • Stage 0:读取 employees 表 + departments 表(可能广播小表);
  • Stage 1Shuffle + Join + GroupBy(需要重分区);

d、 Task(任务)

  • Stage 被拆成多个 Task,每个 Task 处理一个 数据分区(Partition)
  • Task 数量 = 该 Stage分区数
  • 所有 Task 并行执行

  • 示例:

    • 如果 Stage1000 个分区 → 就有 1000Task

    • 每个 Task 处理 ~256MB 数据(由 AQE 或 spark.sql.shuffle.partitions 控制)。

4、Spark 五大核心模块

模块 全称 主要用途
Spark Core 核心计算引擎 通用数据处理
Spark SQL 结构化数据处理 ETL、报表
Structured Streaming 流处理引擎 分析
MLlib 机器学习库 预测、分类
GraphX 图计算框架 社交网络分析

1)相互关系

所有高级模块都依赖 Spark Core,但彼此独立,可单独使用。

                    +------------------+
                    |   Spark Core     | ← RDD, Task Scheduler, Shuffle
                    +--------+---------+
                             |
        +--------------------+--------------------+
        |                    |                    |
+-------v------+    +--------v--------+   +-------v--------+
|  Spark SQL   |    | Structured      |   |     MLlib      |
| (DataFrame)  |    | Streaming       |   | (ML Pipelines) |
+--------------+    +-----------------+   +----------------+
        |
+-------v------+
|   GraphX     | ← (Scala only)
+--------------+

2)如何选择

你要做什么? 用哪个模块?
SQL / 做 ETL / 查报表 Spark SQL
处理 Kafka / 实时数据 Structured Streaming
训练机器学习模型 MLlib
分析社交网络 GraphX(Scala)或 GraphFrames(Python)
需要极致控制(如自定义分区) Spark Core (RDD)(最后选择

三、Spark 参数配置

1、配置参考

配置项 动态版本 静态版本
资源获取 先申请 50 个,不够再加。排队快,但爬坡需要时间。 一次性申请 200 个,排队可能久,但一旦开始就全速跑。
稳定性 受集群整体负载影响(可能想扩扩不出来) 极高。资源独占,不受其他任务干扰。
核心参数 dynamicAllocation --num-executors 200
适用场景 流量波动大、共享集群 流量稳定、专属队列、定时任务
推荐度 ⭐⭐⭐ (适合灵活场景) ⭐⭐⭐⭐⭐ (对于核心大任务)

1)动态版本

#!/bin/bash

# ==============================
# Spark 作业资源配置(根据集群规模和数据量调整)
# 适用场景:生产环境大规模数据 ETL / 报表 /  Join 计算
# 核心优化:动态资源 + AQE 自适应 + 小文件合并 + 数据倾斜处理
# ==============================

# ==============================
# 提交 Spark 作业到 YARN 集群
# ==============================
spark-submit --master yarn \
--deploy-mode cluster \  # Driver 在集群中运行(非本地),适合生产
--name "demo_task" \  # 作业名称(YARN 页面可识别,便于监控排查)

# Driver 资源:Spark 驱动器进程,负责任务调度、生成执行计划、收集结果、管理广播变量
# 配置原则:避免内存不足(OOM),不要过度分配造成资源浪费
--driver-cores 4 \        # Driver 使用的 CPU 核心数(生产通用值:2~4核,调度密集型可上调)
--driver-memory 16g \     # Driver 堆内存;需容纳广播表、collect() 结果集,避免 OOM


# Executor 资源:每个执行器的资源(建议单个 executor 不超过 32GB,避免 GC 停顿过长)
--executor-cores 4 \      # 单个Executor的CPU核数,推荐2-5核平衡并行度与调度效率
--executor-memory 24g \   # 单个Executor内存,YARN会自动追加内存overhead


# Executor 资源:每个执行器的资源(建议单个 executor 不超过 32GB,避免 GC 停顿过长)
--executor-cores 4 \      # 单个 Executor 占用 CPU 核数;推荐 2~5 核,提升任务并行效率
--executor-memory 24g \   # 单个 Executor 堆内存;YARN 会自动追加 spark.executor.memoryOverhead 内存


# ==============================
# 1、动态资源分配:按需伸缩 Executor 数量,避免资源闲置/排队
# 前提条件:YARN 集群已部署 External Shuffle Service(ESS)
# ==============================
--conf spark.dynamicAllocation.enabled=true \                # 开启动态资源分配(生产必开)
--conf spark.dynamicAllocation.minExecutors=50 \             # 作业最小保留 Executor 数(避免频繁缩容导致卡顿)
--conf spark.dynamicAllocation.maxExecutors=200 \            # 作业最大可用 Executor 数(根据集群总资源上限调整)
--conf spark.dynamicAllocation.initialExecutors=50 \         # 作业启动时初始化 Executor 数
--conf spark.shuffle.service.enabled=true \                  # 开启外部 Shuffle 服务(动态分配必须依赖)
--conf spark.dynamicAllocation.shuffleTracking.enabled=true \ # 开启 Shuffle 追踪,精准释放空闲 Executor

# ==============================
# 2、Shuffle 与全局并行度优化
# 并行度原则:初始分区数 ≈ 总CPU核数 × 1~3 倍,保证任务充分并行
# ==============================
--conf spark.sql.shuffle.partitions=2000 \                  # Spark SQL 默认 Shuffle 分区数(join/agg 时生效)
--conf spark.default.parallelism=2000 \                     # Spark RDD 算子默认并行度(非 SQL 场景生效)


# ==============================
# 3、AQE 自适应查询执行(Spark 3.x 核心优化,自动解决数据倾斜/小分区)
# 作用:运行时动态调整分区数、优化 Join 策略、处理数据倾斜
# ==============================
--conf spark.sql.adaptive.enabled=true \                     # 开启 AQE 自适应执行(生产必开)
--conf spark.sql.adaptive.coalescePartitions.enabled=true \  # 自动合并过小分区,降低任务调度开销
--conf spark.sql.adaptive.skewJoin.enabled=true \            # 自动检测并优化 Join 数据倾斜
--conf spark.sql.adaptive.skewJoin.enhanced.enabled=true \   # 增强版倾斜 Join 优化(支持更复杂场景)
--conf spark.sql.adaptive.forceOptimizeSkewedJoin=true \     # 强制优化倾斜 Join,不依赖自动检测
--conf spark.sql.adaptive.skewPartition.enabled=true \       # 开启倾斜分区自动拆分
--conf spark.sql.adaptive.skewPartition.thresholdInBytes=268435456 \ # 倾斜分区判定阈值:256MB
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \  # AQE 目标分区大小:256MB(推荐128MB~512MB)
--conf spark.sql.autoBroadcastJoinThreshold=209175200 \      # 自动广播表阈值:≈200MB;小表自动广播,避免 Shuffle


# ==============================
# 4、Hive 分区表写入优化(核心解决:动态分区失败、HDFS 小文件过多)
# ==============================
--conf spark.sql.hive.mergeFiles=true \                      # 写入 Hive 表时自动合并小文件(减少 NameNode 压力)
--conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \ # 动态分区模式:非严格,允许全动态分区(无静态分区列
--conf spark.hadoop.hive.exec.dynamic.partition=true \           # 开启动态分区写入(写入分区表必开)
--conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \    # 全局最大允许创建动态分区数(防失控)
--conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \ # 单节点最大动态分区数
--conf spark.hadoop.hive.merge.mapfiles=true \               # 合并 Map 任务产生的小文件
--conf spark.hadoop.hive.merge.mapredfiles=true \            # 合并 Reduce 任务产生的小文件

# ==============================
# 其他功能开关(按需启用)
# ==============================
--conf spark.sql.crossJoin.enabled=true \                    # 允许笛卡尔积(默认禁用,仅在确实需要时开启)

# ==============================
# 📌 应用入口
# ==============================
template.py $1 $2   # 传入两个参数给 PySpark 脚本(如 日期、业务标识)

2)静态版本

#!/bin/bash

# ==============================
# 🚀 Spark 作业资源配置(静态固定资源版)
# 适用场景:生产环境固定流量/定时调度/稳定负载作业
# 资源模式:固定 200 个 Executor,不自动扩缩容
# ==============================


# ==============================
# 提交 Spark 作业到 YARN 集群
# ==============================
spark-submit --master yarn \
--deploy-mode cluster \  # Driver 在集群中运行(生产标准模式),断连不中断
--name "demo_task" \  # 作业名称(YARN 页面可识别,便于监控排查)


# Driver 资源:负责任务调度、生成执行计划、收集结果、管理广播变量
--driver-cores 4 \        # Driver CPU 核数(调度密集型推荐 2~4 核)
--driver-memory 16g \     # Driver 堆内存,容纳广播表/collect 结果,避免 OOM

# 静态 Executor 配置:固定资源,无动态扩缩容
--num-executors 200 \     # 【静态核心参数】固定启动 200 个 Executor
--executor-cores 4 \      # 单个 Executor CPU 核数,推荐 2~5 核,提升并行效率
--executor-memory 24g \   # 单个 Executor 堆内存,YARN 自动追加内存 overhead,避免 Full GC


# ==============================
# 1、Shuffle 与全局并行度优化(静态资源专用)
# 并行度计算:200 Executor × 4 核 = 800 总核数
# 初始分区 = 总核数 × 2.5 倍,保证并行度充足,配合 AQE 自动调整
# ==============================
--conf spark.sql.shuffle.partitions=2000 \        # Spark SQL 默认 Shuffle 分区数(join/agg 生效)
--conf spark.default.parallelism=2000 \           # Spark RDD 算子默认并行度(非 SQL 场景生效)


# ==============================
# 2、AQE 自适应查询执行(静态资源必开,自动处理倾斜/小分区)
# 作用:运行时动态合并小分区、优化 Join 策略、解决数据倾斜
# ==============================
--conf spark.sql.adaptive.enabled=true \                     # 开启 AQE 自适应执行(生产必开)
--conf spark.sql.adaptive.coalescePartitions.enabled=true \  # 自动合并过小分区,降低任务调度开销
--conf spark.sql.adaptive.skewJoin.enabled=true \            # 自动检测并优化 Join 数据倾斜
--conf spark.sql.adaptive.skewJoin.enhanced.enabled=true \  # 增强版倾斜 Join 优化,适配复杂业务场景
--conf spark.sql.adaptive.forceOptimizeSkewedJoin=true \     # 强制优化倾斜 Join,不依赖自动检测
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \  # AQE 目标分区大小:256MB(推荐 128~512MB)
--conf spark.sql.autoBroadcastJoinThreshold=209175200 \      # 自动广播表阈值:≈200MB,小表广播避免 Shuffle


# ==============================
# 3、Hive 分区表写入优化(解决小文件+动态分区写入失败)
# ==============================
--conf spark.sql.hive.mergeFiles=true \                      # 写入 Hive 时自动合并小文件,减轻 NameNode 压力
--conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \ # 非严格动态分区(允许全动态分区,无静态分区)
--conf spark.hadoop.hive.exec.dynamic.partition=true \       # 开启动态分区写入(写入分区表必开)
--conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \   # 全局最大允许创建动态分区数(防止异常创建大量分区)
--conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \ # 单节点最大动态分区数
--conf spark.hadoop.hive.merge.mapfiles=true \               # 合并 Map 任务产生的小文件
--conf spark.hadoop.hive.merge.mapredfiles=true \            # 合并 Reduce 任务产生的小文件


# ==============================
# 其他功能开关(按需启用,谨慎使用)
# ==============================
--conf spark.sql.crossJoin.enabled=true \        # 允许笛卡尔积(默认禁用;仅业务需要时开启,避免性能雪崩)

# ==============================
# 应用入口
# ==============================
template.py $1 $2   # 向 PySpark 脚本传递 2 个运行参数(例:统计日期、业务标识)

2、基础资源配置(核心)

# Driver 资源:Spark 驱动器进程,负责任务调度、生成执行计划、收集结果、管理广播变量
# 配置原则:避免内存不足(OOM),不要过度分配造成资源浪费
--driver-cores 4 \        # Driver 使用的 CPU 核心数(生产通用值:2~4核,调度密集型可上调)
--driver-memory 16g \     # Driver 堆内存;需容纳广播表、collect() 结果集,避免 OOM


# Executor 资源:每个执行器的资源(建议单个 executor 不超过 32GB,避免 GC 停顿过长)
--executor-cores 4 \      # 单个Executor的CPU核数,推荐2-5核平衡并行度与调度效率
--executor-memory 24g \   # 单个Executor内存,YARN会自动追加内存overhead


# Executor 资源:每个执行器的资源(建议单个 executor 不超过 32GB,避免 GC 停顿过长)
--executor-cores 4 \      # 单个 Executor 占用 CPU 核数;推荐 2~5 核,提升任务并行效率
--executor-memory 24g \   # 单个 Executor 堆内存;YARN 会自动追加 spark.executor.memoryOverhead 内存

单作业资源规格:

  • Driver:4 核 16GB 内存(控制进程)
  • 单 Executor:4 核 24GB 内存(计算进程)
  • 总 Executor 数量:200 个

总资源占用:

  • CPU:200 × 4 = 800 核
  • 内存:200 × 24 = 4800 GB

生产环境风险提示

  • 单个 Spark 作业建议不占用集群总资源的 30%~50%,防止阻塞其他重要任务
  • 200Executor 属于大规模作业,需确保集群队列有足够资源
  • 静态资源作业会长期占用资源,不释放,不适合共享集群高峰时段

1)Driver 资源配置

--driver-cores 4 \        # Driver 使用的 CPU 核心数(生产通用值:2~4核,调度密集型可上调)
--driver-memory 16g \     # Driver 堆内存;需容纳广播表、collect() 结果集,避免 OOM

DriverSpark 作业的“大脑”:

  • 负责解析逻辑、生成执行计划(DAG
  • 协调所有 Executor 的任务调度
  • 接收最终结果(比如你用了 collect()show()

DRIVER_CORES=4:给 Driver 分配 4 个 CPU 核。

  • → 实际上 Driver 大部分时间在等待,1~2 核通常足够
  • 设为 4 是为了应对复杂广播或大量小任务调度。

DRIVER_MEM=16gDriver 内存 16GB。

  • 关键限制:如果你用 broadcast(df) 广播一个大表,或者 collect() 拉回大量数据,总大小不能超过 16GB,否则会 OOM。

2)Executor资源配置

单个 Executor 内存 16~32GB 最佳,核数 2~5

# Executor 资源:每个执行器的资源(建议单个 executor 不超过 32GB,避免 GC 停顿过长)
EXECUTOR_CORES=4        # 每个 Executor 的 CPU 核数(推荐 2~5,平衡并行度与调度开销)
EXECUTOR_MEM=24g        # 每个 Executor 的内存(YARN 实际申请 = 此值 + overhead,默认约 10%+384MB)
  • Executor 是真正干活的“工人”,运行 Task(如 map、filter、shuffle read)。
  • EXECUTOR_CORES=4
    • 表示每个 Executor 可以 并行运行 4 个 Task
    • 推荐值:2~5
      • 太小 → 并行度低;
      • 太大 → 单个 JVM 管理太多线程,GC 压力大。
  • EXECUTOR_MEM = 24g
    • 每个 Executor 的堆内存为 24GB
    • YARN 实际申请的内存 = 24g + overhead

3、启用动态分配

# ==============================
# 1、动态资源分配:按需伸缩 Executor 数量,避免资源闲置/排队
# 前提条件:YARN 集群已部署 External Shuffle Service(ESS)
# ==============================
--conf spark.dynamicAllocation.enabled=true \                # 开启动态资源分配(生产必开)
--conf spark.dynamicAllocation.minExecutors=50 \             # 作业最小保留 Executor 数(避免频繁缩容导致卡顿)
--conf spark.dynamicAllocation.maxExecutors=200 \            # 作业最大可用 Executor 数(根据集群总资源上限调整)
--conf spark.dynamicAllocation.initialExecutors=50 \         # 作业启动时初始化 Executor 数
--conf spark.shuffle.service.enabled=true \                  # 开启外部 Shuffle 服务(动态分配必须依赖)
--conf spark.dynamicAllocation.shuffleTracking.enabled=true \ # 开启 Shuffle 追踪,精准释放空闲 Executor
  • 动态分配(Dynamic Allocation 是生产环境强烈推荐的功能:
    • 作业开始时只申请 MIN_EXECUTORS(如 50 个)
    • 随着数据量增加,自动扩容到最多 MAX_EXECUTORS(如 200 个)
    • 空闲时自动释放资源
  • 好处
    • 避免“申请 400 个但只用 100 个”的资源浪费
    • 避免因一次性申请太多而长时间排队
    • 更弹性,适合数据量波动的场景(如每天日志量不同)

4、AQE 自动处理

# ==============================
# 3、AQE 自适应查询执行(Spark 3.x 核心优化,自动解决数据倾斜/小分区)
# 作用:运行时动态调整分区数、优化 Join 策略、处理数据倾斜
# ==============================
--conf spark.sql.adaptive.enabled=true \                     # 开启 AQE 自适应执行(生产必开)
--conf spark.sql.adaptive.coalescePartitions.enabled=true \  # 自动合并过小分区,降低任务调度开销
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \  # AQE 目标分区大小:256MB(推荐128MB~512MB)
--conf spark.sql.autoBroadcastJoinThreshold=209175200 \      # 自动广播表阈值:≈200MB;小表自动广播,避免 Shuffle

# join倾斜
--conf spark.sql.adaptive.skewJoin.enabled=true \            # 自动检测并优化 Join 数据倾斜
--conf spark.sql.adaptive.skewJoin.enhanced.enabled=true \   # 增强版倾斜 Join 优化(支持更复杂场景)
--conf spark.sql.adaptive.forceOptimizeSkewedJoin=true \     # 强制优化倾斜 Join,不依赖自动检测

# 聚合/去重 倾斜
--conf spark.sql.adaptive.skewPartition.enabled=true \       # 开启倾斜分区自动拆分
--conf spark.sql.adaptive.skewPartition.thresholdInBytes=268435456 \ # 倾斜分区判定阈值:256MB

1)Join 倾斜优化

Spark 会自动在 Execution(含 ShuffleStorage(缓存) 之间动态共享内存。

  • 作用:当 join 操作中某些 key 导致分区过大时,AQE 会自动将这些 key 拆分处理(如广播小表部分、加盐等)。
  • 适用操作join, left join, inner join 等。
  • 无需知道热点 key,全自动!
# ==============================
# 🔧 Join 倾斜优化(AQE 自动处理)
# ==============================
--conf spark.sql.adaptive.skewJoin.enabled=true \            # 自动处理 Join 数据倾斜
--conf spark.sql.adaptive.skewJoin.enhanced.enabled=true \   # 增强版倾斜 Join 优化(支持更复杂场景)
--conf spark.sql.adaptive.forceOptimizeSkewedJoin=true \     # 强制优化倾斜 Join,不依赖自动检测

参数解释:

  • spark.sql.adaptive.skewJoin.enabled=true
    • 当执行 JOIN(包括 INNER JOIN, LEFT JOIN, RIGHT JOIN 等)时,若 AQE 检测到某些 key 对应的 Shuffle 分区过大(如热点用户 ID),会自动将这些倾斜 key 单独拆分处理
      • 若小表侧可广播 → 自动广播该 key 的匹配数据
      • 否则 → 自动对大表侧的倾斜 key 加盐打散(salting),启动多个 Task 并行 join

适用场景:

  • 大表 JOIN 小表,但小表中某些 key 被大量匹配(多对多)
  • 匿名用户(如 user_id = "0")导致 join 任务卡死
  • 不知道具体热点 key,希望全自动处理

版本要求:Spark ≥ 3.0skewJoin 自 Spark 3.0 引入)

使用建议:

  • 所有 Spark 3.x 生产作业都应开启此配置
  • 无需修改 SQL 或 DataFrame 代码
  • 若倾斜极严重(单 key > 50%),可配合侵入式方案使用

3)聚合/去重倾斜

  • 作用:对任意 Shuffle 后的大分区(如 groupBy, distinct, repartition)自动切分成多个子任务并行处理。
  • 本质:自动实现“两阶段聚合”,但不用写一行额外代码
# ==============================
# 自动处理通用 Shuffle 分区倾斜(Spark 3.4+ 新增)
# ==============================
--conf spark.sql.adaptive.skewPartition.enabled=true \ # 开启倾斜分区自动拆分
	# 开启“倾斜分区自动切分”功能。
	# 当任意 Shuffle 阶段(如 groupBy、distinct、repartition)出现超大分区时,
	# AQE 会自动将该分区拆成多个子任务并行处理,避免单 Task 成为瓶颈。

--conf spark.sql.adaptive.skewPartition.thresholdInBytes=268435456 \
  # 设置“倾斜分区”的判定阈值:256MB(268435456 字节)。
  # 任何 Shuffle 分区大小超过此值,就会被 AQE 视为倾斜并自动切分。
  # 建议范围:100MB ~ 512MB,根据集群内存和数据量调整。

参数解释:

  • spark.sql.adaptive.skewPartition.enabled=true:开启“倾斜分区自动切分”功能。

    • 任意 Shuffle Read 阶段(不限于 join),若某个分区大小超过 thresholdInBytesAQE自动将该大分区切分为多个子分区,并启动多个 Task 并行处理同一个原始分区

    • 典型受益操作

      • df.groupBy("key").agg(...)(聚合倾斜)

      • df.distinct()countDistinct()(去重倾斜)

      • df.repartition("key")(重分区后倾斜)

  • skewPartition.thresholdInBytes=268435456:设定“大分区”阈值为 256MB(可根据集群调整,通常 100~512MB)

适用场景:

  • groupBy 某个字段时,个别 key 出现频率极高(如日志中的默认值)
  • distinct 操作因重复值分布不均导致 Task 不均衡
  • 任何非 join 类 Shuffle 操作出现长尾 Task

版本要求:Spark ≥ 3.4skewPartition 是 Spark 3.4 新增功能!)

注意:在 Spark 3.0 ~ 3.3 中,此参数无效,需手动实现“两阶段聚合”。

使用建议:

  • 如果你已升级到 Spark 3.4+,强烈建议开启
  • 可与 skewJoin 同时启用,形成完整倾斜防护体系
  • 对极端倾斜仍可能不足,需结合采样 + 侵入式处理

5、Hive 写入配置

# ==============================
# Hive 分区表写入优化(解决小文件+动态分区写入失败)
# ==============================
--conf spark.sql.hive.mergeFiles=true \                      # 写入 Hive 时自动合并小文件,减轻 NameNode 压力
--conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \ # 非严格动态分区(允许全动态分区,无静态分区)
--conf spark.hadoop.hive.exec.dynamic.partition=true \       # 开启动态分区写入(写入分区表必开)
--conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \   # 全局最大允许创建动态分区数(防止异常创建大量分区)
--conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \ # 单节点最大动态分区数
--conf spark.hadoop.hive.merge.mapfiles=true \               # 合并 Map 任务产生的小文件
--conf spark.hadoop.hive.merge.mapredfiles=true \            # 合并 Reduce 任务产生的小文件

  • 允许写入大量动态分区(如按天/用户ID分区)
  • 合并小文件,减少 HDFS 小文件问题

6)建议

1)根据集群规模调整 MAX_EXECUTORS

可通过 yarn node -list 查看集群总资源

集群总 CPU 核数 推荐 MAX_EXECUTORS
200 核 40~60
500 核 100~150
1000+ 核 200~400

2)监控实际分区大小

  • Spark UISQL tab 中查看 Shuffle 后的分区大小
  • 如果多数分区 « 100MB → 可适当降低 advisoryPartitionSizeInBytes
  • 如果出现个别分区 » 1GB → AQE 的 skewJoin 会自动拆分(确保已开启)

四、数据倾斜

1、如何快速识别倾斜

1)Spark UI

  • 进入 Stages 页面
  • 查看 Task Duration 分布图
    • 如果是“长尾”或“单点突出”,就是倾斜
  • 点进具体 Task,看 Input Size / Shuffle Read Size

2)采样分析热点 Key

经验:如果 top 1 key 的 count > 总量的 10%,基本会倾斜。

# 找出 top 100 高频 key
df.groupBy("key").count().orderBy(desc("count")).show(100)

2、非侵入式-参数调优

不改业务逻辑,通过 Spark 参数/配置自动缓解

1)AQE 自适应优化:解决Join 数据倾斜

# ==============================
# 🔧 Join 倾斜优化(AQE 自动处理)
# ==============================
--conf spark.sql.adaptive.skewJoin.enabled=true \            # 自动处理 Join 数据倾斜
--conf spark.sql.adaptive.skewJoin.enhanced.enabled=true \   # 增强版倾斜 Join 优化(支持更复杂场景)
--conf spark.sql.adaptive.forceOptimizeSkewedJoin=true \     # 强制优化倾斜 Join,不依赖自动检测

2)AQE 自适应优化:聚合/去重倾斜

# ==============================
# 自动处理通用 Shuffle 分区倾斜(Spark 3.4+ 新增)
# ==============================
--conf spark.sql.adaptive.skewPartition.enabled=true \ # 开启倾斜分区自动拆分
	# 开启“倾斜分区自动切分”功能。
	# 当任意 Shuffle 阶段(如 groupBy、distinct、repartition)出现超大分区时,
	# AQE 会自动将该分区拆成多个子任务并行处理,避免单 Task 成为瓶颈。

--conf spark.sql.adaptive.skewPartition.thresholdInBytes=268435456 \
  # 设置“倾斜分区”的判定阈值:256MB(268435456 字节)。
  # 任何 Shuffle 分区大小超过此值,就会被 AQE 视为倾斜并自动切分。
  # 建议范围:100MB ~ 512MB,根据集群内存和数据量调整。

3)调整 Shuffle 分区数(治标不治本)

--conf spark.sql.shuffle.partitions=5000  # 增大分区数,稀释热点
  • 参数设置
    • 太小 → 每个分区太大 → Task 处理慢、OOM 风险高、并行度低
    • 太大 → 每个分区太小 → 调度开销大、产生大量小文件
  • 适用场景:倾斜不严重,或热点 key 分布较“散”
  • 风险:可能产生更多小文件,调度开销变大

3、侵入式-改逻辑

修改业务逻辑,从数据/算法层面解决

1)预处理热点 Key

  • 优点:精准打击,效果最好
  • 缺点:代码复杂,需维护 hot key 列表

  • 思路:把倾斜的 key 单独拎出来,用不同策略处理
  • 步骤
    1. 采样找出 hot keys(如 top 100)
    2. 将原数据拆成两份:
      • normal_data:非热点 key
      • hot_data:热点 key
    3. 分别处理:
      • normal_data → 正常 join/group
      • hot_data → 单独聚合 or 广播处理
    4. 合并结果
hot_keys = [row.key for row in df.groupBy("key").count().orderBy(desc("count")).limit(100).collect()]

normal_df = df.filter(~col("key").isin(hot_keys))
hot_df = df.filter(col("key").isin(hot_keys))

# normal 走常规流程
normal_result = normal_df.groupBy("key").sum("value")

# hot key 单独处理(比如用 collect + local agg)
hot_local = hot_df.collect()
# ... 本地聚合后转回 DataFrame

final_result = normal_result.union(hot_result)

2)启用 Salting(加盐)—— 半自动方案

虽然需要改代码,但属于通用技巧,常归为“非侵入式思维”。

  • 本质:把一个大 key 拆成 N 个小 key,分散到不同分区
  • 适用:已知热点 key 列表(可提前采样)
from pyspark.sql.functions import *

# 假设 key="user_id" 有倾斜
SALT_NUM = 10

# 步骤1:给大表(big_df)的热点 key 加盐
big_df_salted = big_df.withColumn(
    "salted_key", 
    when(col("user_id").isin(hot_keys), 
         concat(col("user_id"), lit("_"), (rand() * SALT_NUM).cast("int"))
    ).otherwise(col("user_id"))
)

# 步骤2:给小表膨胀(每个 hot key 复制成 SALT_NUM 份)
small_df_exploded = small_df.withColumn(
    "salted_key",
    explode(array([concat(col("user_id"), lit("_"), lit(i)) for i in range(SALT_NUM)]))
).union(
    small_df.filter(~col("user_id").isin(hot_keys)).withColumn("salted_key", col("user_id"))
)

# 步骤3:用 salted_key join
result = big_df_salted.join(small_df_exploded, "salted_key")

原理:

  • 假设:

    • big_df:10 亿条用户行为日志,其中 user_id = "0"(匿名用户)占了 3 亿条

    • small_df:10 万条用户画像表,包含 user_id = "0" 的一条记录

      # 直接 join 会怎样?
      result = big_df.join(small_df, "user_id")
      
  • 问题:

    • 所有 user_id = "0" 的 3 亿条日志 → 被 Shuffle同一个分区
    • 小表中 user_id = "0" 的那条记录 → 也被广播/发送到这个分区
    • 结果:一个 Task 要处理 3 亿 × 1 = 3 亿行 join → 卡死 or OOM
  • 解法:只对热点 key “加盐打散”

    • 把 3 亿条 user_id=0 拆成 10 份,每份 3000 万,分别和小表的 user_id=0join
    • 步骤1:给大表(big_df)的热点 key 加盐
      • 让原本聚集在 user_id=0 的数据,分散到 10 个不同的 key 上。
    • 步骤2:把小表(small_df)的热点 key “膨胀”成 10 份

3) 两阶段聚合(适用于 groupBy 倾斜)

  • 思想:先局部聚合,再全局聚合
  • 实现
 第一阶段:加随机前缀,局部聚合
df1 = df.withColumn("prefix", (rand() * 10).cast("int")) \
        .withColumn("new_key", concat(col("prefix"), lit("_"), col("key"))) \
        .groupBy("new_key").sum("value")

# 第二阶段:去掉前缀,全局聚合
df2 = df1.withColumn("key", split(col("new_key"), "_")[1]) \
         .groupBy("key").sum("sum(value)")
  • 原理
    • 如果直接 groupBy("key").sum("value")
      • 所有 X 的记录会被 Shuffle同一个分区一个 Task 处理 1 亿条数据 → OOM 或卡死!
    • 给每条记录加一个随机前缀(0~9
      • 效果:原本 1 亿条 X打散到 10 个新 key0_X, 1_X, …, 9_X
      • 每个新 key 平均约 1000 万条(理想情况下)
    • new_key 局部聚合
      • 现在有 10Task 分别处理 0_X ~ 9_X
      • 每个 Task 只处理 ~1000 万条,压力大大降低!

4、业务层规避

  • 例子1:用户 ID 为 0 表示“匿名用户” → 改为随机 UUID
  • 例子2:日志中 ip="0.0.0.0" 太多 → 预处理过滤或打标
  • 核心:从源头减少极端值

五、FQA

1、关于 Task 数量

  • Task 的数量 = 分区(Partition)的数量
  • 对于一个 DataFrameRDD,其分区数决定了后续操作中并行执行的 Task 数量。
  • 因此,你不能直接指定“SQL 查询拆分成多少个 Task”,但你可以间接控制分区数,从而影响 Task 数量。

2、如何控制 Task 数量?

1)设置 shuffle 分区数(最常用)

Spark SQL 在进行 shuffle 操作(如 join、groupBy、distinct 等)时,默认会使用 spark.sql.shuffle.partitions 参数来决定 shuffle 后的分区数(即 Task 数)。

--conf spark.sql.shuffle_partitions=50

2)查看实际 Task 数量

你可以在 Spark UI中查看每个 JobStageTask 数量,也可以通过代码获取:

// 查看当前 DataFrame 的分区数= Task 
println(df.rdd.getNumPartitions) 

3、SQL 是不是被“拆分”了

是的,但不是“拆成多个 SQL,而是只有一个 SQL 被分布式执行。

  • 一个 逻辑 SQL 查询 → 被 Catalyst 优化器转换为 物理执行计划(Physical Plan)
  • 物理计划由多个 Stage 组成;
  • 每个 Stage 包含多个 Task(每个 Task 处理一个数据分区);
  • 所有 Task 并行运行在不同 Executor 上,但它们执行的是同一个作业的不同部分

4、SQL 本身会出现在 Executor 日志里吗

  • Spark 不会自动把原始 SQL 写入每个 Executor 的日志
  • Driver 会记录 SQL(在 DriverstderrSpark History Server 中);
  • Executor 日志主要记录:
    • Task 执行过程;
    • 序列化/反序列化错误;
    • OOM、GC、网络异常;
    • UDF(自定义函数)抛出的异常。

5、监控需要看什么

  • 不要期望在 Executor 日志里看到完整 SQL——那是 Driver 的职责;
  • Executor 日志的核心价值是:告诉你“执行时发生了什么错误”
  • 结合 Stages 页面 + Executors 页面 + stderr,才能精准定位问题。
目标 看哪里? 关注什么?
查 SQL 是否提交成功 Driver 日志 SQL 语句、Job ID
查 Task 为什么失败 Executor stderr 异常堆栈、OOM、FetchFailed
查 UDF 逻辑错误 Executor stdout/stderr print 输出、AttributeError
查 Python 环境问题 Executor stderr ModuleNotFoundError
查数据倾斜或热点 Executors 页面 + stderr 某个 Executor 失败频繁

ContactAuthor