Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zombie detector #57

Merged
merged 4 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# ehttpc changes

## 0.6.0

- Changed log format to be more structured, `host` and `port` are included in the log data fields.
- Added the `max_inactive` duration option (default is `10_000` milliseconds).
This is to detect zombie connections, especially when pipelining is set to a value greater than 1.
With `{max_inactive, 10_000}` added to the `start_pool` options,
the worker reconnects to the HTTP server once it detects that the last sent request has been expired for more than 10 seconds.

## 0.5.0

- Dropped hot-upgrade support.
Expand Down
181 changes: 152 additions & 29 deletions src/ehttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
%%--------------------------------------------------------------------

-module(ehttpc).
-feature(maybe_expr, enable).

-behaviour(gen_server).

Expand Down Expand Up @@ -67,7 +68,6 @@
-include_lib("eunit/include/eunit.hrl").
-endif.

-define(LOG(Level, Format, Args), logger:Level("ehttpc: " ++ Format, Args)).
-define(REQ(Method, Req, ExpireAt), {Method, Req, ExpireAt}).
-define(PEND_REQ(ReplyTo, Req), {ReplyTo, Req}).
-define(SENT_REQ(ReplyTo, ExpireAt, Acc), {ReplyTo, ExpireAt, Acc}).
Expand All @@ -77,6 +77,7 @@
-define(GEN_CALL_REQ(From, Call), {'$gen_call', From, ?REQ(_, _, _) = Call}).
-define(undef, undefined).
-define(IS_POOL(Pool), (not is_tuple(Pool) andalso not is_pid(Pool))).
-define(DEFAULT_MAX_INACTIVE, 10_000).

-record(state, {
pool :: term(),
Expand All @@ -90,7 +91,9 @@
gun_opts :: gun:opts(),
gun_state :: down | up,
requests :: map(),
proxy :: undefined | map()
proxy :: undefined | map(),
max_inactive :: pos_integer(),
inactive_check_tref :: reference() | ?undef
}).

-type pool_name() :: any().
Expand Down Expand Up @@ -199,6 +202,7 @@ init([Pool, Id, Opts0]) ->
process_flag(trap_exit, true),
PrioLatest = proplists:get_bool(prioritise_latest, Opts0),
#{opts := Opts, proxy := Proxy} = parse_proxy_opts(Opts0),
MaxInactive = proplists:get_value(max_inactive, Opts, ?DEFAULT_MAX_INACTIVE),
State = #state{
pool = Pool,
id = Id,
Expand All @@ -213,12 +217,14 @@ init([Pool, Id, Opts0]) ->
pending => queue:new(),
pending_count => 0,
sent => #{},
max_sent_expire => 0,
prioritise_latest => PrioLatest
},
proxy = Proxy
proxy = Proxy,
max_inactive = MaxInactive
},
true = gproc_pool:connect_worker(ehttpc:name(Pool), {Pool, Id}),
{ok, State}.
{ok, start_check_inactive_timer(State)}.

