Skip to content

Commit

Permalink
Serialize reminders as protobuf in state store (#7129)
Browse files Browse the repository at this point in the history
* Added support for reading/writing reminders as protobuf depending on API level

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Increase target QPS

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Added integration tests

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* 💄

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Updated test code per review feedback

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* 💄

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Mod tidy

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Update tests/integration/suite/actors/reminders/serialization_json.go

Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>

* Update tests/integration/framework/process/daprd/options.go

Co-authored-by: Josh van Leeuwen <me@joshvanl.dev>
Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>

* Changed per review feedback

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Lint

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Should fix the test errors

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Lint....

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Disable ITs on Windows due to SQLite limitations

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

---------

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
Co-authored-by: Josh van Leeuwen <me@joshvanl.dev>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
  • Loading branch information
4 people committed Dec 12, 2023
1 parent 53ceb4a commit 72a439c
Show file tree
Hide file tree
Showing 18 changed files with 1,082 additions and 62 deletions.
37 changes: 37 additions & 0 deletions dapr/proto/internals/v1/reminders.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

syntax = "proto3";

package dapr.proto.internals.v1;

import "google/protobuf/timestamp.proto";

option go_package = "github.com/dapr/dapr/pkg/proto/internals/v1;internals";

// Reminder represents a reminder that is stored in the Dapr actor state store.
message Reminder {
string actor_id = 1;
string actor_type = 2;
string name = 3;
bytes data = 4;
string period = 5;
google.protobuf.Timestamp registered_time = 6;
string due_time = 7;
google.protobuf.Timestamp expiration_time = 8;
}

// Reminders is a collection of reminders.
message Reminders {
repeated Reminder reminders = 1;
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ require (
k8s.io/klog v1.0.0
k8s.io/metrics v0.26.3
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
modernc.org/sqlite v1.27.0
sigs.k8s.io/controller-runtime v0.14.6
sigs.k8s.io/yaml v1.4.0
)
Expand Down Expand Up @@ -420,7 +421,6 @@ require (
modernc.org/mathutil v1.6.0 // indirect
modernc.org/memory v1.7.2 // indirect
modernc.org/opt v0.1.3 // indirect
modernc.org/sqlite v1.27.0 // indirect
modernc.org/strutil v1.2.0 // indirect
modernc.org/token v1.1.0 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
Expand Down
3 changes: 2 additions & 1 deletion pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,10 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) ActorRuntime {
}

// Init reminders
a.actorsReminders = reminders.NewRemindersProvider(a.clock, internal.RemindersProviderOpts{
a.actorsReminders = reminders.NewRemindersProvider(a.clock, reminders.NewRemindersProviderOpts{
StoreName: a.storeName,
Config: a.actorsConfig.Config,
APILevel: &a.apiLevel,
})
a.actorsReminders.SetExecuteReminderFn(a.executeReminder)
a.actorsReminders.SetResiliencyProvider(a.resiliency)
Expand Down
22 changes: 21 additions & 1 deletion pkg/actors/internal/api_level.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,24 @@ package internal
// ActorAPILevel is the level of the Actor APIs supported by this runtime.
// It is sent to the Placement service and disseminated to all other Dapr runtimes.
// The Dapr runtime can use this value, as well as the minimum API level observed in the cluster (as disseminated by Placement) to make decisions on feature availability across the cluster.
const ActorAPILevel = 10
//
// API levels per Dapr version:
// - 1.11.x and older = unset (equivalent to 0)
// - 1.12.x = 10
// - 1.13.x = 20
const ActorAPILevel = 20

// Features that can be enabled depending on the API level
type apiLevelFeature uint32

const (
// Enables serializing reminders as protobuf rather than JSON in the pkg/actors/reminders package
// When serialized as protobuf, reminders have the "\0pb" prefix
// Note this only control serializations; when un-serializing, legacy JSON is always supported as fallback
APILevelFeatureRemindersProtobuf apiLevelFeature = 20
)

// IsEnabled returns true if the feature is enabled for the current API level.
func (a apiLevelFeature) IsEnabled(currentAPILevel uint32) bool {
return currentAPILevel >= uint32(a)
}
6 changes: 0 additions & 6 deletions pkg/actors/internal/reminders.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ type LookupActorFn func(ctx context.Context, actorType string, actorID string) (
// StateStoreProviderFn is the type of a function that returns the state store provider.
type StateStoreProviderFn func() (TransactionalStateStore, error)

// RemindersProviderOpts contains the options for the reminders provider.
type RemindersProviderOpts struct {
StoreName string
Config Config
}

// RemindersProvider is the interface for the object that provides reminders services.
//
//nolint:interfacebloat
Expand Down
163 changes: 149 additions & 14 deletions pkg/actors/reminders/reminders.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,26 @@ limitations under the License.
package reminders

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/utils/clock"

"github.com/dapr/components-contrib/state"
"github.com/dapr/dapr/pkg/actors/internal"
diag "github.com/dapr/dapr/pkg/diagnostics"
internalv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/retry"
Expand All @@ -46,6 +51,7 @@ type remindersMetricsCollectorFn = func(actorType string, reminders int64)
// Implements a reminders provider.
type reminders struct {
clock clock.WithTicker
apiLevel *atomic.Uint32
runningCh chan struct{}
executeReminderFn internal.ExecuteReminderFn
remindersLock sync.RWMutex
Expand All @@ -61,10 +67,18 @@ type reminders struct {
metricsCollector remindersMetricsCollectorFn
}

// NewRemindersProviderOpts contains the options for the NewRemindersProvider function.
type NewRemindersProviderOpts struct {
StoreName string
Config internal.Config
APILevel *atomic.Uint32
}

// NewRemindersProvider returns a reminders provider.
func NewRemindersProvider(clock clock.WithTicker, opts internal.RemindersProviderOpts) internal.RemindersProvider {
func NewRemindersProvider(clock clock.WithTicker, opts NewRemindersProviderOpts) internal.RemindersProvider {
return &reminders{
clock: clock,
apiLevel: opts.APILevel,
runningCh: make(chan struct{}),
reminders: map[string][]ActorReminderReference{},
activeReminders: &sync.Map{},
Expand Down Expand Up @@ -442,8 +456,12 @@ func (r *reminders) doDeleteReminder(ctx context.Context, actorType, actorID, na
stateMetadata := map[string]string{
metadataPartitionKey: databasePartitionKey,
}
partitionOp, rErr := r.saveRemindersInPartitionRequest(stateKey, remindersInPartition, etag, stateMetadata)
if rErr != nil {
return false, fmt.Errorf("failed to create request for storing reminders: %w", rErr)
}
stateOperations := []state.TransactionalStateOperation{
r.saveRemindersInPartitionRequest(stateKey, remindersInPartition, etag, stateMetadata),
partitionOp,
r.saveActorTypeMetadataRequest(actorType, actorMetadata, stateMetadata),
}
rErr = r.executeStateStoreTransaction(ctx, store, stateOperations, stateMetadata)
Expand Down Expand Up @@ -528,8 +546,12 @@ func (r *reminders) storeReminder(ctx context.Context, store internal.Transactio
stateMetadata := map[string]string{
metadataPartitionKey: databasePartitionKey,
}
partitionOp, err := r.saveRemindersInPartitionRequest(stateKey, remindersInPartition, etag, stateMetadata)
if err != nil {
return struct{}{}, fmt.Errorf("failed to create request for storing reminders: %w", err)
}
stateOperations := []state.TransactionalStateOperation{
r.saveRemindersInPartitionRequest(stateKey, remindersInPartition, etag, stateMetadata),
partitionOp,
r.saveActorTypeMetadataRequest(reminder.ActorType, actorMetadata, stateMetadata),
}
rErr = r.executeStateStoreTransaction(ctx, store, stateOperations, stateMetadata)
Expand Down Expand Up @@ -571,16 +593,125 @@ func (r *reminders) executeStateStoreTransaction(ctx context.Context, store inte
return err
}

func (r *reminders) saveRemindersInPartitionRequest(stateKey string, reminders []internal.Reminder, etag *string, metadata map[string]string) state.SetRequest {
return state.SetRequest{
func (r *reminders) serializeRemindersToProto(reminders []internal.Reminder) ([]byte, error) {
pb := &internalv1pb.Reminders{
Reminders: make([]*internalv1pb.Reminder, len(reminders)),
}
for i, rm := range reminders {
pb.Reminders[i] = &internalv1pb.Reminder{
ActorId: rm.ActorID,
ActorType: rm.ActorType,
Name: rm.Name,
Period: rm.Period.String(),
DueTime: rm.DueTime,
}
if !rm.RegisteredTime.IsZero() {
pb.Reminders[i].RegisteredTime = timestamppb.New(rm.RegisteredTime)
}
if !rm.ExpirationTime.IsZero() {
pb.Reminders[i].ExpirationTime = timestamppb.New(rm.ExpirationTime)
}
if len(rm.Data) > 0 && !bytes.Equal(rm.Data, []byte("null")) {
pb.Reminders[i].Data = rm.Data
}
}
res, err := proto.Marshal(pb)
if err != nil {
return nil, fmt.Errorf("failed to serialize reminders as protobuf: %w", err)
}

// Prepend the prefix "\0pb" to indicate this is protobuf
res = append([]byte{0, 'p', 'b'}, res...)

return res, nil
}

func (r *reminders) unserialize(data []byte) ([]internal.Reminder, error) {
// Check if we have the protobuf prefix
if bytes.HasPrefix(data, []byte{0, 'p', 'b'}) {
return r.unserializeRemindersFromProto(data[3:])
}

// Fallback to unserializing from JSON
var batch []internal.Reminder
err := json.Unmarshal(data, &batch)
return batch, err
}

//nolint:protogetter
func (r *reminders) unserializeRemindersFromProto(data []byte) ([]internal.Reminder, error) {
pb := internalv1pb.Reminders{}
err := proto.Unmarshal(data, &pb)
if err != nil {
return nil, fmt.Errorf("failed to unserialize reminders from protobuf: %w", err)
}

res := make([]internal.Reminder, len(pb.GetReminders()))
for i, rm := range pb.GetReminders() {
if rm == nil {
return nil, errors.New("unserialized reminder object is nil")
}
res[i] = internal.Reminder{
ActorID: rm.ActorId,
ActorType: rm.ActorType,
Name: rm.Name,
DueTime: rm.DueTime,
Period: internal.NewEmptyReminderPeriod(),
}

if len(rm.Data) > 0 {
res[i].Data = rm.Data
}
if rm.Period != "" {
err = res[i].Period.UnmarshalJSON([]byte(rm.Period))
if err != nil {
return nil, fmt.Errorf("failed to unserialize reminder period: %w", err)
}
}

expirationTimePb := rm.GetExpirationTime()
if expirationTimePb != nil && expirationTimePb.IsValid() {
expirationTime := expirationTimePb.AsTime()
if !expirationTime.IsZero() {
res[i].ExpirationTime = expirationTime
}
}

registeredTimePb := rm.GetRegisteredTime()
if registeredTimePb != nil && registeredTimePb.IsValid() {
registeredTime := registeredTimePb.AsTime()
if !registeredTime.IsZero() {
res[i].RegisteredTime = registeredTime
}
}
}

return res, nil
}

func (r *reminders) saveRemindersInPartitionRequest(stateKey string, reminders []internal.Reminder, etag *string, metadata map[string]string) (state.SetRequest, error) {
req := state.SetRequest{
Key: stateKey,
Value: reminders,
ETag: etag,
Metadata: metadata,
Options: state.SetStateOption{
Concurrency: state.FirstWrite,
},
}

// If APILevelFeatureRemindersProtobuf is enabled, then serialize as protobuf which is more efficient
// Otherwise, fall back to sending the data as-is in the request (which will serialize it as JSON)
if internal.APILevelFeatureRemindersProtobuf.IsEnabled(r.apiLevel.Load()) {
var err error
req.Value, err = r.serializeRemindersToProto(reminders)
if err != nil {
return req, err
}
} else {
req.Value = reminders
}

return req, nil
}

func (r *reminders) saveActorTypeMetadataRequest(actorType string, actorMetadata *ActorMetadata, stateMetadata map[string]string) state.SetRequest {
Expand Down Expand Up @@ -651,16 +782,17 @@ func (r *reminders) getRemindersForActorType(ctx context.Context, actorType stri
return nil, nil, fmt.Errorf("could not get reminders partition %v: %v", resp.Key, resp.Error)
}

// Data can be empty if there's no reminder, when serialized as protobuf
var batch []internal.Reminder
if len(resp.Data) > 0 {
err = json.Unmarshal(resp.Data, &batch)
if err != nil {
return nil, nil, fmt.Errorf("could not parse actor reminders partition %v: %w", resp.Key, err)
}
} else {
if len(resp.Data) == 0 {
return nil, nil, fmt.Errorf("no data found for reminder partition %v: %w", resp.Key, err)
}

batch, err = r.unserialize(resp.Data)
if err != nil {
return nil, nil, fmt.Errorf("could not parse actor reminders partition %v: %w", resp.Key, err)
}

// We can't pre-allocate "list" with the needed capacity because we don't know how many items are in each partition
// However, we can limit the number of times we call "append" on list in a way that could cause the slice to be re-allocated, by managing a separate list here with a fixed capacity and modify "list" just once at per iteration on "bulkResponse".
batchList := make([]ActorReminderReference, len(batch))
Expand Down Expand Up @@ -698,7 +830,7 @@ func (r *reminders) getRemindersForActorType(ctx context.Context, actorType stri

var reminders []internal.Reminder
if len(resp.Data) > 0 {
err = json.Unmarshal(resp.Data, &reminders)
reminders, err = r.unserialize(resp.Data)
if err != nil {
return nil, nil, fmt.Errorf("could not parse actor reminders: %w", err)
}
Expand Down Expand Up @@ -831,7 +963,10 @@ func (r *reminders) migrateRemindersForActorType(ctx context.Context, store inte
}
for i := 0; i < actorMetadata.RemindersMetadata.PartitionCount; i++ {
stateKey := actorMetadata.calculateRemindersStateKey(actorType, uint32(i+1))
stateOperations[i] = r.saveRemindersInPartitionRequest(stateKey, actorRemindersPartitions[i], nil, stateMetadata)
stateOperations[i], err = r.saveRemindersInPartitionRequest(stateKey, actorRemindersPartitions[i], nil, stateMetadata)
if err != nil {
return fmt.Errorf("failed to create request for reminders in partition %d: %w", i, err)
}
}

// Also create a request to save the new metadata, so the new "metadataID" becomes the new de facto referenced list for reminders
Expand Down
20 changes: 12 additions & 8 deletions pkg/actors/reminders/reminders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ func newTestReminders() *reminders {
PlacementAddresses: []string{"placement:5050"},
HostedActorTypes: internal.NewHostedActors([]string{"cat"}),
}
opts := internal.RemindersProviderOpts{
clock := clocktesting.NewFakeClock(startOfTime)
apiLevel := &atomic.Uint32{}
apiLevel.Store(internal.ActorAPILevel)
r := NewRemindersProvider(clock, NewRemindersProviderOpts{
StoreName: "testStore",
Config: conf,
}
clock := clocktesting.NewFakeClock(startOfTime)
r := NewRemindersProvider(clock, opts)
APILevel: apiLevel,
})
store := daprt.NewFakeStateStore()
r.SetStateStoreProviderFn(func() (internal.TransactionalStateStore, error) {
return store, nil
Expand Down Expand Up @@ -497,12 +499,14 @@ func newTestRemindersWithMockAndActorMetadataPartition() *reminders {
}
}
}
opts := internal.RemindersProviderOpts{
clock := clocktesting.NewFakeClock(startOfTime)
apiLevel := &atomic.Uint32{}
apiLevel.Store(internal.ActorAPILevel)
r := NewRemindersProvider(clock, NewRemindersProviderOpts{
StoreName: "testStore",
Config: conf,
}
clock := clocktesting.NewFakeClock(startOfTime)
r := NewRemindersProvider(clock, opts)
APILevel: apiLevel,
})
return r.(*reminders)
}

Expand Down

0 comments on commit 72a439c

Please sign in to comment.