Fix vnode calculation in placement clients. (#7415)
* Fix vnode response to placement clients.

Fixes the vnode response to placement clients so that it is backwards
compatible. Makes the API level check cluster-level, rather than
per-request. Removes the placement API level gRPC context metadata.

Signed-off-by: joshvanl <me@joshvanl.dev>

* Fix method used in int sa grpc test

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
JoshVanL committed Jan 19, 2024
1 parent 21095cb commit 38fb965
Showing 21 changed files with 92 additions and 201 deletions.
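In short: before this change, every placement client advertised its API level as per-request gRPC metadata; after it, the client acts on the cluster-level API level carried in the disseminated placement tables. A minimal before/after sketch in Go, assembled from the hunks below (fragments for orientation, not compilable on their own):

    // Before: the API level rode along on every ReportDaprStatus stream
    // as gRPC metadata (removed by this commit).
    ctx = metadata.AppendToOutgoingContext(ctx,
        placement.GRPCContextKeyAPILevel, strconv.Itoa(int(apiLevel)))

    // After: the client picks its table-building strategy from the
    // cluster-level API level instead.
    if p.apiLevel < placement.NoVirtualNodesInPlacementTablesAPILevel {
        // placement < 1.13: tables arrive with pre-computed virtual nodes
    } else {
        // placement >= 1.13: virtual nodes are computed client-side
    }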
12 changes: 5 additions & 7 deletions cmd/placement/app/app.go
@@ -26,7 +26,6 @@ import (
     "github.com/dapr/dapr/pkg/metrics"
     "github.com/dapr/dapr/pkg/modes"
     "github.com/dapr/dapr/pkg/placement"
-    "github.com/dapr/dapr/pkg/placement/hashing"
     "github.com/dapr/dapr/pkg/placement/monitoring"
     "github.com/dapr/dapr/pkg/placement/raft"
     "github.com/dapr/dapr/pkg/security"
@@ -58,10 +57,11 @@ func Run() {

     // Start Raft cluster.
     raftServer := raft.New(raft.Options{
-        ID:           opts.RaftID,
-        InMem:        opts.RaftInMemEnabled,
-        Peers:        opts.RaftPeers,
-        LogStorePath: opts.RaftLogStorePath,
+        ID:                opts.RaftID,
+        InMem:             opts.RaftInMemEnabled,
+        Peers:             opts.RaftPeers,
+        LogStorePath:      opts.RaftLogStorePath,
+        ReplicationFactor: int64(opts.ReplicationFactor),
     })
     if raftServer == nil {
         log.Fatal("Failed to create raft server.")
@@ -81,8 +81,6 @@ func Run() {
         log.Fatal(err)
     }

-    hashing.SetReplicationFactor(opts.ReplicationFactor)
-
     placementOpts := placement.PlacementServiceOpts{
         RaftNode:    raftServer,
         SecProvider: secProvider,
4 changes: 0 additions & 4 deletions pkg/actors/placement/client.go
@@ -15,13 +15,10 @@ package placement

import (
     "context"
-    "strconv"
     "sync"

     "google.golang.org/grpc"
-    "google.golang.org/grpc/metadata"

-    "github.com/dapr/dapr/pkg/placement"
     v1pb "github.com/dapr/dapr/pkg/proto/placement/v1"
)

@@ -66,7 +63,6 @@ func (c *placementClient) connectToServer(ctx context.Context, serverAddr string
     }

     client := v1pb.NewPlacementClient(conn)
-    ctx = metadata.AppendToOutgoingContext(ctx, placement.GRPCContextKeyAPILevel, strconv.Itoa(int(apiLevel)))
     stream, err := client.ReportDaprStatus(ctx)
     if err != nil {
         if conn != nil {
14 changes: 0 additions & 14 deletions pkg/actors/placement/client_test.go
@@ -19,8 +19,6 @@ import (
     "sync"
     "testing"

-    "google.golang.org/grpc/metadata"
-
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "google.golang.org/grpc"
@@ -88,18 +86,6 @@ func TestConnectToServer(t *testing.T) {

         err := client.connectToServer(context.Background(), conn, 10)
         require.NoError(t, err)
-
-        // Extract the "dapr-placement-api-level" value from the context's metadata
-        md, ok := metadata.FromOutgoingContext(client.clientStream.Context())
-        require.True(t, ok)
-
-        // All keys in the returned MD are lowercase, as per the http spec for header fields
-        // https://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2
-        apiLevelValues := md["dapr-placement-api-level"]
-        require.Len(t, apiLevelValues, 1)
-
-        apiLevelStr := apiLevelValues[0]
-        require.Equal(t, "10", apiLevelStr)
     })
}

10 changes: 5 additions & 5 deletions pkg/actors/placement/placement.go
@@ -30,6 +30,7 @@ import (

     "github.com/dapr/dapr/pkg/actors/internal"
     diag "github.com/dapr/dapr/pkg/diagnostics"
+    "github.com/dapr/dapr/pkg/placement"
     "github.com/dapr/dapr/pkg/placement/hashing"
     v1pb "github.com/dapr/dapr/pkg/proto/placement/v1"
     "github.com/dapr/dapr/pkg/resiliency"
@@ -126,7 +127,6 @@ func NewActorPlacement(opts internal.ActorsProviderOptions) internal.PlacementService {
         appHealthFn:       opts.AppHealthFn,
         closeCh:           make(chan struct{}),
         resiliency:        opts.Resiliency,
-        apiLevel:          opts.APILevel.Load(),
         virtualNodesCache: hashing.NewVirtualNodesCache(),
     }
}
@@ -548,12 +548,12 @@ func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) {
             loadMap[lk] = hashing.NewHost(lv.GetName(), lv.GetId(), lv.GetLoad(), lv.GetPort())
         }

-        // TODO in v1.15 remove the check for versions < 1.13
+        // TODO: @elena in v1.15 remove the check for versions < 1.13
         // only keep `hashing.NewFromExisting`
-        if in.GetReplicationFactor() > 0 && len(v.GetHosts()) == 0 {
-            p.placementTables.Entries[k] = hashing.NewFromExisting(loadMap, int(in.GetReplicationFactor()), p.virtualNodesCache)
-        } else {
+        if p.apiLevel < placement.NoVirtualNodesInPlacementTablesAPILevel {
             p.placementTables.Entries[k] = hashing.NewFromExistingWithVirtNodes(v.GetHosts(), v.GetSortedSet(), loadMap)
+        } else {
+            p.placementTables.Entries[k] = hashing.NewFromExisting(loadMap, int(in.GetReplicationFactor()), p.virtualNodesCache)
         }

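Note that none of the hunks above show where p.apiLevel is refreshed; the tests below set the field directly (testPlacement.apiLevel = 20). A hypothetical sketch of the intended flow — onTableUpdate is an invented name, and the assignment from the tables is an assumption based on those tests:

    // Hypothetical helper (name invented for illustration): keep the
    // client's view of the cluster API level in sync with the latest
    // disseminated tables, then rebuild the hash rings.
    func (p *actorPlacement) onTableUpdate(in *v1pb.PlacementTables) {
        p.apiLevel = in.GetApiLevel() // cluster-level, not per-request
        p.updatePlacements(in)
    }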
20 changes: 12 additions & 8 deletions pkg/actors/placement/placement_test.go
@@ -39,8 +39,6 @@ import (
     "github.com/dapr/kit/logger"
)

-var apiLevel atomic.Uint32
-
func TestAddDNSResolverPrefix(t *testing.T) {
     testCases := []struct {
         addr []string
@@ -72,6 +70,7 @@ func TestPlacementStream_RoundRobin(t *testing.T) {
         address[i], testSrv[i], cleanup[i] = newTestServer()
     }

+    var apiLevel atomic.Uint32
     apiLevel.Store(1)
     testPlacement := NewActorPlacement(internal.ActorsProviderOptions{
         Config: internal.Config{
@@ -134,6 +133,7 @@ func TestAppHealthyStatus(t *testing.T) {

     appHealthCh := make(chan bool)

+    var apiLevel atomic.Uint32
     apiLevel.Store(1)
     testPlacement := NewActorPlacement(internal.ActorsProviderOptions{
         Config: internal.Config{
@@ -172,7 +172,6 @@ func TestAppHealthyStatus(t *testing.T) {
func TestOnPlacementOrder(t *testing.T) {
     tableUpdateCount := atomic.Int64{}
     tableUpdateFunc := func() { tableUpdateCount.Add(1) }
-    apiLevel.Store(1)
     testPlacement := NewActorPlacement(internal.ActorsProviderOptions{
         Config: internal.Config{
             ActorsService: "placement:",
@@ -185,7 +184,6 @@ func TestOnPlacementOrder(t *testing.T) {
         AppHealthFn: func(ctx context.Context) <-chan bool { return nil },
         Security:    testSecurity(t),
         Resiliency:  resiliency.New(logger.NewLogger("test")),
-        APILevel:    &apiLevel,
     }).(*actorPlacement)
     testPlacement.SetOnTableUpdateFn(tableUpdateFunc)

@@ -248,12 +246,17 @@ func TestOnPlacementOrder(t *testing.T) {
             },
         }

+        testPlacement.apiLevel = 20
+        t.Cleanup(func() {
+            testPlacement.apiLevel = 10
+        })
+
         testPlacement.onPlacementOrder(&placementv1pb.PlacementOrder{
             Operation: "update",
             Tables: &placementv1pb.PlacementTables{
                 Version: tableVersion,
                 Entries: entries,
-                ApiLevel: 10,
+                ApiLevel: 20,
+                ReplicationFactor: 3,
             },
         })
@@ -340,6 +343,7 @@ func TestOnPlacementOrder(t *testing.T) {
}

func TestWaitUntilPlacementTableIsReady(t *testing.T) {
+    var apiLevel atomic.Uint32
     apiLevel.Store(1)
     testPlacement := NewActorPlacement(internal.ActorsProviderOptions{
         Config: internal.Config{
@@ -481,6 +485,7 @@ func TestWaitUntilPlacementTableIsReady(t *testing.T) {
}

func TestLookupActor(t *testing.T) {
+    var apiLevel atomic.Uint32
     apiLevel.Store(1)
     testPlacement := NewActorPlacement(internal.ActorsProviderOptions{
         Config: internal.Config{
@@ -512,9 +517,7 @@ func TestLookupActor(t *testing.T) {
         Entries: map[string]*hashing.Consistent{},
     }

-    // set vnode size
-    hashing.SetReplicationFactor(10)
-    actorOneHashing := hashing.NewConsistentHash()
+    actorOneHashing := hashing.NewConsistentHash(10)
     actorOneHashing.Add(testPlacement.config.GetRuntimeHostname(), testPlacement.config.AppID, 0)
     testPlacement.placementTables.Entries["actorOne"] = actorOneHashing

@@ -540,6 +543,7 @@ func TestLookupActor(t *testing.T) {
}

func TestConcurrentUnblockPlacements(t *testing.T) {
+    var apiLevel atomic.Uint32
     apiLevel.Store(1)
     testPlacement := NewActorPlacement(internal.ActorsProviderOptions{
         Config: internal.Config{
53 changes: 24 additions & 29 deletions pkg/placement/hashing/consistent_hash.go
@@ -34,8 +34,6 @@ import (
     "golang.org/x/crypto/blake2b"
)

-var replicationFactor int
-
// ErrNoHosts is an error for no hosts.
var ErrNoHosts = errors.New("no hosts added")

@@ -55,10 +53,11 @@ type Host struct {

// Consistent represents a data structure for consistent hashing.
type Consistent struct {
-    hosts     map[uint64]string
-    sortedSet []uint64
-    loadMap   map[string]*Host
-    totalLoad int64
+    hosts             map[uint64]string
+    sortedSet         []uint64
+    loadMap           map[string]*Host
+    totalLoad         int64
+    replicationFactor int

     sync.RWMutex
}
@@ -74,20 +73,22 @@ func NewHost(name, id string, load int64, port int64) *Host {
}

// NewConsistentHash returns a new consistent hash.
-func NewConsistentHash() *Consistent {
+func NewConsistentHash(replicationFactory int) *Consistent {
     return &Consistent{
-        hosts:     map[uint64]string{},
-        sortedSet: []uint64{},
-        loadMap:   map[string]*Host{},
+        hosts:             map[uint64]string{},
+        sortedSet:         []uint64{},
+        loadMap:           map[string]*Host{},
+        replicationFactor: replicationFactory,
     }
}

// NewFromExisting creates a new consistent hash from existing values.
func NewFromExisting(loadMap map[string]*Host, replicationFactor int, virtualNodesCache *VirtualNodesCache) *Consistent {
     newHash := &Consistent{
-        hosts:     map[uint64]string{},
-        sortedSet: []uint64{},
-        loadMap:   loadMap,
+        hosts:             map[uint64]string{},
+        sortedSet:         []uint64{},
+        loadMap:           loadMap,
+        replicationFactor: replicationFactor,
     }

     for hostName := range loadMap {
@@ -167,9 +168,8 @@ func (hc *VirtualNodesCache) setHashes(replicationFactor int64, host string) []uint64 {
     hashMap := newHashMap()
     hashMap.hashes[host] = make([]uint64, replicationFactor)

-    c := &Consistent{}
     for i := 0; i < int(replicationFactor); i++ {
-        hashMap.hashes[host][i] = c.hash(host + strconv.Itoa(i))
+        hashMap.hashes[host][i] = hash(host + strconv.Itoa(i))
     }

     hc.data[replicationFactor] = hashMap
@@ -179,7 +179,7 @@

// NewFromExistingWithVirtNodes creates a new consistent hash from existing values with vnodes
// It's a legacy function needed for backwards compatibility (daprd >= 1.13 with placement < 1.13)
-// TODO in v1.15 remove this function
+// TODO: @elena in v1.15 remove this function
func NewFromExistingWithVirtNodes(hosts map[uint64]string, sortedSet []uint64, loadMap map[string]*Host) *Consistent {
     return &Consistent{
         hosts: hosts,
@@ -207,15 +207,15 @@ func (c *Consistent) Add(host, id string, port int64) bool {

     c.loadMap[host] = &Host{Name: host, AppID: id, Load: 0, Port: port}

-    // TODO in v1.15
+    // TODO: @elena in v1.15
     // The optimisation of not disseminating vnodes with the placement table was introduced
     // in 1.13, and the API level was increased to 20, but we still have to support sidecars
     // running on 1.12 with placement services on 1.13. That's why we are keeping the
     // vhosts in the store in v1.13.
     // This should be removed in 1.14.
     // --Start remove--
-    for i := 0; i < replicationFactor; i++ {
-        h := c.hash(host + strconv.Itoa(i))
+    for i := 0; i < c.replicationFactor; i++ {
+        h := hash(host + strconv.Itoa(i))
         c.hosts[h] = host
         c.sortedSet = append(c.sortedSet, h)
     }
@@ -241,7 +241,7 @@ func (c *Consistent) Get(key string) (string, error) {
         return "", ErrNoHosts
     }

-    h := c.hash(key)
+    h := hash(key)
     idx := c.search(h)
     return c.hosts[c.sortedSet[idx]], nil
}
@@ -271,7 +271,7 @@ func (c *Consistent) GetLeast(key string) (string, error) {
         return "", ErrNoHosts
     }

-    h := c.hash(key)
+    h := hash(key)
     idx := c.search(h)

     i := idx
@@ -341,8 +341,8 @@ func (c *Consistent) Remove(host string) bool {
     c.Lock()
     defer c.Unlock()

-    for i := 0; i < replicationFactor; i++ {
-        h := c.hash(host + strconv.Itoa(i))
+    for i := 0; i < c.replicationFactor; i++ {
+        h := hash(host + strconv.Itoa(i))
         delete(c.hosts, h)
         c.delSlice(h)
     }
@@ -455,12 +455,7 @@ func (c *Consistent) SortedSet() (sortedSet []uint64) {
     return sortedSet
}

-func (c *Consistent) hash(key string) uint64 {
+func hash(key string) uint64 {
     out := blake2b.Sum512([]byte(key))
     return binary.LittleEndian.Uint64(out[:])
}
-
-// SetReplicationFactor sets the replication factor for actor placement on vnodes.
-func SetReplicationFactor(factor int) {
-    replicationFactor = factor
-}
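With the package-level SetReplicationFactor gone, the factor is plumbed through the constructors, so rings with different factors can coexist in one process. A standalone usage sketch of the new API (host addresses and the actor key are made up; an illustration rather than code from this repo):

    package main

    import (
        "fmt"

        "github.com/dapr/dapr/pkg/placement/hashing"
    )

    func main() {
        // The replication (virtual node) factor now lives on the instance.
        h := hashing.NewConsistentHash(100)
        h.Add("10.0.0.1:50002", "app-1", 50002)
        h.Add("10.0.0.2:50002", "app-2", 50002)

        // Keys map to hosts through the ring's virtual nodes.
        owner, err := h.Get("myactortype||actor-42")
        if err != nil {
            panic(err)
        }
        fmt.Println("owner:", owner)
    }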
11 changes: 1 addition & 10 deletions pkg/placement/hashing/consistent_hash_test.go
@@ -34,9 +34,7 @@ func TestReplicationFactor(t *testing.T) {
     factors := []int{1, 100, 1000, 10000}

     for _, f := range factors {
-        SetReplicationFactor(f)
-
-        h := NewConsistentHash()
+        h := NewConsistentHash(f)
         for _, n := range nodes {
             s := h.Add(n, n, 1)
             assert.False(t, s)
@@ -67,13 +65,6 @@ func TestReplicationFactor(t *testing.T) {
     })
}

-func TestSetReplicationFactor(t *testing.T) {
-    f := 10
-    SetReplicationFactor(f)
-
-    assert.Equal(t, f, replicationFactor)
-}
-
func TestGetAndSetVirtualNodeCacheHashes(t *testing.T) {
     cache := NewVirtualNodesCache()

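The surviving cache test points at the other half of the design: NewFromExisting takes a shared VirtualNodesCache, so virtual-node hashes are computed once per (replication factor, host) pair and reused across table rebuilds. A sketch under that assumption (rebuildTwice and the load-map contents are invented for illustration):

    func rebuildTwice() {
        cache := hashing.NewVirtualNodesCache()

        loadMap := map[string]*hashing.Host{
            "10.0.0.1:50002": hashing.NewHost("10.0.0.1:50002", "app-1", 0, 50002),
        }

        // The first rebuild computes and caches the vnode hashes for this factor...
        ring1 := hashing.NewFromExisting(loadMap, 100, cache)
        // ...later rebuilds with the same factor reuse the cached hashes.
        ring2 := hashing.NewFromExisting(loadMap, 100, cache)
        _, _ = ring1, ring2
    }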
