Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ shards:

office365:
git: https://github.com/placeos/office365.git
version: 1.25.6
version: 1.25.7

openssl_ext:
git: https://github.com/spider-gazelle/openssl_ext.git
Expand Down Expand Up @@ -163,7 +163,7 @@ shards:

placeos-driver:
git: https://github.com/placeos/driver.git
version: 7.9.6
version: 7.10.1

placeos-log-backend:
git: https://github.com/place-labs/log-backend.git
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
require "./helper"
require "../helper"

module PlaceOS::Core
describe DriverCleanup do
it "get running drivers information in expected format" do
it "should capture and retrieve stale drivers" do
_, driver, mod = setup
module_manager = module_manager_mock

Expand All @@ -21,11 +21,12 @@ module PlaceOS::Core

module_manager.local_processes.run_count.should eq(ProcessManager::Count.new(1, 1))

expected = ["drivers_place_private_helper_cce023_#{DriverCleanup.arch}"]
running = DriverCleanup.running_drivers
running.should eq(expected)
local = Dir.new(DriverStore::BINARY_PATH).children
running.should eq(expected)
tracker = DriverCleanup::StaleProcessTracker.new(DriverStore::BINARY_PATH, REDIS_CLIENT)
stale_list = tracker.update_and_find_stale(ENV["STALE_THRESHOLD_DAYS"]?.try &.to_i || 30)
stale_list.size.should eq(0)
driver_file = Path[DriverStore::BINARY_PATH, "drivers_place_private_helper_cce023a_#{Core::ARCH}"].to_s
value = REDIS_CLIENT.hgetall(driver_file)
value["last_executed_at"].to_i64.should be > 0
end
end
end
3 changes: 0 additions & 3 deletions src/core-app.cr
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ Signal::TERM.trap &terminate
# Wait for redis and postgres to be ready
PlaceOS::Core.wait_for_resources

# Start cleaning un-used driver task
PlaceOS::Core::DriverCleanup.start_cleanup

spawn do
begin
PlaceOS::Core.start_managers
Expand Down
43 changes: 0 additions & 43 deletions src/placeos-core/driver_cleanup.cr

This file was deleted.

256 changes: 22 additions & 234 deletions src/placeos-core/driver_manager.cr
Original file line number Diff line number Diff line change
@@ -1,240 +1,9 @@
require "uri"
require "digest"
require "connect-proxy"
require "placeos-models"
require "placeos-resource"
require "./module_manager"
require "./driver_manager/**"

module PlaceOS::Core
class DriverStore
BINARY_PATH = ENV["PLACEOS_DRIVER_BINARIES"]?.presence || Path["./bin/drivers"].expand.to_s

protected getter binary_path : String

def initialize(@binary_path : String = BINARY_PATH)
Dir.mkdir_p binary_path
end

def compiled?(file_name : String, commit : String, branch : String, uri : String) : Bool
Log.debug { {message: "Checking whether driver is compiled or not?", driver: file_name, commit: commit, branch: branch, repo: uri} }
path = Path[binary_path, executable_name(file_name, commit)]
return true if File.exists?(path)
resp = BuildApi.compiled?(file_name, commit, branch, uri)
return false unless resp.success?
ret = fetch_binary(LinkData.from_json(resp.body)) rescue nil
!ret.nil?
end

def compile(file_name : String, url : String, commit : String, branch : String, force : Bool, username : String? = nil, password : String? = nil, fetch : Bool = true) : Result
Log.info { {message: "Requesting build service to compile driver", driver_file: file_name, branch: branch, repository: url} }
begin
resp = BuildApi.compile(file_name, url, commit, branch, force, username, password)
unless fetch
return Result.new(success: true)
end
resp = resp.not_nil!
unless resp.success?
Log.error { {message: resp.body, status_code: resp.status_code, driver: file_name, commit: commit, branch: branch, force: force} }
return Result.new(output: resp.body, name: file_name)
end
link = LinkData.from_json(resp.body)
begin
driver = fetch_binary(link)
rescue ex
return Result.new(output: ex.message.not_nil!, name: file_name)
end
Result.new(success: true, name: driver, path: binary_path)
rescue ex
msg = ex.message || "compiled returned no exception message"
Log.error(exception: ex) { {message: msg, driver: file_name, commit: commit, branch: branch, force: force} }
Result.new(output: msg, name: file_name)
end
end

def metadata(file_name : String, commit : String, branch : String, uri : String)
resp = BuildApi.metadata(file_name, commit, branch, uri)
return Result.new(success: true, output: resp.body.as(String)) if resp.success?
Result.new(output: "Metadata not found. Server returned #{resp.status_code}")
rescue ex
Result.new(output: ex.message.not_nil!, name: file_name)
end

