Skip to content

Commit

Permalink
Merge branch 'optimize-exception' into 'master'
Browse files Browse the repository at this point in the history
adjust topic check frequency to avoid long check run too much

See merge request !36
  • Loading branch information
absolute8511 committed Dec 16, 2020
2 parents 8b4aa09 + 9099717 commit 18884a6
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 93 deletions.
6 changes: 6 additions & 0 deletions consistence/nsqlookup_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ func (nlcoord *NsqLookupCoordinator) checkTopics(monitorChan chan struct{}) {
ticker := time.NewTicker(doCheckInterval)
waitingMigrateTopic := make(map[string]map[int]time.Time)
lostLeaderSessions := make(map[string]bool)
lastFullCheck := time.Now()
defer func() {
ticker.Stop()
coordLog.Infof("check topics quit.")
Expand All @@ -652,7 +653,11 @@ func (nlcoord *NsqLookupCoordinator) checkTopics(monitorChan chan struct{}) {
if nlcoord.leadership == nil {
continue
}
if time.Since(lastFullCheck) < doCheckInterval {
continue
}
nlcoord.doCheckTopics(monitorChan, nil, waitingMigrateTopic, lostLeaderSessions, true)
lastFullCheck = time.Now()
case failedInfo := <-nlcoord.checkTopicFailChan:
if nlcoord.leadership == nil {
continue
Expand All @@ -667,6 +672,7 @@ func (nlcoord *NsqLookupCoordinator) doCheckTopics(monitorChan chan struct{}, fa

time.Sleep(time.Millisecond * 10)
coordLog.Infof("do check topics...")
defer coordLog.Infof("do check topics done")
if !atomic.CompareAndSwapInt32(&nlcoord.doChecking, 0, 1) {
return
}
Expand Down
33 changes: 23 additions & 10 deletions internal/levellogger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package levellogger

import (
"fmt"
"github.com/absolute8511/glog"
"log"
"os"
"sync/atomic"

"github.com/absolute8511/glog"
)

type Logger interface {
Expand Down Expand Up @@ -74,12 +75,24 @@ const (
type LevelLogger struct {
Logger Logger
level int32
prefix string
depth int
}

func NewLevelLogger(level int32, l Logger) *LevelLogger {
return &LevelLogger{
Logger: l,
level: level,
depth: 2,
}
}

func (self *LevelLogger) WrappedWithPrefix(prefix string, incredDepth int) *LevelLogger {
return &LevelLogger{
Logger: self.Logger,
level: self.level,
depth: self.depth + incredDepth,
prefix: prefix + self.prefix,
}
}

Expand All @@ -93,54 +106,54 @@ func (self *LevelLogger) Level() int32 {

func (self *LevelLogger) Logf(f string, args ...interface{}) {
if self.Logger != nil && self.Level() >= LOG_INFO {
self.Logger.Output(2, fmt.Sprintf(f, args...))
self.Logger.Output(self.depth, self.prefix+fmt.Sprintf(f, args...))
}
}

func (self *LevelLogger) LogDebugf(f string, args ...interface{}) {
if self.Logger != nil && self.Level() >= LOG_DEBUG {
self.Logger.Output(2, fmt.Sprintf(f, args...))
self.Logger.Output(self.depth, self.prefix+fmt.Sprintf(f, args...))
}
}

func (self *LevelLogger) LogErrorf(f string, args ...interface{}) {
if self.Logger != nil {
self.Logger.OutputErr(2, fmt.Sprintf(f, args...))
self.Logger.OutputErr(self.depth, self.prefix+fmt.Sprintf(f, args...))
}
}

func (self *LevelLogger) LogWarningf(f string, args ...interface{}) {
if self.Logger != nil && self.Level() >= LOG_WARN {
self.Logger.OutputWarning(2, fmt.Sprintf(f, args...))
self.Logger.OutputWarning(self.depth, self.prefix+fmt.Sprintf(f, args...))
}
}

func (self *LevelLogger) Infof(f string, args ...interface{}) {
if self.Logger != nil && self.Level() >= LOG_INFO {
self.Logger.Output(2, fmt.Sprintf(f, args...))
self.Logger.Output(self.depth, self.prefix+fmt.Sprintf(f, args...))
}
}

func (self *LevelLogger) Debugf(f string, args ...interface{}) {
if self.Logger != nil && self.Level() >= LOG_DEBUG {
self.Logger.Output(2, fmt.Sprintf(f, args...))
self.Logger.Output(self.depth, self.prefix+fmt.Sprintf(f, args...))
}
}

func (self *LevelLogger) Errorf(f string, args ...interface{}) {
if self.Logger != nil {
self.Logger.OutputErr(2, fmt.Sprintf(f, args...))
self.Logger.OutputErr(self.depth, self.prefix+fmt.Sprintf(f, args...))
}
}

func (self *LevelLogger) Warningf(f string, args ...interface{}) {
if self.Logger != nil && self.Level() >= LOG_WARN {
self.Logger.OutputWarning(2, fmt.Sprintf(f, args...))
self.Logger.OutputWarning(self.depth, self.prefix+fmt.Sprintf(f, args...))
}
}

func (self *LevelLogger) Warningln(f string) {
if self.Logger != nil && self.Level() >= LOG_WARN {
self.Logger.OutputWarning(2, f)
self.Logger.OutputWarning(self.depth, self.prefix+f)
}
}
2 changes: 1 addition & 1 deletion internal/version/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"runtime"
)

const Binary = "0.3.7-HA.1.12.3"
const Binary = "0.3.7-HA.1.12.4"

var (
Commit = "unset"
Expand Down
Loading

0 comments on commit 18884a6

Please sign in to comment.