Skip to content

Commit

Permalink
feature: Enhance scanReader for large key and restore with replace pa…
Browse files Browse the repository at this point in the history
…rameter #630
  • Loading branch information
suxb201 committed Sep 28, 2023
1 parent d061277 commit 001c336
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"RedisShake/internal/config"
"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/rdb/types"
"RedisShake/internal/utils"
"fmt"
"math/bits"
Expand Down Expand Up @@ -165,11 +166,31 @@ func (r *scanStandaloneReader) fetch() {
pttl = 0 // -1 means no expire
}
if uint64(len(dump)) > config.Opt.Advanced.TargetRedisProtoMaxBulkLen {
log.Panicf("not support large key. key=[%s], len=[%d]", key, len(dump))
log.Warnf("key=[%s] dump len=[%d] too large, split it. This is not a good practice in Redis.", key, len(dump))
typeByte := dump[0]
anotherReader := strings.NewReader(dump[1 : len(dump)-10])
o := types.ParseObject(anotherReader, typeByte, key)
cmds := o.Rewrite()
for _, cmd := range cmds {
e := entry.NewEntry()
e.DbId = dbId
e.Argv = cmd
r.ch <- e
}
if pttl != 0 {
e := entry.NewEntry()
e.DbId = dbId
e.Argv = []string{"PEXPIRE", key, strconv.Itoa(pttl)}
r.ch <- e
}
} else {
argv := []string{"RESTORE", key, strconv.Itoa(pttl), dump}
if config.Opt.Advanced.RDBRestoreCommandBehavior == "rewrite" {
argv = append(argv, "replace")
}
r.ch <- &entry.Entry{
DbId: dbId,
Argv: []string{"RESTORE", key, strconv.Itoa(pttl), dump},
Argv: argv,
}
}
}
Expand Down

0 comments on commit 001c336

Please sign in to comment.