Skip to content

Commit

Permalink
tikvdb: Add error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
cyilong committed Mar 28, 2024
1 parent 4c64f3d commit 33b82b0
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 28 deletions.
66 changes: 40 additions & 26 deletions tikvdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/tikv/client-go/v2/txnkv"
)

// value 为空的特殊标识
// value is a special identifier for null
var emptyStringTag = []byte("nilStr!")

func init() {
Expand All @@ -36,7 +36,7 @@ func NewTikvDB(name string, dir string) (*TikvDB, error) {
return NewTikvDBWithOpts(name, dir, addrs, nil)
}

func NewTikvDBWithOpts(name string, dir string, pdAddrs []string, o ...txnkv.ClientOpt) (*TikvDB, error) {
func NewTikvDBWithOpts(name string, dir string, pdAddrs []string, _ ...txnkv.ClientOpt) (*TikvDB, error) {
// Initializing the tikv client
txnClient, err := txnkv.NewClient(pdAddrs)
if err != nil {
Expand Down Expand Up @@ -77,7 +77,9 @@ func (t *TikvDB) Get(key []byte) ([]byte, error) {
if err != nil {
return nil, err
}
defer txn.Commit(context.Background())
defer func() {
err = txn.Commit(context.Background())
}()

val, err := txn.Get(context.Background(), t.getTikvKey(key))
if err != nil {
Expand All @@ -86,15 +88,17 @@ func (t *TikvDB) Get(key []byte) ([]byte, error) {
}
return nil, err
}
return checkEmptyValue(val), nil
return checkEmptyValue(val), err
}

func (t *TikvDB) Has(key []byte) (bool, error) {
txn, err := t.txn.Begin()
if err != nil {
return false, err
}
defer txn.Commit(context.Background())
defer func() {
err = txn.Commit(context.Background())
}()

_, err = txn.Get(context.Background(), t.getTikvKey(key))
if err == nil {
Expand All @@ -107,57 +111,68 @@ func (t *TikvDB) Has(key []byte) (bool, error) {
}

func (t *TikvDB) Set(key []byte, value []byte) error {
txn, err := t.txn.Begin()
if err != nil {
return err
}
defer txn.Commit(context.Background())

return txn.Set(t.getTikvKey(key), setNotEmptyValue(value))
return t.setKV(key, value)
}

func (t *TikvDB) SetSync(key []byte, value []byte) error {
return t.setKV(key, value)
}

func (t *TikvDB) setKV(key []byte, value []byte) error {
txn, err := t.txn.Begin()
if err != nil {
return err
}
defer func() {
err = txn.Commit(context.Background())
}()

err = txn.Set(t.getTikvKey(key), value)
err = txn.Commit(context.Background())
return err
}

func (t *TikvDB) Delete(key []byte) error {
txn, err := t.txn.Begin()
err = txn.Set(t.getTikvKey(key), setNotEmptyValue(value))
if err != nil {
return err
}
defer txn.Commit(context.Background())
return err
}

return txn.Delete(t.getTikvKey(key))
func (t *TikvDB) Delete(key []byte) error {
return t.deleteKey(key)
}

func (t *TikvDB) DeleteSync(key []byte) error {
return t.deleteKey(key)
}

func (t *TikvDB) deleteKey(key []byte) error {
txn, err := t.txn.Begin()
if err != nil {
return err
}
defer func() {
err = txn.Commit(context.Background())
}()

err = txn.Delete(t.getTikvKey(key))
err = txn.Commit(context.Background())
if err != nil {
return err
}
return err
}

func (t *TikvDB) Close() error {
func (t *TikvDB) Close() (err error) {
t.lock.Lock()
defer t.lock.Unlock()

txn, err := t.txn.Begin()
if err != nil {
return err
}
defer func() {
err = txn.Commit(context.Background())
}()
err = txn.Delete(t.getTikvStateKey())
err = txn.Commit(context.Background())
if err != nil {
return err
}
return t.txn.Close()
}

Expand All @@ -170,14 +185,13 @@ func (t *TikvDB) Print() error {
if err != nil {
return err
}
defer itr.Close()

for ; itr.Valid(); itr.Next() {
key := itr.Key()
value := itr.Value()
fmt.Printf("[%X]:\t[%X]\n", key, value)
}
return nil
return itr.Close()
}

func (t *TikvDB) Stats() map[string]string {
Expand Down
5 changes: 4 additions & 1 deletion tikvdb_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func (b *tikvDBBatch) write(_ bool) error {
if err != nil {
return err
}
defer func() {
err = txn.Commit(context.Background())
}()

for _, keyValue := range b.writes {
if keyValue.delete {
Expand All @@ -77,7 +80,7 @@ func (b *tikvDBBatch) write(_ bool) error {
return err
}
}
return txn.Commit(context.Background())
return err
}

// Close resets the batch for reuse.
Expand Down
2 changes: 1 addition & 1 deletion tikvdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var pdAddr = []string{"127.0.0.1:2379"}
//var pdAddr = []string{"192.168.0.166:2379"}

func TestTikvDBNewTikvDB(t *testing.T) {
name := fmt.Sprintf("testname%x", randStr(12))
name := "application"
dir := fmt.Sprintf("testdir%x", randStr(12))

// Test we can't open the db twice for writing
Expand Down

0 comments on commit 33b82b0

Please sign in to comment.