diff --git a/rebar.config b/rebar.config index 1d494ca..203606d 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,7 @@ {erl_opts, [ debug_info, - {platform_define, "^R", pre17} + {platform_define, "^R", pre17}, + {platform_define, "^(R|17)", pre18} ]}. {eunit_opts, [verbose]}. diff --git a/src/poolboy.erl b/src/poolboy.erl index db20541..b4137e2 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -5,7 +5,7 @@ -export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2, transaction/3, child_spec/2, child_spec/3, child_spec/4, start/1, - start/2, start_link/1, start_link/2, stop/1, status/1]). + start/2, start_link/1, start_link/2, stop/1, status/1, stats/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export_type([pool/0]). @@ -18,6 +18,15 @@ -type pid_queue() :: queue:queue(). -endif. +-ifdef(pre18). +-define(timestamp(), timestamp()). +timestamp() -> + {M, S, Mi} = os:timestamp(), + (M*1000000 + S)*1000000 + Mi. +-else. +-define(timestamp(), erlang:monotonic_time()). +-endif. + -ifdef(OTP_RELEASE). %% this implies 21 or higher -define(EXCEPTION(Class, Reason, Stacktrace), Class:Reason:Stacktrace). -define(GET_STACK(Stacktrace), Stacktrace). @@ -44,7 +53,12 @@ size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), - strategy = lifo :: lifo | fifo + strategy = lifo :: lifo | fifo, + stats = false, + %% quant_start is set in init/3 function + %% because of timestamping issues on erlang/otp 17 + quant_start = undefined, + time_used = 0 }). -spec checkout(Pool :: pool()) -> pid(). @@ -145,6 +159,15 @@ stop(Pool) -> status(Pool) -> gen_server:call(Pool, status). +-spec stats(Pool :: pool()) -> {ok, float()} | {error, disabled}. +stats(Pool) -> + case gen_server:call(Pool, stats) of + {error, disabled} -> {error, disabled}; + {ok, {TimeTotal, TimeUsed, Workers}} -> + {ok, (TimeUsed / TimeTotal / Workers)} + end. + + init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), Waiting = queue:new(), @@ -162,29 +185,31 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = lifo}); init([{strategy, fifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = fifo}); +init([{stats, StatsEnabled} | Rest], WorkerArgs, State) when is_boolean(StatsEnabled) -> + init(Rest, WorkerArgs, State#state{stats = StatsEnabled}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> Workers = prepopulate(Size, Sup), - {ok, State#state{workers = Workers}}. + {ok, State#state{workers = Workers, quant_start = ?timestamp()}}. handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> case ets:lookup(Monitors, Pid) of - [{Pid, _, MRef}] -> + [{Pid, _, MRef, MaybeStartedAt}] -> true = erlang:demonitor(MRef), true = ets:delete(Monitors, Pid), - NewState = handle_checkin(Pid, State), + NewState = handle_checkin(Pid, MaybeStartedAt, State), {noreply, NewState}; [] -> {noreply, State} end; handle_cast({cancel_waiting, CRef}, State) -> - case ets:match(State#state.monitors, {'$1', CRef, '$2'}) of - [[Pid, MRef]] -> + case ets:match(State#state.monitors, {'$1', CRef, '$2', '$3'}) of + [[Pid, MRef, MaybeStartedAt]] -> demonitor(MRef, [flush]), true = ets:delete(State#state.monitors, Pid), - NewState = handle_checkin(Pid, State), + NewState = handle_checkin(Pid, MaybeStartedAt, State), {noreply, NewState}; [] -> Cancel = fun({_, Ref, MRef}) when Ref =:= CRef -> @@ -206,15 +231,20 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> monitors = Monitors, overflow = Overflow, max_overflow = MaxOverflow, - strategy = Strategy} = State, + strategy = Strategy, + stats = StatsEnabled} = State, case get_worker_with_strategy(Workers, Strategy) of {{value, Pid}, Left} -> MRef = erlang:monitor(process, FromPid), - true = ets:insert(Monitors, {Pid, CRef, MRef}), + MaybeStartedAt = if StatsEnabled -> ?timestamp(); + true -> undefined end, + true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}), {reply, Pid, State#state{workers = Left}}; {empty, _Left} when MaxOverflow > 0, Overflow < MaxOverflow -> {Pid, MRef} = new_worker(Sup, FromPid), - true = ets:insert(Monitors, {Pid, CRef, MRef}), + MaybeStartedAt = if StatsEnabled -> ?timestamp(); + true -> undefined end, + true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}), {reply, Pid, State#state{overflow = Overflow + 1}}; {empty, _Left} when Block =:= false -> {reply, full, State}; @@ -230,6 +260,13 @@ handle_call(status, _From, State) -> overflow = Overflow} = State, StateName = state_name(State), {reply, {StateName, queue:len(Workers), Overflow, ets:info(Monitors, size)}, State}; +handle_call(stats, _From, #state{stats = false} = State) -> + {reply, {error, disabled}, State}; +handle_call(stats, _From, State) -> + {reply, {ok, { + ?timestamp() - State#state.quant_start, + State#state.time_used, State#state.size + }}, State#state{quant_start = ?timestamp(), time_used = 0}}; handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, {reply, Workers, State}; @@ -239,7 +276,7 @@ handle_call(get_all_workers, _From, State) -> {reply, WorkerList, State}; handle_call(get_all_monitors, _From, State) -> Monitors = ets:select(State#state.monitors, - [{{'$1', '_', '$2'}, [], [{{'$1', '$2'}}]}]), + [{{'$1', '_', '$2', '_'}, [], [{{'$1', '$2'}}]}]), {reply, Monitors, State}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; @@ -248,10 +285,10 @@ handle_call(_Msg, _From, State) -> {reply, Reply, State}. handle_info({'DOWN', MRef, _, _, _}, State) -> - case ets:match(State#state.monitors, {'$1', '_', MRef}) of - [[Pid]] -> + case ets:match(State#state.monitors, {'$1', '_', MRef, '$2'}) of + [[Pid, MaybeStartedAt]] -> true = ets:delete(State#state.monitors, Pid), - NewState = handle_checkin(Pid, State), + NewState = handle_checkin(Pid, MaybeStartedAt, State), {noreply, NewState}; [] -> Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting), @@ -261,10 +298,10 @@ handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, monitors = Monitors} = State, case ets:lookup(Monitors, Pid) of - [{Pid, _, MRef}] -> + [{Pid, _, MRef, MaybeStartedAt}] -> true = erlang:demonitor(MRef), true = ets:delete(Monitors, Pid), - NewState = handle_worker_exit(Pid, State), + NewState = handle_worker_exit(Pid, MaybeStartedAt, State), {noreply, NewState}; [] -> case queue:member(Pid, State#state.workers) of @@ -328,40 +365,53 @@ prepopulate(0, _Sup, Workers) -> prepopulate(N, Sup, Workers) -> prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)). -handle_checkin(Pid, State) -> +handle_checkin(Pid, StartedAt, State0) -> #state{supervisor = Sup, waiting = Waiting, monitors = Monitors, - overflow = Overflow} = State, + overflow = Overflow, + stats = StatsEnabled} = State0, + State1 = handle_checkin_stats(StatsEnabled, StartedAt, State0), case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> - true = ets:insert(Monitors, {Pid, CRef, MRef}), + MaybeStartedAt = if StatsEnabled -> ?timestamp(); + true -> undefined end, + true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}), gen_server:reply(From, Pid), - State#state{waiting = Left}; + State1#state{waiting = Left}; {empty, Empty} when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), - State#state{waiting = Empty, overflow = Overflow - 1}; + State1#state{waiting = Empty, overflow = Overflow - 1}; {empty, Empty} -> - Workers = queue:in(Pid, State#state.workers), - State#state{workers = Workers, waiting = Empty, overflow = 0} + Workers = queue:in(Pid, State1#state.workers), + State1#state{workers = Workers, waiting = Empty, overflow = 0} end. -handle_worker_exit(Pid, State) -> +handle_checkin_stats(false, _, State) -> State; +handle_checkin_stats(_, undefined, State) -> State; +handle_checkin_stats(_, StartedAt, #state{time_used = TimeUsed} = State) -> + State#state{time_used = TimeUsed + (?timestamp() - StartedAt)}. + +handle_worker_exit(Pid, StartedAt, State0) -> #state{supervisor = Sup, monitors = Monitors, - overflow = Overflow} = State, - case queue:out(State#state.waiting) of + overflow = Overflow, + stats = StatsEnabled} = State0, + State1 = handle_checkin_stats(StatsEnabled, StartedAt, State0), + case queue:out(State1#state.waiting) of {{value, {From, CRef, MRef}}, LeftWaiting} -> - NewWorker = new_worker(State#state.supervisor), - true = ets:insert(Monitors, {NewWorker, CRef, MRef}), + NewWorker = new_worker(State1#state.supervisor), + MaybeStartedAt = if StatsEnabled -> ?timestamp(); + true -> undefined end, + true = ets:insert(Monitors, {NewWorker, CRef, MRef, MaybeStartedAt}), gen_server:reply(From, NewWorker), - State#state{waiting = LeftWaiting}; + State1#state{waiting = LeftWaiting}; {empty, Empty} when Overflow > 0 -> - State#state{overflow = Overflow - 1, waiting = Empty}; + State1#state{overflow = Overflow - 1, waiting = Empty}; {empty, Empty} -> - W = filter_worker_by_pid(Pid, State#state.workers), + W = filter_worker_by_pid(Pid, State1#state.workers), Workers = queue:in(new_worker(Sup), W), - State#state{workers = Workers, waiting = Empty} + State1#state{workers = Workers, waiting = Empty} end. state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index 5b27024..af0471a 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -72,7 +72,13 @@ pool_test_() -> {<<"Recover from timeout without exit handling">>, fun transaction_timeout_without_exit/0}, {<<"Recover from transaction timeout">>, - fun transaction_timeout/0} + fun transaction_timeout/0}, + {<<"Pool behaves when stats disabled">>, + fun pool_stats_disabled/0}, + {<<"Pool behaves when stats enabled">>, + fun pool_stats_enabled/0}, + {<<"Pool behaves when stats enabled with overflow">>, + fun pool_stats_enabled_overflow/0} ] }. @@ -124,6 +130,27 @@ transaction_timeout() -> ?assertEqual({ready,1,0,0}, pool_call(Pid, status)). +pool_stats_disabled() -> + {ok, Pid} = new_pool(1, 0), + ?assertMatch({error, disabled}, poolboy:stats(Pid)). + +pool_stats_enabled() -> + {ok, Pid} = new_pool(1, 0, lifo, true), + ?assertMatch({ok, 0.0}, poolboy:stats(Pid)), + Worker = poolboy:checkout(Pid), + timer:sleep(1000), %% emulating load?.. + checkin_worker(Pid, Worker), + ?assertMatch({ok, Load} when Load > 0.5, poolboy:stats(Pid)), + ?assertMatch({ok, 0.0}, poolboy:stats(Pid)). + +pool_stats_enabled_overflow() -> + {ok, Pid} = new_pool(2, 2, lifo, true), + Workers = [poolboy:checkout(Pid) || _ <- lists:seq(1,4)], + timer:sleep(1000), %% emulating load?.. + [poolboy:checkin(Pid, Worker) || Worker <- Workers], + timer:sleep(500), %% see checkin_worker/2 comment + ?assertMatch({ok, Load} when Load > 1.0, poolboy:stats(Pid)). + pool_startup() -> %% Check basic pool operation. {ok, Pid} = new_pool(10, 5), @@ -546,5 +573,11 @@ new_pool(Size, MaxOverflow, Strategy) -> {size, Size}, {max_overflow, MaxOverflow}, {strategy, Strategy}]). +new_pool(Size, MaxOverflow, Strategy, StatsEnabled) -> + poolboy:start_link([{name, {local, poolboy_test}}, + {worker_module, poolboy_test_worker}, + {size, Size}, {max_overflow, MaxOverflow}, + {strategy, Strategy}, {stats, StatsEnabled}]). + pool_call(ServerRef, Request) -> gen_server:call(ServerRef, Request).