Placement service lock optimisations and race condition fix (#7428)
* Calculates vnode hashes outside of lock

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Checks version again after reacquiring lock

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Removes unneeded check.

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Reduce the time we’re holding the lock on the Consistent hash

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Fixes possible race condition

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Prevents possible race condition

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Another possible race condition

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Removes leftover context metadata

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Small refactor

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Removes comment because it's not needed anymore after bringing the lock inside the method itself

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Revert

Signed-off-by: Elena Kolevska <elena@kolevska.com>

---------

Signed-off-by: Elena Kolevska <elena@kolevska.com>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
elena-kolevska and artursouza committed Jan 26, 2024
1 parent 3eef69f commit c53fc16
Showing 5 changed files with 37 additions and 26 deletions.
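
The thread running through these commits is a double-checked table update: the expensive consistent-hash (vnode) construction happens outside the placement-table lock, and the version is checked again once the write lock is reacquired. Below is a minimal Go sketch of that pattern; the `table` type, its fields, and `update` are hypothetical stand-ins, not the actual Dapr code shown in the diff.

```go
package sketch

import "sync"

// table is a stand-in for the placement tables guarded by an RWMutex.
type table struct {
	mu      sync.RWMutex
	version string
	entries map[string]string // stand-in for map[string]*hashing.Consistent
}

// update applies a new snapshot. The expensive work (building the hash
// entries) runs without the lock held; the version is re-checked after the
// write lock is reacquired in case another update landed in between.
func (t *table) update(version string, raw map[string]string) bool {
	// Cheap pre-check under a read lock: skip the work if this version is
	// already installed.
	t.mu.RLock()
	if version == t.version {
		t.mu.RUnlock()
		return false
	}
	t.mu.RUnlock()

	// Expensive computation (e.g. vnode hashes) happens outside the lock.
	entries := make(map[string]string, len(raw))
	for k, v := range raw {
		entries[k] = v // placeholder for building a consistent-hash ring per actor type
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	// Re-check: another goroutine may have installed this version while the
	// hashes were being computed.
	if version == t.version {
		return false
	}
	t.version = version
	t.entries = entries
	return true
}
```

The second check is very cheap, which is why the diff keeps it even though the placement leader is currently the only sender of updates.
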
2 changes: 1 addition & 1 deletion pkg/actors/placement/client.go
@@ -48,7 +48,7 @@ type placementClient struct {

// connectToServer initializes a new connection to the target server and if it succeeds replace the current
// stream with the connected stream.
-func (c *placementClient) connectToServer(ctx context.Context, serverAddr string, apiLevel uint32) error {
+func (c *placementClient) connectToServer(ctx context.Context, serverAddr string) error {
opts, err := c.getGrpcOpts()
if err != nil {
return err
12 changes: 6 additions & 6 deletions pkg/actors/placement/client_test.go
@@ -32,22 +32,22 @@ func TestConnectToServer(t *testing.T) {
client := newPlacementClient(func() ([]grpc.DialOption, error) {
return nil, errEstablishingTLSConn
})
-assert.Equal(t, client.connectToServer(context.Background(), "", 1), errEstablishingTLSConn)
+assert.Equal(t, client.connectToServer(context.Background(), ""), errEstablishingTLSConn)
})
t.Run("when grpc dial returns an error connectToServer should return an error", func(t *testing.T) {
client := newPlacementClient(func() ([]grpc.DialOption, error) {
return []grpc.DialOption{}, nil
})

-require.Error(t, client.connectToServer(context.Background(), "", 1))
+require.Error(t, client.connectToServer(context.Background(), ""))
})
t.Run("when new placement stream returns an error connectToServer should return an error", func(t *testing.T) {
client := newPlacementClient(func() ([]grpc.DialOption, error) {
return []grpc.DialOption{}, nil
})
conn, cleanup := newTestServerWithOpts() // do not register the placement stream server
defer cleanup()
-require.Error(t, client.connectToServer(context.Background(), conn, 1))
+require.Error(t, client.connectToServer(context.Background(), conn))
})
t.Run("when connectToServer succeeds it should broadcast that a new connection is alive", func(t *testing.T) {
conn, _, cleanup := newTestServer() // do not register the placement stream server
@@ -64,7 +64,7 @@ func TestConnectToServer(t *testing.T) {
ready.Done()
}()

-require.NoError(t, client.connectToServer(context.Background(), conn, 1))
+require.NoError(t, client.connectToServer(context.Background(), conn))
ready.Wait() // should not timeout
assert.True(t, client.streamConnAlive)
})
@@ -84,7 +84,7 @@ func TestConnectToServer(t *testing.T) {
ready.Done()
}()

-err := client.connectToServer(context.Background(), conn, 10)
+err := client.connectToServer(context.Background(), conn)
require.NoError(t, err)
})
}
@@ -119,7 +119,7 @@ func TestDisconnect(t *testing.T) {
defer cleanup()

client := newPlacementClient(getGrpcOptsGetter([]string{conn}, testSecurity(t)))
-require.NoError(t, client.connectToServer(context.Background(), conn, 1))
+require.NoError(t, client.connectToServer(context.Background(), conn))

called := false
shouldBeCalled := func() {
43 changes: 29 additions & 14 deletions pkg/actors/placement/placement.go
@@ -163,7 +163,7 @@ func (p *actorPlacement) Start(ctx context.Context) error {
p.appHealthy.Store(true)
p.resetPlacementTables()

-if !p.establishStreamConn(ctx, internal.ActorAPILevel) {
+if !p.establishStreamConn(ctx) {
return nil
}

@@ -205,7 +205,7 @@ func (p *actorPlacement) Start(ctx context.Context) error {
if !p.running.Load() {
break
}
-p.establishStreamConn(ctx, internal.ActorAPILevel)
+p.establishStreamConn(ctx)
}
}()

@@ -258,9 +258,7 @@ func (p *actorPlacement) Start(ctx context.Context) error {
log.Debug("Disconnecting from placement service by the unhealthy app")

p.client.disconnect()
-p.placementTableLock.Lock()
p.resetPlacementTables()
-p.placementTableLock.Unlock()
if p.haltAllActorsFn != nil {
haltErr := p.haltAllActorsFn()
if haltErr != nil {
@@ -379,7 +377,7 @@ func (p *actorPlacement) doLookupActor(ctx context.Context, actorType, actorID s
}

//nolint:nosnakecase
-func (p *actorPlacement) establishStreamConn(ctx context.Context, apiLevel uint32) (established bool) {
+func (p *actorPlacement) establishStreamConn(ctx context.Context) (established bool) {
// Backoff for reconnecting in case of errors
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = placementReconnectMinInterval
@@ -414,7 +412,7 @@ func (p *actorPlacement) establishStreamConn(ctx context.Context, apiLevel uint3
log.Debug("try to connect to placement service: " + serverAddr)
}

-err := p.client.connectToServer(ctx, serverAddr, apiLevel)
+err := p.client.connectToServer(ctx, serverAddr)
if err == errEstablishingTLSConn {
return false
}
@@ -514,8 +512,10 @@ func (p *actorPlacement) unblockPlacements() {
}

// Resets the placement tables.
-// Note that this method should be invoked by a caller that owns a lock.
func (p *actorPlacement) resetPlacementTables() {
+p.placementTableLock.Lock()
+defer p.placementTableLock.Unlock()

if p.hasPlacementTablesCh != nil {
close(p.hasPlacementTablesCh)
}
@@ -528,20 +528,20 @@ func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) {
updated := false
var updatedAPILevel *uint32
func() {
-p.placementTableLock.Lock()
-defer p.placementTableLock.Unlock()

+p.placementTableLock.RLock()
if in.GetVersion() == p.placementTables.Version {
+p.placementTableLock.RUnlock()
return
}
+p.placementTableLock.RUnlock()

if in.GetApiLevel() != p.apiLevel {
p.apiLevel = in.GetApiLevel()
updatedAPILevel = ptr.Of(in.GetApiLevel())
}

-maps.Clear(p.placementTables.Entries)
-p.placementTables.Version = in.GetVersion()
+entries := map[string]*hashing.Consistent{}

for k, v := range in.GetEntries() {
loadMap := make(map[string]*hashing.Host, len(v.GetLoadMap()))
for lk, lv := range v.GetLoadMap() {
@@ -551,12 +551,27 @@ func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) {
// TODO: @elena in v1.15 remove the check for versions < 1.13
// only keep `hashing.NewFromExisting`
if p.apiLevel < placement.NoVirtualNodesInPlacementTablesAPILevel {
-p.placementTables.Entries[k] = hashing.NewFromExistingWithVirtNodes(v.GetHosts(), v.GetSortedSet(), loadMap)
+entries[k] = hashing.NewFromExistingWithVirtNodes(v.GetHosts(), v.GetSortedSet(), loadMap)
} else {
-p.placementTables.Entries[k] = hashing.NewFromExisting(loadMap, int(in.GetReplicationFactor()), p.virtualNodesCache)
+entries[k] = hashing.NewFromExisting(loadMap, int(in.GetReplicationFactor()), p.virtualNodesCache)
}
}

+p.placementTableLock.Lock()
+defer p.placementTableLock.Unlock()

+// Check if the table was updated in the meantime
+// This is not needed atm, because the placement leader is the only one sending updates,
+// but it might be needed soon because there's plans to allow other nodes to send updates
+// This is a very cheap check, so it's a good idea to keep it
+if in.GetVersion() == p.placementTables.Version {
+return
+}

+maps.Clear(p.placementTables.Entries)
+p.placementTables.Version = in.GetVersion()
+p.placementTables.Entries = entries

updated = true
if p.hasPlacementTablesCh != nil {
close(p.hasPlacementTablesCh)
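
The placement.go hunks above also move locking inside resetPlacementTables, so call sites (such as the unhealthy-app path) no longer wrap the call in Lock/Unlock themselves. A small sketch of that design choice, again with hypothetical names rather than the Dapr types:

```go
package sketch

import "sync"

// placementState is a stand-in for the actor placement tables.
type placementState struct {
	mu      sync.RWMutex
	version string
	entries map[string]string
}

// reset takes its own lock instead of relying on the caller to hold it,
// removing the chance of a call site forgetting to lock.
func (s *placementState) reset() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.version = ""
	s.entries = make(map[string]string)
}
```
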
2 changes: 1 addition & 1 deletion pkg/placement/hashing/consistent_hash.go
@@ -212,7 +212,7 @@ func (c *Consistent) Add(host, id string, port int64) bool {
// in 1.13, and the API level was increased to 20, but we still have to support sidecars
// running on 1.12 with placement services on 1.13. That's why we are keeping the
// vhosts in the store in v1.13.
-// This should be removed in 1.14.
+// This should be removed in 1.15.
// --Start remove--
for i := 0; i < c.replicationFactor; i++ {
h := hash(host + strconv.Itoa(i))
4 changes: 0 additions & 4 deletions tests/integration/suite/placement/quorum/insecure.go
@@ -18,12 +18,9 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"testing"
"time"

"google.golang.org/grpc/metadata"

"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -122,7 +119,6 @@ func (i *insecure) Run(t *testing.T, ctx context.Context) {
}
t.Cleanup(func() { require.NoError(t, conn.Close()) })
client := v1pb.NewPlacementClient(conn)
-ctx = metadata.AppendToOutgoingContext(ctx, "dapr-placement-api-level", strconv.Itoa(i.places[j].CurrentActorsAPILevel()))

stream, err = client.ReportDaprStatus(ctx)
if err != nil {
Expand Down
