Skip to content

Commit

Permalink
Support fixed and jittered retry backoff policy (#2273)
Browse files Browse the repository at this point in the history
* Support fixed and jittered retry backoff policy

* Enable retry backoff in pthread

* Support CanRetryBackoffInPthread virtual method

* Add backoff strategy to RetryPolicy

* Only pass Controller param

* Add doc of retry backoff

* Opt retry backoff
  • Loading branch information
chenBright authored Jul 14, 2023
1 parent 629fabb commit 65b753d
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 50 deletions.
77 changes: 77 additions & 0 deletions docs/cn/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,83 @@ options.retry_policy = &g_my_retry_policy;
* 通过cntl->response()可获得对应RPC的response。
* 对ERPCTIMEDOUT代表的RPC超时总是不重试,即使你继承的RetryPolicy中允许。
### 重试退避
对于一些暂时性的错误,如网络抖动等,等待一小会儿再重试的成功率比立即重试的成功率高,同时可以打散上游重试的时机,减轻服务端压力,避免重试风暴导致服务端出现瞬间流量洪峰。
框架支持两种重试退避策略:固定时间间隔退避策略和随机时间间隔退避策略。
固定时间间隔退避策略需要设置固定时间间隔(毫秒)、无需重试退避的剩余rpc时间阈值(毫秒,当剩余rpc时间小于阈值,则不进行重试退避)、是否允许在pthread进行重试退避。使用方法如下:
```c++
// 给ChannelOptions.retry_policy赋值就行了。
// 注意:retry_policy必须在Channel使用期间保持有效,Channel也不会删除retry_policy,所以大部分情况下RetryPolicy都应以单例模式创建。
brpc::ChannelOptions options;
int32_t fixed_backoff_time_ms = 100; // 固定时间间隔(毫秒)
int32_t no_backoff_remaining_rpc_time_ms = 150; // 无需重试退避的剩余rpc时间阈值(毫秒)
bool retry_backoff_in_pthread = false;
static brpc::RpcRetryPolicyWithFixedBackoff g_retry_policy_with_fixed_backoff(
fixed_backoff_time_ms, no_backoff_remaining_rpc_time_ms, retry_backoff_in_pthread);
options.retry_policy = &g_retry_policy_with_fixed_backoff;
...
```

随机时间间隔退避策略需要设置最小时间间隔(毫秒)、最大时间间隔(毫秒)、无需重试退避的剩余rpc时间阈值(毫秒,当剩余rpc时间小于阈值,则不进行重试退避)、是否允许在pthread做重试退避。每次策略会随机生成一个在最小时间间隔和最大时间间隔之间的重试退避间隔。使用方法如下:

```c++
// 给ChannelOptions.retry_policy赋值就行了。
// 注意:retry_policy必须在Channel使用期间保持有效,Channel也不会删除retry_policy,所以大部分情况下RetryPolicy都应以单例模式创建。
brpc::ChannelOptions options;
int32_t min_backoff_time_ms = 100; // 最小时间间隔(毫秒)
int32_t max_backoff_time_ms = 200; // 最大时间间隔(毫秒)
int32_t no_backoff_remaining_rpc_time_ms = 150; // 无需重试退避的剩余rpc时间阈值(毫秒)
bool retry_backoff_in_pthread = false; // 是否允许在pthread做重试退避
static brpc::RpcRetryPolicyWithJitteredBackoff g_retry_policy_with_jitter_backoff(
min_backoff_time_ms, max_backoff_time_ms,
no_backoff_remaining_rpc_time_ms, retry_backoff_in_pthread);
options.retry_policy = &g_retry_policy_with_jitter_backoff;
...
```
用户可以通过继承[brpc::RetryPolicy](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)自定义重试退避策略。比如只需要针对服务端并发数超限的情况进行重试退避,可以这么做:
```c++
class MyRetryPolicy : public brpc::RetryPolicy {
public:
bool DoRetry(const brpc::Controller* cntl) const {
// 同《错误值得重试》一节
}
int32_t GetBackoffTimeMs(const brpc::Controller* cntl) const {
if (controller->ErrorCode() == brpc::ELIMIT) {
return 100; // 退避100毫秒
}
return 0; // 返回0表示不进行重试退避。
}
bool CanRetryBackoffInPthread() const {
return true;
}
};
...
// 给ChannelOptions.retry_policy赋值就行了。
// 注意:retry_policy必须在Channel使用期间保持有效,Channel也不会删除retry_policy,所以大部分情况下RetryPolicy都应以单例模式创建。
brpc::ChannelOptions options;
static MyRetryPolicy g_my_retry_policy;
options.retry_policy = &g_my_retry_policy;
...
```

如果用户希望使用框架默认的DoRetry,只实现自定义的重试退避策略,则可以继承[brpc::RpcRetryPolicy](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)

一些提示:

- 当策略返回的重试退避时间大于等于剩余的rpc时间或者等于0,框架不会进行重试退避,而是立即进行重试。
- [brpc::RpcRetryPolicyWithFixedBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)(固定时间间隔退策略)和[brpc::RpcRetryPolicyWithJitteredBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)(随机时间间隔退策略)继承了[brpc::RpcRetryPolicy](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h),使用框架默认的DoRetry。
- 在pthread中进行重试退避(实际上通过bthread_usleep实现)会阻塞pthread,所以默认不会在pthread上进行重试退避。

### 重试应当保守

由于成本的限制,大部分线上server的冗余度是有限的,主要是满足多机房互备的需求。而激进的重试逻辑很容易导致众多client对server集群造成2-3倍的压力,最终使集群雪崩:由于server来不及处理导致队列越积越长,使所有的请求得经过很长的排队才被处理而最终超时,相当于服务停摆。默认的重试是比较安全的: 只要连接不断RPC就不会重试,一般不会产生大量的重试请求。用户可以通过RetryPolicy定制重试策略,但也可能使重试变成一场“风暴”。当你定制RetryPolicy时,你需要仔细考虑client和server的协作关系,并设计对应的异常测试,以确保行为符合预期。
Expand Down
71 changes: 47 additions & 24 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
// Force linking the .o in UT (which analysis deps by inclusions)
#include "brpc/parallel_channel.h"
#include "brpc/selective_channel.h"
#include "bthread/task_group.h"

namespace bthread {
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
}

// This is the only place that both client/server must link, so we put
// registrations of errno here.
Expand Down Expand Up @@ -627,33 +632,51 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
++_current_call.nretry;
add_flag(FLAGS_BACKUP_REQUEST);
return IssueRPC(butil::gettimeofday_us());
} else if (_retry_policy ? _retry_policy->DoRetry(this)
: DefaultRetryPolicy()->DoRetry(this)) {
// The error must come from _current_call because:
// * we intercepted error from _unfinished_call in OnVersionedRPCReturned
// * ERPCTIMEDOUT/ECANCELED are not retrying error by default.
CHECK_EQ(current_id(), info.id) << "error_code=" << _error_code;
if (!SingleServer()) {
if (_accessed == NULL) {
_accessed = ExcludedServers::Create(
std::min(_max_retry, RETRY_AVOIDANCE));
if (NULL == _accessed) {
SetFailed(ENOMEM, "Fail to create ExcludedServers");
goto END_OF_RPC;
} else {
auto retry_policy = _retry_policy ? _retry_policy : DefaultRetryPolicy();
if (retry_policy->DoRetry(this)) {
// The error must come from _current_call because:
// * we intercepted error from _unfinished_call in OnVersionedRPCReturned
// * ERPCTIMEDOUT/ECANCELED are not retrying error by default.
CHECK_EQ(current_id(), info.id) << "error_code=" << _error_code;
if (!SingleServer()) {
if (_accessed == NULL) {
_accessed = ExcludedServers::Create(
std::min(_max_retry, RETRY_AVOIDANCE));
if (NULL == _accessed) {
SetFailed(ENOMEM, "Fail to create ExcludedServers");
goto END_OF_RPC;
}
}
_accessed->Add(_current_call.peer_id);
}
_accessed->Add(_current_call.peer_id);
}
_current_call.OnComplete(this, _error_code, info.responded, false);
++_current_call.nretry;
// Clear http responses before retrying, otherwise the response may
// be mixed with older (and undefined) stuff. This is actually not
// done before r32008.
if (_http_response) {
_http_response->Clear();
_current_call.OnComplete(this, _error_code, info.responded, false);
++_current_call.nretry;
// Clear http responses before retrying, otherwise the response may
// be mixed with older (and undefined) stuff. This is actually not
// done before r32008.
if (_http_response) {
_http_response->Clear();
}
response_attachment().clear();

// Retry backoff.
bthread::TaskGroup* g = bthread::tls_task_group;
if (retry_policy->CanRetryBackoffInPthread() ||
(g && !g->is_current_pthread_task())) {
int64_t backoff_time_us = retry_policy->GetBackoffTimeMs(this) * 1000L;
// No need to do retry backoff when the backoff time is longer than the remaining rpc time.
if (backoff_time_us > 0 &&
backoff_time_us < _deadline_us - butil::gettimeofday_us()) {
bthread_usleep(backoff_time_us);
}

} else {
LOG(WARNING) << "`CanRetryBackoffInPthread()' returns false, "
"skip retry backoff in pthread.";
}
return IssueRPC(butil::gettimeofday_us());
}
response_attachment().clear();
return IssueRPC(butil::gettimeofday_us());
}

END_OF_RPC:
Expand Down
2 changes: 1 addition & 1 deletion src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// after CallMethod.
int _max_retry;
const RetryPolicy* _retry_policy;
// Synchronization object for one RPC call. It remains unchanged even
// Synchronization object for one RPC call. It remains unchanged even
// when retry happens. Synchronous RPC will wait on this id.
CallId _correlation_id;

Expand Down
63 changes: 40 additions & 23 deletions src/brpc/retry_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,30 @@


#include "brpc/retry_policy.h"
#include "butil/fast_rand.h"


namespace brpc {

RetryPolicy::~RetryPolicy() {}

class RpcRetryPolicy : public RetryPolicy {
public:
bool DoRetry(const Controller* controller) const {
const int error_code = controller->ErrorCode();
if (!error_code) {
return false;
}
return (EFAILEDSOCKET == error_code
|| EEOF == error_code
|| EHOSTDOWN == error_code
|| ELOGOFF == error_code
|| ETIMEDOUT == error_code // This is not timeout of RPC.
|| ELIMIT == error_code
|| ENOENT == error_code
|| EPIPE == error_code
|| ECONNREFUSED == error_code
|| ECONNRESET == error_code
|| ENODATA == error_code
|| EOVERCROWDED == error_code
|| EH2RUNOUTSTREAMS == error_code);
bool RpcRetryPolicy::DoRetry(const Controller* controller) const {
const int error_code = controller->ErrorCode();
if (!error_code) {
return false;
}
};
return (EFAILEDSOCKET == error_code
|| EEOF == error_code
|| EHOSTDOWN == error_code
|| ELOGOFF == error_code
|| ETIMEDOUT == error_code // This is not timeout of RPC.
|| ELIMIT == error_code
|| ENOENT == error_code
|| EPIPE == error_code
|| ECONNREFUSED == error_code
|| ECONNRESET == error_code
|| ENODATA == error_code
|| EOVERCROWDED == error_code
|| EH2RUNOUTSTREAMS == error_code);
}

// NOTE(gejun): g_default_policy can't be deleted on process's exit because
// client-side may still retry and use the policy at exit
Expand All @@ -58,4 +54,25 @@ const RetryPolicy* DefaultRetryPolicy() {
return g_default_policy;
}

int32_t RpcRetryPolicyWithFixedBackoff::GetBackoffTimeMs(
const Controller* controller) const {
int64_t remaining_rpc_time_ms =
(controller->deadline_us() - butil::gettimeofday_us()) / 1000;
if (remaining_rpc_time_ms < _no_backoff_remaining_rpc_time_ms) {
return 0;
}
return _backoff_time_ms;
}

int32_t RpcRetryPolicyWithJitteredBackoff::GetBackoffTimeMs(
const Controller* controller) const {
int64_t remaining_rpc_time_ms =
(controller->deadline_us() - butil::gettimeofday_us()) / 1000;
if (remaining_rpc_time_ms < _no_backoff_remaining_rpc_time_ms) {
return 0;
}
return butil::fast_rand_in(_min_backoff_time_ms,
_max_backoff_time_ms);
}

} // namespace brpc
62 changes: 61 additions & 1 deletion src/brpc/retry_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace brpc {
// Inherit this class to customize when the RPC should be retried.
class RetryPolicy {
public:
virtual ~RetryPolicy();
virtual ~RetryPolicy() = default;

// Returns true if the RPC represented by `controller' should be retried.
// [Example]
Expand Down Expand Up @@ -68,11 +68,71 @@ class RetryPolicy {
virtual bool DoRetry(const Controller* controller) const = 0;
// ^
// don't forget the const modifier

// Returns the backoff time in milliseconds before every retry.
virtual int32_t GetBackoffTimeMs(const Controller* controller) const { return 0; }
// ^
// don't forget the const modifier

// Returns true if enable retry backoff in pthread, otherwise returns false.
virtual bool CanRetryBackoffInPthread() const { return false; }
// ^
// don't forget the const modifier
};

// Get the RetryPolicy used by brpc.
const RetryPolicy* DefaultRetryPolicy();

class RpcRetryPolicy : public RetryPolicy {
public:
bool DoRetry(const Controller* controller) const override;
};

class RpcRetryPolicyWithFixedBackoff : public RpcRetryPolicy {
public:
RpcRetryPolicyWithFixedBackoff(int32_t backoff_time_ms,
int32_t no_backoff_remaining_rpc_time_ms,
bool retry_backoff_in_pthread)
: _backoff_time_ms(backoff_time_ms)
, _no_backoff_remaining_rpc_time_ms(no_backoff_remaining_rpc_time_ms)
, _retry_backoff_in_pthread(retry_backoff_in_pthread) {}

int32_t GetBackoffTimeMs(const Controller* controller) const override;

bool CanRetryBackoffInPthread() const override { return _retry_backoff_in_pthread; }


private:
int32_t _backoff_time_ms;
// If remaining rpc time is less than `_no_backoff_remaining_rpc_time', no backoff.
int32_t _no_backoff_remaining_rpc_time_ms;
bool _retry_backoff_in_pthread;
};

class RpcRetryPolicyWithJitteredBackoff : public RpcRetryPolicy {
public:
RpcRetryPolicyWithJitteredBackoff(int32_t min_backoff_time_ms,
int32_t max_backoff_time_ms,
int32_t no_backoff_remaining_rpc_time_ms,
bool retry_backoff_in_pthread)
: _min_backoff_time_ms(min_backoff_time_ms)
, _max_backoff_time_ms(max_backoff_time_ms)
, _no_backoff_remaining_rpc_time_ms(no_backoff_remaining_rpc_time_ms)
, _retry_backoff_in_pthread(retry_backoff_in_pthread) {}

int32_t GetBackoffTimeMs(const Controller* controller) const override;

bool CanRetryBackoffInPthread() const override { return _retry_backoff_in_pthread; }

private:
// Generate jittered backoff time between [_min_backoff_ms, _max_backoff_ms].
int32_t _min_backoff_time_ms;
int32_t _max_backoff_time_ms;
// If remaining rpc time is less than `_no_backoff_remaining_rpc_time', no backoff.
int32_t _no_backoff_remaining_rpc_time_ms;
bool _retry_backoff_in_pthread;
};

} // namespace brpc


Expand Down
Loading

0 comments on commit 65b753d

Please sign in to comment.