ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink checkpoint失败

2021-04-06 16:05:00  阅读:349  来源: 互联网

标签:Flink 快照 flink checkpoint 失败 检查点 Barriers config


 

目录

前言

问题描述

问题定位

checkpoint的基本原理

思路

现象

问题解决

前言

Flink容错机制的核心部分是绘制分布式数据流和操作员状态的一致快照。这些快照充当一致的检查点,如果发生故障,系统可以回退到这些检查点。Flink绘制这些快照的机制在“分布式数据流的轻量级异步快照”中进行了介绍。它受 用于分布式快照的标准Chandy-Lamport算法的启发, 并且专门针对Flink的执行模型进行了量身定制。

问题描述

flink任务,从kafka中获取数据,经过处理,写入到另外的一个kafka中,开启了checkpoint,配置如下:

        CheckpointConfig config = env.getCheckpointConfig();

        // 任务流取消和故障时会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 设置checkpoint的周期, 每隔3000 ms进行启动一个检查点
        config.setCheckpointInterval(3 * 60 * 1000);

        // 设置模式为AT_LEAST_ONCE,降低性能损耗
        config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

        // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
        config.setMinPauseBetweenCheckpoints(500);

        // 检查点必须在2分钟内完成,或者被丢弃【checkpoint的超时时间】
        config.setCheckpointTimeout(2 * 60 * 1000);

        // 同一时间只允许进行一个检查点
        config.setMaxConcurrentCheckpoints(1);

通过flink web页面发现checkpoint总是失败,checkpoint超时(时间>CheckpointTimeout)。

问题定位

在解决这个问题之前,我们先应该了解下flink checkpoint的基本原理

checkpoint的基本原理

  1. jobManager发起checkpoint
  2. source Task将Barriers 注入到数据流中向下流动
  3. 中间operator从所有的输入通道中接收到Barriers后(对齐),制作快照,给jobmanager发送ack消息,同时将Barriers发送到其所有输出
  4. 最后sink完成checpoint后,整个checkpoint完成

注:flink提供了俩种语义,Exactly Once和At Least Once语义,俩者之间不同点在于,在并行度下,operator存在多个输入端,operator从其中一个输入端接收到Barriers后,会存在俩种情况

  1. Exactly Once:停止处理数据,等待所有输入端Barriers到达
  2. At Least Once:继续处理数据,不会阻塞处理

 

思路

通过原理我们可以知道,排除配置问题(状态后端配置错误,因为统一配置,所以基本不会存在配置错误)外,影响checkpoint时间的因素Barriers对齐时间,

影响Barriers对齐其实本质是数据的流动问题,而影响数据流动的因素有俩个

  1. 反压:导致数据流动堵塞
  2. 数据倾斜:导致某一个输入端的Barriers到达慢,对齐时间长(对齐需要所有输入端都到达才可以

现象

我们通过flink web管理台里可以看到,如下图的失败详情,可以看到一个并行度为24的算子,17和23这俩个subtask,迟迟没有对齐,

我们打开对应的任务监控,如下图,发现了数据倾斜问题,那么到底是什么原因导致的数据倾斜呢,

在flink任务里可以看到这个东西,这个代表的就是flink的分区器,

flink提供了如下的分区器策略

  1. ForwardPartitioner:FORWARD
  2. ShufflePartitioner,SHUFFLE
  3. RebalancePartitioner:REBALANCE
  4. RescalePartitioner:RESCALE
  5. KeyGroupStreamPartitioner:HASH
  6. CustomPartitionerWrapper:CUSTOM

Keyby:hash,最容易造成数据倾斜,通过上边的图,我们基本可以判断,是keyby导致到数据倾斜

 

问题解决

 

既然直接知道是keyby导致的数据倾斜,在keyby里key后缀增加随机数,使得数据均匀分布

.keyBy(new KeySelector<JSONObject, String>() {
    @Override
    public String getKey(JSONObject jsonObject) throws Exception {

        String eventcode = jsonObject.getString(Constants.EVENT_CODE);

        return eventcode + RandomUtils.nextInt(0, 128);
    }
})

问题解决!

 

标签:Flink,快照,flink,checkpoint,失败,检查点,Barriers,config
来源: https://blog.csdn.net/qq_24505127/article/details/115462742

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有