Skip to content

Commit

Permalink
Merge pull request #8736 from evanchaoli/optimize-db-notify
Browse files Browse the repository at this point in the history
Optimize db notify.
  • Loading branch information
Spimtav committed Jul 18, 2023
2 parents 90f8dab + 76af034 commit 3793327
Show file tree
Hide file tree
Showing 12 changed files with 609 additions and 35 deletions.
19 changes: 19 additions & 0 deletions atc/atccmd/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ type RunCommand struct {
DefaultTaskTimeout time.Duration `long:"default-task-timeout" description:"Default timeout of task steps"`

NumGoroutineThreshold int `long:"num-goroutine-threshold" description:"When number of goroutines reaches to this threshold, then slow down current ATC. This helps distribute workloads across ATCs evenly."`

DBNotificationBusQueueSize int `long:"db-notification-bus-queue-size" default:"10000" description:"DB notification bus queue size, default is 10000. If UI often misses loading running build logs, then consider to increase the queue size."`
}

type Migration struct {
Expand Down Expand Up @@ -553,6 +555,11 @@ func (cmd *RunCommand) Runner(positionalArguments []string) (ifrit.Runner, error
atc.LoadBaseResourceTypeDefaults(defaults)
}

err = db.SetNotificationBusQueueSize(cmd.DBNotificationBusQueueSize)
if err != nil {
return nil, err
}

//FIXME: These only need to run once for the entire binary. At the moment,
//they rely on state of the command.
db.SetupConnectionRetryingDriver(
Expand Down Expand Up @@ -1078,6 +1085,11 @@ func (cmd *RunCommand) backendComponents(
policyChecker,
)

buildEventWatcher, err := db.NewBuildBeingWatchedMarker(logger, dbConn, db.DefaultBuildBeingWatchedMarkDuration, clock.NewClock())
if err != nil {
return nil, err
}

// In case that a user configures resource-checking-interval, but forgets to
// configure resource-with-webhook-checking-interval, keep both checking-
// intervals consistent. Even if both intervals are configured, there is no
Expand Down Expand Up @@ -1155,6 +1167,13 @@ func (cmd *RunCommand) backendComponents(
syslogDrainConfigured,
),
},
{
Component: atc.Component{
Name: atc.ComponentBeingWatchedBuildMarker,
Interval: 10 * time.Minute,
},
Runnable: buildEventWatcher,
},
}

if syslogDrainConfigured {
Expand Down
1 change: 1 addition & 0 deletions atc/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
ComponentCollectorWorkers = "collector_workers"
ComponentCollectorPipelines = "collector_pipelines"
ComponentPipelinePauser = "pipeline_pauser"
ComponentBeingWatchedBuildMarker = "being_watched_build_marker"
)

type Component struct {
Expand Down
16 changes: 5 additions & 11 deletions atc/db/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,22 +1142,14 @@ func (b *build) Preparation() (BuildPreparation, bool, error) {
}

func (b *build) Events(from uint) (EventSource, error) {
notifier, err := newConditionNotifier(b.conn.Bus(), buildEventsChannel(b.id), func() (bool, error) {
return true, nil
})
if err != nil {
return nil, err
}

return newBuildEventSource(
b.id,
b.eventsTable(),
b.conn,
notifier,
from,
func(tx Tx, buildID int) (bool, error) {
completed := false
err = psql.Select("completed").
err := psql.Select("completed").
From("builds").
Where(sq.Eq{"id": buildID}).
RunWith(tx).
Expand All @@ -1168,7 +1160,7 @@ func (b *build) Events(from uint) (EventSource, error) {
}
return completed, nil
},
), nil
)
}