def defaults(file_name : String, commit : String, branch : String, uri : String)
resp = BuildApi.defaults(file_name, commit, branch, uri)
return Result.new(success: true, output: resp.body.as(String)) if resp.success?
Result.new(output: "Driver defaults not found. Server returned #{resp.status_code}")
rescue ex
Result.new(output: ex.message.not_nil!, name: file_name)
end

def built?(file_name : String, commit : String, branch : String, uri : String) : String?
return nil unless compiled?(file_name, commit, branch, uri)
driver_binary_path(file_name, commit).to_s
end

def driver_binary_path(file_name : String, commit : String)
Path[binary_path, executable_name(file_name, commit)]
end

def path(driver_file : String) : Path
Path[binary_path, driver_file]
end

def compiled_drivers : Array(String)
Dir.children(binary_path)
end

def executable_name(driver_source, commit)
driver_source = driver_source.rchop(".cr").gsub(/\/|\./, "_")
commit = commit[..6] if commit.size > 6
{driver_source, commit, Core::ARCH}.join("_").downcase
end

def reload_driver(driver_id : String)
if driver = Model::Driver.find?(driver_id)
repo = driver.repository!

if compiled?(driver.file_name, driver.commit, repo.branch, repo.uri)
manager = ModuleManager.instance
stale_path = manager.reload_modules(driver)
if path = stale_path
File.delete(path) rescue nil if File.exists?(path)
end
else
return {status: 404, message: "Driver not compiled or not available on S3"}
end
else
return {status: 404, message: "Driver with id #{driver_id} not found "}
end
{status: 200, message: "OK"}
end

private def fetch_binary(link : LinkData) : String
url = URI.parse(link.url)
driver_file = Path[url.path].basename
filename = Path[binary_path, driver_file]
resp = if Core.production? || url.scheme == "https"
ConnectProxy::HTTPClient.get(url.to_s)
else
uri = URI.new(path: url.path, query: url.query)
ConnectProxy::HTTPClient.new(url.host.not_nil!, 9000).get(uri.to_s)
end
if resp.success?
unless link.size == resp.headers.fetch("Content-Length", "0").to_i
Log.error { {message: "Expected content length #{link.size}, but received #{resp.headers.fetch("Content-Length", "0")}", driver_file: driver_file} }
raise Error.new("Response size doesn't match with build service returned result")
end

body_io = IO::Digest.new(resp.body_io? || IO::Memory.new(resp.body), Digest::MD5.new)
File.open(filename, "wb+") do |f|
IO.copy(body_io, f)
f.chmod(0o755)
end
filename.to_s
else
raise Error.new("Unable to fetch driver. Error : #{resp.body}")
end
end

private record LinkData, size : Int64, md5 : String, modified : Time, url : String, link_expiry : Time do
include JSON::Serializable
@[JSON::Field(converter: Time::EpochConverter)]
getter modified : Time
@[JSON::Field(converter: Time::EpochConverter)]
getter link_expiry : Time
end
end

module BuildApi
BUILD_API_BASE = "/api/build/v1"

def self.metadata(file_name : String, commit : String, branch : String, uri : String)
host = URI.parse(Core.build_host)
file_name = URI.encode_www_form(file_name)
ConnectProxy::HTTPClient.new(host) do |client|
path = "#{BUILD_API_BASE}/metadata/#{file_name}"
params = URI::Params.encode({"url" => uri, "branch" => branch, "commit" => commit})
uri = "#{path}?#{params}"
rep = client.get(uri)
Log.debug { {message: "Getting driver metadata. Server respose: #{rep.status_code}", file_name: file_name, commit: commit, branch: branch} }
rep
end
end

def self.defaults(file_name : String, commit : String, branch : String, uri : String)
host = URI.parse(Core.build_host)
file_name = URI.encode_www_form(file_name)
ConnectProxy::HTTPClient.new(host) do |client|
path = "#{BUILD_API_BASE}/defaults/#{file_name}"
params = URI::Params.encode({"url" => uri, "branch" => branch, "commit" => commit})
uri = "#{path}?#{params}"
rep = client.get(uri)
Log.debug { {message: "Getting driver defaults. Server respose: #{rep.status_code}", file_name: file_name, commit: commit, branch: branch} }
rep
end
end

