Skip to content

Commit

Permalink
Lookout v2 DB Pruner (#2084)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlocamurri committed Feb 8, 2023
1 parent dfd9165 commit 6c19460
Show file tree
Hide file tree
Showing 11 changed files with 422 additions and 4 deletions.
36 changes: 36 additions & 0 deletions cmd/lookoutv2/main.go
Expand Up @@ -9,17 +9,20 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"k8s.io/apimachinery/pkg/util/clock"

"github.com/armadaproject/armada/internal/common"
"github.com/armadaproject/armada/internal/common/database"
"github.com/armadaproject/armada/internal/lookoutv2"
"github.com/armadaproject/armada/internal/lookoutv2/configuration"
"github.com/armadaproject/armada/internal/lookoutv2/pruner"
"github.com/armadaproject/armada/internal/lookoutv2/schema"
)

// Names of the command-line flags understood by the lookoutv2 binary.
const (
// CustomConfigLocation is the "config" flag name.
// NOTE(review): presumably the flag carrying the application config file
// path(s); its registration is not visible in this hunk — confirm.
CustomConfigLocation string = "config"
// MigrateDatabase names the boolean flag that makes the binary migrate the
// database instead of running the server.
MigrateDatabase = "migrateDatabase"
// PruneDatabase names the boolean flag that makes the binary prune old jobs
// from the database instead of running the server.
PruneDatabase = "pruneDatabase"
)

func init() {
Expand All @@ -29,6 +32,7 @@ func init() {
"Fully qualified path to application configuration file (for multiple config files repeat this arg or separate paths with commas)",
)
pflag.Bool(MigrateDatabase, false, "Migrate database instead of running server")
pflag.Bool(PruneDatabase, false, "Prune database of old jobs instead of running server")
pflag.Parse()
}

Expand Down Expand Up @@ -70,6 +74,32 @@ func migrate(ctx context.Context, config configuration.LookoutV2Configuration) {
}
}

// prune deletes expired jobs from the Lookout v2 database and then returns.
//
// It panics if any value in config.PrunerConfig is not strictly positive, if
// the database connection cannot be opened, or if pruning itself fails. The
// pruning run as a whole is bounded by config.PrunerConfig.Timeout.
func prune(ctx context.Context, config configuration.LookoutV2Configuration) {
	prunerConfig := config.PrunerConfig

	// Validate before touching the database so that a misconfigured run fails
	// without ever opening a connection.
	if prunerConfig.Timeout <= 0 {
		panic("timeout must be greater than 0")
	}
	if prunerConfig.ExpireAfter <= 0 {
		panic("expireAfter must be greater than 0")
	}
	if prunerConfig.BatchSize <= 0 {
		panic("batchSize must be greater than 0")
	}
	log.Infof("expireAfter: %v, batchSize: %v, timeout: %v",
		prunerConfig.ExpireAfter, prunerConfig.BatchSize, prunerConfig.Timeout)

	db, err := database.OpenPgxConn(config.Postgres)
	if err != nil {
		panic(err)
	}
	// Release the connection on every exit path, including the panic below.
	// The parent ctx is used on purpose: the timeout context may already have
	// expired by the time the connection is closed.
	defer func() {
		if closeErr := db.Close(ctx); closeErr != nil {
			log.Warnf("error closing database connection: %v", closeErr)
		}
	}()

	ctxTimeout, cancel := context.WithTimeout(ctx, prunerConfig.Timeout)
	defer cancel()
	err = pruner.PruneDb(ctxTimeout, db, prunerConfig.ExpireAfter, prunerConfig.BatchSize, clock.RealClock{})
	if err != nil {
		panic(err)
	}
}

