Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add defragment support for HFE #13229

Merged
merged 87 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
5ee153e
Hash Field Expiration Defragmentation
sundb Apr 19, 2024
53dbff1
Optimize ebScanDefrag
sundb Apr 20, 2024
3162c25
Spell
sundb Apr 20, 2024
9117b42
Add notification tests for HFE
sundb Apr 22, 2024
4986870
New way to skip defraging the fields with TTL
sundb Apr 23, 2024
f7bf686
Remove activeDefragHfieldDict
sundb Apr 23, 2024
99b3bad
Use dictUseStoredKeyApi to get hash from hfield
sundb Apr 23, 2024
09562c4
Revert the code for defraging hash obj with TTL
sundb Apr 23, 2024
761b3a4
Add commands.def
sundb Apr 23, 2024
5a86b46
Add activeDefragHfieldSkipTTL
sundb Apr 23, 2024
6942c4a
add new ebDefragItem
sundb Apr 25, 2024
fdbc69e
Revert some code
sundb Apr 25, 2024
af9eadc
Simplify code
sundb Apr 25, 2024
871ec6f
Fix compile warning
sundb Apr 25, 2024
b19f6f5
Fix crash due to forget skip defraging hfield with TTL in scanLaterHash
sundb Apr 25, 2024
5e8dfd1
Unify the defragment of hfield in the callback of dictScanDefrag
sundb Apr 25, 2024
2f988f0
Add comment
sundb Apr 25, 2024
f8bbf7f
Cleanup
sundb Apr 25, 2024
acfc293
Use zmalloc_usable_size instead of zmalloc_size
sundb Apr 25, 2024
4a9f8f6
Remove unused code
sundb Apr 25, 2024
d701bf8
Remove unused code
sundb Apr 25, 2024
dcc5da0
Add missing raxStop
sundb Apr 26, 2024
35ec1a3
Merge branch 'hfe' into hfe-defrag
sundb Apr 26, 2024
1f62c55
Add individual test for HFE defragment
sundb Apr 26, 2024
c0ec94c
For CI test
sundb Apr 26, 2024
c270ca4
Fix defrag HFE test failed
sundb Apr 26, 2024
5fe4b02
add listpack support (todo: fix rebase issues)
tezc Apr 26, 2024
1318272
test fix
tezc Apr 26, 2024
6177070
Make HFE defrag test more stable
sundb Apr 26, 2024
06d13a3
Remove test code
sundb Apr 26, 2024
cd7396c
add hgetf
tezc Apr 26, 2024
3b5b33b
silence compiler warning
tezc Apr 26, 2024
eada402
fix
tezc Apr 26, 2024
c952c1e
Reflect ebDefragItem() about rax and other CRs
sundb Apr 27, 2024
08c4276
Merge the tests about item defragmentation
sundb Apr 27, 2024
b8f81a0
add hashTypeIsDictWithMetaHFE to check if hash obj is dict with HFE
sundb Apr 28, 2024
78cebfc
cleanup
sundb Apr 28, 2024
ea61eb4
Try to fix the failure of expiration test
sundb Apr 28, 2024
2d40d61
hsetf
tezc Apr 28, 2024
b90ab61
Make dictExpireMetadata opaque
sundb Apr 29, 2024
66c8785
Fix the reply schema for HFE
sundb Apr 29, 2024
f2124a5
Add .def
sundb Apr 29, 2024
4f64806
Revert the reply schema for hexpire
sundb Apr 29, 2024
2ea37aa
Fix reply schema for hpersist
sundb Apr 29, 2024
f9c0106
Remvoe reply schema def
sundb Apr 30, 2024
293b127
Add test for hpexpireat for schema validate CI
sundb Apr 30, 2024
765ebc4
Defrag listpack for HFE
sundb Apr 30, 2024
b59dbb7
Fix missing update ob->ptr after defraging listpack
sundb Apr 30, 2024
7e8cd04
simply hsetf
tezc May 1, 2024
aa096d1
comment
tezc May 1, 2024
2296d05
expose struct listpackTTL
tezc May 1, 2024
920d784
minor
tezc May 1, 2024
d0b523c
minor
tezc May 1, 2024
6c6c6a4
rename listpackttl to listpackex
tezc May 2, 2024
7712b00
Merge branch 'hfe-listpack-v1' into hfe-listpack-defrag
sundb May 6, 2024
d20743e
Fix complaint
sundb May 6, 2024
ac1130f
Merge branch 'hash-field-expiry-integ' into hfe-listpack-defrag
sundb May 6, 2024
f0c74fa
comments
tezc May 6, 2024
07dad91
merge
tezc May 6, 2024
36bd618
schema fix
tezc May 6, 2024
2c0b560
comment
tezc May 6, 2024
e8ea643
comment
tezc May 6, 2024
d1ce106
minor
tezc May 6, 2024
5775ba0
minor
tezc May 6, 2024
f0ec7e3
minor
tezc May 6, 2024
9e323bb
Merge branch 'hfe-listpack-v1' into hfe-listpack-defrag
sundb May 6, 2024
01563e6
Revert code
sundb May 7, 2024
9a2fb77
test fix
tezc May 7, 2024
af548c8
Merge branch 'hfe-listpack-v1' into hfe-listpack-defrag
sundb May 7, 2024
661fc72
Merge branch 'hash-field-expiry-integ' into hfe-defrag
sundb May 8, 2024
b187702
Missing .def
sundb May 8, 2024
e6b8b6b
Add ebValidate() to check if the data is corrupted in the unittest
sundb May 8, 2024
1662f40
Merge branch 'hash-field-expiry-integ' into hfe-defrag
sundb May 9, 2024
1452444
Merge branch 'hfe-listpack-defrag' into hfe-defrag
sundb May 9, 2024
496e242
Revert some code
sundb May 9, 2024
b18cb26
Add comments
sundb May 9, 2024
2d86fc8
Revert some code
sundb May 9, 2024
6fdbfb3
Fix comment
sundb May 9, 2024
5bf9ddb
Fix CR
sundb May 9, 2024
0936517
Format
sundb May 9, 2024
1bea6c9
Update src/defrag.c
sundb May 10, 2024
b619ea5
Add a assertion for ExpireMeta's trash in ebDefragItem()
sundb May 10, 2024
5023c03
Fix a `else if` mistake
sundb May 11, 2024
ebbb291
Merge branch 'hash-field-expiry-integ' into hfe-defrag
sundb May 13, 2024
8db938b
Improve the HFE deframent test to cover listpackex and eblist
sundb May 14, 2024
8363b2c
Increase the threshold to make test more stable
sundb May 14, 2024
79cdbb0
Improve test
sundb May 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 78 additions & 6 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,22 @@ sds activeDefragSds(sds sdsptr) {
return NULL;
}

