From 94f914c1e79d76a84981ecdea73d39c0059b0f02 Mon Sep 17 00:00:00 2001 From: AbdelrahmanElawady Date: Wed, 8 May 2024 17:55:27 +0300 Subject: [PATCH] Add support for group operation --- Cargo.lock | 47 ++++++++-- Cargo.toml | 1 + src/app/api.rs | 134 +++++++++++++++++++++------ src/app/mod.rs | 25 ++++- src/zinit/config.rs | 41 ++++++++- src/zinit/mod.rs | 216 ++++++++++++++++++++++++++++++-------------- src/zinit/ord.rs | 5 +- 7 files changed, 354 insertions(+), 115 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3d08bd2..71db726 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,17 @@ version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" +[[package]] +name = "async-recursion" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "atty" version = "0.2.14" @@ -129,7 +140,7 @@ dependencies = [ "proc-macro-hack", "proc-macro2", "quote", - "syn", + "syn 1.0.76", ] [[package]] @@ -316,18 +327,18 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro2" -version = "1.0.29" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9f5105d4fdaab20335ca9565e106a5d9b82b6219b5ba735731124ac6711d23d" +checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] name = "quote" -version = "1.0.9" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] @@ -370,7 +381,7 @@ checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.76", ] [[package]] @@ -434,6 +445,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "syn" +version = "2.0.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -460,7 +482,7 @@ checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.76", ] [[package]] @@ -491,7 +513,7 @@ checksum = "c9efc1aba077437943f7515666aa2b882dfabfbfdf89c819ea75a8d6e9eaba5e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.76", ] [[package]] @@ -520,6 +542,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + [[package]] name = "unicode-width" version = "0.1.9" @@ -574,6 +602,7 @@ name = "zinit" version = "0.2.0" dependencies = [ "anyhow", + "async-recursion", "clap", "command-group", "fern", diff --git a/Cargo.toml b/Cargo.toml index 822ddb5..6f57de6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,3 +20,4 @@ thiserror = "1.0" clap = "2.33" git-version = "0.3.5" command-group = "1.0.8" +async-recursion = "1.1.1" diff --git a/src/app/api.rs b/src/app/api.rs index 38ec5c6..e36c25e 100644 --- a/src/app/api.rs +++ b/src/app/api.rs @@ -1,12 +1,15 @@ -use crate::zinit::{config, ZInit}; +use crate::zinit::{self, config, ZInit, ZInitStatus}; use anyhow::{Context, Result}; use nix::sys::signal; use serde::{Deserialize, Serialize}; use serde_json::{self as encoder, Value}; use std::collections::HashMap; +use std::env::current_dir; +use std::io::{self, ErrorKind}; use std::marker::Unpin; use std::path::{Path, PathBuf}; use std::str::FromStr; +use tokio::fs; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream}; use tokio::net::{UnixListener, UnixStream}; @@ -24,9 +27,16 @@ enum State { Error, } +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "lowercase", untagged)] +pub enum Status { + Service(ServiceStatus), + Group(GroupStatus), +} + #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] -pub struct Status { +pub struct ServiceStatus { pub name: String, pub pid: u32, pub state: String, @@ -34,6 +44,13 @@ pub struct Status { pub after: HashMap, } +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub struct GroupStatus { + pub name: String, + pub services: Vec, +} + pub struct Api { zinit: ZInit, socket: PathBuf, @@ -149,17 +166,55 @@ impl Api { let services = zinit.list().await?; let mut map: HashMap = HashMap::new(); for service in services { - let state = zinit.status(&service).await?; - map.insert(service, format!("{:?}", state.state)); + if let ZInitStatus::Service(state) = zinit.status(&service).await? { + map.insert(service, format!("{:?}", state.state)); + } } Ok(encoder::to_value(map)?) } async fn monitor>(name: S, zinit: ZInit) -> Result { - let (name, service) = config::load(format!("{}.yaml", name.as_ref())) - .context("failed to load service config")?; - zinit.monitor(name, service).await?; + match config::load(format!("{}.yaml", name.as_ref())) { + Ok((name, service)) => zinit.monitor(name, config::Entry::Service(service)).await?, + Err(e) => { + if let Some(err) = e.downcast_ref::() { + if err.kind() != ErrorKind::NotFound { + return Err(e.context("failed to load service config")); + } + } else { + return Err(e.context("failed to load service config")); + } + } + } + let canonical_path = fs::canonicalize(name.as_ref()).await?; + let path = if !canonical_path.starts_with(current_dir()?) { + bail!("directory outside of zinit configuration directory") + } else { + canonical_path.strip_prefix(current_dir()?)? + }; + let prefix = path.to_str().ok_or(anyhow!("invalid path name"))?; + match config::load_dir_with_prefix(path, prefix.to_string()) { + Ok(services) => { + for (k, v) in services { + if let Err(err) = zinit.monitor(&k, v).await { + error!("failed to monitor service {}: {}", k, err); + }; + } + } + Err(e) => { + if let Some(err) = e.downcast_ref::() { + if err.kind() == ErrorKind::NotFound { + bail!( + "neither {}.yaml nor {} directory was found", + name.as_ref(), + name.as_ref() + ) + } + } + return Err(e.context("failed to load service config")); + } + } Ok(Value::Null) } @@ -206,30 +261,50 @@ impl Api { } async fn status>(name: S, zinit: ZInit) -> Result { - let status = zinit.status(&name).await?; - - let result = Status { - name: name.as_ref().into(), - pid: status.pid.as_raw() as u32, - state: format!("{:?}", status.state), - target: format!("{:?}", status.target), - after: { - let mut after = HashMap::new(); - for service in status.service.after { - let status = match zinit.status(&service).await { - Ok(dep) => dep.state, - Err(_) => crate::zinit::State::Unknown, - }; - after.insert(service, format!("{:?}", status)); - } - after - }, + let result = match zinit.status(&name).await? { + ZInitStatus::Service(status) => { + Status::Service(zinit_status_to_service_status(name, zinit, status).await) + } + ZInitStatus::Group(group) => Status::Group(GroupStatus { + name: name.as_ref().into(), + services: { + let mut services = vec![]; + for (name, status) in group.services { + services + .push(zinit_status_to_service_status(name, zinit.clone(), status).await) + } + services + }, + }), }; - Ok(encoder::to_value(result)?) } } +async fn zinit_status_to_service_status>( + name: S, + zinit: ZInit, + status: zinit::ServiceStatus, +) -> ServiceStatus { + ServiceStatus { + name: name.as_ref().into(), + pid: status.pid.as_raw() as u32, + state: format!("{:?}", status.state), + target: format!("{:?}", status.target), + after: { + let mut after = HashMap::new(); + for service in status.service.after { + if let Ok(ZInitStatus::Service(status)) = zinit.status(&service).await { + after.insert(service, format!("{:?}", status.state)); + } else { + after.insert(service, format!("{:?}", crate::zinit::State::Unknown)); + } + } + after + }, + } +} + pub struct Client { socket: PathBuf, } @@ -291,7 +366,8 @@ impl Client { match filter { None => tokio::io::copy(&mut con, &mut out).await?, Some(filter) => { - let filter = format!("{}:", filter.as_ref()); + let service_filter = format!("{}:", filter.as_ref()); + let group_filter = format!("{}/", filter.as_ref()); let mut stream = BufStream::new(con); loop { let mut line = String::new(); @@ -303,7 +379,9 @@ impl Client { } } - if line[4..].starts_with(&filter) { + if line[4..].starts_with(&service_filter) + || line[4..].starts_with(&group_filter) + { let _ = out.write_all(line.as_bytes()).await; } } diff --git a/src/app/mod.rs b/src/app/mod.rs index 84067ae..56ae03a 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -7,6 +7,8 @@ use std::path::{Path, PathBuf}; use tokio::fs; use tokio::time; +use self::api::Status; + fn logger(level: log::LevelFilter) -> Result<()> { let logger = fern::Dispatch::new() .format(|out, message, record| { @@ -135,10 +137,25 @@ pub async fn restart(socket: &str, name: &str) -> Result<()> { client.stop(name).await?; //pull status for _ in 0..20 { - let result = client.status(name).await?; - if result.pid == 0 && result.target == "Down" { - client.start(name).await?; - return Ok(()); + match client.status(name).await? { + Status::Service(result) => { + if result.pid == 0 && result.target == "Down" { + client.start(name).await?; + return Ok(()); + } + } + Status::Group(result) => { + let mut start = true; + for service in result.services { + if service.pid != 0 || service.target != "Down" { + start = false; + } + } + if start { + client.start(name).await?; + return Ok(()); + } + } } time::sleep(std::time::Duration::from_secs(1)).await; } diff --git a/src/zinit/config.rs b/src/zinit/config.rs index f706c47..af3c6bc 100644 --- a/src/zinit/config.rs +++ b/src/zinit/config.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::ffi::OsStr; use std::fs::{self, File}; use std::path::Path; -pub type Services = HashMap; +pub type Services = HashMap; pub const DEFAULT_SHUTDOWN_TIMEOUT: u64 = 10; // in seconds @@ -32,6 +32,11 @@ pub enum Log { Stdout, } +pub enum Entry { + Service(Service), + Directory(Services), +} + fn default_shutdown_timeout_fn() -> u64 { DEFAULT_SHUTDOWN_TIMEOUT } @@ -61,6 +66,12 @@ impl Service { if self.exec.is_empty() { bail!("missing exec directive"); } + for service in self.after.iter() { + if service.contains('/') { + // to make sure dependencies doesn't extend to another group + bail!("services can't have /: {}", service) + } + } Signal::from_str(&self.signal.stop.to_uppercase())?; @@ -90,10 +101,23 @@ pub fn load>(t: T) -> Result<(String, Service)> { /// a file, the callback can decide to either ignore the file, or stop /// the directory walking pub fn load_dir>(p: T) -> Result { + load_dir_with_prefix(p, "".to_string()) +} + +pub fn load_dir_with_prefix>(p: T, prefix: String) -> Result { let mut services: Services = HashMap::new(); for entry in fs::read_dir(p)? { let entry = entry?; + if entry.file_type()?.is_dir() { + let mut name = entry.file_name().into_string().expect("not a valid name"); + if !prefix.is_empty() { + name = format!("{}/{}", prefix, name); + } + let entries = load_dir_with_prefix(entry.path(), name.clone())?; + services.insert(name, Entry::Directory(entries)); + continue; + } if !entry.file_type()?.is_file() { continue; } @@ -104,7 +128,7 @@ pub fn load_dir>(p: T) -> Result { continue; } - let (name, service) = match load(&fp) { + let (name, mut service) = match load(&fp) { Ok(content) => content, Err(err) => { error!("failed to load config file {:?}: {}", fp, err); @@ -112,8 +136,19 @@ pub fn load_dir>(p: T) -> Result { } }; - services.insert(name, service); + if prefix.is_empty() { + services.insert(name, Entry::Service(service)); + } else { + update_dependencies(prefix.clone(), &mut service); + services.insert(format!("{}/{}", prefix, name), Entry::Service(service)); + } } Ok(services) } + +fn update_dependencies(prefix: String, service: &mut Service) { + for i in service.after.iter_mut() { + *i = format!("{}/{}", prefix, i); + } +} diff --git a/src/zinit/mod.rs b/src/zinit/mod.rs index 421525a..a71a6cf 100644 --- a/src/zinit/mod.rs +++ b/src/zinit/mod.rs @@ -4,6 +4,7 @@ use crate::manager::{Log, Logs, Process, ProcessManager}; use crate::zinit::ord::ProcessDAG; use crate::zinit::ord::{service_dependency_order, DUMMY_ROOT}; use anyhow::Result; +use async_recursion::async_recursion; use config::DEFAULT_SHUTDOWN_TIMEOUT; use nix::sys::reboot::RebootMode; use nix::sys::signal; @@ -87,7 +88,12 @@ where } } -pub struct ZInitStatus { +pub enum ZInitStatus { + Service(ServiceStatus), + Group(GroupStatus), +} + +pub struct ServiceStatus { pub pid: Pid, // config is the service configuration pub service: config::Service, @@ -97,6 +103,12 @@ pub struct ZInitStatus { pub state: State, } +pub struct GroupStatus { + pub name: String, + pub services: Vec<(String, ServiceStatus)>, + // maybe add target and state based on services of the group? +} + impl ZInitService { fn new(service: config::Service, state: State) -> ZInitService { ZInitService { @@ -108,8 +120,8 @@ impl ZInitService { } } - pub fn status(&self) -> ZInitStatus { - ZInitStatus { + pub fn status(&self) -> ServiceStatus { + ServiceStatus { pid: self.pid, state: self.state.get().clone(), service: self.service.clone(), @@ -198,17 +210,37 @@ impl ZInit { self.pm.stream(follow).await } - pub async fn monitor>(&self, name: S, service: config::Service) -> Result<()> { + pub async fn monitor>(&self, name: S, mut entry: config::Entry) -> Result<()> { if *self.shutdown.read().await { bail!(ZInitError::ShuttingDown); } let name = name.into(); - let mut services = self.services.write().await; + let services = self.services.read().await; if services.contains_key(&name) { bail!(ZInitError::ServiceAlreadyMonitored { name }) } + if let config::Entry::Directory(ref mut svcs) = entry { + svcs.retain(|k, _| !services.contains_key(k)); + // if nothing new to monitor in a group return an error + if svcs.is_empty() { + bail!(ZInitError::ServiceAlreadyMonitored { name }) + } + } + drop(services); + match entry { + config::Entry::Service(service) => self.monitor_service(name, service).await, + config::Entry::Directory(services) => self.monitor_dir(services).await, + } + } + async fn monitor_service>( + &self, + name: S, + service: config::Service, + ) -> Result<()> { + let name = name.into(); + let mut services = self.services.write().await; let service = Arc::new(RwLock::new(ZInitService::new(service, State::Unknown))); services.insert(name.clone(), Arc::clone(&service)); let m = self.clone(); @@ -217,19 +249,45 @@ impl ZInit { Ok(()) } + #[async_recursion] + async fn monitor_dir(&self, services: config::Services) -> Result<()> { + for (name, service) in services { + match service { + config::Entry::Service(service) => { + self.monitor_service(name, service).await?; + } + config::Entry::Directory(services) => { + self.monitor_dir(services).await?; + } + } + } + Ok(()) + } + pub async fn status>(&self, name: S) -> Result { let table = self.services.read().await; let service = table.get(name.as_ref()); + if let Some(service) = service { + return Ok(ZInitStatus::Service(service.read().await.status())); + } + drop(table); - let service = match service { - Some(service) => service, - None => bail!(ZInitError::UnknownService { + let services = services_with_prefix(name.as_ref(), self.services.clone()).await; + if services.is_empty() { + bail!(ZInitError::UnknownService { name: name.as_ref().into() - }), + }) + } + let mut status = GroupStatus { + name: name.as_ref().to_string(), + services: vec![], }; - let service = service.read().await.status(); - Ok(service) + for (name, service) in services { + status.services.push((name, service.read().await.status())); + } + + Ok(ZInitStatus::Group(status)) } async fn kill_wait( @@ -349,90 +407,95 @@ impl ZInit { } pub async fn stop>(&self, name: S) -> Result<()> { - let table = self.services.read().await; - let service = table.get(name.as_ref()); - - let service = match service { - Some(service) => service, - None => bail!(ZInitError::UnknownService { + let services = services_with_prefix(name.as_ref(), self.services.clone()).await; + if services.is_empty() { + bail!(ZInitError::UnknownService { name: name.as_ref().into() - }), - }; - let mut service = service.write().await; - service.target = Target::Down; - let signal = match signal::Signal::from_str(&service.service.signal.stop.to_uppercase()) { - Ok(signal) => signal, - Err(err) => bail!( - "unknown stop signal configured '{}': {}", - service.service.signal.stop, - err - ), - }; - - if service.pid.as_raw() == 0 { - return Ok(()); + }) + } + for (_, service) in services { + let mut service = service.write().await; + service.target = Target::Down; + let signal = match signal::Signal::from_str(&service.service.signal.stop.to_uppercase()) + { + Ok(signal) => signal, + Err(err) => bail!( + "unknown stop signal configured '{}': {}", + service.service.signal.stop, + err + ), + }; + if service.pid.as_raw() == 0 { + continue; + } + self.pm.signal(service.pid, signal)?; } - self.pm.signal(service.pid, signal) + Ok(()) } pub async fn start>(&self, name: S) -> Result<()> { if *self.shutdown.read().await { bail!(ZInitError::ShuttingDown); } - self.set(name.as_ref(), None, Some(Target::Up)).await; - let table = self.services.read().await; - - let service = match table.get(name.as_ref()) { - Some(service) => service, - None => bail!(ZInitError::UnknownService { + let services = services_with_prefix(name.as_ref(), self.services.clone()).await; + if services.is_empty() { + bail!(ZInitError::UnknownService { name: name.as_ref().into() - }), - }; + }) + } + for (name, service) in services { + self.set(name.as_ref(), None, Some(Target::Up)).await; - let m = self.clone(); - tokio::spawn(m.watch(name.as_ref().into(), Arc::clone(service))); + let m = self.clone(); + tokio::spawn(m.watch(name, service)); + } Ok(()) } pub async fn forget>(&self, name: S) -> Result<()> { - let mut table = self.services.write().await; - let service = match table.get(name.as_ref()) { - Some(service) => service, - None => bail!(ZInitError::UnknownService { - name: name.as_ref().into() - }), - }; - - let service = service.read().await; - if service.target == Target::Up || service.pid != Pid::from_raw(0) { - bail!(ZInitError::ServiceISUp { + let services = services_with_prefix(name.as_ref(), self.services.clone()).await; + if services.is_empty() { + bail!(ZInitError::UnknownService { name: name.as_ref().into() }) } + let mut table = self.services.write().await; + for (name, service) in services { + let service = service.read().await; + if service.target == Target::Up || service.pid != Pid::from_raw(0) { + bail!(ZInitError::ServiceISUp { name }) + } + + drop(service); + table.remove(&name); + } - drop(service); - table.remove(name.as_ref()); Ok(()) } pub async fn kill>(&self, name: S, signal: signal::Signal) -> Result<()> { - let table = self.services.read().await; - let service = match table.get(name.as_ref()) { - Some(service) => service, - None => bail!(ZInitError::UnknownService { + let services = services_with_prefix(name.as_ref(), self.services.clone()).await; + if services.is_empty() { + bail!(ZInitError::UnknownService { name: name.as_ref().into() - }), - }; - - let service = service.read().await; - if service.pid == Pid::from_raw(0) { + }) + } + let mut all_down = true; + for (_, service) in services { + let service = service.read().await; + if service.pid == Pid::from_raw(0) { + continue; + } + all_down = false; + self.pm.signal(service.pid, signal)?; + } + if all_down { bail!(ZInitError::ServiceISDown { name: name.as_ref().into(), }) } - - self.pm.signal(service.pid, signal) + Ok(()) } pub async fn list(&self) -> Result> { @@ -679,3 +742,22 @@ impl ZInit { service.scheduled = false; } } + +async fn services_with_prefix>( + prefix: S, + services: Arc>, +) -> HashMap>> { + let mut v = HashMap::new(); + let table = services.read().await; + if let Some(service) = table.get(prefix.as_ref()) { + v.insert(prefix.as_ref().to_string(), service.clone()); + return v; + } + let prefix = format!("{}/", prefix.as_ref()); + for (name, service) in table.iter() { + if name.starts_with(&prefix) { + v.insert(name.clone(), service.clone()); + } + } + v +} diff --git a/src/zinit/ord.rs b/src/zinit/ord.rs index 71174c3..f1c6595 100644 --- a/src/zinit/ord.rs +++ b/src/zinit/ord.rs @@ -16,10 +16,7 @@ pub async fn service_dependency_order(services: Arc>) -> ProcessDA for (name, service) in table.iter() { let service = service.read().await; for child in service.service.after.iter() { - children - .entry(name.into()) - .or_insert_with(Vec::new) - .push(child.into()); + children.entry(name.into()).or_default().push(child.into()); *indegree.entry(child.into()).or_insert(0) += 1; } }