ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

gRPC(Java) keepAlive-icode9机制研究

2022-11-19 13:32:49  阅读:315  来源: 互联网

标签:icode9 机制研究 keepAlive java grpc 操作系统  框架创建


结论

  1. gRPC keepAlive是grpc框架在应用层面连接保活的一种措施。即当grpc连接上没有业务数据时,是否发送pingpong,以保持连接活跃性,不因长时间空闲而被Server或操作系统关闭
  2. gRPC keepAlive在client与server都有,client端默认关闭(keepAliveTime为Long.MAX_VALUE), server端默认打开,keepAliveTime为2小时,即每2小时向client发送一次ping
// io.grpc.internal.GrpcUtil
public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);
  1. KeepAlive的管理使用类io.grpc.internal.KeepAliveManager, 用于管理KeepAlive状态,ping任务调度与执行.

Client端KeepAlive

使用入口

  1. 我们在使用io.grpc框架创建grpc连接的时候,可以设置keeplive, 例如下面:
NettyChannelBuilder builder = NettyChannelBuilder.forTarget(String.format("grpc://%s", provider)) //
      .usePlaintext() //
      .defaultLoadBalancingPolicy(props.getBalancePolicy()) //
      .maxInboundMessageSize(props.getMaxInboundMessageSize()) //
      .keepAliveTime(1,TimeUnit.MINUTES)
      .keepAliveWithoutCalls(true)
      .keepAliveTimeout(10,TimeUnit.SECONDS)
      .intercept(channelManager.getInterceptors()); //
  1. 其中与keepAlive相关的参数有三个,keepAliveTime,keepAliveTimeout,keepAliveWithoutCalls。这三个变量有什么作用呢?
  • keepAliveTime: 表示当grpc连接没有数据传递时,多久之后开始向server发送ping packet
  • keepAliveTimeout: 表示当发送完ping packet后多久没收到server回应算超时
  • keepAliveTimeoutCalls: 表示如果grpc连接没有数据传递时,是否keepAlive,默认为false

简要时序列表

Create & Start

NettyChannelBuilder
   -----> NettyTransportFactory
   ---------> NettyClientTransport
   -------------> KeepAliveManager & NettyClientHandler

响应各种事件
当Active、Idle、DataReceived、Started、Termination事件发生时,更改KeepAlive状态,调度发送ping任务。

Server端KeepAlive

使用入口

// 只截取关键代码,详细代码请看`NettyServerBuilder`
ServerImpl server = new ServerImpl(
    this,
    buildTransportServers(getTracerFactories()),
    Context.ROOT);
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
  notifyTarget.notifyOnBuild(server);
}
return server;

// 在buildTransportServers方法中创建NettyServer
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
for (SocketAddress listenAddress : listenAddresses) {
  NettyServer transportServer = new NettyServer(
      listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
      workerEventLoopGroupPool, negotiator, streamTracerFactories,
      getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
      maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
      maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
      permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
  transportServers.add(transportServer);
}

简要时序列表

Create & Start

NettyServerBuilder
    ---> NettyServer
    ---------> NettyServerTransport
    -------------> NettyServerHandler
    -----------------> KeepAliveEnforcer

连接准备就绪
调用 io.netty.channel.ChannelHandler的handlerAdded方法,关于此方法的描述:

Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.
NettyServerHandler(handlerAdded)
   ---> 创建KeepAliveManager对象

响应各种事件
同Client

KeepAliveEnforcer

在上面Server端的简要时序图中,可以看见,server端有一个特有的io.grpc.netty.KeepAliveEnforcer
此类的作用是监控clinet ping的频率,以确保其在一个合理范围内。

package io.grpc.netty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;

/** Monitors the client's PING usage to make sure the rate is permitted. */
class KeepAliveEnforcer {
  @VisibleForTesting
  static final int MAX_PING_STRIKES = 2;
  @VisibleForTesting
  static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);

  private final boolean permitWithoutCalls;
  private final long minTimeNanos;
  private final Ticker ticker;
  private final long epoch;

  private long lastValidPingTime;
  private boolean hasOutstandingCalls;
  private int pingStrikes;

  public KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
    this(permitWithoutCalls, minTime, unit, SystemTicker.INSTANCE);
  }

  @VisibleForTesting
  KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit, Ticker ticker) {
    Preconditions.checkArgument(minTime >= 0, "minTime must be non-negative");

    this.permitWithoutCalls = permitWithoutCalls;
    this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
    this.ticker = ticker;
    this.epoch = ticker.nanoTime();
    lastValidPingTime = epoch;
  }

  /** Returns {@code false} when client is misbehaving and should be disconnected. */
  @CheckReturnValue
  public boolean pingAcceptable() {
    long now = ticker.nanoTime();
    boolean valid;
    if (!hasOutstandingCalls && !permitWithoutCalls) {
      valid = compareNanos(lastValidPingTime + IMPLICIT_PERMIT_TIME_NANOS, now) <= 0;
    } else {
      valid = compareNanos(lastValidPingTime + minTimeNanos, now) <= 0;
    }
    if (!valid) {
      pingStrikes++;
      return !(pingStrikes > MAX_PING_STRIKES);
    } else {
      lastValidPingTime = now;
      return true;
    }
  }

  /**
   * Reset any counters because PINGs are allowed in response to something sent. Typically called
   * when sending HEADERS and DATA frames.
   */
  public void resetCounters() {
    lastValidPingTime = epoch;
    pingStrikes = 0;
  }

  /** There are outstanding RPCs on the transport. */
  public void onTransportActive() {
    hasOutstandingCalls = true;
  }

  /** There are no outstanding RPCs on the transport. */
  public void onTransportIdle() {
    hasOutstandingCalls = false;
  }

  /**
   * Positive when time1 is greater; negative when time2 is greater; 0 when equal. It is important
   * to use something like this instead of directly comparing nano times. See {@link
   * System#nanoTime}.
   */
  private static long compareNanos(long time1, long time2) {
    // Possibility of overflow/underflow is on purpose and necessary for correctness
    return time1 - time2;
  }

  @VisibleForTesting
  interface Ticker {
    long nanoTime();
  }

  @VisibleForTesting
  static class SystemTicker implements Ticker {
    public static final SystemTicker INSTANCE = new SystemTicker();

    @Override
    public long nanoTime() {
      return System.nanoTime();
    }
  }

标签:icode9,机制研究,keepAlive,java,grpc,操作系统,,框架创建
来源:

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有