handle_call({health_check, _}, _From, State = #state{gun_state = up}) ->
{reply, ok, State};
Expand Down Expand Up @@ -267,11 +273,18 @@ handle_info({suspend, Time}, State) ->
%% only for testing
timer:sleep(Time),
{noreply, State};
handle_info(check_inactive, State0) ->
State = maybe_shoot(State0),
{noreply, start_check_inactive_timer(State)};
handle_info(Info, State0) ->
State1 = do_handle_info(Info, upgrade_requests(State0)),
State = maybe_shoot(State1),
{noreply, State}.

%% (Re)arm the `check_inactive' timer.
%% A timer reference already stored in the state is cancelled first; a new
%% timer is then scheduled to send `check_inactive' to this process after
%% `max_inactive' milliseconds, and its reference is stored in the state.
start_check_inactive_timer(#state{inactive_check_tref = OldRef, max_inactive = Interval} = State) ->
    _ =
        case is_reference(OldRef) of
            true -> erlang:cancel_timer(OldRef);
            false -> false
        end,
    NewRef = erlang:send_after(Interval, self(), check_inactive),
    State#state{inactive_check_tref = NewRef}.

do_handle_info(
{gun_response, Client, StreamRef, IsFin, StatusCode, Headers},
#state{client = Client} = State
Expand Down Expand Up @@ -308,7 +321,7 @@ do_handle_info(
State = #state{client = Client}
) ->
Reason =/= normal andalso Reason =/= closed andalso
?LOG(warning, "Received 'gun_down' message with reason: ~p", [Reason]),
log(warning, #{msg => "http_connection_down", reason => Reason}, State),
NewState = handle_gun_down(State, KilledStreams, Reason),
NewState;
do_handle_info(
Expand All @@ -320,7 +333,14 @@ do_handle_info(
do_handle_info({'EXIT', Client, Reason}, State = #state{client = Client}) ->
handle_client_down(State, Reason);
do_handle_info(Info, State) ->
?LOG(warning, "~p unexpected_info: ~p, client: ~p", [?MODULE, Info, State#state.client]),
log(
warning,
#{
msg => "ehttpc_unexpected_info",
info => Info
},
State
),
State.

terminate(_Reason, #state{pool = Pool, id = Id, client = Client}) ->
Expand Down Expand Up @@ -532,22 +552,49 @@ upgrade_requests(Map) when is_map(Map) ->
pending => queue:new(),
pending_count => 0,
sent => Map,
prioritise_latest => false
prioritise_latest => false,
max_sent_expire => 0
zmstone marked this conversation as resolved.
Show resolved Hide resolved
}.

%% Record a request as sent on the given stream.
%% Besides storing the request under `StreamRef' in the `sent' map, the
%% request's expire time is folded into `max_sent_expire', which keeps the
%% largest expire time seen among sent requests (see max_expire/2).
%% Both keys must already exist in the requests map (`:=' update).
put_sent_req(
    StreamRef,
    Req,
    #{
        sent := Sent,
        max_sent_expire := T
    } = Requests
) ->
    ?SENT_REQ(_, Expire, _) = Req,
    Requests#{
        sent := maps:put(StreamRef, Req, Sent),
        max_sent_expire := max_expire(T, Expire)
    }.

%% Fold one request's expire time into the running maximum.
%% A request with `infinity' timeout never expires, so it leaves the
%% current maximum untouched.
max_expire(Current, infinity) ->
    Current;
max_expire(Current, Expire) when is_integer(Expire), Expire > Current ->
    Expire;
max_expire(Current, Expire) when is_integer(Expire) ->
    Current.

%% Remove the request stored under `StreamRef' from the `sent' map.
%% Returns:
%%   `error'               - no request is stored under `StreamRef'
%%   `{expired, Requests}' - the request was found but is already expired
%%   `{Req, Requests}'     - the request was found and is still valid
%% In both found cases the returned map has the request removed and
%% `max_sent_expire' refreshed.
take_sent_req(StreamRef, #{sent := Sent, max_sent_expire := T} = Requests) ->
    case maps:take(StreamRef, Sent) of
        error ->
            error;
        {Req, NewSent} ->
            %% we assume all calls use the same timeout value
            %% so there is no need to scan the map to find a new max
            %% or even if calls may use different timeout
            %% the impact of a wrong max is minimal: delayed detection of zombie connection
            NewT =
                case map_size(NewSent) of
                    0 ->
                        %% nothing in flight any more: reset the marker
                        0;
                    _ ->
                        T
                end,
            NewRequests = Requests#{sent := NewSent, max_sent_expire := NewT},
            case is_sent_req_expired(Req, now_()) of
                true ->
                    {expired, NewRequests};
                false ->
                    {Req, NewRequests}
            end
    end.

Expand Down Expand Up @@ -579,14 +626,7 @@ reply_error_for_sent_reqs(#{sent := Sent} = R, Reason) ->
end,
maps:to_list(Sent)
),
R#{sent := #{}}.

%% Tell whether sending should pause given the number of in-flight requests.
%% `true'  means pipelining with a cap of 100 in-flight requests,
%% `false' means no pipelining: pause as soon as anything is in flight,
%% an integer is used directly as the in-flight cap.
should_cool_down(true, InFlight) ->
    should_cool_down(100, InFlight);
should_cool_down(false, InFlight) ->
    InFlight > 0;
should_cool_down(Limit, InFlight) when is_integer(Limit) ->
    not (InFlight < Limit).
R#{sent => #{}, max_sent_expire => 0}.

%% Continue droping expired requests, to avoid the state RAM usage
%% explosion if http client can not keep up.
Expand Down Expand Up @@ -633,20 +673,100 @@ enqueue_req(ReplyTo, Req, #state{requests = Requests0} = State) ->
State#state{requests = drop_expired(Requests)}.

%% Call gun to shoot pending requests out, if the connection allows it.
%% Expired pending requests are dropped first, then check_gun/5 decides:
%%   `continue'  - hand over to do_shoot/1
%%   `pause'     - do nothing now; a later message triggers the flow again
%%   `reconnect' - the connection is considered a zombie: log it and kill
%%                 the gun client process
%% Note that `Sent' and `MaxExpire' are read from the requests map as it was
%% before drop_expired/1 ran.
maybe_shoot(
    #state{
        requests =
            #{
                sent := Sent,
                max_sent_expire := MaxExpire
            } = Requests0,
        client = Client,
        max_inactive = MaxInactive,
        enable_pipelining = PipelineLimit
    } = State0
) ->
    State = State0#state{requests = drop_expired(Requests0)},
    SentCount = map_size(Sent),
    case check_gun(Client, PipelineLimit, SentCount, MaxExpire, MaxInactive) of
        continue ->
            do_shoot(State);
        pause ->
            %% Then we should cool down, and let the gun responses
            %% or 'EXIT' message to trigger the flow again
            ?tp(cool_down, #{enable_pipelining => State#state.enable_pipelining}),
            State;
        reconnect ->
            %% assert
            true = (MaxExpire > 0),
            %% the connection has been inactive for too long
            log(
                error,
                #{
                    msg => "force_reconnecting_zombie_http_connection",
                    last_request_expire => calendar:system_time_to_rfc3339(MaxExpire, [
                        {unit, millisecond}
                    ]),
                    inactive_duration_threshold => MaxInactive,
                    inflight_requests => SentCount,
                    connection_pid => Client
                },
                State
            ),
            ?tp(reconnect, #{sent => SentCount}),
            %% kill the client process; nothing else is changed in the state here
            _ = exit(Client, kill),
            State
    end.

%% Run the connection checks in order and return the first verdict that is
%% not `ok': the client process check first, then the zombie (jammed) check.
%% When both pass, the pipelining limit check gives the final verdict.
check_gun(ClientPid, PipelineLimit, SentCount, MaxExpireTs, MaxInactiveDuration) ->
    case check_gun_pid(ClientPid) of
        ok ->
            case check_gun_jammed(SentCount, MaxExpireTs, MaxInactiveDuration) of
                ok ->
                    check_gun_limit(PipelineLimit, SentCount);
                JamVerdict ->
                    JamVerdict
            end;
        PidVerdict ->
            PidVerdict
    end.

%% Check the gun client process.
%% Returns:
%%   `continue' - `Pid' is not a pid (no client yet): go straight to
%%                initialize the client
%%   `ok'       - the client process is alive, ok to send
%%   `pause'    - the client was once initialized but its process is dead:
%%                do not send, wait for the EXIT message to trigger reconnect
check_gun_pid(Pid) when not is_pid(Pid) ->
    continue;
check_gun_pid(Pid) ->
    case is_process_alive(Pid) of
        true ->
            ok;
        false ->
            pause
    end.

%% Zombie connection check.
%% `MaxExpireTs' is the recorded maximum expire time of sent requests, with
%% 0 meaning nothing has been recorded. When the current time is past that
%% timestamp by more than `MaxInactiveDuration', the connection is
%% considered to be in zombie state and `reconnect' is returned; otherwise `ok'.
check_gun_jammed(_SentCount, 0, _MaxInactiveDuration) ->
    %% no expire time recorded, nothing to judge
    ok;
check_gun_jammed(_SentCount, MaxExpireTs, MaxInactiveDuration) ->
    Overdue = now_() - MaxExpireTs,
    if
        Overdue > MaxInactiveDuration ->
            reconnect;
        true ->
            ok
    end.

%% Pipelining limit check: `continue' while the number of sent requests is
%% below the limit, `pause' otherwise.
%% For backward compatibility the limit may also be a boolean:
%% `true' stands for a limit of 100, `false' for a limit of 1.
check_gun_limit(true, SentCount) ->
    check_gun_limit(100, SentCount);
check_gun_limit(false, SentCount) ->
    check_gun_limit(1, SentCount);
check_gun_limit(PipelineLimit, SentCount) when SentCount < PipelineLimit ->
    continue;
check_gun_limit(_PipelineLimit, _SentCount) ->
    pause.

do_shoot(#state{requests = #{pending_count := 0}} = State) ->
Expand Down Expand Up @@ -914,6 +1034,9 @@ take_proplist(Key, Proplist0) ->
{ValueFromProplist, Proplist1}
end.

%% Emit a structured log report at `Level'.
%% The `host' and `port' fields of the worker state are added to the report
%% map, overriding same-named keys in `Data' if present.
log(Level, Data, #state{} = State) ->
    Report = Data#{host => State#state.host, port => State#state.port},
    logger:log(Level, Report).

-ifdef(TEST).

prioritise_latest_test() ->
Expand Down
Loading
Loading