欢迎您的访问
专注架构,Java,数据结构算法,Python技术分享

【第十六篇】ZooKeeper 进阶-Leader选举源码分析

Zookeeper启动的main方法是org.apache.zookeeper.server.quorum.QuorumPeerMain类的main方法:

public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } ...
        ...
    }

进入到initializeAndRun方法:

protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
         // args[0]是传入的配置文件
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            // 解析配置文件
            config.parse(args[0]);
        }

        // Start and schedule the the purge task
        // 启动一个定时清除日志的任务
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
         // 我们是集群启动Zookeeper,必须使用到配置文件,因此接下来运行到runFromConfig
        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

这里传入的配置文件就是Zookeeper根目录下的conf/zoo.cfg文件(可参考启动脚本zkServer.sh),该配置文件可以配置哪些配置项可以参考QuorumPeerConfig的解析配置文件过程。接下来进入runFromConfig方法:

public void runFromConfig(QuorumPeerConfig config) throws IOException {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      }

      LOG.info("Starting quorum peer");
      try {
          // cnxnFactory用于处理本节点上的一些IO操作,默认情况下是NIOServerCnxnFactory的对象
          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          // 配置本节点监听的客户端端口和连接数
          cnxnFactory.configure(config.getClientPortAddress(),
                                config.getMaxClientCnxns());
            // QuorumPeer是zk的重要类之一
          quorumPeer = getQuorumPeer();

            // 设置从配置文件读取到的所有配置节点(zoo.cfg中的server.1=localhost:2888:3888配置项)
          quorumPeer.setQuorumPeers(config.getServers());
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                  new File(config.getDataLogDir()),
                  new File(config.getDataDir())));
          // 设置使用的Leader选举算法
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());

          // sets quorum sasl authentication configurations
          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
          if(quorumPeer.isQuorumSaslAuthEnabled()){
              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
          }

          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
          quorumPeer.initialize();

          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }

runFromConfig方法主要将从zoo.cfg中读取到的配置设置到QuorumPeer的对象属性中区,QuorumPeer继承了ZooKeeperThread类,而ZooKeeperThread继承了Thread类,因此QuorumPeer是一个线程类,稍后我们将分析其run方法。进入到quorumPeer.start()方法:

    @Override
    public synchronized void start() {
         // 从快照文件(dataDir目录下snapshot文件)中恢复会话和zk节点信息
        loadDataBase();
        // 处理IO事件(暂不了解这个的作用,之后的文章会对这个进行分析,现在跳过不影响我们分析Leader的选举)
        cnxnFactory.start();
        // 开始进行Leader的选举        
        startLeaderElection();
        // 开始监听本节点的状态,根据不同的状态(LOOKING、OBSERVING、FOLLOWING、LEADING)执行相应操作
        super.start();
    }

 

作者:zkp_java | 出处:https://blog.csdn.net/zkp_java/article/category/8044591

赞(0) 打赏
版权归原创作者所有,任何形式转载请联系作者;码农code之路 » 【第十六篇】ZooKeeper 进阶-Leader选举源码分析

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