From 56b0b61048a26abded06c73e15b915ae0cc8cf45 Mon Sep 17 00:00:00 2001 From: Ali Naqvi Date: Fri, 9 May 2025 19:41:02 +0800 Subject: [PATCH 1/2] refactor(driver_cleanup): PPT=2034 Refactor stale tracking process --- shard.lock | 2 +- spec/driver_cleanup_spec.cr | 12 ++- src/placeos-core/driver_cleanup.cr | 163 ++++++++++++++++++++++++----- 3 files changed, 146 insertions(+), 31 deletions(-) diff --git a/shard.lock b/shard.lock index 24995bf..eceb342 100644 --- a/shard.lock +++ b/shard.lock @@ -131,7 +131,7 @@ shards: office365: git: https://github.com/placeos/office365.git - version: 1.25.6 + version: 1.25.7 openssl_ext: git: https://github.com/spider-gazelle/openssl_ext.git diff --git a/spec/driver_cleanup_spec.cr b/spec/driver_cleanup_spec.cr index 1064b82..6d49cf6 100644 --- a/spec/driver_cleanup_spec.cr +++ b/spec/driver_cleanup_spec.cr @@ -21,11 +21,13 @@ module PlaceOS::Core module_manager.local_processes.run_count.should eq(ProcessManager::Count.new(1, 1)) - expected = ["drivers_place_private_helper_cce023_#{DriverCleanup.arch}"] - running = DriverCleanup.running_drivers - running.should eq(expected) - local = Dir.new(DriverStore::BINARY_PATH).children - running.should eq(expected) + tracker = DriverCleanup::StaleProcessTracker.new(DriverStore::BINARY_PATH, REDIS_CLIENT) + stale_list = tracker.update_and_find_stale(ENV["STALE_THRESHOLD_DAYS"]?.try &.to_i || 30) + stale_list.size.should eq(0) + driver_file = Path[DriverStore::BINARY_PATH, "drivers_place_private_helper_cce023a_#{DriverCleanup.arch}"].to_s + p! driver_file + value = REDIS_CLIENT.hgetall(driver_file) + value["last_executed_at"].to_i64.should be > 0 end end end diff --git a/src/placeos-core/driver_cleanup.cr b/src/placeos-core/driver_cleanup.cr index 44f8368..23c4c26 100644 --- a/src/placeos-core/driver_cleanup.cr +++ b/src/placeos-core/driver_cleanup.cr @@ -1,43 +1,156 @@ require "file_utils" require "pg-orm" +require "redis" +require "time" module PlaceOS::Core::DriverCleanup def self.start_cleanup spawn do + tracker = StaleProcessTracker.new(DriverStore::BINARY_PATH, REDIS_CLIENT) loop do - sleep(23.hours + rand(60).minutes) - cleanup_unused_drivers rescue nil + sleep(ENV["STALE_SCAN_INTERVAL"]?.try &.to_i.hours || (23.hours + rand(60).minutes)) + stale_list = tracker.update_and_find_stale(ENV["STALE_THRESHOLD_DAYS"]?.try &.to_i || 30) + tracker.delete_stale_executables(stale_list) end end end - def self.cleanup_unused_drivers - local = Dir.new(DriverStore::BINARY_PATH).children - running = running_drivers - stale = local - running - FileUtils.rm_rf(stale.map { |file| Path[DriverStore::BINARY_PATH, file] }) unless stale.empty? - end - def self.arch {% if flag?(:x86_64) %} "amd64" {% elsif flag?(:aarch64) %} "arm64" {% end %} || raise("Uknown architecture") end - def self.running_drivers - sql = <<-SQL - SELECT DISTINCT ON (driver.commit) - regexp_replace(regexp_replace(driver.file_name, '.cr$', '', 'g'), '[/.]', '_', 'g') || '_' || LEFT(driver.commit, 6) || '_' AS driver_file - FROM - mod, - driver - WHERE - mod.running = true - AND driver.id = mod.driver_id - ORDER BY driver.commit; - SQL - running = - ::DB.connect(Healthcheck.pg_healthcheck_url) do |db| - db.query_all sql, &.read(String) + class StaleProcessTracker + Log = Core::Log + + def initialize(@folder : String, @redis : Redis::Client) + @now = Time.utc + end + + def update_and_find_stale(days_threshold : Int32 = 30) + Log.info { "Starting stale executable check for #{@folder}" } + + current_executables = get_current_executables + Log.debug { "Found #{current_executables.size} executables in folder" } + + # Register new executables and update tracking + track_execution_events(current_executables) + + # Identify stale binaries considering both discovery and execution time + find_stale_binaries(current_executables, days_threshold.days) + end + + private def get_current_executables + Dir.children(@folder) + .map { |f| File.join(@folder, f) } + .select { |f| File.file?(f) && File::Info.executable?(f) } + .to_set + end + + def delete_stale_executables(stale_list : Array(String)) : Nil + Log.info { "Starting deletion of #{stale_list.size} stale executables" } + + stale_list.each do |exe_path| + begin + if File.exists?(exe_path) + File.delete(exe_path) + Log.info { "Deleted file: #{exe_path}" } + else + Log.warn { "File not found, skipped deletion: #{exe_path}" } + end + + deleted_count = @redis.del(exe_path) + if deleted_count > 0 + Log.debug { "Removed Redis entry for: #{exe_path}" } + else + Log.warn { "No Redis entry found for: #{exe_path}" } + end + rescue ex : Errno + Log.error(exception: ex) { "Failed to delete #{exe_path}" } + end + end + + Log.info { "Completed deletion process" } + end + + private def track_execution_events(current_executables) + # Register new executables with discovery time + current_executables.each do |exe| + unless @redis.hexists(exe, "discovered_at") + @redis.hset(exe, "discovered_at", @now.to_unix) + end + end + + # Update execution times for running processes + update_running_processes(current_executables) + end + + private def update_running_processes(current_executables) + current_uid = LibC.getuid + folder_basenames = current_executables.map { |exe| File.basename(exe) }.to_set + + Dir.glob("/proc/[0-9]*").each do |pid_dir| + begin + next unless process_owned_by_current_user?(pid_dir, current_uid) + + exe_name = get_process_executable_name(pid_dir) + next unless exe_name && folder_basenames.includes?(exe_name) + + full_path = File.join(@folder, exe_name) + next unless current_executables.includes?(full_path) + + # Update last execution time + @redis.hset(full_path, "last_executed_at", @now.to_unix) + rescue + # Ignore permission issues and race conditions + end + end + end + + private def find_stale_binaries(current_executables, threshold) + cutoff = @now - threshold + stale = [] of String + + current_executables.each do |exe| + redis_data = @redis.hgetall(exe) + discovered_at = redis_data["discovered_at"]?.try(&.to_i64) + last_executed_at = redis_data["last_executed_at"]?.try(&.to_i64) + + # Determine reference time (last execution or discovery) + reference_time = if lea = last_executed_at + Time.unix(lea) + elsif da = discovered_at + Time.unix(da) + else + @now # Should never happen due to registration + end + + if (@now - reference_time) > threshold + stale << exe + end end - running.map(&.+(arch)) + Log.info { "Found #{stale.size} stale executables" } + Log.debug { "Stale list: #{stale.join(", ")}" } unless stale.empty? + stale + end + + private def process_owned_by_current_user?(pid_dir : String, current_uid : UInt32) : Bool + status_file = File.join(pid_dir, "status") + return false unless File.exists?(status_file) + + uid_line = File.read(status_file).split("\n").find(&.starts_with?("Uid:")) + return false unless uid_line + + process_uid = uid_line.split(/\s+/)[1].to_i + process_uid == current_uid + end + + private def get_process_executable_name(pid_dir : String) : String? + cmdline = File.read(File.join(pid_dir, "cmdline")).split("\0").first? + return unless cmdline + + File.basename(cmdline) + rescue + nil + end end end From 343e47bdce223715ac6180151342442509dab382 Mon Sep 17 00:00:00 2001 From: Ali Naqvi Date: Fri, 9 May 2025 20:31:09 +0800 Subject: [PATCH 2/2] refactor(driver_cleanup): minor exception class change --- src/placeos-core/driver_cleanup.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/placeos-core/driver_cleanup.cr b/src/placeos-core/driver_cleanup.cr index 23c4c26..8ddf5b7 100644 --- a/src/placeos-core/driver_cleanup.cr +++ b/src/placeos-core/driver_cleanup.cr @@ -64,7 +64,7 @@ module PlaceOS::Core::DriverCleanup else Log.warn { "No Redis entry found for: #{exe_path}" } end - rescue ex : Errno + rescue ex : Exception Log.error(exception: ex) { "Failed to delete #{exe_path}" } end end