Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/dapr/dapr into metadata-a…
Browse files Browse the repository at this point in the history
…ctor-status

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle committed Nov 27, 2023
2 parents 9712d15 + 4d626ca commit 6e490c8
Show file tree
Hide file tree
Showing 74 changed files with 4,082 additions and 2,736 deletions.
1 change: 1 addition & 0 deletions .github/scripts/dapr_bot.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const owners = [
'taction',
'tanvigour',
'yaron2',
'rabollin',
]

const SDKs = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,18 @@ spec:
type: {{ .Values.apiService.type }}
ports:
- protocol: TCP
port: {{ .Values.ports.port }}
port: {{ .Values.ports.port }}
targetPort: {{ .Values.ports.targetPort }}

name: grpc
# Added for backwards compatibility where previous clients will attempt to
# connect on port 80.
# TOOD: @joshvanl: remove in v1.14
{{ if (ne (int .Values.ports.port) 80) }}
- protocol: TCP
port: 80
targetPort: {{ .Values.ports.targetPort }}
name: legacy
{{ end }}
---
apiVersion: v1
kind: Service
Expand All @@ -33,10 +42,10 @@ metadata:
{{ toYaml .Values.webhookService.annotations | indent 4}}
{{- end }}
spec:
selector:
app: dapr-operator
type: {{ .Values.webhookService.type }}
ports:
- port: 443
targetPort: 19443
protocol: TCP
protocol: TCP
selector:
app: dapr-operator
2 changes: 1 addition & 1 deletion charts/dapr/charts/dapr_operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ serviceReconciler:

ports:
protocol: TCP
port: 80
port: 443
targetPort: 6500

resources: {}
Expand Down
45 changes: 32 additions & 13 deletions pkg/actors/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/utils/clock"

diag "github.com/dapr/dapr/pkg/diagnostics"
"github.com/dapr/kit/ptr"
)

// ErrActorDisposed is the error when runtime tries to hold the lock of the disposed actor.
Expand All @@ -39,11 +40,12 @@ type actor struct {
// pendingActorCalls is the number of the current pending actor calls by turn-based concurrency.
pendingActorCalls atomic.Int32

// When consistent hashing tables are updated, actor runtime drains actor to rebalance actors
// across actor hosts after drainOngoingCallTimeout or until all pending actor calls are completed.
// lastUsedTime is the time when the last actor call holds lock. This is used to calculate
// the duration of ongoing calls to time out.
lastUsedTime time.Time
// idleTimeout is the configured max idle time for actors of this kind.
idleTimeout time.Duration

// idleAt is the time after which this actor is considered to be idle.
// When the actor is locked, idleAt is updated by adding the idleTimeout to the current time.
idleAt atomic.Pointer[time.Time]

// disposeLock guards disposed and disposeCh.
disposeLock sync.RWMutex
Expand All @@ -56,17 +58,33 @@ type actor struct {
clock clock.Clock
}

func newActor(actorType, actorID string, maxReentrancyDepth *int, cl clock.Clock) *actor {
func newActor(actorType, actorID string, maxReentrancyDepth *int, idleTimeout time.Duration, cl clock.Clock) *actor {
if cl == nil {
cl = &clock.RealClock{}
}
return &actor{
actorType: actorType,
actorID: actorID,
actorLock: NewActorLock(int32(*maxReentrancyDepth)),
clock: cl,
lastUsedTime: cl.Now().UTC(),

a := &actor{
actorType: actorType,
actorID: actorID,
actorLock: NewActorLock(int32(*maxReentrancyDepth)),
clock: cl,
idleTimeout: idleTimeout,
}
a.idleAt.Store(ptr.Of(cl.Now().Add(idleTimeout)))

return a
}

// Key returns the key for this unique actor.
// This is implemented to comply with the queueable interface.
func (a *actor) Key() string {
return a.actorType + daprSeparator + a.actorID
}

// ScheduledTime returns the time the actor becomes idle at.
// This is implemented to comply with the queueable interface.
func (a *actor) ScheduledTime() time.Time {
return *a.idleAt.Load()
}

// isBusy returns true when pending actor calls are ongoing.
Expand Down Expand Up @@ -115,7 +133,8 @@ func (a *actor) lock(reentrancyID *string) error {
a.unlock()
return ErrActorDisposed
}
a.lastUsedTime = a.clock.Now().UTC()

a.idleAt.Store(ptr.Of(a.clock.Now().Add(a.idleTimeout)))
return nil
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ import (
var reentrancyStackDepth = 32

func TestIsBusy(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)

testActor.lock(nil)
assert.Equal(t, true, testActor.isBusy())
testActor.unlock()
}

func TestTurnBasedConcurrencyLocks(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)

// first lock
testActor.lock(nil)
assert.Equal(t, true, testActor.isBusy())
firstLockTime := testActor.lastUsedTime
firstIdleAt := *testActor.idleAt.Load()

waitCh := make(chan bool)

Expand All @@ -57,7 +57,7 @@ func TestTurnBasedConcurrencyLocks(t *testing.T) {
time.Sleep(10 * time.Millisecond)
assert.Equal(t, int32(2), testActor.pendingActorCalls.Load())
assert.True(t, testActor.isBusy())
assert.Equal(t, firstLockTime, testActor.lastUsedTime)
assert.Equal(t, firstIdleAt, *testActor.idleAt.Load())

// unlock the first lock
testActor.unlock()
Expand All @@ -70,12 +70,12 @@ func TestTurnBasedConcurrencyLocks(t *testing.T) {

assert.Equal(t, int32(0), testActor.pendingActorCalls.Load())
assert.False(t, testActor.isBusy())
assert.True(t, testActor.lastUsedTime.Sub(firstLockTime) >= 10*time.Millisecond)
assert.True(t, testActor.idleAt.Load().Sub(firstIdleAt) >= 10*time.Millisecond)
}

func TestDisposedActor(t *testing.T) {
t.Run("not disposed", func(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)

testActor.lock(nil)
testActor.unlock()
Expand All @@ -86,7 +86,7 @@ func TestDisposedActor(t *testing.T) {
})

t.Run("disposed", func(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)

testActor.lock(nil)
ch := testActor.channel()
Expand All @@ -102,7 +102,7 @@ func TestDisposedActor(t *testing.T) {

func TestPendingActorCalls(t *testing.T) {
t.Run("no pending actor call with new actor object", func(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)
channelClosed := false

select {
Expand All @@ -117,7 +117,7 @@ func TestPendingActorCalls(t *testing.T) {
})

t.Run("close channel before timeout", func(t *testing.T) {
testActor := newActor("testType", "testID", &reentrancyStackDepth, nil)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil)
testActor.lock(nil)

channelClosed := atomic.Bool{}
Expand All @@ -139,7 +139,7 @@ func TestPendingActorCalls(t *testing.T) {

t.Run("multiple listeners", func(t *testing.T) {
clock := clocktesting.NewFakeClock(time.Now())
testActor := newActor("testType", "testID", &reentrancyStackDepth, clock)
testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, clock)
testActor.lock(nil)

nListeners := 10
Expand Down

0 comments on commit 6e490c8

Please sign in to comment.