Skip to content

Commit

Permalink
Add defragment support for HFE (#13229)
Browse files Browse the repository at this point in the history
## Background
1. All hash objects that contain HFE are referenced by db->hexpires.
2. All fields in a dict hash object with HFE are referenced by an
ebucket.

So when we defrag the hash object or the field in a dict with HFE, we
also need to update the references in them.

## Interface
1. Add a new interface `ebDefragItem`, which can accept a defrag
callback to defrag items in ebuckets, and simultaneously update their
references in the ebucket.

## Mainly changes
1. The key type of dict of hash object is no longer sds, so add new
`activeDefragHfieldDict()` to defrag the dict instead of
`activeDefragSdsDict()`.
2. When we defrag the dict of hash object by using `dictScanDefrag()`,
we always set the defrag callback `defragKey` of `dictDefragFunctions`
to NULL, because we can't reallocate a field with out updating it's
reference in ebuckets.
Instead, we will defrag the field of the dict and update its reference
in the callback `dictScanDefrag` of dictScanFunction().
3. When we defrag the hash robj with HFE, we will use `ebDefragItem` to
defrag the robj and update the reference in db->hexpires.

## TODO:
Defrag ebucket structure incremently, which will be handler in a future
PR.

---------

Co-authored-by: Ozan Tezcan <ozantezcan@gmail.com>
Co-authored-by: Moti Cohen <moti.cohen@redis.com>
  • Loading branch information
3 people committed May 14, 2024
1 parent 5066e6e commit 80be2cc
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 11 deletions.
84 changes: 78 additions & 6 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,22 @@ sds activeDefragSds(sds sdsptr) {
return NULL;
}

/* Defrag helper for hfield strings
*
* returns NULL in case the allocation wasn't moved.
* when it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */
hfield activeDefragHfield(hfield hf) {
void *ptr = hfieldGetAllocPtr(hf);
void *newptr = activeDefragAlloc(ptr);
if (newptr) {
size_t offset = hf - (char*)ptr;
hf = (char*)newptr + offset;
return hf;
}
return NULL;
}

/* Defrag helper for robj and/or string objects with expected refcount.
*
* Like activeDefragStringOb, but it requires the caller to pass in the expected
Expand Down Expand Up @@ -250,6 +266,31 @@ void activeDefragSdsDictCallback(void *privdata, const dictEntry *de) {
UNUSED(de);
}

void activeDefragHfieldDictCallback(void *privdata, const dictEntry *de) {
dict *d = privdata;
hfield newhf, hf = dictGetKey(de);

if (hfieldGetExpireTime(hf) == EB_EXPIRE_TIME_INVALID) {
/* If the hfield does not have TTL, we directly defrag it. */
newhf = activeDefragHfield(hf);
} else {
/* Update its reference in the ebucket while defragging it. */
ebuckets *eb = hashTypeGetDictMetaHFE(d);
newhf = ebDefragItem(eb, &hashFieldExpireBucketsType, hf, (ebDefragFunction *)activeDefragHfield);
}
if (newhf) {
/* We can't search in dict for that key after we've released
* the pointer it holds, since it won't be able to do the string
* compare, but we can find the entry using key hash and pointer. */
dictUseStoredKeyApi(d, 1);
uint64_t hash = dictGetHash(d, newhf);
dictUseStoredKeyApi(d, 0);
dictEntry *de = dictFindEntryByPtrAndHash(d, hf, hash);
serverAssert(de);
dictSetKey(d, de, newhf);
}
}

/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
void activeDefragSdsDict(dict* d, int val_type) {
unsigned long cursor = 0;
Expand All @@ -268,6 +309,20 @@ void activeDefragSdsDict(dict* d, int val_type) {
} while (cursor != 0);
}

/* Defrag a dict with hfield key and sds value. */
void activeDefragHfieldDict(dict *d) {
unsigned long cursor = 0;
dictDefragFunctions defragfns = {
.defragAlloc = activeDefragAlloc,
.defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */
.defragVal = (dictDefragAllocFunction *)activeDefragSds
};
do {
cursor = dictScanDefrag(d, cursor, activeDefragHfieldDictCallback,
&defragfns, d);
} while (cursor != 0);
}

