Handle min-max ApiLevel for actors at the membership level.
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
artursouza committed Jan 30, 2024
1 parent e649a7e commit 87a581a
Showing 10 changed files with 121 additions and 37 deletions.
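
In short: the placement service now accepts minimum and maximum actor API levels, threads them from the CLI options through the Raft server into the membership state as a DaprHostMemberStateConfig, and updateAPILevel clamps the cluster's observed API level to that range. Below is a minimal, self-contained sketch of the clamping rule using illustrative names (config, clampAPILevel) rather than the exact Dapr types:

package main

import "fmt"

// config mirrors the minAPILevel/maxAPILevel fields this commit adds to
// DaprHostMemberStateConfig; this standalone type is hypothetical.
type config struct {
	minAPILevel uint32
	maxAPILevel uint32
}

// clampAPILevel raises the API level observed across cluster members to
// minAPILevel and caps it at maxAPILevel; each bound applies only when it
// is set to a value greater than zero.
func clampAPILevel(cfg config, observed uint32) uint32 {
	if cfg.minAPILevel > 0 && observed < cfg.minAPILevel {
		observed = cfg.minAPILevel
	}
	if cfg.maxAPILevel > 0 && observed > cfg.maxAPILevel {
		observed = cfg.maxAPILevel
	}
	return observed
}

func main() {
	cfg := config{minAPILevel: 10, maxAPILevel: 20}
	fmt.Println(clampAPILevel(cfg, 5))  // 10: raised to the minimum
	fmt.Println(clampAPILevel(cfg, 15)) // 15: already within bounds
	fmt.Println(clampAPILevel(cfg, 42)) // 20: capped at the maximum
}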
2 changes: 2 additions & 0 deletions cmd/placement/app/app.go
@@ -62,6 +62,8 @@ func Run() {
Peers: opts.RaftPeers,
LogStorePath: opts.RaftLogStorePath,
ReplicationFactor: int64(opts.ReplicationFactor),
+MinAPILevel: uint32(opts.MinAPILevel),
+MaxAPILevel: uint32(opts.MaxAPILevel),
})
if raftServer == nil {
log.Fatal("Failed to create raft server.")
2 changes: 1 addition & 1 deletion pkg/actors/placement/placement.go
@@ -554,7 +554,7 @@ func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) {
if p.apiLevel < raft.NoVirtualNodesInPlacementTablesAPILevel {
entries[k] = hashing.NewFromExistingWithVirtNodes(v.GetHosts(), v.GetSortedSet(), loadMap)
} else {
-entries[k] = hashing.NewFromExisting(loadMap, int(in.GetReplicationFactor()), p.virtualNodesCache)
+entries[k] = hashing.NewFromExisting(loadMap, in.GetReplicationFactor(), p.virtualNodesCache)
}
}

12 changes: 6 additions & 6 deletions pkg/placement/hashing/consistent_hash.go
@@ -57,7 +57,7 @@ type Consistent struct {
sortedSet []uint64
loadMap map[string]*Host
totalLoad int64
-replicationFactor int
+replicationFactor int64

sync.RWMutex
}
@@ -73,7 +73,7 @@ func NewHost(name, id string, load int64, port int64) *Host {
}

// NewConsistentHash returns a new consistent hash.
-func NewConsistentHash(replicationFactory int) *Consistent {
+func NewConsistentHash(replicationFactory int64) *Consistent {
return &Consistent{
hosts: map[uint64]string{},
sortedSet: []uint64{},
@@ -83,7 +83,7 @@ func NewConsistentHash(replicationFactory int) *Consistent {
}

// NewFromExisting creates a new consistent hash from existing values.
-func NewFromExisting(loadMap map[string]*Host, replicationFactor int, virtualNodesCache *VirtualNodesCache) *Consistent {
+func NewFromExisting(loadMap map[string]*Host, replicationFactor int64, virtualNodesCache *VirtualNodesCache) *Consistent {
newHash := &Consistent{
hosts: map[uint64]string{},
sortedSet: []uint64{},
@@ -92,7 +92,7 @@ func NewFromExisting(loadMap map[string]*Host, replicationFactor int, virtualNodesCache *VirtualNodesCache) *Consistent {
}

for hostName := range loadMap {
-hashes := virtualNodesCache.GetHashes(int64(replicationFactor), hostName)
+hashes := virtualNodesCache.GetHashes(replicationFactor, hostName)

for _, h := range hashes {
newHash.hosts[h] = hostName
}
@@ -214,7 +214,7 @@ func (c *Consistent) Add(host, id string, port int64) bool {
// vhosts in the store in v1.13.
// This should be removed in 1.15.
// --Start remove--
-for i := 0; i < c.replicationFactor; i++ {
+for i := 0; i < int(c.replicationFactor); i++ {
h := hash(host + strconv.Itoa(i))
c.hosts[h] = host
c.sortedSet = append(c.sortedSet, h)
@@ -341,7 +341,7 @@ func (c *Consistent) Remove(host string) bool {
c.Lock()
defer c.Unlock()

-for i := 0; i < c.replicationFactor; i++ {
+for i := 0; i < int(c.replicationFactor); i++ {
h := hash(host + strconv.Itoa(i))
delete(c.hosts, h)
c.delSlice(h)
2 changes: 1 addition & 1 deletion pkg/placement/hashing/consistent_hash_test.go
@@ -31,7 +31,7 @@ func TestReplicationFactor(t *testing.T) {
}

t.Run("varying replication factors, no movement", func(t *testing.T) {
-factors := []int{1, 100, 1000, 10000}
+factors := []int64{1, 100, 1000, 10000}

for _, f := range factors {
h := NewConsistentHash(f)
16 changes: 8 additions & 8 deletions pkg/placement/raft/fsm.go
@@ -46,15 +46,15 @@ type FSM struct {
// racing with Restore(), which is called by Raft (it puts in a totally
// new state store). Everything internal here is synchronized by the
// Raft side, so doesn't need to lock this.
-stateLock sync.RWMutex
-state *DaprHostMemberState
-replicationFactor int64
+stateLock sync.RWMutex
+state *DaprHostMemberState
+config DaprHostMemberStateConfig
}

-func newFSM(replicationFactor int64) *FSM {
+func newFSM(config DaprHostMemberStateConfig) *FSM {
return &FSM{
-state: newDaprHostMemberState(int(replicationFactor)),
-replicationFactor: replicationFactor,
+state: newDaprHostMemberState(config),
+config: config,
}
}

@@ -77,7 +77,7 @@ func (c *FSM) PlacementState() *v1pb.PlacementTables {
Version: strconv.FormatUint(c.state.TableGeneration(), 10),
Entries: make(map[string]*v1pb.PlacementTable),
ApiLevel: c.state.APILevel(),
-ReplicationFactor: c.replicationFactor,
+ReplicationFactor: c.config.replicationFactor,
}

totalHostSize := 0
@@ -210,7 +210,7 @@ func (c *FSM) Snapshot() (raft.FSMSnapshot, error) {
func (c *FSM) Restore(old io.ReadCloser) error {
defer old.Close()

-members := newDaprHostMemberState(int(c.replicationFactor))
+members := newDaprHostMemberState(c.config)
if err := members.restore(old); err != nil {
return err
}
30 changes: 25 additions & 5 deletions pkg/placement/raft/fsm_test.go
@@ -24,7 +24,11 @@ import (
)

func TestFSMApply(t *testing.T) {
-fsm := newFSM(100)
+fsm := newFSM(DaprHostMemberStateConfig{
+replicationFactor: 100,
+minAPILevel: 0,
+maxAPILevel: 100,
+})

t.Run("upsertMember", func(t *testing.T) {
cmdLog, err := makeRaftLogCommand(MemberUpsert, DaprHostMember{
@@ -77,9 +81,17 @@ func TestFSMApply(t *testing.T) {

func TestRestore(t *testing.T) {
// arrange
-fsm := newFSM(100)
+fsm := newFSM(DaprHostMemberStateConfig{
+replicationFactor: 100,
+minAPILevel: 0,
+maxAPILevel: 100,
+})

-s := newDaprHostMemberState(100)
+s := newDaprHostMemberState(DaprHostMemberStateConfig{
+replicationFactor: 100,
+minAPILevel: 0,
+maxAPILevel: 100,
+})
s.upsertMember(&DaprHostMember{
Name: "127.0.0.1:8080",
AppID: "FakeID",
@@ -99,7 +111,11 @@ func TestRestore(t *testing.T) {
}

func TestPlacementStateWithVirtualNodes(t *testing.T) {
-fsm := newFSM(100)
+fsm := newFSM(DaprHostMemberStateConfig{
+replicationFactor: 100,
+minAPILevel: 0,
+maxAPILevel: 100,
+})

// We expect to see the placement table INCLUDE vnodes,
// because the only dapr instance in the cluster is at level 10 (pre v1.13)
@@ -134,7 +150,11 @@ func TestPlacementStateWithVirtualNodes(t *testing.T) {
}

func TestPlacementState(t *testing.T) {
-fsm := newFSM(100)
+fsm := newFSM(DaprHostMemberStateConfig{
+replicationFactor: 100,
+minAPILevel: 0,
+maxAPILevel: 100,
+})
m := DaprHostMember{
Name: "127.0.0.1:3030",
AppID: "fakeAppID",
8 changes: 7 additions & 1 deletion pkg/placement/raft/server.go
@@ -84,6 +84,8 @@ type Options struct {
LogStorePath string
Clock clock.Clock
ReplicationFactor int64
+MinAPILevel uint32
+MaxAPILevel uint32
}

// New creates Raft server node.
@@ -106,7 +108,11 @@ func New(opts Options) *Server {
raftLogStorePath: opts.LogStorePath,
clock: cl,
raftReady: make(chan struct{}),
-fsm: newFSM(opts.ReplicationFactor),
+fsm: newFSM(DaprHostMemberStateConfig{
+replicationFactor: opts.ReplicationFactor,
+minAPILevel: opts.MinAPILevel,
+maxAPILevel: opts.MaxAPILevel,
+}),

}
}

12 changes: 10 additions & 2 deletions pkg/placement/raft/snapshot_test.go
@@ -42,7 +42,11 @@ func (m *MockSnapShotSink) Close() error {

func TestPersist(t *testing.T) {
// arrange
-fsm := newFSM(10)
+fsm := newFSM(DaprHostMemberStateConfig{
+replicationFactor: 10,
+minAPILevel: 0,
+maxAPILevel: 100,
+})
testMember := DaprHostMember{
Name: "127.0.0.1:3030",
AppID: "fakeAppID",
@@ -65,7 +69,11 @@ func TestPersist(t *testing.T) {
snap.Persist(fakeSink)

// assert
-restoredState := newDaprHostMemberState(10)
+restoredState := newDaprHostMemberState(DaprHostMemberStateConfig{
+replicationFactor: 10,
+minAPILevel: 0,
+maxAPILevel: 100,
+})
err = restoredState.restore(buf)
require.NoError(t, err)

32 changes: 26 additions & 6 deletions pkg/placement/raft/state.go
@@ -64,13 +64,19 @@ type DaprHostMemberStateData struct {
type DaprHostMemberState struct {
lock sync.RWMutex

-replicationFactor int
-data DaprHostMemberStateData
+config DaprHostMemberStateConfig
+data DaprHostMemberStateData
}

-func newDaprHostMemberState(replicationFactor int) *DaprHostMemberState {
+type DaprHostMemberStateConfig struct {
+replicationFactor int64
+minAPILevel uint32
+maxAPILevel uint32
+}
+
+func newDaprHostMemberState(config DaprHostMemberStateConfig) *DaprHostMemberState {
return &DaprHostMemberState{
-replicationFactor: replicationFactor,
+config: config,
data: DaprHostMemberStateData{
Members: map[string]*DaprHostMember{},
hashingTableMap: map[string]*hashing.Consistent{},
@@ -126,6 +132,20 @@ func (s *DaprHostMemberState) updateAPILevel() {
}
}

+// Only enforce minAPILevel if value > 0.
+// 0 is the default value of the struct.
+// -1 is the default value of the CLI flag.
+if s.config.minAPILevel > 0 && observedMinLevel < s.config.minAPILevel {
+observedMinLevel = s.config.minAPILevel
+}
+
+// Only enforce maxAPILevel if value > 0.
+// 0 is the default value of the struct.
+// -1 is the default value of the CLI flag.
+if s.config.maxAPILevel > 0 && observedMinLevel > s.config.maxAPILevel {
+observedMinLevel = s.config.maxAPILevel
+}
+
if observedMinLevel > s.data.APILevel {
s.data.APILevel = observedMinLevel
}
@@ -143,7 +163,7 @@ func (s *DaprHostMemberState) clone() *DaprHostMemberState {
defer s.lock.RUnlock()

newMembers := &DaprHostMemberState{
-replicationFactor: s.replicationFactor,
+config: s.config,
data: DaprHostMemberStateData{
Index: s.data.Index,
TableGeneration: s.data.TableGeneration,
@@ -169,7 +189,7 @@ func (s *DaprHostMemberState) clone() *DaprHostMemberState {
func (s *DaprHostMemberState) updateHashingTables(host *DaprHostMember) {
for _, e := range host.Entities {
if _, ok := s.data.hashingTableMap[e]; !ok {
-s.data.hashingTableMap[e] = hashing.NewConsistentHash(s.replicationFactor)
+s.data.hashingTableMap[e] = hashing.NewConsistentHash(s.config.replicationFactor)
}

s.data.hashingTableMap[e].Add(host.Name, host.AppID, 0)
42 changes: 35 additions & 7 deletions pkg/placement/raft/state_test.go
@@ -21,7 +21,11 @@ import (

func TestNewDaprHostMemberState(t *testing.T) {
// act
-s := newDaprHostMemberState(10)
+s := newDaprHostMemberState(DaprHostMemberStateConfig{
+replicationFactor: 10,
+minAPILevel: 0,
+maxAPILevel: 100,
+})

// assert
assert.Equal(t, uint64(0), s.Index())
@@ -31,7 +35,11 @@ func TestNewDaprHostMemberState(t *testing.T) {

func TestClone(t *testing.T) {
// arrange
-s := newDaprHostMemberState(10)
+s := newDaprHostMemberState(DaprHostMemberStateConfig{
+replicationFactor: 10,
+minAPILevel: 0,
+maxAPILevel: 100,
+})
s.upsertMember(&DaprHostMember{
Name: "127.0.0.1:8080",
AppID: "FakeID",
@@ -50,7 +58,11 @@ func TestClone(t *testing.T) {

func TestUpsertMember(t *testing.T) {
// arrange
-s := newDaprHostMemberState(10)
+s := newDaprHostMemberState(DaprHostMemberStateConfig{
+replicationFactor: 10,
+minAPILevel: 0,
+maxAPILevel: 100,
+})

t.Run("add new actor member", func(t *testing.T) {
// act
@@ -131,7 +143,11 @@ func TestUpsertMember(t *testing.T) {

func TestRemoveMember(t *testing.T) {
// arrange
-s := newDaprHostMemberState(10)
+s := newDaprHostMemberState(DaprHostMemberStateConfig{
+replicationFactor: 10,
+minAPILevel: 0,
+maxAPILevel: 100,
+})

t.Run("remove member and clean up consistent hashing table", func(t *testing.T) {
// act
@@ -174,7 +190,11 @@ func TestUpdateHashingTable(t *testing.T) {
// each subtest has dependency on the state

// arrange
-s := newDaprHostMemberState(10)
+s := newDaprHostMemberState(DaprHostMemberStateConfig{
+replicationFactor: 10,
+minAPILevel: 0,
+maxAPILevel: 100,
+})

t.Run("add new hashing table per actor types", func(t *testing.T) {
testMember := &DaprHostMember{
@@ -227,7 +247,11 @@ func TestRemoveHashingTable(t *testing.T) {
{"127.0.0.1:8081", 0},
}

-s := newDaprHostMemberState(10)
+s := newDaprHostMemberState(DaprHostMemberStateConfig{
+replicationFactor: 10,
+minAPILevel: 0,
+maxAPILevel: 100,
+})
for _, tc := range testcases {
testMember.Name = tc.name
s.updateHashingTables(testMember)
@@ -278,7 +302,11 @@ func TestRestoreHashingTables(t *testing.T) {

func TestUpdateAPILevel(t *testing.T) {
// arrange
-s := newDaprHostMemberState(10)
+s := newDaprHostMemberState(DaprHostMemberStateConfig{
+replicationFactor: 10,
+minAPILevel: 0,
+maxAPILevel: 100,
+})
s.upsertMember(&DaprHostMember{
Name: "127.0.0.1:8080",
AppID: "FakeID1",
