diff --git a/lib/docker/client.ex b/lib/docker/client.ex index 81d1043..7ccd276 100644 --- a/lib/docker/client.ex +++ b/lib/docker/client.ex @@ -7,7 +7,7 @@ defmodule Docker.Client do def send_request(url, method, body \\ "", headers \\ [], opts \\ []) do json_body = Poison.encode!(body) json_headers = headers ++ [{"Content-Type", "application/json"}] - merged_opts = opts ++ default_options + merged_opts = opts ++ default_options() request!(method, url, json_body, json_headers, merged_opts) end diff --git a/lib/docker/container.ex b/lib/docker/container.ex index a67258a..b6a3390 100644 --- a/lib/docker/container.ex +++ b/lib/docker/container.ex @@ -50,7 +50,7 @@ defmodule Docker.Container do {:ok, %{id: id}} -> case start(host, id) do {:ok, _} -> {:ok, %{id: id}} - {:error, reason} -> {:error, :reason} + {:error, reason} -> {:error, reason} end {:error, reason} -> {:error, reason} end @@ -95,7 +95,7 @@ defmodule Docker.Container do def exec_start(host, exec_id, opts) do "#{host}/exec/#{exec_id}/start" - |> Client.send_request(:post, opts, [], [stream_to: self]) + |> Client.send_request(:post, opts, [], [stream_to: self()]) |> Response.parse(:exec_start) end @@ -111,7 +111,7 @@ defmodule Docker.Container do case exec_create(host, container_id, create_opts) do {:ok, %{id: exec_id}} -> case exec_start(host, exec_id, %{}) do - {:ok, _} -> stream_response + {:ok, _} -> stream_response() {:error, reason} -> {:error, reason} end {:error, reason} -> {:error, reason} @@ -130,11 +130,11 @@ defmodule Docker.Container do def logs_stream(host, container_id, opts \\ %{}) do "#{host}/containers/#{container_id}/logs" |> Client.add_query_params(opts) - |> Client.send_request(:get, [], [stream_to: self]) + |> Client.send_request(:get, [], [stream_to: self()]) |> parse_logs_stream_response end - defp parse_logs_stream_response(%HTTPoison.AsyncResponse{id: _ref}), do: stream_response + defp parse_logs_stream_response(%HTTPoison.AsyncResponse{id: _ref}), do: stream_response() @default_attach_params %{ stream: 1, diff --git a/lib/docker/container_manager.ex b/lib/docker/container_manager.ex new file mode 100644 index 0000000..771a2c3 --- /dev/null +++ b/lib/docker/container_manager.ex @@ -0,0 +1,23 @@ +defmodule Docker.ContainerManager do + use Supervisor + + def start_link do + Supervisor.start_link(__MODULE__, :ok, name: :container_manager) + end + + def start_container(image_name) do + Supervisor.start_child(:container_manager, [%{"Image" => image_name}]) + end + + def start_container do + Supervisor.start_child(:container_manager, []) + end + + def init(:ok) do + children = [ + worker(Docker.ContainerWorker, [], restart: :temporary) + ] + + supervise(children, strategy: :simple_one_for_one) + end +end diff --git a/lib/docker/container_registry.ex b/lib/docker/container_registry.ex new file mode 100644 index 0000000..5206763 --- /dev/null +++ b/lib/docker/container_registry.ex @@ -0,0 +1,61 @@ +defmodule Docker.ContainerRegistry do + use GenServer + alias Docker.Container + + def start_link(name) do + GenServer.start_link(__MODULE__, :ok, name: name) + end + + def register_worker(worker_pid, container_id) do + GenServer.cast( + __MODULE__, + {:register_worker, worker_pid, container_id} + ) + end + + def list_workers do + GenServer.call(__MODULE__, {:list_workers}) + end + + def init(:ok) do + case Docker.Container.list(host()) do + {:error, _reason} -> + {:ok, %{}} + ids -> + Enum.each(ids, &Docker.Container.kill(host(), &1)) + {:ok, %{}} + end + end + + def handle_cast({:register_worker, worker_pid, container_id}, state) do + ref = Process.monitor(worker_pid) + new_state = state |> Map.put(worker_pid, { container_id, ref }) + {:noreply, new_state} + end + + def handle_call({:list_workers}, _from, state) do + {:reply, {:ok, state}, state} + end + + def handle_info({:DOWN, ref, _type, pid, _reason}, state) do + case Map.get(state, pid) do + {container_id, ^ref} -> + try do + Container.kill(host(), container_id) + rescue + reason -> + IO.puts "Error Killing Container" + IO.inspect reason + end + Process.demonitor(ref) + new_state = state |> Map.delete(pid) + {:noreply, new_state} + _ -> + {:noreply, state} + end + end + + defp host do + "127.0.0.1:2375" + end +end diff --git a/lib/docker/container_worker.ex b/lib/docker/container_worker.ex new file mode 100644 index 0000000..8aa2219 --- /dev/null +++ b/lib/docker/container_worker.ex @@ -0,0 +1,152 @@ +defmodule Docker.ContainerWorker do + alias Docker.ContainerRegistry + alias Docker.Container + use GenServer + + @default_timeout 300_000 + @default_container_opts %{ + "NetworkDisabled" => true, + "HostConfig" => %{ + "Memory" => 50_000_000, + "MemorySwap" => 50_000_000, + "KernelMemory" => 50_000_000, + "CpuQuota" => 5_000 + } + } + + def start_link(opts \\ %{}) do + GenServer.start_link(__MODULE__, opts) + end + + def exec(pid, commands) do + GenServer.call(pid, {:exec, commands}, 15_000) + end + + def stdin(pid, stdin) do + GenServer.cast(pid, {:stdin, stdin}) + end + + def send_files(pid, blobs) when is_list(blobs) do + Enum.each(blobs, fn blob -> + send_file(pid, blob.name, blob.contents) + end) + end + + def send_file(pid, filename, file_contents) do + commands = save_file_commands(filename, file_contents) + GenServer.cast(pid, {:exec_detached, commands}) + end + + def stop(pid) do + GenServer.stop(pid, :shutdown) + end + + ### Server Callbacks + def init(opts \\ %{}) do + {:ok, timer_ref} = set_keep_alive_timer() + case create_container(opts) do + {:ok, container_id} -> + {:ok, %{timer: timer_ref, container_id: container_id}} + {:error, _reason} -> + {:stop, :error_creating_container} + end + end + + def handle_call({:exec, commands}, _from, state) do + try do + container_id = Map.get(state, :container_id) + json = Container.exec_stream(host(), container_id, + %{ + "Cmd" => commands, + "AttachStdout" => true, + "AttachStderr" => true + } + ) + {:reply, {:ok, json}, reset_timer(state)} + rescue + reason -> + {:reply, {:error, reason}, state} + end + end + + def handle_cast({:exec_detached, commands}, state) do + container_id = Map.get(state, :container_id) + Container.exec_detached(host(), container_id, + %{ + "Cmd" => commands + } + ) + {:noreply, reset_timer(state)} + end + + def handle_cast({:stdin, stdin}, state) do + case state do + %{persistent_conn: connRef} -> + Container.stream_stdin(stdin, connRef) + {:noreply, reset_timer(state)} + %{container_id: container_id} -> + {:ok, %{connRef: connRef}} = Container.attach(host(), container_id, self()) + Container.stream_stdin(stdin, connRef) + new_state = Map.put(state, :persistent_conn, connRef) + {:noreply, reset_timer(new_state)} + end + end + + def terminate(:shutdown, state), do: kill_container(state) + def terminate({:shutdown, _exit_reason}, state), do: kill_container(state) + + + def handle_info({:hackney_response, _conn, message}, state) when is_binary(message) do + Docker.RequestMap.whereis_request(self()) + |> send({:stdout, message}) + {:noreply, state} + end + + def handle_info(_msg, state) do + {:noreply, state} + end + + defp reset_timer(state = %{timer: timer_ref}) do + :timer.cancel(timer_ref) + {:ok, new_timer_ref} = set_keep_alive_timer() + %{state | timer: new_timer_ref} + end + + defp set_keep_alive_timer do + :timer.apply_after(@default_timeout, GenServer, :stop, [self(), {:shutdown, :timeout}]) + end + + defp host do + "127.0.0.1:2375" + end + + defp create_container(opts \\ %{}) do + try do + merged_opts = Map.merge(@default_container_opts, opts) + {:ok, %{id: container_id}} = Container.run host(), merged_opts + ContainerRegistry.register_worker(self(), container_id) + {:ok, container_id} + rescue + reason -> + IO.puts "Error creating container" + {:error, reason} + end + end + + defp save_file_commands(filename, file_contents) do + ["bash", "-c", ~s(cat << 'EOF' > #{filename}\n#{file_contents}\nEOF)] + end + + defp kill_container(%{container_id: container_id}) do + case Container.kill(host(), container_id) do + {:ok, _ } -> + IO.puts "Container #{container_id} shutdown" + %{"message" => message} -> + IO.puts message + {:error, reason} -> + IO.puts "Error killing container" + IO.inspect reason + end + end +end + diff --git a/lib/docker/request_map.ex b/lib/docker/request_map.ex new file mode 100644 index 0000000..01ae44d --- /dev/null +++ b/lib/docker/request_map.ex @@ -0,0 +1,42 @@ +defmodule Docker.RequestMap do + def start_link do + Agent.start_link(fn -> %{} end, name: __MODULE__) + end + + def register(request_pid, worker_pid) do + Agent.update(__MODULE__, fn(current) -> + Map.put( current, worker_pid, %{ request: request_pid }) + end) + end + + def unregister(worker_pid) do + Agent.update(__MODULE__, fn(current) -> + Map.drop(current, [worker_pid]) + end) + end + + def whereis_request(worker_pid) do + Agent.get(__MODULE__, fn(current) -> + current + |> Map.get(worker_pid) + |> Map.get(:request) + end) + end + + def find_by_request_pid(request_pid) do + Agent.get(__MODULE__, fn(current) -> + current + |> Enum.find(fn {_key, val} -> val == %{request: request_pid} end) + |> case do + {worker_pid, _val} -> worker_pid + nil -> nil + end + end) + end + + def list_all do + Agent.get(__MODULE__, fn(current) -> + current + end) + end +end diff --git a/mix.exs b/mix.exs index 81dafee..a4fe31d 100644 --- a/mix.exs +++ b/mix.exs @@ -8,7 +8,7 @@ defmodule Docker.Mixfile do build_embedded: Mix.env == :prod, start_permanent: Mix.env == :prod, description: "Api wrapper for Docker API in Elixir", - package: package, + package: package(), deps: deps()] end