Improve capped collection cleanup (#4118)
Signed-off-by: Wazir Ahmed <wazirahmedf@gmail.com>
wazir-ahmed committed Mar 19, 2024
1 parent 419f9a0 commit aea56ea
Showing 5 changed files with 164 additions and 53 deletions.
46 changes: 33 additions & 13 deletions integration/commands_administration_test.go
@@ -1711,47 +1711,67 @@ func TestCommandsAdministrationCompactForce(t *testing.T) {
 func TestCommandsAdministrationCompactCapped(t *testing.T) {
 	t.Parallel()
 
-	ctx, coll := setup.Setup(t)
-
 	for name, tc := range map[string]struct { //nolint:vet // for readability
 		force             bool
+		cleanupPercentage uint8 // optional, default value is 20
 		maxDocuments      int64
 		sizeInBytes       int64
 		insertDocuments   int32
-		expectedDocuments int64 // insertDocuments - insertDocuments*0.2 (cleanup 20%) + 1 (extra insert after compact)
+		expectedDocuments int64
 
 		skipForMongoDB string // optional, skip test for MongoDB backend with a specific reason
 	}{
 		"OverflowDocs": {
-			force:           true,
-			maxDocuments:    10,
-			sizeInBytes:     100000,
-			insertDocuments: 12, // overflows capped collection max documents
+			force:           true,
+			maxDocuments:    10,
+			sizeInBytes:     100000,
+			insertDocuments: 12,
+			// cleanup will be based on max documents:
+			// maxDocuments + 1 (extra insert after compact)
 			expectedDocuments: 11,
 		},
 		"OverflowSize": {
-			force:           true,
-			maxDocuments:    1000,
-			sizeInBytes:     256,
-			insertDocuments: 20, // overflows capped collection size
+			force:           true,
+			sizeInBytes:     256,
+			insertDocuments: 20,
+			// cleanup will be based on size:
+			// insertDocuments - [insertDocuments * 0.2 (cleanup 20%)] + 1 (extra insert after compact)
 			expectedDocuments: 17,
 		},
+		"Cleanup10Percent": {
+			force:             true,
+			cleanupPercentage: 10,
+			sizeInBytes:       50,
+			insertDocuments:   5,
+			// cleanup will be based on size:
+			// insertDocuments - [insertDocuments * 0.1 (cleanup 10%) ≈ 0] + 1 (extra insert after compact)
+			expectedDocuments: 6,
+			skipForMongoDB:    "MongoDB cleans up the collection close to sizeInBytes, not based on a percentage",
+		},
 		"ForceFalse": {
 			force:             false,
 			maxDocuments:      10,
 			sizeInBytes:       100000,
 			insertDocuments:   12, // overflows capped collection max documents
 			expectedDocuments: 11,
-			skipForMongoDB:    "Only {force:true} can be run on active replica set primary",
+			skipForMongoDB:    "Compact command with {force:false} cannot be executed on active replica set primary",
 		},
 	} {
 		name, tc := name, tc
 		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+
 			if tc.skipForMongoDB != "" {
 				setup.SkipForMongoDB(t, tc.skipForMongoDB)
 			}
 
-			t.Parallel()
+			beOpts := setup.NewBackendOpts()
+			if tc.cleanupPercentage != 0 {
+				beOpts.CappedCleanupPercentage = tc.cleanupPercentage
+			}
+
+			s := setup.SetupWithOpts(t, &setup.SetupOpts{BackendOptions: beOpts})
+			ctx, coll := s.Ctx, s.Collection
 
 			collName := testutil.CollectionName(t) + name
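For reference, the expectedDocuments values above follow from the truncating integer arithmetic used by the cleanup code. A standalone sketch of that arithmetic (illustrative only, not part of the commit):

package main

import "fmt"

func main() {
	// "OverflowSize": 20 documents inserted, size-based cleanup drops 20% of them.
	dropped := int64(float64(20) * float64(20) / 100) // 4
	fmt.Println(20 - dropped + 1)                     // 17: 16 documents kept + 1 extra insert after compact

	// "Cleanup10Percent": 10% of 5 documents truncates to 0, so nothing is dropped.
	dropped = int64(float64(5) * float64(10) / 100) // 0
	fmt.Println(5 - dropped + 1)                    // 6
}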
8 changes: 5 additions & 3 deletions integration/setup/listener.go
@@ -95,7 +95,7 @@ func listenerMongoDBURI(tb testtb.TB, hostPort, unixSocketPath string, tlsAndAut
 
 // setupListener starts an in-process FerretDB server that runs until ctx is canceled.
 // It returns a basic MongoDB URI for that listener.
-func setupListener(tb testtb.TB, ctx context.Context, logger *zap.Logger) string {
+func setupListener(tb testtb.TB, ctx context.Context, logger *zap.Logger, opts *BackendOpts) string {
 	tb.Helper()
 
 	_, span := otel.Tracer("").Start(ctx, "setupListener")
@@ -168,6 +168,8 @@ func setupListener(tb testtb.TB, ctx context.Context, logger *zap.Logger) string
 	sp, err := state.NewProvider("")
 	require.NoError(tb, err)
 
+	require.NotNil(tb, opts)
+
 	handlerOpts := &registry.NewHandlerOpts{
 		Logger:      logger,
 		ConnMetrics: listenerMetrics.ConnMetrics,
@@ -180,8 +182,8 @@
 
 		TestOpts: registry.TestOpts{
 			DisablePushdown:         *disablePushdownF,
-			CappedCleanupPercentage: 20,
-			CappedCleanupInterval:   0,
+			CappedCleanupPercentage: opts.CappedCleanupPercentage,
+			CappedCleanupInterval:   opts.CappedCleanupInterval,
 			EnableNewAuth:           true,
 		},
 	}
27 changes: 26 additions & 1 deletion integration/setup/setup.go
@@ -24,6 +24,7 @@ import (
 	"runtime/trace"
 	"slices"
 	"strings"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -94,6 +95,18 @@ type SetupOpts struct {
 
 	// SetupUser true creates a user and returns an authenticated client.
 	SetupUser bool
+
+	// Options to override the default backend configuration.
+	BackendOptions *BackendOpts
 }
 
+// BackendOpts represents backend configuration used for test setup.
+type BackendOpts struct {
+	// Capped collections cleanup interval.
+	CappedCleanupInterval time.Duration
+
+	// Percentage of documents to clean up for capped collections. If not set, defaults to 20.
+	CappedCleanupPercentage uint8
+}
+
 // SetupResult represents setup results.
@@ -103,6 +116,14 @@ type SetupResult struct {
 	MongoDBURI string
 }
 
+// NewBackendOpts returns BackendOpts with default values set.
+func NewBackendOpts() *BackendOpts {
+	return &BackendOpts{
+		CappedCleanupInterval:   time.Duration(0),
+		CappedCleanupPercentage: uint8(20),
+	}
+}
+
 // IsUnixSocket returns true if MongoDB URI is a Unix domain socket.
 func (s *SetupResult) IsUnixSocket(tb testtb.TB) bool {
 	tb.Helper()
@@ -142,7 +163,11 @@ func SetupWithOpts(tb testtb.TB, opts *SetupOpts) *SetupResult {
 
 	uri := *targetURLF
 	if uri == "" {
-		uri = setupListener(tb, setupCtx, logger)
+		if opts.BackendOptions == nil {
+			opts.BackendOptions = NewBackendOpts()
+		}
+
+		uri = setupListener(tb, setupCtx, logger, opts.BackendOptions)
 	} else {
 		uri = toAbsolutePathURI(tb, *targetURLF)
 	}
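With these setup changes in place, a test can override the backend defaults per subtest. A minimal sketch of the intended usage based on the diff above (the test name and the value 10 are illustrative):

package integration

import (
	"testing"

	"github.com/FerretDB/FerretDB/integration/setup"
)

func TestCappedCleanupOverride(t *testing.T) {
	t.Parallel()

	// Override the default cleanup percentage (20) for this test's in-process listener.
	beOpts := setup.NewBackendOpts()
	beOpts.CappedCleanupPercentage = 10

	s := setup.SetupWithOpts(t, &setup.SetupOpts{BackendOptions: beOpts})
	ctx, coll := s.Ctx, s.Collection

	_, _ = ctx, coll // use ctx and coll as in TestCommandsAdministrationCompactCapped above
}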
2 changes: 1 addition & 1 deletion integration/setup/setup_compat.go
@@ -103,7 +103,7 @@ func SetupCompatWithOpts(tb testtb.TB, opts *SetupCompatOpts) *SetupCompatResult
 
 	var targetClient *mongo.Client
 	if *targetURLF == "" {
-		uri := setupListener(tb, setupCtx, logger)
+		uri := setupListener(tb, setupCtx, logger, NewBackendOpts())
 		targetClient = setupClient(tb, setupCtx, uri)
 	} else {
 		targetClient = setupClient(tb, setupCtx, *targetURLF)
134 changes: 99 additions & 35 deletions internal/handler/handler.go
@@ -246,69 +246,69 @@ func (h *Handler) cleanupAllCappedCollections(ctx context.Context) error {
 func (h *Handler) cleanupCappedCollection(ctx context.Context, db backends.Database, cInfo *backends.CollectionInfo, force bool) (int32, int64, error) { //nolint:lll // for readability
 	must.BeTrue(cInfo.Capped())
 
+	var docsDeleted int32
+	var bytesFreed int64
+	var statsBefore, statsAfter *backends.CollectionStatsResult
+
 	coll, err := db.Collection(cInfo.Name)
 	if err != nil {
 		return 0, 0, lazyerrors.Error(err)
 	}
 
-	statsBefore, err := coll.Stats(ctx, &backends.CollectionStatsParams{Refresh: true})
+	statsBefore, err = coll.Stats(ctx, &backends.CollectionStatsParams{Refresh: true})
 	if err != nil {
 		return 0, 0, lazyerrors.Error(err)
 	}
 
 	h.L.Debug("cleanupCappedCollection: stats before", zap.Any("stats", statsBefore))
 
-	if statsBefore.SizeCollection < cInfo.CappedSize && statsBefore.CountDocuments < cInfo.CappedDocuments {
-		return 0, 0, nil
-	}
-
-	res, err := coll.Query(ctx, &backends.QueryParams{
-		Sort:          must.NotFail(types.NewDocument("$natural", int64(1))),
-		Limit:         int64(float64(statsBefore.CountDocuments) * float64(h.CappedCleanupPercentage) / 100),
-		OnlyRecordIDs: true,
-	})
-	if err != nil {
-		return 0, 0, lazyerrors.Error(err)
-	}
-
-	defer res.Iter.Close()
-
-	var recordIDs []int64
-	for {
-		var doc *types.Document
-		if _, doc, err = res.Iter.Next(); err != nil {
-			if errors.Is(err, iterator.ErrIteratorDone) {
-				break
-			}
-
-			return 0, 0, lazyerrors.Error(err)
-		}
-
-		recordIDs = append(recordIDs, doc.RecordID())
-	}
-
-	if len(recordIDs) == 0 {
-		h.L.Debug("cleanupCappedCollection: no documents to delete")
-		return 0, 0, nil
-	}
-
-	deleteRes, err := coll.DeleteAll(ctx, &backends.DeleteAllParams{RecordIDs: recordIDs})
-	if err != nil {
-		return 0, 0, lazyerrors.Error(err)
-	}
+	// In order to be more precise w.r.t. the number of documents getting dropped and to avoid
+	// deleting too many documents unnecessarily:
+	//
+	// - First, drop the surplus documents if the document count exceeds the capped configuration.
+	// - Collect stats again.
+	// - If the collection size still exceeds the capped size, then drop documents based on
+	//   CappedCleanupPercentage.
+
+	if count := getDocCleanupCount(cInfo, statsBefore); count > 0 {
+		err = deleteFirstNDocuments(ctx, coll, count)
+		if err != nil {
+			return 0, 0, lazyerrors.Error(err)
+		}
+
+		statsAfter, err = coll.Stats(ctx, &backends.CollectionStatsParams{Refresh: true})
+		if err != nil {
+			return 0, 0, lazyerrors.Error(err)
+		}
+
+		h.L.Debug("cleanupCappedCollection: stats after document count reduction", zap.Any("stats", statsAfter))
+
+		docsDeleted += int32(count)
+		bytesFreed += (statsBefore.SizeTotal - statsAfter.SizeTotal)
+
+		statsBefore = statsAfter
+	}
+
+	if count := getSizeCleanupCount(cInfo, statsBefore, h.CappedCleanupPercentage); count > 0 {
+		err = deleteFirstNDocuments(ctx, coll, count)
+		if err != nil {
+			return 0, 0, lazyerrors.Error(err)
+		}
+
+		docsDeleted += int32(count)
+	}
 
 	if _, err = coll.Compact(ctx, &backends.CompactParams{Full: force}); err != nil {
 		return 0, 0, lazyerrors.Error(err)
 	}
 
-	statsAfter, err := coll.Stats(ctx, &backends.CollectionStatsParams{Refresh: true})
+	statsAfter, err = coll.Stats(ctx, &backends.CollectionStatsParams{Refresh: true})
 	if err != nil {
 		return 0, 0, lazyerrors.Error(err)
 	}
 
-	h.L.Debug("cleanupCappedCollection: stats after", zap.Any("stats", statsAfter))
+	h.L.Debug("cleanupCappedCollection: stats after compact", zap.Any("stats", statsAfter))
 
-	bytesFreed := statsBefore.SizeTotal - statsAfter.SizeTotal
+	bytesFreed += (statsBefore.SizeTotal - statsAfter.SizeTotal)
 
 	// There's a possibility that the size of a collection might be greater at the
 	// end of a compact operation if the collection is being actively written to at
@@ -317,5 +317,69 @@ func (h *Handler) cleanupCappedCollection(ctx context.Context, db backends.Database, cInfo *backends.CollectionInfo, force bool) (int32, int64, error) {
 		bytesFreed = 0
 	}
 
-	return deleteRes.Deleted, bytesFreed, nil
+	return docsDeleted, bytesFreed, nil
 }
+
+// getDocCleanupCount returns the number of documents to be deleted during capped collection cleanup,
+// based on the document count of the collection and the capped configuration.
+func getDocCleanupCount(cInfo *backends.CollectionInfo, cStats *backends.CollectionStatsResult) int64 {
+	if cInfo.CappedDocuments == 0 || cInfo.CappedDocuments >= cStats.CountDocuments {
+		return 0
+	}
+
+	return (cStats.CountDocuments - cInfo.CappedDocuments)
+}
+
+// getSizeCleanupCount returns the number of documents to be deleted during capped collection cleanup,
+// based on the collection size, capped configuration, and cleanup percentage.
+func getSizeCleanupCount(cInfo *backends.CollectionInfo, cStats *backends.CollectionStatsResult, cleanupPercent uint8) int64 {
+	if cInfo.CappedSize >= cStats.SizeCollection {
+		return 0
+	}
+
+	return int64(float64(cStats.CountDocuments) * float64(cleanupPercent) / 100)
+}
+
+// deleteFirstNDocuments drops the first n documents (in insertion order) from the collection.
+func deleteFirstNDocuments(ctx context.Context, coll backends.Collection, n int64) error {
+	if n == 0 {
+		return nil
+	}
+
+	res, err := coll.Query(ctx, &backends.QueryParams{
+		Sort:          must.NotFail(types.NewDocument("$natural", int64(1))),
+		Limit:         n,
+		OnlyRecordIDs: true,
+	})
+	if err != nil {
+		return lazyerrors.Error(err)
+	}
+
+	defer res.Iter.Close()
+
+	var recordIDs []int64
+
+	for {
+		var doc *types.Document
+
+		_, doc, err = res.Iter.Next()
+		if err != nil {
+			if errors.Is(err, iterator.ErrIteratorDone) {
+				break
+			}
+
+			return lazyerrors.Error(err)
+		}
+
+		recordIDs = append(recordIDs, doc.RecordID())
+	}
+
+	if len(recordIDs) > 0 {
+		_, err := coll.DeleteAll(ctx, &backends.DeleteAllParams{RecordIDs: recordIDs})
+		if err != nil {
+			return lazyerrors.Error(err)
+		}
+	}
+
+	return nil
+}
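To make the two-phase cleanup arithmetic concrete, here is a self-contained sketch of the two count helpers with illustrative numbers (simplified int64 parameters stand in for *backends.CollectionInfo and *backends.CollectionStatsResult):

package main

import "fmt"

// docCleanupCount mirrors getDocCleanupCount: the surplus over the capped document limit.
func docCleanupCount(cappedDocs, countDocs int64) int64 {
	if cappedDocs == 0 || cappedDocs >= countDocs {
		return 0
	}

	return countDocs - cappedDocs
}

// sizeCleanupCount mirrors getSizeCleanupCount: a percentage of all documents,
// deleted only once the collection size exceeds the cap.
func sizeCleanupCount(cappedSize, sizeCollection, countDocs int64, cleanupPercent uint8) int64 {
	if cappedSize >= sizeCollection {
		return 0
	}

	return int64(float64(countDocs) * float64(cleanupPercent) / 100)
}

func main() {
	fmt.Println(docCleanupCount(10, 12))             // 2: drop down to the 10-document cap
	fmt.Println(sizeCleanupCount(256, 1024, 20, 20)) // 4: drop 20% of 20 documents
}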
