Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use generics in executors #4108

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 17 additions & 18 deletions core/executors/bulkexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ type (
// A BulkExecutor is an executor that can execute tasks on either requirement meets:
// 1. up to given size of tasks
// 2. flush interval time elapsed
BulkExecutor struct {
executor *PeriodicalExecutor
container *bulkContainer
BulkExecutor[T any] struct {
executor *PeriodicalExecutor[T]
container *bulkContainer[T]
}

bulkOptions struct {
Expand All @@ -23,37 +23,37 @@ type (
)

// NewBulkExecutor returns a BulkExecutor.
func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
func NewBulkExecutor[T any](execute Execute[T], opts ...BulkOption) *BulkExecutor[T] {
options := newBulkOptions()
for _, opt := range opts {
opt(&options)
}

container := &bulkContainer{
container := &bulkContainer[T]{
execute: execute,
maxTasks: options.cachedTasks,
}
executor := &BulkExecutor{
executor: NewPeriodicalExecutor(options.flushInterval, container),
executor := &BulkExecutor[T]{
executor: NewPeriodicalExecutor[T](options.flushInterval, container),
container: container,
}

return executor
}

// Add adds task into be.
func (be *BulkExecutor) Add(task any) error {
func (be *BulkExecutor[T]) Add(task T) error {
be.executor.Add(task)
return nil
}

// Flush forces be to flush and execute tasks.
func (be *BulkExecutor) Flush() {
func (be *BulkExecutor[T]) Flush() {
be.executor.Flush()
}

// Wait waits be to done with the task execution.
func (be *BulkExecutor) Wait() {
func (be *BulkExecutor[T]) Wait() {
be.executor.Wait()
}

Expand All @@ -78,23 +78,22 @@ func newBulkOptions() bulkOptions {
}
}

type bulkContainer struct {
tasks []any
execute Execute
type bulkContainer[T any] struct {
tasks []T
execute Execute[T]
maxTasks int
}

func (bc *bulkContainer) AddTask(task any) bool {
func (bc *bulkContainer[T]) AddTask(task T) bool {
bc.tasks = append(bc.tasks, task)
return len(bc.tasks) >= bc.maxTasks
}

func (bc *bulkContainer) Execute(tasks any) {
vals := tasks.([]any)
bc.execute(vals)
func (bc *bulkContainer[T]) Execute(tasks []T) {
bc.execute(tasks)
}

func (bc *bulkContainer) RemoveAll() any {
func (bc *bulkContainer[T]) RemoveAll() []T {
tasks := bc.tasks
bc.tasks = nil
return tasks
Expand Down
48 changes: 25 additions & 23 deletions core/executors/chunkexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ type (
// A ChunkExecutor is an executor to execute tasks when either requirement meets:
// 1. up to given chunk size
// 2. flush interval elapsed
ChunkExecutor struct {
executor *PeriodicalExecutor
container *chunkContainer
ChunkExecutor[T any] struct {
executor *PeriodicalExecutor[chunk[T]]
container *chunkContainer[T]
}

chunkOptions struct {
Expand All @@ -23,40 +23,40 @@ type (
)

// NewChunkExecutor returns a ChunkExecutor.
func NewChunkExecutor(execute Execute, opts ...ChunkOption) *ChunkExecutor {
func NewChunkExecutor[T any](execute Execute[T], opts ...ChunkOption) *ChunkExecutor[T] {
options := newChunkOptions()
for _, opt := range opts {
opt(&options)
}

container := &chunkContainer{
container := &chunkContainer[T]{
execute: execute,
maxChunkSize: options.chunkSize,
}
executor := &ChunkExecutor{
executor: NewPeriodicalExecutor(options.flushInterval, container),
executor := &ChunkExecutor[T]{
executor: NewPeriodicalExecutor[chunk[T]](options.flushInterval, container),
container: container,
}

return executor
}

// Add adds task with given chunk size into ce.
func (ce *ChunkExecutor) Add(task any, size int) error {
ce.executor.Add(chunk{
func (ce *ChunkExecutor[T]) Add(task T, size int) error {
ce.executor.Add(chunk[T]{
val: task,
size: size,
})
return nil
}

// Flush forces ce to flush and execute tasks.
func (ce *ChunkExecutor) Flush() {
func (ce *ChunkExecutor[T]) Flush() {
ce.executor.Flush()
}

// Wait waits the execution to be done.
func (ce *ChunkExecutor) Wait() {
func (ce *ChunkExecutor[T]) Wait() {
ce.executor.Wait()
}

Expand All @@ -81,33 +81,35 @@ func newChunkOptions() chunkOptions {
}
}

type chunkContainer struct {
tasks []any
execute Execute
type chunkContainer[T any] struct {
tasks []chunk[T]
execute Execute[T]
size int
maxChunkSize int
}

func (bc *chunkContainer) AddTask(task any) bool {
ck := task.(chunk)
bc.tasks = append(bc.tasks, ck.val)
bc.size += ck.size
func (bc *chunkContainer[T]) AddTask(task chunk[T]) bool {
bc.tasks = append(bc.tasks, task)
bc.size += task.size
return bc.size >= bc.maxChunkSize
}

func (bc *chunkContainer) Execute(tasks any) {
vals := tasks.([]any)
func (bc *chunkContainer[T]) Execute(tasks []chunk[T]) {
vals := make([]T, 0, len(tasks))
for _, elem := range tasks {
vals = append(vals, elem.val)
}
bc.execute(vals)
}

func (bc *chunkContainer) RemoveAll() any {
func (bc *chunkContainer[T]) RemoveAll() []chunk[T] {
tasks := bc.tasks
bc.tasks = nil
bc.size = 0
return tasks
}

type chunk struct {
val any
type chunk[T any] struct {
val T
size int
}
58 changes: 23 additions & 35 deletions core/executors/periodicalexecutor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package executors

import (
"reflect"
"sync"
"sync/atomic"
"time"
Expand All @@ -18,21 +17,21 @@ const idleRound = 10
type (
// TaskContainer interface defines a type that can be used as the underlying
// container that used to do periodical executions.
TaskContainer interface {
TaskContainer[T any] interface {
// AddTask adds the task into the container.
// Returns true if the container needs to be flushed after the addition.
AddTask(task any) bool
AddTask(task T) bool
// Execute handles the collected tasks by the container when flushing.
Execute(tasks any)
Execute(tasks []T)
// RemoveAll removes the contained tasks, and return them.
RemoveAll() any
RemoveAll() []T
}

// A PeriodicalExecutor is an executor that periodically execute tasks.
PeriodicalExecutor struct {
commander chan any
PeriodicalExecutor[T any] struct {
commander chan []T
interval time.Duration
container TaskContainer
container TaskContainer[T]
waitGroup sync.WaitGroup
// avoid race condition on waitGroup when calling wg.Add/Done/Wait(...)
wgBarrier syncx.Barrier
Expand All @@ -45,10 +44,10 @@ type (
)

// NewPeriodicalExecutor returns a PeriodicalExecutor with given interval and container.
func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
executor := &PeriodicalExecutor{
func NewPeriodicalExecutor[T any](interval time.Duration, container TaskContainer[T]) *PeriodicalExecutor[T] {
executor := &PeriodicalExecutor[T]{
// buffer 1 to let the caller go quickly
commander: make(chan any, 1),
commander: make(chan []T, 1),
interval: interval,
container: container,
confirmChan: make(chan lang.PlaceholderType),
Expand All @@ -64,39 +63,39 @@ func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *Per
}

// Add adds tasks into pe.
func (pe *PeriodicalExecutor) Add(task any) {
func (pe *PeriodicalExecutor[T]) Add(task T) {
if vals, ok := pe.addAndCheck(task); ok {
pe.commander <- vals
<-pe.confirmChan
}
}

// Flush forces pe to execute tasks.
func (pe *PeriodicalExecutor) Flush() bool {
func (pe *PeriodicalExecutor[T]) Flush() bool {
pe.enterExecution()
return pe.executeTasks(func() any {
return pe.executeTasks(func() []T {
pe.lock.Lock()
defer pe.lock.Unlock()
return pe.container.RemoveAll()
}())
}

// Sync lets caller run fn thread-safe with pe, especially for the underlying container.
func (pe *PeriodicalExecutor) Sync(fn func()) {
func (pe *PeriodicalExecutor[T]) Sync(fn func()) {
pe.lock.Lock()
defer pe.lock.Unlock()
fn()
}

// Wait waits the execution to be done.
func (pe *PeriodicalExecutor) Wait() {
func (pe *PeriodicalExecutor[T]) Wait() {
pe.Flush()
pe.wgBarrier.Guard(func() {
pe.waitGroup.Wait()
})
}

func (pe *PeriodicalExecutor) addAndCheck(task any) (any, bool) {
func (pe *PeriodicalExecutor[T]) addAndCheck(task T) ([]T, bool) {
pe.lock.Lock()
defer func() {
if !pe.guarded {
Expand All @@ -115,7 +114,7 @@ func (pe *PeriodicalExecutor) addAndCheck(task any) (any, bool) {
return nil, false
}

func (pe *PeriodicalExecutor) backgroundFlush() {
func (pe *PeriodicalExecutor[T]) backgroundFlush() {
go func() {
// flush before quit goroutine to avoid missing tasks
defer pe.Flush()
Expand Down Expand Up @@ -147,17 +146,17 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
}()
}

func (pe *PeriodicalExecutor) doneExecution() {
func (pe *PeriodicalExecutor[T]) doneExecution() {
pe.waitGroup.Done()
}

func (pe *PeriodicalExecutor) enterExecution() {
func (pe *PeriodicalExecutor[T]) enterExecution() {
pe.wgBarrier.Guard(func() {
pe.waitGroup.Add(1)
})
}

func (pe *PeriodicalExecutor) executeTasks(tasks any) bool {
func (pe *PeriodicalExecutor[T]) executeTasks(tasks []T) bool {
defer pe.doneExecution()

ok := pe.hasTasks(tasks)
Expand All @@ -170,22 +169,11 @@ func (pe *PeriodicalExecutor) executeTasks(tasks any) bool {
return ok
}

func (pe *PeriodicalExecutor) hasTasks(tasks any) bool {
if tasks == nil {
return false
}

val := reflect.ValueOf(tasks)
switch val.Kind() {
case reflect.Array, reflect.Chan, reflect.Map, reflect.Slice:
return val.Len() > 0
default:
// unknown type, let caller execute it
return true
}
func (pe *PeriodicalExecutor[T]) hasTasks(tasks []T) bool {
return len(tasks) > 0
}

func (pe *PeriodicalExecutor) shallQuit(last time.Duration) (stop bool) {
func (pe *PeriodicalExecutor[T]) shallQuit(last time.Duration) (stop bool) {
if timex.Since(last) <= pe.interval*idleRound {
return
}
Expand Down