Skip to content

Commit

Permalink
Multiple fixes in actors package (#7119)
Browse files Browse the repository at this point in the history
* First batch of fixes:

1. Deactivate all actors in the app if the placement disconnects
2. Actor object now records the idle time rather than the last activation time
3. Remove actors from the active actors table even if deactivation fails
4. Improved shutdown of actors object

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Fixed: placement waits for the first table dissemination before reporting readiness

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Some improvements to memory efficiency and tests

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Halt all actors if connection to placement fails

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Added test for deactivation

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* 💄

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* 💄

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Update tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go

Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>

* Address review feedback

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Renamed test

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

---------

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle and dapr-bot committed Nov 21, 2023
1 parent 70f5fd6 commit 6d000be
Show file tree
Hide file tree
Showing 15 changed files with 585 additions and 170 deletions.
45 changes: 32 additions & 13 deletions pkg/actors/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/utils/clock"

diag "github.com/dapr/dapr/pkg/diagnostics"
"github.com/dapr/kit/ptr"
)

// ErrActorDisposed is the error when runtime tries to hold the lock of the disposed actor.
Expand All @@ -39,11 +40,12 @@ type actor struct {
// pendingActorCalls is the number of the current pending actor calls by turn-based concurrency.
pendingActorCalls atomic.Int32

// When consistent hashing tables are updated, actor runtime drains actor to rebalance actors
// across actor hosts after drainOngoingCallTimeout or until all pending actor calls are completed.
// lastUsedTime is the time when the last actor call holds lock. This is used to calculate
// the duration of ongoing calls to time out.
lastUsedTime time.Time
// idleTimeout is the configured max idle time for actors of this kind.
idleTimeout time.Duration

// idleAt is the time after which this actor is considered to be idle.
// When the actor is locked, idleAt is updated by adding the idleTimeout to the current time.
idleAt atomic.Pointer[time.Time]

// disposeLock guards disposed and disposeCh.
disposeLock sync.RWMutex
Expand All @@ -56,17 +58,33 @@ type actor struct {
clock clock.Clock
}

func newActor(actorType, actorID string, maxReentrancyDepth *int, cl clock.Clock) *actor {
func newActor(actorType, actorID string, maxReentrancyDepth *int, idleTimeout time.Duration, cl clock.Clock) *actor {
if cl == nil {
cl = &clock.RealClock{}
}
return &actor{
actorType: actorType,
actorID: actorID,
actorLock: NewActorLock(int32(*maxReentrancyDepth)),
clock: cl,
lastUsedTime: cl.Now().UTC(),

a := &actor{
actorType: actorType,
actorID: actorID,
actorLock: NewActorLock(int32(*maxReentrancyDepth)),
clock: cl,
idleTimeout: idleTimeout,
}
a.idleAt.Store(ptr.Of(cl.Now().Add(idleTimeout)))

return a
}

// Key returns the key for this unique actor.
// This is implemented to comply with the queueable interface.
func (a *actor) Key() string {
return a.actorType + daprSeparator + a.actorID
}

// ScheduledTime returns the time the actor becomes idle at.
// This is implemented to comply with the queueable interface.
func (a *actor) ScheduledTime() time.Time {
return *a.idleAt.Load()
}

// isBusy returns true when pending actor calls are ongoing.
Expand Down Expand Up @@ -115,7 +133,8 @@ func (a *actor) lock(reentrancyID *string) error {
a.unlock()
return ErrActorDisposed
}
a.lastUsedTime = a.clock.Now().UTC()

a.idleAt.Store(ptr.Of(a.clock.Now().Add(a.idleTimeout)))
return nil
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ import (
var reentrancyStackDepth = 32

func TestIsBusy(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)

testActor.lock(nil)
assert.Equal(t, true, testActor.isBusy())
testActor.unlock()
}

func TestTurnBasedConcurrencyLocks(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)

// first lock
testActor.lock(nil)
assert.Equal(t, true, testActor.isBusy())
firstLockTime := testActor.lastUsedTime
firstIdleAt := *testActor.idleAt.Load()

waitCh := make(chan bool)

Expand All @@ -57,7 +57,7 @@ func TestTurnBasedConcurrencyLocks(t *testing.T) {
time.Sleep(10 * time.Millisecond)
assert.Equal(t, int32(2), testActor.pendingActorCalls.Load())
assert.True(t, testActor.isBusy())
assert.Equal(t, firstLockTime, testActor.lastUsedTime)
assert.Equal(t, firstIdleAt, *testActor.idleAt.Load())

// unlock the first lock
testActor.unlock()
Expand All @@ -70,12 +70,12 @@ func TestTurnBasedConcurrencyLocks(t *testing.T) {

assert.Equal(t, int32(0), testActor.pendingActorCalls.Load())
assert.False(t, testActor.isBusy())
assert.True(t, testActor.lastUsedTime.Sub(firstLockTime) >= 10*time.Millisecond)
assert.True(t, testActor.idleAt.Load().Sub(firstIdleAt) >= 10*time.Millisecond)
}

func TestDisposedActor(t *testing.T) {
t.Run("not disposed", func(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)

testActor.lock(nil)
testActor.unlock()
Expand All @@ -86,7 +86,7 @@ func TestDisposedActor(t *testing.T) {
})

t.Run("disposed", func(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)

testActor.lock(nil)
ch := testActor.channel()
Expand All @@ -102,7 +102,7 @@ func TestDisposedActor(t *testing.T) {

func TestPendingActorCalls(t *testing.T) {
t.Run("no pending actor call with new actor object", func(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)
channelClosed := false

select {
Expand All @@ -117,7 +117,7 @@ func TestPendingActorCalls(t *testing.T) {
})

t.Run("close channel before timeout", func(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)
testActor.lock(nil)

channelClosed := atomic.Bool{}
Expand All @@ -139,7 +139,7 @@ func TestPendingActorCalls(t *testing.T) {

t.Run("multiple listeners", func(t *testing.T) {
clock := clocktesting.NewFakeClock(time.Now())
testActor := newActor("testType", "testID", &reentrancyStackDepth, clock)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, clock)
testActor.lock(nil)

nListeners := 10
Expand Down

0 comments on commit 6d000be

Please sign in to comment.