- 实时任务优化-数据开发的看家本领
- 采用 Direct 连接方式取代Receiver 的形式
- 合理设置并行度
- 合理的 Kafka 拉取量,防止单个批次读取太多的数据超过处理上限
- 合理设置批次时间
- 反压,根据JobScheduler反馈作业的执行信息来动态调整数据接收率
- 广播大变量
- gc优化,SparkStreaming 程序对实时性要求会较高,所以我们需要尽可能降低 JVM 垃圾回收所导致的延迟
- 设置合理的 CPU 资源数
- 参考资料
导引
- 大数据开发之路-概述
- flume-高度定制化的日志采集传输系统
- sqoop-rdbms和hadoop之间的数据同步工具
- datax-多种异构数据源间的高效数据同步工具
- canal-基于MySQL binlog的增量同步工具
- hdfs-大数据生态系统的存储基石
- hbase-基于hdfs的支持实时随机读写的列式数据库
- kudu-介于hdfs和hbase之间的“锋卫摇摆人”
- kafka-出色的分布式高性能发布/订阅消息队列系统
- hive-基于Hadoop的数据仓库工具
- spark streaming-分布式流式计算引擎
- 离线任务优化-数据开发的看家本领
- 实时任务优化-数据开发的看家本领
- presto-基于MPP架构的分布式SQL交互式查询引擎
- clickhouse-OLAP分析数据库的异域猛禽
- doris-OLAP分析数据库的国产之光
- kylin-MOLAP引擎的“扛把子”
- mongodb-最接近“mysql”的nosql数据库
- yarn-大数据生态体系的资源管理系统
- ranger-大数据生态体系的统一权限控制系统
- cap原则-分布式系统设计的指导思想
- 行存和列存-大数据存取的选择
- 存算分离-让大数据获得真正的弹性
- 离线数仓-大数据系统的重量级应用
- 实时数仓-实时性需求催生的数仓架构
- 流批一体-实时数仓架构的百家争鸣
- olap-基于数仓的多维实时分析系统
- 数据湖-数据宇宙的未来架构(科普篇)
关联阅读
Spark任务性能监控
实时任务优化-数据开发的看家本领
《大数据开发之路-离线任务优化》中的大部分思路和方法也适用于实时任务的优化,可以参考一下。本篇重点总结一下跟spark streaming和kafka相关的内容。
采用 Direct 连接方式取代Receiver 的形式
- 前者在executor中有Receiver接收数据,并且1个Receiver占用一个core;而后者无Receiver,所以不会占用core
- 前者InputDStream的分区是
num_receiver*batchInterval/blockInteral
,后者的分区数是kafka topic partition的数量。Receiver模式下num_receiver的设置不合理会影响性能或造成资源浪费;如果设置太小,并行度不够,整个链路上接收数据将是瓶颈;如果设置太多,则会浪费资源 - 前者使用zookeeper来维护consumer的偏移量,而后者需要自己维护偏移量
- 为了保证不丢失数据,前者需要开启WAL机制,而后者不需要,只需要在程序中成功消费完数据后再更新偏移量即可
由于以上特点,receiver模式下会造成一定的资源浪费;使用checkpoint保存状态, 如果需要升级程序,则会导致checkpoint无法使用;receiver模式下会导致程序不太稳定;并且如果设置receiver数量不合理也会造成性能瓶颈在receiver。为了优化资源和程序稳定性,应将receiver模式改造成direct模式。
合理设置并行度
spark streaming任务的并行度一般设置为kafka topic的分区数,所以kafka topic的分区数是否合理,分区数据量是否均衡很关键。可以优先通过调整topic分区数控制每个partition的数据量,如果无法调整分区数,再通过repartition增加并行度。
合理的 Kafka 拉取量,防止单个批次读取太多的数据超过处理上限
结合 batchDuration 设置的值,调整spark.streaming.kafka.maxRatePerPatition
参数,注意该参数配置的是 Kafka 每个 partition 拉取的上限,数据总量还需乘以所有的 partition 数量,调整两个参数 maxRatePerPartition 和 batchDuration 使得数据的拉取和处理能够平衡,尽可能地增加整个系统的吞吐量。
合理设置批次时间
batchDuration 本身不能设置为小于 500ms,这会导致 SparkStreaming 进行频繁地提交作业,造成额外的开销,减少整个系统的吞吐量;相反如果将 batchDuration 时间设置得过长,也会影响整个系统的吞吐量
通过调整并行度/executor内存,尽量将每个批次的处理时间控制在批次间隔的40%~80%之间,达到资源和效率的均衡。
如何选取合适的批处理时间呢?一个好的方法是:先保守地设置一个较大的批处理间隔(如5~10s),以及一个很低的数据速率,来观测系统是否能够赶上数据传输速率。我们可以通过查看每个处理好的 batch 的端到端延迟来观察,也可以看全局延迟来观察(可以在 Sparklog4j 的日志里或者使用 StreamingListener 接口,也可以直接在 UI 界面查看)。如果延迟保持在一个相对稳定的状态,则整个系统是稳定的,否则延迟不断上升,那说明整个系统是不稳定的。在实际场景中,也可以通过观察spark metrics来分析任务的性能状态。
反压,根据JobScheduler反馈作业的执行信息来动态调整数据接收率
1 | spark.streaming.backpressure.enabled |
广播大变量
在集群节点间进行数据传输时,都会有序列化和反序列化的开销,如果我们的应用有非常大的对象时,这部分开销是巨大的。比如应用中的任何子任务需要使用 Driver 节点的一个大型配置查询表,这时就可以考虑将该表通过共享变量的方式,广播到每一个子节点,从而大大减少在传输和序列化上的开销。
更新广播变量的方式是,利用 unpersist()函数先将已经发布的广播变量删除,然后修改数据后重新进行广播
gc优化,SparkStreaming 程序对实时性要求会较高,所以我们需要尽可能降低 JVM 垃圾回收所导致的延迟
- 通过配置
spark.rdd.compress
进行更进一步的压缩 - 及时清理老数据:默认情况下所有的输入数据和由 DStream 的 Transormation 操作产生的持久 RDD 会被自动清理,即 SparkStreaming 会决定何时对数据进行清理。例如,假设我们使用 10 分钟的窗口操作,SparkStreaming 会保存之前 10 分钟的所有数据,并及时清理过时的老数据。数据保存的时间可以通过 stremingContext.remember 进行设置
- CMS 垃圾回收器:不同于Spark,由于需要减少 GC 间的停顿,所以这里建议使用并发标记清除类的 GC 方式。即使并发 GC 会降低全局系统的生产吞吐量,但是使用这种 GC 可以使得每个 Batch 的处理时间更加一致(不会因为某个 Batch 处理时发生了 GC,而导致处理时间剧增)。我们需要确保在 Driver 节点(在 spark-submit 中使用—driver-java-options)和 Executor 节点(在 Spark 配置中使用 spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC)都设置了 CMSGC 方式
设置合理的 CPU 资源数
每个 Executor 可以占用一个或多个 core,观察 CPU 使用率(Linux 命令 top)来了解计算资源的使用情况。例如,很常见的一种浪费是一个 Executor 占用了多个 core,但是总的 CPU 使用率却不高(因为一个 Executor 并不会一直充分利用多核的能力),这个时候可以考虑让单个 Executor 占用更少的 core,同时 Worker 下面增加更多的 Executor;或者从另一个角度,增加单个节点的 worker 数量,当然这需要修改 Spark 集群的配置,从而增加 CPU 利用率。值得注意是,这里的优化有一个平衡,Executor 的数量需要考虑其他计算资源的配置,Executor 的数量和每个 Executor 分到的内存大小成反比,如果每个 Executor 的内存过小,容易产生内存溢出(outofmemory)的问题。