Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ tokio-stream = "0.1.15"
h2 = "0.4.5"
messages = { path = "../messages" }
kubernetes-client = { path = "../kubernetes-client" }
tokio-util = "0.7.12"
38 changes: 23 additions & 15 deletions agent/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;

use messages::{
Expand All @@ -14,7 +15,7 @@ use super::k8scommands::retrieve_k8s_version_and_build_message;

const VERSION: &str = "1";

pub async fn agent_stream_manager(client: &mut CommanderClient<Channel>) {
pub async fn agent_stream_manager(client: &mut CommanderClient<Channel>, cancellation_token: CancellationToken) {
println!("|{}| agent started", timenow());

let (mut tx, rx) = mpsc::channel(1);
Expand All @@ -27,22 +28,29 @@ pub async fn agent_stream_manager(client: &mut CommanderClient<Channel>) {

let mut resp_stream = response.into_inner();
loop {
match resp_stream.next().await {
Some(received) => {
let received = received.unwrap();
println!("|{time}| received message {name}: {:#?}", std::str::from_utf8(&received.payload).ok().unwrap(), name=&received.name, time=&received.timestamp);
tokio::select! {
received_from_stream = resp_stream.next() => {
match received_from_stream {
Some(received) => {
let received = received.unwrap();
println!("|{time}| received message {name}: {:#?}", std::str::from_utf8(&received.payload).ok().unwrap(), name=&received.name, time=&received.timestamp);

let resp = get_response_message(received).await;
match resp {
Some(message_to_send) => send2server(&mut tx, message_to_send).await,
_ => (),
}
let resp = get_response_message(received).await;
match resp {
Some(message_to_send) => send2server(&mut tx, message_to_send).await,
_ => (),
}

println!("|{}| processed message", timenow());
},
None => {
println!("|{}| Received None from stream :(", timenow());
break;
println!("|{}| processed message", timenow());
},
None => {
println!("|{}| Received None from stream :(", timenow());
break;
}
}
}
_ = cancellation_token.cancelled() => {
println!("|{}| received cancellation signal", timenow());
}
}
}
Expand Down
25 changes: 22 additions & 3 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,32 @@ use std::env;

use messages::pb::commander_client::CommanderClient;
use client::agent_stream_manager;
use tokio::signal::{self, unix::{signal as unix_signal, SignalKind}};
use tokio_util::sync::CancellationToken;

mod k8scommands;

#[tokio::main]
async fn main() {
let address = env::var("COMMANDER_URL").unwrap_or("http://[::1]:50051".to_string());
println!("Connecting to commander at {}", address);
let mut client = CommanderClient::connect(address).await.unwrap();
agent_stream_manager(&mut client).await;

let token = CancellationToken::new();

let agent_client_cancellation_token = token.clone();
tokio::spawn(async move {
println!("Connecting to commander at {}", address);
let mut client = CommanderClient::connect(address).await.unwrap();
agent_stream_manager(&mut client, agent_client_cancellation_token).await;
});
tokio::select! {
// TODO: Wait for the SIGTERM signal
// _ = Future::new(unix_signal(SignalKind::terminate())) => {
// println!("Received SIGTERM, exiting");
// token.cancel();
// }
_ = signal::ctrl_c() => {
println!("Received SIGTERM, exiting");
token.cancel();
}
}
}