Fix vnode calculation in placement clients. #7415

Merged 2 commits on Jan 19, 2024
Changes from all commits
12 changes: 5 additions & 7 deletions cmd/placement/app/app.go
@@ -26,7 +26,6 @@ import (
 	"github.com/dapr/dapr/pkg/metrics"
 	"github.com/dapr/dapr/pkg/modes"
 	"github.com/dapr/dapr/pkg/placement"
-	"github.com/dapr/dapr/pkg/placement/hashing"
 	"github.com/dapr/dapr/pkg/placement/monitoring"
 	"github.com/dapr/dapr/pkg/placement/raft"
 	"github.com/dapr/dapr/pkg/security"
@@ -58,10 +57,11 @@ func Run() {
 
 	// Start Raft cluster.
 	raftServer := raft.New(raft.Options{
-		ID:           opts.RaftID,
-		InMem:        opts.RaftInMemEnabled,
-		Peers:        opts.RaftPeers,
-		LogStorePath: opts.RaftLogStorePath,
+		ID:                opts.RaftID,
+		InMem:             opts.RaftInMemEnabled,
+		Peers:             opts.RaftPeers,
+		LogStorePath:      opts.RaftLogStorePath,
+		ReplicationFactor: int64(opts.ReplicationFactor),
 	})
 	if raftServer == nil {
 		log.Fatal("Failed to create raft server.")
@@ -81,8 +81,6 @@ func Run() {
 		log.Fatal(err)
 	}
 
-	hashing.SetReplicationFactor(opts.ReplicationFactor)
-
 	placementOpts := placement.PlacementServiceOpts{
 		RaftNode:    raftServer,
 		SecProvider: secProvider,
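Taken together, the two hunks above replace process-wide mutable state with explicit configuration: the replication factor now travels in raft.Options instead of being set through the package-global hashing.SetReplicationFactor. A minimal sketch of the difference, using hypothetical names rather than the dapr API:

package main

import "fmt"

// Old pattern (removed above): one package-level factor implicitly shared by
// every hash ring the process ever builds.
var globalReplicationFactor int

// New pattern (adopted above): the factor is a field on the value that needs
// it, so rings built from tables with different factors can coexist safely.
type ring struct{ replicationFactor int }

func newRing(factor int) *ring { return &ring{replicationFactor: factor} }

func main() {
	globalReplicationFactor = 100

	a := newRing(100) // ring for a locally computed table
	b := newRing(3)   // ring rebuilt from a table sent by a service configured differently
	fmt.Println(a.replicationFactor, b.replicationFactor, globalReplicationFactor)
}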
4 changes: 0 additions & 4 deletions pkg/actors/placement/client.go
@@ -15,13 +15,10 @@ package placement
 
 import (
 	"context"
-	"strconv"
 	"sync"
 
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/metadata"
 
-	"github.com/dapr/dapr/pkg/placement"
 	v1pb "github.com/dapr/dapr/pkg/proto/placement/v1"
 )
 
@@ -66,7 +63,6 @@ func (c *placementClient) connectToServer(ctx context.Context, serverAddr string
 	}
 
 	client := v1pb.NewPlacementClient(conn)
-	ctx = metadata.AppendToOutgoingContext(ctx, placement.GRPCContextKeyAPILevel, strconv.Itoa(int(apiLevel)))
 	stream, err := client.ReportDaprStatus(ctx)
 	if err != nil {
 		if conn != nil {
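The deleted line used the standard gRPC idiom for stamping headers onto every RPC opened from a context; after this change the client no longer advertises its API level that way and instead works from the level carried with the placement tables (see placement.go below). For reference, a self-contained sketch of the idiom, with a placeholder header name rather than dapr's actual key:

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

func main() {
	// AppendToOutgoingContext attaches key/value pairs that gRPC sends as
	// headers when a call or stream is opened with the returned context.
	ctx := metadata.AppendToOutgoingContext(context.Background(),
		"example-api-level", "10") // placeholder key and value

	md, ok := metadata.FromOutgoingContext(ctx)
	fmt.Println(ok, md.Get("example-api-level")) // true [10]
}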
14 changes: 0 additions & 14 deletions pkg/actors/placement/client_test.go
@@ -19,8 +19,6 @@ import (
 	"sync"
 	"testing"
 
-	"google.golang.org/grpc/metadata"
-
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
@@ -88,18 +86,6 @@ func TestConnectToServer(t *testing.T) {
 
 		err := client.connectToServer(context.Background(), conn, 10)
 		require.NoError(t, err)
-
-		// Extract the "dapr-placement-api-level" value from the context's metadata
-		md, ok := metadata.FromOutgoingContext(client.clientStream.Context())
-		require.True(t, ok)
-
-		// All keys in the returned MD are lowercase, as per the http spec for header fields
-		// https://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2
-		apiLevelValues := md["dapr-placement-api-level"]
-		require.Len(t, apiLevelValues, 1)
-
-		apiLevelStr := apiLevelValues[0]
-		require.Equal(t, "10", apiLevelStr)
 	})
 }
 
10 changes: 5 additions & 5 deletions pkg/actors/placement/placement.go
@@ -30,6 +30,7 @@ import (
 
 	"github.com/dapr/dapr/pkg/actors/internal"
 	diag "github.com/dapr/dapr/pkg/diagnostics"
+	"github.com/dapr/dapr/pkg/placement"
 	"github.com/dapr/dapr/pkg/placement/hashing"
 	v1pb "github.com/dapr/dapr/pkg/proto/placement/v1"
 	"github.com/dapr/dapr/pkg/resiliency"
@@ -126,7 +127,6 @@ func NewActorPlacement(opts internal.ActorsProviderOptions) internal.PlacementSe
 		appHealthFn:       opts.AppHealthFn,
 		closeCh:           make(chan struct{}),
 		resiliency:        opts.Resiliency,
-		apiLevel:          opts.APILevel.Load(),
 		virtualNodesCache: hashing.NewVirtualNodesCache(),
 	}
 }
@@ -548,12 +548,12 @@ func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) {
 			loadMap[lk] = hashing.NewHost(lv.GetName(), lv.GetId(), lv.GetLoad(), lv.GetPort())
 		}
 
-		// TODO in v1.15 remove the check for versions < 1.13
+		// TODO: @elena in v1.15 remove the check for versions < 1.13
 		// only keep `hashing.NewFromExisting`
-		if in.GetReplicationFactor() > 0 && len(v.GetHosts()) == 0 {
-			p.placementTables.Entries[k] = hashing.NewFromExisting(loadMap, int(in.GetReplicationFactor()), p.virtualNodesCache)
-		} else {
+		if p.apiLevel < placement.NoVirtualNodesInPlacementTablesAPILevel {
 			p.placementTables.Entries[k] = hashing.NewFromExistingWithVirtNodes(v.GetHosts(), v.GetSortedSet(), loadMap)
+		} else {
+			p.placementTables.Entries[k] = hashing.NewFromExisting(loadMap, int(in.GetReplicationFactor()), p.virtualNodesCache)
 		}
 	}
 
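This hunk is the heart of the fix: the client now decides how to rebuild each hash table from the placement service's API level rather than from heuristics about the payload. Per the comments further down in consistent_hash.go, the vnode optimization raised the API level to 20 (NoVirtualNodesInPlacementTablesAPILevel): older services still disseminate precomputed vnodes, newer ones send only hosts plus a replication factor and let the client derive vnodes locally. A hypothetical distillation of the branch, not the actual dapr code:

package main

import "fmt"

// Mirrors the constant referenced above; the value 20 comes from the
// comments in this PR.
const noVirtualNodesInPlacementTablesAPILevel = 20

// rebuildEntry is a hypothetical reduction of the updatePlacements branch.
func rebuildEntry(apiLevel uint32, replicationFactor int64) string {
	if apiLevel < noVirtualNodesInPlacementTablesAPILevel {
		// Placement < 1.13: the table carries precomputed vnodes; trust them.
		return "NewFromExistingWithVirtNodes(hosts, sortedSet, loadMap)"
	}
	// Placement >= 1.13: no vnodes on the wire; derive them locally.
	return fmt.Sprintf("NewFromExisting(loadMap, %d, cache)", replicationFactor)
}

func main() {
	fmt.Println(rebuildEntry(10, 3)) // legacy path
	fmt.Println(rebuildEntry(20, 3)) // optimized path
}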
20 changes: 12 additions & 8 deletions pkg/actors/placement/placement_test.go
@@ -39,8 +39,6 @@ import (
 	"github.com/dapr/kit/logger"
 )
 
-var apiLevel atomic.Uint32
-
 func TestAddDNSResolverPrefix(t *testing.T) {
 	testCases := []struct {
 		addr []string
@@ -72,6 +70,7 @@ func TestPlacementStream_RoundRobin(t *testing.T) {
 		address[i], testSrv[i], cleanup[i] = newTestServer()
 	}
 
+	var apiLevel atomic.Uint32
 	apiLevel.Store(1)
 	testPlacement := NewActorPlacement(internal.ActorsProviderOptions{
 		Config: internal.Config{
@@ -134,6 +133,7 @@ func TestAppHealthyStatus(t *testing.T) {
 
 	appHealthCh := make(chan bool)
 
+	var apiLevel atomic.Uint32
 	apiLevel.Store(1)
 	testPlacement := NewActorPlacement(internal.ActorsProviderOptions{
 		Config: internal.Config{
@@ -172,7 +172,6 @@
 func TestOnPlacementOrder(t *testing.T) {
 	tableUpdateCount := atomic.Int64{}
 	tableUpdateFunc := func() { tableUpdateCount.Add(1) }
-	apiLevel.Store(1)
 	testPlacement := NewActorPlacement(internal.ActorsProviderOptions{
 		Config: internal.Config{
 			ActorsService: "placement:",
@@ -185,7 +184,6 @@ func TestOnPlacementOrder(t *testing.T) {
 		AppHealthFn: func(ctx context.Context) <-chan bool { return nil },
 		Security:    testSecurity(t),
 		Resiliency:  resiliency.New(logger.NewLogger("test")),
-		APILevel:    &apiLevel,
 	}).(*actorPlacement)
 	testPlacement.SetOnTableUpdateFn(tableUpdateFunc)
 
@@ -248,12 +246,17 @@
 			},
 		}
 
+		testPlacement.apiLevel = 20
+		t.Cleanup(func() {
+			testPlacement.apiLevel = 10
+		})
 
 		testPlacement.onPlacementOrder(&placementv1pb.PlacementOrder{
 			Operation: "update",
 			Tables: &placementv1pb.PlacementTables{
 				Version: tableVersion,
 				Entries: entries,
-				ApiLevel: 10,
+				ApiLevel: 20,
+				ReplicationFactor: 3,
 			},
 		})
@@ -340,6 +343,7 @@
 }
 
 func TestWaitUntilPlacementTableIsReady(t *testing.T) {
+	var apiLevel atomic.Uint32
 	apiLevel.Store(1)
 	testPlacement := NewActorPlacement(internal.ActorsProviderOptions{
 		Config: internal.Config{
@@ -481,6 +485,7 @@
 }
 
 func TestLookupActor(t *testing.T) {
+	var apiLevel atomic.Uint32
 	apiLevel.Store(1)
 	testPlacement := NewActorPlacement(internal.ActorsProviderOptions{
 		Config: internal.Config{
@@ -512,9 +517,7 @@
 		Entries: map[string]*hashing.Consistent{},
 	}
 
-	// set vnode size
-	hashing.SetReplicationFactor(10)
-	actorOneHashing := hashing.NewConsistentHash()
+	actorOneHashing := hashing.NewConsistentHash(10)
 	actorOneHashing.Add(testPlacement.config.GetRuntimeHostname(), testPlacement.config.AppID, 0)
 	testPlacement.placementTables.Entries["actorOne"] = actorOneHashing
 
@@ -540,6 +543,7 @@
 }
 
 func TestConcurrentUnblockPlacements(t *testing.T) {
+	var apiLevel atomic.Uint32
 	apiLevel.Store(1)
 	testPlacement := NewActorPlacement(internal.ActorsProviderOptions{
 		Config: internal.Config{
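A side effect of these test edits worth noting: the package-level apiLevel atomic is gone, so tests no longer share hidden state, and the one test that needs a non-default level sets the field directly and restores it with t.Cleanup. A minimal sketch of that pattern, with a hypothetical stand-in struct:

package main

import "testing"

type fakePlacement struct{ apiLevel uint32 } // hypothetical stand-in for actorPlacement

func TestAPILevelOverride(t *testing.T) {
	p := &fakePlacement{apiLevel: 10}

	p.apiLevel = 20 // raise the level for this test only
	t.Cleanup(func() {
		p.apiLevel = 10 // restore the default for anything that runs after
	})

	if p.apiLevel != 20 {
		t.Fatalf("expected apiLevel 20, got %d", p.apiLevel)
	}
}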
53 changes: 24 additions & 29 deletions pkg/placement/hashing/consistent_hash.go
@@ -34,8 +34,6 @@
 	"golang.org/x/crypto/blake2b"
 )
 
-var replicationFactor int
-
 // ErrNoHosts is an error for no hosts.
 var ErrNoHosts = errors.New("no hosts added")
 
@@ -55,10 +53,11 @@
 
 // Consistent represents a data structure for consistent hashing.
 type Consistent struct {
-	hosts     map[uint64]string
-	sortedSet []uint64
-	loadMap   map[string]*Host
-	totalLoad int64
+	hosts             map[uint64]string
+	sortedSet         []uint64
+	loadMap           map[string]*Host
+	totalLoad         int64
+	replicationFactor int
 
 	sync.RWMutex
}
@@ -74,20 +73,22 @@
 }
 
 // NewConsistentHash returns a new consistent hash.
-func NewConsistentHash() *Consistent {
+func NewConsistentHash(replicationFactor int) *Consistent {
 	return &Consistent{
-		hosts:     map[uint64]string{},
-		sortedSet: []uint64{},
-		loadMap:   map[string]*Host{},
+		hosts:             map[uint64]string{},
+		sortedSet:         []uint64{},
+		loadMap:           map[string]*Host{},
+		replicationFactor: replicationFactor,
 	}
 }
 
 // NewFromExisting creates a new consistent hash from existing values.
 func NewFromExisting(loadMap map[string]*Host, replicationFactor int, virtualNodesCache *VirtualNodesCache) *Consistent {
 	newHash := &Consistent{
-		hosts:     map[uint64]string{},
-		sortedSet: []uint64{},
-		loadMap:   loadMap,
+		hosts:             map[uint64]string{},
+		sortedSet:         []uint64{},
+		loadMap:           loadMap,
+		replicationFactor: replicationFactor,
 	}
 
 	for hostName := range loadMap {
@@ -167,9 +168,8 @@
 	hashMap := newHashMap()
 	hashMap.hashes[host] = make([]uint64, replicationFactor)
 
-	c := &Consistent{}
 	for i := 0; i < int(replicationFactor); i++ {
-		hashMap.hashes[host][i] = c.hash(host + strconv.Itoa(i))
+		hashMap.hashes[host][i] = hash(host + strconv.Itoa(i))
 	}
 
 	hc.data[replicationFactor] = hashMap
@@ -179,7 +179,7 @@
 
 // NewFromExistingWithVirtNodes creates a new consistent hash from existing values with vnodes
 // It's a legacy function needed for backwards compatibility (daprd >= 1.13 with placement < 1.13)
-// TODO in v1.15 remove this function
+// TODO: @elena in v1.15 remove this function
 func NewFromExistingWithVirtNodes(hosts map[uint64]string, sortedSet []uint64, loadMap map[string]*Host) *Consistent {
 	return &Consistent{
 		hosts:     hosts,
@@ -207,15 +207,15 @@
 
 	c.loadMap[host] = &Host{Name: host, AppID: id, Load: 0, Port: port}
 
-	// TODO in v1.15
+	// TODO: @elena in v1.15
 	// The optimisation of not disseminating vnodes with the placement table was introduced
 	// in 1.13, and the API level was increased to 20, but we still have to support sidecars
 	// running on 1.12 with placement services on 1.13. That's why we are keeping the
 	// vhosts in the store in v1.13.
 	// This should be removed in 1.14.
 	// --Start remove--
-	for i := 0; i < replicationFactor; i++ {
-		h := c.hash(host + strconv.Itoa(i))
+	for i := 0; i < c.replicationFactor; i++ {
+		h := hash(host + strconv.Itoa(i))
 		c.hosts[h] = host
 		c.sortedSet = append(c.sortedSet, h)
 	}
@@ -241,7 +241,7 @@
 		return "", ErrNoHosts
 	}
 
-	h := c.hash(key)
+	h := hash(key)
 	idx := c.search(h)
 	return c.hosts[c.sortedSet[idx]], nil
 }
@@ -271,7 +271,7 @@
 		return "", ErrNoHosts
 	}
 
-	h := c.hash(key)
+	h := hash(key)
 	idx := c.search(h)
 
 	i := idx
@@ -341,8 +341,8 @@
 	c.Lock()
 	defer c.Unlock()
 
-	for i := 0; i < replicationFactor; i++ {
-		h := c.hash(host + strconv.Itoa(i))
+	for i := 0; i < c.replicationFactor; i++ {
+		h := hash(host + strconv.Itoa(i))
 		delete(c.hosts, h)
 		c.delSlice(h)
 	}
@@ -455,12 +455,7 @@
 	return sortedSet
 }
 
-func (c *Consistent) hash(key string) uint64 {
+func hash(key string) uint64 {
 	out := blake2b.Sum512([]byte(key))
 	return binary.LittleEndian.Uint64(out[:])
 }
-
-// SetReplicationFactor sets the replication factor for actor placement on vnodes.
-func SetReplicationFactor(factor int) {
-	replicationFactor = factor
-}
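The ring mechanics are unchanged by this PR: each host still contributes replicationFactor virtual nodes, derived by hashing host + strconv.Itoa(i) with BLAKE2b, and a key is owned by the first vnode at or after its hash. A compact, self-contained sketch of that mechanic (a simplified ring under those assumptions, not the full Consistent type):

package main

import (
	"encoding/binary"
	"fmt"
	"sort"
	"strconv"

	"golang.org/x/crypto/blake2b"
)

// hash mirrors the package-level helper above: BLAKE2b-512 truncated to the
// first eight little-endian bytes.
func hash(key string) uint64 {
	out := blake2b.Sum512([]byte(key))
	return binary.LittleEndian.Uint64(out[:])
}

func main() {
	const replicationFactor = 5 // per-ring value, no package global
	hosts := []string{"host-a:50002", "host-b:50002"}

	// Each host contributes replicationFactor points ("virtual nodes") on the ring.
	ring := map[uint64]string{}
	points := make([]uint64, 0, len(hosts)*replicationFactor)
	for _, h := range hosts {
		for i := 0; i < replicationFactor; i++ {
			p := hash(h + strconv.Itoa(i))
			ring[p] = h
			points = append(points, p)
		}
	}
	sort.Slice(points, func(i, j int) bool { return points[i] < points[j] })

	// A key belongs to the first point at or after its hash, wrapping around.
	k := hash("myactortype/actor-42")
	idx := sort.Search(len(points), func(i int) bool { return points[i] >= k })
	if idx == len(points) {
		idx = 0
	}
	fmt.Println("owner:", ring[points[idx]])
}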
11 changes: 1 addition & 10 deletions pkg/placement/hashing/consistent_hash_test.go
@@ -34,9 +34,7 @@ func TestReplicationFactor(t *testing.T) {
 	factors := []int{1, 100, 1000, 10000}
 
 	for _, f := range factors {
-		SetReplicationFactor(f)
-
-		h := NewConsistentHash()
+		h := NewConsistentHash(f)
 		for _, n := range nodes {
 			s := h.Add(n, n, 1)
 			assert.False(t, s)
@@ -67,13 +65,6 @@
 	})
 }
 
-func TestSetReplicationFactor(t *testing.T) {
-	f := 10
-	SetReplicationFactor(f)
-
-	assert.Equal(t, f, replicationFactor)
-}
-
 func TestGetAndSetVirtualNodeCacheHashes(t *testing.T) {
 	cache := NewVirtualNodesCache()
 
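Finally, the VirtualNodesCache consumed by NewFromExisting appears, from the hunks above, to memoize vnode hashes per replication factor and host, so repeated table dissemination does not recompute BLAKE2b hashes. A hypothetical reduction of that idea; the real cache API is not shown in this diff:

package main

import (
	"fmt"
	"strconv"
	"sync"
)

// vnodeCache memoizes virtual-node hashes keyed by replication factor, then host.
type vnodeCache struct {
	mu   sync.RWMutex
	data map[int64]map[string][]uint64
}

func newVnodeCache() *vnodeCache {
	return &vnodeCache{data: map[int64]map[string][]uint64{}}
}

// get returns the cached hashes for (factor, host), computing them only once.
func (c *vnodeCache) get(factor int64, host string, hashFn func(string) uint64) []uint64 {
	c.mu.RLock()
	hs, ok := c.data[factor][host]
	c.mu.RUnlock()
	if ok {
		return hs
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	if hs, ok := c.data[factor][host]; ok { // re-check after taking the write lock
		return hs
	}
	if c.data[factor] == nil {
		c.data[factor] = map[string][]uint64{}
	}
	hs = make([]uint64, factor)
	for i := int64(0); i < factor; i++ {
		hs[i] = hashFn(host + strconv.FormatInt(i, 10))
	}
	c.data[factor][host] = hs
	return hs
}

func main() {
	cache := newVnodeCache()
	toy := func(s string) uint64 { // toy hash, stands in for BLAKE2b
		var h uint64
		for _, b := range []byte(s) {
			h = h*31 + uint64(b)
		}
		return h
	}
	fmt.Println(cache.get(3, "host-a", toy))
	fmt.Println(cache.get(3, "host-a", toy)) // second call served from the cache
}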