/* Defrag helper for hfield strings
*
* returns NULL in case the allocation wasn't moved.
* when it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */
hfield activeDefragHfield(hfield hf) {
void *ptr = hfieldGetAllocPtr(hf);
void *newptr = activeDefragAlloc(ptr);
if (newptr) {
size_t offset = hf - (char*)ptr;
hf = (char*)newptr + offset;
return hf;
}
return NULL;
}

/* Defrag helper for robj and/or string objects with expected refcount.
*
* Like activeDefragStringOb, but it requires the caller to pass in the expected
Expand Down Expand Up @@ -250,6 +266,31 @@ void activeDefragSdsDictCallback(void *privdata, const dictEntry *de) {
UNUSED(de);
}

void activeDefragHfieldDictCallback(void *privdata, const dictEntry *de) {
dict *d = privdata;
hfield newhf, hf = dictGetKey(de);

if (hfieldGetExpireTime(hf) == EB_EXPIRE_TIME_INVALID) {
/* If the hfield does not have TTL, we directly defrag it. */
newhf = activeDefragHfield(hf);
} else {
/* Update its reference in the ebucket while defragging it. */
ebuckets *eb = hashTypeGetDictMetaHFE(d);
newhf = ebDefragItem(eb, &hashFieldExpireBucketsType, hf, (ebDefragFunction *)activeDefragHfield);
}
if (newhf) {
/* We can't search in dict for that key after we've released
* the pointer it holds, since it won't be able to do the string
* compare, but we can find the entry using key hash and pointer. */
dictUseStoredKeyApi(d, 1);
uint64_t hash = dictGetHash(d, newhf);
dictUseStoredKeyApi(d, 0);
dictEntry *de = dictFindEntryByPtrAndHash(d, hf, hash);
serverAssert(de);
dictSetKey(d, de, newhf);
}
}

/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
void activeDefragSdsDict(dict* d, int val_type) {
unsigned long cursor = 0;
Expand All @@ -268,6 +309,20 @@ void activeDefragSdsDict(dict* d, int val_type) {
} while (cursor != 0);
}

