Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple fixes in actors package #7119

Merged
merged 40 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
200904d
First batch of fixes:
ItalyPaleAle Oct 30, 2023
98a7b4d
Fixed: placement waits for the first table dissemination before repor…
ItalyPaleAle Oct 30, 2023
5590dce
Some improvements to memory efficiency and tests
ItalyPaleAle Oct 30, 2023
60e3e8d
Halt all actors if connection to placement fails
ItalyPaleAle Oct 30, 2023
facb35f
Added test for deactivation
ItalyPaleAle Oct 30, 2023
9d43520
💄
ItalyPaleAle Oct 30, 2023
1488faa
Merge branch 'master' of https://github.com/dapr/dapr into actors-dea…
ItalyPaleAle Oct 30, 2023
fbf6b51
💄
ItalyPaleAle Oct 31, 2023
730b009
Update tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go
ItalyPaleAle Oct 31, 2023
ffa00b8
Address review feedback
ItalyPaleAle Oct 31, 2023
43c6668
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 2, 2023
70e8d75
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 2, 2023
3840345
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 3, 2023
7575790
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 4, 2023
f2e3bcd
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 4, 2023
6dedbc7
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 4, 2023
927ab4b
Renamed test
ItalyPaleAle Nov 6, 2023
9b41ac4
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 6, 2023
6794fb5
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 7, 2023
533adac
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 9, 2023
b7f7418
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 9, 2023
9c60c13
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 9, 2023
f26361b
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 13, 2023
7803b73
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 13, 2023
dfd70de
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 13, 2023
0af9c34
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 13, 2023
fa31d81
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 13, 2023
a5a4af8
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 15, 2023
c5d0f7b
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 15, 2023
1820b2b
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 15, 2023
75b5381
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 15, 2023
718789f
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 16, 2023
deb364c
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 16, 2023
c3abae1
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 17, 2023
1ec07f5
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 17, 2023
369381b
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 17, 2023
1b388d8
Merge branch 'master' of https://github.com/dapr/dapr into actors-dea…
ItalyPaleAle Nov 19, 2023
4e14ef3
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 20, 2023
d40d77c
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 20, 2023
4f2ebdb
Merge branch 'master' into actors-deactivation-refactorings
dapr-bot Nov 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 32 additions & 13 deletions pkg/actors/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/utils/clock"

diag "github.com/dapr/dapr/pkg/diagnostics"
"github.com/dapr/kit/ptr"
)

// ErrActorDisposed is the error when runtime tries to hold the lock of the disposed actor.
Expand All @@ -39,11 +40,12 @@ type actor struct {
// pendingActorCalls is the number of the current pending actor calls by turn-based concurrency.
pendingActorCalls atomic.Int32

// When consistent hashing tables are updated, actor runtime drains actor to rebalance actors
// across actor hosts after drainOngoingCallTimeout or until all pending actor calls are completed.
// lastUsedTime is the time when the last actor call holds lock. This is used to calculate
// the duration of ongoing calls to time out.
lastUsedTime time.Time
// idleTimeout is the configured max idle time for actors of this kind.
idleTimeout time.Duration

// idleAt is the time after which this actor is considered to be idle.
// When the actor is locked, idleAt is updated by adding the idleTimeout to the current time.
idleAt atomic.Pointer[time.Time]

// disposeLock guards disposed and disposeCh.
disposeLock sync.RWMutex
Expand All @@ -56,17 +58,33 @@ type actor struct {
clock clock.Clock
}

func newActor(actorType, actorID string, maxReentrancyDepth *int, cl clock.Clock) *actor {
func newActor(actorType, actorID string, maxReentrancyDepth *int, idleTimeout time.Duration, cl clock.Clock) *actor {
if cl == nil {
cl = &clock.RealClock{}
}
return &actor{
actorType: actorType,
actorID: actorID,
actorLock: NewActorLock(int32(*maxReentrancyDepth)),
clock: cl,
lastUsedTime: cl.Now().UTC(),

a := &actor{
actorType: actorType,
actorID: actorID,
actorLock: NewActorLock(int32(*maxReentrancyDepth)),
clock: cl,
idleTimeout: idleTimeout,
}
a.idleAt.Store(ptr.Of(cl.Now().Add(idleTimeout)))

return a
}

// Key returns the key for this unique actor.
// This is implemented to comply with the queueable interface.
func (a *actor) Key() string {
return a.actorType + daprSeparator + a.actorID
}

// ScheduledTime returns the time the actor becomes idle at.
// This is implemented to comply with the queueable interface.
func (a *actor) ScheduledTime() time.Time {
return *a.idleAt.Load()
}

// isBusy returns true when pending actor calls are ongoing.
Expand Down Expand Up @@ -115,7 +133,8 @@ func (a *actor) lock(reentrancyID *string) error {
a.unlock()
return ErrActorDisposed
}
a.lastUsedTime = a.clock.Now().UTC()

a.idleAt.Store(ptr.Of(a.clock.Now().Add(a.idleTimeout)))
return nil
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ import (
var reentrancyStackDepth = 32

func TestIsBusy(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)

testActor.lock(nil)
assert.Equal(t, true, testActor.isBusy())
testActor.unlock()
}

func TestTurnBasedConcurrencyLocks(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)

// first lock
testActor.lock(nil)
assert.Equal(t, true, testActor.isBusy())
firstLockTime := testActor.lastUsedTime
firstIdleAt := *testActor.idleAt.Load()

waitCh := make(chan bool)

Expand All @@ -57,7 +57,7 @@ func TestTurnBasedConcurrencyLocks(t *testing.T) {
time.Sleep(10 * time.Millisecond)
assert.Equal(t, int32(2), testActor.pendingActorCalls.Load())
assert.True(t, testActor.isBusy())
assert.Equal(t, firstLockTime, testActor.lastUsedTime)
assert.Equal(t, firstIdleAt, *testActor.idleAt.Load())

// unlock the first lock
testActor.unlock()
Expand All @@ -70,12 +70,12 @@ func TestTurnBasedConcurrencyLocks(t *testing.T) {

assert.Equal(t, int32(0), testActor.pendingActorCalls.Load())
assert.False(t, testActor.isBusy())
assert.True(t, testActor.lastUsedTime.Sub(firstLockTime) >= 10*time.Millisecond)
assert.True(t, testActor.idleAt.Load().Sub(firstIdleAt) >= 10*time.Millisecond)
}

func TestDisposedActor(t *testing.T) {
t.Run("not disposed", func(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)

testActor.lock(nil)
testActor.unlock()
Expand All @@ -86,7 +86,7 @@ func TestDisposedActor(t *testing.T) {
})

t.Run("disposed", func(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)

testActor.lock(nil)
ch := testActor.channel()
Expand All @@ -102,7 +102,7 @@ func TestDisposedActor(t *testing.T) {

func TestPendingActorCalls(t *testing.T) {
t.Run("no pending actor call with new actor object", func(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)
channelClosed := false

select {
Expand All @@ -117,7 +117,7 @@ func TestPendingActorCalls(t *testing.T) {
})

t.Run("close channel before timeout", func(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)
testActor.lock(nil)

channelClosed := atomic.Bool{}
Expand All @@ -139,7 +139,7 @@ func TestPendingActorCalls(t *testing.T) {

t.Run("multiple listeners", func(t *testing.T) {
clock := clocktesting.NewFakeClock(time.Now())
testActor := newActor("testType", "testID", &reentrancyStackDepth, clock)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, clock)
testActor.lock(nil)

nListeners := 10
Expand Down