diff --git a/Cargo.lock b/Cargo.lock index 634dbfa..2788cb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,6 +27,7 @@ dependencies = [ "prost 0.12.6", "tokio", "tokio-stream", + "tokio-util", "tonic", ] @@ -1987,9 +1988,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.11" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ "bytes", "futures-core", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index eef02eb..6dfaccc 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -11,3 +11,4 @@ tokio-stream = "0.1.15" h2 = "0.4.5" messages = { path = "../messages" } kubernetes-client = { path = "../kubernetes-client" } +tokio-util = "0.7.12" diff --git a/agent/src/client.rs b/agent/src/client.rs index f34958b..8532f30 100644 --- a/agent/src/client.rs +++ b/agent/src/client.rs @@ -1,5 +1,6 @@ use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_util::sync::CancellationToken; use tonic::transport::Channel; use messages::{ @@ -14,7 +15,7 @@ use super::k8scommands::retrieve_k8s_version_and_build_message; const VERSION: &str = "1"; -pub async fn agent_stream_manager(client: &mut CommanderClient) { +pub async fn agent_stream_manager(client: &mut CommanderClient, cancellation_token: CancellationToken) { println!("|{}| agent started", timenow()); let (mut tx, rx) = mpsc::channel(1); @@ -27,22 +28,29 @@ pub async fn agent_stream_manager(client: &mut CommanderClient) { let mut resp_stream = response.into_inner(); loop { - match resp_stream.next().await { - Some(received) => { - let received = received.unwrap(); - println!("|{time}| received message {name}: {:#?}", std::str::from_utf8(&received.payload).ok().unwrap(), name=&received.name, time=&received.timestamp); + tokio::select! { + received_from_stream = resp_stream.next() => { + match received_from_stream { + Some(received) => { + let received = received.unwrap(); + println!("|{time}| received message {name}: {:#?}", std::str::from_utf8(&received.payload).ok().unwrap(), name=&received.name, time=&received.timestamp); - let resp = get_response_message(received).await; - match resp { - Some(message_to_send) => send2server(&mut tx, message_to_send).await, - _ => (), - } + let resp = get_response_message(received).await; + match resp { + Some(message_to_send) => send2server(&mut tx, message_to_send).await, + _ => (), + } - println!("|{}| processed message", timenow()); - }, - None => { - println!("|{}| Received None from stream :(", timenow()); - break; + println!("|{}| processed message", timenow()); + }, + None => { + println!("|{}| Received None from stream :(", timenow()); + break; + } + } + } + _ = cancellation_token.cancelled() => { + println!("|{}| received cancellation signal", timenow()); } } } diff --git a/agent/src/main.rs b/agent/src/main.rs index fbf1336..4c26da3 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -4,13 +4,32 @@ use std::env; use messages::pb::commander_client::CommanderClient; use client::agent_stream_manager; +use tokio::signal::{self, unix::{signal as unix_signal, SignalKind}}; +use tokio_util::sync::CancellationToken; mod k8scommands; #[tokio::main] async fn main() { let address = env::var("COMMANDER_URL").unwrap_or("http://[::1]:50051".to_string()); - println!("Connecting to commander at {}", address); - let mut client = CommanderClient::connect(address).await.unwrap(); - agent_stream_manager(&mut client).await; + + let token = CancellationToken::new(); + + let agent_client_cancellation_token = token.clone(); + tokio::spawn(async move { + println!("Connecting to commander at {}", address); + let mut client = CommanderClient::connect(address).await.unwrap(); + agent_stream_manager(&mut client, agent_client_cancellation_token).await; + }); + tokio::select! { + // TODO: Wait for the SIGTERM signal + // _ = Future::new(unix_signal(SignalKind::terminate())) => { + // println!("Received SIGTERM, exiting"); + // token.cancel(); + // } + _ = signal::ctrl_c() => { + println!("Received SIGTERM, exiting"); + token.cancel(); + } + } }