Skip to content

Commit

Permalink
Merge pull request #313 from cschleiden/start-cache-eviction
Browse files Browse the repository at this point in the history
Start cache eviction
  • Loading branch information
cschleiden committed Jan 25, 2024
2 parents 2e4b887 + 5b0ce1e commit 97a13f2
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 0 deletions.
4 changes: 4 additions & 0 deletions internal/worker/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (atw *ActivityTaskWorker) Complete(ctx context.Context, event *history.Even
return nil
}

func (atw *ActivityTaskWorker) Start(ctx context.Context) error {
return nil
}

func (atw *ActivityTaskWorker) Execute(ctx context.Context, task *backend.ActivityTask) (*history.Event, error) {
a := task.Event.Attributes.(*history.ActivityScheduledAttributes)
ametrics := atw.backend.Metrics().WithTags(metrics.Tags{metrickeys.ActivityName: a.Name})
Expand Down
5 changes: 5 additions & 0 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

type TaskWorker[Task, Result any] interface {
Start(context.Context) error
Get(context.Context) (*Task, error)
Extend(context.Context, *Task) error
Execute(context.Context, *Task) (*Result, error)
Expand Down Expand Up @@ -55,6 +56,10 @@ func NewWorker[Task, TaskResult any](
}

func (w *Worker[Task, TaskResult]) Start(ctx context.Context) error {
if err := w.tw.Start(ctx); err != nil {
return fmt.Errorf("starting task worker: %w", err)
}

w.pollersWg.Add(w.options.Pollers)

for i := 0; i < w.options.Pollers; i++ {
Expand Down
8 changes: 8 additions & 0 deletions internal/worker/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ type WorkflowTaskWorker struct {
logger *slog.Logger
}

func (wtw *WorkflowTaskWorker) Start(ctx context.Context) error {
if wtw.cache != nil {
go wtw.cache.StartEviction(ctx)
}

return nil
}

// Complete implements TaskWorker.
func (wtw *WorkflowTaskWorker) Complete(ctx context.Context, result *workflow.ExecutionResult, t *backend.WorkflowTask) error {
state := result.State
Expand Down

0 comments on commit 97a13f2

Please sign in to comment.