Skip to content

Commit

Permalink
feat: add GetKeys and GetValues (#573)
Browse files Browse the repository at this point in the history
  • Loading branch information
bigboss2063 committed Dec 27, 2023
1 parent 4b73679 commit ddf24f3
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 81 deletions.
10 changes: 5 additions & 5 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,11 +829,11 @@ func (db *DB) managed(writable bool, fn func(tx *Tx) error) (err error) {
if err != nil {
return err
}
//defer func() {
// if r := recover(); r != nil {
// err = fmt.Errorf("panic when executing tx, err is %+v", r)
// }
//}()
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic when executing tx, err is %+v", r)
}
}()

if err = fn(tx); err == nil {
err = tx.Commit()
Expand Down
17 changes: 17 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,23 @@ func txGet(t *testing.T, db *DB, bucket string, key []byte, expectVal []byte, ex
require.NoError(t, err)
}

func txGetAll(t *testing.T, db *DB, bucket string, expectKeys [][]byte, expectValues [][]byte, expectErr error) {
require.NoError(t, db.View(func(tx *Tx) error {
keys, values, err := tx.GetAll(bucket)
if expectErr != nil {
require.Equal(t, expectErr, err)
} else {
require.NoError(t, err)
n := len(keys)
for i := 0; i < n; i++ {
require.Equal(t, expectKeys[i], keys[i])
require.Equal(t, expectValues[i], values[i])
}
}
return nil
}))
}

func txDel(t *testing.T, db *DB, bucket string, key []byte, expectErr error) {
err := db.Update(func(tx *Tx) error {
err := tx.Delete(bucket, key)
Expand Down
87 changes: 59 additions & 28 deletions tx_btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import (
"github.com/xujiajun/utils/strconv2"
)

const (
getAllType uint8 = 0
getKeysType uint8 = 1
getValuesType uint8 = 2
)

func (tx *Tx) PutWithTimestamp(bucket string, key, value []byte, ttl uint32, timestamp uint64) error {
return tx.put(bucket, key, value, ttl, DataSetFlag, timestamp, DataStructureBTree)
}
Expand Down Expand Up @@ -174,35 +180,57 @@ func (tx *Tx) getMaxOrMinKey(bucket string, isMax bool) ([]byte, error) {
}

// GetAll returns all keys and values of the bucket stored at given bucket.
func (tx *Tx) GetAll(bucket string) (values [][]byte, err error) {
func (tx *Tx) GetAll(bucket string) ([][]byte, [][]byte, error) {
return tx.getAllOrKeysOrValues(bucket, getAllType)
}

// GetKeys returns all keys of the bucket stored at given bucket.
func (tx *Tx) GetKeys(bucket string) ([][]byte, error) {
keys, _, err := tx.getAllOrKeysOrValues(bucket, getKeysType)
return keys, err
}

// GetValues returns all values of the bucket stored at given bucket.
func (tx *Tx) GetValues(bucket string) ([][]byte, error) {
_, values, err := tx.getAllOrKeysOrValues(bucket, getValuesType)
return values, err
}

func (tx *Tx) getAllOrKeysOrValues(bucket string, typ uint8) ([][]byte, [][]byte, error) {
if err := tx.checkTxIsClosed(); err != nil {
return nil, err
return nil, nil, err
}

b, err := tx.db.bm.GetBucket(DataStructureBTree, bucket)
bucketId, err := tx.db.bm.GetBucketID(DataStructureBTree, bucket)
if err != nil {
return nil, err
return nil, nil, err
}
bucketId := b.Id

if index, ok := tx.db.Index.bTree.exist(bucketId); ok {
records := index.All()

if len(records) == 0 {
return nil, ErrBucketEmpty
var (
keys [][]byte
values [][]byte
)

switch typ {
case getAllType:
keys, values, err = tx.getHintIdxDataItemsWrapper(records, ScanNoLimit, bucketId, true, true)
case getKeysType:
keys, _, err = tx.getHintIdxDataItemsWrapper(records, ScanNoLimit, bucketId, true, false)
case getValuesType:
_, values, err = tx.getHintIdxDataItemsWrapper(records, ScanNoLimit, bucketId, false, true)
}

values, err = tx.getHintIdxDataItemsWrapper(records, ScanNoLimit, bucketId)
if err != nil {
return nil, ErrBucketEmpty
return nil, nil, err
}
}

if len(values) == 0 {
return nil, ErrBucketEmpty
return keys, values, nil
}

return
return nil, nil, nil
}

func (tx *Tx) GetSet(bucket string, key, value []byte) (oldValue []byte, err error) {
Expand Down Expand Up @@ -231,7 +259,7 @@ func (tx *Tx) RangeScan(bucket string, start, end []byte) (values [][]byte, err
return nil, ErrRangeScan
}

values, err = tx.getHintIdxDataItemsWrapper(records, ScanNoLimit, bucketId)
_, values, err = tx.getHintIdxDataItemsWrapper(records, ScanNoLimit, bucketId, false, true)
if err != nil {
return nil, ErrRangeScan
}
Expand All @@ -258,7 +286,7 @@ func (tx *Tx) PrefixScan(bucket string, prefix []byte, offsetNum int, limitNum i

if idx, ok := tx.db.Index.bTree.exist(bucketId); ok {
records := idx.PrefixScan(prefix, offsetNum, limitNum)
values, err = tx.getHintIdxDataItemsWrapper(records, limitNum, bucketId)
_, values, err = tx.getHintIdxDataItemsWrapper(records, limitNum, bucketId, false, true)
if err != nil {
return nil, ErrPrefixScan
}
Expand All @@ -285,7 +313,7 @@ func (tx *Tx) PrefixSearchScan(bucket string, prefix []byte, reg string, offsetN

if idx, ok := tx.db.Index.bTree.exist(bucketId); ok {
records := idx.PrefixSearchScan(prefix, reg, offsetNum, limitNum)
values, err = tx.getHintIdxDataItemsWrapper(records, limitNum, bucketId)
_, values, err = tx.getHintIdxDataItemsWrapper(records, limitNum, bucketId, false, true)
if err != nil {
return nil, ErrPrefixSearchScan
}
Expand Down Expand Up @@ -320,27 +348,30 @@ func (tx *Tx) Delete(bucket string, key []byte) error {
return tx.put(bucket, key, nil, Persistent, DataDeleteFlag, uint64(time.Now().Unix()), DataStructureBTree)
}

// getHintIdxDataItemsWrapper returns wrapped entries when prefix scanning or range scanning.
func (tx *Tx) getHintIdxDataItemsWrapper(records []*Record, limitNum int, bucketId BucketId) (values [][]byte, err error) {
// getHintIdxDataItemsWrapper returns keys and values when prefix scanning or range scanning.
func (tx *Tx) getHintIdxDataItemsWrapper(records []*Record, limitNum int, bucketId BucketId, needKeys bool, needValues bool) (keys [][]byte, values [][]byte, err error) {
for _, record := range records {
bucket, err := tx.db.bm.GetBucketById(bucketId)
if err != nil {
return nil, err
}
if record.IsExpired() {
tx.putDeleteLog(bucket.Id, record.Key, nil, Persistent, DataDeleteFlag, uint64(time.Now().Unix()), DataStructureBTree)
tx.putDeleteLog(bucketId, record.Key, nil, Persistent, DataDeleteFlag, uint64(time.Now().Unix()), DataStructureBTree)
continue
}

if limitNum > 0 && len(values) < limitNum || limitNum == ScanNoLimit {
value, err := tx.db.getValueByRecord(record)
if err != nil {
return nil, err
if needKeys {
keys = append(keys, record.Key)
}

if needValues {
value, err := tx.db.getValueByRecord(record)
if err != nil {
return nil, nil, err
}
values = append(values, value)
}
values = append(values, value)
}
}

return values, nil
return keys, values, nil
}

func (tx *Tx) tryGet(bucket string, key []byte, solveRecord func(record *Record, found bool, bucketId BucketId) error) error {
Expand Down
83 changes: 35 additions & 48 deletions tx_btree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,63 +77,50 @@ func TestTx_PutAndGet(t *testing.T) {

}

func TestTx_GetAll(t *testing.T) {
bucket := "bucket_for_scanAll"

t.Run("get_all_from_empty_db", func(t *testing.T) {

withDefaultDB(t, func(t *testing.T, db *DB) {

tx, err := db.Begin(false)
require.NoError(t, err)
defer assert.NoError(t, tx.Commit())

_, err = tx.GetAll(bucket)
assert.Error(t, err)
})
})

t.Run("get_all_from_db", func(t *testing.T) {

withDefaultDB(t, func(t *testing.T, db *DB) {

{
// setup the data
txCreateBucket(t, db, DataStructureBTree, bucket, nil)
tx, err := db.Begin(true)
require.NoError(t, err)

key0 := []byte("key_" + fmt.Sprintf("%07d", 0))
val0 := []byte("val" + fmt.Sprintf("%07d", 0))
err = tx.Put(bucket, key0, val0, Persistent)
assert.NoError(t, err)
func TestTx_GetAll_GetKeys_GetValues(t *testing.T) {
bucket := "bucket"

key1 := []byte("key_" + fmt.Sprintf("%07d", 1))
val1 := []byte("val" + fmt.Sprintf("%07d", 1))
err = tx.Put(bucket, key1, val1, Persistent)
assert.NoError(t, err)
runNutsDBTest(t, nil, func(t *testing.T, db *DB) {
txCreateBucket(t, db, DataStructureBTree, bucket, nil)

assert.NoError(t, tx.Commit())
}
txGetAll(t, db, bucket, nil, nil, nil)

{
// check the data
n := 10
keys := make([][]byte, n)
values := make([][]byte, n)
for i := 0; i < n; i++ {
keys[i] = GetTestBytes(i)
values[i] = GetRandomBytes(10)
txPut(t, db, bucket, keys[i], values[i], Persistent, nil, nil)
}

tx, err = db.Begin(false)
require.NoError(t, err)
txGetAll(t, db, bucket, keys, values, nil)

values, err := tx.GetAll(bucket)
assert.NoError(t, err)
keys = append(keys, GetTestBytes(10))
values = append(values, GetRandomBytes(10))
txPut(t, db, bucket, keys[10], values[10], Persistent, nil, nil)
txGetAll(t, db, bucket, keys, values, nil)

for i, value := range values {
wantVal := []byte("val" + fmt.Sprintf("%07d", i))
assert.Equal(t, wantVal, value)
}
txDel(t, db, bucket, keys[0], nil)
keys = keys[1:]
values = values[1:]
txGetAll(t, db, bucket, keys, values, nil)

assert.NoError(t, tx.Commit())
txPut(t, db, bucket, GetTestBytes(11), GetRandomBytes(10), 1, nil, nil)
time.Sleep(1100 * time.Millisecond)
txGetAll(t, db, bucket, keys, values, nil)

require.NoError(t, db.View(func(tx *Tx) error {
keysInBucket, err := tx.GetKeys(bucket)
require.NoError(t, err)
valuesInBucket, err := tx.GetValues(bucket)
require.NoError(t, err)
for i := 0; i < n; i++ {
require.Equal(t, keys[i], keysInBucket[i])
require.Equal(t, values[i], valuesInBucket[i])
}
})
return nil
}))
})
}

Expand Down

0 comments on commit ddf24f3

Please sign in to comment.