diff --git a/internal/logdb/kv/pebble/kv_pebble.go b/internal/logdb/kv/pebble/kv_pebble.go index 7554d4a4d..e7647fe79 100644 --- a/internal/logdb/kv/pebble/kv_pebble.go +++ b/internal/logdb/kv/pebble/kv_pebble.go @@ -20,9 +20,10 @@ import ( "bytes" "fmt" + "github.com/cockroachdb/pebble" + "github.com/lni/dragonboat/v3/internal/logdb/kv" "github.com/lni/dragonboat/v3/raftio" - "github.com/petermattis/pebble" ) type pebbleWriteBatch struct { diff --git a/internal/raft/inmemory.go b/internal/raft/inmemory.go index 07e0c233d..69739e74c 100644 --- a/internal/raft/inmemory.go +++ b/internal/raft/inmemory.go @@ -28,6 +28,7 @@ var ( // inMemory is a two stage in memory log storage struct to keep log entries // that will be used by the raft protocol in immediate future. type inMemory struct { + shrunk bool snapshot *pb.Snapshot entries []pb.Entry markerIndex uint64 @@ -36,6 +37,9 @@ type inMemory struct { } func newInMemory(lastIndex uint64, rl *server.RateLimiter) inMemory { + if minEntrySliceSize >= entrySliceSize { + panic("minEntrySliceSize >= entrySliceSize") + } return inMemory{ markerIndex: lastIndex + 1, savedTo: lastIndex, @@ -135,6 +139,7 @@ func (im *inMemory) appliedLogTo(index uint64) { } newMarkerIndex := index applied := im.entries[:newMarkerIndex-im.markerIndex] + im.shrunk = true im.entries = im.entries[newMarkerIndex-im.markerIndex:] im.markerIndex = newMarkerIndex im.resizeEntrySlice() @@ -152,11 +157,23 @@ func (im *inMemory) savedSnapshotTo(index uint64) { } } +func (im *inMemory) resize() { + old := im.entries + im.shrunk = false + sz := max(entrySliceSize, uint64(len(old)*2)) + im.entries = make([]pb.Entry, 0, sz) + im.entries = append(im.entries, old...) +} + +func (im *inMemory) tryResize() { + if im.shrunk { + im.resize() + } +} + func (im *inMemory) resizeEntrySlice() { - if cap(im.entries)-len(im.entries) < int(minEntrySliceSize) { - old := im.entries - im.entries = make([]pb.Entry, 0, entrySliceSize) - im.entries = append(im.entries, old...) + if im.shrunk && cap(im.entries)-len(im.entries) < int(minEntrySliceSize) { + im.resize() } } @@ -172,6 +189,7 @@ func (im *inMemory) merge(ents []pb.Entry) { } else if firstNewIndex <= im.markerIndex { im.markerIndex = firstNewIndex // ents might come from entryQueue, copy it to its own storage + im.shrunk = false im.entries = newEntrySlice(ents) im.savedTo = firstNewIndex - 1 if im.rateLimited() { @@ -180,6 +198,7 @@ func (im *inMemory) merge(ents []pb.Entry) { } else { existing := im.getEntries(im.markerIndex, firstNewIndex) checkEntriesToAppend(existing, ents) + im.shrunk = false im.entries = make([]pb.Entry, 0, len(existing)+len(ents)) im.entries = append(im.entries, existing...) im.entries = append(im.entries, ents...) @@ -195,6 +214,7 @@ func (im *inMemory) merge(ents []pb.Entry) { func (im *inMemory) restore(ss pb.Snapshot) { im.snapshot = &ss im.markerIndex = ss.Index + 1 + im.shrunk = false im.entries = nil im.savedTo = ss.Index if im.rateLimited() { diff --git a/internal/raft/inmemory_test.go b/internal/raft/inmemory_test.go index 8ee7dae63..8c8e4550a 100644 --- a/internal/raft/inmemory_test.go +++ b/internal/raft/inmemory_test.go @@ -232,7 +232,11 @@ func TestInMemRestore(t *testing.T) { }, } ss := pb.Snapshot{Index: 100} + im.shrunk = true im.restore(ss) + if im.shrunk { + t.Errorf("shrunk flag not cleared") + } if len(im.entries) != 0 || im.markerIndex != 101 || im.snapshot == nil { t.Errorf("unexpected im state") } @@ -254,20 +258,25 @@ func TestInMemSaveSnapshotTo(t *testing.T) { } } -func TestInMemMergeFullAppend(t *testing.T) { +func testInMemMergeFullAppend(t *testing.T, shrunk bool) { im := inMemory{ markerIndex: 5, - entries: []pb.Entry{ - {Index: 5, Term: 5}, - {Index: 6, Term: 6}, - {Index: 7, Term: 7}, - }, } + im.resize() + im.entries = append(im.entries, []pb.Entry{ + {Index: 5, Term: 5}, + {Index: 6, Term: 6}, + {Index: 7, Term: 7}, + }...) + im.shrunk = shrunk ents := []pb.Entry{ {Index: 8, Term: 8}, {Index: 9, Term: 9}, } im.merge(ents) + if im.shrunk != shrunk { + t.Errorf("shrunk flag unexpectedly changed, %t:%t", im.shrunk, shrunk) + } if len(im.entries) != 5 || im.markerIndex != 5 { t.Errorf("not fully appended") } @@ -276,20 +285,30 @@ func TestInMemMergeFullAppend(t *testing.T) { } } +func TestInMemMergeFullAppend(t *testing.T) { + testInMemMergeFullAppend(t, false) + testInMemMergeFullAppend(t, true) +} + func TestInMemMergeReplace(t *testing.T) { im := inMemory{ markerIndex: 5, - entries: []pb.Entry{ - {Index: 5, Term: 5}, - {Index: 6, Term: 6}, - {Index: 7, Term: 7}, - }, } + im.resize() + im.entries = append(im.entries, []pb.Entry{ + {Index: 5, Term: 5}, + {Index: 6, Term: 6}, + {Index: 7, Term: 7}, + }...) + im.shrunk = true ents := []pb.Entry{ {Index: 2, Term: 2}, {Index: 3, Term: 3}, } im.merge(ents) + if im.shrunk { + t.Errorf("shrunk flag unexpectedly not cleared") + } if len(im.entries) != 2 || im.markerIndex != 2 { t.Errorf("not fully appended") } @@ -323,17 +342,22 @@ func TestInMemMergeWithHoleCausePanic(t *testing.T) { func TestInMemMerge(t *testing.T) { im := inMemory{ markerIndex: 5, - entries: []pb.Entry{ - {Index: 5, Term: 5}, - {Index: 6, Term: 6}, - {Index: 7, Term: 7}, - }, } + im.resize() + im.entries = append(im.entries, []pb.Entry{ + {Index: 5, Term: 5}, + {Index: 6, Term: 6}, + {Index: 7, Term: 7}, + }...) + im.shrunk = true ents := []pb.Entry{ {Index: 6, Term: 7}, {Index: 7, Term: 10}, } im.merge(ents) + if im.shrunk { + t.Errorf("shrunk flag unexpectedly not cleared") + } if len(im.entries) != 3 || im.markerIndex != 5 { t.Errorf("not fully appended") } @@ -511,7 +535,14 @@ func TestAppliedLogTo(t *testing.T) { {10, 1, 10}, } for idx, tt := range tests { + len1 := len(im.entries) im.appliedLogTo(tt.appliedTo) + len2 := len(im.entries) + if len2 < len1 { + if !im.shrunk { + t.Errorf("shrunk flag not set") + } + } if len(im.entries) != tt.length { t.Errorf("%d, unexpected entry slice len %d, want %d", idx, len(im.entries), tt.length) @@ -624,3 +655,45 @@ func TestRateLimitCanBeUpdatedAfterCutAndMergingEntries(t *testing.T) { t.Errorf("log size %d, want %d", im.rl.Get(), expSz) } } + +func TestResize(t *testing.T) { + im := inMemory{ + markerIndex: 10, + entries: []pb.Entry{ + {Index: 10, Term: 1}, + {Index: 11, Term: 1}, + }, + shrunk: true, + } + im.resize() + if uint64(cap(im.entries)) != entrySliceSize { + t.Errorf("not resized") + } + if len(im.entries) != 2 { + t.Errorf("unexpected len %d", len(im.entries)) + } + if im.shrunk { + t.Errorf("shrunk flag not clearaed") + } +} + +func TestTryResize(t *testing.T) { + im := inMemory{ + markerIndex: 10, + entries: []pb.Entry{ + {Index: 10, Term: 1}, + {Index: 11, Term: 1}, + }, + } + initcap := cap(im.entries) + initlen := len(im.entries) + im.tryResize() + if cap(im.entries) != initcap || len(im.entries) != initlen { + t.Errorf("cap/len unexpectedly changed") + } + im.shrunk = true + im.tryResize() + if cap(im.entries) == initcap { + t.Errorf("cap/len unexpectedly not changed") + } +} diff --git a/internal/raft/raft.go b/internal/raft/raft.go index a1fec3e99..f9059aabc 100644 --- a/internal/raft/raft.go +++ b/internal/raft/raft.go @@ -50,7 +50,9 @@ const ( ) var ( - emptyState = pb.State{} + emptyState = pb.State{} + maxEntrySize = settings.Soft.MaxEntrySize + inMemGcTimeout = settings.Soft.InMemGCTimeout ) // State is the state of a raft node defined in the raft paper, possible states @@ -207,6 +209,7 @@ type raft struct { readyToRead []pb.ReadyToRead droppedEntries []pb.Entry droppedReadIndexes []pb.SystemCtx + quiesce bool checkQuorum bool tickCount uint64 electionTick uint64 @@ -462,8 +465,18 @@ func (r *raft) timeForRateLimitCheck() bool { return r.tickCount%r.electionTimeout == 0 } +func (r *raft) timeForInMemGC() bool { + return r.tickCount%inMemGcTimeout == 0 +} + func (r *raft) tick() { + r.quiesce = false r.tickCount++ + // this is to work around the language limitation described in + // https://github.com/golang/go/issues/9618 + if r.timeForInMemGC() { + r.log.inmem.tryResize() + } if r.state == leader { r.leaderTick() } else { @@ -531,6 +544,10 @@ func (r *raft) leaderTick() { } func (r *raft) quiescedTick() { + if !r.quiesce { + r.quiesce = true + r.log.inmem.resize() + } r.electionTick++ } diff --git a/internal/raft/raft_test.go b/internal/raft/raft_test.go index 9444069a1..fda7ba2cc 100644 --- a/internal/raft/raft_test.go +++ b/internal/raft/raft_test.go @@ -2279,3 +2279,35 @@ func TestRateLimitMessageIsSentByNonLeader(t *testing.T) { testRateLimitMessageIsSentByNonLeader(2, true, t) testRateLimitMessageIsSentByNonLeader(NoLeader, false, t) } + +func TestInMemoryEntriesSliceCanBeResized(t *testing.T) { + r := newTestRaft(1, []uint64{1}, 5, 1, NewTestLogDB()) + oldcap := cap(r.log.inmem.entries) + if oldcap != 0 { + t.Errorf("unexpected cap val: %d", oldcap) + } + r.log.inmem.shrunk = true + for i := uint64(0); i < inMemGcTimeout; i++ { + r.tick() + } + if uint64(cap(r.log.inmem.entries)) != entrySliceSize { + t.Errorf("not resized") + } +} + +func TestFirstQuiescedTickResizesInMemoryEntriesSlice(t *testing.T) { + r := newTestRaft(1, []uint64{1}, 5, 1, NewTestLogDB()) + oldcap := cap(r.log.inmem.entries) + if oldcap != 0 { + t.Errorf("unexpected cap val: %d", oldcap) + } + r.quiescedTick() + if uint64(cap(r.log.inmem.entries)) != entrySliceSize { + t.Errorf("not resized, cap: %d", oldcap) + } + r.log.inmem.entries = make([]pb.Entry, 0, 0) + r.quiescedTick() + if cap(r.log.inmem.entries) != 0 { + t.Errorf("unexpectedly resized again") + } +} diff --git a/internal/settings/soft.go b/internal/settings/soft.go index f674df62c..2636486b5 100644 --- a/internal/settings/soft.go +++ b/internal/settings/soft.go @@ -71,6 +71,9 @@ type soft struct { // ExpectedMaxInMemLogSize is the minimum MaxInMemLogSize value expected // in a raft config. ExpectedMaxInMemLogSize uint64 + // InMemGCTimeout defines how often dragonboat collects partial object. + // It is defined in terms of number of ticks. + InMemGCTimeout uint64 // // Multiraft @@ -229,6 +232,7 @@ func getDefaultSoftSettings() soft { LocalRaftRequestTimeoutMs: 10000, GetConnectedTimeoutSecond: 5, MaxEntrySize: 2 * MaxProposalPayloadSize, + InMemGCTimeout: 100, InMemEntrySliceSize: 512, MinEntrySliceFreeSize: 96, ExpectedMaxInMemLogSize: 2 * (MaxProposalPayloadSize + EntryNonCmdFieldsSize), diff --git a/nodehost_test.go b/nodehost_test.go index fb4170d4a..6edd4b241 100644 --- a/nodehost_test.go +++ b/nodehost_test.go @@ -1433,6 +1433,17 @@ func TestOnDiskStateMachineCanTakeDummySnapshot(t *testing.T) { t.Fatalf("dummy snapshot is not recorded as dummy") } break + } else if i%100 == 0 { + // this is an ugly hack to workaround RocksDB's incorrect fsync + // implementation on macos. + // fcntl(fd, F_FULLFSYNC) is required for a proper fsync on macos, + // sadly rocksdb is not doing that. this means we can make proposals + // very fast as they are not actually fsynced on macos but making + // snapshots are going to be much much slower as dragonboat properly + // fsyncs its snapshot data. we can end up completing all required + // proposals even before completing the first ongoing snapshotting + // operation. + time.Sleep(200 * time.Millisecond) } } if !snapshotted { @@ -1490,6 +1501,9 @@ func TestOnDiskSMCanStreamSnapshot(t *testing.T) { if len(snapshots) >= 3 { snapshotted = true break + } else if i%50 == 0 { + // see comments in testOnDiskStateMachineCanTakeDummySnapshot + time.Sleep(100 * time.Millisecond) } } if !snapshotted { @@ -1549,6 +1563,9 @@ func TestOnDiskSMCanStreamSnapshot(t *testing.T) { } } break + } else if i%50 == 0 { + // see comments in testOnDiskStateMachineCanTakeDummySnapshot + time.Sleep(100 * time.Millisecond) } } if !snapshotted {