From df0755571d7696c6ae558be22524bce16abbb9f5 Mon Sep 17 00:00:00 2001 From: randyahx Date: Wed, 18 Oct 2023 10:06:00 +0800 Subject: [PATCH 1/7] added metrics for external sp api errors, update slash cooling off period, add internal sp endpoints to config --- config/config.go | 5 +++++ config/config.json | 11 ++++++++++ executor/executor.go | 3 +-- metrics/metrics.go | 43 ++++++++++++++++++++++++++------------- submitter/tx_submitter.go | 2 ++ verifier/const.go | 1 + verifier/hash_verifier.go | 34 +++++++++++++++++++++++++++---- 7 files changed, 79 insertions(+), 20 deletions(-) diff --git a/config/config.go b/config/config.go index e42c113..171da7b 100644 --- a/config/config.go +++ b/config/config.go @@ -14,6 +14,7 @@ type Config struct { AlertConfig AlertConfig `json:"alert_config"` DBConfig DBConfig `json:"db_config"` MetricsConfig MetricsConfig `json:"metrics_config"` + SPConfig SPConfig `json:"sp_config"` } type GreenfieldConfig struct { @@ -125,6 +126,10 @@ func (cfg *DBConfig) Validate() { } } +type SPConfig struct { + InternalSPEndpoints []string `json:"internal_sp_endpoints"` +} + type MetricsConfig struct { Port uint16 `json:"port"` } diff --git a/config/config.json b/config/config.json index 9343702..1c37bc8 100644 --- a/config/config.json +++ b/config/config.json @@ -39,6 +39,17 @@ "max_open_conns": 40, "debug_mode": true }, + "sp_config": { + "internal_sp_endpoints": [ + "https://greenfield-sp.bnbchain.org", + "https://greenfield-sp.nodereal.io", + "https://greenfield-sp.ninicoin.io", + "https://greenfield-sp.defibit.io", + "https://greenfield-sp.nariox.org", + "https://greenfield-sp.lumibot.org", + "https://greenfield-sp.voltbot.io" + ] + }, "alert_config": { "interval": 300, "identity": "your_identity", diff --git a/executor/executor.go b/executor/executor.go index 6412bd7..5ad7a87 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -301,7 +301,7 @@ func (e *Executor) QueryChallengeSlashCoolingOffPeriod() (uint64, error) { client := e.clients.GetClient() params, err := client.ChallengeParams(context.Background(), &challengetypes.QueryParamsRequest{}) if err != nil { - logging.Logger.Errorf("query challenge params failed, err=%+v", err.Error()) + logging.Logger.Errorf("query slash cooling off period failed, err=%+v", err.Error()) return 0, err } logging.Logger.Infof("challenge slash cooling off period: %d", params.Params.SlashCoolingOffPeriod) @@ -345,7 +345,6 @@ func (e *Executor) GetStorageProviderEndpoint(address string) (string, error) { logging.Logger.Errorf("executor failed to query storage provider %s, err=%+v", address, err.Error()) return "", err } - logging.Logger.Infof("response res.endpoint %s", res.Endpoint) return res.Endpoint, nil } diff --git a/metrics/metrics.go b/metrics/metrics.go index f660fd2..fb0444e 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -24,7 +24,8 @@ const ( MetricVerifiedChallengeSuccess = "challenge_success" MetricHeartbeatEvents = "heartbeat_events" MetricHashVerifierErr = "hash_verifier_error_count" - MetricSpAPIErr = "hash_verifier_sp_api_error" + MetricInternalSpAPIErr = "hash_verifier_internal_sp_api_error" + MetricExternalSpAPIErr = "hash_verifier_external_sp_api_error" MetricHashVerifierDuration = "hash_verifier_duration" // Vote Broadcaster @@ -51,12 +52,12 @@ const ( ) type MetricService struct { - MetricsMap map[string]prometheus.Metric + MetricsMap map[string]prometheus.Collector cfg *config.Config } func NewMetricService(config *config.Config) *MetricService { - ms := make(map[string]prometheus.Metric, 0) + ms := make(map[string]prometheus.Collector, 0) // Monitor gnfdSavedBlockMetric := prometheus.NewGauge(prometheus.GaugeOpts{ @@ -125,17 +126,24 @@ func NewMetricService(config *config.Config) *MetricService { hashVerifierErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ Name: MetricHashVerifierErr, - Help: "Hash verifier error count", + Help: "Verifier error count", }) ms[MetricHashVerifierErr] = hashVerifierErrCountMetric prometheus.MustRegister(hashVerifierErrCountMetric) - hashVerifierSpApiErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ - Name: MetricSpAPIErr, - Help: "Hash verifier SP API error count", + hashVerifierInternalSpApiErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricInternalSpAPIErr, + Help: "Internal sp error count", }) - ms[MetricSpAPIErr] = hashVerifierSpApiErrCountMetric - prometheus.MustRegister(hashVerifierSpApiErrCountMetric) + ms[MetricInternalSpAPIErr] = hashVerifierInternalSpApiErrCountMetric + prometheus.MustRegister(hashVerifierInternalSpApiErrCountMetric) + + hashVerifierExternalSpApiErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricExternalSpAPIErr, + Help: "External sp error count", + }) + ms[MetricExternalSpAPIErr] = hashVerifierExternalSpApiErrCountMetric + prometheus.MustRegister(hashVerifierExternalSpApiErrCountMetric) // Broadcaster broadcasterErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ @@ -281,15 +289,22 @@ func (m *MetricService) IncHeartbeatEvents() { func (m *MetricService) IncHashVerifierErr(err error) { if err != nil { logging.Logger.Errorf("verifier error count increased, %s", err.Error()) + m.MetricsMap[MetricHashVerifierErr].(prometheus.Counter).Inc() + } +} + +func (m *MetricService) IncHashVerifierInternalSpApiErr(err error) { + if err != nil { + logging.Logger.Errorf("verifier internal sp error count increased, %s", err.Error()) + m.MetricsMap[MetricInternalSpAPIErr].(prometheus.Counter).Inc() } - m.MetricsMap[MetricHashVerifierErr].(prometheus.Counter).Inc() } -func (m *MetricService) IncHashVerifierSpApiErr(err error) { +func (m *MetricService) IncHashVerifierExternalSpApiErr(err error) { if err != nil { - logging.Logger.Errorf("verifier sp api error count increased, %s", err.Error()) + logging.Logger.Errorf("verifier external sp error count increased, %s", err.Error()) + m.MetricsMap[MetricExternalSpAPIErr].(prometheus.Counter).Inc() } - m.MetricsMap[MetricSpAPIErr].(prometheus.Counter).Inc() } // Broadcaster @@ -346,8 +361,8 @@ func (m *MetricService) SetSubmitterDuration(duration time.Duration) { func (m *MetricService) IncSubmitterErr(err error) { if err != nil { logging.Logger.Errorf("submitter error count increased, %s", err.Error()) + m.MetricsMap[MetricSubmitterErr].(prometheus.Counter).Inc() } - m.MetricsMap[MetricSubmitterErr].(prometheus.Counter).Inc() } // Attest Monitor diff --git a/submitter/tx_submitter.go b/submitter/tx_submitter.go index ae113b1..f178ef8 100644 --- a/submitter/tx_submitter.go +++ b/submitter/tx_submitter.go @@ -206,6 +206,7 @@ func (s *TxSubmitter) submitTransactionLoop(event *model.Event, attestPeriodEnd } return err } + s.metricService.IncSubmitterErr(err) } else { logging.Logger.Errorf("submitter failed for challengeId: %d, attempts: %d", event.ChallengeId, submittedAttempts) } @@ -217,6 +218,7 @@ func (s *TxSubmitter) submitTransactionLoop(event *model.Event, attestPeriodEnd err = s.DataProvider.UpdateEventStatus(event.ChallengeId, model.Submitted) if err != nil { logging.Logger.Errorf("submitter succeeded in attesting but failed to update database, err=%+v", err.Error()) + s.metricService.IncSubmitterErr(err) continue } diff --git a/verifier/const.go b/verifier/const.go index 25db383..1fbe6db 100644 --- a/verifier/const.go +++ b/verifier/const.go @@ -3,3 +3,4 @@ package verifier import "time" var VerifyHashLoopInterval = 2 * time.Second +var UpdateDeduplicationInterval = 24 * time.Hour diff --git a/verifier/hash_verifier.go b/verifier/hash_verifier.go index bcd5ea5..5197a34 100644 --- a/verifier/hash_verifier.go +++ b/verifier/hash_verifier.go @@ -59,6 +59,7 @@ func NewHashVerifier(cfg *config.Config, executor *executor.Executor, dataProvid } func (v *Verifier) VerifyHashLoop() { + go v.updateDeduplicationIntervalLoop() for { err := v.verifyHash() if err != nil { @@ -68,6 +69,7 @@ func (v *Verifier) VerifyHashLoop() { time.Sleep(VerifyHashLoopInterval) } } + func (v *Verifier) verifyHash() error { // Read unprocessed event from db with lowest challengeId currentHeight, err := v.executor.GetCachedBlockHeight() @@ -194,7 +196,7 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error { return err } chainRootHash := checksums[event.RedundancyIndex+1] - logging.Logger.Infof("chainRootHash: %s for challengeId: %d", hex.EncodeToString(chainRootHash), event.ChallengeId) + logging.Logger.Infof("fetched chainRootHash: %s for challengeId: %d", hex.EncodeToString(chainRootHash), event.ChallengeId) // Call sp for challenge result challengeRes := &types.ChallengeResult{} @@ -207,7 +209,11 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error { return challengeResErr }, retry.Context(context.Background()), common.RtyAttem, common.RtyDelay, common.RtyErr) if challengeResErr != nil { - v.metricService.IncHashVerifierSpApiErr(err) + if v.isInternalSP(endpoint) { + v.metricService.IncHashVerifierInternalSpApiErr(challengeResErr) + } else { + v.metricService.IncHashVerifierExternalSpApiErr(challengeResErr) + } err = v.dataProvider.UpdateEventStatusVerifyResult(event.ChallengeId, model.Verified, model.HashMismatched) if err != nil { v.metricService.IncHashVerifierErr(err) @@ -233,9 +239,8 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error { spChecksums = append(spChecksums, checksum) } originalSpRootHash := hash.GenerateChecksum(bytes.Join(spChecksums, []byte(""))) - logging.Logger.Infof("SpRootHash before replacing: %s for challengeId: %d", hex.EncodeToString(originalSpRootHash), event.ChallengeId) spRootHash := v.computeRootHash(event.SegmentIndex, pieceData, spChecksums) - logging.Logger.Infof("SpRootHash after replacing: %s for challengeId: %d", hex.EncodeToString(spRootHash), event.ChallengeId) + logging.Logger.Infof("hash verification for challengeId: %d, Fetched Original SpRootHash: %s, Locally Computed SpRootHash: %s, Fetched ChainRootHash: %s", event.ChallengeId, hex.EncodeToString(originalSpRootHash), hex.EncodeToString(spRootHash), hex.EncodeToString(chainRootHash)) // Update database after comparing err = v.compareHashAndUpdate(event.ChallengeId, chainRootHash, spRootHash) if err != nil { @@ -311,3 +316,24 @@ func (v *Verifier) compareHashAndUpdate(challengeId uint64, chainRootHash []byte v.metricService.IncChallengeSuccess() return err } + +func (v *Verifier) isInternalSP(spEndpoint string) bool { + for _, internalEndpoint := range v.config.SPConfig.InternalSPEndpoints { + if strings.Contains(spEndpoint, internalEndpoint) { + return true + } + } + return false +} + +func (v *Verifier) updateDeduplicationIntervalLoop() { + ticker := time.NewTicker(UpdateDeduplicationInterval) + for range ticker.C { + updatedDeduplicationInterval, err := v.executor.QueryChallengeSlashCoolingOffPeriod() + if err != nil { + logging.Logger.Errorf("error updating deduplication interval, err=%s", err.Error()) + return + } + v.deduplicationInterval = updatedDeduplicationInterval + } +} From 1e8c5d0a7a32b292c5cd64c5cff640c35e0b2cfc Mon Sep 17 00:00:00 2001 From: randyahx Date: Mon, 30 Oct 2023 15:39:22 +0800 Subject: [PATCH 2/7] add logs to check challenge result --- verifier/hash_verifier.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/verifier/hash_verifier.go b/verifier/hash_verifier.go index 5197a34..589bfd5 100644 --- a/verifier/hash_verifier.go +++ b/verifier/hash_verifier.go @@ -215,6 +215,7 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error { v.metricService.IncHashVerifierExternalSpApiErr(challengeResErr) } err = v.dataProvider.UpdateEventStatusVerifyResult(event.ChallengeId, model.Verified, model.HashMismatched) + logging.Logger.Infof("challenge succeeded for challengeId: %d, failed to fetch challenge result from sp", event.ChallengeId) if err != nil { v.metricService.IncHashVerifierErr(err) logging.Logger.Errorf("error updating event status for challengeId: %d", event.ChallengeId) @@ -303,6 +304,7 @@ func (v *Verifier) compareHashAndUpdate(challengeId uint64, chainRootHash []byte return err } // update metrics if no err + logging.Logger.Infof("challenge failed for challengeId: %d, hash matched", challengeId) v.metricService.IncVerifiedChallenges() v.metricService.IncChallengeFailed() return err @@ -312,6 +314,7 @@ func (v *Verifier) compareHashAndUpdate(challengeId uint64, chainRootHash []byte return err } // update metrics if no err + logging.Logger.Infof("challenge succeeded for challengeId: %d, hash mismatched", challengeId) v.metricService.IncVerifiedChallenges() v.metricService.IncChallengeSuccess() return err From eebb8d3ce6feb2753a4266882f8fa670b072e129 Mon Sep 17 00:00:00 2001 From: randyahx Date: Tue, 31 Oct 2023 15:20:25 +0800 Subject: [PATCH 3/7] remove challenge success metric from api error --- verifier/hash_verifier.go | 1 - 1 file changed, 1 deletion(-) diff --git a/verifier/hash_verifier.go b/verifier/hash_verifier.go index 589bfd5..73fe7eb 100644 --- a/verifier/hash_verifier.go +++ b/verifier/hash_verifier.go @@ -221,7 +221,6 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error { logging.Logger.Errorf("error updating event status for challengeId: %d", event.ChallengeId) } v.metricService.IncVerifiedChallenges() - v.metricService.IncChallengeSuccess() return err } From 7b67c12c799ae769fa472d47c876a94cea4c3f51 Mon Sep 17 00:00:00 2001 From: randyahx Date: Tue, 31 Oct 2023 15:33:52 +0800 Subject: [PATCH 4/7] update logs for sp api error --- verifier/hash_verifier.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/verifier/hash_verifier.go b/verifier/hash_verifier.go index 73fe7eb..3cb9fbd 100644 --- a/verifier/hash_verifier.go +++ b/verifier/hash_verifier.go @@ -211,11 +211,12 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error { if challengeResErr != nil { if v.isInternalSP(endpoint) { v.metricService.IncHashVerifierInternalSpApiErr(challengeResErr) + logging.Logger.Infof("challenge succeeded due to internal sp api error for challengeId: %d, failed to fetch challenge result from sp endpoint: %s, objectID: %d, segmentIndex: %d, redundancyIndex: %d, err=%s", event.ChallengeId, endpoint, event.ObjectId, int(event.SegmentIndex), int(event.RedundancyIndex), challengeResErr) } else { v.metricService.IncHashVerifierExternalSpApiErr(challengeResErr) + logging.Logger.Infof("challenge succeeded due to external sp api error for challengeId: %d, failed to fetch challenge result from sp endpoint: %s, objectID: %d, segmentIndex: %d, redundancyIndex: %d, err=%s", event.ChallengeId, endpoint, event.ObjectId, int(event.SegmentIndex), int(event.RedundancyIndex), challengeResErr) } err = v.dataProvider.UpdateEventStatusVerifyResult(event.ChallengeId, model.Verified, model.HashMismatched) - logging.Logger.Infof("challenge succeeded for challengeId: %d, failed to fetch challenge result from sp", event.ChallengeId) if err != nil { v.metricService.IncHashVerifierErr(err) logging.Logger.Errorf("error updating event status for challengeId: %d", event.ChallengeId) From 6b91e0336cdd137022ff0f41a5da09837c3a78d0 Mon Sep 17 00:00:00 2001 From: randyahx Date: Tue, 31 Oct 2023 15:52:38 +0800 Subject: [PATCH 5/7] Update metrics.go --- metrics/metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index fb0444e..88a362e 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -52,12 +52,12 @@ const ( ) type MetricService struct { - MetricsMap map[string]prometheus.Collector + MetricsMap map[string]prometheus.Metric cfg *config.Config } func NewMetricService(config *config.Config) *MetricService { - ms := make(map[string]prometheus.Collector, 0) + ms := make(map[string]prometheus.Metric, 0) // Monitor gnfdSavedBlockMetric := prometheus.NewGauge(prometheus.GaugeOpts{ From 3b6a3e685a32764428ae9346dd27017c9eefad10 Mon Sep 17 00:00:00 2001 From: randyahx Date: Tue, 31 Oct 2023 16:22:00 +0800 Subject: [PATCH 6/7] add mutex for deduplication interval update --- verifier/hash_verifier.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/verifier/hash_verifier.go b/verifier/hash_verifier.go index 3cb9fbd..949e944 100644 --- a/verifier/hash_verifier.go +++ b/verifier/hash_verifier.go @@ -337,6 +337,8 @@ func (v *Verifier) updateDeduplicationIntervalLoop() { logging.Logger.Errorf("error updating deduplication interval, err=%s", err.Error()) return } + v.mtx.Lock() v.deduplicationInterval = updatedDeduplicationInterval + v.mtx.Unlock() } } From 7d1601efe008f4c8c2f69e5363744e5c705591f7 Mon Sep 17 00:00:00 2001 From: randyahx Date: Tue, 31 Oct 2023 17:20:15 +0800 Subject: [PATCH 7/7] add mutex for deduplication interval --- verifier/hash_verifier.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/verifier/hash_verifier.go b/verifier/hash_verifier.go index 949e944..38ac367 100644 --- a/verifier/hash_verifier.go +++ b/verifier/hash_verifier.go @@ -273,9 +273,14 @@ func (v *Verifier) preCheck(event *model.Event, currentHeight uint64) error { if heartbeatInterval == 0 { panic("heartbeat interval should not zero, potential bug") } - if event.ChallengerAddress == "" && event.ChallengeId%heartbeatInterval != 0 && event.ChallengeId > v.deduplicationInterval { + + v.mtx.Lock() + deduplicationInterval := v.deduplicationInterval + v.mtx.Unlock() + + if event.ChallengerAddress == "" && event.ChallengeId%heartbeatInterval != 0 && event.ChallengeId > deduplicationInterval { found, err := v.dataProvider.IsEventExistsBetween(event.ObjectId, event.SpOperatorAddress, - event.ChallengeId-v.deduplicationInterval, event.ChallengeId-1) + event.ChallengeId-deduplicationInterval, event.ChallengeId-1) if err != nil { logging.Logger.Errorf("verifier failed to retrieve information for event %d, err=%+v", event.ChallengeId, err.Error()) return err