Skip to content

Commit

Permalink
[ref] Rebuild bucket code (#512)
Browse files Browse the repository at this point in the history
 rebuild the bucket-related types
  • Loading branch information
elliotchenzichang committed Nov 29, 2023
1 parent 8247035 commit 896f01b
Show file tree
Hide file tree
Showing 14 changed files with 99 additions and 97 deletions.
2 changes: 1 addition & 1 deletion bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (b *Bucket) Decode(bytes []byte) error {
id := binary.LittleEndian.Uint64(bytes[:IdSize])
ds := binary.LittleEndian.Uint16(bytes[IdSize : IdSize+DsSize])
name := bytes[IdSize+DsSize:]
b.Id = BucketId(id)
b.Id = id
b.Name = string(name)
b.Ds = Ds(ds)
return nil
Expand Down
12 changes: 6 additions & 6 deletions bucket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ var ErrBucketNotExist = errors.New("bucket not exist")

const BucketStoreFileName = "bucket.Meta"

type Ds uint16
type BucketId uint64
type BucketName string
type Ds = uint16
type BucketId = uint64
type BucketName = string

type BucketManager struct {
fd *os.File
Expand Down Expand Up @@ -61,15 +61,15 @@ func (bm *BucketManager) SubmitPendingBucketChange(reqs []*bucketSubmitRequest)
}
switch req.bucket.Meta.Op {
case BucketInsertOperation:
bm.BucketInfoMapper[BucketId(req.bucket.Id)] = req.bucket
bm.BucketIDMarker[req.name][req.bucket.Ds] = BucketId(req.bucket.Id)
bm.BucketInfoMapper[req.bucket.Id] = req.bucket
bm.BucketIDMarker[req.name][req.bucket.Ds] = req.bucket.Id
case BucketDeleteOperation:
if len(bm.BucketIDMarker[req.name]) == 1 {
delete(bm.BucketIDMarker, req.name)
} else {
delete(bm.BucketIDMarker[req.name], req.bucket.Ds)
}
delete(bm.BucketInfoMapper, BucketId(req.bucket.Id))
delete(bm.BucketInfoMapper, req.bucket.Id)
}
}
_, err := bm.fd.Write(bytes)
Expand Down
20 changes: 10 additions & 10 deletions bucket_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,31 @@ func TestBucketManager_NewBucketAndDeleteBucket(t *testing.T) {
bucket2 := "bucket_2"
runNutsDBTest(t, nil, func(t *testing.T, db *DB) {
txNewBucket(t, db, bucket1, DataStructureBTree, nil, nil)
exist := db.bm.ExistBucket(Ds(DataStructureBTree), BucketName(bucket1))
exist := db.bm.ExistBucket(DataStructureBTree, bucket1)
assert.Equal(t, true, exist)
txNewBucket(t, db, bucket2, DataStructureBTree, nil, nil)
exist = db.bm.ExistBucket(Ds(DataStructureBTree), BucketName(bucket2))
exist = db.bm.ExistBucket(DataStructureBTree, bucket2)
assert.Equal(t, true, exist)
})

runNutsDBTest(t, nil, func(t *testing.T, db *DB) {
txNewBucket(t, db, bucket1, DataStructureBTree, nil, nil)
exist := db.bm.ExistBucket(Ds(DataStructureBTree), BucketName(bucket1))
exist := db.bm.ExistBucket(DataStructureBTree, bucket1)
assert.Equal(t, true, exist)
txDeleteBucketFunc(t, db, bucket1, DataStructureBTree, nil, nil)
exist = db.bm.ExistBucket(Ds(DataStructureBTree), BucketName(bucket1))
exist = db.bm.ExistBucket(DataStructureBTree, bucket1)
assert.Equal(t, false, exist)
})
}

func TestBucketManager_ExistBucket(t *testing.T) {
bucket1 := "bucket_1"
runNutsDBTest(t, nil, func(t *testing.T, db *DB) {
exist := db.bm.ExistBucket(Ds(DataStructureBTree), BucketName(bucket1))
exist := db.bm.ExistBucket(DataStructureBTree, bucket1)
assert.Equal(t, false, exist)

txNewBucket(t, db, bucket1, DataStructureBTree, nil, nil)
exist = db.bm.ExistBucket(Ds(DataStructureBTree), BucketName(bucket1))
exist = db.bm.ExistBucket(DataStructureBTree, bucket1)
assert.Equal(t, true, exist)
})
}
Expand Down Expand Up @@ -71,9 +71,9 @@ func TestBucketManager_DataStructureIsolation(t *testing.T) {
runNutsDBTest(t, nil, func(t *testing.T, db *DB) {
txCreateBucket(t, db, DataStructureBTree, bucket1, nil)

assert.Equal(t, false, db.bm.ExistBucket(Ds(DataStructureList), bucket1))
assert.Equal(t, false, db.bm.ExistBucket(Ds(DataStructureSortedSet), bucket1))
assert.Equal(t, false, db.bm.ExistBucket(Ds(DataStructureSet), bucket1))
assert.Equal(t, false, db.bm.ExistBucket(DataStructureList, bucket1))
assert.Equal(t, false, db.bm.ExistBucket(DataStructureSortedSet, bucket1))
assert.Equal(t, false, db.bm.ExistBucket(DataStructureSet, bucket1))
})
}

Expand All @@ -99,7 +99,7 @@ func txNewBucket(t *testing.T, db *DB, bucket string, ds uint16, expectErr error

func txDeleteBucketFunc(t *testing.T, db *DB, bucket string, ds uint16, expectErr error, finalExpectErr error) {
err := db.Update(func(tx *Tx) error {
err2 := tx.DeleteBucket(uint16(ds), bucket)
err2 := tx.DeleteBucket(ds, bucket)
assertErr(t, expectErr, err2)
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func TestBucket_DecodeAndDecode(t *testing.T) {
Op: BucketInsertOperation,
},
Id: 1,
Ds: Ds(DataStructureBTree),
Ds: DataStructureBTree,
Name: "bucket_1",
}
bytes := bucket.Encode()
Expand Down
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ func (db *DB) IsClose() bool {
func (db *DB) buildExpireCallback(bucket string, key []byte) func() {
return func() {
err := db.Update(func(tx *Tx) error {
b, err := tx.db.bm.GetBucket(Ds(DataStructureBTree), BucketName(bucket))
b, err := tx.db.bm.GetBucket(DataStructureBTree, bucket)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type IteratorOptions struct {
}

func NewIterator(tx *Tx, bucket string, options IteratorOptions) *Iterator {
b, err := tx.db.bm.GetBucket(Ds(DataStructureBTree), BucketName(bucket))
b, err := tx.db.bm.GetBucket(DataStructureBTree, bucket)
if err != nil {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestDB_MergeForSet(t *testing.T) {
for _, idxMode := range []EntryIdxMode{HintKeyValAndRAMIdxMode, HintKeyAndRAMIdxMode} {
opts.EntryIdxMode = idxMode
db, err := Open(opts)
if exist := db.bm.ExistBucket(Ds(DataStructureSet), BucketName(bucket)); !exist {
if exist := db.bm.ExistBucket(DataStructureSet, bucket); !exist {
txCreateBucket(t, db, DataStructureSet, bucket, nil)
}

Expand Down Expand Up @@ -199,7 +199,7 @@ func TestDB_MergeForZSet(t *testing.T) {

opts.EntryIdxMode = idxMode
db, err := Open(opts)
if exist := db.bm.ExistBucket(Ds(DataStructureSortedSet), BucketName(bucket)); !exist {
if exist := db.bm.ExistBucket(DataStructureSortedSet, bucket); !exist {
txCreateBucket(t, db, DataStructureSortedSet, bucket, nil)
}
require.NoError(t, err)
Expand Down Expand Up @@ -301,7 +301,7 @@ func TestDB_MergeForList(t *testing.T) {
for _, idxMode := range []EntryIdxMode{HintKeyValAndRAMIdxMode, HintKeyAndRAMIdxMode} {
opts.EntryIdxMode = idxMode
db, err := Open(opts)
if exist := db.bm.ExistBucket(Ds(DataStructureList), BucketName(bucket)); !exist {
if exist := db.bm.ExistBucket(DataStructureList, bucket); !exist {
txCreateBucket(t, db, DataStructureList, bucket, nil)
}

Expand Down
2 changes: 1 addition & 1 deletion recovery_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func Test_fileRecovery_readBucket(t *testing.T) {
Op: BucketInsertOperation,
},
Id: 1,
Ds: Ds(DataStructureBTree),
Ds: DataStructureBTree,
Name: "bucket_1",
}
bytes := bucket.Encode()
Expand Down
30 changes: 16 additions & 14 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,30 +302,32 @@ func (tx *Tx) getNewAddRecordCount() (int64, error) {
bucketId := bucket.Id
if bucket.Meta.Op == BucketDeleteOperation {
switch bucket.Ds {
case Ds(DataStructureBTree):
case DataStructureBTree:
if bTree, ok := tx.db.Index.bTree.idx[bucketId]; ok {
res -= int64(bTree.Count())
}
case Ds(DataStructureSet):
case DataStructureSet:
if set, ok := tx.db.Index.set.idx[bucketId]; ok {
for key := range set.M {
res -= int64(set.SCard(key))
}
}
case Ds(DataStructureSortedSet):
case DataStructureSortedSet:
if sortedSet, ok := tx.db.Index.sortedSet.idx[bucketId]; ok {
for key := range sortedSet.M {
curLen, _ := sortedSet.ZCard(key)
res -= int64(curLen)
}
}
case Ds(DataStructureList):
case DataStructureList:
if list, ok := tx.db.Index.list.idx[bucketId]; ok {
for key := range list.Items {
curLen, _ := list.Size(key)
res -= int64(curLen)
}
}
default:
panic("unhandled default case")
}
}
}
Expand Down Expand Up @@ -630,21 +632,21 @@ func (tx *Tx) put(bucket string, key, value []byte, ttl uint32, flag uint16, tim
return err
}

if !tx.db.bm.ExistBucket(Ds(ds), BucketName(bucket)) {
if !tx.db.bm.ExistBucket(ds, bucket) {
return ErrorBucketNotExist
}

if !tx.writable {
return ErrTxNotWritable
}

bucketId, err := tx.db.bm.GetBucketID(Ds(ds), BucketName(bucket))
bucketId, err := tx.db.bm.GetBucketID(ds, bucket)
if err != nil {
return err
}

meta := NewMetaData().WithTimeStamp(timestamp).WithKeySize(uint32(len(key))).WithValueSize(uint32(len(value))).WithFlag(flag).
WithTTL(ttl).WithStatus(UnCommitted).WithDs(ds).WithTxID(tx.id).WithBucketId(uint64(bucketId))
WithTTL(ttl).WithStatus(UnCommitted).WithDs(ds).WithTxID(tx.id).WithBucketId(bucketId)

e := NewEntry().WithKey(key).WithMeta(meta).WithValue(value)

Expand All @@ -659,12 +661,12 @@ func (tx *Tx) put(bucket string, key, value []byte, ttl uint32, flag uint16, tim
}

func (tx *Tx) putDeleteLog(bucketId BucketId, key, value []byte, ttl uint32, flag uint16, timestamp uint64, ds uint16) {
bucket, err := tx.db.bm.GetBucketById(uint64(bucketId))
bucket, err := tx.db.bm.GetBucketById(bucketId)
if err != nil {
return
}
meta := NewMetaData().WithTimeStamp(timestamp).WithKeySize(uint32(len(key))).WithValueSize(uint32(len(value))).WithFlag(flag).
WithTTL(ttl).WithStatus(UnCommitted).WithDs(ds).WithTxID(tx.id).WithBucketId(uint64(bucket.Id))
WithTTL(ttl).WithStatus(UnCommitted).WithDs(ds).WithTxID(tx.id).WithBucketId(bucket.Id)

e := NewEntry().WithKey(key).WithMeta(meta).WithValue(value)
tx.pendingWrites = append(tx.pendingWrites, e)
Expand Down Expand Up @@ -752,7 +754,7 @@ func (tx *Tx) putBucket(b *Bucket) error {
tx.pendingBucketList[b.Ds] = map[BucketName]*Bucket{}
}
bucketInDs := tx.pendingBucketList[b.Ds]
bucketInDs[BucketName(b.Name)] = b
bucketInDs[b.Name] = b
return nil
}

Expand All @@ -776,13 +778,13 @@ func (tx *Tx) DeleteBucketInIndex() error {
for _, bucket := range mapper {
if bucket.Meta.Op == BucketDeleteOperation {
switch bucket.Ds {
case Ds(DataStructureBTree):
case DataStructureBTree:
tx.db.Index.bTree.delete(bucket.Id)
case Ds(DataStructureList):
case DataStructureList:
tx.db.Index.list.delete(bucket.Id)
case Ds(DataStructureSet):
case DataStructureSet:
tx.db.Index.set.delete(bucket.Id)
case Ds(DataStructureSortedSet):
case DataStructureSortedSet:
tx.db.Index.sortedSet.delete(bucket.Id)
default:
return ErrDataStructureNotSupported
Expand Down
14 changes: 7 additions & 7 deletions tx_btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (tx *Tx) Get(bucket string, key []byte) (value []byte, err error) {
}

idxMode := tx.db.opt.EntryIdxMode
b, err := tx.db.bm.GetBucket(Ds(DataStructureBTree), BucketName(bucket))
b, err := tx.db.bm.GetBucket(DataStructureBTree, bucket)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func (tx *Tx) GetAll(bucket string) (values [][]byte, err error) {
return nil, err
}

b, err := tx.db.bm.GetBucket(Ds(DataStructureBTree), BucketName(bucket))
b, err := tx.db.bm.GetBucket(DataStructureBTree, bucket)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func (tx *Tx) RangeScan(bucket string, start, end []byte) (values [][]byte, err
if err := tx.checkTxIsClosed(); err != nil {
return nil, err
}
b, err := tx.db.bm.GetBucket(Ds(DataStructureBTree), BucketName(bucket))
b, err := tx.db.bm.GetBucket(DataStructureBTree, bucket)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (tx *Tx) PrefixScan(bucket string, prefix []byte, offsetNum int, limitNum i
if err := tx.checkTxIsClosed(); err != nil {
return nil, err
}
b, err := tx.db.bm.GetBucket(Ds(DataStructureBTree), BucketName(bucket))
b, err := tx.db.bm.GetBucket(DataStructureBTree, bucket)
if err != nil {
return nil, err
}
Expand All @@ -167,7 +167,7 @@ func (tx *Tx) PrefixSearchScan(bucket string, prefix []byte, reg string, offsetN
if err := tx.checkTxIsClosed(); err != nil {
return nil, err
}
b, err := tx.db.bm.GetBucket(Ds(DataStructureBTree), BucketName(bucket))
b, err := tx.db.bm.GetBucket(DataStructureBTree, bucket)
if err != nil {
return nil, err
}
Expand All @@ -193,7 +193,7 @@ func (tx *Tx) Delete(bucket string, key []byte) error {
if err := tx.checkTxIsClosed(); err != nil {
return err
}
b, err := tx.db.bm.GetBucket(Ds(DataStructureBTree), BucketName(bucket))
b, err := tx.db.bm.GetBucket(DataStructureBTree, bucket)
if err != nil {
return err
}
Expand All @@ -213,7 +213,7 @@ func (tx *Tx) Delete(bucket string, key []byte) error {
// getHintIdxDataItemsWrapper returns wrapped entries when prefix scanning or range scanning.
func (tx *Tx) getHintIdxDataItemsWrapper(records []*Record, limitNum int, bucketId BucketId) (values [][]byte, err error) {
for _, record := range records {
bucket, err := tx.db.bm.GetBucketById(uint64(bucketId))
bucket, err := tx.db.bm.GetBucketById(bucketId)
if err != nil {
return nil, err
}
Expand Down
16 changes: 8 additions & 8 deletions tx_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ func (tx *Tx) NewBucket(ds uint16, name string) (err error) {
Meta: &BucketMeta{
Op: BucketInsertOperation,
},
Id: BucketId(tx.db.bm.Gen.GenId()),
Ds: Ds(ds),
Id: tx.db.bm.Gen.GenId(),
Ds: ds,
Name: name,
}
if _, exist := tx.pendingBucketList[Ds(ds)]; !exist {
tx.pendingBucketList[Ds(ds)] = map[BucketName]*Bucket{}
if _, exist := tx.pendingBucketList[ds]; !exist {
tx.pendingBucketList[ds] = map[BucketName]*Bucket{}
}
tx.pendingBucketList[Ds(ds)][BucketName(name)] = bucket
tx.pendingBucketList[ds][name] = bucket
return nil
}

Expand All @@ -107,7 +107,7 @@ func (tx *Tx) DeleteBucket(ds uint16, bucket string) error {
return err
}

b, err := tx.db.bm.GetBucket(Ds(ds), BucketName(bucket))
b, err := tx.db.bm.GetBucket(ds, bucket)
if err != nil {
return ErrBucketNotFound
}
Expand All @@ -117,13 +117,13 @@ func (tx *Tx) DeleteBucket(ds uint16, bucket string) error {
Op: BucketDeleteOperation,
},
Id: b.Id,
Ds: Ds(ds),
Ds: ds,
Name: bucket,
}

return tx.putBucket(deleteBucket)
}

func (tx *Tx) ExistBucket(ds uint16, bucket string) bool {
return tx.db.bm.ExistBucket(Ds(ds), BucketName(bucket))
return tx.db.bm.ExistBucket(ds, bucket)
}

0 comments on commit 896f01b

Please sign in to comment.