Skip to content

Commit

Permalink
Store actor API level in actors object
Browse files Browse the repository at this point in the history
Follow-up from dapr#7134
Part of dapr#6838

Actor runtime can use this to change its behavior. Will allow completing dapr#7129

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle committed Nov 15, 2023
1 parent 3bb14c4 commit 70a658b
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 5 deletions.
5 changes: 5 additions & 0 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type actorsRuntime struct {
wg sync.WaitGroup
closed atomic.Bool
closeCh chan struct{}
apiLevel atomic.Uint32

// TODO: @joshvanl Remove in Dapr 1.12 when ActorStateTTL is finalized.
stateTTLEnabled bool
Expand Down Expand Up @@ -253,6 +254,10 @@ func (a *actorsRuntime) Init(ctx context.Context) error {
a.actorsReminders.OnPlacementTablesUpdated(ctx)
},
})
a.placement.SetOnAPILevelUpdate(func(apiLevel uint32) {
a.apiLevel.Store(apiLevel)
log.Infof("Actor API level in the cluster has been updated to %d", apiLevel)
})
}

a.wg.Add(1)
Expand Down
5 changes: 5 additions & 0 deletions pkg/actors/actors_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/actors/internal/placement_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type PlacementService interface {
WaitUntilReady(ctx context.Context) error
LookupActor(ctx context.Context, req LookupActorRequest) (LookupActorResponse, error)
AddHostedActorType(actorType string, idleTimeout time.Duration) error
SetOnAPILevelUpdate(fn func(apiLevel uint32))
}

// LookupActorRequest is the request for LookupActor.
Expand Down
27 changes: 22 additions & 5 deletions pkg/actors/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/security"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
)

var log = logger.NewLogger("dapr.runtime.actors.placement")
Expand Down Expand Up @@ -76,6 +77,11 @@ type actorPlacement struct {
// placementTableLock is the lock for placementTables.
placementTableLock sync.RWMutex

// apiLevel is the current API level of the cluster
apiLevel uint32
// onAPILevelUpdate is invoked when the API level is updated
onAPILevelUpdate func(apiLevel uint32)

// unblockSignal is the channel to unblock table locking.
unblockSignal chan struct{}
// tableIsBlocked is the status of table lock.
Expand Down Expand Up @@ -448,6 +454,7 @@ func (p *actorPlacement) unblockPlacements() {

func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) {
updated := false
var updatedAPILevel *uint32
func() {
p.placementTableLock.Lock()
defer p.placementTableLock.Unlock()
Expand All @@ -456,6 +463,11 @@ func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) {
return
}

if in.ApiLevel != p.apiLevel {
p.apiLevel = in.ApiLevel
updatedAPILevel = ptr.Of(in.ApiLevel)
}

tables := &hashing.ConsistentHashTables{Entries: make(map[string]*hashing.Consistent)}
for k, v := range in.Entries {
loadMap := map[string]*hashing.Host{}
Expand All @@ -470,14 +482,19 @@ func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) {
updated = true
}()

if !updated {
return
if updatedAPILevel != nil && p.onAPILevelUpdate != nil {
p.onAPILevelUpdate(*updatedAPILevel)
}

// May call LookupActor inside, so should not do this with placementTableLock locked.
p.afterTableUpdateFn()
if updated {
// May call LookupActor inside, so should not do this with placementTableLock locked.
p.afterTableUpdateFn()
log.Infof("Placement tables updated, version: %s", in.GetVersion())
}
}

log.Infof("Placement tables updated, version: %s", in.GetVersion())
func (p *actorPlacement) SetOnAPILevelUpdate(fn func(apiLevel uint32)) {
p.onAPILevelUpdate = fn
}

// addDNSResolverPrefix add the `dns://` prefix to the given addresses
Expand Down

0 comments on commit 70a658b

Please sign in to comment.