From b24270f73a3c4c02d589d9bb9e70f65c093e8322 Mon Sep 17 00:00:00 2001 From: Maxime Van Hees Date: Thu, 3 Oct 2024 16:47:13 +0200 Subject: [PATCH 1/7] First step for cron jobs in zinit --- Cargo.lock | 136 +++++++++++++++++++++--- Cargo.toml | 1 + src/zinit/config.rs | 11 ++ src/zinit/mod.rs | 247 +++++++++++++++++++++++++------------------- 4 files changed, 276 insertions(+), 119 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3d08bd2..e99d015 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,17 +98,94 @@ dependencies = [ "log", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" -version = "0.3.18" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"629316e42fe7c2a0b9a65b47d159ceaa5453ab14e8f0a3c5eedbb8cd55b4a445" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] [[package]] name = "futures-sink" -version = "0.3.18" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "996c6442437b62d21a32cd9906f9c41e7dc1e19a9579843fad948696769305af" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] [[package]] name = "git-version" @@ -129,7 +206,7 @@ dependencies = [ "proc-macro-hack", "proc-macro2", "quote", - "syn", + "syn 1.0.76", ] [[package]] @@ -308,6 +385,12 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -316,18 +399,18 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro2" -version = "1.0.29" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"b9f5105d4fdaab20335ca9565e106a5d9b82b6219b5ba735731124ac6711d23d" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] name = "quote" -version = "1.0.9" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" dependencies = [ "proc-macro2", ] @@ -370,7 +453,7 @@ checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.76", ] [[package]] @@ -411,6 +494,15 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.6.1" @@ -434,6 +526,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "syn" +version = "2.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -460,7 +563,7 @@ checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.76", ] [[package]] @@ -491,7 +594,7 @@ checksum = "c9efc1aba077437943f7515666aa2b882dfabfbfdf89c819ea75a8d6e9eaba5e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.76", ] [[package]] @@ -520,6 +623,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "unicode-ident" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" 
+ [[package]] name = "unicode-width" version = "0.1.9" @@ -577,6 +686,7 @@ dependencies = [ "clap", "command-group", "fern", + "futures", "git-version", "log", "nix", diff --git a/Cargo.toml b/Cargo.toml index 822ddb5..55c8c3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,3 +20,4 @@ thiserror = "1.0" clap = "2.33" git-version = "0.3.5" command-group = "1.0.8" +futures = "0.3.30" diff --git a/src/zinit/config.rs b/src/zinit/config.rs index f706c47..c76229e 100644 --- a/src/zinit/config.rs +++ b/src/zinit/config.rs @@ -52,6 +52,7 @@ pub struct Service { pub log: Log, pub env: HashMap, pub dir: String, + pub cron: Option, } impl Service { @@ -64,6 +65,16 @@ impl Service { Signal::from_str(&self.signal.stop.to_uppercase())?; + // Validate the cron field if present + if let Some(cron_value) = self.cron { + if cron_value == 0 { + bail!("cron value must be greater than zero"); + } + if !self.one_shot { + bail!("cron can only be specified for oneshot services"); + } + } + Ok(()) } } diff --git a/src/zinit/mod.rs b/src/zinit/mod.rs index 421525a..b6df13b 100644 --- a/src/zinit/mod.rs +++ b/src/zinit/mod.rs @@ -12,12 +12,13 @@ use nix::unistd::Pid; use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use thiserror::Error; use tokio::sync::mpsc; use tokio::sync::watch; use tokio::sync::{Notify, RwLock}; -use tokio::time; use tokio::time::timeout; +use tokio::time::{self}; use tokio_stream::{wrappers::WatchStream, StreamExt}; pub trait WaitStatusExt { @@ -543,8 +544,8 @@ impl ZInit { async fn watch(self, name: String, input: Arc>) { let name = name.clone(); - let mut service = input.write().await; + if service.target == Target::Down { debug!("service '{}' target is down", name); return; @@ -556,123 +557,157 @@ impl ZInit { } service.scheduled = true; - drop(service); + drop(service); // Release the lock loop { - let name = name.clone(); - - let service = input.read().await; - // early check if service is down, so we don't have 
to do extra checks - if service.target == Target::Down { - // we check target in loop in case service have - // been set down. - break; - } - let config = service.service.clone(); - // we need to spawn this service now, but is it ready? - // are all dependent services are running ? - - // so we drop the table to give other services - // chance to acquire the lock and schedule themselves - drop(service); - - 'checks: loop { - let sig = self.notify.notified(); - debug!("checking {} if it can schedule", name); - if self.can_schedule(&config).await { - debug!("service {} can schedule", name); - break 'checks; + { + let service = input.read().await; + if service.target == Target::Down { + // Service target is down; exit the loop + break; + } + let config = service.service.clone(); + // Release the lock; enables other services to aquire it and schedule + // themselves + drop(service); + + // Wait for dependencies + while !self.can_schedule(&config).await { + let sig = self.notify.notified(); + self.set(&name, Some(State::Blocked), None).await; + sig.await; } - self.set(&name, Some(State::Blocked), None).await; - // don't even care if i am lagging - // as long i am notified that some services status - // has changed - debug!("service {} is blocked, waiting release", name); - sig.await; - } - - let log = match config.log { - config::Log::None => Log::None, - config::Log::Stdout => Log::Stdout, - config::Log::Ring => Log::Ring(name.clone()), - }; + let log = match config.log { + config::Log::None => Log::None, + config::Log::Stdout => Log::Stdout, + config::Log::Ring => Log::Ring(name.clone()), + }; + + let mut service = input.write().await; + // We check again in case target has changed. Since we had to release the lock + // earlier to not block locking on this service (for example if a stop was called) + // while the service was waiting for dependencies. + // The lock is kept until the spawning and the update of the pid. 
+ if service.target == Target::Down { + // Service target is down; exit the loop + break; + } - let mut service = input.write().await; - // we check again in case target has changed. Since we had to release the lock - // earlier to not block locking on this service (for example if a stop was called) - // while the service was waiting for dependencies. - // the lock is kept until the spawning and the update of the pid. - if service.target == Target::Down { - // we check target in loop in case service have - // been set down. - break; - } + let child = match self + .pm + .run( + Process::new(&config.exec, &config.dir, Some(config.env.clone())), + log.clone(), + ) + .await + { + Ok(child) => { + service.state.set(State::Spawned); + service.pid = child.pid; + child + } + Err(err) => { + error!("service {} failed to start: {}", name, err); + service.state.set(State::Failure); + // Decide whether to break or continue based on your logic + break; + } + }; - let child = self - .pm - .run( - Process::new(&config.exec, &config.dir, Some(config.env.clone())), - log.clone(), - ) - .await; - - let child = match child { - Ok(child) => { - service.state.set(State::Spawned); - service.pid = child.pid; - child - } - Err(err) => { - // so, spawning failed. and nothing we can do about it - // this can be duo to a bad command or exe not found. - // set service to failure. - error!("service {} failed to start: {}", name, err); - service.state.set(State::Failure); - break; + if config.one_shot { + service.state.set(State::Running); } - }; - if config.one_shot { - service.state.set(State::Running); - } - // we don't lock the here here because this can take forever - // to finish. so we allow other operation on the service (for example) - // status and stop operations. 
- drop(service); - - let mut handler = None; - if !config.one_shot { - let m = self.clone(); - handler = Some(tokio::spawn(m.test(name.clone(), config.clone()))); - } + // We don't lock the here here because this can take forever + // to finish. So, we allow other operation on the service (for example) + // status and stop operations. + drop(service); // Release the lock - let result = child.wait().await; - if let Some(handler) = handler { - handler.abort(); - } + let mut handler = None; + if !config.one_shot { + let m = self.clone(); + handler = Some(tokio::spawn(m.test(name.clone(), config.clone()))); + } - let mut service = input.write().await; - service.pid = Pid::from_raw(0); - match result { - Err(err) => { - error!("failed to read service '{}' status: {}", name, err); - service.state.set(State::Unknown); + // Prepare futures for selection + let cron_future = config + .cron + .map(|cron_duration| tokio::time::sleep(Duration::from_secs(cron_duration))); + + let child_pid = child.pid; + let wait_future = child.wait(); + + // Use select to wait on the child process or the cron future + tokio::select! 
{ + result = wait_future => { + // The child process has exited + if let Some(handler) = handler { + handler.abort(); + } + + let mut service = input.write().await; + service.pid = Pid::from_raw(0); + + match result { + Err(err) => { + error!("failed to read service '{}' status: {}", name, err); + service.state.set(State::Unknown); + } + Ok(status) => service.state.set(match status.success() { + true => State::Success, + false => State::Error(status), + }), + } + drop(service); + + if config.one_shot { + // For oneshot services, we don't need to restart + self.notify.notify_waiters(); + break; + } + } + _ = async { + if let Some(cron_fut) = cron_future { + cron_fut.await; + } else { + futures::future::pending::<()>().await; + } + } => { + // Cron duration elapsed + if *self.shutdown.read().await { + // If shutting down, exit the loop + break; + } + + let service = input.read().await; + if service.target == Target::Down { + break; + } + let signal_name = service.service.signal.stop.to_uppercase(); + drop(service); + + let signal = match signal::Signal::from_str(&signal_name) { + Ok(signal) => signal, + Err(err) => { + error!("unknown stop signal configured in service '{}': {}", name, err); + break; + } + }; + + // Send stop signal to the process group + let _ = self.pm.signal(child_pid, signal); + + // Optionally wait for the service to stop + time::sleep(Duration::from_secs(1)).await; + } } - Ok(status) => service.state.set(match status.success() { - true => State::Success, - false => State::Error(status), - }), - }; - drop(service); - if config.one_shot { - // we don't need to restart the service anymore - self.notify.notify_waiters(); - break; + // Allow some time before potentially restarting the service + time::sleep(Duration::from_secs(1)).await; } - // we trying again in 2 seconds - time::sleep(std::time::Duration::from_secs(2)).await; + + // The loop will repeat unless we break out due to shutdown or service being down } let mut service = 
input.write().await; From 6c22adc3f8de6335ce4d7a1751b12a4d38b7bc03 Mon Sep 17 00:00:00 2001 From: Maxime Van Hees Date: Thu, 3 Oct 2024 16:59:50 +0200 Subject: [PATCH 2/7] cron jobs in zinit --- docs/readme.md | 4 +- src/zinit/mod.rs | 132 ++++++++++++----------------------------------- 2 files changed, 35 insertions(+), 101 deletions(-) diff --git a/docs/readme.md b/docs/readme.md index 5896849..4e4c5a2 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -22,6 +22,7 @@ When running zinit in a container, supply the `--container` argument to the init exec: "command line to start service" test: "command line to test service is running" # optional oneshot: true or false (false by default) +cron: amount of seconds # creates a cronjob, oneshot MUST be set to true after: # list of services that we depend on (optional) - service1_name - service2_name @@ -32,7 +33,8 @@ env: KEY: VALUE ``` -- `oneshot` service is not going to re-spawn when it exits. +- `oneshot` service is not going to re-spawn when it exits, except when `cron` has a value > 0. +- `cron` is used to create a cronjob. This only works when `oneshot` is set to `true`. - if a service depends on a `oneshot` services, it will not get started, unless the oneshot service exits with success. - if a service depends on another service (that is not `oneshot`), it will not get started, unless the service is marked as `running` - a service with no test command is marked running if it successfully executed, regardless if it exits immediately after or not, hence a test command is useful. 
diff --git a/src/zinit/mod.rs b/src/zinit/mod.rs index b6df13b..75db7fa 100644 --- a/src/zinit/mod.rs +++ b/src/zinit/mod.rs @@ -546,13 +546,7 @@ impl ZInit { let name = name.clone(); let mut service = input.write().await; - if service.target == Target::Down { - debug!("service '{}' target is down", name); - return; - } - - if service.scheduled { - debug!("service '{}' already scheduled", name); + if service.target == Target::Down || service.scheduled { return; } @@ -567,9 +561,7 @@ impl ZInit { break; } let config = service.service.clone(); - // Release the lock; enables other services to aquire it and schedule - // themselves - drop(service); + drop(service); // Release the lock // Wait for dependencies while !self.can_schedule(&config).await { @@ -585,10 +577,7 @@ impl ZInit { }; let mut service = input.write().await; - // We check again in case target has changed. Since we had to release the lock - // earlier to not block locking on this service (for example if a stop was called) - // while the service was waiting for dependencies. - // The lock is kept until the spawning and the update of the pid. + if service.target == Target::Down { // Service target is down; exit the loop break; @@ -615,99 +604,42 @@ impl ZInit { } }; - if config.one_shot { - service.state.set(State::Running); - } - - // We don't lock the here here because this can take forever - // to finish. So, we allow other operation on the service (for example) - // status and stop operations. 
+ // Since only oneshot services can have cron, we are in oneshot mode + service.state.set(State::Running); drop(service); // Release the lock - let mut handler = None; - if !config.one_shot { - let m = self.clone(); - handler = Some(tokio::spawn(m.test(name.clone(), config.clone()))); - } + // Wait for the child process to finish + let wait_result = child.wait().await; - // Prepare futures for selection - let cron_future = config - .cron - .map(|cron_duration| tokio::time::sleep(Duration::from_secs(cron_duration))); - - let child_pid = child.pid; - let wait_future = child.wait(); - - // Use select to wait on the child process or the cron future - tokio::select! { - result = wait_future => { - // The child process has exited - if let Some(handler) = handler { - handler.abort(); - } - - let mut service = input.write().await; - service.pid = Pid::from_raw(0); - - match result { - Err(err) => { - error!("failed to read service '{}' status: {}", name, err); - service.state.set(State::Unknown); - } - Ok(status) => service.state.set(match status.success() { - true => State::Success, - false => State::Error(status), - }), - } - drop(service); - - if config.one_shot { - // For oneshot services, we don't need to restart - self.notify.notify_waiters(); - break; - } - } - _ = async { - if let Some(cron_fut) = cron_future { - cron_fut.await; - } else { - futures::future::pending::<()>().await; - } - } => { - // Cron duration elapsed - if *self.shutdown.read().await { - // If shutting down, exit the loop - break; - } - - let service = input.read().await; - if service.target == Target::Down { - break; - } - let signal_name = service.service.signal.stop.to_uppercase(); - drop(service); - - let signal = match signal::Signal::from_str(&signal_name) { - Ok(signal) => signal, - Err(err) => { - error!("unknown stop signal configured in service '{}': {}", name, err); - break; - } - }; - - // Send stop signal to the process group - let _ = self.pm.signal(child_pid, signal); - - // 
Optionally wait for the service to stop - time::sleep(Duration::from_secs(1)).await; + let mut service = input.write().await; + service.pid = Pid::from_raw(0); + + match wait_result { + Err(err) => { + error!("failed to read service '{}' status: {}", name, err); + service.state.set(State::Unknown); } + Ok(status) => service.state.set(match status.success() { + true => State::Success, + false => State::Error(status), + }), } + drop(service); // Release the lock - // Allow some time before potentially restarting the service - time::sleep(Duration::from_secs(1)).await; + // Check if we should schedule the service again based on cron + if let Some(cron_duration) = config.cron { + if *self.shutdown.read().await { + // If shutting down, exit the loop + break; + } + // Wait for the specified cron duration + tokio::time::sleep(std::time::Duration::from_secs(cron_duration)).await; + // Loop will restart and the oneshot service will be executed again + } else { + // No cron specified, exit the loop after the oneshot execution + break; + } } - - // The loop will repeat unless we break out due to shutdown or service being down } let mut service = input.write().await; From 66fa899b91de4d9ec11442f83531eeeb0fbd5104 Mon Sep 17 00:00:00 2001 From: Maxime Van Hees Date: Fri, 4 Oct 2024 13:26:50 +0200 Subject: [PATCH 3/7] cron jobs but this time better and correct cronjob syntax --- Cargo.lock | 256 +++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 2 + src/zinit/config.rs | 33 ++++-- src/zinit/mod.rs | 224 +++++++++++++++++++++++++------------- 4 files changed, 429 insertions(+), 86 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e99d015..858b8d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. 
version = 3 +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "ansi_term" version = "0.11.0" @@ -40,6 +55,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + [[package]] name = "bytes" version = "1.1.0" @@ -48,9 +69,9 @@ checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" [[package]] name = "cc" -version = "1.0.70" +version = "1.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d26a6ce4b6a484fa3edb70f7efa6fc430fd2b87285fe8b84304fd0936faa0dc0" +checksum = "e9e8aabfac534be767c909e0690571677d49f41bd8465ae876fe043d52ba5292" [[package]] name = "cfg-if" @@ -58,6 +79,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets", +] + [[package]] name = "clap" version = "2.33.3" @@ -83,6 +118,23 @@ dependencies = [ "winapi", ] +[[package]] +name = "core-foundation-sys" +version = 
"0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "cron" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07" +dependencies = [ + "chrono", + "nom", + "once_cell", +] + [[package]] name = "dtoa" version = "0.4.8" @@ -224,6 +276,29 @@ dependencies = [ "libc", ] +[[package]] +name = "iana-time-zone" +version = "0.1.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "indexmap" version = "1.7.0" @@ -249,6 +324,15 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +[[package]] +name = "js-sys" +version = "0.3.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "libc" version = "0.2.139" @@ -294,6 +378,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "mio" version = "0.7.13" @@ -329,6 +419,16 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nom" +version = 
"7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -338,6 +438,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.13.0" @@ -350,9 +459,12 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.8.0" +version = "1.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" +checksum = "82881c4be219ab5faaf2ad5e5e5ecdff8c66bd7402ca3160975c93b24961afd1" +dependencies = [ + "portable-atomic", +] [[package]] name = "parking_lot" @@ -391,6 +503,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -647,6 +765,61 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" +[[package]] +name = "wasm-bindgen" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.93" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.79", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" + [[package]] name = "winapi" version = "0.3.9" @@ -669,6 +842,79 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] 
+name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + [[package]] name = "yaml-rust" version = "0.4.5" @@ -683,8 +929,10 @@ name = "zinit" version = "0.2.0" dependencies = [ "anyhow", + "chrono", "clap", "command-group", + "cron", "fern", "futures", "git-version", diff --git a/Cargo.toml b/Cargo.toml index 55c8c3a..a074adb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,3 +21,5 @@ 
clap = "2.33" git-version = "0.3.5" command-group = "1.0.8" futures = "0.3.30" +cron = "0.12.1" +chrono = "0.4.38" diff --git a/src/zinit/config.rs b/src/zinit/config.rs index c76229e..e12d344 100644 --- a/src/zinit/config.rs +++ b/src/zinit/config.rs @@ -1,11 +1,13 @@ use anyhow::Result; -use serde::{Deserialize, Serialize}; +use cron::Schedule; +use serde::{Deserialize, Deserializer, Serialize}; use serde_yaml as yaml; use std::collections::HashMap; use std::ffi::OsStr; use std::fs::{self, File}; use std::path::Path; pub type Services = HashMap; +use std::str::FromStr; pub const DEFAULT_SHUTDOWN_TIMEOUT: u64 = 10; // in seconds @@ -52,7 +54,8 @@ pub struct Service { pub log: Log, pub env: HashMap, pub dir: String, - pub cron: Option, + #[serde(default, deserialize_with = "deserialize_cron_option")] + pub cron: Option, } impl Service { @@ -65,14 +68,9 @@ impl Service { Signal::from_str(&self.signal.stop.to_uppercase())?; - // Validate the cron field if present - if let Some(cron_value) = self.cron { - if cron_value == 0 { - bail!("cron value must be greater than zero"); - } - if !self.one_shot { - bail!("cron can only be specified for oneshot services"); - } + // Cron jobs only possible for oneshot services + if self.cron.is_some() && !self.one_shot { + bail!("cron can only be used for oneshot services"); } Ok(()) @@ -128,3 +126,18 @@ pub fn load_dir>(p: T) -> Result { Ok(services) } + +/// Custom deserializer to parse cron expression from string +fn deserialize_cron_option<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + use serde::de::Error; + let s: Option = Option::deserialize(deserializer)?; + if let Some(s) = s { + let schedule = Schedule::from_str(&s).map_err(D::Error::custom)?; + Ok(Some(schedule)) + } else { + Ok(None) + } +} diff --git a/src/zinit/mod.rs b/src/zinit/mod.rs index 75db7fa..d63898d 100644 --- a/src/zinit/mod.rs +++ b/src/zinit/mod.rs @@ -4,6 +4,7 @@ use crate::manager::{Log, Logs, Process, ProcessManager}; 
use crate::zinit::ord::ProcessDAG; use crate::zinit::ord::{service_dependency_order, DUMMY_ROOT}; use anyhow::Result; +use chrono::{DateTime, Utc}; use config::DEFAULT_SHUTDOWN_TIMEOUT; use nix::sys::reboot::RebootMode; use nix::sys::signal; @@ -17,8 +18,8 @@ use thiserror::Error; use tokio::sync::mpsc; use tokio::sync::watch; use tokio::sync::{Notify, RwLock}; -use tokio::time::timeout; use tokio::time::{self}; +use tokio::time::{sleep_until, timeout, Instant}; use tokio_stream::{wrappers::WatchStream, StreamExt}; pub trait WaitStatusExt { @@ -546,7 +547,13 @@ impl ZInit { let name = name.clone(); let mut service = input.write().await; - if service.target == Target::Down || service.scheduled { + if service.target == Target::Down { + debug!("service '{}' target is down", name); + return; + } + + if service.scheduled { + debug!("service '{}' already scheduled", name); return; } @@ -554,95 +561,168 @@ impl ZInit { drop(service); // Release the lock loop { - { - let service = input.read().await; - if service.target == Target::Down { - // Service target is down; exit the loop - break; - } - let config = service.service.clone(); - drop(service); // Release the lock - - // Wait for dependencies - while !self.can_schedule(&config).await { - let sig = self.notify.notified(); - self.set(&name, Some(State::Blocked), None).await; - sig.await; + let name = name.clone(); + + let service = input.read().await; + // early check if service is down, so we don't have to do extra checks + if service.target == Target::Down { + // we check target in loop in case service have + // been set down. + break; + } + let config = service.service.clone(); + // we need to spawn this service now, but is it ready? + // are all dependent services are running ? 
+ + // so we drop the table to give other services + // chance to acquire the lock and schedule themselves + drop(service); + + 'checks: loop { + let sig = self.notify.notified(); + debug!("checking {} if it can schedule", name); + if self.can_schedule(&config).await { + debug!("service {} can schedule", name); + break 'checks; } - let log = match config.log { - config::Log::None => Log::None, - config::Log::Stdout => Log::Stdout, - config::Log::Ring => Log::Ring(name.clone()), - }; + self.set(&name, Some(State::Blocked), None).await; + // don't even care if i am lagging + // as long i am notified that some services status + // has changed + debug!("service {} is blocked, waiting release", name); + sig.await; + } - let mut service = input.write().await; + // If cron is specified for a oneshot service, schedule the execution + if let Some(ref schedule) = config.cron { + // Get current time + let now: DateTime = Utc::now(); - if service.target == Target::Down { - // Service target is down; exit the loop - break; - } + // Get next scheduled time + if let Some(next_datetime) = schedule.upcoming(Utc).next() { + let duration = next_datetime + .signed_duration_since(now) + .to_std() + .unwrap_or(Duration::from_secs(0)); - let child = match self - .pm - .run( - Process::new(&config.exec, &config.dir, Some(config.env.clone())), - log.clone(), - ) - .await - { - Ok(child) => { - service.state.set(State::Spawned); - service.pid = child.pid; - child - } - Err(err) => { - error!("service {} failed to start: {}", name, err); - service.state.set(State::Failure); - // Decide whether to break or continue based on your logic + // Wait until the next scheduled time + debug!("service {} scheduled to run at {}", name, next_datetime); + sleep_until(Instant::now() + duration).await; + + // Before executing, check if service is still up and not shutting down + if *self.shutdown.read().await || self.is_target_down(&name).await { break; } - }; + } else { + // No upcoming scheduled times; 
exit the loop + debug!("service '{}' has not more scheduled runs", name); + break; + } + } else if config.one_shot { + // For oneshot services without cron, proceed immediately + debug!( + "service '{}' is a oneshot service without cron; starting immediately", + name + ); + } else { + // For non-oneshot services, proceed as usual + debug!("service '{}' is not a oneshot service: proceeding", name); + } - // Since only oneshot services can have cron, we are in oneshot mode - service.state.set(State::Running); - drop(service); // Release the lock + let log = match config.log { + config::Log::None => Log::None, + config::Log::Stdout => Log::Stdout, + config::Log::Ring => Log::Ring(name.clone()), + }; - // Wait for the child process to finish - let wait_result = child.wait().await; + let mut service = input.write().await; + // we check again in case target has changed. Since we had to release the lock + // earlier to not block locking on this service (for example if a stop was called) + // while the service was waiting for dependencies. + // the lock is kept until the spawning and the update of the pid. + if service.target == Target::Down { + // we check target in loop in case service have + // been set down. + break; + } - let mut service = input.write().await; - service.pid = Pid::from_raw(0); + let child = self + .pm + .run( + Process::new(&config.exec, &config.dir, Some(config.env.clone())), + log.clone(), + ) + .await; + + let child = match child { + Ok(child) => { + service.state.set(State::Spawned); + service.pid = child.pid; + child + } + Err(err) => { + // so, spawning failed. and nothing we can do about it + // this can be duo to a bad command or exe not found. + // set service to failure. 
+ error!("service {} failed to start: {}", name, err); + service.state.set(State::Failure); + break; + } + }; - match wait_result { - Err(err) => { - error!("failed to read service '{}' status: {}", name, err); - service.state.set(State::Unknown); - } - Ok(status) => service.state.set(match status.success() { - true => State::Success, - false => State::Error(status), - }), + service.state.set(State::Running); + drop(service); + + // Wait for the child process to finish + let result = child.wait().await; + + let mut service = input.write().await; + service.pid = Pid::from_raw(0); + + match result { + Err(err) => { + error!("failed to read service '{}' status: {}", name, err); + service.state.set(State::Unknown); } - drop(service); // Release the lock + Ok(status) => { + service.state.set(if status.success() { + State::Success + } else { + State::Error(status) + }); + } + } - // Check if we should schedule the service again based on cron - if let Some(cron_duration) = config.cron { - if *self.shutdown.read().await { - // If shutting down, exit the loop - break; - } - // Wait for the specified cron duration - tokio::time::sleep(std::time::Duration::from_secs(cron_duration)).await; - // Loop will restart and the oneshot service will be executed again + drop(service); + + // For oneshot services with cron, loop to schedule next execution + if config.one_shot { + if config.cron.is_some() { + continue; // Schedule the next execution } else { - // No cron specified, exit the loop after the oneshot execution - break; + self.notify.notify_waiters(); + break; // No cron; exit the loop } + } else { + // For non-oneshot services, handle respawn logic + // Wait before restarting + time::sleep(Duration::from_secs(2)).await; } } let mut service = input.write().await; service.scheduled = false; } + + // Helper function to check if the service target is down + async fn is_target_down(&self, name: &str) -> bool { + let table = self.services.read().await; + if let Some(service) = 
table.get(name) { + let service = service.read().await; + service.target == Target::Down + } else { + true // Service not found; treat as down + } + } } From c4701907f0f2c4ed0eff58d4c17763dad364a103 Mon Sep 17 00:00:00 2001 From: Maxime Van Hees Date: Fri, 4 Oct 2024 13:33:19 +0200 Subject: [PATCH 4/7] Update readme about cronjobs --- docs/readme.md | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/docs/readme.md b/docs/readme.md index 4e4c5a2..3017ac7 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -22,7 +22,7 @@ When running zinit in a container, supply the `--container` argument to the init exec: "command line to start service" test: "command line to test service is running" # optional oneshot: true or false (false by default) -cron: amount of seconds # creates a cronjob, oneshot MUST be set to true +cron: "sec min hour day_of_month month day_of_week year" after: # list of services that we depend on (optional) - service1_name - service2_name @@ -33,7 +33,7 @@ env: KEY: VALUE ``` -- `oneshot` service is not going to re-spawn when it exits, except when `cron` has a value > 0. +- `oneshot` service is not going to re-spawn when it exit. - `cron` is used to create a cronjob. This only works when `oneshot` is set to `true`. - if a service depends on a `oneshot` services, it will not get started, unless the oneshot service exits with success. - if a service depends on another service (that is not `oneshot`), it will not get started, unless the service is marked as `running` @@ -80,6 +80,15 @@ after: - redis ``` +hello_world_cron.yaml + +```yaml +exec: sh -c "echo 'hello each 5 sec from cronjob'" +oneshot: true +cron: "*/5 * * * * *" +log: stdout +``` + ## Controlling commands ```bash From 085cde36b22d854d6ef4d7b93660b8df7ae4a3d4 Mon Sep 17 00:00:00 2001 From: Maxime Van Hees Date: Wed, 9 Oct 2024 20:13:47 +0200 Subject: [PATCH 5/7] Refactored + improved workign of cronjobs - TO TEST! 
--- Cargo.lock | 206 +++++++++++++++++-- Cargo.toml | 1 + src/app/api.rs | 2 + src/zinit/config.rs | 3 + src/zinit/mod.rs | 476 +++++++++++++++++++++++++++++++------------- 5 files changed, 534 insertions(+), 154 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 858b8d6..39d44f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,6 +49,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bitflags" version = "1.2.1" @@ -89,6 +95,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets", ] @@ -102,7 +109,7 @@ dependencies = [ "ansi_term", "atty", "bitflags", - "strsim", + "strsim 0.8.0", "textwrap", "unicode-width", "vec_map", @@ -135,12 +142,63 @@ dependencies = [ "once_cell", ] +[[package]] +name = "darling" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.11.1", + "syn 2.0.79", +] + +[[package]] +name = "darling_macro" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.79", +] + +[[package]] +name = "deranged" +version = "0.3.11" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", + "serde", +] + [[package]] name = "dtoa" version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "fern" version = "0.6.0" @@ -150,6 +208,12 @@ dependencies = [ "log", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "futures" version = "0.3.30" @@ -263,9 +327,15 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.11.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" [[package]] name = "hermit-abi" @@ -276,6 +346,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "iana-time-zone" version = "0.1.61" @@ -299,14 +375,32 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "indexmap" -version = "1.7.0" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", + "serde", +] + +[[package]] +name = "indexmap" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" +dependencies = [ + "equivalent", + "hashbrown 0.15.0", + "serde", ] [[package]] @@ -324,6 +418,12 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + [[package]] name = "js-sys" version = "0.3.70" @@ -438,6 +538,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-traits" version = "0.2.19" @@ -509,6 +615,12 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -556,22 +668,22 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = 
"serde" -version = "1.0.130" +version = "1.0.210" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913" +checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.130" +version = "1.0.210" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b" +checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", - "syn 1.0.76", + "syn 2.0.79", ] [[package]] @@ -580,11 +692,41 @@ version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8" dependencies = [ - "itoa", + "itoa 0.4.8", "ryu", "serde", ] +[[package]] +name = "serde_with" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e28bdad6db2b8340e449f7108f020b3b092e8583a9e3fb82713e1d4e71fe817" +dependencies = [ + "base64", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.6.0", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d846214a9854ef724f3da161b426242d8de7c1fc7de2f89bb1efcb154dca79d" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "serde_yaml" version = "0.8.21" @@ -592,7 +734,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8c608a35705a5d3cdc9fbe403147647ff34b921f8e833e49306df898f9b20af" dependencies = [ "dtoa", - "indexmap", + "indexmap 1.9.3", "serde", "yaml-rust", ] @@ -633,6 +775,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "syn" version = "1.0.76" @@ -684,6 +832,37 @@ dependencies = [ "syn 1.0.76", ] +[[package]] +name = "time" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +dependencies = [ + "deranged", + "itoa 1.0.11", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tokio" version = "1.14.0" @@ -940,6 +1119,7 @@ dependencies = [ "nix", "serde", "serde_json", + "serde_with", "serde_yaml", "shlex", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index a074adb..0b4ae50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,3 +23,4 @@ command-group = "1.0.8" futures = "0.3.30" cron = "0.12.1" chrono = "0.4.38" +serde_with = "3.11.0" diff --git a/src/app/api.rs b/src/app/api.rs index 38ec5c6..55164fa 100644 --- a/src/app/api.rs +++ b/src/app/api.rs @@ -206,6 +206,7 @@ impl Api { } async fn status>(name: S, zinit: ZInit) -> Result { + println!("running status function"); let status = zinit.status(&name).await?; let result = Status { @@ -225,6 +226,7 @@ impl Api { after }, }; + println!("finished running status function"); Ok(encoder::to_value(result)?) 
} diff --git a/src/zinit/config.rs b/src/zinit/config.rs index e12d344..e1e11c8 100644 --- a/src/zinit/config.rs +++ b/src/zinit/config.rs @@ -60,6 +60,8 @@ pub struct Service { impl Service { pub fn validate(&self) -> Result<()> { + println!("Validating service from dir:{}", self.dir); + use nix::sys::signal::Signal; use std::str::FromStr; if self.exec.is_empty() { @@ -99,6 +101,7 @@ pub fn load>(t: T) -> Result<(String, Service)> { /// a file, the callback can decide to either ignore the file, or stop /// the directory walking pub fn load_dir>(p: T) -> Result { + println!("loading services from directory"); let mut services: Services = HashMap::new(); for entry in fs::read_dir(p)? { diff --git a/src/zinit/mod.rs b/src/zinit/mod.rs index d63898d..345c4d6 100644 --- a/src/zinit/mod.rs +++ b/src/zinit/mod.rs @@ -43,6 +43,8 @@ pub enum ZInitError { ServiceISDown { name: String }, #[error("zinit is shutting down")] ShuttingDown, + #[error("service {name:?} has a dependency to a cronjob")] + ServiceCronDependency { name: String }, } /// Process is a representation of a scheduled/running /// service @@ -148,6 +150,8 @@ pub enum State { /// Failure means the service has failed to spawn in a way that retyring /// won't help, like command line parsing error or failed to fork Failure, + /// service has been schedulded to run (cronjob) + ScheduledCron, } type Table = HashMap>>; @@ -211,6 +215,18 @@ impl ZInit { bail!(ZInitError::ServiceAlreadyMonitored { name }) } + // Check that service does not have dependecies that are cron jobs + for dep in &service.after { + let table = self.services.read().await; + if let Some(dep_service_lock) = table.get(dep) { + if dep_service_lock.read().await.service.cron.is_some() { + bail!("Serivce {} cannot depend on cron job {}", name, dep); + } + } else { + bail!("Service {} is dependent on unknown service '{}'", name, dep); + } + } + let service = Arc::new(RwLock::new(ZInitService::new(service, State::Unknown))); 
services.insert(name.clone(), Arc::clone(&service)); let m = self.clone(); @@ -543,178 +559,356 @@ impl ZInit { } } - async fn watch(self, name: String, input: Arc>) { - let name = name.clone(); + async fn handle_scheduling(&self, name: &str, config: &config::Service) -> Result<()> { + println!("entering handle_scheduling function"); + 'checks: loop { + let sig = self.notify.notified(); + debug!("checking {} if it can schedule", name); + if self.can_schedule(config).await { + debug!("service {} can schedule", name); + break 'checks; + } + + self.set(name, Some(State::Blocked), None).await; + // don't even care if i am lagging + // as long i am notified that some services status + // has changed + debug!("service {} is blocked, waiting release", name); + sig.await; + } + println!("EXITING handle_scheduling function"); + Ok(()) + } + + async fn wait_for_cron_schedule(&self, name: &str, config: &config::Service) -> Result<()> { + println!("entering wait_for_cron_schedule function"); + if let Some(ref schedule) = config.cron { + let now: DateTime = Utc::now(); + if let Some(next_datetime) = schedule.upcoming(Utc).next() { + let duration = next_datetime + .signed_duration_since(now) + .to_std() + .unwrap_or(Duration::from_secs(0)); + + self.set(name, Some(State::ScheduledCron), None).await; + debug!("service {} scheduled to run at {}", name, next_datetime); + sleep_until(Instant::now() + duration).await; + + if *self.shutdown.read().await || self.is_target_down(name).await { + return Err(anyhow!( + "Service '{}' is shutting down or target is down", + name + )); + } + } else { + debug!("service '{}' has no more scheduled runs", name); + return Err(anyhow!("No more scheduled runs for service '{}'", name)); + } + } + + println!("EXITING wait_for_cron_schedule function"); + Ok(()) + } + + async fn run_service( + &self, + name: &str, + input: &Arc>, + config: &config::Service, + ) -> Result<()> { + println!("entering run_service function"); + let log = match config.log { + 
config::Log::None => Log::None, + config::Log::Stdout => Log::Stdout, + config::Log::Ring => Log::Ring(name.to_string()), + }; + + println!("1"); let mut service = input.write().await; + println!("2"); if service.target == Target::Down { - debug!("service '{}' target is down", name); - return; + return Err(anyhow!("Service '{}' target is down", name)); } + println!("3"); - if service.scheduled { - debug!("service '{}' already scheduled", name); - return; - } + let child = self + .pm + .run( + Process::new(&config.exec, &config.dir, Some(config.env.clone())), + log.clone(), + ) + .await?; + println!("4"); - service.scheduled = true; - drop(service); // Release the lock + service.state.set(State::Spawned); + service.pid = child.pid; + println!("run_service funciton: dropping write lock"); + drop(service); + println!("run_service funciton: dropped write lock"); - loop { - let name = name.clone(); + let result = child.wait().await; - let service = input.read().await; - // early check if service is down, so we don't have to do extra checks - if service.target == Target::Down { - // we check target in loop in case service have - // been set down. - break; - } - let config = service.service.clone(); - // we need to spawn this service now, but is it ready? - // are all dependent services are running ? 
- - // so we drop the table to give other services - // chance to acquire the lock and schedule themselves - drop(service); - - 'checks: loop { - let sig = self.notify.notified(); - debug!("checking {} if it can schedule", name); - if self.can_schedule(&config).await { - debug!("service {} can schedule", name); - break 'checks; - } + println!("run_service funciton: aqcuiring write lock"); + let mut service = input.write().await; + println!("run_service funciton: got write lock"); + service.pid = Pid::from_raw(0); - self.set(&name, Some(State::Blocked), None).await; - // don't even care if i am lagging - // as long i am notified that some services status - // has changed - debug!("service {} is blocked, waiting release", name); - sig.await; + match result { + Err(err) => { + error!("Failed to read service '{}' status: {}", name, err); + service.state.set(State::Unknown); } - - // If cron is specified for a oneshot service, schedule the execution - if let Some(ref schedule) = config.cron { - // Get current time - let now: DateTime = Utc::now(); - - // Get next scheduled time - if let Some(next_datetime) = schedule.upcoming(Utc).next() { - let duration = next_datetime - .signed_duration_since(now) - .to_std() - .unwrap_or(Duration::from_secs(0)); - - // Wait until the next scheduled time - debug!("service {} scheduled to run at {}", name, next_datetime); - sleep_until(Instant::now() + duration).await; - - // Before executing, check if service is still up and not shutting down - if *self.shutdown.read().await || self.is_target_down(&name).await { - break; - } + Ok(status) => { + service.state.set(if status.success() { + State::Success } else { - // No upcoming scheduled times; exit the loop - debug!("service '{}' has not more scheduled runs", name); - break; - } - } else if config.one_shot { - // For oneshot services without cron, proceed immediately - debug!( - "service '{}' is a oneshot service without cron; starting immediately", - name - ); - } else { - // For 
non-oneshot services, proceed as usual - debug!("service '{}' is not a oneshot service: proceeding", name); + State::Error(status) + }); } + } - let log = match config.log { - config::Log::None => Log::None, - config::Log::Stdout => Log::Stdout, - config::Log::Ring => Log::Ring(name.clone()), - }; + println!("EXITING run_service function"); + Ok(()) + } - let mut service = input.write().await; - // we check again in case target has changed. Since we had to release the lock - // earlier to not block locking on this service (for example if a stop was called) - // while the service was waiting for dependencies. - // the lock is kept until the spawning and the update of the pid. - if service.target == Target::Down { - // we check target in loop in case service have - // been set down. - break; - } + async fn watch(self, name: String, input: Arc>) { + println!("entering watch function"); + let mut service = input.write().await; - let child = self - .pm - .run( - Process::new(&config.exec, &config.dir, Some(config.env.clone())), - log.clone(), - ) - .await; - - let child = match child { - Ok(child) => { - service.state.set(State::Spawned); - service.pid = child.pid; - child - } - Err(err) => { - // so, spawning failed. and nothing we can do about it - // this can be duo to a bad command or exe not found. - // set service to failure. 
- error!("service {} failed to start: {}", name, err); - service.state.set(State::Failure); - break; - } - }; + if service.target == Target::Down || service.scheduled { + return; + } - service.state.set(State::Running); - drop(service); + service.scheduled = true; + drop(service); // Release the lock - // Wait for the child process to finish - let result = child.wait().await; + loop { + println!("entering watch function loop"); + if let Err(err) = self + .handle_scheduling(&name, &input.read().await.service) + .await + { + error!("Scheduling error for '{}': {}", name, err); + break; + } - let mut service = input.write().await; - service.pid = Pid::from_raw(0); + if let Err(err) = self + .wait_for_cron_schedule(&name, &input.read().await.service) + .await + { + debug!("Service '{}' scheduling completed: {}", name, err); + break; + } - match result { - Err(err) => { - error!("failed to read service '{}' status: {}", name, err); - service.state.set(State::Unknown); - } - Ok(status) => { - service.state.set(if status.success() { - State::Success - } else { - State::Error(status) - }); - } + let service_clone = { + let service_read_guard = input.read().await; + service_read_guard.service.clone() + }; + if let Err(err) = self.run_service(&name, &input, &service_clone).await { + error!("Error running service '{}': {}", name, err); + break; } - drop(service); + let config = input.read().await.service.clone(); - // For oneshot services with cron, loop to schedule next execution - if config.one_shot { - if config.cron.is_some() { - continue; // Schedule the next execution - } else { - self.notify.notify_waiters(); - break; // No cron; exit the loop - } + if config.one_shot && config.cron.is_some() { + continue; // Schedule the next execution + } else if config.one_shot { + self.notify.notify_waiters(); + break; // No cron; exit the loop } else { - // For non-oneshot services, handle respawn logic - // Wait before restarting time::sleep(Duration::from_secs(2)).await; } } let 
mut service = input.write().await; service.scheduled = false; + println!("EXITING watch function"); } + // async fn watch(self, name: String, input: Arc>) { + // println!("entering watch function for {}", name.clone()); + // let name = name.clone(); + // let mut service = input.write().await; + // + // if service.target == Target::Down { + // debug!("service '{}' target is down", name); + // return; + // } + // + // if service.scheduled { + // debug!("service '{}' already scheduled", name); + // return; + // } + // + // service.scheduled = true; + // drop(service); // Release the lock + // + // loop { + // let name = name.clone(); + // + // let service = input.read().await; + // // early check if service is down, so we don't have to do extra checks + // if service.target == Target::Down { + // // we check target in loop in case service have + // // been set down. + // break; + // } + // let config = service.service.clone(); + // // we need to spawn this service now, but is it ready? + // // are all dependent services are running ? 
+ // + // // so we drop the table to give other services + // // chance to acquire the lock and schedule themselves + // drop(service); + // + // // If cron is specified for a oneshot service, schedule the execution + // if let Some(ref schedule) = config.cron { + // println!("Got a CRON service: {:#?}", schedule); + // // Get current time (for now only UTC time supported) + // // TODO: dynamically adjust to machine's timezone + // let now: DateTime = Utc::now(); + // + // // Get next scheduled time + // if let Some(next_datetime) = schedule.upcoming(Utc).next() { + // let duration = next_datetime + // .signed_duration_since(now) + // .to_std() + // .unwrap_or(Duration::from_secs(0)); + // + // debug!("service {} scheduled to run at {}", name, next_datetime); + // println!("service {} scheduled to run at {}", name, next_datetime); + // // Set state of service to `Scheduled` + // self.set(&name, Some(State::ScheduledCron), None).await; + // // Wait until the next scheduled time + // sleep_until(Instant::now() + duration).await; + // + // // Before executing, check if service is still up and not shutting down + // if *self.shutdown.read().await || self.is_target_down(&name).await { + // break; + // } + // } else { + // // No upcoming scheduled times; exit the loop + // debug!("service '{}' has not more scheduled runs", name); + // println!("service '{}' has not more scheduled runs", name); + // // break; + // } + // } else if config.one_shot { + // // For oneshot services without cron, proceed immediately + // debug!( + // "service '{}' is a oneshot service without cron; starting immediately", + // name + // ); + // } else { + // // For non-oneshot services, proceed as usual + // debug!("service '{}' is not a oneshot service: proceeding", name); + // } + // + // 'checks: loop { + // let sig = self.notify.notified(); + // debug!("checking {} if it can schedule", name); + // println!("checking {} if it can schedule", name); + // if self.can_schedule(&config).await { + 
// debug!("service {} can schedule", name); + // println!("service {} can schedule", name); + // break 'checks; + // } + // + // self.set(&name, Some(State::Blocked), None).await; + // // don't even care if i am lagging + // // as long i am notified that some services status + // // has changed + // debug!("service {} is blocked, waiting release", name); + // println!("service {} is blocked, waiting release", name); + // sig.await; + // } + // + // let log = match config.log { + // config::Log::None => Log::None, + // config::Log::Stdout => Log::Stdout, + // config::Log::Ring => Log::Ring(name.clone()), + // }; + // + // let mut service = input.write().await; + // // we check again in case target has changed. Since we had to release the lock + // // earlier to not block locking on this service (for example if a stop was called) + // // while the service was waiting for dependencies. + // // the lock is kept until the spawning and the update of the pid. + // if service.target == Target::Down { + // // we check target in loop in case service have + // // been set down. + // break; + // } + // + // let child = self + // .pm + // .run( + // Process::new(&config.exec, &config.dir, Some(config.env.clone())), + // log.clone(), + // ) + // .await; + // + // let child = match child { + // Ok(child) => { + // service.state.set(State::Spawned); + // service.pid = child.pid; + // child + // } + // Err(err) => { + // // so, spawning failed. and nothing we can do about it + // // this can be duo to a bad command or exe not found. + // // set service to failure. 
+ // error!("service {} failed to start: {}", name, err); + // service.state.set(State::Failure); + // break; + // } + // }; + // + // service.state.set(State::Running); + // drop(service); + // + // // Wait for the child process to finish + // let result = child.wait().await; + // + // let mut service = input.write().await; + // service.pid = Pid::from_raw(0); + // + // match result { + // Err(err) => { + // error!("failed to read service '{}' status: {}", name, err); + // service.state.set(State::Unknown); + // } + // Ok(status) => { + // service.state.set(if status.success() { + // State::Success + // } else { + // State::Error(status) + // }); + // } + // } + // + // drop(service); + // + // // For oneshot services with cron, loop to schedule next execution + // if config.one_shot { + // if config.cron.is_some() { + // continue; // Schedule the next execution + // } else { + // self.notify.notify_waiters(); + // break; // No cron; exit the loop + // } + // } else { + // // For non-oneshot services, handle respawn logic + // // Wait before restarting + // time::sleep(Duration::from_secs(2)).await; + // } + // } + // + // let mut service = input.write().await; + // service.scheduled = false; + // } + // Helper function to check if the service target is down async fn is_target_down(&self, name: &str) -> bool { let table = self.services.read().await; From e61fc553c5a45c6f1d38bd0932b66bbadd7fbb13 Mon Sep 17 00:00:00 2001 From: Maxime Van Hees Date: Thu, 10 Oct 2024 13:02:30 +0200 Subject: [PATCH 6/7] Fixed deadlocks with cronjobs --- src/app/api.rs | 2 - src/zinit/config.rs | 3 - src/zinit/mod.rs | 227 +++----------------------------------------- 3 files changed, 14 insertions(+), 218 deletions(-) diff --git a/src/app/api.rs b/src/app/api.rs index 55164fa..38ec5c6 100644 --- a/src/app/api.rs +++ b/src/app/api.rs @@ -206,7 +206,6 @@ impl Api { } async fn status>(name: S, zinit: ZInit) -> Result { - println!("running status function"); let status = 
zinit.status(&name).await?; let result = Status { @@ -226,7 +225,6 @@ impl Api { after }, }; - println!("finished running status function"); Ok(encoder::to_value(result)?) } diff --git a/src/zinit/config.rs b/src/zinit/config.rs index e1e11c8..e12d344 100644 --- a/src/zinit/config.rs +++ b/src/zinit/config.rs @@ -60,8 +60,6 @@ pub struct Service { impl Service { pub fn validate(&self) -> Result<()> { - println!("Validating service from dir:{}", self.dir); - use nix::sys::signal::Signal; use std::str::FromStr; if self.exec.is_empty() { @@ -101,7 +99,6 @@ pub fn load>(t: T) -> Result<(String, Service)> { /// a file, the callback can decide to either ignore the file, or stop /// the directory walking pub fn load_dir>(p: T) -> Result { - println!("loading services from directory"); let mut services: Services = HashMap::new(); for entry in fs::read_dir(p)? { diff --git a/src/zinit/mod.rs b/src/zinit/mod.rs index 345c4d6..261d8bd 100644 --- a/src/zinit/mod.rs +++ b/src/zinit/mod.rs @@ -209,11 +209,12 @@ impl ZInit { bail!(ZInitError::ShuttingDown); } let name = name.into(); - let mut services = self.services.write().await; + let services = self.services.read().await; if services.contains_key(&name) { bail!(ZInitError::ServiceAlreadyMonitored { name }) } + drop(services); // Check that service does not have dependecies that are cron jobs for dep in &service.after { @@ -228,7 +229,11 @@ impl ZInit { } let service = Arc::new(RwLock::new(ZInitService::new(service, State::Unknown))); + + let mut services = self.services.write().await; services.insert(name.clone(), Arc::clone(&service)); + drop(services); + let m = self.clone(); debug!("service '{}' monitored", name); tokio::spawn(m.watch(name, service)); @@ -560,7 +565,6 @@ impl ZInit { } async fn handle_scheduling(&self, name: &str, config: &config::Service) -> Result<()> { - println!("entering handle_scheduling function"); 'checks: loop { let sig = self.notify.notified(); debug!("checking {} if it can schedule", name); @@ 
-576,12 +580,10 @@ impl ZInit { debug!("service {} is blocked, waiting release", name); sig.await; } - println!("EXITING handle_scheduling function"); Ok(()) } async fn wait_for_cron_schedule(&self, name: &str, config: &config::Service) -> Result<()> { - println!("entering wait_for_cron_schedule function"); if let Some(ref schedule) = config.cron { let now: DateTime = Utc::now(); if let Some(next_datetime) = schedule.upcoming(Utc).next() { @@ -606,7 +608,6 @@ impl ZInit { } } - println!("EXITING wait_for_cron_schedule function"); Ok(()) } @@ -616,21 +617,17 @@ impl ZInit { input: &Arc>, config: &config::Service, ) -> Result<()> { - println!("entering run_service function"); let log = match config.log { config::Log::None => Log::None, config::Log::Stdout => Log::Stdout, config::Log::Ring => Log::Ring(name.to_string()), }; - println!("1"); let mut service = input.write().await; - println!("2"); if service.target == Target::Down { return Err(anyhow!("Service '{}' target is down", name)); } - println!("3"); let child = self .pm @@ -639,19 +636,14 @@ impl ZInit { log.clone(), ) .await?; - println!("4"); service.state.set(State::Spawned); service.pid = child.pid; - println!("run_service funciton: dropping write lock"); drop(service); - println!("run_service funciton: dropped write lock"); let result = child.wait().await; - println!("run_service funciton: aqcuiring write lock"); let mut service = input.write().await; - println!("run_service funciton: got write lock"); service.pid = Pid::from_raw(0); match result { @@ -668,12 +660,10 @@ impl ZInit { } } - println!("EXITING run_service function"); Ok(()) } async fn watch(self, name: String, input: Arc>) { - println!("entering watch function"); let mut service = input.write().await; if service.target == Target::Down || service.scheduled { @@ -684,27 +674,21 @@ impl ZInit { drop(service); // Release the lock loop { - println!("entering watch function loop"); - if let Err(err) = self - .handle_scheduling(&name, 
&input.read().await.service) - .await - { + let service_clone = { + let service_read_guard = input.read().await; + service_read_guard.service.clone() + }; + + if let Err(err) = self.handle_scheduling(&name, &service_clone).await { error!("Scheduling error for '{}': {}", name, err); break; } - if let Err(err) = self - .wait_for_cron_schedule(&name, &input.read().await.service) - .await - { + if let Err(err) = self.wait_for_cron_schedule(&name, &service_clone).await { debug!("Service '{}' scheduling completed: {}", name, err); break; } - let service_clone = { - let service_read_guard = input.read().await; - service_read_guard.service.clone() - }; if let Err(err) = self.run_service(&name, &input, &service_clone).await { error!("Error running service '{}': {}", name, err); break; @@ -724,190 +708,7 @@ impl ZInit { let mut service = input.write().await; service.scheduled = false; - println!("EXITING watch function"); - } - - // async fn watch(self, name: String, input: Arc>) { - // println!("entering watch function for {}", name.clone()); - // let name = name.clone(); - // let mut service = input.write().await; - // - // if service.target == Target::Down { - // debug!("service '{}' target is down", name); - // return; - // } - // - // if service.scheduled { - // debug!("service '{}' already scheduled", name); - // return; - // } - // - // service.scheduled = true; - // drop(service); // Release the lock - // - // loop { - // let name = name.clone(); - // - // let service = input.read().await; - // // early check if service is down, so we don't have to do extra checks - // if service.target == Target::Down { - // // we check target in loop in case service have - // // been set down. - // break; - // } - // let config = service.service.clone(); - // // we need to spawn this service now, but is it ready? - // // are all dependent services are running ? 
- // - // // so we drop the table to give other services - // // chance to acquire the lock and schedule themselves - // drop(service); - // - // // If cron is specified for a oneshot service, schedule the execution - // if let Some(ref schedule) = config.cron { - // println!("Got a CRON service: {:#?}", schedule); - // // Get current time (for now only UTC time supported) - // // TODO: dynamically adjust to machine's timezone - // let now: DateTime = Utc::now(); - // - // // Get next scheduled time - // if let Some(next_datetime) = schedule.upcoming(Utc).next() { - // let duration = next_datetime - // .signed_duration_since(now) - // .to_std() - // .unwrap_or(Duration::from_secs(0)); - // - // debug!("service {} scheduled to run at {}", name, next_datetime); - // println!("service {} scheduled to run at {}", name, next_datetime); - // // Set state of service to `Scheduled` - // self.set(&name, Some(State::ScheduledCron), None).await; - // // Wait until the next scheduled time - // sleep_until(Instant::now() + duration).await; - // - // // Before executing, check if service is still up and not shutting down - // if *self.shutdown.read().await || self.is_target_down(&name).await { - // break; - // } - // } else { - // // No upcoming scheduled times; exit the loop - // debug!("service '{}' has not more scheduled runs", name); - // println!("service '{}' has not more scheduled runs", name); - // // break; - // } - // } else if config.one_shot { - // // For oneshot services without cron, proceed immediately - // debug!( - // "service '{}' is a oneshot service without cron; starting immediately", - // name - // ); - // } else { - // // For non-oneshot services, proceed as usual - // debug!("service '{}' is not a oneshot service: proceeding", name); - // } - // - // 'checks: loop { - // let sig = self.notify.notified(); - // debug!("checking {} if it can schedule", name); - // println!("checking {} if it can schedule", name); - // if self.can_schedule(&config).await { - 
// debug!("service {} can schedule", name); - // println!("service {} can schedule", name); - // break 'checks; - // } - // - // self.set(&name, Some(State::Blocked), None).await; - // // don't even care if i am lagging - // // as long i am notified that some services status - // // has changed - // debug!("service {} is blocked, waiting release", name); - // println!("service {} is blocked, waiting release", name); - // sig.await; - // } - // - // let log = match config.log { - // config::Log::None => Log::None, - // config::Log::Stdout => Log::Stdout, - // config::Log::Ring => Log::Ring(name.clone()), - // }; - // - // let mut service = input.write().await; - // // we check again in case target has changed. Since we had to release the lock - // // earlier to not block locking on this service (for example if a stop was called) - // // while the service was waiting for dependencies. - // // the lock is kept until the spawning and the update of the pid. - // if service.target == Target::Down { - // // we check target in loop in case service have - // // been set down. - // break; - // } - // - // let child = self - // .pm - // .run( - // Process::new(&config.exec, &config.dir, Some(config.env.clone())), - // log.clone(), - // ) - // .await; - // - // let child = match child { - // Ok(child) => { - // service.state.set(State::Spawned); - // service.pid = child.pid; - // child - // } - // Err(err) => { - // // so, spawning failed. and nothing we can do about it - // // this can be duo to a bad command or exe not found. - // // set service to failure. 
- // error!("service {} failed to start: {}", name, err); - // service.state.set(State::Failure); - // break; - // } - // }; - // - // service.state.set(State::Running); - // drop(service); - // - // // Wait for the child process to finish - // let result = child.wait().await; - // - // let mut service = input.write().await; - // service.pid = Pid::from_raw(0); - // - // match result { - // Err(err) => { - // error!("failed to read service '{}' status: {}", name, err); - // service.state.set(State::Unknown); - // } - // Ok(status) => { - // service.state.set(if status.success() { - // State::Success - // } else { - // State::Error(status) - // }); - // } - // } - // - // drop(service); - // - // // For oneshot services with cron, loop to schedule next execution - // if config.one_shot { - // if config.cron.is_some() { - // continue; // Schedule the next execution - // } else { - // self.notify.notify_waiters(); - // break; // No cron; exit the loop - // } - // } else { - // // For non-oneshot services, handle respawn logic - // // Wait before restarting - // time::sleep(Duration::from_secs(2)).await; - // } - // } - // - // let mut service = input.write().await; - // service.scheduled = false; - // } + } // Helper function to check if the service target is down async fn is_target_down(&self, name: &str) -> bool { From 9d9f921ced8fe664cc7913515381bd9c95fc5d2e Mon Sep 17 00:00:00 2001 From: Maxime Van Hees Date: Thu, 10 Oct 2024 13:24:40 +0200 Subject: [PATCH 7/7] Updated readme --- docs/readme.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/readme.md b/docs/readme.md index 3017ac7..e6f8b55 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -33,9 +33,10 @@ env: KEY: VALUE ``` -- `oneshot` service is not going to re-spawn when it exit. +- `oneshot` service is not going to re-spawn when it exits. - `cron` is used to create a cronjob. This only works when `oneshot` is set to `true`. 
- if a service depends on a `oneshot` services, it will not get started, unless the oneshot service exits with success. +- a service cannot depend on a service that is `cron`. - if a service depends on another service (that is not `oneshot`), it will not get started, unless the service is marked as `running` - a service with no test command is marked running if it successfully executed, regardless if it exits immediately after or not, hence a test command is useful. - If a test command is provided, the service will not consider running, unless the test command pass