Skip to content

Commit

Permalink
MINOR: stabilize LeaderElectionTest#testLeaderElectionAndEpoch (#13290)
Browse files Browse the repository at this point in the history
Reviewers: Luke Chen <showuon@gmail.com>
  • Loading branch information
chia7712 committed Feb 23, 2023
1 parent 8c84d29 commit 61ece48
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class LeaderElectionTest extends QuorumTestHarness {
// kill the server hosting the preferred replica/initial leader
servers.head.shutdown()
// check if leader moves to the other server
val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader1))
val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId,
oldLeaderOpt = Some(leader1), ignoreNoLeader = true)
val leaderEpoch2 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
assertEquals(1, leader2, "Leader must move to broker 1")
// new leaderEpoch will be leaderEpoch1+2, one increment during ReplicaStateMachine.startup()-> handleStateChanges
Expand Down
4 changes: 3 additions & 1 deletion core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -900,10 +900,12 @@ object TestUtils extends Logging {
partition: Int,
timeoutMs: Long = 30000L,
oldLeaderOpt: Option[Int] = None,
newLeaderOpt: Option[Int] = None
newLeaderOpt: Option[Int] = None,
ignoreNoLeader: Boolean = false
): Int = {
def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
zkClient.getLeaderForPartition(new TopicPartition(topic, partition))
.filter(p => !ignoreNoLeader || p != LeaderAndIsr.NoLeader)
}
doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt)
}
Expand Down

0 comments on commit 61ece48

Please sign in to comment.