/* Defrag a list of ptr, sds or robj string values */
void activeDefragList(list *l, int val_type) {
listNode *ln, *newln;
Expand Down Expand Up @@ -422,10 +477,10 @@ void scanLaterHash(robj *ob, unsigned long *cursor) {
dict *d = ob->ptr;
dictDefragFunctions defragfns = {
.defragAlloc = activeDefragAlloc,
.defragKey = (dictDefragAllocFunction *)activeDefragSds,
.defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */
.defragVal = (dictDefragAllocFunction *)activeDefragSds
};
*cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
*cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d);
}

void defragQuicklist(redisDb *db, dictEntry *kde) {
Expand Down Expand Up @@ -477,7 +532,7 @@ void defragHash(redisDb *db, dictEntry *kde) {
if (dictSize(d) > server.active_defrag_max_scan_fields)
defragLater(db, kde);
else
activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
activeDefragHfieldDict(d);
/* defrag the dict struct and tables */
if ((newd = dictDefragTables(ob->ptr)))
ob->ptr = newd;
Expand Down Expand Up @@ -672,7 +727,7 @@ void defragModule(redisDb *db, dictEntry *kde) {
* all the various pointers it has. */
void defragKey(defragCtx *ctx, dictEntry *de) {
sds keysds = dictGetKey(de);
robj *newob, *ob;
robj *newob, *ob = dictGetVal(de);
unsigned char *newzl;
sds newsds;
redisDb *db = ctx->privdata;
Expand All @@ -689,11 +744,22 @@ void defragKey(defragCtx *ctx, dictEntry *de) {
dictEntry *expire_de = kvstoreDictFindEntryByPtrAndHash(db->expires, slot, keysds, hash);
if (expire_de) kvstoreDictSetKey(db->expires, slot, expire_de, newsds);
}

/* Update the key's reference in the dict's metadata or the listpackEx. */
if (unlikely(ob->type == OBJ_HASH))
hashTypeUpdateKeyRef(ob, newsds);
}

/* Try to defrag robj and / or string value. */
ob = dictGetVal(de);
if ((newob = activeDefragStringOb(ob))) {
if (unlikely(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob) != EB_EXPIRE_TIME_INVALID)) {
/* Update its reference in the ebucket while defragging it. */
newob = ebDefragItem(&db->hexpires, &hashExpireBucketsType, ob,
(ebDefragFunction *)activeDefragStringOb);
} else {
/* If the dict doesn't have metadata, we directly defrag it. */
newob = activeDefragStringOb(ob);
}
if (newob) {
kvstoreDictSetVal(db->keys, slot, de, newob);
ob = newob;
}
Expand Down Expand Up @@ -734,6 +800,12 @@ void defragKey(defragCtx *ctx, dictEntry *de) {
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
if ((newzl = activeDefragAlloc(ob->ptr)))
ob->ptr = newzl;
} else if (ob->encoding == OBJ_ENCODING_LISTPACK_EX) {
listpackEx *newlpt, *lpt = (listpackEx*)ob->ptr;
if ((newlpt = activeDefragAlloc(lpt)))
ob->ptr = lpt = newlpt;
if ((newzl = activeDefragAlloc(lpt->lp)))
lpt->lp = newzl;
} else if (ob->encoding == OBJ_ENCODING_HT) {
defragHash(db, de);
} else {
Expand Down
95 changes: 95 additions & 0 deletions src/ebuckets.c
Original file line number Diff line number Diff line change
Expand Up @@ -1780,6 +1780,68 @@ void ebValidate(ebuckets eb, EbucketsType *type) {
ebValidateRax(ebGetRaxPtr(eb), type);
}

/* Reallocates the memory used by the item using the provided allocation function.
* This feature was added for the active defrag feature.
*
* The 'defragfn' callbacks are called with a pointer to memory that callback
* can reallocate. The callbacks should return a new memory address or NULL,
* where NULL means that no reallocation happened and the old memory is still valid.
*
* Note: It is the caller's responsibility to ensure that the item has a valid expire time. */
eItem ebDefragItem(ebuckets *eb, EbucketsType *type, eItem item, ebDefragFunction *defragfn) {
assert(!ebIsEmpty(*eb));
if (ebIsList(*eb)) {
ExpireMeta *prevem = NULL;
eItem curitem = ebGetListPtr(type, *eb);
while (curitem != NULL) {
if (curitem == item) {
if ((curitem = defragfn(curitem))) {
if (prevem)
prevem->next = curitem;
else
*eb = ebMarkAsList(curitem);
}
return curitem;
}

/* Move to the next item in the list. */
prevem = type->getExpireMeta(curitem);
curitem = prevem->next;
}
} else {
CommonSegHdr *currHdr;
ExpireMeta *mIter = type->getExpireMeta(item);
assert(mIter->trash != 1);
while (mIter->lastInSegment == 0)
mIter = type->getExpireMeta(mIter->next);

if (mIter->lastItemBucket)
currHdr = (CommonSegHdr *) mIter->next;
else
currHdr = (CommonSegHdr *) ((NextSegHdr *) mIter->next)->prevSeg;
/* If the item is the first in the segment, then update the segment header */
if (currHdr->head == item) {
if ((item = defragfn(item))) {
currHdr->head = item;
}
return item;
}

/* Iterate over all items in the segment until the next is 'item' */
ExpireMeta *mHead = type->getExpireMeta(currHdr->head);
mIter = mHead;
while (mIter->next != item)
mIter = type->getExpireMeta(mIter->next);
assert(mIter->next == item);

if ((item = defragfn(item))) {
mIter->next = item;
}
return item;
}
redis_unreachable();
}

/* Retrieves the expiration time associated with the given item. If associated
* ExpireMeta is marked as trash, then return EB_EXPIRE_TIME_INVALID */
uint64_t ebGetExpireTime(EbucketsType *type, eItem item) {
Expand All @@ -1794,12 +1856,14 @@ uint64_t ebGetExpireTime(EbucketsType *type, eItem item) {
#include <stddef.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <string.h>
#include "testhelp.h"

#define TEST(name) printf("[TEST] >>> %s\n", name);
#define TEST_COND(name, cond) printf("[%s] >>> %s\n", (cond) ? "TEST" : "BYPS", name); if (cond)

typedef struct MyItem {
int index;
ExpireMeta mexpire;
} MyItem;

Expand Down Expand Up @@ -1976,6 +2040,14 @@ void distributeTest(int lowestTime,
#define UNUSED(x) (void)(x)
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))

eItem defragCallback(const eItem item) {
size_t size = zmalloc_usable_size(item);
eItem newitem = zmalloc(size);
memcpy(newitem, item, size);
zfree(item);
return newitem;
}

int ebucketsTest(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
Expand Down Expand Up @@ -2307,6 +2379,29 @@ int ebucketsTest(int argc, char **argv, int flags) {

}

TEST("item defragmentation") {
for (int s = 1; s <= EB_LIST_MAX_ITEMS * 3; s++) {
ebuckets eb = NULL;
MyItem *items[s];
for (int i = 0; i < s; i++) {
items[i] = zmalloc(sizeof(MyItem));
items[i]->index = i;
ebAdd(&eb, &myEbucketsType, items[i], i);
}
assert((s <= EB_LIST_MAX_ITEMS) ? ebIsList(eb) : !ebIsList(eb));
/* Defrag all the items. */
for (int i = 0; i < s; i++) {
MyItem *newitem = ebDefragItem(&eb, &myEbucketsType, items[i], defragCallback);
if (newitem) items[i] = newitem;
}
/* Verify that the data is not corrupted. */
ebValidate(eb, &myEbucketsType);
for (int i = 0; i < s; i++)
assert(items[i]->index == i);
ebDestroy(&eb, &myEbucketsType, NULL);
}
}

// TEST("segment - Add smaller item to full segment that all share same ebucket-key")
// TEST("segment - Add item to full segment and make it extended-segment (all share same ebucket-key)")
// TEST("ebuckets - Create rax tree with extended-segment and add item before")
Expand Down
3 changes: 3 additions & 0 deletions src/ebuckets.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ int ebAdd(ebuckets *eb, EbucketsType *type, eItem item, uint64_t expireTime);

uint64_t ebGetExpireTime(EbucketsType *type, eItem item);

typedef eItem (ebDefragFunction)(const eItem item);
eItem ebDefragItem(ebuckets *eb, EbucketsType *type, eItem item, ebDefragFunction *fn);

static inline uint64_t ebGetMetaExpTime(ExpireMeta *expMeta) {
return (((uint64_t)(expMeta)->expireTimeHi << 32) | (expMeta)->expireTimeLo);
}
Expand Down
6 changes: 5 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2482,7 +2482,7 @@ extern dictType keylistDictType;
extern dict *modules;

extern EbucketsType hashExpireBucketsType; /* global expires */
extern EbucketsType hashFieldExpiresBucketType; /* local per hash */
extern EbucketsType hashFieldExpireBucketsType; /* local per hash */

/*-----------------------------------------------------------------------------
* Functions prototypes
Expand Down Expand Up @@ -3197,12 +3197,16 @@ void hashTypeAddToExpires(redisDb *db, sds key, robj *hashObj, uint64_t expireTi
void hashTypeFree(robj *o);
int hashTypeIsExpired(const robj *o, uint64_t expireAt);
unsigned char *hashTypeListpackGetLp(robj *o);
uint64_t hashTypeGetMinExpire(robj *o);
void hashTypeUpdateKeyRef(robj *o, sds newkey);
ebuckets *hashTypeGetDictMetaHFE(dict *d);

/* Hash-Field data type (of t_hash.c) */
hfield hfieldNew(const void *field, size_t fieldlen, int withExpireMeta);
hfield hfieldTryNew(const void *field, size_t fieldlen, int withExpireMeta);
int hfieldIsExpireAttached(hfield field);
int hfieldIsExpired(hfield field);
uint64_t hfieldGetExpireTime(hfield field);
static inline void hfieldFree(hfield field) { mstrFree(&mstrFieldKind, field); }
static inline void *hfieldGetAllocPtr(hfield field) { return mstrGetAllocPtr(&mstrFieldKind, field); }
static inline size_t hfieldlen(hfield field) { return mstrlen(field);}
Expand Down
23 changes: 19 additions & 4 deletions src/t_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ static ExpireMeta *hashGetExpireMeta(const eItem hash);
static void hexpireGenericCommand(client *c, const char *cmd, long long basetime, int unit);
static ExpireAction hashTypeActiveExpire(eItem hashObj, void *ctx);
static void hfieldPersist(robj *hashObj, hfield field);
static uint64_t hfieldGetExpireTime(hfield field);
static void updateGlobalHfeDs(redisDb *db, robj *o, uint64_t minExpire, uint64_t minExpireFields);
static uint64_t hashTypeGetNextTimeToExpire(robj *o);
static uint64_t hashTypeGetMinExpire(robj *keyObj);


/* hash dictType funcs */
static int dictHfieldKeyCompare(dict *d, const void *key1, const void *key2);
Expand Down Expand Up @@ -1999,6 +1996,24 @@ void hashTypeFree(robj *o) {
}
}

/* Attempts to update the reference to the new key. Now it's only used in defrag. */
void hashTypeUpdateKeyRef(robj *o, sds newkey) {
if (o->encoding == OBJ_ENCODING_LISTPACK_EX) {
listpackEx *lpt = o->ptr;
lpt->key = newkey;
} else if (o->encoding == OBJ_ENCODING_HT && isDictWithMetaHFE(o->ptr)) {
dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *)dictMetadata((dict*)o->ptr);
dictExpireMeta->key = newkey;
} else {
/* Nothing to do. */
}
}

ebuckets *hashTypeGetDictMetaHFE(dict *d) {
dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d);
return &dictExpireMeta->hfe;
}

/*-----------------------------------------------------------------------------
* Hash type commands
*----------------------------------------------------------------------------*/
Expand Down Expand Up @@ -2635,7 +2650,7 @@ static ExpireMeta* hfieldGetExpireMeta(const eItem field) {
return mstrMetaRef(field, &mstrFieldKind, (int) HFIELD_META_EXPIRE);
}

static uint64_t hfieldGetExpireTime(hfield field) {
uint64_t hfieldGetExpireTime(hfield field) {
if (!hfieldIsExpireAttached(field))
return EB_EXPIRE_TIME_INVALID;

Expand Down

0 comments on commit 80be2cc

Please sign in to comment.