When increasing replication-factor on a topic, does it maintain rack awareness? #137

Open
wushujames opened this issue Oct 14, 2022 · 2 comments
Labels: enhancement, help wanted

@wushujames

We have rack-awareness on for our cluster. If I use kafkactl to increase the replication-factor on a topic, does the new replication-factor and assignment maintain rack awareness?

We used kafkactl last week, and it now appears that one of the topics is no longer rack-safe. I'm checking to see if the topic that is no longer rack-safe is the topic that we used kafkactl on.
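One way to run that check is sketched below. It is only an illustration, not kafkactl code: `isRackSafe` and the `brokerRacks` map are hypothetical names, and "rack-safe" is assumed here to mean that a partition's replicas cover as many distinct racks as possible, that is, min(replication factor, number of racks).

```go
package main

import "fmt"

// isRackSafe is a hypothetical helper (not part of kafkactl).
// It reports whether the replicas of one partition are spread over as many
// distinct racks as possible, given a broker ID -> rack mapping.
// Assumption: every replica's broker ID is present in brokerRacks.
func isRackSafe(replicas []int32, brokerRacks map[int32]string) bool {
	// Collect every rack that exists in the cluster.
	allRacks := make(map[string]bool)
	for _, rack := range brokerRacks {
		allRacks[rack] = true
	}

	// Collect the racks actually used by this partition's replicas.
	usedRacks := make(map[string]bool)
	for _, id := range replicas {
		usedRacks[brokerRacks[id]] = true
	}

	// The best achievable spread is min(len(replicas), len(allRacks)).
	want := len(replicas)
	if len(allRacks) < want {
		want = len(allRacks)
	}
	return len(usedRacks) == want
}

func main() {
	brokerRacks := map[int32]string{1: "a", 2: "a", 3: "b", 4: "c"}
	fmt.Println(isRackSafe([]int32{1, 3, 4}, brokerRacks)) // prints true
	fmt.Println(isRackSafe([]int32{1, 2, 3}, brokerRacks)) // prints false
}
```

Running this over every partition of a topic would show which assignments share a rack between replicas.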

I browsed through the kafkactl code and couldn't tell whether broker racks are taken into account when increasing the replication factor.

func getTargetReplicas(currentReplicas []int32, brokerReplicaCount map[int32]int, targetReplicationFactor int16) ([]int32, error) {
	replicas := currentReplicas
	for len(replicas) > int(targetReplicationFactor) {
		sort.Slice(replicas, func(i, j int) bool {
			brokerI := replicas[i]
			brokerJ := replicas[j]
			return brokerReplicaCount[brokerI] < brokerReplicaCount[brokerJ] || (brokerReplicaCount[brokerI] == brokerReplicaCount[brokerJ] && brokerI < brokerJ)
		})
		lastReplica := replicas[len(replicas)-1]
		replicas = replicas[:len(replicas)-1]
		brokerReplicaCount[lastReplica]--
	}
	var unusedBrokerIds []int32
	if len(replicas) < int(targetReplicationFactor) {
		for brokerID := range brokerReplicaCount {
			if !util.ContainsInt32(replicas, brokerID) {
				unusedBrokerIds = append(unusedBrokerIds, brokerID)
			}
		}
		if len(unusedBrokerIds) < (int(targetReplicationFactor) - len(replicas)) {
			return nil, errors.New("not enough brokers")
		}
	}
	for len(replicas) < int(targetReplicationFactor) {
		sort.Slice(unusedBrokerIds, func(i, j int) bool {
			brokerI := unusedBrokerIds[i]
			brokerJ := unusedBrokerIds[j]
			return brokerReplicaCount[brokerI] < brokerReplicaCount[brokerJ] || (brokerReplicaCount[brokerI] == brokerReplicaCount[brokerJ] && brokerI > brokerJ)
		})
		replicas = append(replicas, unusedBrokerIds[0])
		brokerReplicaCount[unusedBrokerIds[0]]++
		unusedBrokerIds = unusedBrokerIds[1:]
	}
	return replicas, nil
}

Thanks!

@d-rk
Collaborator

d-rk commented Oct 17, 2022

Hey @wushujames

We currently do not take rack information into account when altering replicas.
Since I only have limited time to spend on this project right now, I will not implement this in the near future.

If someone is interested in implementing this, I can give some advice.
The rack information can be retrieved with: https://pkg.go.dev/github.com/shopify/sarama#Broker.Rack
Then all that's needed is logic that spreads the partition replicas across brokers in different racks.
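A minimal sketch of what such logic could look like when the replication factor is increased. It is not kafkactl's implementation: `addRackAwareReplicas` and the `brokerRacks` map (broker ID to rack, which would have to be built from the rack information mentioned above) are hypothetical, and the tie-breaking rules are just one reasonable choice. Each new replica goes to the rack that currently holds the fewest replicas of the partition, then to the least loaded broker in that rack.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// addRackAwareReplicas is a hypothetical helper (not part of kafkactl).
// It grows currentReplicas to targetReplicationFactor. Each added broker is
// chosen from the rack holding the fewest replicas of this partition; ties
// are broken by the broker's overall replica count, then by lowest broker ID.
// It does not handle decreasing the replication factor.
func addRackAwareReplicas(currentReplicas []int32, brokerRacks map[int32]string,
	brokerReplicaCount map[int32]int, targetReplicationFactor int) ([]int32, error) {

	// Copy so the caller's slice is not modified.
	replicas := append([]int32(nil), currentReplicas...)

	// Track which brokers are already used and how many replicas each rack holds.
	used := make(map[int32]bool, len(replicas))
	rackLoad := make(map[string]int)
	for _, id := range replicas {
		used[id] = true
		rackLoad[brokerRacks[id]]++
	}

	// Every known broker that does not yet host this partition is a candidate.
	var candidates []int32
	for id := range brokerRacks {
		if !used[id] {
			candidates = append(candidates, id)
		}
	}
	if len(candidates) < targetReplicationFactor-len(replicas) {
		return nil, errors.New("not enough brokers")
	}

	for len(replicas) < targetReplicationFactor {
		// Re-sort on every iteration because rackLoad changes after each pick.
		sort.Slice(candidates, func(i, j int) bool {
			a, b := candidates[i], candidates[j]
			if la, lb := rackLoad[brokerRacks[a]], rackLoad[brokerRacks[b]]; la != lb {
				return la < lb
			}
			if ca, cb := brokerReplicaCount[a], brokerReplicaCount[b]; ca != cb {
				return ca < cb
			}
			return a < b
		})
		next := candidates[0]
		candidates = candidates[1:]
		replicas = append(replicas, next)
		rackLoad[brokerRacks[next]]++
		brokerReplicaCount[next]++
	}
	return replicas, nil
}

func main() {
	brokerRacks := map[int32]string{1: "a", 2: "a", 3: "b", 4: "b", 5: "c", 6: "c"}
	counts := map[int32]int{1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0}
	replicas, err := addRackAwareReplicas([]int32{1}, brokerRacks, counts, 3)
	fmt.Println(replicas, err) // prints [1 3 5] <nil>
}
```

In the example, broker 2 is skipped because rack "a" already holds a replica, so the two new replicas land in racks "b" and "c".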

d-rk added the enhancement and help wanted labels on Oct 17, 2022
@wushujames
Author

Thanks @d-rk! I also don't have time to make the change myself, but I will leave this issue open for tracking purposes in case someone else has a similar question. If someone is able to implement it, they can refer to this issue.
