diff --git a/.travis.yml b/.travis.yml index 5439132bc..825bad7ca 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,26 +1,38 @@ language: go go: - - "1.11" - - "1.12" - - "1.13" - - tip + - "1.11" + - "1.12" + - "1.13" + - tip os: - - osx - + - osx +env: + jobs: + - GOARCH=386 + - GOARCH=amd64 + global: + - secure: CRkV2+/jlO0gXzzS50XGxfMS117FNwiVjxNY/LeWq06RKD+dDCPxTJl3JCNe3l0cYEPAglV2uMMYukDiTqJ7e+HI4nh4N4mv6lwx39N8dAvJe1x5ITS2T4qk4kTjuQb1Q1vw/ZOxoQqmvNKj2uRmBdJ/HHmysbRJ1OzCWML3OXdUwJf0AYlJzTjpMfkOKr7sTtE4rwyyQtd4tKH1fGdurgI9ZuFd9qvYxK2qcJhsQ6CNqMXt+7FkVkN1rIPmofjjBTNryzUr4COFXuWH95aDAif19DeBW4lbNgo1+FpDsrgmqtuhl6NAuptI8q/imow2KXBYJ8JPXsxW8DVFj0IIp0RCd3GjaEnwBEbxAyiIHLfW7AudyTS/dJOvZffPqXnuJ8xj3OPIdNe4xY0hWl8Ju2HhKfLOAHq7VadHZWd3IHLil70EiL4/JLD1rNbMImUZisFaA8pyrcIvYYebjOnk4TscwKFLedClRSX1XsMjWWd0oykQtrdkHM2IxknnBpaLu7mFnfE07f6dkG0nlpyu4SCLey7hr5FdcEmljA0nIxTSYDg6035fQkBEAbe7hlESOekkVNT9IZPwG+lmt3vU4ofi6NqNbJecOuSB+h36IiZ9s4YQtxYNnLgW14zjuFGGyT5smc3IjBT7qngDjKIgyrSVoRkY/8udy9qbUgvBeW8= + + jobs: allow_failures: - go: tip + exclude: + # Exclude builds for 386 architecture on go 1.11, 1.12 and tip + # Since we don't want it to run for 32 bit + - go: "1.11" + env: GOARCH=386 + - go: "1.12" + env: GOARCH=386 + - go: tip + env: GOARCH=386 notifications: email: false slack: secure: X7uBLWYbuUhf8QFE16CoS5z7WvFR8EN9j6cEectMW6mKZ3vwXGwVXRIPsgUq/606DsQdCCx34MR8MRWYGlu6TBolbSe9y0EP0i46yipPz22YtuT7umcVUbGEyx8MZKgG0v1u/zA0O4aCsOBpGAA3gxz8h3JlEHDt+hv6U8xRsSllVLzLSNb5lwxDtcfEDxVVqP47GMEgjLPM28Pyt5qwjk7o5a4YSVzkfdxBXxd3gWzFUWzJ5E3cTacli50dK4GVfiLcQY2aQYoYO7AAvDnvP+TPfjDkBlUEE4MUz5CDIN51Xb+WW33sX7g+r3Bj7V5IRcF973RiYkpEh+3eoiPnyWyxhDZBYilty3b+Hysp6d4Ov/3I3ll7Bcny5+cYjakjkMH3l9w3gs6Y82GlpSLSJshKWS8vPRsxFe0Pstj6QSJXTd9EBaFr+l1ScXjJv/Sya9j8N9FfTuOTESWuaL1auX4Y7zEEVHlA8SCNOO8K0eTfxGZnC/YcIHsR8rePEAcFxfOYQppkyLF/XvAtnb/LMUuu0g4y2qNdme6Oelvyar1tFEMRtbl4mRCdu/krXBFtkrsfUaVY6WTPdvXAGotsFJ0wuA53zGVhlcd3+xAlSlR3c1QX95HIMeivJKb5L4nTjP+xnrmQNtnVk+tG4LSH2ltuwcZSSczModtcBmRefrk= -env: - global: - - secure: CRkV2+/jlO0gXzzS50XGxfMS117FNwiVjxNY/LeWq06RKD+dDCPxTJl3JCNe3l0cYEPAglV2uMMYukDiTqJ7e+HI4nh4N4mv6lwx39N8dAvJe1x5ITS2T4qk4kTjuQb1Q1vw/ZOxoQqmvNKj2uRmBdJ/HHmysbRJ1OzCWML3OXdUwJf0AYlJzTjpMfkOKr7sTtE4rwyyQtd4tKH1fGdurgI9ZuFd9qvYxK2qcJhsQ6CNqMXt+7FkVkN1rIPmofjjBTNryzUr4COFXuWH95aDAif19DeBW4lbNgo1+FpDsrgmqtuhl6NAuptI8q/imow2KXBYJ8JPXsxW8DVFj0IIp0RCd3GjaEnwBEbxAyiIHLfW7AudyTS/dJOvZffPqXnuJ8xj3OPIdNe4xY0hWl8Ju2HhKfLOAHq7VadHZWd3IHLil70EiL4/JLD1rNbMImUZisFaA8pyrcIvYYebjOnk4TscwKFLedClRSX1XsMjWWd0oykQtrdkHM2IxknnBpaLu7mFnfE07f6dkG0nlpyu4SCLey7hr5FdcEmljA0nIxTSYDg6035fQkBEAbe7hlESOekkVNT9IZPwG+lmt3vU4ofi6NqNbJecOuSB+h36IiZ9s4YQtxYNnLgW14zjuFGGyT5smc3IjBT7qngDjKIgyrSVoRkY/8udy9qbUgvBeW8= - before_script: - go get github.com/mattn/goveralls script: diff --git a/CHANGELOG.md b/CHANGELOG.md index 6408ebcdc..ae9c43fbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,31 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Serialization Versioning](VERSIONING.md). +## [2.0.2-rc1] - 2020-02-26 + +### Fixed + +- Cast sz to uint32 to fix compilation on 32 bit. (#1175) +- Fix checkOverlap in compaction. (#1166) +- Avoid sync in inmemory mode. (#1190) +- Support disabling the cache completely. (#1185) +- Add support for caching bloomfilters. (#1204) +- Fix int overflow for 32bit. (#1216) +- Remove the 'this entry should've caught' log from value.go. (#1170) +- Rework concurrency semantics of valueLog.maxFid. 
(#1187) + +### Performance + +- Use fastRand instead of locked-rand in skiplist. (#1173) +- Improve write stalling on level 0 and 1. (#1186) +- Disable compression and set ZSTD Compression Level to 1. (#1191) + ## [2.0.1] - 2020-01-02 ### New APIs - badger.Options - - WithInMemory (5b6321) + - WithInMemory (f5b6321) - WithZSTDCompressionLevel (3eb4e72) - Badger.TableInfo @@ -274,7 +293,7 @@ Bug fix: ## [1.0.1] - 2017-11-06 * Fix an uint16 overflow when resizing key slice -[Unreleased]: https://github.com/dgraph-io/badger/compare/v2.0.1...HEAD +[2.0.2-rc1]: https://github.com/dgraph-io/badger/compare/v2.0.1...v2.0.2-rc1 [2.0.1]: https://github.com/dgraph-io/badger/compare/v2.0.0...v2.0.1 [2.0.0]: https://github.com/dgraph-io/badger/compare/v1.6.0...v2.0.0 [1.6.0]: https://github.com/dgraph-io/badger/compare/v1.5.5...v1.6.0 diff --git a/README.md b/README.md index a3064653c..25fc2a4e9 100644 --- a/README.md +++ b/README.md @@ -252,7 +252,7 @@ on it. ```go err := db.Update(func(txn *badger.Txn) error { - e := NewEntry([]byte("answer"), []byte("42")) + e := badger.NewEntry([]byte("answer"), []byte("42")) err := txn.SetEntry(e) return err }) @@ -401,7 +401,7 @@ and `Txn.SetEntry()` API methods. ```go err := db.Update(func(txn *badger.Txn) error { - e := NewEntry([]byte("answer"), []byte("42")).WithTTL(time.Hour) + e := badger.NewEntry([]byte("answer"), []byte("42")).WithTTL(time.Hour) err := txn.SetEntry(e) return err }) @@ -414,7 +414,7 @@ metadata can be set using `Entry.WithMeta()` and `Txn.SetEntry()` API methods. ```go err := db.Update(func(txn *badger.Txn) error { - e := NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)) + e := badger.NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)) err := txn.SetEntry(e) return err }) @@ -425,7 +425,7 @@ then can be set using `Txn.SetEntry()`. ```go err := db.Update(func(txn *badger.Txn) error { - e := NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)).WithTTL(time.Hour) + e := badger.NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)).WithTTL(time.Hour) err := txn.SetEntry(e) return err }) @@ -748,6 +748,7 @@ Below is a list of known projects that use Badger: * [0-stor](https://github.com/zero-os/0-stor) - Single device object store. * [Dgraph](https://github.com/dgraph-io/dgraph) - Distributed graph database. +* [Jaeger](https://github.com/jaegertracing/jaeger) - Distributed tracing platform. * [TalariaDB](https://github.com/grab/talaria) - Distributed, low latency time-series database. * [Dispatch Protocol](https://github.com/dispatchlabs/disgo) - Blockchain protocol for distributed application data analytics. * [Sandglass](https://github.com/celrenheit/sandglass) - distributed, horizontally scalable, persistent, time sorted message queue. 
diff --git a/backup_test.go b/backup_test.go index 16ef8ed5c..b652c89e3 100644 --- a/backup_test.go +++ b/backup_test.go @@ -116,9 +116,8 @@ func TestBackupRestore1(t *testing.T) { func TestBackupRestore2(t *testing.T) { tmpdir, err := ioutil.TempDir("", "badger-test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer removeDir(tmpdir) s1Path := filepath.Join(tmpdir, "test1") @@ -126,9 +125,9 @@ func TestBackupRestore2(t *testing.T) { s3Path := filepath.Join(tmpdir, "test3") db1, err := Open(getTestOptions(s1Path)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + defer db1.Close() key1 := []byte("key1") key2 := []byte("key2") rawValue := []byte("NotLongValue") @@ -139,9 +138,8 @@ func TestBackupRestore2(t *testing.T) { } return tx.SetEntry(NewEntry(key2, rawValue)) }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + for i := byte(1); i < N; i++ { err = db1.Update(func(tx *Txn) error { if err := tx.SetEntry(NewEntry(append(key1, i), rawValue)); err != nil { @@ -149,25 +147,21 @@ func TestBackupRestore2(t *testing.T) { } return tx.SetEntry(NewEntry(append(key2, i), rawValue)) }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + } var backup bytes.Buffer _, err = db1.Backup(&backup, 0) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + fmt.Println("backup1 length:", backup.Len()) db2, err := Open(getTestOptions(s2Path)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + defer db2.Close() err = db2.Load(&backup, 16) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) for i := byte(1); i < N; i++ { err = db2.View(func(tx *Txn) error { @@ -188,9 +182,8 @@ func TestBackupRestore2(t *testing.T) { } return nil }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + } for i := byte(1); i < N; i++ { @@ -200,26 +193,22 @@ func TestBackupRestore2(t *testing.T) { } return tx.SetEntry(NewEntry(append(key2, i), rawValue)) }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + } backup.Reset() _, err = db2.Backup(&backup, 0) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + fmt.Println("backup2 length:", backup.Len()) db3, err := Open(getTestOptions(s3Path)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + defer db3.Close() err = db3.Load(&backup, 16) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) for i := byte(1); i < N; i++ { err = db3.View(func(tx *Txn) error { @@ -240,9 +229,8 @@ func TestBackupRestore2(t *testing.T) { } return nil }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + } } @@ -310,9 +298,8 @@ func TestBackup(t *testing.T) { } t.Run("disk mode", func(t *testing.T) { tmpdir, err := ioutil.TempDir("", "badger-test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer removeDir(tmpdir) opt := DefaultOptions(filepath.Join(tmpdir, "backup0")) runBadgerTest(t, &opt, func(t *testing.T, db *DB) { @@ -330,11 +317,9 @@ func TestBackup(t *testing.T) { func TestBackupRestore3(t *testing.T) { var bb bytes.Buffer - tmpdir, err := ioutil.TempDir("", "badger-test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer removeDir(tmpdir) N := 1000 @@ -343,10 +328,9 @@ func TestBackupRestore3(t *testing.T) { // backup { db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup1"))) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer db1.Close() require.NoError(t, populateEntries(db1, entries)) _, err = db1.Backup(&bb, 0) @@ -358,9 +342,9 @@ func 
TestBackupRestore3(t *testing.T) { // restore db2, err := Open(DefaultOptions(filepath.Join(tmpdir, "restore1"))) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + defer db2.Close() require.NoError(t, db2.Load(&bb, 16)) // verify @@ -390,9 +374,8 @@ func TestBackupRestore3(t *testing.T) { func TestBackupLoadIncremental(t *testing.T) { tmpdir, err := ioutil.TempDir("", "badger-test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer removeDir(tmpdir) N := 100 @@ -403,9 +386,9 @@ func TestBackupLoadIncremental(t *testing.T) { // backup { db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup2"))) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + defer db1.Close() require.NoError(t, populateEntries(db1, entries)) since, err := db1.Backup(&bb, 0) @@ -463,9 +446,10 @@ func TestBackupLoadIncremental(t *testing.T) { // restore db2, err := Open(getTestOptions(filepath.Join(tmpdir, "restore2"))) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + defer db2.Close() + require.NoError(t, db2.Load(&bb, 16)) // verify diff --git a/db.go b/db.go index 4cffe9cea..a40aa8d92 100644 --- a/db.go +++ b/db.go @@ -146,7 +146,8 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { ExpiresAt: e.ExpiresAt, } - if e.meta&bitFinTxn > 0 { + switch { + case e.meta&bitFinTxn > 0: txnTs, err := strconv.ParseUint(string(e.Value), 10, 64) if err != nil { return errors.Wrapf(err, "Unable to parse txn fin: %q", e.Value) @@ -160,7 +161,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { txn = txn[:0] lastCommit = 0 - } else if e.meta&bitTxn > 0 { + case e.meta&bitTxn > 0: txnTs := y.ParseTs(nk) if lastCommit == 0 { lastCommit = txnTs @@ -174,7 +175,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { te := txnEntry{nk: nk, v: v} txn = append(txn, te) - } else { + default: // This entry is from a rewrite. toLSM(nk, v) @@ -278,17 +279,6 @@ func Open(opt Options) (db *DB, err error) { elog = trace.NewEventLog("Badger", "DB") } - config := ristretto.Config{ - // Use 5% of cache memory for storing counters. - NumCounters: int64(float64(opt.MaxCacheSize) * 0.05 * 2), - MaxCost: int64(float64(opt.MaxCacheSize) * 0.95), - BufferItems: 64, - Metrics: true, - } - cache, err := ristretto.NewCache(&config) - if err != nil { - return nil, errors.Wrap(err, "failed to create cache") - } db = &DB{ imm: make([]*skl.Skiplist, 0, opt.NumMemtables), flushChan: make(chan flushTask, opt.NumMemtables), @@ -300,7 +290,20 @@ func Open(opt Options) (db *DB, err error) { valueDirGuard: valueDirLockGuard, orc: newOracle(opt), pub: newPublisher(), - blockCache: cache, + } + + if opt.MaxCacheSize > 0 { + config := ristretto.Config{ + // Use 5% of cache memory for storing counters. + NumCounters: int64(float64(opt.MaxCacheSize) * 0.05 * 2), + MaxCost: int64(float64(opt.MaxCacheSize) * 0.95), + BufferItems: 64, + Metrics: true, + } + db.blockCache, err = ristretto.NewCache(&config) + if err != nil { + return nil, errors.Wrap(err, "failed to create cache") + } } if db.opt.InMemory { @@ -386,7 +389,10 @@ func Open(opt Options) (db *DB, err error) { // CacheMetrics returns the metrics for the underlying cache. func (db *DB) CacheMetrics() *ristretto.Metrics { - return db.blockCache.Metrics + if db.blockCache != nil { + return db.blockCache.Metrics + } + return nil } // Close closes a DB. 
It's crucial to call it to ensure all the pending updates make their way to @@ -1050,9 +1056,10 @@ func (db *DB) calculateSize() { return err } ext := filepath.Ext(path) - if ext == ".sst" { + switch ext { + case ".sst": lsmSize += info.Size() - } else if ext == ".vlog" { + case ".vlog": vlogSize += info.Size() } return nil @@ -1209,11 +1216,12 @@ func (seq *Sequence) Release() error { func (seq *Sequence) updateLease() error { return seq.db.Update(func(txn *Txn) error { item, err := txn.Get(seq.key) - if err == ErrKeyNotFound { + switch { + case err == ErrKeyNotFound: seq.next = 0 - } else if err != nil { + case err != nil: return err - } else { + default: var num uint64 if err := item.Value(func(v []byte) error { num = binary.BigEndian.Uint64(v) @@ -1501,6 +1509,7 @@ func (db *DB) dropAll() (func(), error) { db.lc.nextFileID = 1 db.opt.Infof("Deleted %d value log files. DropAll done.\n", num) db.blockCache.Clear() + return resume, nil } diff --git a/db2_test.go b/db2_test.go index 3885f9aae..56d9ca145 100644 --- a/db2_test.go +++ b/db2_test.go @@ -23,6 +23,7 @@ import ( "fmt" "io/ioutil" "log" + "math" "math/rand" "os" "path" @@ -310,7 +311,7 @@ func TestPushValueLogLimit(t *testing.T) { for i := 0; i < 32; i++ { if i == 4 { - v := make([]byte, 2<<30) + v := make([]byte, math.MaxInt32) err := db.Update(func(txn *Txn) error { return txn.SetEntry(NewEntry([]byte(key(i)), v)) }) @@ -627,22 +628,29 @@ func TestL0GCBug(t *testing.T) { return []byte(fmt.Sprintf("%10d", i)) } val := []byte{1, 1, 1, 1, 1, 1, 1, 1} - // Insert 100 entries. This will create about 50 vlog files and 2 SST files. - for i := 0; i < 100; i++ { - err = db1.Update(func(txn *Txn) error { - return txn.SetEntry(NewEntry(key(i), val)) - }) - require.NoError(t, err) + // Insert 100 entries. This will create about 50*3 vlog files and 6 SST files. + for i := 0; i < 3; i++ { + for j := 0; j < 100; j++ { + err = db1.Update(func(txn *Txn) error { + return txn.SetEntry(NewEntry(key(j), val)) + }) + require.NoError(t, err) + } } // Run value log GC multiple times. This would ensure at least // one value log file is garbage collected. + success := 0 for i := 0; i < 10; i++ { err := db1.RunValueLogGC(0.01) + if err == nil { + success++ + } if err != nil && err != ErrNoRewrite { t.Fatalf(err.Error()) } } - + // Ensure alteast one GC call was successful. + require.NotZero(t, success) // CheckKeys reads all the keys previously stored. checkKeys := func(db *DB) { for i := 0; i < 100; i++ { @@ -665,7 +673,12 @@ func TestL0GCBug(t *testing.T) { if db1.valueDirGuard != nil { require.NoError(t, db1.valueDirGuard.release()) } - require.NoError(t, db1.vlog.Close()) + for _, f := range db1.vlog.filesMap { + require.NoError(t, f.fd.Close()) + } + require.NoError(t, db1.registry.Close()) + require.NoError(t, db1.lc.close()) + require.NoError(t, db1.manifest.close()) db2, err := Open(opts) require.NoError(t, err) @@ -723,7 +736,6 @@ func TestWindowsDataLoss(t *testing.T) { opt.Truncate = true db, err = Open(opt) require.NoError(t, err) - // Return after reading one entry. We're simulating a crash. // Simulate a crash by not closing db but releasing the locks. if db.dirLockGuard != nil { @@ -735,6 +747,12 @@ func TestWindowsDataLoss(t *testing.T) { // Don't use vlog.Close here. We don't want to fix the file size. Only un-mmap // the data so that we can truncate the file durning the next vlog.Open. 
require.NoError(t, y.Munmap(db.vlog.filesMap[db.vlog.maxFid].fmap)) + for _, f := range db.vlog.filesMap { + require.NoError(t, f.fd.Close()) + } + require.NoError(t, db.registry.Close()) + require.NoError(t, db.manifest.close()) + require.NoError(t, db.lc.close()) fmt.Println() fmt.Println("Third DB Open") diff --git a/db_test.go b/db_test.go index 03ea090dd..59781511a 100644 --- a/db_test.go +++ b/db_test.go @@ -23,10 +23,8 @@ import ( "flag" "fmt" "io/ioutil" - "log" "math" "math/rand" - "net/http" "os" "path/filepath" "runtime" @@ -289,6 +287,13 @@ func TestGet(t *testing.T) { test(t, db) require.NoError(t, db.Close()) }) + t.Run("cache disabled", func(t *testing.T) { + opts := DefaultOptions("").WithInMemory(true).WithMaxCacheSize(0) + db, err := Open(opts) + require.NoError(t, err) + test(t, db) + require.NoError(t, db.Close()) + }) } func TestGetAfterDelete(t *testing.T) { @@ -1161,6 +1166,9 @@ func TestExpiryImproperDBClose(t *testing.T) { // it would return Truncate Required Error. require.NoError(t, db0.vlog.Close()) + require.NoError(t, db0.registry.Close()) + require.NoError(t, db0.manifest.close()) + db1, err := Open(opt) require.NoError(t, err) err = db1.View(func(txn *Txn) error { @@ -1200,7 +1208,7 @@ func randBytes(n int) []byte { recv := make([]byte, n) in, err := rand.Read(recv) if err != nil { - log.Fatal(err) + panic(err) } return recv[:in] } @@ -1558,9 +1566,6 @@ func TestLSMOnly(t *testing.T) { opts.ValueLogMaxEntries = 100 db, err := Open(opts) require.NoError(t, err) - if err != nil { - t.Fatal(err) - } value := make([]byte, 128) _, err = rand.Read(value) @@ -1572,9 +1577,7 @@ func TestLSMOnly(t *testing.T) { db, err = Open(opts) require.NoError(t, err) - if err != nil { - t.Fatal(err) - } + defer db.Close() require.NoError(t, db.RunValueLogGC(0.2)) } @@ -1670,12 +1673,12 @@ func TestGoroutineLeak(t *testing.T) { func ExampleOpen() { dir, err := ioutil.TempDir("", "badger-test") if err != nil { - log.Fatal(err) + panic(err) } defer removeDir(dir) db, err := Open(DefaultOptions(dir)) if err != nil { - log.Fatal(err) + panic(err) } defer db.Close() @@ -1687,17 +1690,17 @@ func ExampleOpen() { }) if err != nil { - log.Fatal(err) + panic(err) } txn := db.NewTransaction(true) // Read-write txn err = txn.SetEntry(NewEntry([]byte("key"), []byte("value"))) if err != nil { - log.Fatal(err) + panic(err) } err = txn.Commit() if err != nil { - log.Fatal(err) + panic(err) } err = db.View(func(txn *Txn) error { @@ -1714,7 +1717,7 @@ func ExampleOpen() { }) if err != nil { - log.Fatal(err) + panic(err) } // Output: @@ -1725,13 +1728,13 @@ func ExampleOpen() { func ExampleTxn_NewIterator() { dir, err := ioutil.TempDir("", "badger-test") if err != nil { - log.Fatal(err) + panic(err) } defer removeDir(dir) db, err := Open(DefaultOptions(dir)) if err != nil { - log.Fatal(err) + panic(err) } defer db.Close() @@ -1749,13 +1752,13 @@ func ExampleTxn_NewIterator() { for i := 0; i < n; i++ { err := txn.SetEntry(NewEntry(bkey(i), bval(i))) if err != nil { - log.Fatal(err) + panic(err) } } err = txn.Commit() if err != nil { - log.Fatal(err) + panic(err) } opt := DefaultIteratorOptions @@ -1772,7 +1775,7 @@ func ExampleTxn_NewIterator() { return nil }) if err != nil { - log.Fatal(err) + panic(err) } fmt.Printf("Counted %d elements", count) // Output: @@ -1954,79 +1957,10 @@ func TestVerifyChecksum(t *testing.T) { } func TestMain(m *testing.M) { - // call flag.Parse() here if TestMain uses flags - go func() { - if err := http.ListenAndServe("localhost:8080", nil); err != nil { - 
log.Fatalf("Unable to open http port at 8080") - } - }() + flag.Parse() os.Exit(m.Run()) } -func ExampleDB_Subscribe() { - prefix := []byte{'a'} - - // This key should be printed, since it matches the prefix. - aKey := []byte("a-key") - aValue := []byte("a-value") - - // This key should not be printed. - bKey := []byte("b-key") - bValue := []byte("b-value") - - // Open the DB. - dir, err := ioutil.TempDir("", "badger-test") - if err != nil { - log.Fatal(err) - } - defer removeDir(dir) - db, err := Open(DefaultOptions(dir)) - if err != nil { - log.Fatal(err) - } - defer db.Close() - - // Create the context here so we can cancel it after sending the writes. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Use the WaitGroup to make sure we wait for the subscription to stop before continuing. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - cb := func(kvs *KVList) error { - for _, kv := range kvs.Kv { - fmt.Printf("%s is now set to %s\n", kv.Key, kv.Value) - } - return nil - } - if err := db.Subscribe(ctx, cb, prefix); err != nil && err != context.Canceled { - log.Fatal(err) - } - log.Printf("subscription closed") - }() - - // Wait for the above go routine to be scheduled. - time.Sleep(time.Second) - // Write both keys, but only one should be printed in the Output. - err = db.Update(func(txn *Txn) error { return txn.Set(aKey, aValue) }) - if err != nil { - log.Fatal(err) - } - err = db.Update(func(txn *Txn) error { return txn.Set(bKey, bValue) }) - if err != nil { - log.Fatal(err) - } - - log.Printf("stopping subscription") - cancel() - log.Printf("waiting for subscription to close") - wg.Wait() - // Output: - // a-key is now set to a-value -} - func removeDir(dir string) { if err := os.RemoveAll(dir); err != nil { panic(err) diff --git a/go.mod b/go.mod index f7f1fb0d3..eae04e485 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e + github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 diff --git a/go.sum b/go.sum index 60e673a75..4c71dbdf4 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e h1:aeUNgwup7PnDOBAD1BOKAqzb/W/NksOj6r3dwKKuqfg= -github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e/go.mod h1:edzKIzGvqUCMzhTVWbiTSe75zD9Xxq0GtSBtFmaUTZs= +github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU= +github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= diff --git a/level_handler.go b/level_handler.go 
index dbc2532ba..19ba0892b 100644 --- a/level_handler.go +++ b/level_handler.go @@ -188,7 +188,9 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool { // Need lock as we may be deleting the first table during a level 0 compaction. s.Lock() defer s.Unlock() - if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall { + // Return false only if L0 is in memory and number of tables is more than number of + // ZeroTableStall. For on disk L0, we should just add the tables to the level. + if s.db.opt.KeepL0InMemory && len(s.tables) >= s.db.opt.NumLevelZeroTablesStall { return false } diff --git a/levels.go b/levels.go index 0a4b92f26..41faf6e0b 100644 --- a/levels.go +++ b/levels.go @@ -44,12 +44,9 @@ type levelsController struct { kv *DB cstatus compactStatus -} - -var ( // This is for getting timings between stalls. lastUnstalled time.Time -) +} // revertToManifest checks that all necessary table files exist and removes all table files not // referenced by the manifest. idMap is a set of table file id's that were read from the directory @@ -87,12 +84,13 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) { for i := 0; i < db.opt.MaxLevels; i++ { s.levels[i] = newLevelHandler(db, i) - if i == 0 { + switch i { + case 0: // Do nothing. - } else if i == 1 { + case 1: // Level 1 probably shouldn't be too much bigger than level 0. s.levels[i].maxTotalSize = db.opt.LevelOneSize - } else { + default: s.levels[i].maxTotalSize = s.levels[i-1].maxTotalSize * int64(db.opt.LevelSizeMultiplier) } s.cstatus.levels[i] = new(levelCompactStatus) @@ -363,12 +361,15 @@ func (s *levelsController) runWorker(lc *y.Closer) { // Can add a done channel or other stuff. case <-ticker.C: prios := s.pickCompactLevels() + loop: for _, p := range prios { - if err := s.doCompact(p); err == nil { - break - } else if err == errFillTables { + err := s.doCompact(p) + switch err { + case nil: + break loop + case errFillTables: // pass - } else { + default: s.kv.opt.Warningf("While running doCompact: %v\n", err) } } @@ -424,34 +425,42 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) { prios = append(prios, pri) } } - sort.Slice(prios, func(i, j int) bool { - return prios[i].score > prios[j].score - }) + // We used to sort compaction priorities based on the score. But, we + // decided to compact based on the level, not the priority. So, upper + // levels (level 0, level 1, etc) always get compacted first, before the + // lower levels -- this allows us to avoid stalls. return prios } -// compactBuildTables merge topTables and botTables to form a list of new tables. +// checkOverlap checks if the given tables overlap with any level from the given "lev" onwards. +func (s *levelsController) checkOverlap(tables []*table.Table, lev int) bool { + kr := getKeyRange(tables...) + for i, lh := range s.levels { + if i < lev { // Skip upper levels. + continue + } + lh.RLock() + left, right := lh.overlappingTables(levelHandlerRLocked{}, kr) + lh.RUnlock() + if right-left > 0 { + return true + } + } + return false +} + +// compactBuildTables merges topTables and botTables to form a list of new tables. func (s *levelsController) compactBuildTables( lev int, cd compactDef) ([]*table.Table, func() error, error) { topTables := cd.top botTables := cd.bot - var hasOverlap bool - { - kr := getKeyRange(cd.top...) - for i, lh := range s.levels { - if i <= lev { // Skip upper levels. 
- continue - } - lh.RLock() - left, right := lh.overlappingTables(levelHandlerRLocked{}, kr) - lh.RUnlock() - if right-left > 0 { - hasOverlap = true - break - } - } - } + // Check overlap of the top level with the levels which are not being + // compacted in this compaction. We don't need to check overlap of the bottom + // tables with other levels because if the top tables overlap with any of the lower + // levels, it implies bottom level also overlaps because top and bottom tables + // overlap with each other. + hasOverlap := s.checkOverlap(cd.top, cd.nextLevel.level+1) // Try to collect stats so that we can inform value log about GC. That would help us find which // value log file should be GCed. @@ -470,9 +479,10 @@ func (s *levelsController) compactBuildTables( // Create iterators across all the tables involved first. var iters []y.Iterator - if lev == 0 { + switch { + case lev == 0: iters = appendIteratorsReversed(iters, topTables, false) - } else if len(topTables) > 0 { + case len(topTables) > 0: y.AssertTrue(len(topTables) == 1) iters = []y.Iterator{topTables[0].NewIterator(false)} } @@ -561,22 +571,28 @@ func (s *levelsController) compactBuildTables( // versions which are below the minReadTs, otherwise, we might end up discarding the // only valid version for a running transaction. numVersions++ - lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 - if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) || - numVersions > s.kv.opt.NumVersionsToKeep || - lastValidVersion { + + // Keep the current version and discard all the next versions if + // - The `discardEarlierVersions` bit is set OR + // - We've already processed `NumVersionsToKeep` number of versions + // (including the current item being processed) + lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 || + numVersions == s.kv.opt.NumVersionsToKeep + + if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) || lastValidVersion { // If this version of the key is deleted or expired, skip all the rest of the // versions. Ensure that we're only removing versions below readTs. skipKey = y.SafeCopy(skipKey, it.Key()) - if lastValidVersion { + switch { + case lastValidVersion: // Add this key. We have set skipKey, so the following key versions // would be skipped. - } else if hasOverlap { + case hasOverlap: // If this key range has overlap with lower levels, then keep the deletion // marker with the latest version, discarding the rest. We have set skipKey, // so the following key versions would be skipped. - } else { + default: // If no overlap, we can skip all the versions, by continuing here. numSkips++ updateStats(vs) @@ -916,7 +932,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { // Stall. Make sure all levels are healthy before we unstall. var timeStart time.Time { - s.elog.Printf("STALLED STALLED STALLED: %v\n", time.Since(lastUnstalled)) + s.elog.Printf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled)) s.cstatus.RLock() for i := 0; i < s.kv.opt.MaxLevels; i++ { s.elog.Printf("level=%d. Status=%s Size=%d\n", @@ -925,15 +941,13 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { s.cstatus.RUnlock() timeStart = time.Now() } - // Before we unstall, we need to make sure that level 0 and 1 are healthy. Otherwise, we - // will very quickly fill up level 0 again and if the compaction strategy favors level 0, - // then level 1 is going to super full. + // Before we unstall, we need to make sure that level 0 is healthy. Otherwise, we + // will very quickly fill up level 0 again. 
for i := 0; ; i++ { - // Passing 0 for delSize to compactable means we're treating incomplete compactions as - // not having finished -- we wait for them to finish. Also, it's crucial this behavior - // replicates pickCompactLevels' behavior in computing compactability in order to - // guarantee progress. - if !s.isLevel0Compactable() && !s.levels[1].isCompactable(0) { + // It's crucial that this behavior replicates pickCompactLevels' behavior in + // computing compactability in order to guarantee progress. + // Break the loop once L0 has enough space to accommodate new tables. + if !s.isLevel0Compactable() { break } time.Sleep(10 * time.Millisecond) @@ -945,7 +959,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { } { s.elog.Printf("UNSTALLED UNSTALLED UNSTALLED: %v\n", time.Since(timeStart)) - lastUnstalled = time.Now() + s.lastUnstalled = time.Now() } } diff --git a/levels_test.go b/levels_test.go new file mode 100644 index 000000000..688e3d64d --- /dev/null +++ b/levels_test.go @@ -0,0 +1,567 @@ +/* + * Copyright 2019 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package badger + +import ( + "math" + "testing" + "time" + + "github.com/dgraph-io/badger/v2/options" + "github.com/dgraph-io/badger/v2/pb" + "github.com/dgraph-io/badger/v2/table" + "github.com/dgraph-io/badger/v2/y" + "github.com/stretchr/testify/require" +) + +// createAndOpen creates a table with the given data and adds it to the given level. +func createAndOpen(db *DB, td []keyValVersion, level int) { + opts := table.Options{ + BlockSize: db.opt.BlockSize, + BloomFalsePositive: db.opt.BloomFalsePositive, + LoadingMode: options.LoadToRAM, + ChkMode: options.NoVerification, + } + b := table.NewTableBuilder(opts) + + // Add all keys and versions to the table. + for _, item := range td { + key := y.KeyWithTs([]byte(item.key), uint64(item.version)) + val := y.ValueStruct{Value: []byte(item.val), Meta: item.meta} + b.Add(key, val, 0) + } + fd, err := y.CreateSyncedFile(table.NewFilename(db.lc.reserveFileID(), db.opt.Dir), true) + if err != nil { + panic(err) + } + + if _, err = fd.Write(b.Finish()); err != nil { + panic(err) + } + tab, err := table.OpenTable(fd, opts) + if err != nil { + panic(err) + } + if err := db.manifest.addChanges([]*pb.ManifestChange{ + newCreateChange(tab.ID(), level, 0, tab.CompressionType()), + }); err != nil { + panic(err) + } + // Add table to the given level. + db.lc.levels[level].tables = append(db.lc.levels[level].tables, tab) +} + +type keyValVersion struct { + key string + val string + version int + meta byte +} + +func TestCheckOverlap(t *testing.T) { + t.Run("overlap", func(t *testing.T) { + // This test consists of one table on level 0 and one on level 1. + // There is an overlap amongst the tables but there is no overlap + // with rest of the levels. 
+ t.Run("same keys", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"foo", "bar", 3, 0}} + l1 := []keyValVersion{{"foo", "bar", 2, 0}} + createAndOpen(db, l0, 0) + createAndOpen(db, l1, 1) + + // Level 0 should overlap with level 0 tables. + require.True(t, db.lc.checkOverlap(db.lc.levels[0].tables, 0)) + // Level 1 should overlap with level 0 tables (they have the same keys). + require.True(t, db.lc.checkOverlap(db.lc.levels[0].tables, 1)) + // Level 2 and 3 should not overlap with level 0 tables. + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 2)) + require.False(t, db.lc.checkOverlap(db.lc.levels[1].tables, 2)) + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 3)) + require.False(t, db.lc.checkOverlap(db.lc.levels[1].tables, 3)) + + }) + }) + t.Run("overlapping keys", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"a", "x", 1, 0}, {"b", "x", 1, 0}, {"foo", "bar", 3, 0}} + l1 := []keyValVersion{{"foo", "bar", 2, 0}} + createAndOpen(db, l0, 0) + createAndOpen(db, l1, 1) + + // Level 0 should overlap with level 0 tables. + require.True(t, db.lc.checkOverlap(db.lc.levels[0].tables, 0)) + require.True(t, db.lc.checkOverlap(db.lc.levels[1].tables, 1)) + // Level 1 should overlap with level 0 tables, "foo" key is common. + require.True(t, db.lc.checkOverlap(db.lc.levels[0].tables, 1)) + // Level 2 and 3 should not overlap with level 0 tables. + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 2)) + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 3)) + }) + }) + }) + t.Run("non-overlapping", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"a", "x", 1, 0}, {"b", "x", 1, 0}, {"c", "bar", 3, 0}} + l1 := []keyValVersion{{"foo", "bar", 2, 0}} + createAndOpen(db, l0, 0) + createAndOpen(db, l1, 1) + + // Level 1 should not overlap with level 0 tables + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 1)) + // Level 2 and 3 should not overlap with level 0 tables. + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 2)) + require.False(t, db.lc.checkOverlap(db.lc.levels[0].tables, 3)) + }) + }) +} + +func getAllAndCheck(t *testing.T, db *DB, expected []keyValVersion) { + db.View(func(txn *Txn) error { + opt := DefaultIteratorOptions + opt.AllVersions = true + opt.InternalAccess = true + it := txn.NewIterator(opt) + defer it.Close() + i := 0 + for it.Rewind(); it.Valid(); it.Next() { + require.Less(t, i, len(expected), "DB has more number of key than expected") + item := it.Item() + v, err := item.ValueCopy(nil) + require.NoError(t, err) + // fmt.Printf("k: %s v: %d val: %s\n", item.key, item.Version(), v) + expect := expected[i] + require.Equal(t, expect.key, string(item.Key()), "expected key: %s actual key: %s", + expect.key, item.Key()) + require.Equal(t, expect.val, string(v), "key: %s expected value: %s actual %s", + item.key, expect.val, v) + require.Equal(t, expect.version, int(item.Version()), + "key: %s expected version: %d actual %d", item.key, expect.version, item.Version()) + require.Equal(t, expect.meta, item.meta, + "key: %s expected meta: %d meta %d", item.key, expect.meta, item.meta) + i++ + } + require.Equal(t, len(expected), i, "keys examined should be equal to keys expected") + return nil + }) + +} + +func TestCompaction(t *testing.T) { + // Disable compactions and keep single version of each key. 
+ opt := DefaultOptions("").WithNumCompactors(0).WithNumVersionsToKeep(1) + opt.managedTxns = true + t.Run("level 0 to level 1", func(t *testing.T) { + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}} + l01 := []keyValVersion{{"foo", "bar", 2, 0}} + l1 := []keyValVersion{{"foo", "bar", 1, 0}} + // Level 0 has table l0 and l01. + createAndOpen(db, l0, 0) + createAndOpen(db, l01, 0) + // Level 1 has table l1. + createAndOpen(db, l1, 1) + + // Set a high discard timestamp so that all the keys are below the discard timestamp. + db.SetDiscardTs(10) + + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, + {"foo", "bar", 1, 0}, {"fooz", "baz", 1, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[0], + nextLevel: db.lc.levels[1], + top: db.lc.levels[0].tables, + bot: db.lc.levels[1].tables, + } + require.NoError(t, db.lc.runCompactDef(0, cdef)) + // foo version 2 should be dropped after compaction. + getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}}) + }) + }) + + t.Run("level 0 to level 1 with lower overlap", func(t *testing.T) { + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}} + l01 := []keyValVersion{{"foo", "bar", 2, 0}} + l1 := []keyValVersion{{"foo", "bar", 1, 0}} + l2 := []keyValVersion{{"foo", "bar", 0, 0}} + // Level 0 has table l0 and l01. + createAndOpen(db, l0, 0) + createAndOpen(db, l01, 0) + // Level 1 has table l1. + createAndOpen(db, l1, 1) + // Level 2 has table l2. + createAndOpen(db, l2, 2) + + // Set a high discard timestamp so that all the keys are below the discard timestamp. + db.SetDiscardTs(10) + + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, {"foo", "bar", 1, 0}, + {"foo", "bar", 0, 0}, {"fooz", "baz", 1, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[0], + nextLevel: db.lc.levels[1], + top: db.lc.levels[0].tables, + bot: db.lc.levels[1].tables, + } + require.NoError(t, db.lc.runCompactDef(0, cdef)) + // foo version 2 and version 1 should be dropped after compaction. + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 3, 0}, {"foo", "bar", 0, 0}, {"fooz", "baz", 1, 0}, + }) + }) + }) + + t.Run("level 1 to level 2", func(t *testing.T) { + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l1 := []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}} + l2 := []keyValVersion{{"foo", "bar", 2, 0}} + createAndOpen(db, l1, 1) + createAndOpen(db, l2, 2) + + // Set a high discard timestamp so that all the keys are below the discard timestamp. + db.SetDiscardTs(10) + + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, {"fooz", "baz", 1, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[1], + nextLevel: db.lc.levels[2], + top: db.lc.levels[1].tables, + bot: db.lc.levels[2].tables, + } + require.NoError(t, db.lc.runCompactDef(1, cdef)) + // foo version 2 should be dropped after compaction. + getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}}) + }) + }) +} + +func TestHeadKeyCleanup(t *testing.T) { + // Disable compactions and keep single version of each key. 
+ opt := DefaultOptions("").WithNumCompactors(0).WithNumVersionsToKeep(1) + opt.managedTxns = true + + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l0 := []keyValVersion{ + {string(head), "foo", 5, 0}, {string(head), "bar", 4, 0}, {string(head), "baz", 3, 0}, + } + l1 := []keyValVersion{{string(head), "fooz", 2, 0}, {string(head), "foozbaz", 1, 0}} + // Level 0 has table l0 and l01. + createAndOpen(db, l0, 0) + // Level 1 has table l1. + createAndOpen(db, l1, 1) + + // Set a high discard timestamp so that all the keys are below the discard timestamp. + db.SetDiscardTs(10) + + getAllAndCheck(t, db, []keyValVersion{ + {string(head), "foo", 5, 0}, {string(head), "bar", 4, 0}, {string(head), "baz", 3, 0}, + {string(head), "fooz", 2, 0}, {string(head), "foozbaz", 1, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[0], + nextLevel: db.lc.levels[1], + top: db.lc.levels[0].tables, + bot: db.lc.levels[1].tables, + } + require.NoError(t, db.lc.runCompactDef(0, cdef)) + // foo version 2 should be dropped after compaction. + getAllAndCheck(t, db, []keyValVersion{{string(head), "foo", 5, 0}}) + }) +} + +func TestDiscardTs(t *testing.T) { + // Disable compactions and keep single version of each key. + opt := DefaultOptions("").WithNumCompactors(0).WithNumVersionsToKeep(1) + opt.managedTxns = true + + t.Run("all keys above discardTs", func(t *testing.T) { + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"foo", "bar", 4, 0}, {"fooz", "baz", 3, 0}} + l01 := []keyValVersion{{"foo", "bar", 3, 0}} + l1 := []keyValVersion{{"foo", "bar", 2, 0}} + // Level 0 has table l0 and l01. + createAndOpen(db, l0, 0) + createAndOpen(db, l01, 0) + // Level 1 has table l1. + createAndOpen(db, l1, 1) + + // Set dicardTs to 1. All the keys are above discardTs. + db.SetDiscardTs(1) + + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, + {"foo", "bar", 2, 0}, {"fooz", "baz", 3, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[0], + nextLevel: db.lc.levels[1], + top: db.lc.levels[0].tables, + bot: db.lc.levels[1].tables, + } + require.NoError(t, db.lc.runCompactDef(0, cdef)) + // No keys should be dropped. + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, + {"foo", "bar", 2, 0}, {"fooz", "baz", 3, 0}, + }) + }) + }) + t.Run("some keys above discardTs", func(t *testing.T) { + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l0 := []keyValVersion{ + {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, + {"foo", "bar", 2, 0}, {"fooz", "baz", 2, 0}, + } + l1 := []keyValVersion{{"foo", "bbb", 1, 0}} + createAndOpen(db, l0, 0) + createAndOpen(db, l1, 1) + + // Set dicardTs to 3. foo2 and foo1 should be dropped. + db.SetDiscardTs(3) + + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, + {"foo", "bbb", 1, 0}, {"fooz", "baz", 2, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[0], + nextLevel: db.lc.levels[1], + top: db.lc.levels[0].tables, + bot: db.lc.levels[1].tables, + } + require.NoError(t, db.lc.runCompactDef(0, cdef)) + // foo1 and foo2 should be dropped. 
+ getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, {"fooz", "baz", 2, 0}, + }) + }) + }) + t.Run("all keys below discardTs", func(t *testing.T) { + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"foo", "bar", 4, 0}, {"fooz", "baz", 3, 0}} + l01 := []keyValVersion{{"foo", "bar", 3, 0}} + l1 := []keyValVersion{{"foo", "bar", 2, 0}} + // Level 0 has table l0 and l01. + createAndOpen(db, l0, 0) + createAndOpen(db, l01, 0) + // Level 1 has table l1. + createAndOpen(db, l1, 1) + + // Set dicardTs to 10. All the keys are below discardTs. + db.SetDiscardTs(10) + + getAllAndCheck(t, db, []keyValVersion{ + {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, + {"foo", "bar", 2, 0}, {"fooz", "baz", 3, 0}, + }) + cdef := compactDef{ + thisLevel: db.lc.levels[0], + nextLevel: db.lc.levels[1], + top: db.lc.levels[0].tables, + bot: db.lc.levels[1].tables, + } + require.NoError(t, db.lc.runCompactDef(0, cdef)) + // Only one version of every key should be left. + getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 4, 0}, {"fooz", "baz", 3, 0}}) + }) + }) +} + +// This is a test to ensure that the first entry with DiscardEarlierversion bit < DiscardTs +// is kept around (when numversionstokeep is infinite). +func TestDiscardFirstVersion(t *testing.T) { + opt := DefaultOptions("") + opt.NumCompactors = 0 + opt.NumVersionsToKeep = math.MaxInt32 + opt.managedTxns = true + + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + l0 := []keyValVersion{{"foo", "bar", 1, 0}} + l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}} + l02 := []keyValVersion{{"foo", "bar", 3, 0}} + l03 := []keyValVersion{{"foo", "bar", 4, 0}} + l04 := []keyValVersion{{"foo", "bar", 9, 0}} + l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}} + + // Level 0 has all the tables. + createAndOpen(db, l0, 0) + createAndOpen(db, l01, 0) + createAndOpen(db, l02, 0) + createAndOpen(db, l03, 0) + createAndOpen(db, l04, 0) + createAndOpen(db, l05, 0) + + // Discard Time stamp is set to 7. + db.SetDiscardTs(7) + + // Compact L0 to L1 + cdef := compactDef{ + thisLevel: db.lc.levels[0], + nextLevel: db.lc.levels[1], + top: db.lc.levels[0].tables, + bot: db.lc.levels[1].tables, + } + require.NoError(t, db.lc.runCompactDef(0, cdef)) + + // - Version 10, 9 lie above version 7 so they should be there. + // - Version 4, 3, 2 lie below the discardTs but they don't have the + // "bitDiscardEarlierVersions" versions set so they should not be removed because number + // of versions to keep is set to infinite. + // - Version 1 is below DiscardTS and below the first "bitDiscardEarlierVersions" + // marker so IT WILL BE REMOVED. + ExpectedKeys := []keyValVersion{ + {"foo", "bar", 10, bitDiscardEarlierVersions}, + {"foo", "bar", 9, 0}, + {"foo", "bar", 4, 0}, + {"foo", "bar", 3, 0}, + {"foo", "bar", 2, bitDiscardEarlierVersions}} + + getAllAndCheck(t, db, ExpectedKeys) + }) +} + +// This test ensures we don't stall when L1's size is greater than opt.LevelOneSize. +// We should stall only when L0 tables more than the opt.NumLevelZeroTableStall. +func TestL1Stall(t *testing.T) { + opt := DefaultOptions("") + // Disable all compactions. + opt.NumCompactors = 0 + // Number of level zero tables. + opt.NumLevelZeroTables = 3 + // Addition of new tables will stall if there are 4 or more L0 tables. + opt.NumLevelZeroTablesStall = 4 + // Level 1 size is 10 bytes. + opt.LevelOneSize = 10 + + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + // Level 0 has 4 tables. 
+ db.lc.levels[0].Lock() + db.lc.levels[0].tables = []*table.Table{createEmptyTable(db), createEmptyTable(db), + createEmptyTable(db), createEmptyTable(db)} + db.lc.levels[0].Unlock() + + timeout := time.After(5 * time.Second) + done := make(chan bool) + + // This is important. Set level 1 size more than the opt.LevelOneSize (we've set it to 10). + db.lc.levels[1].totalSize = 100 + go func() { + tab := createEmptyTable(db) + require.NoError(t, db.lc.addLevel0Table(tab)) + tab.DecrRef() + done <- true + }() + time.Sleep(time.Second) + + db.lc.levels[0].Lock() + // Drop two tables from Level 0 so that addLevel0Table can make progress. Earlier table + // count was 4 which is equal to L0 stall count. + toDrop := db.lc.levels[0].tables[:2] + decrRefs(toDrop) + db.lc.levels[0].tables = db.lc.levels[0].tables[2:] + db.lc.levels[0].Unlock() + + select { + case <-timeout: + t.Fatal("Test didn't finish in time") + case <-done: + } + }) +} + +func createEmptyTable(db *DB) *table.Table { + opts := table.Options{ + BloomFalsePositive: db.opt.BloomFalsePositive, + LoadingMode: options.LoadToRAM, + ChkMode: options.NoVerification, + } + b := table.NewTableBuilder(opts) + // Add one key so that we can open this table. + b.Add(y.KeyWithTs([]byte("foo"), 1), y.ValueStruct{}, 0) + + // Open table in memory to avoid adding changes to manifest file. + tab, err := table.OpenInMemoryTable(b.Finish(), db.lc.reserveFileID(), &opts) + if err != nil { + panic(err) + } + + return tab +} + +func TestL0Stall(t *testing.T) { + test := func(t *testing.T, opt *Options) { + runBadgerTest(t, opt, func(t *testing.T, db *DB) { + db.lc.levels[0].Lock() + // Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level + // zero and all new additions are expected to stall if L0 is in memory. + for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ { + db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db)) + } + db.lc.levels[0].Unlock() + + timeout := time.After(5 * time.Second) + done := make(chan bool) + + go func() { + tab := createEmptyTable(db) + require.NoError(t, db.lc.addLevel0Table(tab)) + tab.DecrRef() + done <- true + }() + // Let it stall for a second. + time.Sleep(time.Second) + + select { + case <-timeout: + if opt.KeepL0InMemory { + t.Log("Timeout triggered") + // Mark this test as successful since L0 is in memory and the + // addition of new table to L0 is supposed to stall. + } else { + t.Fatal("Test didn't finish in time") + } + case <-done: + // The test completed before 5 second timeout. Mark it as successful. + } + }) + } + + opt := DefaultOptions("") + opt.EventLogging = false + // Disable all compactions. + opt.NumCompactors = 0 + // Number of level zero tables. + opt.NumLevelZeroTables = 3 + // Addition of new tables will stall if there are 4 or more L0 tables. 
+ opt.NumLevelZeroTablesStall = 4 + + t.Run("with KeepL0InMemory", func(t *testing.T) { + opt.KeepL0InMemory = true + test(t, &opt) + }) + t.Run("with L0 on disk", func(t *testing.T) { + opt.KeepL0InMemory = false + test(t, &opt) + }) +} diff --git a/manifest_test.go b/manifest_test.go index c9a51fc60..5062b3f1b 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -168,6 +168,7 @@ func TestOverlappingKeyRangeError(t *testing.T) { defer removeDir(dir) kv, err := Open(DefaultOptions(dir)) require.NoError(t, err) + defer kv.Close() lh0 := newLevelHandler(kv, 0) lh1 := newLevelHandler(kv, 1) diff --git a/options.go b/options.go index 4374fc39d..4fbe09199 100644 --- a/options.go +++ b/options.go @@ -21,7 +21,6 @@ import ( "github.com/dgraph-io/badger/v2/options" "github.com/dgraph-io/badger/v2/table" - "github.com/dgraph-io/badger/v2/y" ) // Note: If you add a new option X make sure you also add a WithX method on Options. @@ -102,11 +101,6 @@ type Options struct { // DefaultOptions sets a list of recommended options for good performance. // Feel free to modify these to suit your needs with the WithX methods. func DefaultOptions(path string) Options { - defaultCompression := options.ZSTD - // Use snappy as default compression algorithm if badger is built without CGO. - if !y.CgoEnabled { - defaultCompression = options.Snappy - } return Options{ Dir: path, ValueDir: path, @@ -129,16 +123,19 @@ func DefaultOptions(path string) Options { CompactL0OnClose: true, KeepL0InMemory: true, VerifyValueChecksum: false, - Compression: defaultCompression, + Compression: options.None, MaxCacheSize: 1 << 30, // 1 GB - // Benchmarking compression level against performance showed that level 15 gives - // the best speed vs ratio tradeoff. - // For a data size of 4KB we get - // Level: 3 Ratio: 2.72 Time: 24112 n/s - // Level: 10 Ratio: 2.95 Time: 75655 n/s - // Level: 15 Ratio: 4.38 Time: 239042 n/s - // See https://github.com/dgraph-io/badger/pull/1111#issue-338120757 - ZSTDCompressionLevel: 15, + // The following benchmarks were done on a 4 KB block size (default block size). The + // compression is ratio supposed to increase with increasing compression level but since the + // input for compression algorithm is small (4 KB), we don't get significant benefit at + // level 3. + // no_compression-16 10 502848865 ns/op 165.46 MB/s - + // zstd_compression/level_1-16 7 739037966 ns/op 112.58 MB/s 2.93 + // zstd_compression/level_3-16 7 756950250 ns/op 109.91 MB/s 2.72 + // zstd_compression/level_15-16 1 11135686219 ns/op 7.47 MB/s 4.38 + // Benchmark code can be found in table/builder_test.go file + ZSTDCompressionLevel: 1, + // Nothing to read/write value log using standard File I/O // MemoryMap to mmap() the value log files // (2^30 - 1)*2 when mmapping < 2^31 - 1, max int32. @@ -539,7 +536,8 @@ func (opt Options) WithChecksumVerificationMode(cvMode options.ChecksumVerificat // WithMaxCacheSize returns a new Options value with MaxCacheSize set to the given value. // // This value specifies how much data cache should hold in memory. A small size of cache means lower -// memory consumption and lookups/iterations would take longer. +// memory consumption and lookups/iterations would take longer. Setting size to zero disables the +// cache altogether. func (opt Options) WithMaxCacheSize(size int64) Options { opt.MaxCacheSize = size return opt @@ -560,7 +558,18 @@ func (opt Options) WithInMemory(b bool) Options { // The ZSTD compression algorithm supports 20 compression levels. 
The higher the compression // level, the better is the compression ratio but lower is the performance. Lower levels // have better performance and higher levels have better compression ratios. -// The default value of ZSTDCompressionLevel is 15. +// We recommend using level 1 ZSTD Compression Level. Any level higher than 1 seems to +// deteriorate badger's performance. +// The following benchmarks were done on a 4 KB block size (default block size). The compression is +// ratio supposed to increase with increasing compression level but since the input for compression +// algorithm is small (4 KB), we don't get significant benefit at level 3. It is advised to write +// your own benchmarks before choosing a compression algorithm or level. +// +// no_compression-16 10 502848865 ns/op 165.46 MB/s - +// zstd_compression/level_1-16 7 739037966 ns/op 112.58 MB/s 2.93 +// zstd_compression/level_3-16 7 756950250 ns/op 109.91 MB/s 2.72 +// zstd_compression/level_15-16 1 11135686219 ns/op 7.47 MB/s 4.38 +// Benchmark code can be found in table/builder_test.go file func (opt Options) WithZSTDCompressionLevel(cLevel int) Options { opt.ZSTDCompressionLevel = cLevel return opt diff --git a/skl/skl.go b/skl/skl.go index cdfc599be..43694f14b 100644 --- a/skl/skl.go +++ b/skl/skl.go @@ -34,11 +34,11 @@ package skl import ( "math" - "math/rand" "sync/atomic" "unsafe" "github.com/dgraph-io/badger/v2/y" + "github.com/dgraph-io/ristretto/z" ) const ( @@ -165,9 +165,9 @@ func (s *node) casNextOffset(h int, old, val uint32) bool { // return n != nil && y.CompareKeys(key, n.key) > 0 //} -func randomHeight() int { +func (s *Skiplist) randomHeight() int { h := 1 - for h < maxHeight && rand.Uint32() <= heightIncrease { + for h < maxHeight && z.FastRand() <= heightIncrease { h++ } return h @@ -300,7 +300,7 @@ func (s *Skiplist) Put(key []byte, v y.ValueStruct) { } // We do need to create a new node. - height := randomHeight() + height := s.randomHeight() x := newNode(s.arena, key, v, height) // Try to increase s.height via CAS. 
diff --git a/skl/skl_test.go b/skl/skl_test.go index 6bd075862..0be7a64e4 100644 --- a/skl/skl_test.go +++ b/skl/skl_test.go @@ -499,7 +499,7 @@ func BenchmarkReadWriteMap(b *testing.B) { b.RunParallel(func(pb *testing.PB) { rng := rand.New(rand.NewSource(time.Now().UnixNano())) for pb.Next() { - if rand.Float32() < readFrac { + if rng.Float32() < readFrac { mutex.RLock() _, ok := m[string(randomKey(rng))] mutex.RUnlock() @@ -516,3 +516,16 @@ func BenchmarkReadWriteMap(b *testing.B) { }) } } + +func BenchmarkWrite(b *testing.B) { + value := newValue(123) + l := NewSkiplist(int64((b.N + 1) * MaxNodeSize)) + defer l.DecrRef() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for pb.Next() { + l.Put(randomKey(rng), y.ValueStruct{Value: value, Meta: 0, UserMeta: 0}) + } + }) +} diff --git a/table/builder_test.go b/table/builder_test.go index 3af2ce358..76296562e 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -57,7 +57,7 @@ func TestTableIndex(t *testing.T) { keysCount := 10000 for _, opt := range opts { builder := NewTableBuilder(opt) - filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Int63()) + filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Uint32()) f, err := y.OpenSyncedFile(filename, true) require.NoError(t, err) @@ -80,11 +80,11 @@ func TestTableIndex(t *testing.T) { require.NoError(t, err, "unable to write to file") tbl, err := OpenTable(f, opt) + require.NoError(t, err, "unable to open table") if opt.DataKey == nil { // key id is zero if thre is no datakey. require.Equal(t, tbl.KeyID(), uint64(0)) } - require.NoError(t, err, "unable to open table") // Ensure index is built correctly require.Equal(t, blockCount, len(tbl.blockIndex)) @@ -124,14 +124,42 @@ func BenchmarkBuilder(b *testing.B) { vs := y.ValueStruct{Value: []byte(val)} keysCount := 1300000 // This number of entries consumes ~64MB of memory. 
- for i := 0; i < b.N; i++ { - opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01} - builder := NewTableBuilder(opts) - for i := 0; i < keysCount; i++ { - builder.Add(key(i), vs, 0) - } + bench := func(b *testing.B, opt *Options) { + // KeyCount * (keySize + ValSize) + b.SetBytes(int64(keysCount) * (32 + 32)) + for i := 0; i < b.N; i++ { + opt.BlockSize = 4 * 1024 + opt.BloomFalsePositive = 0.01 + builder := NewTableBuilder(*opt) + + for i := 0; i < keysCount; i++ { + builder.Add(key(i), vs, 0) + } - _ = builder.Finish() + _ = builder.Finish() + } } + + b.Run("no compression", func(b *testing.B) { + var opt Options + opt.Compression = options.None + bench(b, &opt) + }) + b.Run("zstd compression", func(b *testing.B) { + var opt Options + opt.Compression = options.ZSTD + b.Run("level 1", func(b *testing.B) { + opt.ZSTDCompressionLevel = 1 + bench(b, &opt) + }) + b.Run("level 3", func(b *testing.B) { + opt.ZSTDCompressionLevel = 3 + bench(b, &opt) + }) + b.Run("level 15", func(b *testing.B) { + opt.ZSTDCompressionLevel = 15 + bench(b, &opt) + }) + }) } diff --git a/table/merge_iterator.go b/table/merge_iterator.go index e93edbbb9..e1809e027 100644 --- a/table/merge_iterator.go +++ b/table/merge_iterator.go @@ -55,17 +55,18 @@ func (n *node) setIterator(iter y.Iterator) { } func (n *node) setKey() { - if n.merge != nil { + switch { + case n.merge != nil: n.valid = n.merge.small.valid if n.valid { n.key = n.merge.small.key } - } else if n.concat != nil { + case n.concat != nil: n.valid = n.concat.Valid() if n.valid { n.key = n.concat.Key() } - } else { + default: n.valid = n.iter.Valid() if n.valid { n.key = n.iter.Key() @@ -74,11 +75,12 @@ func (n *node) setKey() { } func (n *node) next() { - if n.merge != nil { + switch { + case n.merge != nil: n.merge.Next() - } else if n.concat != nil { + case n.concat != nil: n.concat.Next() - } else { + default: n.iter.Next() } n.setKey() @@ -103,22 +105,22 @@ func (mi *MergeIterator) fix() { return } cmp := y.CompareKeys(mi.small.key, mi.bigger().key) - // Both the keys are equal. - if cmp == 0 { + switch { + case cmp == 0: // Both the keys are equal. // In case of same keys, move the right iterator ahead. mi.right.next() if &mi.right == mi.small { mi.swapSmall() } return - } else if cmp < 0 { // Small is less than bigger(). + case cmp < 0: // Small is less than bigger(). if mi.reverse { mi.swapSmall() } else { // we don't need to do anything. Small already points to the smallest. } return - } else { // bigger() is less than small. + default: // bigger() is less than small. if mi.reverse { // Do nothing since we're iterating in reverse. Small currently points to // the bigger key and that's okay in reverse iteration. @@ -206,11 +208,12 @@ func (mi *MergeIterator) Close() error { // NewMergeIterator creates a merge iterator. func NewMergeIterator(iters []y.Iterator, reverse bool) y.Iterator { - if len(iters) == 0 { + switch len(iters) { + case 0: return nil - } else if len(iters) == 1 { + case 1: return iters[0] - } else if len(iters) == 2 { + case 2: mi := &MergeIterator{ reverse: reverse, } diff --git a/table/table.go b/table/table.go index d68169384..25227be34 100644 --- a/table/table.go +++ b/table/table.go @@ -18,6 +18,7 @@ package table import ( "crypto/aes" + "encoding/binary" "fmt" "io" "math" @@ -81,7 +82,7 @@ type TableInterface interface { DoesNotHave(hash uint64) bool } -// Table represents a loaded table file with the info we have about it +// Table represents a loaded table file with the info we have about it. 
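The reworked BenchmarkBuilder above is what produces the numbers quoted in the WithZSTDCompressionLevel comment: a shared bench helper, b.SetBytes so the tool reports MB/s, and nested b.Run cases per compression level. A stripped-down sketch of that shape, with a hypothetical doWork stand-in for building a table:

package bench

import "testing"

func BenchmarkShape(b *testing.B) {
    bench := func(b *testing.B, level int) {
        b.SetBytes(1 << 20) // bytes processed per iteration, so results include MB/s
        for i := 0; i < b.N; i++ {
            doWork(level) // stand-in for building a table at this compression level
        }
    }
    b.Run("no compression", func(b *testing.B) { bench(b, 0) })
    b.Run("zstd compression", func(b *testing.B) {
        b.Run("level 1", func(b *testing.B) { bench(b, 1) })
        b.Run("level 3", func(b *testing.B) { bench(b, 3) })
    })
}

// doWork is a placeholder so the sketch compiles.
func doWork(level int) { _ = level }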
type Table struct { sync.Mutex @@ -97,10 +98,11 @@ type Table struct { smallest, biggest []byte // Smallest and largest keys (with timestamps). id uint64 // file id, part of filename - bf *z.Bloom Checksum []byte // Stores the total size of key-values stored in this table (including the size on vlog). estimatedSize uint64 + indexStart int + indexLen int IsInmemory bool // Set to true if the table is on level 0 and opened in memory. opt *Options @@ -146,6 +148,13 @@ func (t *Table) DecrRef() error { if err := os.Remove(filename); err != nil { return err } + // Delete all blocks from the cache. + for i := range t.blockIndex { + t.opt.Cache.Del(t.blockCacheKey(i)) + } + // Delete bloom filter from the cache. + t.opt.Cache.Del(t.bfCacheKey()) + } return nil } @@ -232,6 +241,7 @@ func OpenTable(fd *os.File, opts Options) (*Table, error) { if err := t.initBiggestAndSmallest(); err != nil { return nil, errors.Wrapf(err, "failed to initialize table") } + if opts.ChkMode == options.OnTableRead || opts.ChkMode == options.OnTableAndBlockRead { if err := t.VerifyChecksum(); err != nil { _ = fd.Close() @@ -320,6 +330,9 @@ func (t *Table) readIndex() error { readPos -= 4 buf := t.readNoFail(readPos, 4) checksumLen := int(y.BytesToU32(buf)) + if checksumLen < 0 { + return errors.New("checksum length less than zero. Data corrupted") + } // Read checksum. expectedChk := &pb.Checksum{} @@ -332,10 +345,12 @@ func (t *Table) readIndex() error { // Read index size from the footer. readPos -= 4 buf = t.readNoFail(readPos, 4) - indexLen := int(y.BytesToU32(buf)) + t.indexLen = int(y.BytesToU32(buf)) + // Read index. - readPos -= indexLen - data := t.readNoFail(readPos, indexLen) + readPos -= t.indexLen + t.indexStart = readPos + data := t.readNoFail(readPos, t.indexLen) if err := y.VerifyChecksum(data, expectedChk); err != nil { return y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename()) @@ -354,8 +369,18 @@ func (t *Table) readIndex() error { y.Check(err) t.estimatedSize = index.EstimatedSize - t.bf = z.JSONUnmarshal(index.BloomFilter) t.blockIndex = index.Offsets + + // Avoid the cost of unmarshalling the bloom filters if the cache is absent. + if t.opt.Cache != nil { + var bf *z.Bloom + if bf, err = z.JSONUnmarshal(index.BloomFilter); err != nil { + return y.Wrapf(err, "failed to unmarshal bloom filter for the table %d in Table.readIndex", + t.id) + } + + t.opt.Cache.Set(t.bfCacheKey(), bf, int64(len(index.BloomFilter))) + } return nil } @@ -436,10 +461,25 @@ func (t *Table) block(idx int) (*block, error) { return blk, nil } -func (t *Table) blockCacheKey(idx int) uint64 { - y.AssertTrue(t.ID() < math.MaxUint32) +// bfCacheKey returns the cache key for bloom filter. +func (t *Table) bfCacheKey() []byte { + y.AssertTrue(t.id < math.MaxUint32) + buf := make([]byte, 4) + binary.BigEndian.PutUint32(buf, uint32(t.id)) + + // Without the "bf" prefix, we will have conflict with the blockCacheKey. + return append([]byte("bf"), buf...) +} + +func (t *Table) blockCacheKey(idx int) []byte { + y.AssertTrue(t.id < math.MaxUint32) y.AssertTrue(uint32(idx) < math.MaxUint32) - return (t.ID() << 32) | uint64(idx) + + buf := make([]byte, 8) + // Assume t.ID does not overflow uint32. 
+ binary.BigEndian.PutUint32(buf[:4], uint32(t.ID())) + binary.BigEndian.PutUint32(buf[4:], uint32(idx)) + return buf } // EstimatedSize returns the total size of key-values stored in this table (including the @@ -463,7 +503,44 @@ func (t *Table) ID() uint64 { return t.id } // DoesNotHave returns true if (but not "only if") the table does not have the key hash. // It does a bloom filter lookup. -func (t *Table) DoesNotHave(hash uint64) bool { return !t.bf.Has(hash) } +func (t *Table) DoesNotHave(hash uint64) bool { + var bf *z.Bloom + + // Return fast if cache is absent. + if t.opt.Cache == nil { + bf, _ := t.readBloomFilter() + return !bf.Has(hash) + } + + // Check if the bloomfilter exists in the cache. + if b, ok := t.opt.Cache.Get(t.bfCacheKey()); b != nil && ok { + bf = b.(*z.Bloom) + return !bf.Has(hash) + } + + bf, sz := t.readBloomFilter() + t.opt.Cache.Set(t.bfCacheKey(), bf, int64(sz)) + return !bf.Has(hash) +} + +// readBloomFilter reads the bloom filter from the SST and returns its length +// along with the bloom filter. +func (t *Table) readBloomFilter() (*z.Bloom, int) { + // Read bloom filter from the SST. + data := t.readNoFail(t.indexStart, t.indexLen) + index := pb.TableIndex{} + var err error + // Decrypt the table index if it is encrypted. + if t.shouldDecrypt() { + data, err = t.decrypt(data) + y.Check(err) + } + y.Check(proto.Unmarshal(data, &index)) + + bf, err := z.JSONUnmarshal(index.BloomFilter) + y.Check(err) + return bf, len(index.BloomFilter) +} // VerifyChecksum verifies checksum for all blocks of table. This function is called by // OpenTable() function. This function is also called inside levelsController.VerifyChecksum(). diff --git a/table/table_test.go b/table/table_test.go index 82bddf591..27a4f1d16 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -77,13 +77,9 @@ func buildTable(t *testing.T, keyValues [][]string, opts Options) *os.File { defer b.Close() // TODO: Add test for file garbage collection here. No files should be left after the tests here. - filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63()) + filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Uint32()) f, err := y.CreateSyncedFile(filename, true) - if t != nil { - require.NoError(t, err) - } else { - y.Check(err) - } + require.NoError(t, err) sort.Slice(keyValues, func(i, j int) bool { return keyValues[i][0] < keyValues[j][0] @@ -743,7 +739,10 @@ func TestTableChecksum(t *testing.T) { f := buildTestTable(t, "k", 10000, opts) fi, err := f.Stat() require.NoError(t, err, "unable to get file information") - f.WriteAt(rb, rand.Int63n(fi.Size())) + // Write random bytes at random location. + n, err := f.WriteAt(rb, rand.Int63n(fi.Size())) + require.NoError(t, err) + require.Equal(t, n, len(rb)) _, err = OpenTable(f, opts) if err == nil || !strings.Contains(err.Error(), "checksum") { diff --git a/test.sh b/test.sh index 90d21889c..b4e40601a 100755 --- a/test.sh +++ b/test.sh @@ -4,30 +4,43 @@ set -e go version +packages=$(go list ./... | grep github.com/dgraph-io/badger/v2/) + +if [[ ! -z "$TEAMCITY_VERSION" ]]; then + export GOFLAGS="-json" +fi + # Ensure that we can compile the binary. pushd badger go build -v . popd # Run the memory intensive tests first. -go test -v --manual=true -run='TestBigKeyValuePairs$' -go test -v --manual=true -run='TestPushValueLogLimit' +go test -v -run='TestBigKeyValuePairs$' --manual=true +go test -v -run='TestPushValueLogLimit' --manual=true # Run the special Truncate test. 
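To spell out the cache-key scheme introduced in table.go above: block keys pack the table ID and block index into 8 big-endian bytes, while the bloom-filter key is the 4-byte table ID behind a "bf" prefix so the two key spaces cannot collide. A self-contained sketch of the same encoding (the helper names here are made up):

package cachekeys

import "encoding/binary"

// blockKey packs (tableID, blockIdx) into 8 bytes: 4 for the table, 4 for the block.
func blockKey(tableID, blockIdx uint32) []byte {
    buf := make([]byte, 8)
    binary.BigEndian.PutUint32(buf[:4], tableID)
    binary.BigEndian.PutUint32(buf[4:], blockIdx)
    return buf
}

// bloomKey prefixes the table ID with "bf" so it can never equal a block key.
func bloomKey(tableID uint32) []byte {
    buf := make([]byte, 4)
    binary.BigEndian.PutUint32(buf, tableID)
    return append([]byte("bf"), buf...)
}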
rm -rf p -go test -v --manual=true -run='TestTruncateVlogNoClose$' . +go test -v -run='TestTruncateVlogNoClose$' --manual=true truncate --size=4096 p/000000.vlog -go test -v --manual=true -run='TestTruncateVlogNoClose2$' . -go test -v --manual=true -run='TestTruncateVlogNoClose3$' . +go test -v -run='TestTruncateVlogNoClose2$' --manual=true +go test -v -run='TestTruncateVlogNoClose3$' --manual=true rm -rf p # Then the normal tests. +echo +echo "==> Starting tests for the table, skl and y packages" +go test -v -race github.com/dgraph-io/badger/v2/skl +# Run tests for all packages except the top level package. The top level package supports the +# `vlog_mmap` flag, which the rest of the packages don't support. +go test -v -race $packages + echo echo "==> Starting tests with value log mmapped..." -sleep 5 -go test -v --vlog_mmap=true -race ./... +# Run top level package tests with the mmap flag. +go test -v -race github.com/dgraph-io/badger/v2 --vlog_mmap=true echo echo "==> Starting tests with value log not mmapped..." -sleep 5 -go test -v --vlog_mmap=false -race ./... +go test -v -race github.com/dgraph-io/badger/v2 --vlog_mmap=false + diff --git a/txn_test.go b/txn_test.go index 8450da910..647bd9806 100644 --- a/txn_test.go +++ b/txn_test.go @@ -837,9 +837,8 @@ func TestManagedDB(t *testing.T) { func TestArmV7Issue311Fix(t *testing.T) { dir, err := ioutil.TempDir("", "") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer removeDir(dir) db, err := Open(DefaultOptions(dir). @@ -848,31 +847,21 @@ func TestArmV7Issue311Fix(t *testing.T) { WithLevelOneSize(8 << 20). WithMaxTableSize(2 << 20). WithSyncWrites(false)) - if err != nil { - t.Fatalf("cannot open db at location %s: %v", dir, err) - } + + require.NoError(t, err) err = db.View(func(txn *Txn) error { return nil }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) err = db.Update(func(txn *Txn) error { return txn.SetEntry(NewEntry([]byte{0x11}, []byte{0x22})) }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) err = db.Update(func(txn *Txn) error { return txn.SetEntry(NewEntry([]byte{0x11}, []byte{0x22})) }) - if err != nil { - t.Fatal(err) - } - - if err = db.Close(); err != nil { - t.Fatal(err) - } + require.NoError(t, err) + require.NoError(t, db.Close()) } diff --git a/value.go b/value.go index cdf575a29..b0d94ad86 100644 --- a/value.go +++ b/value.go @@ -197,7 +197,7 @@ func (lf *logFile) encryptionEnabled() bool { } func (lf *logFile) munmap() (err error) { - if lf.loadingMode != options.MemoryMap { + if lf.loadingMode != options.MemoryMap || len(lf.fmap) == 0 { // Nothing to do return nil } @@ -436,15 +436,18 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) (uint32, var lastCommit uint64 var validEndOffset uint32 = offset + +loop: for { e, err := read.Entry(reader) - if err == io.EOF { - break - } else if err == io.ErrUnexpectedEOF || err == errTruncate { - break - } else if err != nil { + switch { + case err == io.EOF: + break loop + case err == io.ErrUnexpectedEOF || err == errTruncate: + break loop + case err != nil: return 0, err - } else if e == nil { + case e == nil: continue } @@ -455,29 +458,30 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) (uint32, vp.Offset = e.offset vp.Fid = lf.fid - if e.meta&bitTxn > 0 { + switch { + case e.meta&bitTxn > 0: txnTs := y.ParseTs(e.Key) if lastCommit == 0 { lastCommit = txnTs } if lastCommit != txnTs { - break + break loop } - } else if e.meta&bitFinTxn > 0 { + case e.meta&bitFinTxn > 0: txnTs, err :=
strconv.ParseUint(string(e.Value), 10, 64) if err != nil || lastCommit != txnTs { - break + break loop } // Got the end of txn. Now we can store them. lastCommit = 0 validEndOffset = read.recordOffset - } else { + default: if lastCommit != 0 { // This is most likely an entry which was moved as part of GC. // We shouldn't get this entry in the middle of a transaction. - break + break loop } validEndOffset = read.recordOffset } @@ -493,7 +497,9 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) (uint32, } func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error { - maxFid := atomic.LoadUint32(&vlog.maxFid) + vlog.filesLock.RLock() + maxFid := vlog.maxFid + vlog.filesLock.RUnlock() y.AssertTruef(uint32(f.fid) < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid) tr.LazyPrintf("Rewriting fid: %d", f.fid) @@ -523,12 +529,19 @@ func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error { var vp valuePointer vp.Decode(vs.Value) + // If the entry found from the LSM Tree points to a newer vlog file, don't do anything. if vp.Fid > f.fid { return nil } + // If the entry found from the LSM Tree points to an offset greater than the one + // read from vlog, don't do anything. if vp.Offset > e.offset { return nil } + // If the entry read from LSM Tree and vlog file point to the same vlog file and offset, + // insert them back into the DB. + // NOTE: It might be possible that the entry read from the LSM Tree points to + // an older vlog file. See the comments in the else part. if vp.Fid == f.fid && vp.Offset == e.offset { moved++ // This new entry only contains the key, and a pointer to the value. @@ -564,7 +577,46 @@ func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error { wb = append(wb, ne) size += es } else { - vlog.db.opt.Warningf("This entry should have been caught. %+v\n", e) + // It might be possible that the entry read from LSM Tree points to an older vlog file. + // This can happen in the following situation. Assume DB is opened with + // numberOfVersionsToKeep=1 + // + // Now, if we have ONLY one key in the system "FOO" which has been updated 3 times and + // the same key has been garbage collected 3 times, we'll have 3 versions of the movekey + // for the same key "FOO". + // NOTE: moveKeyi is the moveKey with version i + // Assume we have 3 move keys in L0. + // - moveKey1 (points to vlog file 10), + // - moveKey2 (points to vlog file 14) and + // - moveKey3 (points to vlog file 15). + + // Also, assume there is another move key "moveKey1" (points to vlog file 6) (this is + // also a move Key for key "FOO" ) on upper levels (let's say 3). The move key + // "moveKey1" on level 0 was inserted because vlog file 6 was GCed. + // + // Here's what the arrangement looks like + // L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15) + // L1 => .... + // L2 => .... + // L3 => (moveKey1 => vlog6) + // + // When L0 compaction runs, it keeps only moveKey3 because the number of versions + // to keep is set to 1. (we've dropped moveKey1's latest version) + // + // The new arrangement of keys is + // L0 => .... + // L1 => (moveKey3 => vlog15) + // L2 => .... + // L3 => (moveKey1 => vlog6) + // + // Now if we try to GC vlog file 10, the entry read from vlog file will point to vlog10 + // but the entry read from LSM Tree will point to vlog6. The move key read from LSM tree + // will point to vlog6 because we've asked for version 1 of the move key. 
+ // + // This might seem like an issue but it's not really an issue because the user has set + // the number of versions to keep to 1 and the latest version of moveKey points to the + // correct vlog file and offset. The stale move key on L3 will eventually be dropped by + // compaction because there is a newer version in the upper levels. } return nil } @@ -762,10 +814,9 @@ func (vlog *valueLog) dropAll() (int, error) { } vlog.db.opt.Infof("Value logs deleted. Creating value log file: 0") - if _, err := vlog.createVlogFile(0); err != nil { + if _, err := vlog.createVlogFile(0); err != nil { // Called while writes are stopped. return count, err } - atomic.StoreUint32(&vlog.maxFid, 0) return count, nil } @@ -786,12 +837,12 @@ type valueLog struct { // guards our view of which files exist, which to be deleted, how many active iterators filesLock sync.RWMutex filesMap map[uint32]*logFile + maxFid uint32 filesToBeDeleted []uint32 // A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted. numActiveIterators int32 db *DB - maxFid uint32 // accessed via atomics. writableLogOffset uint32 // read by read, written by write. Must access via atomics. numEntriesWritten uint32 opt Options @@ -856,7 +907,11 @@ func (lf *logFile) open(path string, flags uint32) error { return errFile(err, lf.path, "Unable to run file.Stat") } sz := fi.Size() - y.AssertTruef(sz <= math.MaxUint32, "file size: %d greater than %d", sz, math.MaxUint32) + y.AssertTruef( + sz <= math.MaxUint32, + "file size: %d greater than %d", + uint32(sz), uint32(math.MaxUint32), + ) lf.size = uint32(sz) if sz < vlogHeaderSize { // Every vlog file should have at least vlogHeaderSize. If it is less than vlogHeaderSize @@ -947,14 +1002,15 @@ func (vlog *valueLog) createVlogFile(fid uint32) (*logFile, error) { if err = lf.mmap(2 * vlog.opt.ValueLogFileSize); err != nil { return nil, errFile(err, lf.path, "Mmap value log file") } + + vlog.filesLock.Lock() + vlog.filesMap[fid] = lf + vlog.maxFid = fid // writableLogOffset is only written by write func, by read by Read func. // To avoid a race condition, all reads and updates to this variable must be // done via atomics. atomic.StoreUint32(&vlog.writableLogOffset, vlogHeaderSize) vlog.numEntriesWritten = 0 - - vlog.filesLock.Lock() - vlog.filesMap[fid] = lf vlog.filesLock.Unlock() return lf, nil @@ -1105,12 +1161,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error { // plain text mode or vice versa. A single vlog file can't have both // encrypted entries and plain text entries. if last.encryptionEnabled() != vlog.db.shouldEncrypt() { - newid := atomic.AddUint32(&vlog.maxFid, 1) + newid := vlog.maxFid + 1 _, err := vlog.createVlogFile(newid) if err != nil { return y.Wrapf(err, "Error while creating log file %d in valueLog.open", newid) } - last, ok = vlog.filesMap[vlog.maxFid] + last, ok = vlog.filesMap[newid] y.AssertTrue(ok) } lastOffset, err := last.fd.Seek(0, io.SeekEnd) @@ -1172,7 +1228,7 @@ func (vlog *valueLog) Close() error { err = munmapErr } - maxFid := atomic.LoadUint32(&vlog.maxFid) + maxFid := vlog.maxFid if !vlog.opt.ReadOnly && id == maxFid { // truncate writable log file to correct offset. if truncErr := f.fd.Truncate( @@ -1265,12 +1321,12 @@ func (reqs requests) IncrRef() { // if fid >= vlog.maxFid. In some cases such as replay(while opening db), it might be called with // fid < vlog.maxFid. To sync irrespective of file id just call it with math.MaxUint32.
func (vlog *valueLog) sync(fid uint32) error { - if vlog.opt.SyncWrites { + if vlog.opt.SyncWrites || vlog.opt.InMemory { return nil } vlog.filesLock.RLock() - maxFid := atomic.LoadUint32(&vlog.maxFid) + maxFid := vlog.maxFid // During replay it is possible to get sync call with fid less than maxFid. // Because older file has already been synced, we can return from here. if fid < maxFid || len(vlog.filesMap) == 0 { @@ -1303,7 +1359,7 @@ func (vlog *valueLog) write(reqs []*request) error { return nil } vlog.filesLock.RLock() - maxFid := atomic.LoadUint32(&vlog.maxFid) + maxFid := vlog.maxFid curlf := vlog.filesMap[maxFid] vlog.filesLock.RUnlock() @@ -1335,7 +1391,7 @@ func (vlog *valueLog) write(reqs []*request) error { return err } - newid := atomic.AddUint32(&vlog.maxFid, 1) + newid := vlog.maxFid + 1 y.AssertTruef(newid > 0, "newid has overflown uint32: %v", newid) newlf, err := vlog.createVlogFile(newid) if err != nil { @@ -1396,14 +1452,26 @@ func (vlog *valueLog) write(reqs []*request) error { // Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file // (if non-nil) -func (vlog *valueLog) getFileRLocked(fid uint32) (*logFile, error) { +func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) { vlog.filesLock.RLock() defer vlog.filesLock.RUnlock() - ret, ok := vlog.filesMap[fid] + ret, ok := vlog.filesMap[vp.Fid] if !ok { // log file has gone away, will need to retry the operation. return nil, ErrRetry } + + // Check for valid offset if we are reading from writable log. + maxFid := vlog.maxFid + if vp.Fid == maxFid { + currentOffset := vlog.woffset() + if vp.Offset >= currentOffset { + return nil, errors.Errorf( + "Invalid value pointer offset: %d greater than current offset: %d", + vp.Offset, currentOffset) + } + } + ret.lock.RLock() return ret, nil } @@ -1411,13 +1479,6 @@ func (vlog *valueLog) getFileRLocked(fid uint32) (*logFile, error) { // Read reads the value log at a given location. // TODO: Make this read private. func (vlog *valueLog) Read(vp valuePointer, s *y.Slice) ([]byte, func(), error) { - // Check for valid offset if we are reading from writable log. - maxFid := atomic.LoadUint32(&vlog.maxFid) - if vp.Fid == maxFid && vp.Offset >= vlog.woffset() { - return nil, nil, errors.Errorf( - "Invalid value pointer offset: %d greater than current offset: %d", - vp.Offset, vlog.woffset()) - } buf, lf, err := vlog.readValueBytes(vp, s) // log file is locked so, decide whether to lock immediately or let the caller to // unlock it, after caller uses it. @@ -1467,10 +1528,11 @@ func (vlog *valueLog) getUnlockCallback(lf *logFile) func() { // readValueBytes return vlog entry slice and read locked log file. Caller should take care of // logFile unlocking. 
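The hunks above replace atomic loads of vlog.maxFid with plain reads under filesLock, since maxFid now only changes together with filesMap. A minimal, hypothetical sketch of that invariant, one RWMutex guarding both fields:

package vlogsketch

import "sync"

type fileSet struct {
    mu     sync.RWMutex
    files  map[uint32]struct{}
    maxFid uint32
}

func newFileSet() *fileSet {
    return &fileSet{files: make(map[uint32]struct{})}
}

// add installs a new file and bumps maxFid under the same lock, so readers
// can never observe a maxFid that is not present in files.
func (s *fileSet) add(fid uint32) {
    s.mu.Lock()
    s.files[fid] = struct{}{}
    s.maxFid = fid
    s.mu.Unlock()
}

// current returns a consistent snapshot of the newest fid.
func (s *fileSet) current() uint32 {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.maxFid
}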
func (vlog *valueLog) readValueBytes(vp valuePointer, s *y.Slice) ([]byte, *logFile, error) { - lf, err := vlog.getFileRLocked(vp.Fid) + lf, err := vlog.getFileRLocked(vp) if err != nil { return nil, nil, err } + buf, err := lf.read(vp, s) return buf, lf, err } @@ -1479,10 +1541,11 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi vlog.filesLock.RLock() defer vlog.filesLock.RUnlock() fids := vlog.sortedFids() - if len(fids) <= 1 { + switch { + case len(fids) <= 1: tr.LazyPrintf("Only one or less value log file.") return nil - } else if head.Fid == 0 { + case head.Fid == 0: tr.LazyPrintf("Head pointer is at zero.") return nil } diff --git a/value_test.go b/value_test.go index 304ac7d41..8aa7c0909 100644 --- a/value_test.go +++ b/value_test.go @@ -780,6 +780,9 @@ func TestPenultimateLogCorruption(t *testing.T) { if db0.valueDirGuard != nil { require.NoError(t, db0.valueDirGuard.release()) } + require.NoError(t, db0.vlog.Close()) + require.NoError(t, db0.manifest.close()) + require.NoError(t, db0.registry.Close()) opt.Truncate = true db1, err := Open(opt) @@ -799,7 +802,9 @@ func TestPenultimateLogCorruption(t *testing.T) { func checkKeys(t *testing.T, kv *DB, keys [][]byte) { i := 0 txn := kv.NewTransaction(false) + defer txn.Discard() iter := txn.NewIterator(IteratorOptions{}) + defer iter.Close() for iter.Seek(keys[0]); iter.Valid(); iter.Next() { require.Equal(t, iter.Item().Key(), keys[i]) i++ diff --git a/y/y_test.go b/y/y_test.go index 168da889b..d1b963184 100644 --- a/y/y_test.go +++ b/y/y_test.go @@ -176,7 +176,7 @@ func TestPagebufferReader2(t *testing.T) { require.Equal(t, n, 10, "length of buffer and length written should be equal") require.NoError(t, err, "unable to write bytes to buffer") - randOffset := int(rand.Int31n(int32(b.length))) + randOffset := int(rand.Int31n(int32(b.length) - 1)) randLength := int(rand.Int31n(int32(b.length - randOffset))) reader := b.NewReaderAt(randOffset) // Read randLength bytes.
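The checkKeys change above, deferring txn.Discard() and iter.Close(), is the usual shape for read-only scans in badger; a short sketch assuming the v2 public API (db.View, DefaultIteratorOptions):

package example

import badger "github.com/dgraph-io/badger/v2"

func scanKeys(db *badger.DB) error {
    return db.View(func(txn *badger.Txn) error {
        it := txn.NewIterator(badger.DefaultIteratorOptions)
        defer it.Close() // always close iterators before the transaction ends
        for it.Rewind(); it.Valid(); it.Next() {
            _ = it.Item().Key() // inspect keys; call Item().Value(...) if values are needed
        }
        return nil
    })
}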