func main() {
common.ConfigureLogging()
common.BindCommandlineArguments()
Expand All @@ -89,6 +119,12 @@ func main() {
return
}

if viper.GetBool(PruneDatabase) {
log.Info("Pruning database")
prune(ctx, config)
return
}

err := lookoutv2.Serve(config)
if err != nil {
log.Error(err)
Expand Down
4 changes: 4 additions & 0 deletions config/lookoutv2/config.yaml
Expand Up @@ -14,3 +14,7 @@ postgres:
password: psw
dbname: postgres
sslmode: disable
prunerConfig:
expireAfter: 1008h # 42 days, 6 weeks
timeout: 1h
batchSize: 1000
58 changes: 58 additions & 0 deletions deployment/lookout-v2/templates/cronjob.yaml
@@ -0,0 +1,58 @@
{{ if .Values.dbPruningEnabled}}
# CronJob that runs the lookout-v2 image with --pruneDatabase to delete old jobs.
# batch/v1 is the stable CronJob API; batch/v1beta1 is no longer served as of
# Kubernetes 1.25.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: lookout-v2-db-pruner
  labels:
    {{- include "lookout_v2.labels.all" . | nindent 4 }}
spec:
  schedule: {{ .Values.dbPruneSchedule | default "@hourly" | quote }}
  jobTemplate:
    metadata:
      name: lookout-v2-db-pruner
      labels:
        {{- include "lookout_v2.labels.all" . | nindent 8 }}
    spec:
      template:
        metadata:
          name: lookout-v2-db-pruner
          labels:
            {{- include "lookout_v2.labels.all" . | nindent 12 }}
        spec:
          restartPolicy: Never
          serviceAccountName: {{ .Values.customServiceAccount | default (include "lookout_v2.name" .) }}
          securityContext:
            runAsUser: 1000
            runAsGroup: 2000
          containers:
            - name: lookout-v2
              imagePullPolicy: IfNotPresent
              image: {{ .Values.image.repository }}:{{ required "A value is required for .Values.image.tag" .Values.image.tag }}
              args:
                - --pruneDatabase
                - --config
                - /config/application_config.yaml
              {{- if .Values.env }}
              env:
                {{- toYaml .Values.env | nindent 16 -}}
              {{- end }}
              resources:
                {{- toYaml .Values.resources | nindent 16 }}
              volumeMounts:
                - name: user-config
                  mountPath: /config/application_config.yaml
                  subPath: {{ include "lookout_v2.config.filename" . }}
                  readOnly: true
                {{- if .Values.additionalVolumeMounts }}
                {{- toYaml .Values.additionalVolumeMounts | nindent 16 -}}
                {{- end }}
              securityContext:
                allowPrivilegeEscalation: false
          volumes:
            - name: user-config
              secret:
                secretName: {{ include "lookout_v2.config.name" . }}
            {{- if .Values.additionalVolumes }}
            {{- toYaml .Values.additionalVolumes | nindent 12 }}
            {{- end }}
{{- end }}
3 changes: 2 additions & 1 deletion deployment/lookout-v2/values.yaml
@@ -1,5 +1,5 @@
image:
repository: gresearchdev/armada-lookout
repository: gresearchdev/armada-lookout-v2-dev
tag: LATEST
resources:
limits:
Expand All @@ -11,6 +11,7 @@ resources:
additionalLabels: {}
additionalVolumeMounts: []
additionalVolumes: []
dbPruningEnabled: true
terminationGracePeriodSeconds: 5
replicas: 1
strategy:
Expand Down
2 changes: 1 addition & 1 deletion deployment/lookout/templates/cronjob.yaml
@@ -1,5 +1,5 @@
{{ if .Values.dbPruningEnabled}}
apiVersion: batch/v1beta1
apiVersion: batch/v1
kind: CronJob
metadata:
name: lookout-db-pruner
Expand Down
13 changes: 13 additions & 0 deletions internal/common/util/list.go
Expand Up @@ -45,3 +45,16 @@ func Filter[T any](list []T, predicate func(val T) bool) []T {
}
return out
}

// Concat returns a newly allocated slice holding the elements of every input
// slice, in argument order. The inputs are left untouched, and the result is
// non-nil even when there is nothing to concatenate.
func Concat[T any](slices ...[]T) []T {
	size := 0
	for idx := range slices {
		size += len(slices[idx])
	}
	// Allocate once with the exact capacity so the appends below never grow.
	joined := make([]T, 0, size)
	for idx := range slices {
		joined = append(joined, slices[idx]...)
	}
	return joined
}
14 changes: 14 additions & 0 deletions internal/common/util/list_test.go
Expand Up @@ -32,3 +32,17 @@ func TestFilter_IncludeAllFilter(t *testing.T) {
output := Filter(input, includeAll)
assert.Equal(t, expectedOutput, output)
}

// TestConcat_Empty checks that concatenating a single empty slice yields an
// empty slice.
func TestConcat_Empty(t *testing.T) {
	expected := []int{}
	actual := Concat([]int{})
	assert.Equal(t, expected, actual)
}

// TestConcat checks that several slices of different lengths are joined in
// argument order.
func TestConcat(t *testing.T) {
	first := []int{1, 2, 3}
	second := []int{4, 5, 6, 7}
	third := []int{8, 9, 10, 11, 12}
	expected := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}

	actual := Concat(first, second, third)

	assert.Equal(t, expected, actual)
}
14 changes: 13 additions & 1 deletion internal/lookoutv2/configuration/types.go
@@ -1,10 +1,22 @@
package configuration

import "github.com/armadaproject/armada/internal/armada/configuration"
import (
"time"

"github.com/armadaproject/armada/internal/armada/configuration"
)

// LookoutV2Configuration is the top-level configuration of the Lookout v2
// binary.
type LookoutV2Configuration struct {
// NOTE(review): presumably the port the API server listens on — confirm
// against the server setup, which is not visible here.
ApiPort int
// NOTE(review): presumably the origins allowed by the server's CORS policy —
// confirm against the server setup.
CorsAllowedOrigins []string

// Postgres holds the settings used to open the database connection.
Postgres configuration.PostgresConfig

// PrunerConfig holds the settings of the database pruner.
PrunerConfig PrunerConfig
}

// PrunerConfig configures a database pruning run. The pruning entry point
// rejects any value that is not strictly positive.
type PrunerConfig struct {
// ExpireAfter is how far back from the start of the run the cut-off lies:
// jobs whose last transition time is older than that cut-off are deleted.
ExpireAfter time.Duration
// Timeout bounds the duration of the whole pruning run.
Timeout time.Duration
// BatchSize is the maximum number of jobs deleted per batch.
BatchSize int
}
103 changes: 103 additions & 0 deletions internal/lookoutv2/pruner/pruner.go
@@ -0,0 +1,103 @@
package pruner

