ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Zookeeper源码部分 第2章 ZK服务端初始化源码解析

2022-06-04 23:31:30  阅读:195  来源: 互联网

标签:args ZK Zookeeper else 源码 IOException quorumPeer new config


ZK服务端初始化源码解析

image-20220604222359643

2.2.1 ZK服务端启动脚本分析

1)Zookeeper服务的启动命令是zkServer.sh start

zkServer.sh

#!/usr/bin/env bash
# use POSTIX interface, symlink is followed automatically
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"

if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
  . "$ZOOBINDIR"/../libexec/zkEnv.sh
else
  . "$ZOOBINDIR"/zkEnv.sh  //相当于获取zkEnv.sh中的环境变量(ZOOCFG="zoo.cfg")
fi

# See the following page for extensive details on setting
# up the JVM to accept JMX remote management:
# http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
# by default we allow local JMX connections
if [ "x$JMXLOCALONLY" = "x" ]
then
    JMXLOCALONLY=false
fi

if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
then
  echo "ZooKeeper JMX enabled by default" >&2
  if [ "x$JMXPORT" = "x" ]
  then
    # for some reason these two options are necessary on jdk6 on Ubuntu
    #   accord to the docs they are not necessary, but otw jconsole cannot
    #   do a local attach
    ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
  else
    if [ "x$JMXAUTH" = "x" ]
    then
      JMXAUTH=false
    fi
    if [ "x$JMXSSL" = "x" ]
    then
      JMXSSL=false
    fi
    if [ "x$JMXLOG4J" = "x" ]
    then
      JMXLOG4J=true
    fi
    echo "ZooKeeper remote JMX Port set to $JMXPORT" >&2
    echo "ZooKeeper remote JMX authenticate set to $JMXAUTH" >&2
    echo "ZooKeeper remote JMX ssl set to $JMXSSL" >&2
    echo "ZooKeeper remote JMX log4j set to $JMXLOG4J" >&2
    ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
  fi
else
    echo "JMX disabled by user request" >&2
    ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi

if [ "x$SERVER_JVMFLAGS" != "x" ]
then
    JVMFLAGS="$SERVER_JVMFLAGS $JVMFLAGS"
fi

… …

case $1 in
start)
    echo  -n "Starting zookeeper ... "
    if [ -f "$ZOOPIDFILE" ]; then
      if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
         echo $command already running as process `cat "$ZOOPIDFILE"`.
         exit 1
      fi
    fi
    nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" 
    "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" 
    -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' 
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
    … …
    ;;
stop)
    echo -n "Stopping zookeeper ... "
    if [ ! -f "$ZOOPIDFILE" ]
    then
      echo "no zookeeper to stop (could not find file $ZOOPIDFILE)"
    else
      $KILL $(cat "$ZOOPIDFILE")
      rm "$ZOOPIDFILE"
      sleep 1
      echo STOPPED
    fi
    exit 0
    ;;
restart)
    shift
    "$0" stop ${@}
    sleep 3
    "$0" start ${@}
    ;;
status)
    … …
    ;;
*)
    echo "Usage: $0 [--config <conf-dir>] {start|start-foreground|stop|restart|status|print-cmd}" >&2

esac

2)zkServer.sh start底层的实际执行内容

nohup "$JAVA" 
+ 一堆提交参数 
+ $ZOOMAIN(org.apache.zookeeper.server.quorum.QuorumPeerMain)
+ "$ZOOCFG" (zkEnv.sh文件中ZOOCFG="zoo.cfg")

3)所以程序的入口是QuorumPeerMain.java类

2.2.2 ZK服务端启动入口

1)ctrl + n,查找QuorumPeerMain

​ QuorumPeerMain.java

public static void main(String[] args) {
    // 创建了一个zk节点
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        // 初始化节点并运行,args相当于提交参数中的zoo.cfg
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        ... ...
    }
    LOG.info("Exiting normally");
    System.exit(0);
}

2)initializeAndRun

protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
	// 管理zk的配置信息
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
		// 1解析参数,zoo.cfg和myid
        config.parse(args[0]);
    }

    // 2启动定时任务,对过期的快照,执行删除(默认该功能关闭)
    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // 3 启动集群
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

