Skip to content

Commit

Permalink
[raft] Allow us to query whether a replica is current or behind (#6575)
Browse files Browse the repository at this point in the history
  • Loading branch information
luluz66 committed May 15, 2024
1 parent ae702ae commit 5bc55bb
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 0 deletions.
8 changes: 8 additions & 0 deletions enterprise/server/raft/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,11 @@ var (
RangeLeaseInvalidMsg = "Range lease invalid" // continue
RangeSplittingMsg = "Range splitting"
)

type ReplicaState int

const (
ReplicaStateUnknown ReplicaState = iota
ReplicaStateCurrent
ReplicaStateBehind
)
77 changes: 77 additions & 0 deletions enterprise/server/raft/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"context"
"encoding/base64"
"encoding/binary"
"flag"
"fmt"
"net"
Expand Down Expand Up @@ -1804,3 +1805,79 @@ func (s *Store) removeReplicaFromRangeDescriptor(ctx context.Context, shardID, r
}
return newDescriptor, nil
}

func bytesToUint64(buf []byte) uint64 {
return binary.LittleEndian.Uint64(buf)
}

func (s *Store) GetReplicaStates(ctx context.Context, rd *rfpb.RangeDescriptor) map[uint64]constants.ReplicaState {
localReplica, err := s.GetReplica(rd.GetRangeId())
if err != nil {
s.log.Errorf("GetReplicaStates failed to get replica(range_id=%d): %s", rd.GetRangeId(), err)
return nil
}
if !s.IsLeader(localReplica.ShardID()) {
// we are not the leader. we don't know whether the replica is behind
// or not.
return nil
}
curIndex, err := localReplica.LastAppliedIndex()
if err != nil {
s.log.Errorf("ReplicaIsBehind failed to get last applied index(range_id=%d): %s", rd.GetRangeId(), err)
return nil
}

res := make(map[uint64]constants.ReplicaState, 0)
for _, r := range rd.GetReplicas() {
if r.GetReplicaId() == localReplica.ReplicaID() {
res[r.GetReplicaId()] = constants.ReplicaStateCurrent
} else {
res[r.GetReplicaId()] = constants.ReplicaStateUnknown
}
}
rd = localReplica.RangeDescriptor()
header := makeHeader(rd)
// To read a local key for a replica, we don't need to check whether the
// replica has lease or not.
header.ConsistencyMode = rfpb.Header_STALE
for _, r := range rd.GetReplicas() {
if r.GetReplicaId() == localReplica.ReplicaID() {
res[r.GetReplicaId()] = constants.ReplicaStateCurrent
continue
}
client, err := s.apiClient.GetForReplica(ctx, r)
if err != nil {
s.log.Errorf("GetReplicaStates failed to get client for replica %+v: %s", r, err)
continue
}
readReq, err := rbuilder.NewBatchBuilder().Add(&rfpb.DirectReadRequest{
Key: constants.LastAppliedIndexKey,
}).ToProto()
if err != nil {
s.log.Errorf("GetReplicaStates failed to construct direct read request for replica %+v: %s", r, err)
continue
}
syncResp, err := client.SyncRead(ctx, &rfpb.SyncReadRequest{
Header: header,
Batch: readReq,
})
if err != nil {
s.log.Errorf("GetReplicaStates failed to read last index for replica %+v: %s", r, err)
continue
}
batchResp := rbuilder.NewBatchResponseFromProto(syncResp.GetBatch())
readResponse, err := batchResp.DirectReadResponse(0)
if err != nil {
s.log.Errorf("GetReplicaStates failed to parse direct read response for replica %+v: %s", r, err)
continue
}
index := bytesToUint64(readResponse.GetKv().GetValue())
if index >= curIndex {
res[r.GetReplicaId()] = constants.ReplicaStateCurrent
} else {
res[r.GetReplicaId()] = constants.ReplicaStateBehind
}
}

return res
}

0 comments on commit 5bc55bb

Please sign in to comment.