diff --git a/README.md b/README.md index 495d812..ae143b6 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,8 @@ Poolboy is a **lightweight**, **generic** pooling library for Erlang with a focus on **simplicity**, **performance**, and **rock-solid** disaster recovery. ## Usage - +The most basic use case is to check out a worker, make a call and manually +return it to the pool when done ```erl-sh 1> Worker = poolboy:checkout(PoolName). <0.9001.0> @@ -17,7 +18,15 @@ ok 3> poolboy:checkin(PoolName, Worker). ok ``` - +Alternatively you can use a transaction which will return the worker to the +pool when the call is finished. +```erl-sh +poolboy:transaction( + PoolName, + fun(Worker) -> gen_server:call(Worker, Request) end, + TransactionTimeout +) +``` ## Example This is an example application showcasing database connection pools using @@ -149,14 +158,54 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. ``` -## Options +## Pool Options -- `name`: the pool name -- `worker_module`: the module that represents the workers -- `size`: maximum pool size -- `max_overflow`: maximum number of workers created if pool is empty +- `name`: the pool name - optional +- `worker_module`: the module that represents the workers - mandatory +- `size`: maximum pool size - optional +- `max_overflow`: maximum number of workers created if pool is empty - optional - `strategy`: `lifo` or `fifo`, determines whether checked in workers should be - placed first or last in the line of available workers. So, `lifo` operates like a traditional stack; `fifo` like a queue. Default is `lifo`. + placed first or last in the line of available workers. Default is `lifo`. +- `overflow_ttl`: time in milliseconds you want to wait before removing overflow + workers. Useful when it's expensive to start workers. Default is 0. +- `checkin_callback`: Optionally supply a function that accepts the following + options {Pid, normal} | {Pid, owner_death}. This can be used to perform health + checks on a worker or to reset connections if needed. The expected return values + are keep | kill atoms where poolboy will keep or terminate the worker before + checkin. If not in overflow state kill will start a new worker. + +## Pool Status +Returns : {Status, Workers, Overflow, InUse} +- `Status`: ready | full | overflow + The ready atom indicates there are workers that are not checked out + ready. The full atom indicates all workers including overflow are + checked out. The overflow atom is used to describe the condition + when all permanent workers are in use but there is overflow capacity + available. +- `Workers`: Number of workers ready for use. +- `Overflow`: Number of overflow workers started, should never exceed number + specified by MaxOverflow when starting pool +- `InUse`: Number of workers currently busy/checked out + +## Full Pool Status +Returns a propslist of counters relating to a specified pool. Useful +for graphing the state of your pools +- `size`: The defined size of the permanent worker pool +- `max_overflow`: The maximum number of overflow workers allowed +- `total_worker_count`: The total supervised workers. This includes + any workers waiting to be culled and not available to the + general pool +- `ready_worker_count`: The count of workers available workers to be + used including overflow workers. Workers in this count may or may + not be checked out. +- `checked_out_worker_count`: The count of workers that are currently + checked out +- `overflow_worker_count`: The count of active overflow workers +- `waiting_request_count`: The backlog of requests waiting to checkout + a worker +- `total_workers_started`: The total number of workers started since the pool + started, good for measuring worker churn + ## Authors diff --git a/rebar.config b/rebar.config index deada4c..c4ca425 100644 --- a/rebar.config +++ b/rebar.config @@ -9,7 +9,7 @@ {profiles, [ {test, [ {plugins, [ - {rebar3_eqc, ".*", {git, "https://github.com/kellymclaughlin/rebar3-eqc-plugin.git", {tag, "0.0.8"}}} + {rebar3_eqc, ".*", {git, "https://github.com/kellymclaughlin/rebar3-eqc-plugin.git", {tag, "0.0.10"}}} ]} ] }]}. diff --git a/src/poolboy.erl b/src/poolboy.erl index db4973b..a953d4f 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -1,11 +1,10 @@ %% Poolboy - A hunky Erlang worker pool factory - -module(poolboy). -behaviour(gen_server). -export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2, transaction/3, child_spec/2, child_spec/3, start/1, start/2, - start_link/1, start_link/2, stop/1, status/1]). + start_link/1, start_link/2, stop/1, status/1, full_status/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export_type([pool/0]). @@ -25,20 +24,59 @@ {global, GlobalName :: any()} | {via, Module :: atom(), ViaName :: any()}. +-ifdef(OTP_RELEASE). %% this implies 21 or higher +-define(EXCEPTION(Class, Reason, Stacktrace), Class:Reason:Stacktrace). +-define(GET_STACK(Stacktrace), Stacktrace). +-else. +-define(EXCEPTION(Class, Reason, _), Class:Reason). +-define(GET_STACK(_), erlang:get_stacktrace()). +-endif. + % Copied from gen:start_ret/0 -type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}. -record(state, { - supervisor :: pid(), - workers :: [pid()], + worker_supervisor :: pid(), + workers :: queue:queue(), + total_workers_started = 0 :: non_neg_integer(), waiting :: pid_queue(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), - strategy = lifo :: lifo | fifo + strategy = lifo :: lifo | fifo, + overflow_ttl = 0 :: non_neg_integer(), + reap_timer = none, + checkin_callback = fun({_Pid, _Reason}) -> keep end :: function() }). +init({PoolArgs, WorkerArgs}) -> + process_flag(trap_exit, true), + Waiting = queue:new(), + Monitors = ets:new(monitors, [private]), + init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). + +init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> + {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), + init(Rest, WorkerArgs, State#state{worker_supervisor = Sup}); +init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> + init(Rest, WorkerArgs, State#state{size = Size}); +init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) -> + init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow}); +init([{strategy, lifo} | Rest], WorkerArgs, State) -> + init(Rest, WorkerArgs, State#state{strategy = lifo}); +init([{strategy, fifo} | Rest], WorkerArgs, State) -> + init(Rest, WorkerArgs, State#state{strategy = fifo}); +init([{overflow_ttl, OverflowTtl} | Rest], WorkerArgs, State) when is_integer(OverflowTtl) -> + init(Rest, WorkerArgs, State#state{overflow_ttl = OverflowTtl}); +init([{checkin_callback, CheckinCallback} | Rest], WorkerArgs, State) when is_function(CheckinCallback, 1) -> + init(Rest, WorkerArgs, State#state{checkin_callback = CheckinCallback}); +init([_ | Rest], WorkerArgs, State) -> + init(Rest, WorkerArgs, State); +init([], _WorkerArgs, #state{size = Size, worker_supervisor = Sup, total_workers_started = Counter} = State) -> + Workers = prepopulate(Size, Sup, Counter), + {ok, State#state{workers = Workers, total_workers_started = Size}}. + -spec checkout(Pool :: pool()) -> pid(). checkout(Pool) -> checkout(Pool, true). @@ -54,9 +92,9 @@ checkout(Pool, Block, Timeout) -> try gen_server:call(Pool, {checkout, CRef, Block}, Timeout) catch - Class:Reason -> + ?EXCEPTION(Class, Reason, Stacktrace) -> gen_server:cast(Pool, {cancel_waiting, CRef}), - erlang:raise(Class, Reason, erlang:get_stacktrace()) + erlang:raise(Class, Reason, ?GET_STACK(Stacktrace)) end. -spec checkin(Pool :: pool(), Worker :: pid()) -> ok. @@ -122,46 +160,29 @@ stop(Pool) -> status(Pool) -> gen_server:call(Pool, status). -init({PoolArgs, WorkerArgs}) -> - process_flag(trap_exit, true), - Waiting = queue:new(), - Monitors = ets:new(monitors, [private]), - init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). - -init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> - {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), - init(Rest, WorkerArgs, State#state{supervisor = Sup}); -init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> - init(Rest, WorkerArgs, State#state{size = Size}); -init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) -> - init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow}); -init([{strategy, lifo} | Rest], WorkerArgs, State) -> - init(Rest, WorkerArgs, State#state{strategy = lifo}); -init([{strategy, fifo} | Rest], WorkerArgs, State) -> - init(Rest, WorkerArgs, State#state{strategy = fifo}); -init([_ | Rest], WorkerArgs, State) -> - init(Rest, WorkerArgs, State); -init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> - Workers = prepopulate(Size, Sup), - {ok, State#state{workers = Workers}}. +-spec full_status(Pool :: pool()) -> proplists:proplist(). +full_status(Pool) -> + gen_server:call(Pool, full_status). -handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> +handle_cast({checkin, Pid}, State = #state{monitors = Monitors, checkin_callback = Callback}) -> case ets:lookup(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), true = ets:delete(Monitors, Pid), - NewState = handle_checkin(Pid, State), + KillOrKeep = Callback({Pid, normal}), + NewState = handle_checkin(Pid, State, KillOrKeep), {noreply, NewState}; [] -> {noreply, State} end; -handle_cast({cancel_waiting, CRef}, State) -> - case ets:match(State#state.monitors, {'$1', CRef, '$2'}) of +handle_cast({cancel_waiting, CRef}, State = #state{monitors = Monitors, checkin_callback = Callback}) -> + case ets:match(Monitors, {'$1', CRef, '$2'}) of [[Pid, MRef]] -> demonitor(MRef, [flush]), true = ets:delete(State#state.monitors, Pid), - NewState = handle_checkin(Pid, State), + KillOrKeep = Callback({Pid, normal}), + NewState = handle_checkin(Pid, State, KillOrKeep), {noreply, NewState}; [] -> Cancel = fun({_, Ref, MRef}) when Ref =:= CRef -> @@ -178,39 +199,72 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> - #state{supervisor = Sup, + #state{worker_supervisor = Sup, workers = Workers, monitors = Monitors, overflow = Overflow, - max_overflow = MaxOverflow} = State, - case Workers of - [Pid | Left] -> + max_overflow = MaxOverflow, + strategy = Strategy, + total_workers_started = WorkersStarted} = State, + case get_worker_with_strategy(Workers, Strategy) of + {{value, {_Time, Pid}}, Left} when State#state.overflow_ttl > 0 -> + MRef = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {Pid, CRef, MRef}), + NewState = State#state{workers = Left}, + {ok, ReapTimer} = reset_worker_reap(NewState, Pid), + {reply, Pid, NewState#state{reap_timer = ReapTimer}}; + {{value, {_Time, Pid}}, Left} -> MRef = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), {reply, Pid, State#state{workers = Left}}; - [] when MaxOverflow > 0, Overflow < MaxOverflow -> + {empty, _Left} when MaxOverflow > 0, Overflow < MaxOverflow -> {Pid, MRef} = new_worker(Sup, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), - {reply, Pid, State#state{overflow = Overflow + 1}}; - [] when Block =:= false -> + {reply, Pid, State#state{overflow = Overflow + 1, total_workers_started = WorkersStarted + 1}}; + {empty, _Left} when Block =:= false -> {reply, full, State}; - [] -> + {empty, _Left} -> MRef = erlang:monitor(process, FromPid), Waiting = queue:in({From, CRef, MRef}, State#state.waiting), {noreply, State#state{waiting = Waiting}} end; - handle_call(status, _From, State) -> #state{workers = Workers, monitors = Monitors, overflow = Overflow} = State, + CheckedOutWorkers = ets:info(Monitors, size), StateName = state_name(State), - {reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State}; + {reply, {StateName, queue:len(Workers), Overflow, CheckedOutWorkers}, State}; +handle_call(full_status, _From, State) -> + #state{workers = Workers, + size = Size, + monitors = Monitors, + overflow = Overflow, + max_overflow = MaxOverflow, + worker_supervisor = Sup, + waiting = Waiting, + total_workers_started = WorkersStarted} = State, + CheckedOutWorkers = ets:info(Monitors, size), + {reply, + [ + {size, Size}, % The permanent worker size + {max_overflow, MaxOverflow}, % The overflow size + % The maximum amount of worker is size + overflow_size + + {total_worker_count, length(supervisor:which_children(Sup))}, % The total of all workers + {ready_worker_count, queue:len(Workers)}, % Number of workers ready to use + {overflow_worker_count, Overflow}, % Number of overflow workers + {checked_out_worker_count, CheckedOutWorkers}, % Number of workers currently checked out + {waiting_request_count, queue:len(Waiting)}, % Number of waiting requests + {total_workers_started, WorkersStarted} % Number of workers started, helps establish how bad churn is + ], + State + }; handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, {reply, Workers, State}; handle_call(get_all_workers, _From, State) -> - Sup = State#state.supervisor, + Sup = State#state.worker_supervisor, WorkerList = supervisor:which_children(Sup), {reply, WorkerList, State}; handle_call(get_all_monitors, _From, State) -> @@ -223,41 +277,39 @@ handle_call(_Msg, _From, State) -> Reply = {error, invalid_message}, {reply, Reply, State}. -handle_info({'DOWN', MRef, _, _, _}, State) -> - case ets:match(State#state.monitors, {'$1', '_', MRef}) of +handle_info({'DOWN', MRef, _, _, _}, State = #state{monitors = Monitors, checkin_callback = Callback}) -> + case ets:match(Monitors, {'$1', '_', MRef}) of [[Pid]] -> true = ets:delete(State#state.monitors, Pid), - NewState = handle_checkin(Pid, State), + KillOrKeep = Callback({Pid, owner_death}), + NewState = handle_checkin(Pid, State, KillOrKeep), {noreply, NewState}; [] -> Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting), {noreply, State#state{waiting = Waiting}} end; handle_info({'EXIT', Pid, _Reason}, State) -> - #state{supervisor = Sup, - monitors = Monitors} = State, + #state{monitors = Monitors} = State, case ets:lookup(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), true = ets:delete(Monitors, Pid), - NewState = handle_worker_exit(Pid, State), + NewState = handle_checked_out_worker_exit(Pid, State), {noreply, NewState}; [] -> - case lists:member(Pid, State#state.workers) of - true -> - W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers), - {noreply, State#state{workers = [new_worker(Sup) | W]}}; - false -> - {noreply, State} - end + NewState = handle_checked_in_worker_exit(State, Pid), + {noreply, NewState} end; - +handle_info({reap_worker, Pid}, State)-> + NewState = reap_worker(Pid, State), + {noreply, NewState}; handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, State) -> - ok = lists:foreach(fun (W) -> unlink(W) end, State#state.workers), - true = exit(State#state.supervisor, shutdown), + Workers = queue:to_list(State#state.workers), + ok = lists:foreach(fun ({_Time, Pid}) -> unlink(Pid) end, Workers), + true = exit(State#state.worker_supervisor, shutdown), ok. code_change(_OldVsn, State, _Extra) -> @@ -271,79 +323,179 @@ start_pool(StartFun, PoolArgs, WorkerArgs) -> gen_server:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, []) end. -new_worker(Sup) -> - {ok, Pid} = supervisor:start_child(Sup, []), - true = link(Pid), - Pid. - -new_worker(Sup, FromPid) -> - Pid = new_worker(Sup), - Ref = erlang:monitor(process, FromPid), - {Pid, Ref}. - -dismiss_worker(Sup, Pid) -> - true = unlink(Pid), - supervisor:terminate_child(Sup, Pid). - -prepopulate(N, _Sup) when N < 1 -> - []; -prepopulate(N, Sup) -> - prepopulate(N, Sup, []). - -prepopulate(0, _Sup, Workers) -> - Workers; -prepopulate(N, Sup, Workers) -> - prepopulate(N-1, Sup, [new_worker(Sup) | Workers]). - -handle_checkin(Pid, State) -> - #state{supervisor = Sup, - waiting = Waiting, - monitors = Monitors, - overflow = Overflow, - strategy = Strategy} = State, +handle_checkin(Pid, State, keep) -> + #state{worker_supervisor = Sup, + waiting = Waiting, + monitors = Monitors, + overflow = Overflow, + overflow_ttl = OverflowTtl, + reap_timer = ReapTimer} = State, case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> true = ets:insert(Monitors, {Pid, CRef, MRef}), gen_server:reply(From, Pid), State#state{waiting = Left}; + {empty, Empty} when Overflow > 0, OverflowTtl > 0 -> + CurrentTime = erlang:monotonic_time(milli_seconds), + {ok, Timer} = set_reap_timer(ReapTimer, CurrentTime, OverflowTtl, Pid), + Workers = queue:in({CurrentTime, Pid}, State#state.workers), + State#state{workers = Workers, waiting = Empty, reap_timer = Timer}; {empty, Empty} when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), State#state{waiting = Empty, overflow = Overflow - 1}; {empty, Empty} -> - Workers = case Strategy of - lifo -> [Pid | State#state.workers]; - fifo -> State#state.workers ++ [Pid] - end, + Workers = queue:in({erlang:monotonic_time(milli_seconds), Pid}, State#state.workers), State#state{workers = Workers, waiting = Empty, overflow = 0} - end. - -handle_worker_exit(Pid, State) -> - #state{supervisor = Sup, - monitors = Monitors, - overflow = Overflow} = State, + end; +handle_checkin(Pid, #state{waiting = {[],[]}, overflow = Overflow} = State, kill) when Overflow > 0 -> + W = remove_worker(Pid, State), + State#state{workers=W, overflow = Overflow - 1}; +handle_checkin(Pid, State, kill) -> + W = remove_worker(Pid, State), + NewWorker = new_worker(State#state.worker_supervisor), + handle_checkin(NewWorker, State#state{workers = W}, keep). + +handle_checked_out_worker_exit(Pid, State) -> + #state{worker_supervisor = Sup, + monitors = Monitors, + overflow = Overflow, + total_workers_started = WorkersStarted} = State, case queue:out(State#state.waiting) of {{value, {From, CRef, MRef}}, LeftWaiting} -> - NewWorker = new_worker(State#state.supervisor), + NewWorker = new_worker(State#state.worker_supervisor), true = ets:insert(Monitors, {NewWorker, CRef, MRef}), gen_server:reply(From, NewWorker), - State#state{waiting = LeftWaiting}; + State#state{waiting = LeftWaiting, total_workers_started = WorkersStarted + 1}; {empty, Empty} when Overflow > 0 -> State#state{overflow = Overflow - 1, waiting = Empty}; {empty, Empty} -> - Workers = - [new_worker(Sup) - | lists:filter(fun (P) -> P =/= Pid end, State#state.workers)], - State#state{workers = Workers, waiting = Empty} + W = delete_worker_by_pid(Pid, State#state.workers), + Workers = queue:in({erlang:monotonic_time(milli_seconds), new_worker(Sup)}, W), + State#state{workers = Workers, waiting = Empty, total_workers_started = WorkersStarted + 1} end. -state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> +handle_checked_in_worker_exit(State = #state{overflow = Overflow}, Pid) when Overflow > 0 -> + #state{workers = Workers} = State, + NewWorkers = delete_worker_by_pid(Pid, Workers), + {ok, ReapTimer} = reset_worker_reap(State#state{workers = NewWorkers}, Pid), + State#state{workers = NewWorkers, reap_timer = ReapTimer, overflow = Overflow - 1}; +handle_checked_in_worker_exit(State, Pid) -> + #state{workers = Workers, total_workers_started = WorkersStarted, worker_supervisor = Sup} = State, + % Have to remove the existing worker before resetting timer + FilteredWorkers = delete_worker_by_pid(Pid, Workers), + NewWorkers = queue:in({erlang:monotonic_time(milli_seconds), new_worker(Sup)}, FilteredWorkers), + {ok, ReapTimer} = reset_worker_reap(State#state{workers = NewWorkers}, Pid), + State#state{workers = NewWorkers, reap_timer = ReapTimer, total_workers_started = WorkersStarted + 1}. + +prepopulate(N, _Sup, _Counter) when N < 1 -> + queue:new(); +prepopulate(N, Sup, Counter) -> + prepopulate(N, Sup, queue:new(), Counter). + +prepopulate(0, _Sup, Workers, _Counter) -> + Workers; +prepopulate(N, Sup, Workers, Counter) -> + prepopulate(N-1, Sup, queue:in({erlang:monotonic_time(milli_seconds), new_worker(Sup)}, Workers), Counter + 1). + +new_worker(Sup) -> + {ok, Pid} = supervisor:start_child(Sup, []), + true = link(Pid), + Pid. + +new_worker(Sup, FromPid) -> + Pid = new_worker(Sup), + Ref = erlang:monitor(process, FromPid), + {Pid, Ref}. + +dismiss_worker(Sup, Pid) -> + true = unlink(Pid), + supervisor:terminate_child(Sup, Pid). + +remove_worker(Pid, State) -> + #state{workers = Workers, + worker_supervisor = Sup} = State, + W = delete_worker_by_pid(Pid, Workers), + ok = dismiss_worker(Sup, Pid), + W. + +reap_worker(Pid, #state{overflow = Overflow} = State) when Overflow == 1 -> + W = remove_worker(Pid, State), + State#state{workers = W, overflow = Overflow -1, reap_timer = none}; +reap_worker(Pid, State = #state{overflow = Overflow}) when Overflow > 1 -> + #state{reap_timer = ReapTimer, + overflow_ttl = OverflowTtl} = State, + W = remove_worker(Pid, State), + {ok, Timer} = reset_reap_timer(W, ReapTimer, OverflowTtl), + State#state{workers = W, overflow = Overflow -1, reap_timer = Timer}; +reap_worker(_Pid, State) -> + State. + + +reset_worker_reap(#state{reap_timer = none} = _State, Pid) -> + ok = clear_worker_reap_message(Pid), + {ok, none}; +reset_worker_reap(#state{overflow = Overflow, reap_timer = ReapTimer} = _State, Pid) when Overflow == 0 -> + erlang:cancel_timer(ReapTimer), + ok = clear_worker_reap_message(Pid), + {ok, ReapTimer}; +reset_worker_reap(#state{reap_timer = ReapTimer, overflow = Overflow} = State, Pid) when Overflow > 0 -> + #state{workers = Workers, + overflow_ttl = OverflowTtl} = State, + erlang:cancel_timer(ReapTimer), + {ok, NewTimer} = reset_reap_timer(Workers, ReapTimer, OverflowTtl), + ok = clear_worker_reap_message(Pid), + {ok, NewTimer}. + +set_reap_timer(ReapTimer, _CurrentTime, _OverflowTtl, _Pid) when is_reference(ReapTimer) -> + {ok, ReapTimer}; +set_reap_timer(_ReapTimer, CurrentTime, OverflowTtl, Pid) -> + NewTimer = erlang:send_after(CurrentTime + OverflowTtl, self(), {reap_worker, Pid}, [{abs, true}]), + {ok, NewTimer}. + +reset_reap_timer(Workers, ReapTimer, OverflowTtl)-> + erlang:cancel_timer(ReapTimer), + case queue:peek(Workers) of + {value, {Time, OldestWorker}} -> + ReapTime = Time + OverflowTtl, + NewTimer = erlang:send_after(ReapTime, self(), {reap_worker, OldestWorker}, [{abs, true}]), + {ok, NewTimer}; + empty -> + {ok, none} + end. + +clear_worker_reap_message(Pid) -> + receive + {reap_worker, Pid} -> + ok + after 0 -> + ok + end. + +delete_worker_by_pid(Pid, Workers) -> + queue:filter(fun ({_Time, WPid}) -> WPid =/= Pid end, Workers). + +get_worker_with_strategy(Workers, fifo) -> + queue:out(Workers); +get_worker_with_strategy(Workers, lifo) -> + queue:out_r(Workers). + +state_name(State = #state{overflow = Overflow, max_overflow = MaxOverflow}) when Overflow < 1-> #state{max_overflow = MaxOverflow, workers = Workers} = State, - case length(Workers) == 0 of + case queue:len(Workers) == 0 of true when MaxOverflow < 1 -> full; true -> overflow; false -> ready end; -state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> - full; -state_name(_State) -> - overflow. +state_name(State = #state{overflow = Overflow, max_overflow = MaxOverflow}) when Overflow == MaxOverflow-> + #state{workers = Workers} = State, + case queue:len(Workers) of + NumberOfWorkers when NumberOfWorkers > 0 -> ready; + _NumberOfWorkers -> full + end; +state_name(State = #state{overflow = Overflow, max_overflow = MaxOverflow}) when Overflow < MaxOverflow-> + #state{workers = Workers} = State, + case queue:len(Workers) of + NumberOfWorkers when NumberOfWorkers > 0 -> ready; + _NumberOfWorkers -> overflow + end. + diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index b0f3b39..a2b054e 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -18,6 +18,18 @@ pool_test_() -> {<<"Basic pool operations">>, fun pool_startup/0 }, + {<<"Basic lifo load">>, + fun load_lifo/0 + }, + {<<"Basic fifo load">>, + fun load_fifo/0 + }, + {<<"Basic overflow_ttl load lifo">>, + fun load_overflow_ttl_lifo/0 + }, + {<<"Basic overflow_ttl load fifo">>, + fun load_overflow_ttl_fifo/0 + }, {<<"Pool overflow should work">>, fun pool_overflow/0 }, @@ -42,15 +54,51 @@ pool_test_() -> {<<"Non-blocking pool behaves when full">>, fun pool_full_nonblocking/0 }, + {<<"Pool with overflow_ttl checks in worker normally">>, + fun pool_overflow_ttl_checkin_basic/0 + }, + {<<"Pool with overflow_ttl reaps worker after set time">>, + fun pool_overflow_ttl_checkin_reap_timer/0 + }, + {<<"Pool with overflow_ttl cancels worker reap on checkout">>, + fun pool_overflow_ttl_checkout_cancels_reap/0 + }, + {<<"Pool with overflow_ttl behaves like a normal pool when full">>, + fun pool_overflow_ttl_full/0 + }, + {<<"Pool with overflow_ttl behaves reaps worker at correct time after worker death when checked out">>, + fun pool_overflow_ttl_worker_death_checked_out/0 + }, + {<<"Pool with overflow_ttl behaves reaps worker at correct time after worker death when checked in">>, + fun pool_overflow_ttl_worker_death_checked_in/0 + }, + {<<"Pool with overflow_ttl behaves reaps worker at correct time after owner death">>, + fun pool_overflow_ttl_owner_death/0 + }, {<<"Pool behaves on owner death">>, fun owner_death/0 }, + {<<"When checkin_callback is set to kill on owner death it's reaped early in overflow_ttl situation and others are reaped normally">>, + fun checkin_callback_kill_on_owner_death_overflow_ttl/0 + }, + {<<"When checkin_callback is set to kill on owner death it's killed and not replaced when in overflow">>, + fun checkin_callback_kill_on_owner_death_overflow/0 + }, + {<<"When checkin_callback is set to kill on owner death it's killed and it's replaced with a new one">>, + fun checkin_callback_kill_on_owner_death/0 + }, + {<<"When checkin_callback is set to kill on normal checkin it's killed and it's replaced with a new one">>, + fun checkin_callback_kill_on_normal_checkin/0 + }, {<<"Worker checked-in after an exception in a transaction">>, fun checkin_after_exception_in_transaction/0 }, {<<"Pool returns status">>, fun pool_returns_status/0 }, + {<<"Pool returns full status">>, + fun pool_returns_full_status/0 + }, {<<"Pool demonitors previously waiting processes">>, fun demonitors_previously_waiting_processes/0 }, @@ -127,13 +175,13 @@ transaction_timeout() -> pool_startup() -> %% Check basic pool operation. {ok, Pid} = new_pool(10, 5), - ?assertEqual(10, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(10, queue:len(pool_call(Pid, get_avail_workers))), poolboy:checkout(Pid), - ?assertEqual(9, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(9, queue:len(pool_call(Pid, get_avail_workers))), Worker = poolboy:checkout(Pid), - ?assertEqual(8, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(8, queue:len(pool_call(Pid, get_avail_workers))), checkin_worker(Pid, Worker), - ?assertEqual(9, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(9, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(1, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -141,23 +189,23 @@ pool_overflow() -> %% Check that the pool overflows properly. {ok, Pid} = new_pool(5, 5), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(7, length(pool_call(Pid, get_all_workers))), [A, B, C, D, E, F, G] = Workers, checkin_worker(Pid, A), checkin_worker(Pid, B), - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, C), checkin_worker(Pid, D), - ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, E), checkin_worker(Pid, F), - ?assertEqual(4, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(4, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, G), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -167,7 +215,7 @@ pool_empty() -> %% overflow is enabled. {ok, Pid} = new_pool(5, 2), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(7, length(pool_call(Pid, get_all_workers))), [A, B, C, D, E, F, G] = Workers, Self = self(), @@ -192,18 +240,18 @@ pool_empty() -> after 500 -> ?assert(false) end, - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, C), checkin_worker(Pid, D), - ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, E), checkin_worker(Pid, F), - ?assertEqual(4, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(4, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, G), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -213,7 +261,7 @@ pool_empty_no_overflow() -> %% disabled. {ok, Pid} = new_pool(5, 0), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), [A, B, C, D, E] = Workers, Self = self(), @@ -238,14 +286,14 @@ pool_empty_no_overflow() -> after 500 -> ?assert(false) end, - ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, C), checkin_worker(Pid, D), - ?assertEqual(4, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(4, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, E), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -256,16 +304,16 @@ worker_death() -> {ok, Pid} = new_pool(5, 2), Worker = poolboy:checkout(Pid), kill_worker(Worker), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), [A, B, C|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(7, length(pool_call(Pid, get_all_workers))), kill_worker(A), - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(6, length(pool_call(Pid, get_all_workers))), kill_worker(B), kill_worker(C), - ?assertEqual(1, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(4, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -277,9 +325,9 @@ worker_death_while_full() -> {ok, Pid} = new_pool(5, 2), Worker = poolboy:checkout(Pid), kill_worker(Worker), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), [A, B|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(7, length(pool_call(Pid, get_all_workers))), Self = self(), spawn(fun() -> @@ -306,7 +354,7 @@ worker_death_while_full() -> 1000 -> ?assert(false) end, kill_worker(B), - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(6, length(pool_call(Pid, get_all_workers))), ?assertEqual(6, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -318,9 +366,9 @@ worker_death_while_full_no_overflow() -> {ok, Pid} = new_pool(5, 0), Worker = poolboy:checkout(Pid), kill_worker(Worker), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), [A, B, C|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), Self = self(), spawn(fun() -> @@ -346,10 +394,10 @@ worker_death_while_full_no_overflow() -> 1000 -> ?assert(false) end, kill_worker(B), - ?assertEqual(1, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), kill_worker(C), - ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(3, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -359,7 +407,7 @@ pool_full_nonblocking_no_overflow() -> %% option to use non-blocking checkouts is used. {ok, Pid} = new_pool(5, 0), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(full, poolboy:checkout(Pid, false)), ?assertEqual(full, poolboy:checkout(Pid, false)), @@ -374,7 +422,7 @@ pool_full_nonblocking() -> %% option to use non-blocking checkouts is used. {ok, Pid} = new_pool(5, 5), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 9)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(10, length(pool_call(Pid, get_all_workers))), ?assertEqual(full, poolboy:checkout(Pid, false)), A = hd(Workers), @@ -386,6 +434,167 @@ pool_full_nonblocking() -> ?assertEqual(10, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). +load_lifo() -> + {ok, Pid} = new_pool(10000,10000, lifo), + Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 19999)], + lists:foreach(fun(Worker) -> poolboy:checkin(Pid, Worker) end, Workers), + _Workers2 = [poolboy:checkout(Pid) || _ <- lists:seq(0, 19999)], + ok = pool_call(Pid, stop). + +load_fifo() -> + {ok, Pid} = new_pool(10000,10000, fifo), + Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 19999)], + lists:foreach(fun(Worker) -> poolboy:checkin(Pid, Worker) end, Workers), + _Workers2 = [poolboy:checkout(Pid) || _ <- lists:seq(0, 19999)], + ok = pool_call(Pid, stop). + +load_overflow_ttl_lifo() -> + {ok, Pid} = new_pool_with_overflow_ttl_strategy(10000,10000, 2000, lifo), + Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 19999)], + lists:foreach(fun(Worker) -> poolboy:checkin(Pid, Worker) end, Workers), + _Workers2 = [poolboy:checkout(Pid) || _ <- lists:seq(0, 19999)], + ok = pool_call(Pid, stop). + +load_overflow_ttl_fifo() -> + {ok, Pid} = new_pool_with_overflow_ttl_strategy(10000,10000, 2000, fifo), + Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 19999)], + lists:foreach(fun(Worker) -> poolboy:checkin(Pid, Worker) end, Workers), + _Workers2 = [poolboy:checkout(Pid) || _ <- lists:seq(0, 19999)], + ok = pool_call(Pid, stop). + +pool_overflow_ttl_full() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 1, 1000), + _Worker = poolboy:checkout(Pid), + _Worker1 = poolboy:checkout(Pid), + % Test pool behaves normally when full + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(full, poolboy:checkout(Pid, false)), + ok = pool_call(Pid, stop). + +pool_overflow_ttl_checkin_basic() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 1, 1000), + Worker = poolboy:checkout(Pid), + _Worker1 = poolboy:checkout(Pid), + % Test first worker is returned to list of available workers + poolboy:checkin(Pid, Worker), + timer:sleep(100), + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + Worker2 = poolboy:checkout(Pid), + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(Worker, Worker2), + ok = pool_call(Pid, stop). + +pool_overflow_ttl_checkin_reap_timer() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 1, 200), + Worker = poolboy:checkout(Pid), + _Worker1 = poolboy:checkout(Pid), + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + poolboy:checkin(Pid, Worker), + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + timer:sleep(200), + % Ensure worker is reaped after checkin + ?assertEqual({overflow, 0, 0, 1}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). + +pool_overflow_ttl_checkout_cancels_reap() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 1, 200), + Worker = poolboy:checkout(Pid), + _Worker1 = poolboy:checkout(Pid), + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + poolboy:checkin(Pid, Worker), + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + % When checking out a worker the timer should be cancelled + _Worker2 = poolboy:checkout(Pid), + timer:sleep(150), + % When the worker is checked in the timer should start again + poolboy:checkin(Pid, Worker), + timer:sleep(100), + % Check the worker has not been reaped yet + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + timer:sleep(100), + % Check the worker is now reaped + ?assertEqual({overflow, 0, 0, 1}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). + +pool_overflow_ttl_worker_death_checked_out() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 2, 300), + Worker = poolboy:checkout(Pid), + Worker1 = poolboy:checkout(Pid), + Worker2 = poolboy:checkout(Pid), + ?assertEqual({full, 0, 2, 3}, poolboy:status(Pid)), + kill_worker(Worker), + % Make sure a new worker is not started when in overflow + ?assertEqual({overflow, 0, 1, 2}, poolboy:status(Pid)), + % Make sure monitors are properly removed on worker death + ?assertEqual(2, length(pool_call(Pid, get_all_monitors))), + ?assertEqual(2, length(pool_call(Pid, get_all_workers))), + kill_worker(Worker1), + ?assertEqual({overflow, 0, 0, 1}, poolboy:status(Pid)), + % Make sure monitors are properly removed on worker death + ?assertEqual(1, length(pool_call(Pid, get_all_monitors))), + ?assertEqual(1, length(pool_call(Pid, get_all_workers))), + kill_worker(Worker2), + ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), + % Make sure monitors are properly removed on worker death + ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), + ?assertEqual(1, length(pool_call(Pid, get_all_workers))), + ok = pool_call(Pid, stop). + +pool_overflow_ttl_worker_death_checked_in() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 2, 300), + Worker = poolboy:checkout(Pid), + Worker1 = poolboy:checkout(Pid), + Worker2 = poolboy:checkout(Pid), +%% ?assertEqual({full, 0, 2, 3}, poolboy:status(Pid)), + ?assertEqual(3, length(pool_call(Pid, get_all_monitors))), + ?assertEqual(3, length(pool_call(Pid, get_all_workers))), + poolboy:checkin(Pid, Worker), + ?assertEqual(3, length(pool_call(Pid, get_all_workers))), + ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(3, length(pool_call(Pid, get_all_workers))), + poolboy:checkin(Pid, Worker1), + ?assertEqual(3, length(pool_call(Pid, get_all_workers))), + ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), + poolboy:checkin(Pid, Worker2), + ?assertEqual(3, queue:len(pool_call(Pid, get_avail_workers))), + kill_worker(Worker), + ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, length(pool_call(Pid, get_all_workers))), + ?assertEqual({ready, 2, 1, 0}, poolboy:status(Pid)), + kill_worker(Worker1), + ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(1, length(pool_call(Pid, get_all_workers))), + ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), + kill_worker(Worker2), + timer:sleep(50), + ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(1, length(pool_call(Pid, get_all_workers))), + ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). + +pool_overflow_ttl_owner_death() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 2, 300), + _Worker = poolboy:checkout(Pid), + _Worker1 = poolboy:checkout(Pid), + + % Test an owner death + spawn(fun() -> + poolboy:checkout(Pid), + receive after 100 -> exit(normal) end + end), + % Give the spawn time to actually check out the worker + timer:sleep(5), + ?assertEqual({full, 0, 2, 3}, poolboy:status(Pid)), + timer:sleep(140), + % Owner should have died by now so monitor should be removed and worker should be available + ?assertEqual({ready, 1, 2, 2}, poolboy:status(Pid)), + ?assertEqual(2, length(pool_call(Pid, get_all_monitors))), + % Worker should be reaped after it's owner dies at correct ttl + timer:sleep(255), + ?assertEqual({overflow, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(2, length(pool_call(Pid, get_all_workers))), + ok = pool_call(Pid, stop). + owner_death() -> %% Check that a dead owner (a process that dies with a worker checked out) %% causes the pool to dismiss the worker and prune the state space. @@ -395,17 +604,125 @@ owner_death() -> receive after 500 -> exit(normal) end end), timer:sleep(1000), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), + ok = pool_call(Pid, stop). +checkin_callback_kill_on_owner_death_overflow_ttl() -> + Callback = fun({_Pid, owner_death}) -> + kill; + ({_Pid, normal}) -> + keep + end, + {ok, Pid} = new_pool_with_callback(1, 2, 600, Callback), + + _Worker = poolboy:checkout(Pid), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + spawn(fun() -> + poolboy:checkout(Pid), + receive after 300 -> exit(normal) end + end), + % Give it time to checkout + timer:sleep(75), + ?assertEqual(2, length(pool_call(Pid, get_all_workers))), + %% Check that the presence of the callback decides to purge the worker early + timer:sleep(300), + ?assertEqual(1, length(pool_call(Pid, get_all_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + + % Ensure when there are two overflow workers the second worker is reaped + % normally + Worker2 = poolboy:checkout(Pid), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + spawn(fun() -> + poolboy:checkout(Pid), + receive after 300 -> exit(normal) end + end), + % Give it time to checkout + timer:sleep(75), + ?assertEqual(3, length(pool_call(Pid, get_all_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + % Worker 2 should be reaped in the normal period of 600 milliseconds + poolboy:checkin(Pid, Worker2), + ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), + timer:sleep(300), + ?assertEqual(2, length(pool_call(Pid, get_all_workers))), + ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), + timer:sleep(100), + ?assertEqual(2, length(pool_call(Pid, get_all_workers))), + ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), + timer:sleep(200), + ?assertEqual(1, length(pool_call(Pid, get_all_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + ok = pool_call(Pid, stop). + +checkin_callback_kill_on_owner_death_overflow() -> + Callback = fun({_Pid, owner_death}) -> + kill; + ({_Pid, normal}) -> + keep + end, + {ok, Pid} = new_pool_with_callback(1, 1, 0, Callback), + _Worker = poolboy:checkout(Pid), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + spawn(fun() -> + poolboy:checkout(Pid), + receive after 200 -> exit(normal) end + end), + % Give it time to checkout + timer:sleep(75), + ?assertEqual(2, length(pool_call(Pid, get_all_workers))), + timer:sleep(200), + ?assertEqual(1, length(pool_call(Pid, get_all_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + ok = pool_call(Pid, stop). + + +checkin_callback_kill_on_owner_death() -> + Callback = fun({_Pid, owner_death}) -> + kill; + ({_Pid, normal}) -> + keep + end, + {ok, Pid} = new_pool_with_callback(1, 0, 0, Callback), + Worker = poolboy:checkout(Pid), + poolboy:checkin(Pid, Worker), + spawn(fun() -> + poolboy:checkout(Pid), + receive after 200 -> exit(normal) end + end), + % Give it time to checkout + timer:sleep(75), + ?assertEqual(1, length(pool_call(Pid, get_all_workers))), + timer:sleep(200), + ?assertEqual(1, length(pool_call(Pid, get_all_workers))), + ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), + Worker2 = poolboy:checkout(Pid), + ?assertNotEqual(Worker, Worker2), + ok = pool_call(Pid, stop). + +checkin_callback_kill_on_normal_checkin() -> + Callback = fun({_Pid, owner_death}) -> + keep; + ({_Pid, normal}) -> + kill + end, + {ok, Pid} = new_pool_with_callback(1, 0, 0, Callback), + Worker = poolboy:checkout(Pid), + poolboy:checkin(Pid, Worker), + Worker2 = poolboy:checkout(Pid), + ?assertNotEqual(Worker, Worker2), + ok = pool_call(Pid, stop). + + checkin_after_exception_in_transaction() -> {ok, Pool} = new_pool(2, 0), - ?assertEqual(2, length(pool_call(Pool, get_avail_workers))), + ?assertEqual(2, queue:len(pool_call(Pool, get_avail_workers))), Tx = fun(Worker) -> ?assert(is_pid(Worker)), - ?assertEqual(1, length(pool_call(Pool, get_avail_workers))), + ?assertEqual(1, queue:len(pool_call(Pool, get_avail_workers))), throw(it_on_the_ground), ?assert(false) end, @@ -414,7 +731,7 @@ checkin_after_exception_in_transaction() -> catch throw:it_on_the_ground -> ok end, - ?assertEqual(2, length(pool_call(Pool, get_avail_workers))), + ?assertEqual(2, queue:len(pool_call(Pool, get_avail_workers))), ok = pool_call(Pool, stop). pool_returns_status() -> @@ -446,6 +763,63 @@ pool_returns_status() -> ?assertEqual({full, 0, 0, 0}, poolboy:status(Pool4)), ok = pool_call(Pool4, stop). +pool_returns_full_status() -> + {ok, Pool} = new_pool(2, 0), + ?assertEqual(full_status(2,0,2,2,0,0,0,2), poolboy:full_status(Pool)), + poolboy:checkout(Pool), + ?assertEqual(full_status(2,0,2,1,0,1,0,2), poolboy:full_status(Pool)), + poolboy:checkout(Pool), + ?assertEqual(full_status(2,0,2,0,0,2,0,2), poolboy:full_status(Pool)), + ok = pool_call(Pool, stop), + + {ok, Pool2} = new_pool(1, 1), + ?assertEqual(full_status(1,1,1,1,0,0,0,1), poolboy:full_status(Pool2)), + poolboy:checkout(Pool2), + ?assertEqual(full_status(1,1,1,0,0,1,0,1), poolboy:full_status(Pool2)), + poolboy:checkout(Pool2), + ?assertEqual(full_status(1,1,2,0,1,2,0,2), poolboy:full_status(Pool2)), + ok = pool_call(Pool2, stop), + + {ok, Pool3} = new_pool(0, 2), + ?assertEqual(full_status(0,2,0,0,0,0,0,0), poolboy:full_status(Pool3)), + poolboy:checkout(Pool3), + ?assertEqual(full_status(0,2,1,0,1,1,0,1), poolboy:full_status(Pool3)), + poolboy:checkout(Pool3), + ?assertEqual(full_status(0,2,2,0,2,2,0,2), poolboy:full_status(Pool3)), + ok = pool_call(Pool3, stop), + + {ok, Pool4} = new_pool(0, 0), + ?assertEqual(full_status(0,0,0,0,0,0,0,0), poolboy:full_status(Pool4)), + ok = pool_call(Pool4, stop), + + % Check that the wait queue is showing correct amount + {ok, Pool5} = new_pool(1, 0), + Checkout1 = poolboy:checkout(Pool5), + Self = self(), + spawn(fun() -> + Worker = poolboy:checkout(Pool5), + Self ! got_worker, + checkin_worker(Pool5, Worker) + end), + + %% Spawned process should block waiting for worker to be available. + receive + got_worker -> ?assert(false) + after + 500 -> ?assert(true) + end, + ?assertEqual(full_status(1,0,1,0,0,1,1,1), poolboy:full_status(Pool5)), + checkin_worker(Pool5, Checkout1), + + %% Spawned process should have been able to obtain a worker. + receive + got_worker -> ?assert(true) + after + 500 -> ?assert(false) + end, + ?assertEqual(full_status(1,0,1,1,0,0,0,1), poolboy:full_status(Pool5)), + ok = pool_call(Pool5, stop). + demonitors_previously_waiting_processes() -> {ok, Pool} = new_pool(1,0), Self = self(), @@ -516,7 +890,7 @@ reuses_waiting_monitor_on_worker_exit() -> receive ok -> ok end end), - Worker = receive {worker, Worker} -> Worker end, + Worker = receive {worker, W} -> W end, Ref = monitor(process, Worker), exit(Worker, kill), receive @@ -546,5 +920,36 @@ new_pool(Size, MaxOverflow, Strategy) -> {size, Size}, {max_overflow, MaxOverflow}, {strategy, Strategy}]). +new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl) -> + poolboy:start_link([{name, {local, poolboy_test}}, + {worker_module, poolboy_test_worker}, + {size, Size}, {max_overflow, MaxOverflow}, + {overflow_ttl, OverflowTtl}]). + +new_pool_with_overflow_ttl_strategy(Size, MaxOverflow, OverflowTtl, Strategy) -> + poolboy:start_link([{name, {local, poolboy_test}}, + {worker_module, poolboy_test_worker}, + {size, Size}, {max_overflow, MaxOverflow}, + {overflow_ttl, OverflowTtl},{strategy, Strategy}]). + +new_pool_with_callback(Size, MaxOverflow, OverflowTtl, CallbackFunction) -> + poolboy:start_link([{name, {local, poolboy_test}}, + {worker_module, poolboy_test_worker}, + {size, Size}, {max_overflow, MaxOverflow}, + {overflow_ttl, OverflowTtl}, {checkin_callback, CallbackFunction}]). + pool_call(ServerRef, Request) -> gen_server:call(ServerRef, Request). + +full_status(Size, MaxOverFlow, TotalWorker, ReadyWorker, OverflowWorker, + CheckedOutWorker, Waiting, WorkersStarted) -> + % Helper function to populate the results tuple + [{size, Size}, + {max_overflow, MaxOverFlow}, + {total_worker_count,TotalWorker}, + {ready_worker_count, ReadyWorker}, + {overflow_worker_count, OverflowWorker}, + {checked_out_worker_count, CheckedOutWorker}, + {waiting_request_count, Waiting}, + {total_workers_started, WorkersStarted} + ].