func (b *build) SaveEvent(event atc.Event) error {
Expand Down Expand Up @@ -2102,8 +2094,10 @@ func buildStartedChannel() string {
return atc.ComponentBuildTracker
}

const buildEventChannelPrefix = "build_events_"

func buildEventsChannel(buildID int) string {
return fmt.Sprintf("build_events_%d", buildID)
return fmt.Sprintf("%s%d", buildEventChannelPrefix, buildID)
}

func buildAbortChannel(buildID int) string {
Expand Down
224 changes: 224 additions & 0 deletions atc/db/build_being_watched_marker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package db

import (
"code.cloudfoundry.org/clock"
"code.cloudfoundry.org/lager/v3"
"code.cloudfoundry.org/lager/v3/lagerctx"
"context"
"errors"
"fmt"
sq "github.com/Masterminds/squirrel"
"strconv"
"strings"
"sync"
"time"
)

// beingWatchedBuildEventChannelMap stores build event notifier channel names
// for those builds that are being watched. A build is considered watched once
// build.Events() has been called for it (which is what happens when the build
// is opened on the UI). As build event notifications should only be sent from
// running builds, this map should only hold running builds' event channel
// names.
//
// All access to the internal map goes through the embedded RWMutex, so the
// type is safe for concurrent use. It must not be copied after first use.
type beingWatchedBuildEventChannelMap struct {
	sync.RWMutex
	// internal maps a build event channel name to the time it was last marked.
	internal map[string]time.Time
}

var (
	// beingWatchedBuildEventMap is the process-wide instance handed out by
	// NewBeingWatchedBuildEventChannelMap.
	beingWatchedBuildEventMap *beingWatchedBuildEventChannelMap
	// once guards the lazy creation of beingWatchedBuildEventMap.
	once sync.Once
)

// NewBeingWatchedBuildEventChannelMap returns the singleton instance of
// beingWatchedBuildEventChannelMap, creating it on first call.
func NewBeingWatchedBuildEventChannelMap() *beingWatchedBuildEventChannelMap {
	once.Do(func() {
		beingWatchedBuildEventMap = &beingWatchedBuildEventChannelMap{
			internal: make(map[string]time.Time),
		}
	})
	return beingWatchedBuildEventMap
}

// load returns the time recorded for key and whether key is present.
func (m *beingWatchedBuildEventChannelMap) load(key string) (value time.Time, ok bool) {
	m.RLock()
	defer m.RUnlock()
	value, ok = m.internal[key]
	return value, ok
}

// delete removes key unconditionally. Deleting a missing key is a no-op.
func (m *beingWatchedBuildEventChannelMap) delete(key string) {
	m.Lock()
	defer m.Unlock()
	delete(m.internal, key)
}

// deleteIfUnchanged removes key only when its stored time still equals seen.
// It lets Clean drop an entry based on a snapshot without discarding a mark
// that was refreshed after the snapshot was taken.
func (m *beingWatchedBuildEventChannelMap) deleteIfUnchanged(key string, seen time.Time) {
	m.Lock()
	defer m.Unlock()
	if current, ok := m.internal[key]; ok && current.Equal(seen) {
		delete(m.internal, key)
	}
}

// store records value for key, overwriting any previous value.
func (m *beingWatchedBuildEventChannelMap) store(key string, value time.Time) {
	m.Lock()
	defer m.Unlock()
	m.internal[key] = value
}

// clone returns a copy of the internal map taken under the read lock. The
// copy is owned by the caller and may be modified freely.
func (m *beingWatchedBuildEventChannelMap) clone() map[string]time.Time {
	m.RLock()
	defer m.RUnlock()
	c := make(map[string]time.Time, len(m.internal))
	for k, v := range m.internal {
		c[k] = v
	}
	return c
}

// Mark records that buildEventChannel was being watched at time t.
func (m *beingWatchedBuildEventChannelMap) Mark(buildEventChannel string, t time.Time) {
	m.store(buildEventChannel, t)
}

// BeingWatched returns true if given buildEventChannel is being watched.
func (m *beingWatchedBuildEventChannelMap) BeingWatched(buildEventChannel string) bool {
	// Consult the receiver rather than the package-level singleton, so the
	// method answers for the map it is called on.
	_, ok := m.load(buildEventChannel)
	return ok
}

// Clean deletes the entries for which conditionFunc returns true. To avoid
// holding the lock while conditionFunc runs, the decision is made on a cloned
// snapshot; an entry is then removed only if it has not been re-marked since
// the snapshot was taken.
func (m *beingWatchedBuildEventChannelMap) Clean(conditionFunc func(k string, v time.Time) bool) {
	for k, seen := range m.clone() {
		if conditionFunc(k, seen) {
			m.deleteIfUnchanged(k, seen)
		}
	}
}

const beingWatchedNotifyChannelName = "being_watched_build_event_channel"

// MarkBuildAsBeingWatched announces that a build is being watched by issuing
// a NOTIFY on channel beingWatchedNotifyChannelName whose payload is the
// build's event channel name. A notification is used because a build may be
// watched from any ATC, while the build itself may be running in a separate
// ATC. Any error returned by Exec is passed back to the caller unchanged.
func MarkBuildAsBeingWatched(db Conn, buildEventChannel string) error {
	notifyStmt := fmt.Sprintf("NOTIFY %s, '%s'", beingWatchedNotifyChannelName, buildEventChannel)
	_, err := db.Exec(notifyStmt)
	return err
}

// BuildBeingWatchedMarker listens to channel beingWatchedNotifyChannelName and
// marks builds as BeingWatched accordingly in a singleton map. Its Run method
// cleans stale entries out of that map, and Drain stops the listener.
type BuildBeingWatchedMarker struct {
// conn provides the notification bus to listen on and is used to query
// build status during cleanup.
conn Conn
// dataRetainDuration is how long an entry stays in the map after being
// marked before Run considers it for removal.
dataRetainDuration time.Duration
// watchedMap is the map obtained from NewBeingWatchedBuildEventChannelMap.
watchedMap *beingWatchedBuildEventChannelMap
// notifier is the channel returned by the bus for
// beingWatchedNotifyChannelName.
notifier chan Notification
// clock supplies the current time for marking and for cleanup.
clock clock.Clock
// wg tracks the listener goroutine so that Drain can wait for it to exit.
wg *sync.WaitGroup
// stop is closed by Drain to tell the listener goroutine to return.
stop chan struct{}
}

// DefaultBuildBeingWatchedMarkDuration is the default retain duration for
// entries in the being-watched map.
const DefaultBuildBeingWatchedMarkDuration = 2 * time.Hour

// NewBuildBeingWatchedMarker creates a BuildBeingWatchedMarker and starts a
// goroutine that listens on beingWatchedNotifyChannelName and marks every
// received build event channel as being watched.
//
// dataRetainDuration is how long a mark is kept before Run may remove it. A
// negative value is rejected; note that zero is accepted, in which case every
// entry is eligible for cleanup on the next Run.
//
// An error is returned when dataRetainDuration is negative or when listening
// on the notification bus fails. On success the caller must eventually call
// Drain to stop the goroutine.
func NewBuildBeingWatchedMarker(logger lager.Logger, conn Conn, dataRetainDuration time.Duration, clock clock.Clock) (*BuildBeingWatchedMarker, error) {
	if dataRetainDuration < 0 {
		return nil, errors.New("data retain duration must be positive")
	}

	w := &BuildBeingWatchedMarker{
		conn:               conn,
		dataRetainDuration: dataRetainDuration,
		watchedMap:         NewBeingWatchedBuildEventChannelMap(),
		clock:              clock,
		wg:                 new(sync.WaitGroup),
		stop:               make(chan struct{}, 1),
	}

	notifier, err := w.conn.Bus().Listen(beingWatchedNotifyChannelName, 100)
	if err != nil {
		return nil, err
	}
	w.notifier = notifier

	w.wg.Add(1)
	go func(logger lager.Logger, w *BuildBeingWatchedMarker) {
		defer w.wg.Done()
		defer w.conn.Bus().Unlisten(beingWatchedNotifyChannelName, notifier)

		for {
			select {
			case notification, ok := <-w.notifier:
				if !ok {
					// A closed channel would otherwise yield zero values
					// forever and spin this loop marking the empty key.
					return
				}
				// Write through the marker's own map so that marking and
				// cleanup (Run) always operate on the same instance.
				w.watchedMap.Mark(notification.Payload, w.clock.Now())
				logger.Debug("start-to-watch-build", lager.Data{"channel": notification.Payload})
			case <-w.stop:
				return
			}
		}
	}(logger, w)

	return w, nil
}

// Run performs one cleanup pass over the being-watched map. There is no way
// to know whether a build is still watched by any client, so the strategy is
// time based: an entry is kept for at least dataRetainDuration after it was
// marked. Once that period has elapsed the build's status is queried, and the
// entry is removed if the build is completed. Entries whose build cannot be
// looked up are removed as well. Run always returns nil.
func (bt *BuildBeingWatchedMarker) Run(ctx context.Context) error {
	lg := lagerctx.FromContext(ctx)
	lg.Debug("start")
	defer lg.Debug("done")

	shouldRemove := func(channel string, markedAt time.Time) bool {
		cutoff := bt.clock.Now().Add(-bt.dataRetainDuration)
		stillRetained := markedAt.After(cutoff)
		// Only hit the database for entries that are past their retention.
		return !stillRetained && bt.isBuildCompleted(channel)
	}
	bt.watchedMap.Clean(shouldRemove)

	return nil
}

// Drain stops the listener goroutine started by NewBuildBeingWatchedMarker
// and blocks until it has exited.
func (bt *BuildBeingWatchedMarker) Drain(ctx context.Context) {
	lg := lagerctx.FromContext(ctx)
	lg.Debug("start")
	defer lg.Debug("done")

	// Closing stop wakes the listener's select; wg is released once the
	// goroutine has returned.
	close(bt.stop)
	bt.wg.Wait()
}

// isBuildCompleted reports whether the build behind the given event channel
// name should be treated as completed. The build id is taken from the channel
// name by stripping buildEventChannelPrefix, and the builds table is queried
// for its "completed" column.
//
// It returns true not only for completed builds but also when the channel
// name does not yield an integer id, or when the status cannot be read; in
// both cases the entry is considered safe to delete.
func (bt *BuildBeingWatchedMarker) isBuildCompleted(channel string) bool {
	id, convErr := strconv.Atoi(strings.TrimPrefix(channel, buildEventChannelPrefix))
	if convErr != nil {
		// Not a build event channel name we understand: let it be deleted.
		return true
	}

	var done bool
	row := psql.Select("completed").
		From("builds").
		Where(sq.Eq{"id": id}).
		RunWith(bt.conn).
		QueryRow()
	if scanErr := row.Scan(&done); scanErr != nil {
		// Status unavailable: consider the build no longer being watched.
		return true
	}
	return done
}

0 comments on commit 3793327

Please sign in to comment.