Skip to content
2 changes: 1 addition & 1 deletion lib/docker/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Docker.Client do
def send_request(url, method, body \\ "", headers \\ [], opts \\ []) do
json_body = Poison.encode!(body)
json_headers = headers ++ [{"Content-Type", "application/json"}]
merged_opts = opts ++ default_options
merged_opts = opts ++ default_options()
request!(method, url, json_body, json_headers, merged_opts)
end

Expand Down
10 changes: 5 additions & 5 deletions lib/docker/container.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ defmodule Docker.Container do
{:ok, %{id: id}} ->
case start(host, id) do
{:ok, _} -> {:ok, %{id: id}}
{:error, reason} -> {:error, :reason}
{:error, reason} -> {:error, reason}
end
{:error, reason} -> {:error, reason}
end
Expand Down Expand Up @@ -95,7 +95,7 @@ defmodule Docker.Container do

def exec_start(host, exec_id, opts) do
"#{host}/exec/#{exec_id}/start"
|> Client.send_request(:post, opts, [], [stream_to: self])
|> Client.send_request(:post, opts, [], [stream_to: self()])
|> Response.parse(:exec_start)
end

Expand All @@ -111,7 +111,7 @@ defmodule Docker.Container do
case exec_create(host, container_id, create_opts) do
{:ok, %{id: exec_id}} ->
case exec_start(host, exec_id, %{}) do
{:ok, _} -> stream_response
{:ok, _} -> stream_response()
{:error, reason} -> {:error, reason}
end
{:error, reason} -> {:error, reason}
Expand All @@ -130,11 +130,11 @@ defmodule Docker.Container do
def logs_stream(host, container_id, opts \\ %{}) do
"#{host}/containers/#{container_id}/logs"
|> Client.add_query_params(opts)
|> Client.send_request(:get, [], [stream_to: self])
|> Client.send_request(:get, [], [stream_to: self()])
|> parse_logs_stream_response
end

defp parse_logs_stream_response(%HTTPoison.AsyncResponse{id: _ref}), do: stream_response
defp parse_logs_stream_response(%HTTPoison.AsyncResponse{id: _ref}), do: stream_response()

