From b0f5821c17575f9f761b38061e4fe9be9c131273 Mon Sep 17 00:00:00 2001 From: Denis Fakhrtdinov Date: Fri, 27 Nov 2020 18:57:19 +0300 Subject: [PATCH 1/2] add optional pool utilization stats --- src/poolboy.erl | 103 ++++++++++++++++++++++++++++------------- test/poolboy_tests.erl | 27 ++++++++++- 2 files changed, 97 insertions(+), 33 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index db20541..4e43070 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -5,7 +5,7 @@ -export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2, transaction/3, child_spec/2, child_spec/3, child_spec/4, start/1, - start/2, start_link/1, start_link/2, stop/1, status/1]). + start/2, start_link/1, start_link/2, stop/1, status/1, stats/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export_type([pool/0]). @@ -44,7 +44,10 @@ size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), - strategy = lifo :: lifo | fifo + strategy = lifo :: lifo | fifo, + stats = false, + quant_start = erlang:monotonic_time(), + time_used = 0 }). -spec checkout(Pool :: pool()) -> pid(). @@ -145,6 +148,15 @@ stop(Pool) -> status(Pool) -> gen_server:call(Pool, status). +-spec stats(Pool :: pool()) -> {ok, float()} | {error, disabled}. +stats(Pool) -> + case gen_server:call(Pool, stats) of + {error, disabled} -> {error, disabled}; + {ok, {TimeTotal, TimeUsed, Workers}} -> + {ok, (TimeUsed / TimeTotal / Workers)} + end. + + init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), Waiting = queue:new(), @@ -162,6 +174,8 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = lifo}); init([{strategy, fifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = fifo}); +init([{stats, StatsEnabled} | Rest], WorkerArgs, State) when is_boolean(StatsEnabled) -> + init(Rest, WorkerArgs, State#state{stats = StatsEnabled}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> @@ -170,21 +184,21 @@ init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> case ets:lookup(Monitors, Pid) of - [{Pid, _, MRef}] -> + [{Pid, _, MRef, MaybeStartedAt}] -> true = erlang:demonitor(MRef), true = ets:delete(Monitors, Pid), - NewState = handle_checkin(Pid, State), + NewState = handle_checkin(Pid, MaybeStartedAt, State), {noreply, NewState}; [] -> {noreply, State} end; handle_cast({cancel_waiting, CRef}, State) -> - case ets:match(State#state.monitors, {'$1', CRef, '$2'}) of - [[Pid, MRef]] -> + case ets:match(State#state.monitors, {'$1', CRef, '$2', '$3'}) of + [[Pid, MRef, MaybeStartedAt]] -> demonitor(MRef, [flush]), true = ets:delete(State#state.monitors, Pid), - NewState = handle_checkin(Pid, State), + NewState = handle_checkin(Pid, MaybeStartedAt, State), {noreply, NewState}; [] -> Cancel = fun({_, Ref, MRef}) when Ref =:= CRef -> @@ -206,15 +220,20 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> monitors = Monitors, overflow = Overflow, max_overflow = MaxOverflow, - strategy = Strategy} = State, + strategy = Strategy, + stats = StatsEnabled} = State, case get_worker_with_strategy(Workers, Strategy) of {{value, Pid}, Left} -> MRef = erlang:monitor(process, FromPid), - true = ets:insert(Monitors, {Pid, CRef, MRef}), + MaybeStartedAt = if StatsEnabled -> erlang:monotonic_time(); + true -> undefined end, + true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}), {reply, Pid, State#state{workers = Left}}; {empty, _Left} when MaxOverflow > 0, Overflow < MaxOverflow -> {Pid, MRef} = new_worker(Sup, FromPid), - true = ets:insert(Monitors, {Pid, CRef, MRef}), + MaybeStartedAt = if StatsEnabled -> erlang:monotonic_time(); + true -> undefined end, + true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}), {reply, Pid, State#state{overflow = Overflow + 1}}; {empty, _Left} when Block =:= false -> {reply, full, State}; @@ -230,6 +249,13 @@ handle_call(status, _From, State) -> overflow = Overflow} = State, StateName = state_name(State), {reply, {StateName, queue:len(Workers), Overflow, ets:info(Monitors, size)}, State}; +handle_call(stats, _From, #state{stats = false} = State) -> + {reply, {error, disabled}, State}; +handle_call(stats, _From, State) -> + {reply, {ok, { + erlang:monotonic_time() - State#state.quant_start, + State#state.time_used, State#state.size + }}, State#state{quant_start = erlang:monotonic_time(), time_used = 0}}; handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, {reply, Workers, State}; @@ -239,7 +265,7 @@ handle_call(get_all_workers, _From, State) -> {reply, WorkerList, State}; handle_call(get_all_monitors, _From, State) -> Monitors = ets:select(State#state.monitors, - [{{'$1', '_', '$2'}, [], [{{'$1', '$2'}}]}]), + [{{'$1', '_', '$2', '_'}, [], [{{'$1', '$2'}}]}]), {reply, Monitors, State}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; @@ -248,10 +274,10 @@ handle_call(_Msg, _From, State) -> {reply, Reply, State}. handle_info({'DOWN', MRef, _, _, _}, State) -> - case ets:match(State#state.monitors, {'$1', '_', MRef}) of - [[Pid]] -> + case ets:match(State#state.monitors, {'$1', '_', MRef, '$2'}) of + [[Pid, MaybeStartedAt]] -> true = ets:delete(State#state.monitors, Pid), - NewState = handle_checkin(Pid, State), + NewState = handle_checkin(Pid, MaybeStartedAt, State), {noreply, NewState}; [] -> Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting), @@ -261,10 +287,10 @@ handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, monitors = Monitors} = State, case ets:lookup(Monitors, Pid) of - [{Pid, _, MRef}] -> + [{Pid, _, MRef, MaybeStartedAt}] -> true = erlang:demonitor(MRef), true = ets:delete(Monitors, Pid), - NewState = handle_worker_exit(Pid, State), + NewState = handle_worker_exit(Pid, MaybeStartedAt, State), {noreply, NewState}; [] -> case queue:member(Pid, State#state.workers) of @@ -328,40 +354,53 @@ prepopulate(0, _Sup, Workers) -> prepopulate(N, Sup, Workers) -> prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)). -handle_checkin(Pid, State) -> +handle_checkin(Pid, StartedAt, State0) -> #state{supervisor = Sup, waiting = Waiting, monitors = Monitors, - overflow = Overflow} = State, + overflow = Overflow, + stats = StatsEnabled} = State0, + State1 = handle_checkin_stats(StatsEnabled, StartedAt, State0), case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> - true = ets:insert(Monitors, {Pid, CRef, MRef}), + MaybeStartedAt = if StatsEnabled -> erlang:monotonic_time(); + true -> undefined end, + true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}), gen_server:reply(From, Pid), - State#state{waiting = Left}; + State1#state{waiting = Left}; {empty, Empty} when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), - State#state{waiting = Empty, overflow = Overflow - 1}; + State1#state{waiting = Empty, overflow = Overflow - 1}; {empty, Empty} -> - Workers = queue:in(Pid, State#state.workers), - State#state{workers = Workers, waiting = Empty, overflow = 0} + Workers = queue:in(Pid, State1#state.workers), + State1#state{workers = Workers, waiting = Empty, overflow = 0} end. -handle_worker_exit(Pid, State) -> +handle_checkin_stats(false, _, State) -> State; +handle_checkin_stats(_, undefined, State) -> State; +handle_checkin_stats(_, StartedAt, #state{time_used = TimeUsed} = State) -> + State#state{time_used = TimeUsed + (erlang:monotonic_time() - StartedAt)}. + +handle_worker_exit(Pid, StartedAt, State0) -> #state{supervisor = Sup, monitors = Monitors, - overflow = Overflow} = State, - case queue:out(State#state.waiting) of + overflow = Overflow, + stats = StatsEnabled} = State0, + State1 = handle_checkin_stats(StatsEnabled, StartedAt, State0), + case queue:out(State1#state.waiting) of {{value, {From, CRef, MRef}}, LeftWaiting} -> - NewWorker = new_worker(State#state.supervisor), - true = ets:insert(Monitors, {NewWorker, CRef, MRef}), + NewWorker = new_worker(State1#state.supervisor), + MaybeStartedAt = if StatsEnabled -> erlang:monotonic_time(); + true -> undefined end, + true = ets:insert(Monitors, {NewWorker, CRef, MRef, MaybeStartedAt}), gen_server:reply(From, NewWorker), - State#state{waiting = LeftWaiting}; + State1#state{waiting = LeftWaiting}; {empty, Empty} when Overflow > 0 -> - State#state{overflow = Overflow - 1, waiting = Empty}; + State1#state{overflow = Overflow - 1, waiting = Empty}; {empty, Empty} -> - W = filter_worker_by_pid(Pid, State#state.workers), + W = filter_worker_by_pid(Pid, State1#state.workers), Workers = queue:in(new_worker(Sup), W), - State#state{workers = Workers, waiting = Empty} + State1#state{workers = Workers, waiting = Empty} end. state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index 5b27024..98e6846 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -72,7 +72,11 @@ pool_test_() -> {<<"Recover from timeout without exit handling">>, fun transaction_timeout_without_exit/0}, {<<"Recover from transaction timeout">>, - fun transaction_timeout/0} + fun transaction_timeout/0}, + {<<"Pool behaves when stats disabled">>, + fun pool_stats_disabled/0}, + {<<"Pool behaves when stats enabled">>, + fun pool_stats_enabled/0} ] }. @@ -124,6 +128,21 @@ transaction_timeout() -> ?assertEqual({ready,1,0,0}, pool_call(Pid, status)). +pool_stats_disabled() -> + {ok, Pid} = new_pool(1, 0), + ?assertMatch({error, disabled}, poolboy:stats(Pid)). + +pool_stats_enabled() -> + {ok, Pid} = new_pool(1, 0, lifo, true), + ?assertMatch({ok, 0.0}, poolboy:stats(Pid)), + Worker = poolboy:checkout(Pid), + timer:sleep(1000), %% emulating load?.. + checkin_worker(Pid, Worker), + StatsRet = poolboy:stats(Pid), + ?assertMatch({ok, Load} when Load > 0.5, StatsRet), + ?assertNotMatch({ok, 0.0}, StatsRet), + ?assertMatch({ok, 0.0}, poolboy:stats(Pid)). + pool_startup() -> %% Check basic pool operation. {ok, Pid} = new_pool(10, 5), @@ -546,5 +565,11 @@ new_pool(Size, MaxOverflow, Strategy) -> {size, Size}, {max_overflow, MaxOverflow}, {strategy, Strategy}]). +new_pool(Size, MaxOverflow, Strategy, StatsEnabled) -> + poolboy:start_link([{name, {local, poolboy_test}}, + {worker_module, poolboy_test_worker}, + {size, Size}, {max_overflow, MaxOverflow}, + {strategy, Strategy}, {stats, StatsEnabled}]). + pool_call(ServerRef, Request) -> gen_server:call(ServerRef, Request). From 9da6432c1e5f8c041e446dfb5113e72de3a26115 Mon Sep 17 00:00:00 2001 From: Denis Fakhrtdinov Date: Sat, 28 Nov 2020 01:44:03 +0300 Subject: [PATCH 2/2] compatibility with erlang/otp 17; fix tests --- rebar.config | 3 ++- src/poolboy.erl | 29 ++++++++++++++++++++--------- test/poolboy_tests.erl | 16 ++++++++++++---- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/rebar.config b/rebar.config index 1d494ca..203606d 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,7 @@ {erl_opts, [ debug_info, - {platform_define, "^R", pre17} + {platform_define, "^R", pre17}, + {platform_define, "^(R|17)", pre18} ]}. {eunit_opts, [verbose]}. diff --git a/src/poolboy.erl b/src/poolboy.erl index 4e43070..b4137e2 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -18,6 +18,15 @@ -type pid_queue() :: queue:queue(). -endif. +-ifdef(pre18). +-define(timestamp(), timestamp()). +timestamp() -> + {M, S, Mi} = os:timestamp(), + (M*1000000 + S)*1000000 + Mi. +-else. +-define(timestamp(), erlang:monotonic_time()). +-endif. + -ifdef(OTP_RELEASE). %% this implies 21 or higher -define(EXCEPTION(Class, Reason, Stacktrace), Class:Reason:Stacktrace). -define(GET_STACK(Stacktrace), Stacktrace). @@ -46,7 +55,9 @@ max_overflow = 10 :: non_neg_integer(), strategy = lifo :: lifo | fifo, stats = false, - quant_start = erlang:monotonic_time(), + %% quant_start is set in init/3 function + %% because of timestamping issues on erlang/otp 17 + quant_start = undefined, time_used = 0 }). @@ -180,7 +191,7 @@ init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> Workers = prepopulate(Size, Sup), - {ok, State#state{workers = Workers}}. + {ok, State#state{workers = Workers, quant_start = ?timestamp()}}. handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> case ets:lookup(Monitors, Pid) of @@ -225,13 +236,13 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> case get_worker_with_strategy(Workers, Strategy) of {{value, Pid}, Left} -> MRef = erlang:monitor(process, FromPid), - MaybeStartedAt = if StatsEnabled -> erlang:monotonic_time(); + MaybeStartedAt = if StatsEnabled -> ?timestamp(); true -> undefined end, true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}), {reply, Pid, State#state{workers = Left}}; {empty, _Left} when MaxOverflow > 0, Overflow < MaxOverflow -> {Pid, MRef} = new_worker(Sup, FromPid), - MaybeStartedAt = if StatsEnabled -> erlang:monotonic_time(); + MaybeStartedAt = if StatsEnabled -> ?timestamp(); true -> undefined end, true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}), {reply, Pid, State#state{overflow = Overflow + 1}}; @@ -253,9 +264,9 @@ handle_call(stats, _From, #state{stats = false} = State) -> {reply, {error, disabled}, State}; handle_call(stats, _From, State) -> {reply, {ok, { - erlang:monotonic_time() - State#state.quant_start, + ?timestamp() - State#state.quant_start, State#state.time_used, State#state.size - }}, State#state{quant_start = erlang:monotonic_time(), time_used = 0}}; + }}, State#state{quant_start = ?timestamp(), time_used = 0}}; handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, {reply, Workers, State}; @@ -363,7 +374,7 @@ handle_checkin(Pid, StartedAt, State0) -> State1 = handle_checkin_stats(StatsEnabled, StartedAt, State0), case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> - MaybeStartedAt = if StatsEnabled -> erlang:monotonic_time(); + MaybeStartedAt = if StatsEnabled -> ?timestamp(); true -> undefined end, true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}), gen_server:reply(From, Pid), @@ -379,7 +390,7 @@ handle_checkin(Pid, StartedAt, State0) -> handle_checkin_stats(false, _, State) -> State; handle_checkin_stats(_, undefined, State) -> State; handle_checkin_stats(_, StartedAt, #state{time_used = TimeUsed} = State) -> - State#state{time_used = TimeUsed + (erlang:monotonic_time() - StartedAt)}. + State#state{time_used = TimeUsed + (?timestamp() - StartedAt)}. handle_worker_exit(Pid, StartedAt, State0) -> #state{supervisor = Sup, @@ -390,7 +401,7 @@ handle_worker_exit(Pid, StartedAt, State0) -> case queue:out(State1#state.waiting) of {{value, {From, CRef, MRef}}, LeftWaiting} -> NewWorker = new_worker(State1#state.supervisor), - MaybeStartedAt = if StatsEnabled -> erlang:monotonic_time(); + MaybeStartedAt = if StatsEnabled -> ?timestamp(); true -> undefined end, true = ets:insert(Monitors, {NewWorker, CRef, MRef, MaybeStartedAt}), gen_server:reply(From, NewWorker), diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index 98e6846..af0471a 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -76,7 +76,9 @@ pool_test_() -> {<<"Pool behaves when stats disabled">>, fun pool_stats_disabled/0}, {<<"Pool behaves when stats enabled">>, - fun pool_stats_enabled/0} + fun pool_stats_enabled/0}, + {<<"Pool behaves when stats enabled with overflow">>, + fun pool_stats_enabled_overflow/0} ] }. @@ -138,11 +140,17 @@ pool_stats_enabled() -> Worker = poolboy:checkout(Pid), timer:sleep(1000), %% emulating load?.. checkin_worker(Pid, Worker), - StatsRet = poolboy:stats(Pid), - ?assertMatch({ok, Load} when Load > 0.5, StatsRet), - ?assertNotMatch({ok, 0.0}, StatsRet), + ?assertMatch({ok, Load} when Load > 0.5, poolboy:stats(Pid)), ?assertMatch({ok, 0.0}, poolboy:stats(Pid)). +pool_stats_enabled_overflow() -> + {ok, Pid} = new_pool(2, 2, lifo, true), + Workers = [poolboy:checkout(Pid) || _ <- lists:seq(1,4)], + timer:sleep(1000), %% emulating load?.. + [poolboy:checkin(Pid, Worker) || Worker <- Workers], + timer:sleep(500), %% see checkin_worker/2 comment + ?assertMatch({ok, Load} when Load > 1.0, poolboy:stats(Pid)). + pool_startup() -> %% Check basic pool operation. {ok, Pid} = new_pool(10, 5),