/* Defrag a dict with hfield key and sds value. */
void activeDefragHfieldDict(dict *d) {
unsigned long cursor = 0;
dictDefragFunctions defragfns = {
.defragAlloc = activeDefragAlloc,
.defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */
.defragVal = (dictDefragAllocFunction *)activeDefragSds
};
do {
cursor = dictScanDefrag(d, cursor, activeDefragHfieldDictCallback,
&defragfns, d);
} while (cursor != 0);
}

/* Defrag a list of ptr, sds or robj string values */
void activeDefragList(list *l, int val_type) {
listNode *ln, *newln;
Expand Down Expand Up @@ -422,10 +477,10 @@ void scanLaterHash(robj *ob, unsigned long *cursor) {
dict *d = ob->ptr;
dictDefragFunctions defragfns = {
.defragAlloc = activeDefragAlloc,
.defragKey = (dictDefragAllocFunction *)activeDefragSds,
.defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */
.defragVal = (dictDefragAllocFunction *)activeDefragSds
};
*cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
*cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d);
}

void defragQuicklist(redisDb *db, dictEntry *kde) {
Expand Down Expand Up @@ -477,7 +532,7 @@ void defragHash(redisDb *db, dictEntry *kde) {
if (dictSize(d) > server.active_defrag_max_scan_fields)
defragLater(db, kde);
else
activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
activeDefragHfieldDict(d);
/* defrag the dict struct and tables */
if ((newd = dictDefragTables(ob->ptr)))
ob->ptr = newd;
Expand Down Expand Up @@ -672,7 +727,7 @@ void defragModule(redisDb *db, dictEntry *kde) {
* all the various pointers it has. */
void defragKey(defragCtx *ctx, dictEntry *de) {
sds keysds = dictGetKey(de);
robj *newob, *ob;
robj *newob, *ob = dictGetVal(de);
unsigned char *newzl;
sds newsds;
redisDb *db = ctx->privdata;
Expand All @@ -689,11 +744,22 @@ void defragKey(defragCtx *ctx, dictEntry *de) {
dictEntry *expire_de = kvstoreDictFindEntryByPtrAndHash(db->expires, slot, keysds, hash);
if (expire_de) kvstoreDictSetKey(db->expires, slot, expire_de, newsds);
}

/* Update the key's reference in the dict's metadata or the listpackEx. */
if (unlikely(ob->type == OBJ_HASH))
hashTypeUpdateKeyRef(ob, newsds);
}

/* Try to defrag robj and / or string value. */
ob = dictGetVal(de);
if ((newob = activeDefragStringOb(ob))) {
if (unlikely(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob) != EB_EXPIRE_TIME_INVALID)) {
/* Update its reference in the ebucket while defragging it. */
newob = ebDefragItem(&db->hexpires, &hashExpireBucketsType, ob,
(ebDefragFunction *)activeDefragStringOb);
} else {
/* If the dict doesn't have metadata, we directly defrag it. */
newob = activeDefragStringOb(ob);
}
if (newob) {
kvstoreDictSetVal(db->keys, slot, de, newob);
ob = newob;
}
Expand Down Expand Up @@ -734,6 +800,12 @@ void defragKey(defragCtx *ctx, dictEntry *de) {
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
if ((newzl = activeDefragAlloc(ob->ptr)))
ob->ptr = newzl;
} else if (ob->encoding == OBJ_ENCODING_LISTPACK_EX) {
listpackEx *newlpt, *lpt = (listpackEx*)ob->ptr;
if ((newlpt = activeDefragAlloc(lpt)))
ob->ptr = lpt = newlpt;
if ((newzl = activeDefragAlloc(lpt->lp)))
lpt->lp = newzl;
} else if (ob->encoding == OBJ_ENCODING_HT) {
defragHash(db, de);
} else {
Expand Down
95 changes: 95 additions & 0 deletions src/ebuckets.c
Original file line number Diff line number Diff line change
Expand Up @@ -1780,6 +1780,68 @@ void ebValidate(ebuckets eb, EbucketsType *type) {
ebValidateRax(ebGetRaxPtr(eb), type);
}

/* Reallocates the memory used by the item using the provided allocation function.
* This feature was added for the active defrag feature.
*
* The 'defragfn' callbacks are called with a pointer to memory that callback
* can reallocate. The callbacks should return a new memory address or NULL,
* where NULL means that no reallocation happened and the old memory is still valid.
*
* Note: It is the caller's responsibility to ensure that the item has a valid expire time. */
eItem ebDefragItem(ebuckets *eb, EbucketsType *type, eItem item, ebDefragFunction *defragfn) {
assert(!ebIsEmpty(*eb));
if (ebIsList(*eb)) {
ExpireMeta *prevem = NULL;
eItem curitem = ebGetListPtr(type, *eb);
while (curitem != NULL) {
if (curitem == item) {
if ((curitem = defragfn(curitem))) {
if (prevem)
prevem->next = curitem;
else
*eb = ebMarkAsList(curitem);
}
return curitem;
}

/* Move to the next item in the list. */
prevem = type->getExpireMeta(curitem);
curitem = prevem->next;
}
tezc marked this conversation as resolved.
Show resolved Hide resolved
} else {
CommonSegHdr *currHdr;
ExpireMeta *mIter = type->getExpireMeta(item);
assert(mIter->trash != 1);
while (mIter->lastInSegment == 0)
mIter = type->getExpireMeta(mIter->next);

if (mIter->lastItemBucket)
currHdr = (CommonSegHdr *) mIter->next;
else
currHdr = (CommonSegHdr *) ((NextSegHdr *) mIter->next)->prevSeg;
/* If the item is the first in the segment, then update the segment header */
if (currHdr->head == item) {
if ((item = defragfn(item))) {
currHdr->head = item;
}
return item;
}

/* Iterate over all items in the segment until the next is 'item' */
ExpireMeta *mHead = type->getExpireMeta(currHdr->head);
mIter = mHead;
while (mIter->next != item)
mIter = type->getExpireMeta(mIter->next);
assert(mIter->next == item);
tezc marked this conversation as resolved.
Show resolved Hide resolved

if ((item = defragfn(item))) {
mIter->next = item;
}
return item;
}
redis_unreachable();
}

/* Retrieves the expiration time associated with the given item. If associated
* ExpireMeta is marked as trash, then return EB_EXPIRE_TIME_INVALID */
uint64_t ebGetExpireTime(EbucketsType *type, eItem item) {
Expand All @@ -1794,12 +1856,14 @@ uint64_t ebGetExpireTime(EbucketsType *type, eItem item) {
#include <stddef.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <string.h>
#include "testhelp.h"

#define TEST(name) printf("[TEST] >>> %s\n", name);
#define TEST_COND(name, cond) printf("[%s] >>> %s\n", (cond) ? "TEST" : "BYPS", name); if (cond)

typedef struct MyItem {
int index;
ExpireMeta mexpire;
} MyItem;

Expand Down Expand Up @@ -1976,6 +2040,14 @@ void distributeTest(int lowestTime,
#define UNUSED(x) (void)(x)
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))

eItem defragCallback(const eItem item) {
size_t size = zmalloc_usable_size(item);
eItem newitem = zmalloc(size);
memcpy(newitem, item, size);
zfree(item);
return newitem;
}

int ebucketsTest(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
Expand Down Expand Up @@ -2307,6 +2379,29 @@ int ebucketsTest(int argc, char **argv, int flags) {

}

TEST("item defragmentation") {
for (int s = 1; s <= EB_LIST_MAX_ITEMS * 3; s++) {
ebuckets eb = NULL;
MyItem *items[s];
for (int i = 0; i < s; i++) {
items[i] = zmalloc(sizeof(MyItem));
tezc marked this conversation as resolved.
Show resolved Hide resolved
items[i]->index = i;
ebAdd(&eb, &myEbucketsType, items[i], i);
}
assert((s <= EB_LIST_MAX_ITEMS) ? ebIsList(eb) : !ebIsList(eb));
/* Defrag all the items. */
for (int i = 0; i < s; i++) {
MyItem *newitem = ebDefragItem(&eb, &myEbucketsType, items[i], defragCallback);
if (newitem) items[i] = newitem;
}
/* Verify that the data is not corrupted. */
moticless marked this conversation as resolved.
Show resolved Hide resolved
ebValidate(eb, &myEbucketsType);
for (int i = 0; i < s; i++)
assert(items[i]->index == i);
ebDestroy(&eb, &myEbucketsType, NULL);
}
}

// TEST("segment - Add smaller item to full segment that all share same ebucket-key")
// TEST("segment - Add item to full segment and make it extended-segment (all share same ebucket-key)")
// TEST("ebuckets - Create rax tree with extended-segment and add item before")
Expand Down
3 changes: 3 additions & 0 deletions src/ebuckets.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ int ebAdd(ebuckets *eb, EbucketsType *type, eItem item, uint64_t expireTime);

uint64_t ebGetExpireTime(EbucketsType *type, eItem item);

typedef eItem (ebDefragFunction)(const eItem item);
eItem ebDefragItem(ebuckets *eb, EbucketsType *type, eItem item, ebDefragFunction *fn);

static inline uint64_t ebGetMetaExpTime(ExpireMeta *expMeta) {
return (((uint64_t)(expMeta)->expireTimeHi << 32) | (expMeta)->expireTimeLo);
}
Expand Down
6 changes: 5 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2482,7 +2482,7 @@ extern dictType keylistDictType;
extern dict *modules;

extern EbucketsType hashExpireBucketsType; /* global expires */
extern EbucketsType hashFieldExpiresBucketType; /* local per hash */
extern EbucketsType hashFieldExpireBucketsType; /* local per hash */

/*-----------------------------------------------------------------------------
* Functions prototypes
Expand Down Expand Up @@ -3197,12 +3197,16 @@ void hashTypeAddToExpires(redisDb *db, sds key, robj *hashObj, uint64_t expireTi
void hashTypeFree(robj *o);
int hashTypeIsExpired(const robj *o, uint64_t expireAt);
unsigned char *hashTypeListpackGetLp(robj *o);
uint64_t hashTypeGetMinExpire(robj *o);
void hashTypeUpdateKeyRef(robj *o, sds newkey);
ebuckets *hashTypeGetDictMetaHFE(dict *d);

/* Hash-Field data type (of t_hash.c) */
hfield hfieldNew(const void *field, size_t fieldlen, int withExpireMeta);
hfield hfieldTryNew(const void *field, size_t fieldlen, int withExpireMeta);
int hfieldIsExpireAttached(hfield field);
int hfieldIsExpired(hfield field);
uint64_t hfieldGetExpireTime(hfield field);
static inline void hfieldFree(hfield field) { mstrFree(&mstrFieldKind, field); }
static inline void *hfieldGetAllocPtr(hfield field) { return mstrGetAllocPtr(&mstrFieldKind, field); }
static inline size_t hfieldlen(hfield field) { return mstrlen(field);}
Expand Down
23 changes: 19 additions & 4 deletions src/t_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ static ExpireMeta *hashGetExpireMeta(const eItem hash);
static void hexpireGenericCommand(client *c, const char *cmd, long long basetime, int unit);
static ExpireAction hashTypeActiveExpire(eItem hashObj, void *ctx);
static void hfieldPersist(robj *hashObj, hfield field);
static uint64_t hfieldGetExpireTime(hfield field);
static void updateGlobalHfeDs(redisDb *db, robj *o, uint64_t minExpire, uint64_t minExpireFields);
static uint64_t hashTypeGetNextTimeToExpire(robj *o);
static uint64_t hashTypeGetMinExpire(robj *keyObj);


/* hash dictType funcs */
static int dictHfieldKeyCompare(dict *d, const void *key1, const void *key2);
Expand Down Expand Up @@ -1999,6 +1996,24 @@ void hashTypeFree(robj *o) {
}
}

/* Attempts to update the reference to the new key. Now it's only used in defrag. */
void hashTypeUpdateKeyRef(robj *o, sds newkey) {
if (o->encoding == OBJ_ENCODING_LISTPACK_EX) {
listpackEx *lpt = o->ptr;
lpt->key = newkey;
} else if (o->encoding == OBJ_ENCODING_HT && isDictWithMetaHFE(o->ptr)) {
dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *)dictMetadata((dict*)o->ptr);
dictExpireMeta->key = newkey;
} else {
/* Nothing to do. */
}
}

ebuckets *hashTypeGetDictMetaHFE(dict *d) {
dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d);
return &dictExpireMeta->hfe;
}

/*-----------------------------------------------------------------------------
* Hash type commands
*----------------------------------------------------------------------------*/
Expand Down Expand Up @@ -2635,7 +2650,7 @@ static ExpireMeta* hfieldGetExpireMeta(const eItem field) {
return mstrMetaRef(field, &mstrFieldKind, (int) HFIELD_META_EXPIRE);
}

static uint64_t hfieldGetExpireTime(hfield field) {
uint64_t hfieldGetExpireTime(hfield field) {
if (!hfieldIsExpireAttached(field))
return EB_EXPIRE_TIME_INVALID;

Expand Down