Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{erl_opts, [
debug_info,
{platform_define, "^R", pre17}
{platform_define, "^R", pre17},
{platform_define, "^(R|17)", pre18}
]}.

{eunit_opts, [verbose]}.
Expand Down
116 changes: 83 additions & 33 deletions src/poolboy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
transaction/3, child_spec/2, child_spec/3, child_spec/4, start/1,
start/2, start_link/1, start_link/2, stop/1, status/1]).
start/2, start_link/1, start_link/2, stop/1, status/1, stats/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export_type([pool/0]).
Expand All @@ -18,6 +18,15 @@
-type pid_queue() :: queue:queue().
-endif.

-ifdef(pre18).
-define(timestamp(), timestamp()).
timestamp() ->
{M, S, Mi} = os:timestamp(),
(M*1000000 + S)*1000000 + Mi.
-else.
-define(timestamp(), erlang:monotonic_time()).
-endif.

-ifdef(OTP_RELEASE). %% this implies 21 or higher
-define(EXCEPTION(Class, Reason, Stacktrace), Class:Reason:Stacktrace).
-define(GET_STACK(Stacktrace), Stacktrace).
Expand All @@ -44,7 +53,12 @@
size = 5 :: non_neg_integer(),
overflow = 0 :: non_neg_integer(),
max_overflow = 10 :: non_neg_integer(),
strategy = lifo :: lifo | fifo
strategy = lifo :: lifo | fifo,
stats = false,
%% quant_start is set in init/3 function
%% because of timestamping issues on erlang/otp 17
quant_start = undefined,
time_used = 0
}).

-spec checkout(Pool :: pool()) -> pid().
Expand Down Expand Up @@ -145,6 +159,15 @@ stop(Pool) ->
status(Pool) ->
gen_server:call(Pool, status).

-spec stats(Pool :: pool()) -> {ok, float()} | {error, disabled}.
stats(Pool) ->
case gen_server:call(Pool, stats) of
{error, disabled} -> {error, disabled};
{ok, {TimeTotal, TimeUsed, Workers}} ->
{ok, (TimeUsed / TimeTotal / Workers)}
end.