2.2.3 解析参数zoo.cfg和myid

QuorumPeerConfig.java

public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);
   
    try {
		// 校验文件路径及是否存在
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);
            
        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
			// 加载配置文件
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }
        // 解析配置文件
        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }   
    
    ... ...
}

QuorumPeerConfig.java

public void parseProperties(Properties zkProp)
throws IOException, ConfigException {
    int clientPort = 0;
    int secureClientPort = 0;
    String clientPortAddress = null;
    String secureClientPortAddress = null;
    VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
	
	// 读取zoo.cfg文件中的属性值,并赋值给QuorumPeerConfig的类对象
    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        if (key.equals("dataDir")) {
            dataDir = vff.create(value);
        } else if (key.equals("dataLogDir")) {
            dataLogDir = vff.create(value);
        } else if (key.equals("clientPort")) {
            clientPort = Integer.parseInt(value);
        } else if (key.equals("localSessionsEnabled")) {
            localSessionsEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("localSessionsUpgradingEnabled")) {
            localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("clientPortAddress")) {
            clientPortAddress = value.trim();
        } else if (key.equals("secureClientPort")) {
            secureClientPort = Integer.parseInt(value);
        } else if (key.equals("secureClientPortAddress")){
            secureClientPortAddress = value.trim();
        } else if (key.equals("tickTime")) {
            tickTime = Integer.parseInt(value);
        } else if (key.equals("maxClientCnxns")) {
            maxClientCnxns = Integer.parseInt(value);
        } else if (key.equals("minSessionTimeout")) {
            minSessionTimeout = Integer.parseInt(value);
        } 
		... ...
    }

    ... ...

	if (dynamicConfigFileStr == null) {
        setupQuorumPeerConfig(zkProp, true);
        if (isDistributed() && isReconfigEnabled()) {
            // we don't backup static config for standalone mode.
            // we also don't backup if reconfig feature is disabled.
            backupOldConfig();
        }
    }
}

QuorumPeerConfig.java

void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
        throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
    setupMyId();
    setupClientPort();
    setupPeerType();
    checkValidity();
}

QuorumPeerConfig.java

private void setupMyId() throws IOException {
    File myIdFile = new File(dataDir, "myid");
    // standalone server doesn't need myid file.
    if (!myIdFile.isFile()) {
        return;
    }
    BufferedReader br = new BufferedReader(new FileReader(myIdFile));
    String myIdString;
    try {
        myIdString = br.readLine();
    } finally {
        br.close();
    }
    try {
        // 将解析myid文件中的id赋值给serverId
        serverId = Long.parseLong(myIdString);
        MDC.put("myid", myIdString);
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("serverid " + myIdString
                + " is not a number");
    }
}

2.2.4 过期快照删除

可以启动定时任务,对过期的快照,执行删除。默认该功能时关闭的

protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
	// 管理zk的配置信息
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
		// 1解析参数,zoo.cfg和myid
        config.parse(args[0]);
    }

    // 2启动定时任务,对过期的快照,执行删除(默认是关闭)
    // config.getSnapRetainCount() = 3 最少保留的快照个数
    // config.getPurgeInterval() = 0 默认0表示关闭
    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // 3 启动集群
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

protected int snapRetainCount = 3;
protected int purgeInterval = 0;
public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
	// 默认情况purgeInterval=0,该任务关闭,直接返回
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }
	// 创建一个定时器
    timer = new Timer("PurgeTask", true);
	// 创建一个清理快照任务
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
	// 如果purgeInterval设置的值是1,表示1小时检查一次,判断是否有过期快照,有则删除
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}

static class PurgeTask extends TimerTask {
    private File logsDir;
    private File snapsDir;
    private int snapRetainCount;

    public PurgeTask(File dataDir, File snapDir, int count) {
        logsDir = dataDir;
        snapsDir = snapDir;
        snapRetainCount = count;
    }

    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
			// 清理过期的数据
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occurred while purging.", e);
        }
        LOG.info("Purge task completed.");
    }
}

public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

    List<File> snaps = txnLog.findNRecentSnapshots(num);
    int numSnaps = snaps.size();
    if (numSnaps > 0) {
        purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    }
}

2.2.5 初始化通信组件

protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
	// 管理zk的配置信息
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
		// 1解析参数,zoo.cfg和myid
        config.parse(args[0]);
    }

    // 2启动定时任务,对过期的快照,执行删除(默认是关闭)
    // config.getSnapRetainCount() = 3 最少保留的快照个数
    // config.getPurgeInterval() = 0 默认0表示关闭
    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // 3 启动集群(集群模式)
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        // 本地模式
        ZooKeeperServerMain.main(args);
    }
}

1)通信协议默认NIO(可以支持Netty)

public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException
{
  … …

  LOG.info("Starting quorum peer");
  try {
      ServerCnxnFactory cnxnFactory = null;
      ServerCnxnFactory secureCnxnFactory = null;

	  // 通信组件初始化,默认是NIO通信
      if (config.getClientPortAddress() != null) {
          cnxnFactory = ServerCnxnFactory.createFactory();
          cnxnFactory.configure(config.getClientPortAddress(),
                  config.getMaxClientCnxns(), false);
      }

      if (config.getSecureClientPortAddress() != null) {
          secureCnxnFactory = ServerCnxnFactory.createFactory();
          secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                  config.getMaxClientCnxns(), true);
      }

	  // 把解析的参数赋值给该zookeeper节点
      quorumPeer = getQuorumPeer();
      quorumPeer.setTxnFactory(new FileTxnSnapLog(
                  config.getDataLogDir(),
                  config.getDataDir()));
      quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
      quorumPeer.enableLocalSessionsUpgrading(
          config.isLocalSessionsUpgradingEnabled());
      //quorumPeer.setQuorumPeers(config.getAllMembers());
      quorumPeer.setElectionType(config.getElectionAlg());
      quorumPeer.setMyid(config.getServerId());
      quorumPeer.setTickTime(config.getTickTime());
      quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
      quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
      quorumPeer.setInitLimit(config.getInitLimit());
      quorumPeer.setSyncLimit(config.getSyncLimit());
      quorumPeer.setConfigFileName(config.getConfigFilename());
      // 管理zk数据的存储
      quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
      quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
      if (config.getLastSeenQuorumVerifier()!=null) {
          quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
    }
      quorumPeer.initConfigInZKDatabase();
      // 管理zk的通信
      quorumPeer.setCnxnFactory(cnxnFactory);
      quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
      quorumPeer.setSslQuorum(config.isSslQuorum());
      quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
      quorumPeer.setLearnerType(config.getPeerType());
      quorumPeer.setSyncEnabled(config.getSyncEnabled());
      quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
      if (config.sslQuorumReloadCertFiles) {
          quorumPeer.getX509Util().enableCertFileReloading();
      }
… …
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
      quorumPeer.initialize();
      
	  // 启动zk
      quorumPeer.start();
      quorumPeer.join();
  } catch (InterruptedException e) {
      // warn, but generally this is ok
      LOG.warn("Quorum Peer interrupted", e);
  }
}

static public ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName =
        System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    if (serverCnxnFactoryName == null) {
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                .getDeclaredConstructor().newInstance();
        LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
        return serverCnxnFactory;
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate "
                + serverCnxnFactoryName);
        ioe.initCause(e);
        throw ioe;
    }
}

public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";

zookeeperAdmin.md 文件中
* *serverCnxnFactory* :
	 (Java system property: zookeeper.serverCnxnFactory)
	Specifies ServerCnxnFactory implementation. 
    This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication.
     Default is `NIOServerCnxnFactory`.

2)初始化NIO服务端Socket(并未启动)

ctrl + alt +B 查找configure实现类,NIOServerCnxnFactory.java

public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    configureSaslLogin();

    maxClientCnxns = maxcc;
    sessionlessCnxnTimeout = Integer.getInteger(
        ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    cnxnExpiryQueue =
        new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores/2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    numWorkerThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(
        ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    ... ...
    for(int i=0; i<numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

	// 初始化NIO服务端socket,绑定2181端口,可以接收客户端请求
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
	// 绑定2181端口
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

标签:args,ZK,Zookeeper,else,源码,IOException,quorumPeer,new,config
来源: https://www.cnblogs.com/niuniu2022/p/16343106.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有