标签:切换 preferred replica election zkClient leader partitions
场景: 3 节点集群 (b1, b2, b3),分区 tp1 的 isr[1, 2, 3],leader 是 1,现在希望把 tp1 的 leader 切换为 3,怎么操作?
1. 通过 zk 客户端,修改 zk 上 tp1 的 isr 列表为 [3, 2, 1]
2. 执行命令行 kafka-preferred-replica-election.sh,把需要修改 leader 的分区信息写入到 zk 的 /admin/preferred_replica_election 节点
// kafka.admin.PreferredReplicaLeaderElectionCommand#writePreferredReplicaElectionData def writePreferredReplicaElectionData(zkClient: KafkaZkClient, partitionsUndergoingPreferredReplicaElection: Set[TopicPartition]) { try { zkClient.createPreferredReplicaElection(partitionsUndergoingPreferredReplicaElection.toSet) println("Created preferred replica election path with %s".format(partitionsUndergoingPreferredReplicaElection.mkString(","))) } catch { case _: NodeExistsException => throw new AdminOperationException("Preferred replica leader election currently in progress for " + "%s. Aborting operation".format(zkClient.getPreferredReplicaElection.mkString(","))) case e2: Throwable => throw new AdminOperationException(e2.toString) } }
3. KafkaController 监听 /admin/preferred_replica_election 节点,重新选举 leader,取 isr 中第一个副本为 leader,发送 LeaderAndIsrRequest 请求给其他 broker,通知他们转变副本角色
// kafka.controller.KafkaController.PreferredReplicaLeaderElection case object PreferredReplicaLeaderElection extends ControllerEvent { override def state: ControllerState = ControllerState.ManualLeaderBalance override def process(): Unit = { if (!isActive) return // We need to register the watcher if the path doesn't exist in order to detect future preferred replica // leader elections and we get the `path exists` check for free if (zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) { val partitions = zkClient.getPreferredReplicaElection val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic)) if (partitionsForTopicsToBeDeleted.nonEmpty) { error(s"Skipping preferred replica election for partitions $partitionsForTopicsToBeDeleted since the " + "respective topics are being deleted") } onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted) } } } // kafka.controller.KafkaController#onPreferredReplicaElection private def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) { info(s"Starting preferred replica leader election for partitions ${partitions.mkString(",")}") try { partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy)) } catch { case e: Throwable => error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")}", e) } finally { removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance) } }
标签:切换,preferred,replica,election,zkClient,leader,partitions 来源: https://www.cnblogs.com/allenwas3/p/13173348.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。