From f571d46033bdaf86e3cbdbc85727ca7c14fff493 Mon Sep 17 00:00:00 2001 From: Ali Naqvi Date: Fri, 16 May 2025 13:18:16 +0800 Subject: [PATCH] feat: PPT-2047,PPT-2048 driver integrity checker + refactoring driver_manager --- shard.lock | 4 +- .../driver_cleanup_spec.cr | 15 +- src/core-app.cr | 3 - src/placeos-core/driver_cleanup.cr | 43 --- src/placeos-core/driver_manager.cr | 256 ++---------------- src/placeos-core/driver_manager/build_api.cr | 92 +++++++ .../driver_manager/driver_cleanup.cr | 160 +++++++++++ .../driver_manager/driver_integrity.cr | 114 ++++++++ .../driver_manager/driver_store.cr | 147 ++++++++++ 9 files changed, 545 insertions(+), 289 deletions(-) rename spec/{ => driver_manager}/driver_cleanup_spec.cr (56%) delete mode 100644 src/placeos-core/driver_cleanup.cr create mode 100644 src/placeos-core/driver_manager/build_api.cr create mode 100644 src/placeos-core/driver_manager/driver_cleanup.cr create mode 100644 src/placeos-core/driver_manager/driver_integrity.cr create mode 100644 src/placeos-core/driver_manager/driver_store.cr diff --git a/shard.lock b/shard.lock index 24995bf5..5047576d 100644 --- a/shard.lock +++ b/shard.lock @@ -131,7 +131,7 @@ shards: office365: git: https://github.com/placeos/office365.git - version: 1.25.6 + version: 1.25.7 openssl_ext: git: https://github.com/spider-gazelle/openssl_ext.git @@ -163,7 +163,7 @@ shards: placeos-driver: git: https://github.com/placeos/driver.git - version: 7.9.6 + version: 7.10.1 placeos-log-backend: git: https://github.com/place-labs/log-backend.git diff --git a/spec/driver_cleanup_spec.cr b/spec/driver_manager/driver_cleanup_spec.cr similarity index 56% rename from spec/driver_cleanup_spec.cr rename to spec/driver_manager/driver_cleanup_spec.cr index 1064b823..c2ccb768 100644 --- a/spec/driver_cleanup_spec.cr +++ b/spec/driver_manager/driver_cleanup_spec.cr @@ -1,8 +1,8 @@ -require "./helper" +require "../helper" module PlaceOS::Core describe DriverCleanup do - it "get running drivers 
information in expected format" do + it "should capture and retrieve stale drivers" do _, driver, mod = setup module_manager = module_manager_mock @@ -21,11 +21,12 @@ module PlaceOS::Core module_manager.local_processes.run_count.should eq(ProcessManager::Count.new(1, 1)) - expected = ["drivers_place_private_helper_cce023_#{DriverCleanup.arch}"] - running = DriverCleanup.running_drivers - running.should eq(expected) - local = Dir.new(DriverStore::BINARY_PATH).children - running.should eq(expected) + tracker = DriverCleanup::StaleProcessTracker.new(DriverStore::BINARY_PATH, REDIS_CLIENT) + stale_list = tracker.update_and_find_stale(ENV["STALE_THRESHOLD_DAYS"]?.try &.to_i || 30) + stale_list.size.should eq(0) + driver_file = Path[DriverStore::BINARY_PATH, "drivers_place_private_helper_cce023a_#{Core::ARCH}"].to_s + value = REDIS_CLIENT.hgetall(driver_file) + value["last_executed_at"].to_i64.should be > 0 end end end diff --git a/src/core-app.cr b/src/core-app.cr index 429c0fb7..a235dd32 100644 --- a/src/core-app.cr +++ b/src/core-app.cr @@ -102,9 +102,6 @@ Signal::TERM.trap &terminate # Wait for redis and postgres to be ready PlaceOS::Core.wait_for_resources -# Start cleaning un-used driver task -PlaceOS::Core::DriverCleanup.start_cleanup - spawn do begin PlaceOS::Core.start_managers diff --git a/src/placeos-core/driver_cleanup.cr b/src/placeos-core/driver_cleanup.cr deleted file mode 100644 index 44f83684..00000000 --- a/src/placeos-core/driver_cleanup.cr +++ /dev/null @@ -1,43 +0,0 @@ -require "file_utils" -require "pg-orm" - -module PlaceOS::Core::DriverCleanup - def self.start_cleanup - spawn do - loop do - sleep(23.hours + rand(60).minutes) - cleanup_unused_drivers rescue nil - end - end - end - - def self.cleanup_unused_drivers - local = Dir.new(DriverStore::BINARY_PATH).children - running = running_drivers - stale = local - running - FileUtils.rm_rf(stale.map { |file| Path[DriverStore::BINARY_PATH, file] }) unless stale.empty? 
- end - - def self.arch - {% if flag?(:x86_64) %} "amd64" {% elsif flag?(:aarch64) %} "arm64" {% end %} || raise("Uknown architecture") - end - - def self.running_drivers - sql = <<-SQL - SELECT DISTINCT ON (driver.commit) - regexp_replace(regexp_replace(driver.file_name, '.cr$', '', 'g'), '[/.]', '_', 'g') || '_' || LEFT(driver.commit, 6) || '_' AS driver_file - FROM - mod, - driver - WHERE - mod.running = true - AND driver.id = mod.driver_id - ORDER BY driver.commit; - SQL - running = - ::DB.connect(Healthcheck.pg_healthcheck_url) do |db| - db.query_all sql, &.read(String) - end - running.map(&.+(arch)) - end -end diff --git a/src/placeos-core/driver_manager.cr b/src/placeos-core/driver_manager.cr index 79bd8d21..03b03154 100644 --- a/src/placeos-core/driver_manager.cr +++ b/src/placeos-core/driver_manager.cr @@ -1,240 +1,9 @@ -require "uri" -require "digest" -require "connect-proxy" require "placeos-models" require "placeos-resource" require "./module_manager" +require "./driver_manager/**" module PlaceOS::Core - class DriverStore - BINARY_PATH = ENV["PLACEOS_DRIVER_BINARIES"]?.presence || Path["./bin/drivers"].expand.to_s - - protected getter binary_path : String - - def initialize(@binary_path : String = BINARY_PATH) - Dir.mkdir_p binary_path - end - - def compiled?(file_name : String, commit : String, branch : String, uri : String) : Bool - Log.debug { {message: "Checking whether driver is compiled or not?", driver: file_name, commit: commit, branch: branch, repo: uri} } - path = Path[binary_path, executable_name(file_name, commit)] - return true if File.exists?(path) - resp = BuildApi.compiled?(file_name, commit, branch, uri) - return false unless resp.success? - ret = fetch_binary(LinkData.from_json(resp.body)) rescue nil - !ret.nil? - end - - def compile(file_name : String, url : String, commit : String, branch : String, force : Bool, username : String? = nil, password : String? 
= nil, fetch : Bool = true) : Result - Log.info { {message: "Requesting build service to compile driver", driver_file: file_name, branch: branch, repository: url} } - begin - resp = BuildApi.compile(file_name, url, commit, branch, force, username, password) - unless fetch - return Result.new(success: true) - end - resp = resp.not_nil! - unless resp.success? - Log.error { {message: resp.body, status_code: resp.status_code, driver: file_name, commit: commit, branch: branch, force: force} } - return Result.new(output: resp.body, name: file_name) - end - link = LinkData.from_json(resp.body) - begin - driver = fetch_binary(link) - rescue ex - return Result.new(output: ex.message.not_nil!, name: file_name) - end - Result.new(success: true, name: driver, path: binary_path) - rescue ex - msg = ex.message || "compiled returned no exception message" - Log.error(exception: ex) { {message: msg, driver: file_name, commit: commit, branch: branch, force: force} } - Result.new(output: msg, name: file_name) - end - end - - def metadata(file_name : String, commit : String, branch : String, uri : String) - resp = BuildApi.metadata(file_name, commit, branch, uri) - return Result.new(success: true, output: resp.body.as(String)) if resp.success? - Result.new(output: "Metadata not found. Server returned #{resp.status_code}") - rescue ex - Result.new(output: ex.message.not_nil!, name: file_name) - end - - def defaults(file_name : String, commit : String, branch : String, uri : String) - resp = BuildApi.defaults(file_name, commit, branch, uri) - return Result.new(success: true, output: resp.body.as(String)) if resp.success? - Result.new(output: "Driver defaults not found. Server returned #{resp.status_code}") - rescue ex - Result.new(output: ex.message.not_nil!, name: file_name) - end - - def built?(file_name : String, commit : String, branch : String, uri : String) : String? 
- return nil unless compiled?(file_name, commit, branch, uri) - driver_binary_path(file_name, commit).to_s - end - - def driver_binary_path(file_name : String, commit : String) - Path[binary_path, executable_name(file_name, commit)] - end - - def path(driver_file : String) : Path - Path[binary_path, driver_file] - end - - def compiled_drivers : Array(String) - Dir.children(binary_path) - end - - def executable_name(driver_source, commit) - driver_source = driver_source.rchop(".cr").gsub(/\/|\./, "_") - commit = commit[..6] if commit.size > 6 - {driver_source, commit, Core::ARCH}.join("_").downcase - end - - def reload_driver(driver_id : String) - if driver = Model::Driver.find?(driver_id) - repo = driver.repository! - - if compiled?(driver.file_name, driver.commit, repo.branch, repo.uri) - manager = ModuleManager.instance - stale_path = manager.reload_modules(driver) - if path = stale_path - File.delete(path) rescue nil if File.exists?(path) - end - else - return {status: 404, message: "Driver not compiled or not available on S3"} - end - else - return {status: 404, message: "Driver with id #{driver_id} not found "} - end - {status: 200, message: "OK"} - end - - private def fetch_binary(link : LinkData) : String - url = URI.parse(link.url) - driver_file = Path[url.path].basename - filename = Path[binary_path, driver_file] - resp = if Core.production? || url.scheme == "https" - ConnectProxy::HTTPClient.get(url.to_s) - else - uri = URI.new(path: url.path, query: url.query) - ConnectProxy::HTTPClient.new(url.host.not_nil!, 9000).get(uri.to_s) - end - if resp.success? - unless link.size == resp.headers.fetch("Content-Length", "0").to_i - Log.error { {message: "Expected content length #{link.size}, but received #{resp.headers.fetch("Content-Length", "0")}", driver_file: driver_file} } - raise Error.new("Response size doesn't match with build service returned result") - end - - body_io = IO::Digest.new(resp.body_io? 
|| IO::Memory.new(resp.body), Digest::MD5.new) - File.open(filename, "wb+") do |f| - IO.copy(body_io, f) - f.chmod(0o755) - end - filename.to_s - else - raise Error.new("Unable to fetch driver. Error : #{resp.body}") - end - end - - private record LinkData, size : Int64, md5 : String, modified : Time, url : String, link_expiry : Time do - include JSON::Serializable - @[JSON::Field(converter: Time::EpochConverter)] - getter modified : Time - @[JSON::Field(converter: Time::EpochConverter)] - getter link_expiry : Time - end - end - - module BuildApi - BUILD_API_BASE = "/api/build/v1" - - def self.metadata(file_name : String, commit : String, branch : String, uri : String) - host = URI.parse(Core.build_host) - file_name = URI.encode_www_form(file_name) - ConnectProxy::HTTPClient.new(host) do |client| - path = "#{BUILD_API_BASE}/metadata/#{file_name}" - params = URI::Params.encode({"url" => uri, "branch" => branch, "commit" => commit}) - uri = "#{path}?#{params}" - rep = client.get(uri) - Log.debug { {message: "Getting driver metadata. Server respose: #{rep.status_code}", file_name: file_name, commit: commit, branch: branch} } - rep - end - end - - def self.defaults(file_name : String, commit : String, branch : String, uri : String) - host = URI.parse(Core.build_host) - file_name = URI.encode_www_form(file_name) - ConnectProxy::HTTPClient.new(host) do |client| - path = "#{BUILD_API_BASE}/defaults/#{file_name}" - params = URI::Params.encode({"url" => uri, "branch" => branch, "commit" => commit}) - uri = "#{path}?#{params}" - rep = client.get(uri) - Log.debug { {message: "Getting driver defaults. 
Server respose: #{rep.status_code}", file_name: file_name, commit: commit, branch: branch} } - rep - end - end - - def self.compiled?(file_name : String, commit : String, branch : String, uri : String) - host = URI.parse(Core.build_host) - file_name = URI.encode_www_form(file_name) - ConnectProxy::HTTPClient.new(host) do |client| - path = "#{BUILD_API_BASE}/#{Core::ARCH}/compiled/#{file_name}" - params = URI::Params.encode({"url" => uri, "branch" => branch, "commit" => commit}) - uri = "#{path}?#{params}" - rep = client.get(uri) - Log.debug { {message: "Checking if driver is compiled?. Server respose: #{rep.status_code}", file_name: file_name, commit: commit, branch: branch, server_rep: rep.body} } - rep - end - end - - def self.compile(file_name : String, url : String, commit : String, branch : String, force : Bool, username : String? = nil, password : String? = nil, fetch : Bool = true) - host = URI.parse(Core.build_host) - file_name = URI.encode_www_form(file_name) - headers = HTTP::Headers.new - headers["X-Git-Username"] = username.not_nil! unless username.nil? - headers["X-Git-Password"] = password.not_nil! unless password.nil? - - resp = ConnectProxy::HTTPClient.new(host) do |client| - path = "#{BUILD_API_BASE}/#{Core::ARCH}/#{file_name}" - params = URI::Params.encode({"url" => url, "branch" => branch, "commit" => commit, "force" => force.to_s}) - uri = "#{path}?#{params}" - rep = client.post(uri, headers: headers) - Log.debug { {message: "Build URL host : #{client.host}, URI: #{uri} . Server response: #{rep.status_code}", server_resp: rep.body} } - rep - end - - raise "Build API returned #{resp.status_code} while 202 was expected. 
Returned error: #{resp.body}" unless resp.status_code == 202 - link = resp.headers["Content-Location"] rescue raise "Build API returned invalid response, missing Content-Location header" - - task = JSON.parse(resp.body).as_h - loop do - resp = ConnectProxy::HTTPClient.new(host) do |client| - rep = client.get(link) - Log.debug { {message: "Invoked request: URI: #{link} . Server response: #{rep.status_code}", server_resp: rep.body} } - rep - end - - raise "Returned invalid response code: #{resp.status_code}, #{link}, resp: #{resp.body}" unless resp.success? || resp.status_code == 303 - task = JSON.parse(resp.body).as_h - break if task["state"].in?("cancelled", "error", "done") - sleep 5.seconds - end - if resp.success? && task["state"].in?("cancelled", "error") - raise task["message"].to_s - end - raise "Build API end-point #{link} returned invalid response code #{resp.status_code}, expected 303" unless resp.status_code == 303 - raise "Build API end-point #{link} returned invalid state #{task["state"]}, expected 'done'" unless task["state"] == "done" - hdr = resp.headers["Location"] rescue raise "Build API returned compilation done, but missing Location URL" - if fetch - ConnectProxy::HTTPClient.new(host) do |client| - client.get(hdr) - end - end - end - end - - record Result, success : Bool = false, output : String = "", name : String = "", path : String = "" - class DriverResource < Resource(Model::Driver) private getter? startup : Bool = true private getter module_manager : ModuleManager @@ -244,7 +13,7 @@ module PlaceOS::Core def initialize( @startup : Bool = true, @binary_dir : String = "#{Dir.current}/bin/drivers", - @module_manager : ModuleManager = ModuleManager.instance + @module_manager : ModuleManager = ModuleManager.instance, ) @store = DriverStore.new buffer_size = System.cpu_count.to_i @@ -265,6 +34,7 @@ module PlaceOS::Core driver.update_fields(compilation_output: nil) unless driver.compilation_output.nil? Resource::Result::Success in .deleted? 
+ DriverResource.remove_driver(driver, store) Result::Skipped end rescue exception @@ -275,7 +45,7 @@ module PlaceOS::Core driver : Model::Driver, store : DriverStore, startup : Bool = false, - module_manager : ModuleManager = ModuleManager.instance + module_manager : ModuleManager = ModuleManager.instance, ) : Core::Result driver_id = driver.id.as(String) repository = driver.repository! @@ -361,10 +131,28 @@ module PlaceOS::Core Log.info { {message: "updated commit on driver", id: driver.id, name: driver.name, commit: commit} } end + def self.remove_driver(driver : Model::Driver, store : DriverStore) + path = store.driver_binary_path(driver.file_name, driver.commit) + Log.info { {message: "removing driver binary as it got removed from drivers", driver_id: driver.id.as(String), path: path.to_s} } + remove_stale_driver(path, driver.id.as(String)) + end + + def start_driver_jobs + DriverIntegrity.start_integrity_checker + DriverCleanup.start_cleanup + end + def start super @startup = false + start_driver_jobs self end + + def stop + super + DriverIntegrity.stop_integrity_checker + DriverCleanup.stop_cleanup + end end end diff --git a/src/placeos-core/driver_manager/build_api.cr b/src/placeos-core/driver_manager/build_api.cr new file mode 100644 index 00000000..340a3d11 --- /dev/null +++ b/src/placeos-core/driver_manager/build_api.cr @@ -0,0 +1,92 @@ +require "uri" +require "connect-proxy" + +module PlaceOS::Core + module BuildApi + BUILD_API_BASE = "/api/build/v1" + + def self.metadata(file_name : String, commit : String, branch : String, uri : String) + host = URI.parse(Core.build_host) + file_name = URI.encode_www_form(file_name) + ConnectProxy::HTTPClient.new(host) do |client| + path = "#{BUILD_API_BASE}/metadata/#{file_name}" + params = URI::Params.encode({"url" => uri, "branch" => branch, "commit" => commit}) + uri = "#{path}?#{params}" + rep = client.get(uri) + Log.debug { {message: "Getting driver metadata. 
Server respose: #{rep.status_code}", file_name: file_name, commit: commit, branch: branch} } + rep + end + end + + def self.defaults(file_name : String, commit : String, branch : String, uri : String) + host = URI.parse(Core.build_host) + file_name = URI.encode_www_form(file_name) + ConnectProxy::HTTPClient.new(host) do |client| + path = "#{BUILD_API_BASE}/defaults/#{file_name}" + params = URI::Params.encode({"url" => uri, "branch" => branch, "commit" => commit}) + uri = "#{path}?#{params}" + rep = client.get(uri) + Log.debug { {message: "Getting driver defaults. Server respose: #{rep.status_code}", file_name: file_name, commit: commit, branch: branch} } + rep + end + end + + def self.compiled?(file_name : String, commit : String, branch : String, uri : String) + host = URI.parse(Core.build_host) + file_name = URI.encode_www_form(file_name) + ConnectProxy::HTTPClient.new(host) do |client| + path = "#{BUILD_API_BASE}/#{Core::ARCH}/compiled/#{file_name}" + params = URI::Params.encode({"url" => uri, "branch" => branch, "commit" => commit}) + uri = "#{path}?#{params}" + rep = client.get(uri) + Log.debug { {message: "Checking if driver is compiled?. Server respose: #{rep.status_code}", file_name: file_name, commit: commit, branch: branch, server_rep: rep.body} } + rep + end + end + + def self.compile(file_name : String, url : String, commit : String, branch : String, force : Bool, username : String? = nil, password : String? = nil, fetch : Bool = true) + host = URI.parse(Core.build_host) + file_name = URI.encode_www_form(file_name) + headers = HTTP::Headers.new + headers["X-Git-Username"] = username.not_nil! unless username.nil? + headers["X-Git-Password"] = password.not_nil! unless password.nil? 
+ + resp = ConnectProxy::HTTPClient.new(host) do |client| + path = "#{BUILD_API_BASE}/#{Core::ARCH}/#{file_name}" + params = URI::Params.encode({"url" => url, "branch" => branch, "commit" => commit, "force" => force.to_s}) + uri = "#{path}?#{params}" + rep = client.post(uri, headers: headers) + Log.debug { {message: "Build URL host : #{client.host}, URI: #{uri} . Server response: #{rep.status_code}", server_resp: rep.body} } + rep + end + + raise "Build API returned #{resp.status_code} while 202 was expected. Returned error: #{resp.body}" unless resp.status_code == 202 + link = resp.headers["Content-Location"] rescue raise "Build API returned invalid response, missing Content-Location header" + + task = JSON.parse(resp.body).as_h + loop do + resp = ConnectProxy::HTTPClient.new(host) do |client| + rep = client.get(link) + Log.debug { {message: "Invoked request: URI: #{link} . Server response: #{rep.status_code}", server_resp: rep.body} } + rep + end + + raise "Returned invalid response code: #{resp.status_code}, #{link}, resp: #{resp.body}" unless resp.success? || resp.status_code == 303 + task = JSON.parse(resp.body).as_h + break if task["state"].in?("cancelled", "error", "done") + sleep 5.seconds + end + if resp.success? 
&& task["state"].in?("cancelled", "error") + raise task["message"].to_s + end + raise "Build API end-point #{link} returned invalid response code #{resp.status_code}, expected 303" unless resp.status_code == 303 + raise "Build API end-point #{link} returned invalid state #{task["state"]}, expected 'done'" unless task["state"] == "done" + hdr = resp.headers["Location"] rescue raise "Build API returned compilation done, but missing Location URL" + if fetch + ConnectProxy::HTTPClient.new(host) do |client| + client.get(hdr) + end + end + end + end +end diff --git a/src/placeos-core/driver_manager/driver_cleanup.cr b/src/placeos-core/driver_manager/driver_cleanup.cr new file mode 100644 index 00000000..a2fb0ac3 --- /dev/null +++ b/src/placeos-core/driver_manager/driver_cleanup.cr @@ -0,0 +1,160 @@ +require "file_utils" +require "pg-orm" +require "redis" +require "time" +require "tasker" + +module PlaceOS::Core::DriverCleanup + DEFAULT_STALE_SCAN_INTERVAL = (23.hours + rand(60).minutes) + DEFAULT_STALE_THRESHOLD_DAYS = 15 + + @@tasker_inst : Tasker::Repeat(Nil)? 
+ + def self.start_cleanup + tracker = StaleProcessTracker.new(DriverStore::BINARY_PATH, REDIS_CLIENT) + + @@tasker_inst = Tasker.every(ENV["STALE_SCAN_INTERVAL"]?.try &.to_i.hours || DEFAULT_STALE_SCAN_INTERVAL) do + stale_list = tracker.update_and_find_stale(ENV["STALE_THRESHOLD_DAYS"]?.try &.to_i || DEFAULT_STALE_THRESHOLD_DAYS) + tracker.delete_stale_executables(stale_list) + end + end + + def self.stop_cleanup + @@tasker_inst.try &.cancel + end + + class StaleProcessTracker + Log = Core::Log + + def initialize(@folder : String, @redis : Redis::Client) + @now = Time.utc + end + + def update_and_find_stale(days_threshold : Int32 = 30) + Log.info { "Starting stale executable check for #{@folder}" } + + current_executables = get_current_executables + Log.debug { "Found #{current_executables.size} executables in folder" } + + # Register new executables and update tracking + track_execution_events(current_executables) + + # Identify stale binaries considering both discovery and execution time + find_stale_binaries(current_executables, days_threshold.days) + end + + private def get_current_executables + Dir.children(@folder) + .map { |f| File.join(@folder, f) } + .select { |f| File.file?(f) && File::Info.executable?(f) } + .to_set + end + + def delete_stale_executables(stale_list : Array(String)) : Nil + Log.info { "Starting deletion of #{stale_list.size} stale executables" } + + stale_list.each do |exe_path| + begin + if File.exists?(exe_path) + File.delete(exe_path) + Log.info { "Deleted file: #{exe_path}" } + else + Log.warn { "File not found, skipped deletion: #{exe_path}" } + end + + deleted_count = @redis.del(exe_path) + if deleted_count > 0 + Log.debug { "Removed Redis entry for: #{exe_path}" } + else + Log.warn { "No Redis entry found for: #{exe_path}" } + end + rescue ex : Exception + Log.error(exception: ex) { "Failed to delete #{exe_path}" } + end + end + + Log.info { "Completed deletion process" } + end + + private def 
track_execution_events(current_executables) + # Register new executables with discovery time + current_executables.each do |exe| + unless @redis.hexists(exe, "discovered_at") + @redis.hset(exe, "discovered_at", @now.to_unix) + end + end + + # Update execution times for running processes + update_running_processes(current_executables) + end + + private def update_running_processes(current_executables) + current_uid = LibC.getuid + folder_basenames = current_executables.map { |exe| File.basename(exe) }.to_set + + Dir.glob("/proc/[0-9]*").each do |pid_dir| + begin + next unless process_owned_by_current_user?(pid_dir, current_uid) + + exe_name = get_process_executable_name(pid_dir) + next unless exe_name && folder_basenames.includes?(exe_name) + + full_path = File.join(@folder, exe_name) + next unless current_executables.includes?(full_path) + + # Update last execution time + @redis.hset(full_path, "last_executed_at", @now.to_unix) + rescue + # Ignore permission issues and race conditions + end + end + end + + private def find_stale_binaries(current_executables, threshold) + cutoff = @now - threshold + stale = [] of String + + current_executables.each do |exe| + redis_data = @redis.hgetall(exe) + discovered_at = redis_data["discovered_at"]?.try(&.to_i64) + last_executed_at = redis_data["last_executed_at"]?.try(&.to_i64) + + # Determine reference time (last execution or discovery) + reference_time = if lea = last_executed_at + Time.unix(lea) + elsif da = discovered_at + Time.unix(da) + else + @now # Should never happen due to registration + end + + if (@now - reference_time) > threshold + stale << exe + end + end + Log.info { "Found #{stale.size} stale executables" } + Log.debug { "Stale list: #{stale.join(", ")}" } unless stale.empty? 
+ stale + end + + private def process_owned_by_current_user?(pid_dir : String, current_uid : UInt32) : Bool + status_file = File.join(pid_dir, "status") + return false unless File.exists?(status_file) + + uid_line = File.read(status_file).split("\n").find(&.starts_with?("Uid:")) + return false unless uid_line + + process_uid = uid_line.split(/\s+/)[1].to_i + process_uid == current_uid + end + + private def get_process_executable_name(pid_dir : String) : String? + cmdline = File.read(File.join(pid_dir, "cmdline")).split("\0").first? + return unless cmdline + + File.basename(cmdline) + rescue + nil + end + end +end diff --git a/src/placeos-core/driver_manager/driver_integrity.cr b/src/placeos-core/driver_manager/driver_integrity.cr new file mode 100644 index 00000000..ef7e07b6 --- /dev/null +++ b/src/placeos-core/driver_manager/driver_integrity.cr @@ -0,0 +1,114 @@ +require "file_utils" +require "db" +require "tasker" +require "./driver_store" + +module PlaceOS::Core::DriverIntegrity + DEFAULT_SCAN_INTERVAL = 2.hours + + record DriverRecord, id : String, driver_file : String, file_name : String, commit : String, uri : String, branch : String, username : String?, password : String?, running : Bool do + include DB::Serializable + end + @@tasker_inst : Tasker::Repeat(Nil)? + + def self.start_integrity_checker + @@tasker_inst = Tasker.every(ENV["INTEGRITY_SCAN_INTERVAL"]?.try &.to_i.hours || DEFAULT_SCAN_INTERVAL) do + sync_drivers + end + end + + def self.stop_integrity_checker + @@tasker_inst.try &.cancel + end + + def self.remove_blank_files + empty_files = [] of String + Dir.glob("#{DriverStore::BINARY_PATH}/**/*") do |driver| + if File.file?(driver) && File.size(driver) == 0 + empty_files << driver + end + end + FileUtils.rm_rf(empty_files) unless empty_files.empty? 
+ end + + def self.current_executables + remove_blank_files + Dir.children(DriverStore::BINARY_PATH) + .select { |f| File.file?(File.join(DriverStore::BINARY_PATH, f)) && File::Info.executable?(File.join(DriverStore::BINARY_PATH, f)) } + .to_set + end + + def self.sync_drivers : Nil + existing = current_executables + drivers_arr = all_drivers + db_drivers = drivers_arr.map { |rec| rec.driver_file + Core::ARCH }.to_set + add_drivers = db_drivers - existing + stale = existing - db_drivers + FileUtils.rm_rf(stale.map { |file| Path[DriverStore::BINARY_PATH, file] }) unless stale.empty? + add_drivers_obj = drivers_arr.select { |rec| (rec.driver_file + Core::ARCH).in?(add_drivers) } + download_drivers(add_drivers_obj) + load_running_modules(drivers_arr.select(&.running)) + end + + def self.all_drivers : Array(DriverRecord) + sql = <<-SQL + SELECT + regexp_replace(regexp_replace(d.file_name, '.cr$', '', 'g'), '[/.]', '_', 'g') + || '_' || + (CASE + WHEN char_length(d.commit) >= 7 THEN LEFT(d.commit, 7) + ELSE d.commit + END) || '_' AS driver_file, + d.id, d.file_name, d.commit, r.uri, r.branch,r.username, r.password, m.running + FROM driver d + JOIN repo r ON d.repository_id = r.id + JOIN mod m ON d.id = m.driver_id + WHERE d.compilation_output IS NULL + AND r.has_runtime_error = false; + SQL + + result = ::DB.connect(Healthcheck.pg_healthcheck_url) do |db| + db.query_all sql, &.read(DriverRecord) + end + result + end + + def self.load_running_modules(drivers : Array(DriverRecord)) + return if drivers.empty? + should_be_running = drivers.map { |rec| rec.driver_file + Core::ARCH }.to_set + drivers_delta = should_be_running - find_running_drivers + return if drivers_delta.empty? 
+ drivers_to_start = drivers.select { |rec| (rec.driver_file + Core::ARCH).in?(drivers_delta) } + module_manager = ModuleManager.instance + drivers_to_start.each do |driver| + module_manager.reload_modules(Model::Driver.find!(driver.id)) + end + end + + def self.download_drivers(drivers : Array(DriverRecord)) + store = DriverStore.new + drivers.each do |driver| + next if store.built?(driver.file_name, driver.commit, driver.branch, driver.uri) + store.compile(driver.file_name, driver.uri, driver.commit, driver.branch, false, driver.username, driver.password) + end + end + + def self.find_running_drivers : Set(String) + running = Set(String).new + + Dir.each_child("/proc") do |entry| + # Skip non-PID entries (process IDs are numeric) + next unless entry =~ /^\d+$/ + + pid = entry + exe_path = "/proc/#{pid}/exe" + begin + target = File.readlink(exe_path) + running << File.basename(target) if target.starts_with?(DriverStore::BINARY_PATH) + rescue ex : Exception + # Ignore processes we can't inspect + end + end + running + end +end diff --git a/src/placeos-core/driver_manager/driver_store.cr b/src/placeos-core/driver_manager/driver_store.cr new file mode 100644 index 00000000..bf0f5d54 --- /dev/null +++ b/src/placeos-core/driver_manager/driver_store.cr @@ -0,0 +1,147 @@ +require "uri" +require "digest" +require "connect-proxy" +require "./build_api" + +module PlaceOS::Core + record Result, success : Bool = false, output : String = "", name : String = "", path : String = "" + + class DriverStore + BINARY_PATH = ENV["PLACEOS_DRIVER_BINARIES"]?.presence || Path["./bin/drivers"].expand.to_s + + protected getter binary_path : String + + def initialize(@binary_path : String = BINARY_PATH) + Dir.mkdir_p binary_path + end + + def compiled?(file_name : String, commit : String, branch : String, uri : String) : Bool + Log.debug { {message: "Checking whether driver is compiled or not?", driver: file_name, commit: commit, branch: branch, repo: uri} } + path = Path[binary_path, 
executable_name(file_name, commit)] + return true if File.exists?(path) + resp = BuildApi.compiled?(file_name, commit, branch, uri) + return false unless resp.success? + ret = fetch_binary(LinkData.from_json(resp.body)) rescue nil + !ret.nil? + end + + def compile(file_name : String, url : String, commit : String, branch : String, force : Bool, username : String? = nil, password : String? = nil, fetch : Bool = true) : Result + Log.info { {message: "Requesting build service to compile driver", driver_file: file_name, branch: branch, repository: url} } + begin + resp = BuildApi.compile(file_name, url, commit, branch, force, username, password) + unless fetch + return Result.new(success: true) + end + resp = resp.not_nil! + unless resp.success? + Log.error { {message: resp.body, status_code: resp.status_code, driver: file_name, commit: commit, branch: branch, force: force} } + return Result.new(output: resp.body, name: file_name) + end + link = LinkData.from_json(resp.body) + begin + driver = fetch_binary(link) + rescue ex + return Result.new(output: ex.message.not_nil!, name: file_name) + end + Result.new(success: true, name: driver, path: binary_path) + rescue ex + msg = ex.message || "compiled returned no exception message" + Log.error(exception: ex) { {message: msg, driver: file_name, commit: commit, branch: branch, force: force} } + Result.new(output: msg, name: file_name) + end + end + + def metadata(file_name : String, commit : String, branch : String, uri : String) + resp = BuildApi.metadata(file_name, commit, branch, uri) + return Result.new(success: true, output: resp.body.as(String)) if resp.success? + Result.new(output: "Metadata not found. 
Server returned #{resp.status_code}") + rescue ex + Result.new(output: ex.message.not_nil!, name: file_name) + end + + def defaults(file_name : String, commit : String, branch : String, uri : String) + resp = BuildApi.defaults(file_name, commit, branch, uri) + return Result.new(success: true, output: resp.body.as(String)) if resp.success? + Result.new(output: "Driver defaults not found. Server returned #{resp.status_code}") + rescue ex + Result.new(output: ex.message.not_nil!, name: file_name) + end + + def built?(file_name : String, commit : String, branch : String, uri : String) : String? + return nil unless compiled?(file_name, commit, branch, uri) + driver_binary_path(file_name, commit).to_s + end + + def driver_binary_path(file_name : String, commit : String) + Path[binary_path, executable_name(file_name, commit)] + end + + def path(driver_file : String) : Path + Path[binary_path, driver_file] + end + + def compiled_drivers : Array(String) + Dir.children(binary_path) + end + + def executable_name(driver_source, commit) + driver_source = driver_source.rchop(".cr").gsub(/\/|\./, "_") + commit = commit[..6] if commit.size > 6 + {driver_source, commit, Core::ARCH}.join("_").downcase + end + + def reload_driver(driver_id : String) + if driver = Model::Driver.find?(driver_id) + repo = driver.repository! + + if compiled?(driver.file_name, driver.commit, repo.branch, repo.uri) + manager = ModuleManager.instance + stale_path = manager.reload_modules(driver) + if path = stale_path + File.delete(path) rescue nil if File.exists?(path) + end + else + return {status: 404, message: "Driver not compiled or not available on S3"} + end + else + return {status: 404, message: "Driver with id #{driver_id} not found "} + end + {status: 200, message: "OK"} + end + + private def fetch_binary(link : LinkData) : String + url = URI.parse(link.url) + driver_file = Path[url.path].basename + filename = Path[binary_path, driver_file] + resp = if Core.production? 
|| url.scheme == "https" + ConnectProxy::HTTPClient.get(url.to_s) + else + uri = URI.new(path: url.path, query: url.query) + ConnectProxy::HTTPClient.new(url.host.not_nil!, 9000).get(uri.to_s) + end + if resp.success? + unless link.size == resp.headers.fetch("Content-Length", "0").to_i + Log.error { {message: "Expected content length #{link.size}, but received #{resp.headers.fetch("Content-Length", "0")}", driver_file: driver_file} } + raise Error.new("Response size doesn't match with build service returned result") + end + + body_io = IO::Digest.new(resp.body_io? || IO::Memory.new(resp.body), Digest::MD5.new) + File.open(filename, "wb+") do |f| + IO.copy(body_io, f) + f.chmod(0o755) + end + filename.to_s + else + raise Error.new("Unable to fetch driver. Error : #{resp.body}") + end + end + + private record LinkData, size : Int64, md5 : String, modified : Time, url : String, link_expiry : Time do + include JSON::Serializable + @[JSON::Field(converter: Time::EpochConverter)] + getter modified : Time + @[JSON::Field(converter: Time::EpochConverter)] + getter link_expiry : Time + end + end +end