def self.compiled?(file_name : String, commit : String, branch : String, uri : String)
host = URI.parse(Core.build_host)
file_name = URI.encode_www_form(file_name)
ConnectProxy::HTTPClient.new(host) do |client|
path = "#{BUILD_API_BASE}/#{Core::ARCH}/compiled/#{file_name}"
params = URI::Params.encode({"url" => uri, "branch" => branch, "commit" => commit})
uri = "#{path}?#{params}"
rep = client.get(uri)
Log.debug { {message: "Checking if driver is compiled?. Server respose: #{rep.status_code}", file_name: file_name, commit: commit, branch: branch, server_rep: rep.body} }
rep
end
end

def self.compile(file_name : String, url : String, commit : String, branch : String, force : Bool, username : String? = nil, password : String? = nil, fetch : Bool = true)
host = URI.parse(Core.build_host)
file_name = URI.encode_www_form(file_name)
headers = HTTP::Headers.new
headers["X-Git-Username"] = username.not_nil! unless username.nil?
headers["X-Git-Password"] = password.not_nil! unless password.nil?

resp = ConnectProxy::HTTPClient.new(host) do |client|
path = "#{BUILD_API_BASE}/#{Core::ARCH}/#{file_name}"
params = URI::Params.encode({"url" => url, "branch" => branch, "commit" => commit, "force" => force.to_s})
uri = "#{path}?#{params}"
rep = client.post(uri, headers: headers)
Log.debug { {message: "Build URL host : #{client.host}, URI: #{uri} . Server response: #{rep.status_code}", server_resp: rep.body} }
rep
end

raise "Build API returned #{resp.status_code} while 202 was expected. Returned error: #{resp.body}" unless resp.status_code == 202
link = resp.headers["Content-Location"] rescue raise "Build API returned invalid response, missing Content-Location header"

task = JSON.parse(resp.body).as_h
loop do
resp = ConnectProxy::HTTPClient.new(host) do |client|
rep = client.get(link)
Log.debug { {message: "Invoked request: URI: #{link} . Server response: #{rep.status_code}", server_resp: rep.body} }
rep
end

raise "Returned invalid response code: #{resp.status_code}, #{link}, resp: #{resp.body}" unless resp.success? || resp.status_code == 303
task = JSON.parse(resp.body).as_h
break if task["state"].in?("cancelled", "error", "done")
sleep 5.seconds
end
if resp.success? && task["state"].in?("cancelled", "error")
raise task["message"].to_s
end
raise "Build API end-point #{link} returned invalid response code #{resp.status_code}, expected 303" unless resp.status_code == 303
raise "Build API end-point #{link} returned invalid state #{task["state"]}, expected 'done'" unless task["state"] == "done"
hdr = resp.headers["Location"] rescue raise "Build API returned compilation done, but missing Location URL"
if fetch
ConnectProxy::HTTPClient.new(host) do |client|
client.get(hdr)
end
end
end
end

record Result, success : Bool = false, output : String = "", name : String = "", path : String = ""

class DriverResource < Resource(Model::Driver)
private getter? startup : Bool = true
private getter module_manager : ModuleManager
Expand All @@ -244,7 +13,7 @@ module PlaceOS::Core
def initialize(
@startup : Bool = true,
@binary_dir : String = "#{Dir.current}/bin/drivers",
@module_manager : ModuleManager = ModuleManager.instance
@module_manager : ModuleManager = ModuleManager.instance,
)
@store = DriverStore.new
buffer_size = System.cpu_count.to_i
Expand All @@ -265,6 +34,7 @@ module PlaceOS::Core
driver.update_fields(compilation_output: nil) unless driver.compilation_output.nil?
Resource::Result::Success
in .deleted?
DriverResource.remove_driver(driver, store)
Result::Skipped
end
rescue exception
Expand All @@ -275,7 +45,7 @@ module PlaceOS::Core
driver : Model::Driver,
store : DriverStore,
startup : Bool = false,
module_manager : ModuleManager = ModuleManager.instance
module_manager : ModuleManager = ModuleManager.instance,
) : Core::Result
driver_id = driver.id.as(String)
repository = driver.repository!
Expand Down Expand Up @@ -361,10 +131,28 @@ module PlaceOS::Core
Log.info { {message: "updated commit on driver", id: driver.id, name: driver.name, commit: commit} }
end

def self.remove_driver(driver : Model::Driver, store : DriverStore)
path = store.driver_binary_path(driver.file_name, driver.commit)
Log.info { {message: "removing driver binary as it got removed from drivers", driver_id: driver.id.as(String), path: path.to_s} }
remove_stale_driver(path, driver.id.as(String))
end

def start_driver_jobs
DriverIntegrity.start_integrity_checker
DriverCleanup.start_cleanup
end

def start
super
@startup = false
start_driver_jobs
self
end

def stop
super
DriverIntegrity.stop_integrity_checker
DriverCleanup.stop_cleanup
end
end
end
Loading
Loading