Skip to content

Commit

Permalink
Metadata API: report actor runtime status (#7040)
Browse files Browse the repository at this point in the history
* Metadata API: report actor runtime status

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Added integration tests

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Changed per review feedback

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Address review feedback 1

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Address review feedback 2

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Renamed to WithInMemoryActorStateStore

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Added comments

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

---------

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
  • Loading branch information
3 people committed Nov 27, 2023
1 parent 4d626ca commit 9189949
Show file tree
Hide file tree
Showing 34 changed files with 2,117 additions and 1,499 deletions.
25 changes: 24 additions & 1 deletion dapr/proto/runtime/v1/dapr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -595,14 +595,37 @@ message GetMetadataRequest {
// GetMetadataResponse is a message that is returned on GetMetadata rpc call.
message GetMetadataResponse {
string id = 1;
repeated ActiveActorsCount active_actors_count = 2 [json_name = "actors"];
// Deprecated alias for actor_runtime.active_actors.
repeated ActiveActorsCount active_actors_count = 2 [json_name = "actors", deprecated = true];
repeated RegisteredComponents registered_components = 3 [json_name = "components"];
map<string, string> extended_metadata = 4 [json_name = "extended"];
repeated PubsubSubscription subscriptions = 5 [json_name = "subscriptions"];
repeated MetadataHTTPEndpoint http_endpoints = 6 [json_name = "httpEndpoints"];
AppConnectionProperties app_connection_properties = 7 [json_name = "appConnectionProperties"];
string runtime_version = 8 [json_name = "runtimeVersion"];
repeated string enabled_features = 9 [json_name = "enabledFeatures"];
ActorRuntime actor_runtime = 10 [json_name = "actorRuntime"];
}

message ActorRuntime {
enum ActorRuntimeStatus {
// Indicates that the actor runtime is still being initialized.
INITIALIZING = 0;
// Indicates that the actor runtime is disabled.
// This normally happens when Dapr is started without "placement-host-address"
DISABLED = 1;
// Indicates the actor runtime is running, either as an actor host or client.
RUNNING = 2;
}

// Contains an enum indicating whether the actor runtime has been initialized.
ActorRuntimeStatus runtime_status = 1 [json_name = "runtimeStatus"];
// Count of active actors per type.
repeated ActiveActorsCount active_actors = 2 [json_name = "activeActors"];
// Indicates whether the actor runtime is ready to host actors.
bool host_ready = 3 [json_name = "hostReady"];
// Custom message from the placement provider.
string placement = 4 [json_name = "placement"];
}

message ActiveActorsCount {
Expand Down
18 changes: 16 additions & 2 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type ActorRuntime interface {
io.Closer
Init(context.Context) error
IsActorHosted(ctx context.Context, req *ActorHostedRequest) bool
GetActiveActorsCount(ctx context.Context) []*runtimev1pb.ActiveActorsCount
GetRuntimeStatus(ctx context.Context) *runtimev1pb.ActorRuntime
RegisterInternalActor(ctx context.Context, actorType string, actor InternalActor, actorIdleTimeout time.Duration) error
}

Expand Down Expand Up @@ -1036,7 +1036,21 @@ func (a *actorsRuntime) RegisterInternalActor(ctx context.Context, actorType str
return nil
}

func (a *actorsRuntime) GetActiveActorsCount(ctx context.Context) []*runtimev1pb.ActiveActorsCount {
func (a *actorsRuntime) GetRuntimeStatus(ctx context.Context) *runtimev1pb.ActorRuntime {
// Do not populate RuntimeStatus, which will be populated by the runtime
res := &runtimev1pb.ActorRuntime{
ActiveActors: a.getActiveActorsCount(ctx),
}

if a.placement != nil {
res.HostReady = a.placement.PlacementHealthy() && a.haveCompatibleStorage()
res.Placement = a.placement.StatusMessage()
}

return res
}

func (a *actorsRuntime) getActiveActorsCount(ctx context.Context) []*runtimev1pb.ActiveActorsCount {
actorTypes := a.actorsConfig.Config.HostedActorTypes.ListActorTypes()
actorCountMap := make(map[string]int32, len(actorTypes))
for _, actorType := range actorTypes {
Expand Down
39 changes: 27 additions & 12 deletions pkg/actors/actors_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/actors/actors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ func TestActiveActorsCount(t *testing.T) {
fakeCallAndActivateActor(testActorsRuntime, "cat", "xyz", testActorsRuntime.clock)
fakeCallAndActivateActor(testActorsRuntime, "dog", "xyz", testActorsRuntime.clock)

actualCounts := testActorsRuntime.GetActiveActorsCount(ctx)
actualCounts := testActorsRuntime.getActiveActorsCount(ctx)
assert.ElementsMatch(t, expectedCounts, actualCounts)
})

Expand All @@ -1016,7 +1016,7 @@ func TestActiveActorsCount(t *testing.T) {
testActorsRuntime.actorsConfig.Config.HostedActorTypes = internal.NewHostedActors([]string{})
defer testActorsRuntime.Close()

actualCounts := testActorsRuntime.GetActiveActorsCount(ctx)
actualCounts := testActorsRuntime.getActiveActorsCount(ctx)
assert.Equal(t, expectedCounts, actualCounts)
})
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/actors/internal/placement_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ type PlacementService interface {

SetHaltActorFns(haltFn HaltActorFn, haltAllFn HaltAllActorsFn)
SetOnAPILevelUpdate(fn func(apiLevel uint32))

// PlacementHealthy returns true if the placement service is healthy.
PlacementHealthy() bool
// StatusMessage returns a custom status message.
StatusMessage() string
}

// LookupActorRequest is the request for LookupActor.
Expand Down
2 changes: 1 addition & 1 deletion pkg/actors/internal_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,6 @@ func TestInternalActorsNotCounted(t *testing.T) {
internalActors[InternalActorTypePrefix+"wfengine.workflow"] = &mockInternalActor{}
testActorRuntime, err := newTestActorsRuntimeWithInternalActors(internalActors)
require.NoError(t, err)
actorCounts := testActorRuntime.GetActiveActorsCount(context.Background())
actorCounts := testActorRuntime.getActiveActorsCount(context.Background())
assert.Empty(t, actorCounts)
}
11 changes: 11 additions & 0 deletions pkg/actors/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,17 @@ func NewActorPlacement(opts ActorPlacementOpts) internal.PlacementService {
}
}

func (p *actorPlacement) PlacementHealthy() bool {
return p.appHealthy.Load() && p.client.isConnected()
}

func (p *actorPlacement) StatusMessage() string {
if p.client.isConnected() {
return "placement: connected"
}
return "placement: disconnected"
}

// Register an actor type by adding it to the list of known actor types (if it's not already registered)
// The placement tables will get updated when the next heartbeat fires
func (p *actorPlacement) AddHostedActorType(actorType string, idleTimeout time.Duration) error {
Expand Down
51 changes: 22 additions & 29 deletions pkg/grpc/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4159,23 +4159,9 @@ func TestMetadata(t *testing.T) {
})

mockActors := new(actors.MockActors)
mockActors.On("GetActiveActorsCount")

appConnectionConfig := config.AppConnectionConfig{
ChannelAddress: "1.2.3.4",
MaxConcurrency: 10,
Port: 5000,
Protocol: "grpc",
HealthCheckHTTPPath: "/healthz",
HealthCheck: &config.AppHealthConfig{
ProbeInterval: 10 * time.Second,
ProbeTimeout: 5 * time.Second,
ProbeOnly: true,
Threshold: 3,
},
}
mockActors.On("GetRuntimeStatus")

server, lis := startDaprAPIServer(&api{
a := &api{
UniversalAPI: &universalapi.UniversalAPI{
AppID: "fakeAPI",
Actors: mockActors,
Expand All @@ -4191,10 +4177,26 @@ func TestMetadata(t *testing.T) {
ExtendedMetadata: map[string]string{
"test": "value",
},
AppConnectionConfig: appConnectionConfig,
GlobalConfig: &config.Configuration{},
AppConnectionConfig: config.AppConnectionConfig{
ChannelAddress: "1.2.3.4",
MaxConcurrency: 10,
Port: 5000,
Protocol: "grpc",
HealthCheckHTTPPath: "/healthz",
HealthCheck: &config.AppHealthConfig{
ProbeInterval: 10 * time.Second,
ProbeTimeout: 5 * time.Second,
ProbeOnly: true,
Threshold: 3,
},
},
GlobalConfig: &config.Configuration{},
},
}, "")
}
a.UniversalAPI.InitUniversalAPI()
a.UniversalAPI.SetActorsInitDone()

server, lis := startDaprAPIServer(a, "")
defer server.Stop()

clientConn := createTestClient(lis)
Expand All @@ -4219,16 +4221,7 @@ func TestMetadata(t *testing.T) {
bytes, err := json.Marshal(res)
assert.NoError(t, err)

expectedResponse := `{"id":"fakeAPI",` +
`"active_actors_count":[{"type":"abcd","count":10},{"type":"xyz","count":5}],` +
`"registered_components":[{"name":"MockComponent1Name","type":"mock.component1Type","version":"v1.0","capabilities":["mock.feat.MockComponent1Name"]},` +
`{"name":"MockComponent2Name","type":"mock.component2Type","version":"v1.0","capabilities":["mock.feat.MockComponent2Name"]}],` +
`"extended_metadata":{"daprRuntimeVersion":"edge","foo":"bar","test":"value"},` +
`"subscriptions":[{"pubsub_name":"test","topic":"topic","rules":{"rules":[{"path":"path"}]},"dead_letter_topic":"dead"}],` +
`"http_endpoints":[{"name":"MockHTTPEndpoint"}],` +
`"app_connection_properties":{"port":5000,"protocol":"grpc","channel_address":"1.2.3.4","max_concurrency":10,` +
`"health":{"health_probe_interval":"10s","health_probe_timeout":"5s","health_threshold":3}},` +
`"runtime_version":"edge"}`
expectedResponse := `{"id":"fakeAPI","active_actors_count":[{"type":"abcd","count":10},{"type":"xyz","count":5}],"registered_components":[{"name":"MockComponent1Name","type":"mock.component1Type","version":"v1.0","capabilities":["mock.feat.MockComponent1Name"]},{"name":"MockComponent2Name","type":"mock.component2Type","version":"v1.0","capabilities":["mock.feat.MockComponent2Name"]}],"extended_metadata":{"daprRuntimeVersion":"edge","foo":"bar","test":"value"},"subscriptions":[{"pubsub_name":"test","topic":"topic","rules":{"rules":[{"path":"path"}]},"dead_letter_topic":"dead"}],"http_endpoints":[{"name":"MockHTTPEndpoint"}],"app_connection_properties":{"port":5000,"protocol":"grpc","channel_address":"1.2.3.4","max_concurrency":10,"health":{"health_probe_interval":"10s","health_probe_timeout":"5s","health_threshold":3}},"runtime_version":"edge","actor_runtime":{"runtime_status":2,"active_actors":[{"type":"abcd","count":10},{"type":"xyz","count":5}],"host_ready":true}}`
assert.Equal(t, expectedResponse, string(bytes))
})
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/grpc/universalapi/api_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,17 @@ func (a *UniversalAPI) GetMetadata(ctx context.Context, in *runtimev1pb.GetMetad
// This is deprecated, but we still need to support it for backward compatibility.
extendedMetadata[daprRuntimeVersionKey] = buildinfo.Version()

// Active actors count
activeActorsCount := []*runtimev1pb.ActiveActorsCount{}
if a.Actors != nil {
activeActorsCount = a.Actors.GetActiveActorsCount(ctx)
// Actor runtime
var actorRuntime *runtimev1pb.ActorRuntime
if a.actorsReady.Load() {
if a.Actors == nil {
actorRuntime = &runtimev1pb.ActorRuntime{
RuntimeStatus: runtimev1pb.ActorRuntime_DISABLED,
}
} else {
actorRuntime = a.Actors.GetRuntimeStatus(ctx)
actorRuntime.RuntimeStatus = runtimev1pb.ActorRuntime_RUNNING
}
}

// App connection information
Expand Down Expand Up @@ -104,12 +111,13 @@ func (a *UniversalAPI) GetMetadata(ctx context.Context, in *runtimev1pb.GetMetad
Id: a.AppID,
ExtendedMetadata: extendedMetadata,
RegisteredComponents: registeredComponents,
ActiveActorsCount: activeActorsCount,
ActiveActorsCount: actorRuntime.GetActiveActors(), // Alias for backwards-compatibility
Subscriptions: ps,
HttpEndpoints: registeredHTTPEndpoints,
AppConnectionProperties: appConnectionProperties,
RuntimeVersion: buildinfo.Version(),
EnabledFeatures: a.GlobalConfig.EnabledFeatures(),
ActorRuntime: actorRuntime,
}, nil
}

Expand Down
8 changes: 3 additions & 5 deletions pkg/grpc/universalapi/api_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ func TestGetMetadata(t *testing.T) {
fakeComponent.Name = "testComponent"

mockActors := new(actors.MockActors)
mockActors.On("GetActiveActorsCount").Return(&runtimev1pb.ActiveActorsCount{
Count: 10,
Type: "abcd",
})
mockActors.On("GetRuntimeStatus")

compStore := compstore.New()
require.NoError(t, compStore.AddPendingComponentForCommit(fakeComponent))
Expand Down Expand Up @@ -107,6 +104,7 @@ func TestGetMetadata(t *testing.T) {
AppConnectionConfig: appConnectionConfig,
GlobalConfig: &config.Configuration{},
}
fakeAPI.actorsReady.Store(true)

response, err := fakeAPI.GetMetadata(context.Background(), &runtimev1pb.GetMetadataRequest{})
require.NoError(t, err, "Expected no error")
Expand All @@ -126,7 +124,7 @@ func TestGetMetadata(t *testing.T) {
`"subscriptions":[{"pubsub_name":"test","topic":"topic","rules":{"rules":[{"path":"path"}]},"dead_letter_topic":"dead"}],` +
`"app_connection_properties":{"port":1234,"protocol":"http","channel_address":"1.2.3.4","max_concurrency":10` +
healthCheckJSON +
`"runtime_version":"edge"}`
`"runtime_version":"edge","actor_runtime":{"runtime_status":2,"active_actors":[{"type":"abcd","count":10},{"type":"xyz","count":5}],"host_ready":true}}`
assert.Equal(t, expectedResponse, string(bytes))
})
}
Expand Down
20 changes: 19 additions & 1 deletion pkg/http/api_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (a *api) onGetMetadata() http.HandlerFunc {
ID: out.Id,
Extended: out.ExtendedMetadata,
// We can embed the proto object directly only for as long as the protojson key is == json key
ActiveActorsCount: out.ActiveActorsCount,
ActiveActorsCount: out.ActiveActorsCount, //nolint:staticcheck
RegisteredComponents: out.RegisteredComponents,
HTTPEndpoints: out.HttpEndpoints,
RuntimeVersion: out.RuntimeVersion,
Expand Down Expand Up @@ -115,6 +115,16 @@ func (a *api) onGetMetadata() http.HandlerFunc {
res.Subscriptions = subs
}

// Actor runtime
// We need to include the status as string
actorRuntime := out.GetActorRuntime()
res.ActorRuntime = metadataActorRuntime{
Status: actorRuntime.GetRuntimeStatus().String(),
ActiveActors: actorRuntime.GetActiveActors(),
HostReady: actorRuntime.GetHostReady(),
Placement: actorRuntime.GetPlacement(),
}

return res, nil
},
},
Expand Down Expand Up @@ -155,6 +165,14 @@ type metadataResponse struct {
Subscriptions []metadataResponsePubsubSubscription `json:"subscriptions,omitempty"`
HTTPEndpoints []*runtimev1pb.MetadataHTTPEndpoint `json:"httpEndpoints,omitempty"`
AppConnectionProperties metadataResponseAppConnectionProperties `json:"appConnectionProperties,omitempty"`
ActorRuntime metadataActorRuntime `json:"actorRuntime,omitempty"`
}

type metadataActorRuntime struct {
Status string `json:"runtimeStatus"`
ActiveActors []*runtimev1pb.ActiveActorsCount `json:"activeActors,omitempty"`
HostReady bool `json:"hostReady"`
Placement string `json:"placement,omitempty"`
}

type metadataResponsePubsubSubscription struct {
Expand Down

0 comments on commit 9189949

Please sign in to comment.