@default_attach_params %{
stream: 1,
Expand Down
23 changes: 23 additions & 0 deletions lib/docker/container_manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Docker.ContainerManager do
use Supervisor

def start_link do
Supervisor.start_link(__MODULE__, :ok, name: :container_manager)
end

def start_container(image_name) do
Supervisor.start_child(:container_manager, [%{"Image" => image_name}])
end

def start_container do
Supervisor.start_child(:container_manager, [])
end

def init(:ok) do
children = [
worker(Docker.ContainerWorker, [], restart: :temporary)
]

supervise(children, strategy: :simple_one_for_one)
end
end
61 changes: 61 additions & 0 deletions lib/docker/container_registry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule Docker.ContainerRegistry do
use GenServer
alias Docker.Container

def start_link(name) do
GenServer.start_link(__MODULE__, :ok, name: name)
end

def register_worker(worker_pid, container_id) do
GenServer.cast(
__MODULE__,
{:register_worker, worker_pid, container_id}
)
end

def list_workers do
GenServer.call(__MODULE__, {:list_workers})
end

def init(:ok) do
case Docker.Container.list(host()) do
{:error, _reason} ->
{:ok, %{}}
ids ->
Enum.each(ids, &Docker.Container.kill(host(), &1))
{:ok, %{}}
end
end

def handle_cast({:register_worker, worker_pid, container_id}, state) do
ref = Process.monitor(worker_pid)
new_state = state |> Map.put(worker_pid, { container_id, ref })
{:noreply, new_state}
end

def handle_call({:list_workers}, _from, state) do
{:reply, {:ok, state}, state}
end

def handle_info({:DOWN, ref, _type, pid, _reason}, state) do
case Map.get(state, pid) do
{container_id, ^ref} ->
try do
Container.kill(host(), container_id)
rescue
reason ->
IO.puts "Error Killing Container"
IO.inspect reason
end
Process.demonitor(ref)
new_state = state |> Map.delete(pid)
{:noreply, new_state}
_ ->
{:noreply, state}
end
end

defp host do
"127.0.0.1:2375"
end
end
152 changes: 152 additions & 0 deletions lib/docker/container_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
defmodule Docker.ContainerWorker do
alias Docker.ContainerRegistry
alias Docker.Container
use GenServer

@default_timeout 300_000
@default_container_opts %{
"NetworkDisabled" => true,
"HostConfig" => %{
"Memory" => 50_000_000,
"MemorySwap" => 50_000_000,
"KernelMemory" => 50_000_000,
"CpuQuota" => 5_000
}
}

def start_link(opts \\ %{}) do
GenServer.start_link(__MODULE__, opts)
end

def exec(pid, commands) do
GenServer.call(pid, {:exec, commands}, 15_000)
end

def stdin(pid, stdin) do
GenServer.cast(pid, {:stdin, stdin})
end

def send_files(pid, blobs) when is_list(blobs) do
Enum.each(blobs, fn blob ->
send_file(pid, blob.name, blob.contents)
end)
end

def send_file(pid, filename, file_contents) do
commands = save_file_commands(filename, file_contents)
GenServer.cast(pid, {:exec_detached, commands})
end

def stop(pid) do
GenServer.stop(pid, :shutdown)
end

### Server Callbacks
def init(opts \\ %{}) do
{:ok, timer_ref} = set_keep_alive_timer()
case create_container(opts) do
{:ok, container_id} ->
{:ok, %{timer: timer_ref, container_id: container_id}}
{:error, _reason} ->
{:stop, :error_creating_container}
end
end

def handle_call({:exec, commands}, _from, state) do
try do
container_id = Map.get(state, :container_id)
json = Container.exec_stream(host(), container_id,
%{
"Cmd" => commands,
"AttachStdout" => true,
"AttachStderr" => true
}
)
{:reply, {:ok, json}, reset_timer(state)}
rescue
reason ->
{:reply, {:error, reason}, state}
end
end

def handle_cast({:exec_detached, commands}, state) do
container_id = Map.get(state, :container_id)
Container.exec_detached(host(), container_id,
%{
"Cmd" => commands
}
)
{:noreply, reset_timer(state)}
end

def handle_cast({:stdin, stdin}, state) do
case state do
%{persistent_conn: connRef} ->
Container.stream_stdin(stdin, connRef)
{:noreply, reset_timer(state)}
%{container_id: container_id} ->
{:ok, %{connRef: connRef}} = Container.attach(host(), container_id, self())
Container.stream_stdin(stdin, connRef)
new_state = Map.put(state, :persistent_conn, connRef)
{:noreply, reset_timer(new_state)}
end
end

def terminate(:shutdown, state), do: kill_container(state)
def terminate({:shutdown, _exit_reason}, state), do: kill_container(state)


def handle_info({:hackney_response, _conn, message}, state) when is_binary(message) do
Docker.RequestMap.whereis_request(self())
|> send({:stdout, message})
{:noreply, state}
end

def handle_info(_msg, state) do
{:noreply, state}
end

defp reset_timer(state = %{timer: timer_ref}) do
:timer.cancel(timer_ref)
{:ok, new_timer_ref} = set_keep_alive_timer()
%{state | timer: new_timer_ref}
end

defp set_keep_alive_timer do
:timer.apply_after(@default_timeout, GenServer, :stop, [self(), {:shutdown, :timeout}])
end

defp host do
"127.0.0.1:2375"
end

defp create_container(opts \\ %{}) do
try do
merged_opts = Map.merge(@default_container_opts, opts)
{:ok, %{id: container_id}} = Container.run host(), merged_opts
ContainerRegistry.register_worker(self(), container_id)
{:ok, container_id}
rescue
reason ->
IO.puts "Error creating container"
{:error, reason}
end
end

defp save_file_commands(filename, file_contents) do
["bash", "-c", ~s(cat << 'EOF' > #{filename}\n#{file_contents}\nEOF)]
end

defp kill_container(%{container_id: container_id}) do
case Container.kill(host(), container_id) do
{:ok, _ } ->
IO.puts "Container #{container_id} shutdown"
%{"message" => message} ->
IO.puts message
{:error, reason} ->
IO.puts "Error killing container"
IO.inspect reason
end
end
end

42 changes: 42 additions & 0 deletions lib/docker/request_map.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule Docker.RequestMap do
def start_link do
Agent.start_link(fn -> %{} end, name: __MODULE__)
end

def register(request_pid, worker_pid) do
Agent.update(__MODULE__, fn(current) ->
Map.put( current, worker_pid, %{ request: request_pid })
end)
end

def unregister(worker_pid) do
Agent.update(__MODULE__, fn(current) ->
Map.drop(current, [worker_pid])
end)
end

def whereis_request(worker_pid) do
Agent.get(__MODULE__, fn(current) ->
current
|> Map.get(worker_pid)
|> Map.get(:request)
end)
end

def find_by_request_pid(request_pid) do
Agent.get(__MODULE__, fn(current) ->
current
|> Enum.find(fn {_key, val} -> val == %{request: request_pid} end)
|> case do
{worker_pid, _val} -> worker_pid
nil -> nil
end
end)
end

def list_all do
Agent.get(__MODULE__, fn(current) ->
current
end)
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Docker.Mixfile do
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
description: "Api wrapper for Docker API in Elixir",
package: package,
package: package(),
deps: deps()]
end

Expand Down