init({PoolArgs, WorkerArgs}) ->
process_flag(trap_exit, true),
Waiting = queue:new(),
Expand All @@ -162,29 +185,31 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State#state{strategy = lifo});
init([{strategy, fifo} | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State#state{strategy = fifo});
init([{stats, StatsEnabled} | Rest], WorkerArgs, State) when is_boolean(StatsEnabled) ->
init(Rest, WorkerArgs, State#state{stats = StatsEnabled});
init([_ | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State);
init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->
Workers = prepopulate(Size, Sup),
{ok, State#state{workers = Workers}}.
{ok, State#state{workers = Workers, quant_start = ?timestamp()}}.

handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
case ets:lookup(Monitors, Pid) of
[{Pid, _, MRef}] ->
[{Pid, _, MRef, MaybeStartedAt}] ->
true = erlang:demonitor(MRef),
true = ets:delete(Monitors, Pid),
NewState = handle_checkin(Pid, State),
NewState = handle_checkin(Pid, MaybeStartedAt, State),
{noreply, NewState};
[] ->
{noreply, State}
end;

handle_cast({cancel_waiting, CRef}, State) ->
case ets:match(State#state.monitors, {'$1', CRef, '$2'}) of
[[Pid, MRef]] ->
case ets:match(State#state.monitors, {'$1', CRef, '$2', '$3'}) of
[[Pid, MRef, MaybeStartedAt]] ->
demonitor(MRef, [flush]),
true = ets:delete(State#state.monitors, Pid),
NewState = handle_checkin(Pid, State),
NewState = handle_checkin(Pid, MaybeStartedAt, State),
{noreply, NewState};
[] ->
Cancel = fun({_, Ref, MRef}) when Ref =:= CRef ->
Expand All @@ -206,15 +231,20 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) ->
monitors = Monitors,
overflow = Overflow,
max_overflow = MaxOverflow,
strategy = Strategy} = State,
strategy = Strategy,
stats = StatsEnabled} = State,
case get_worker_with_strategy(Workers, Strategy) of
{{value, Pid}, Left} ->
MRef = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
MaybeStartedAt = if StatsEnabled -> ?timestamp();
true -> undefined end,
true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}),
{reply, Pid, State#state{workers = Left}};
{empty, _Left} when MaxOverflow > 0, Overflow < MaxOverflow ->
{Pid, MRef} = new_worker(Sup, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
MaybeStartedAt = if StatsEnabled -> ?timestamp();
true -> undefined end,
true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}),
{reply, Pid, State#state{overflow = Overflow + 1}};
{empty, _Left} when Block =:= false ->
{reply, full, State};
Expand All @@ -230,6 +260,13 @@ handle_call(status, _From, State) ->
overflow = Overflow} = State,
StateName = state_name(State),
{reply, {StateName, queue:len(Workers), Overflow, ets:info(Monitors, size)}, State};
handle_call(stats, _From, #state{stats = false} = State) ->
{reply, {error, disabled}, State};
handle_call(stats, _From, State) ->
{reply, {ok, {
?timestamp() - State#state.quant_start,
State#state.time_used, State#state.size
}}, State#state{quant_start = ?timestamp(), time_used = 0}};
handle_call(get_avail_workers, _From, State) ->
Workers = State#state.workers,
{reply, Workers, State};
Expand All @@ -239,7 +276,7 @@ handle_call(get_all_workers, _From, State) ->
{reply, WorkerList, State};
handle_call(get_all_monitors, _From, State) ->
Monitors = ets:select(State#state.monitors,
[{{'$1', '_', '$2'}, [], [{{'$1', '$2'}}]}]),
[{{'$1', '_', '$2', '_'}, [], [{{'$1', '$2'}}]}]),
{reply, Monitors, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
Expand All @@ -248,10 +285,10 @@ handle_call(_Msg, _From, State) ->
{reply, Reply, State}.

handle_info({'DOWN', MRef, _, _, _}, State) ->
case ets:match(State#state.monitors, {'$1', '_', MRef}) of
[[Pid]] ->
case ets:match(State#state.monitors, {'$1', '_', MRef, '$2'}) of
[[Pid, MaybeStartedAt]] ->
true = ets:delete(State#state.monitors, Pid),
NewState = handle_checkin(Pid, State),
NewState = handle_checkin(Pid, MaybeStartedAt, State),
{noreply, NewState};
[] ->
Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting),
Expand All @@ -261,10 +298,10 @@ handle_info({'EXIT', Pid, _Reason}, State) ->
#state{supervisor = Sup,
monitors = Monitors} = State,
case ets:lookup(Monitors, Pid) of
[{Pid, _, MRef}] ->
[{Pid, _, MRef, MaybeStartedAt}] ->
true = erlang:demonitor(MRef),
true = ets:delete(Monitors, Pid),
NewState = handle_worker_exit(Pid, State),
NewState = handle_worker_exit(Pid, MaybeStartedAt, State),
{noreply, NewState};
[] ->
case queue:member(Pid, State#state.workers) of
Expand Down Expand Up @@ -328,40 +365,53 @@ prepopulate(0, _Sup, Workers) ->
prepopulate(N, Sup, Workers) ->
prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)).

handle_checkin(Pid, State) ->
handle_checkin(Pid, StartedAt, State0) ->
#state{supervisor = Sup,
waiting = Waiting,
monitors = Monitors,
overflow = Overflow} = State,
overflow = Overflow,
stats = StatsEnabled} = State0,
State1 = handle_checkin_stats(StatsEnabled, StartedAt, State0),
case queue:out(Waiting) of
{{value, {From, CRef, MRef}}, Left} ->
true = ets:insert(Monitors, {Pid, CRef, MRef}),
MaybeStartedAt = if StatsEnabled -> ?timestamp();
true -> undefined end,
true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}),
gen_server:reply(From, Pid),
State#state{waiting = Left};
State1#state{waiting = Left};
{empty, Empty} when Overflow > 0 ->
ok = dismiss_worker(Sup, Pid),
State#state{waiting = Empty, overflow = Overflow - 1};
State1#state{waiting = Empty, overflow = Overflow - 1};
{empty, Empty} ->
Workers = queue:in(Pid, State#state.workers),
State#state{workers = Workers, waiting = Empty, overflow = 0}
Workers = queue:in(Pid, State1#state.workers),
State1#state{workers = Workers, waiting = Empty, overflow = 0}
end.

handle_worker_exit(Pid, State) ->
handle_checkin_stats(false, _, State) -> State;
handle_checkin_stats(_, undefined, State) -> State;
handle_checkin_stats(_, StartedAt, #state{time_used = TimeUsed} = State) ->
State#state{time_used = TimeUsed + (?timestamp() - StartedAt)}.

handle_worker_exit(Pid, StartedAt, State0) ->
#state{supervisor = Sup,
monitors = Monitors,
overflow = Overflow} = State,
case queue:out(State#state.waiting) of
overflow = Overflow,
stats = StatsEnabled} = State0,
State1 = handle_checkin_stats(StatsEnabled, StartedAt, State0),
case queue:out(State1#state.waiting) of
{{value, {From, CRef, MRef}}, LeftWaiting} ->
NewWorker = new_worker(State#state.supervisor),
true = ets:insert(Monitors, {NewWorker, CRef, MRef}),
NewWorker = new_worker(State1#state.supervisor),
MaybeStartedAt = if StatsEnabled -> ?timestamp();
true -> undefined end,
true = ets:insert(Monitors, {NewWorker, CRef, MRef, MaybeStartedAt}),
gen_server:reply(From, NewWorker),
State#state{waiting = LeftWaiting};
State1#state{waiting = LeftWaiting};
{empty, Empty} when Overflow > 0 ->
State#state{overflow = Overflow - 1, waiting = Empty};
State1#state{overflow = Overflow - 1, waiting = Empty};
{empty, Empty} ->
W = filter_worker_by_pid(Pid, State#state.workers),
W = filter_worker_by_pid(Pid, State1#state.workers),
Workers = queue:in(new_worker(Sup), W),
State#state{workers = Workers, waiting = Empty}
State1#state{workers = Workers, waiting = Empty}
end.

state_name(State = #state{overflow = Overflow}) when Overflow < 1 ->
Expand Down
35 changes: 34 additions & 1 deletion test/poolboy_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ pool_test_() ->
{<<"Recover from timeout without exit handling">>,
fun transaction_timeout_without_exit/0},
{<<"Recover from transaction timeout">>,
fun transaction_timeout/0}
fun transaction_timeout/0},
{<<"Pool behaves when stats disabled">>,
fun pool_stats_disabled/0},
{<<"Pool behaves when stats enabled">>,
fun pool_stats_enabled/0},
{<<"Pool behaves when stats enabled with overflow">>,
fun pool_stats_enabled_overflow/0}
]
}.

Expand Down Expand Up @@ -124,6 +130,27 @@ transaction_timeout() ->
?assertEqual({ready,1,0,0}, pool_call(Pid, status)).


pool_stats_disabled() ->
{ok, Pid} = new_pool(1, 0),
?assertMatch({error, disabled}, poolboy:stats(Pid)).

pool_stats_enabled() ->
{ok, Pid} = new_pool(1, 0, lifo, true),
?assertMatch({ok, 0.0}, poolboy:stats(Pid)),
Worker = poolboy:checkout(Pid),
timer:sleep(1000), %% emulating load?..
checkin_worker(Pid, Worker),
?assertMatch({ok, Load} when Load > 0.5, poolboy:stats(Pid)),
?assertMatch({ok, 0.0}, poolboy:stats(Pid)).

pool_stats_enabled_overflow() ->
{ok, Pid} = new_pool(2, 2, lifo, true),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(1,4)],
timer:sleep(1000), %% emulating load?..
[poolboy:checkin(Pid, Worker) || Worker <- Workers],
timer:sleep(500), %% see checkin_worker/2 comment
?assertMatch({ok, Load} when Load > 1.0, poolboy:stats(Pid)).

pool_startup() ->
%% Check basic pool operation.
{ok, Pid} = new_pool(10, 5),
Expand Down Expand Up @@ -546,5 +573,11 @@ new_pool(Size, MaxOverflow, Strategy) ->
{size, Size}, {max_overflow, MaxOverflow},
{strategy, Strategy}]).

new_pool(Size, MaxOverflow, Strategy, StatsEnabled) ->
poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, Size}, {max_overflow, MaxOverflow},
{strategy, Strategy}, {stats, StatsEnabled}]).

pool_call(ServerRef, Request) ->
gen_server:call(ServerRef, Request).