ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink实战之入库任务调优

2020-12-03 17:34:02  阅读:444  来源: 互联网

标签:java scala Flink taskmanager 调优 内存 memory akka 入库


背景

在调试flink写hdfs和hive时,任务总是报各种各样的异常,其中255问题最多,异常信息如下:

java.lang.Exception: Exception from container-launch.
Container id: container_1597847003686_5818_01_000002
Exit code: 255
Stack trace: ExitCodeException exitCode=255: 
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:604)
	at org.apache.hadoop.util.Shell.run(Shell.java:507)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:789)
	at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)


Container exited with a non-zero exit code 255

	at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

这段异常是yarn报出来的,根本原因是Direct Memory OOM了。那么该如何调优呢,容我慢慢道来。
我们先看下Flink的内存模型。

Flink内存模型

JVM Heap内存

堆内存包括:

  1. Framework Heap内存:flink框架使用的堆内存
  2. Task Heap内存:任务使用堆内存(java对象,基于内存的backend存储的state对象)

配置参数:

taskmanager.memory.framework.heap.size

taskmanager.memory.task.heap.size

JVM Off-Heap内存

对外内存:

  1. Framework Off-Heap内存:flink框架使用的对外内存
  2. Task Off-Heap内存:任务使用的对外内存

配置参数:

taskmanager.memory.framework.off-heap.size

taskmanager.memory.task.off-heap.size

Framework vs Task

区分:是否计入Slot资源

Framework:flink框架运行使用的内存

Task:任务运行使用的内存,包括heap、off-heap、managed、direct

Heap vs Off-Heap

区分:jvm堆内存和对外内存

Heap:jvm堆

Off-Heap:包括Direct、Native

Framework Heap+Task Heap = -Xmx

Framework off-heap +task off-heap + network = -XX:MaxDirectMemorySize

Network Memory(网络buffer)

属于Directory Memory

用途:

用于task之间缓冲数据,input buffer pool / output buffer pool

配置参数:

taskmanager.memory.network.min
taskmanager.memory.network.max
taskmanager.memory.network.fraction

Managed Memory(托管内存)

属于Native Memory

用途:

  1. streaming任务RocksDB Backend
  2. batch任务的sort、hash table、中间结果缓存
  3. python任务的UDF使用

配置参数:

设置大小:taskmanager.memory.managed.size

设置比率:taskmanager.memory.managed.fraction

JVM Metaspace & Overhead

都是jvm本身的开销

JVM Metaspace

用途:存放JVM加载的类的元数据,加载的类越多需要空间越大

所以如果任务需要加载大量第三方库时,可以调大Metaspace内存

配置参数:

taskmanager.memory.jvm-metaspace.size

JVM Overhead

属于Native Memory

用途:用于其他JVM开销,比如Code Cache、Thread Stack、garbage collection space 等。

配置参数:

taskmanager.memory.jvm-overhead.min
taskmanager.memory.jvm-overhead.max
taskmanager.memory.jvm-overhead.fraction

看完上面的总结,想必大家已经有了大概了解,回到我们的入库任务,理解入库任务主要会使用哪一块的内存,那么如何调优也就一目了然了。

入库任务使用内存

入库任务底层原理都是基于StreamingFileSink写Hdfs文件。借助BulkWriter进行写入,数据是先写到Direct Memory当中,然后在文件滚动时flush到hdfs。所以主要使用的Direct Memory,其属于task off-heap内存。
同时我们任务使用了RocksDB的状态后端,但是状态不是很大,也就1M左右。所以可以适当减少Managed Memory的大小。最终效果是调大了task off-heap的内存,调小了Managed Memory的内存,然后任务就不再报255了。
taskmanager.memory.task.off-heap.size和taskmanager.memory.managed.fraction

标签:java,scala,Flink,taskmanager,调优,内存,memory,akka,入库
来源: https://blog.csdn.net/weixin_41608066/article/details/110549951

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有