import (
"context"
"time"

"github.com/jackc/pgx/v4"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/clock"
)

// PruneDb deletes every job whose last_transition_time is earlier than
// clock.Now() minus keepAfterCompletion, working in batches of at most
// batchLimit jobs.
//
// The ids to delete are first captured in a temporary table on db. They are
// then removed batch by batch, each batch in its own transaction, until a
// batch comes back empty. Progress is logged after every batch. If no job
// qualifies, PruneDb logs that and returns nil without deleting anything.
//
// Any database error aborts the run and is returned wrapped.
func PruneDb(ctx context.Context, db *pgx.Conn, keepAfterCompletion time.Duration, batchLimit int, clock clock.Clock) error {
	startedAt := clock.Now()
	total, err := createJobIdsToDeleteTempTable(ctx, db, startedAt.Add(-keepAfterCompletion))
	if err != nil {
		return errors.WithStack(err)
	}
	if total == 0 {
		log.Infof("Found no jobs to be deleted. Exiting")
		return nil
	}

	// Scratch table that holds the ids of the batch currently being deleted.
	if _, err = db.Exec(ctx, "CREATE TEMP TABLE batch (job_id TEXT);"); err != nil {
		return errors.WithStack(err)
	}

	txOptions := pgx.TxOptions{
		IsoLevel:       pgx.ReadCommitted,
		AccessMode:     pgx.ReadWrite,
		DeferrableMode: pgx.Deferrable,
	}

	deleted := 0
	for {
		batchStartedAt := clock.Now()
		deletedInBatch := 0
		txErr := db.BeginTxFunc(ctx, txOptions, func(tx pgx.Tx) error {
			n, batchErr := deleteBatch(ctx, tx, batchLimit)
			if batchErr != nil {
				return batchErr
			}
			deletedInBatch = n
			return nil
		})
		if txErr != nil {
			return errors.Wrap(txErr, "error deleting batch from postgres")
		}

		elapsed := clock.Since(batchStartedAt)
		deleted += deletedInBatch
		log.Infof("Deleted %d jobs in %s. Deleted %d jobs out of %d", deletedInBatch, elapsed, deleted, total)

		// An empty batch means there is nothing left to delete.
		if deletedInBatch == 0 {
			break
		}
	}

	log.Infof("Total jobs deleted: %d, time taken: %v", deleted, clock.Since(startedAt))
	return nil
}

// createJobIdsToDeleteTempTable creates the temporary table job_ids_to_delete,
// filled with the id of every job whose last_transition_time is earlier than
// cutOffTime, and returns the number of ids it holds.
//
// On failure it returns -1 and the error annotated with a stack trace.
func createJobIdsToDeleteTempTable(ctx context.Context, db *pgx.Conn, cutOffTime time.Time) (int, error) {
	const createTable = `
CREATE TEMP TABLE job_ids_to_delete AS (
SELECT job_id FROM job
WHERE last_transition_time < $1
)`
	if _, err := db.Exec(ctx, createTable, cutOffTime); err != nil {
		return -1, errors.WithStack(err)
	}

	var count int
	row := db.QueryRow(ctx, "SELECT COUNT(*) FROM job_ids_to_delete")
	if err := row.Scan(&count); err != nil {
		return -1, errors.WithStack(err)
	}
	return count, nil
}

// deleteBatch removes up to batchLimit jobs inside the transaction tx and
// returns how many were removed.
//
// It stages up to batchLimit ids from job_ids_to_delete in the batch table. If
// any were staged, it deletes the matching rows from job, job_run and
// user_annotation_lookup, removes the ids from job_ids_to_delete, and empties
// batch again. A return value of 0 means no ids were left to process.
//
// On failure it returns -1 and the unwrapped database error.
func deleteBatch(ctx context.Context, tx pgx.Tx, batchLimit int) (int, error) {
	const (
		stageBatch = "INSERT INTO batch (job_id) SELECT job_id FROM job_ids_to_delete LIMIT $1;"
		countBatch = "SELECT COUNT(*) FROM batch"
		purgeBatch = `
DELETE FROM job WHERE job_id in (SELECT job_id from batch);
DELETE FROM job_run WHERE job_id in (SELECT job_id from batch);
DELETE FROM user_annotation_lookup WHERE job_id in (SELECT job_id from batch);
DELETE FROM job_ids_to_delete WHERE job_id in (SELECT job_id from batch);
TRUNCATE TABLE batch;`
	)

	if _, err := tx.Exec(ctx, stageBatch, batchLimit); err != nil {
		return -1, err
	}

	var staged int
	if err := tx.QueryRow(ctx, countBatch).Scan(&staged); err != nil {
		return -1, err
	}
	if staged == 0 {
		return 0, nil
	}

	if _, err := tx.Exec(ctx, purgeBatch); err != nil {
		return -1, err
	}
	return staged, nil
}

0 comments on commit 6c19460

Please sign in to comment.