diff --git a/src/game/faction.rs b/src/game/faction.rs index dbfb36c..d727dcf 100644 --- a/src/game/faction.rs +++ b/src/game/faction.rs @@ -1,7 +1,7 @@ use actix_web::{get, web, HttpResponse}; use serde::{Serialize, Deserialize}; use crate::{ - AppState, + game::global::AppState, game::{ game::game::GameID, }, @@ -119,7 +119,7 @@ impl GameFaction { } #[get("/")] -pub async fn get_factions(state: web::Data) -> Result { +pub async fn get_factions(state: &AppState) -> Result { Ok(HttpResponse::Ok().json(Faction::find_all(&state.db_pool).await?)) } diff --git a/src/game/fleet/combat/battle.rs b/src/game/fleet/combat/battle.rs index 161dc65..2954386 100644 --- a/src/game/fleet/combat/battle.rs +++ b/src/game/fleet/combat/battle.rs @@ -1,5 +1,6 @@ use std::collections::{HashSet, HashMap}; use crate::{ + game::global::state, task, lib::{ error::{ServerError, InternalError}, @@ -9,7 +10,7 @@ use crate::{ }, game::{ faction::FactionID, - game::server::GameServer, + game::{game::GameID, server::GameServer}, fleet::{ combat::{ conquest::Conquest, @@ -23,12 +24,12 @@ use crate::{ }, ws::protocol, }; +use actix::prelude::*; use serde::{Deserialize, Serialize}; use sqlx::{PgPool, PgConnection, pool::PoolConnection, postgres::{PgRow, PgQueryAs}, FromRow, Executor, Transaction, Postgres, Error, types::Json}; use sqlx_core::row::Row; use rand::prelude::*; use uuid::Uuid; -use futures::executor::block_on; #[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Copy)] pub struct BattleID(pub Uuid); @@ -180,24 +181,31 @@ impl Battle { Err(InternalError::NotFound.into()) } - pub async fn engage(arriver: &Fleet, orbiting_fleets: &HashMap, system: &System, defender_faction: Option, server: &GameServer) -> Result<()> { - Conquest::stop(&system, &server).await?; + pub async fn engage(arriver: &Fleet, orbiting_fleets: &HashMap, system: &System, defender_faction: Option, gid: GameID) -> Result<()> { + let state = state(); + Conquest::stop(&system, gid).await?; let mut fleets = orbiting_fleets.clone(); fleets.insert(arriver.id.clone(), arriver.clone()); - let battle = init_battle(arriver, system, fleets, defender_faction, &server.state.db_pool).await?; + let battle = init_battle(arriver, system, fleets, defender_faction, &state.db_pool).await?; - server.ws_broadcast(&protocol::Message::new(protocol::Action::BattleStarted, &battle, None)).await?; + GameServer::ws_broadcast(gid, protocol::Message::new(protocol::Action::BattleStarted, &battle, None)).await?; let mut round = Round::new(battle.id, 1); - server.state.games().get(&server.id).unwrap().do_send(task!(round -> move |gs| block_on(round.execute(gs)))); + state.games().get(&gid).unwrap().do_send(task!(round -> move |gs, ctx| { + let gid = gs.id; + ctx.wait(actix::fut::wrap_future(async move { + round.execute(gid).await; + })); + Ok(()) + })); log(gelf::Level::Informational, "Battle started", "A new battle has started", vec![ ("battle_id", battle.id.0.to_string()), ("system_id", system.id.0.to_string()), ("fleet_id", arriver.id.0.to_string()), - ], &server.state.logger); + ], &state.logger); Ok(()) } @@ -207,12 +215,13 @@ impl Battle { 2 > self.fleets.keys().len() } - pub async fn end(&mut self, server: &GameServer) -> Result<()> { + pub async fn end(&mut self, gid: GameID) -> Result<()> { + let state = state(); self.victor = Some(self.process_victor()?); self.ended_at = Some(Time::now()); - self.update(&mut &server.state.db_pool).await?; + self.update(&mut &state.db_pool).await?; - server.ws_broadcast(&protocol::Message::new( + GameServer::ws_broadcast(gid, protocol::Message::new( protocol::Action::BattleEnded, self.clone(), None @@ -222,16 +231,16 @@ impl Battle { return Ok(()); } - let fleet = Fleet::find(&self.attacker, &server.state.db_pool).await?; - let system = System::find(self.system, &server.state.db_pool).await?; + let fleet = Fleet::find(&self.attacker, &state.db_pool).await?; + let system = System::find(self.system, &state.db_pool).await?; log(gelf::Level::Informational, "Battle ended", "A battle has finished", vec![ ("battle_id", self.id.0.to_string()), ("victor_id", self.victor.unwrap().0.to_string()), ("system_id", self.system.0.to_string()) - ], &server.state.logger); + ], &state.logger); - Conquest::resume(&fleet, &system, self.victor, &server).await + Conquest::resume(&fleet, &system, self.victor, gid).await } } @@ -292,8 +301,9 @@ async fn init_battle(attacker: &Fleet, system: &System, fleets: HashMap Result>> { - let mut tx = server.state.db_pool.begin().await?; +pub async fn update_fleets(battle: &Battle) -> Result>> { + let state = state(); + let mut tx = state.db_pool.begin().await?; let mut remaining_fleets = HashMap::new(); for (faction_id, fleets) in battle.fleets.iter() { @@ -304,7 +314,7 @@ pub async fn update_fleets(battle: &Battle, server: &GameServer) -> Result Result<()> { - let mut fleets = system.retrieve_orbiting_fleets(&server.state.db_pool).await?; - let game = Game::find(system.game, &server.state.db_pool).await?; + pub async fn remove_fleet(&mut self, system: &System, fleet: &Fleet, gid: GameID) -> Result<()> { + let state = state(); + let mut fleets = system.retrieve_orbiting_fleets(&state.db_pool).await?; + let game = Game::find(system.game, &state.db_pool).await?; fleets.retain(|&fid, _| fid != fleet.id); // If the current fleet is the only one, the conquest is cancelled if fleets.len() < 1 { - return self.cancel(&server).await; + return self.cancel(gid).await; } - server.state.games().get(&server.id).unwrap().do_send(cancel_task!(self)); - self.update_time(fleets.values().collect(), game.game_speed, &server.state.db_pool).await?; + state.games().get(&gid).unwrap().do_send(cancel_task!(self)); + self.update_time(fleets.values().collect(), game.game_speed, &state.db_pool).await?; let mut conquest = self.clone(); - server.state.games().get(&server.id).unwrap().do_send(task!(conquest -> move |server| block_on(conquest.end(&server)))); + state.games().get(&gid).unwrap().do_send(task!(conquest -> move |server, ctx| { + let gid = server.id; + ctx.wait(actix::fut::wrap_future(async move { + conquest.end(gid).await; + })); + Ok(()) + })); Ok(()) } @@ -169,37 +175,39 @@ impl Conquest { Ok(()) } - pub async fn cancel(&mut self, server: &GameServer) -> Result<()> { + pub async fn cancel(&mut self, gid: GameID) -> Result<()> { + let state = state(); self.ended_at = Time::now(); self.is_over = true; - self.update(&mut &server.state.db_pool).await?; + self.update(&mut &state.db_pool).await?; let conquest = self.clone(); - server.ws_broadcast(&protocol::Message::new( + GameServer::ws_broadcast(gid, protocol::Message::new( protocol::Action::ConquestCancelled, conquest.clone(), None )).await?; - server.state.games().get(&server.id).unwrap().do_send(cancel_task!(conquest)); + state.games().get(&gid).unwrap().do_send(cancel_task!(conquest)); log(gelf::Level::Informational, "Conquest cancelled", "The last fleet executing the conquest has travelled elsewhere", vec![ ("conquest_id", conquest.id.0.to_string()), ("system_id", conquest.system.0.to_string()), - ], &server.state.logger); + ], &state.logger); Ok(()) } - pub async fn stop(system: &System, server: &GameServer) -> Result<()> { - let c = Self::find_current_by_system(&system.id, &server.state.db_pool).await?; + pub async fn stop(system: &System, gid: GameID) -> Result<()> { + let state = state(); + let c = Self::find_current_by_system(&system.id, &state.db_pool).await?; if let Some(mut conquest) = c { - conquest.halt(&server.state, &server.id).await?; + conquest.halt(&state, gid).await?; } Ok(()) } - pub async fn halt(&mut self, state: &web::Data, game_id: &GameID) -> Result<()> { + pub async fn halt(&mut self, state: &AppState, game_id: GameID) -> Result<()> { self.is_stopped = true; self.percent = self.calculate_progress(); self.update(&mut &state.db_pool).await?; @@ -219,21 +227,22 @@ impl Conquest { self.percent + (consumed_ms / total_ms) } - pub async fn resume(fleet: &Fleet, system: &System, victor_faction: Option, server: &GameServer) -> Result<()> { - let c = Self::find_current_by_system(&system.id, &server.state.db_pool).await?; - let game = Game::find(system.game, &server.state.db_pool).await?; - let fleets_data = system.retrieve_orbiting_fleets(&server.state.db_pool).await?; + pub async fn resume(fleet: &Fleet, system: &System, victor_faction: Option, gid: GameID) -> Result<()> { + let state = state(); + let c = Self::find_current_by_system(&system.id, &state.db_pool).await?; + let game = Game::find(system.game, &state.db_pool).await?; + let fleets_data = system.retrieve_orbiting_fleets(&state.db_pool).await?; let fleets = fleets_data.values().collect(); if let Some(mut conquest) = c { - let conquest_player = Player::find(conquest.player, &server.state.db_pool).await?; - let games = server.state.games(); - let game_server = games.get(&server.id).unwrap(); + let conquest_player = Player::find(conquest.player, &state.db_pool).await?; + let games = state.games(); + let game_server = games.get(&gid).unwrap(); if victor_faction.is_some() && victor_faction != conquest_player.faction { - conquest.cancel(&server).await?; + conquest.cancel(gid).await?; - return Self::new(fleet, fleets, system, game.game_speed, &server).await; + return Self::new(fleet, fleets, system, game.game_speed, gid).await; } // This case means the fleet is reinforcing a current conquest @@ -245,21 +254,28 @@ impl Conquest { None, )); } - conquest.update_time(fleets, game.game_speed, &server.state.db_pool).await?; + conquest.update_time(fleets, game.game_speed, &state.db_pool).await?; game_server.do_send(protocol::Message::new( protocol::Action::ConquestUpdated, conquest.clone(), None )); - game_server.do_send(task!(conquest -> move |server| block_on(conquest.end(&server)))); + game_server.do_send(task!(conquest -> move |server, ctx| { + let gid = server.id; + ctx.wait(actix::fut::wrap_future(async move { + conquest.end(gid).await; + })); + Ok(()) + })); return Ok(()); } - Self::new(fleet, fleets, system, game.game_speed, &server).await + Self::new(fleet, fleets, system, game.game_speed, gid).await } - pub async fn new(fleet: &Fleet, fleets: Vec<&Fleet>, system: &System, game_speed: GameOptionSpeed, server: &GameServer) -> Result<()> { + pub async fn new(fleet: &Fleet, fleets: Vec<&Fleet>, system: &System, game_speed: GameOptionSpeed, gid: GameID) -> Result<()> { + let state = state(); let conquest_id = ConquestID(Uuid::new_v4()); let mut conquest = Conquest{ id: conquest_id, @@ -274,37 +290,44 @@ impl Conquest { is_successful: false, is_over: false, }; - conquest.insert(&mut &server.state.db_pool).await?; + conquest.insert(&mut &state.db_pool).await?; log(gelf::Level::Informational, "New conquest", "A new conquest has started", vec![ ("conquest_id", conquest_id.0.to_string()), ("player_id", fleets[0].player.0.to_string()), ("fleet_id", fleets[0].id.0.to_string()), ("system_id", system.id.0.to_string()) - ], &server.state.logger); + ], &state.logger); - server.ws_broadcast(&protocol::Message::new( + GameServer::ws_broadcast(gid, protocol::Message::new( protocol::Action::ConquestStarted, conquest.clone(), None )).await?; - server.state.games().get(&server.id).unwrap().do_send(task!(conquest -> move |server| block_on(conquest.end(&server)))); + state.games().get(&gid).unwrap().do_send(task!(conquest -> move |server, ctx| { + let gid = server.id; + ctx.wait(actix::fut::wrap_future(async move { + conquest.end(gid).await; + })); + Ok(()) + })); Ok(()) } - pub async fn end(&mut self, server: &GameServer) -> Result<()> { - let mut system = System::find(self.system.clone(), &server.state.db_pool).await?; - let fleets = system.retrieve_orbiting_fleets(&server.state.db_pool).await?.values().cloned().collect(); + pub async fn end(&mut self, gid: GameID) -> Result<()> { + let state = state(); + let mut system = System::find(self.system.clone(), &state.db_pool).await?; + let fleets = system.retrieve_orbiting_fleets(&state.db_pool).await?.values().cloned().collect(); self.is_over = true; - self.update(&mut &server.state.db_pool).await?; + self.update(&mut &state.db_pool).await?; system.player = Some(self.player.clone()); - system.update(&mut &server.state.db_pool).await?; + system.update(&mut &state.db_pool).await?; - server.ws_broadcast(&protocol::Message::new( + GameServer::ws_broadcast(gid, protocol::Message::new( protocol::Action::SystemConquerred, ConquestData{ system, fleets }, None @@ -314,7 +337,7 @@ impl Conquest { ("conquest_id", self.id.0.to_string()), ("player_id", self.player.0.to_string()), ("system_id", self.system.0.to_string()) - ], &server.state.logger); + ], &state.logger); Ok(()) } @@ -423,4 +446,4 @@ mod tests category, } } -} \ No newline at end of file +} diff --git a/src/game/fleet/combat/round.rs b/src/game/fleet/combat/round.rs index 8a16c70..323bea8 100644 --- a/src/game/fleet/combat/round.rs +++ b/src/game/fleet/combat/round.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use actix::AsyncContext; use crate::{ + game::global::state, task, lib::{ time::Time, @@ -15,10 +17,9 @@ use crate::{ fleet::{FleetID, Fleet}, squadron::{FleetSquadronID, FleetSquadron}, }, - game::server::{ GameServer, GameServerTask } + game::{game::GameID, server::{ GameServer, GameServerTask }}, } }; -use futures::executor::block_on; use chrono::{Duration, Utc}; use serde::{Deserialize, Serialize}; use rand::prelude::*; @@ -87,29 +88,36 @@ impl Round { } } - pub async fn execute(&mut self, server: &GameServer) -> Result<()> { - let mut battle = Battle::find(self.battle, &server.state.db_pool).await?; + pub async fn execute(&mut self, gid: GameID) -> Result<()> { + let state = state(); + let mut battle = Battle::find(self.battle, &state.db_pool).await?; log(gelf::Level::Informational, "Battle round started", "A new round has been added to the battle", vec![ ("battle_id", battle.id.0.to_string()), ("round_number", self.number.to_string()), - ], &server.state.logger); + ], &state.logger); - let new_fleets = battle.get_joined_fleets(&server.state.db_pool).await?.iter().map(|f| (f.id.clone(), f.clone())).collect::>(); - for (fid, fleets) in get_factions_fleets(new_fleets.clone(), &server.state.db_pool).await? { + let new_fleets = battle.get_joined_fleets(&state.db_pool).await?.iter().map(|f| (f.id.clone(), f.clone())).collect::>(); + for (fid, fleets) in get_factions_fleets(new_fleets.clone(), &state.db_pool).await? { battle.fleets.get_mut(&fid).unwrap().extend(fleets); } self.fight(&mut battle, &new_fleets); battle.rounds.push(self.clone()); - battle.fleets = update_fleets(&battle, &server).await?; - battle.update(&mut &server.state.db_pool).await?; + battle.fleets = update_fleets(&battle).await?; + battle.update(&mut &state.db_pool).await?; if battle.is_over() { - battle.end(server).await?; + battle.end(gid).await?; } else { let mut next_round = Round::new(battle.id, self.number + 1); - server.state.games().get(&server.id).unwrap().do_send(task!(next_round -> move |gs| block_on(next_round.execute(gs)))); + state.games().get(&gid).unwrap().do_send(task!(next_round -> move |gs, ctx| { + let gid = gs.id; + ctx.wait(actix::fut::wrap_future(async move { + next_round.execute(gid).await; + })); + Ok(()) + })); } Ok(()) } diff --git a/src/game/fleet/fleet.rs b/src/game/fleet/fleet.rs index ec98ca8..da26261 100644 --- a/src/game/fleet/fleet.rs +++ b/src/game/fleet/fleet.rs @@ -17,7 +17,7 @@ use crate::{ fleet::squadron::{FleetSquadron}, }, ws::protocol, - AppState + game::global::AppState, }; use sqlx::{PgPool, postgres::{PgRow, PgQueryAs}, FromRow, Executor, Error, Postgres}; use sqlx_core::row::Row; @@ -136,7 +136,7 @@ impl Fleet { } #[post("/")] -pub async fn create_fleet(state: web::Data, info: web::Path<(GameID,SystemID)>, claims: Claims) -> Result { +pub async fn create_fleet(state: &AppState, info: web::Path<(GameID,SystemID)>, claims: Claims) -> Result { let system = System::find(info.1, &state.db_pool).await?; if system.player != Some(claims.pid) { @@ -167,7 +167,7 @@ pub async fn create_fleet(state: web::Data, info: web::Path<(GameID,Sy #[patch("/donate/")] pub async fn donate( - state: web::Data, + state: &AppState, info: web::Path<(GameID,SystemID,FleetID,)>, claims: Claims ) -> Result { diff --git a/src/game/fleet/squadron.rs b/src/game/fleet/squadron.rs index 9d2a6a7..9b85b6b 100644 --- a/src/game/fleet/squadron.rs +++ b/src/game/fleet/squadron.rs @@ -1,4 +1,5 @@ use actix_web::{post , web, HttpResponse}; +use actix::AsyncContext; use serde::{Serialize, Deserialize}; use uuid::Uuid; use crate::{ @@ -25,9 +26,8 @@ use crate::{ }, player::Player, }, - AppState + game::global::AppState }; -use futures::executor::block_on; use sqlx::{PgPool, postgres::{PgRow, PgQueryAs}, FromRow, Executor, Error, Postgres}; use sqlx_core::row::Row; use futures::join; @@ -175,7 +175,7 @@ impl FleetSquadron { #[post("/")] pub async fn assign_ships( - state: web::Data, + state: &AppState, info: web::Path<(GameID, SystemID, FleetID)>, json_data: web::Json, claims: Claims @@ -259,7 +259,13 @@ pub async fn assign_ships( if let Some(sq) = ship_queue { let data = sq.clone(); - state.games().get(&info.0).unwrap().do_send(task!(sq -> move |gs: &GameServer| block_on(sq.produce(gs)))); + state.games().get(&info.0).unwrap().do_send(task!(sq -> move |gs, ctx| { + let gid = gs.id; + ctx.wait(actix::fut::wrap_future(async move { + sq.produce(gid).await; + })); + Ok(()) + })); return Ok(HttpResponse::Created().json(data)); } Ok(HttpResponse::NoContent().finish()) diff --git a/src/game/fleet/travel.rs b/src/game/fleet/travel.rs index 55734d7..376956a 100644 --- a/src/game/fleet/travel.rs +++ b/src/game/fleet/travel.rs @@ -25,7 +25,7 @@ use crate::{ fleet::squadron::{FleetSquadron}, }, ws::protocol, - AppState + game::global::{AppState, state}, }; use std::collections::HashMap; use chrono::{Duration, Utc}; @@ -88,7 +88,7 @@ impl From for Option { #[post("/travel/")] pub async fn travel( - state: web::Data, + state: &AppState, info: web::Path<(GameID,SystemID,FleetID,)>, json_data: web::Json, claims: Claims @@ -138,7 +138,7 @@ pub async fn travel( if let Some(mut conquest) = Conquest::find_current_by_system(&system.id, &state.db_pool).await? { let count = Fleet::count_stationed_by_system(&system.id, &state.db_pool).await?; if 1 >= count { - conquest.halt(&state, &game_id).await?; + conquest.halt(&state, game_id).await?; } } game.do_send(GameFleetTravelMessage{ system, fleet: fleet.clone() }); @@ -152,34 +152,36 @@ pub async fn travel( Ok(HttpResponse::Ok().json(fleet)) } -pub async fn process_fleet_arrival(server: &GameServer, fleet_id: FleetID) -> Result<()> { - let mut fleet = Fleet::find(&fleet_id, &server.state.db_pool).await?; - fleet.squadrons = FleetSquadron::find_by_fleet(fleet.id.clone(), &server.state.db_pool).await?; - let destination_system = System::find(fleet.destination_system.unwrap(), &server.state.db_pool).await?; - let player = Player::find(fleet.player, &server.state.db_pool).await?; +pub async fn process_fleet_arrival(gid: GameID, fleet_id: FleetID) -> Result<()> { + let state = state(); + let mut fleet = Fleet::find(&fleet_id, &state.db_pool).await?; + fleet.squadrons = FleetSquadron::find_by_fleet(fleet.id.clone(), &state.db_pool).await?; + let destination_system = System::find(fleet.destination_system.unwrap(), &state.db_pool).await?; + let player = Player::find(fleet.player, &state.db_pool).await?; let system_owner = { match destination_system.player { - Some(owner_id) => Some(Player::find(owner_id, &server.state.db_pool).await?), + Some(owner_id) => Some(Player::find(owner_id, &state.db_pool).await?), None => None, } }; fleet.change_system(&destination_system); - fleet.update(&mut &server.state.db_pool).await?; + fleet.update(&mut &state.db_pool).await?; - let outcome = resolve_arrival_outcome(&destination_system, &server, fleet, &player, system_owner).await?; + let outcome = resolve_arrival_outcome(&destination_system, gid, fleet, &player, system_owner).await?; if let Some(message) = Option::::from(outcome.clone()) { - server.ws_broadcast(&message).await?; + GameServer::ws_broadcast(gid, message).await?; } - process_arrival_outcome(&outcome, &server).await + process_arrival_outcome(&outcome, gid).await } -async fn resolve_arrival_outcome(system: &System, server: &GameServer, fleet: Fleet, player: &Player, system_owner: Option) -> Result { +async fn resolve_arrival_outcome(system: &System, gid: GameID, fleet: Fleet, player: &Player, system_owner: Option) -> Result { + let state = state(); // First we check if a battle rages in the destination system. No matter the opponents, the fleet joins in - if Battle::count_current_by_system(&system.id, &server.state.db_pool).await? > 0 { + if Battle::count_current_by_system(&system.id, &state.db_pool).await? > 0 { return Ok(FleetArrivalOutcome::JoinedBattle{ fleet }); } match system_owner { @@ -189,12 +191,12 @@ async fn resolve_arrival_outcome(system: &System, server: &GameServer, fleet: Fl log(gelf::Level::Informational, "Fleet arrived", "A fleet has finished its journey to another system", vec![ ("fleet_id", fleet.id.0.to_string()), ("system_id", system.id.0.to_string()), - ], &server.state.logger); + ], &state.logger); return Ok(FleetArrivalOutcome::Arrived{ fleet }); } // The fleet landed in an enemy system. We check if it is defended by some fleets and initiate a battle - let fleets = system.retrieve_orbiting_fleets(&server.state.db_pool).await?; + let fleets = system.retrieve_orbiting_fleets(&state.db_pool).await?; // If there are none, a conquest begins if fleets.is_empty() { return Ok(FleetArrivalOutcome::Conquer{ system: system.clone(), fleet }); @@ -203,12 +205,12 @@ async fn resolve_arrival_outcome(system: &System, server: &GameServer, fleet: Fl }, None => { // The fleet landed in a neutral system. We check if it is currently being colonized by some fleets and initiate a battle - match Conquest::find_current_by_system(&system.id, &server.state.db_pool).await? { + match Conquest::find_current_by_system(&system.id, &state.db_pool).await? { Some(current_colonization) => { - let colonizer = Player::find(current_colonization.player, &server.state.db_pool).await?; + let colonizer = Player::find(current_colonization.player, &state.db_pool).await?; if colonizer.faction != player.faction { - let fleets = system.retrieve_orbiting_fleets(&server.state.db_pool).await?; + let fleets = system.retrieve_orbiting_fleets(&state.db_pool).await?; return Ok(FleetArrivalOutcome::Battle{ system: system.clone(), fleet, fleets, defender_faction: None }) } // The fleet reinforces the current colonization @@ -220,11 +222,11 @@ async fn resolve_arrival_outcome(system: &System, server: &GameServer, fleet: Fl } } -async fn process_arrival_outcome(outcome: &FleetArrivalOutcome, server: &GameServer) -> Result<()> { +async fn process_arrival_outcome(outcome: &FleetArrivalOutcome, gid: GameID) -> Result<()> { match outcome { - FleetArrivalOutcome::Battle { fleet, fleets, system, defender_faction } => Battle::engage(&fleet, &fleets, &system, *defender_faction, &server).await, - FleetArrivalOutcome::Colonize { fleet, system } => Conquest::resume(fleet, &system, None, &server).await, - FleetArrivalOutcome::Conquer { fleet, system } => Conquest::resume(fleet, &system, None, &server).await, + FleetArrivalOutcome::Battle { fleet, fleets, system, defender_faction } => Battle::engage(&fleet, &fleets, &system, *defender_faction, gid).await, + FleetArrivalOutcome::Colonize { fleet, system } => Conquest::resume(fleet, &system, None, gid).await, + FleetArrivalOutcome::Conquer { fleet, system } => Conquest::resume(fleet, &system, None, gid).await, _ => Ok(()) } } diff --git a/src/game/game/game.rs b/src/game/game/game.rs index d8c4c94..44a6faa 100644 --- a/src/game/game/game.rs +++ b/src/game/game/game.rs @@ -20,7 +20,7 @@ use crate::{ player::{PlayerID, Player}, }, ws::client::ClientSession, - AppState, + game::global::AppState, }; use sqlx::{PgPool, postgres::{PgRow, PgQueryAs}, FromRow, Error, Executor, Postgres}; use sqlx_core::row::Row; @@ -87,13 +87,12 @@ impl Game { } } -pub async fn create_game(lobby: &Lobby, state: web::Data, clients: HashMap>) -> Result<(GameID, Addr)> { +pub async fn create_game(lobby: &Lobby, state: &AppState, clients: HashMap>) -> Result<(GameID, Addr)> { let id = GameID(Uuid::new_v4()); let game_server = GameServer{ id: id.clone(), - state: state.clone(), - clients: RwLock::new(clients), + clients: clients, tasks: HashMap::new(), }; let game = Game{ @@ -113,12 +112,12 @@ pub async fn create_game(lobby: &Lobby, state: web::Data, clients: Has } #[get("/{id}/players/")] -pub async fn get_players(state: web::Data, info: web::Path<(GameID,)>) -> Result { +pub async fn get_players(state: &AppState, info: web::Path<(GameID,)>) -> Result { Ok(HttpResponse::Ok().json(Player::find_by_game(info.0, &state.db_pool).await?)) } #[delete("/{id}/players/")] -pub async fn leave_game(state:web::Data, claims: Claims, info: web::Path<(GameID,)>) +pub async fn leave_game(state: &AppState, claims: Claims, info: web::Path<(GameID,)>) -> Result { let game = Game::find(info.0, &state.db_pool).await?; @@ -131,7 +130,7 @@ pub async fn leave_game(state:web::Data, claims: Claims, info: web::Pa let games = state.games(); let game_server = games.get(&game.id).expect("Game exists in DB but not in HashMap"); - let (client, is_empty) = Arc::try_unwrap(game_server.send(GameRemovePlayerMessage(player.id.clone())).await?).ok().unwrap(); + let (client, is_empty) = game_server.send(GameRemovePlayerMessage(player.id)).await?.unwrap(); if let Some(c) = client { state.add_client(&player.id, c); } diff --git a/src/game/game/server.rs b/src/game/game/server.rs index b9ee7cd..108622f 100644 --- a/src/game/game/server.rs +++ b/src/game/game/server.rs @@ -1,13 +1,8 @@ -use actix_web::web; -use actix::prelude::*; +use actix::{fut::wrap_future, prelude::*}; use serde::{Serialize}; -use std::sync::{Arc, RwLock}; use std::collections::{HashMap}; use std::time::Duration; use chrono::{DateTime, Utc}; -use futures::{ - executor::block_on, -}; use crate::{ lib::{ Result, @@ -29,13 +24,12 @@ use crate::{ }, }, ws::{ client::ClientSession, protocol}, - AppState, + game::global::state, }; pub struct GameServer { pub id: GameID, - pub state: web::Data, - pub clients: RwLock>>, + pub clients: HashMap>, pub tasks: HashMap, } @@ -59,8 +53,8 @@ pub trait GameServerTask{ impl Handler for GameServer { type Result = (); - fn handle(&mut self, msg: protocol::Message, _ctx: &mut Self::Context) -> Self::Result { - block_on(self.ws_broadcast(&msg)); + fn handle(&mut self, msg: protocol::Message, ctx: &mut Self::Context) -> Self::Result { + ctx.wait(wrap_future(Self::ws_broadcast(self.id, msg)).map(|_,_,_| ())); } } @@ -68,90 +62,100 @@ impl Actor for GameServer { type Context = Context; fn started(&mut self, ctx: &mut Context) { - block_on(self.ws_broadcast(&protocol::Message::new( - protocol::Action::LobbyLaunched, - self.id.clone(), - None, - ))); + ctx.wait( + wrap_future(Self::ws_broadcast(self.id, protocol::Message::new( + protocol::Action::LobbyLaunched, + self.id.clone(), + None, + ))).map(|_,_,_| { () }) + ); - self.add_task(ctx, "init".to_string(), Duration::new(1, 0), |this, _| block_on(this.init())); - self.add_task(ctx, "begin".to_string(), Duration::new(4, 0), |this, _| block_on(this.begin())); - run_interval(ctx, Duration::new(5, 0), move |this, _| { - block_on(this.produce_income()) + self.add_task(ctx, "init".to_string(), Duration::new(1, 0), |this, ctx| { Ok(ctx.wait(this.init())) }); + self.add_task(ctx, "begin".to_string(), Duration::new(4, 0), |this, ctx| { Ok(ctx.wait(this.begin())) }); + run_interval(ctx, Duration::new(5, 0), move |this, ctx| { + Ok(ctx.wait(wrap_future(Self::produce_income(this.id)).map(|_,_,_| ()))) }); - run_interval(ctx, Duration::new(60, 0), move |this, _| { - block_on(this.distribute_victory_points()) + run_interval(ctx, Duration::new(60, 0), move |this, ctx| { + Ok(ctx.wait(wrap_future(Self::distribute_victory_points(this.id)).map(|_,_,_| ()))) }); } } impl GameServer { - async fn init(&mut self) -> Result<()> { - generate_game_factions(self.id.clone(), &self.state.db_pool).await?; - - let mut game = Game::find(self.id.clone(), &self.state.db_pool).await?; - - let (mut systems, nb_victory_systems) = generate_systems(self.id.clone(), game.map_size).await?; - - game.victory_points = nb_victory_systems as i32 * 100; - - Game::update(game.clone(), &self.state.db_pool).await?; - - let mut players = Player::find_by_game(self.id, &self.state.db_pool).await?; - assign_systems(&players, &mut systems).await?; - init_player_wallets(&mut players, &self.state.db_pool).await?; - System::insert_all(systems.iter(), &self.state.db_pool).await?; - init_player_systems(&systems, game.game_speed, &self.state.db_pool).await?; - - self.ws_broadcast(&protocol::Message::new( - protocol::Action::SystemsCreated, - (), - None - )).await + fn init(&mut self) -> impl ActorFuture { + let gid = self.id; + wrap_future(async move { + let state = state(); + generate_game_factions(gid, &state.db_pool).await?; + + let mut game = Game::find(gid, &state.db_pool).await?; + + let (mut systems, nb_victory_systems) = generate_systems(gid, game.map_size).await?; + + game.victory_points = nb_victory_systems as i32 * 100; + + Game::update(game.clone(), &state.db_pool).await?; + + let mut players = Player::find_by_game(gid, &state.db_pool).await?; + assign_systems(&players, &mut systems).await?; + init_player_wallets(&mut players, &state.db_pool).await?; + System::insert_all(systems.iter(), &state.db_pool).await?; + init_player_systems(&systems, game.game_speed, &state.db_pool).await?; + + Self::ws_broadcast(gid, protocol::Message::new( + protocol::Action::SystemsCreated, + (), + None + )).await + }).map(|_,_,_| ()) } - async fn begin(&self) -> Result<()> { - let game = Game::find(self.id.clone(), &self.state.db_pool).await?; - #[derive(Serialize)] - struct GameData{ - victory_points: i32 - } - self.ws_broadcast(&protocol::Message::new( - protocol::Action::GameStarted, - GameData{ - victory_points: game.victory_points - }, - None - )).await + fn begin(&self) -> impl ActorFuture { + let gid = self.id; + wrap_future(async move { + let state = state(); + let game = Game::find(gid, &state.db_pool).await.expect("Game not found"); + #[derive(Serialize)] + struct GameData{ + victory_points: i32 + } + Self::ws_broadcast(gid, protocol::Message::new( + protocol::Action::GameStarted, + GameData{ + victory_points: game.victory_points + }, + None + )).await; + }) } - fn clients(&self) -> std::sync::RwLockReadGuard>> { - self.clients.read().expect("Poisoned lock on game clients") - } +// fn clients(&self) -> std::sync::RwLockReadGuard>> { +// self.clients.read().expect("Poisoned lock on game clients") +// } - pub async fn ws_broadcast(&self, message: &protocol::Message) -> Result<()> { - let clients = self.clients(); - for pid in Player::find_ids_by_game(self.id, &self.state.db_pool).await? { - self.ws_send(&clients, &pid, message); + pub async fn ws_broadcast(gid: GameID, message: protocol::Message) -> Result<()> { + let state = state(); + for pid in Player::find_ids_by_game(gid, &state.db_pool).await? { + state.ws_send(&pid, &message); } Ok(()) } - pub async fn faction_broadcast(&self, fid: FactionID, message: protocol::Message) -> Result<()> { - let clients = self.clients(); - for pid in Player::find_ids_by_game_and_faction(self.id, fid, &self.state.db_pool).await? { - self.ws_send(&clients, &pid, &message); + pub async fn faction_broadcast(gid: GameID, fid: FactionID, message: protocol::Message) -> Result<()> { + let state = state(); + for pid in Player::find_ids_by_game_and_faction(gid, fid, &state.db_pool).await? { + state.ws_send(&pid, &message); } Ok(()) } - pub fn player_broadcast(&self, pid: &PlayerID, message: &protocol::Message) { - let clients = self.clients(); - self.ws_send(&clients, pid, message); + pub fn player_broadcast(pid: &PlayerID, message: protocol::Message) { + state().ws_send(pid, &message); } pub fn ws_send(&self, clients: &std::sync::RwLockReadGuard>>, pid: &PlayerID, message: &protocol::Message) { - let mut missing_messages = self.state.missing_messages_mut(); + let state = state(); + let mut missing_messages = state.missing_messages_mut(); if let Some(client) = clients.get(pid) { client.do_send(message.clone()); @@ -162,13 +166,14 @@ impl GameServer { } } - async fn produce_income(&mut self) -> Result<()> { - let mut players: HashMap = Player::find_by_game(self.id.clone(), &self.state.db_pool).await? + async fn produce_income(gid: GameID) -> Result<()> { + let state = state(); + let mut players: HashMap = Player::find_by_game(gid, &state.db_pool).await? .into_iter() .map(|p| (p.id.clone(), p)) .collect(); let mut players_income = HashMap::new(); - let mines: Vec = Building::find_by_kind(BuildingKind::Mine, &self.state.db_pool).await? + let mines: Vec = Building::find_by_kind(BuildingKind::Mine, &state.db_pool).await? .into_iter() .filter(|b| b.status == BuildingStatus::Operational) .map(|b| b.system) @@ -176,7 +181,7 @@ impl GameServer { // Add money to each player based on the number of // currently, the income is `some_player.income = some_player.number_of_systems_owned * 15` - System::find_possessed(self.id.clone(), &self.state.db_pool).await? + System::find_possessed(gid, &state.db_pool).await? .into_iter() .for_each(|system| { let mut income = 10; @@ -191,20 +196,17 @@ impl GameServer { struct PlayerIncome { income: usize } - let clients = self.clients.read().expect("Poisoned lock on game clients"); for (pid, income) in players_income { if let Some(p) = players.get_mut(&pid.unwrap()) { p.wallet += income; - if let Some(c) = clients.get(&pid.unwrap()){ - c.do_send(protocol::Message::new( + state.ws_send(&pid.unwrap(), &protocol::Message::new( protocol::Action::PlayerIncome, PlayerIncome{ income }, None, )); - } } } - let mut tx = self.state.db_pool.begin().await?; + let mut tx = state.db_pool.begin().await?; for p in players.values() { p.update(&mut tx).await?; } @@ -212,14 +214,15 @@ impl GameServer { Ok(()) } - async fn distribute_victory_points(&mut self) -> Result<()> { - let victory_systems = System::find_possessed_victory_systems(self.id.clone(), &self.state.db_pool).await?; - let game = Game::find(self.id.clone(), &self.state.db_pool).await?; - let mut factions = GameFaction::find_all(self.id.clone(), &self.state.db_pool).await? + async fn distribute_victory_points(gid: GameID) -> Result<()> { + let state = state(); + let victory_systems = System::find_possessed_victory_systems(gid, &state.db_pool).await?; + let game = Game::find(gid, &state.db_pool).await?; + let mut factions = GameFaction::find_all(gid, &state.db_pool).await? .into_iter() .map(|gf| (gf.faction.clone(), gf)) .collect::>(); - let mut players = Player::find_by_ids(victory_systems.clone().into_iter().map(|s| s.player.clone().unwrap()).collect(), &self.state.db_pool).await? + let mut players = Player::find_by_ids(victory_systems.clone().into_iter().map(|s| s.player.clone().unwrap()).collect(), &state.db_pool).await? .into_iter() .map(|p| (p.id.clone(), p)) .collect::>(); @@ -234,7 +237,7 @@ impl GameServer { } let mut victorious_faction: Option<&GameFaction> = None; - let mut tx = self.state.db_pool.begin().await?; + let mut tx = state.db_pool.begin().await?; for f in factions.values() { GameFaction::update(f, &mut tx).await?; if f.victory_points >= game.victory_points { @@ -243,26 +246,27 @@ impl GameServer { } tx.commit().await?; - self.ws_broadcast(&protocol::Message::new( + Self::ws_broadcast(gid, protocol::Message::new( protocol::Action::FactionPointsUpdated, factions.clone(), None )).await?; if let Some(f) = victorious_faction { - self.process_victory(f, factions.values().cloned().collect::>()).await?; + Self::process_victory(gid, f, factions.values().cloned().collect::>()).await?; } Ok(()) } - async fn process_victory(&mut self, victorious_faction: &GameFaction, factions: Vec) -> Result<()> { + async fn process_victory(gid: GameID, victorious_faction: &GameFaction, factions: Vec) -> Result<()> { + let state = state(); #[derive(Serialize, Clone)] struct VictoryData { victorious_faction: FactionID, scores: Vec } - self.ws_broadcast(&protocol::Message::new( + Self::ws_broadcast(gid, protocol::Message::new( protocol::Action::Victory, VictoryData{ victorious_faction: victorious_faction.faction, @@ -271,21 +275,26 @@ impl GameServer { None, )).await?; - let game = Game::find(self.id, &self.state.db_pool).await?; - self.state.clear_game(&game).await?; + let game = Game::find(gid, &state.db_pool).await?; + state.clear_game(&game).await?; Ok(()) } - pub async fn remove_player(&self, pid: PlayerID) -> Result>> { - let mut player = Player::find(pid, &self.state.db_pool).await?; - player.is_connected = false; - self.ws_broadcast(&protocol::Message::new( - protocol::Action::PlayerLeft, - pid.clone(), - Some(pid), - )).await?; - let mut clients = self.clients.write().expect("Poisoned lock on game players"); - Ok(clients.remove(&pid)) + pub fn remove_player(gid: GameID, pid: PlayerID) -> impl ActorFuture>>> { + wrap_future(async move { + let state = state(); + let mut player = Player::find(pid, &state.db_pool).await.unwrap(); + player.is_connected = false; + Self::ws_broadcast(gid, protocol::Message::new( + protocol::Action::PlayerLeft, + pid.clone(), + Some(pid), + )).await; + player.id + }) + .map(|pid, this:&mut Self, _| { + Ok(this.clients.remove(&pid)) + }) } pub fn add_task( @@ -295,7 +304,7 @@ impl GameServer { duration: Duration, closure: F ) - where F: 'static + FnOnce(&mut Self, & ::Context) -> Result<()>, + where F: 'static + FnOnce(&mut Self, &mut ::Context) -> Result<()>, { self.tasks.insert(task_name.clone(), ctx.run_later( duration, @@ -322,14 +331,12 @@ impl GameServer { } pub fn is_empty(&self) -> bool { - let clients = self.clients.read().expect("Poisoned lock on game players"); - - clients.len() == 0 + self.clients.len() == 0 } } #[derive(actix::Message, Serialize, Clone)] -#[rtype(result="Arc<(Option>, bool)>")] +#[rtype(result="std::result::Result<(Option>, bool), ()>")] pub struct GameRemovePlayerMessage(pub PlayerID); #[derive(actix::Message, Clone)] @@ -355,7 +362,9 @@ pub struct GameFleetTravelMessage{ /// This macro helps keeping the code readable: /// ```ignore /// let ship_queue = todo!(); -/// server.do_send(task!(ship_queue -> move |gs: &GameServer| block_on(ship_queue.so_something))); +/// server.do_send(task!(ship_queue -> move |gs, ctx| { +/// do_something() +/// })); /// ``` #[macro_export] macro_rules! task { @@ -385,7 +394,7 @@ pub struct GameScheduleTaskMessage { task_id: String, task_duration: Option, - callback: Box Result<()> + Send + 'static>, + callback: Box) -> Result<()> + Send + 'static>, } #[derive(actix::Message)] @@ -397,7 +406,7 @@ pub struct GameCancelTaskMessage impl GameScheduleTaskMessage { - pub fn new Result<()> + Send + 'static>(task_id: String, task_duration:Option, callback: F) -> Self { + pub fn new) -> Result<()> + Send + 'static>(task_id: String, task_duration:Option, callback: F) -> Self { Self { task_id, task_duration, @@ -423,17 +432,20 @@ impl Handler for GameServer { type Result = (); fn handle(&mut self, GameAddClientMessage(pid, client): GameAddClientMessage, _ctx: &mut Self::Context) -> Self::Result { - let mut clients = self.clients.write().expect("Poisoned lock on game players"); - clients.insert(pid, client); + self.clients.insert(pid, client); } } impl Handler for GameServer { - type Result = Arc<(Option>, bool)>; + type Result = ResponseActFuture>, bool), ()>>; fn handle(&mut self, GameRemovePlayerMessage(pid): GameRemovePlayerMessage, _ctx: &mut Self::Context) -> Self::Result { - let client = block_on(self.remove_player(pid)).unwrap(); - Arc::new((client, self.is_empty())) + Box::new( + Self::remove_player(self.id, pid) + .map(|client, this, _| { + Ok((client.map_err(|_| ())?, this.is_empty())) + }) + ) } } @@ -441,8 +453,7 @@ impl Handler for GameServer { type Result = (); fn handle(&mut self, msg: GameNotifyPlayerMessage, _ctx: &mut Self::Context) -> Self::Result { - let clients = self.clients.read().expect("Poisoned lock on game clients"); - let client = clients.get(&msg.0).unwrap().clone(); + let client = self.clients.get(&msg.0).unwrap(); client.do_send(msg.1); } } @@ -450,11 +461,14 @@ impl Handler for GameServer { impl Handler for GameServer { type Result = (); - fn handle(&mut self, msg: GameNotifyFactionMessage, _ctx: &mut Self::Context) -> Self::Result { - let res = block_on(self.faction_broadcast(msg.0, msg.1)); - if res.is_err() { - println!("Faction broadcast failed : {:?}", res.err()); - } + fn handle(&mut self, msg: GameNotifyFactionMessage, ctx: &mut Self::Context) -> Self::Result { + let gid = self.id; + ctx.wait(wrap_future(async move { + let res = Self::faction_broadcast(gid, msg.0, msg.1).await; + if res.is_err() { + println!("Faction broadcast failed : {:?}", res.err()); + } + })); } } @@ -462,22 +476,37 @@ impl Handler for GameServer { type Result = (); fn handle(&mut self, msg: GameFleetTravelMessage, ctx: &mut Self::Context) -> Self::Result { - block_on(self.ws_broadcast(&protocol::Message::new( - protocol::Action::FleetSailed, - msg.fleet.clone(), - Some(msg.fleet.player), - ))); + let gid = self.id; + let state = state(); + let fleet = msg.fleet.clone(); + let player = fleet.player; + let fid = fleet.id; + ctx.wait(wrap_future(async move { + Self::ws_broadcast(gid, protocol::Message::new( + protocol::Action::FleetSailed, + fleet, + Some(player), + )).await; + })); // In this case, there is no battle, but a in-progress conquest // We update the conquest or cancel it depending on the remaining fleets - if let Some(mut conquest) = block_on(Conquest::find_current_by_system(&msg.system.id, &self.state.db_pool)).map_err(ServerError::from).ok().unwrap() { - block_on(conquest.remove_fleet(&msg.system, &msg.fleet, &self)).map_err(ServerError::from).ok().unwrap(); - } - let datetime: DateTime = msg.fleet.destination_arrival_date.unwrap().into(); - ctx.run_later(datetime.signed_duration_since(Utc::now()).to_std().unwrap(), move |this, _| { - let res = block_on(process_fleet_arrival(&this, msg.fleet.id)); - if res.is_err() { - println!("Fleet arrival fail : {:?}", res.err()); + let fleet = msg.fleet.clone(); + let system = msg.system.clone(); + ctx.wait(wrap_future(async move { + if let Some(mut conquest) = Conquest::find_current_by_system(&system.id, &state.db_pool).await.unwrap() { + conquest.remove_fleet(&system, &fleet, gid).await; } + })); + + let datetime: DateTime = msg.fleet.destination_arrival_date.unwrap().into(); + ctx.run_later(datetime.signed_duration_since(Utc::now()).to_std().unwrap(), move |this, ctx| { + let gid = this.id; + ctx.wait(wrap_future(async move { + let res = process_fleet_arrival(gid, fid).await; + if res.is_err() { + println!("Fleet arrival fail : {:?}", res.err()); + } + })); }); } } @@ -491,7 +520,7 @@ impl Handler for GameServer &mut ctx, msg.task_id.clone(), msg.task_duration.unwrap_or(Duration::new(0, 0)), - move |this, _| (msg.callback)(&this) + move |this, ctx| (msg.callback)(this, ctx) ) } } @@ -509,9 +538,9 @@ impl Handler for GameServer { type Result = (); fn handle(&mut self, _msg: GameEndMessage, ctx: &mut Self::Context) -> Self::Result { - let clients = self.clients.read().expect("Poisoned lock on game clients"); - for (pid, c) in clients.iter() { - self.state.add_client(&pid, c.clone()); + let state = state(); + for (pid, c) in self.clients.iter() { + state.add_client(&pid, c.clone()); } ctx.stop(); ctx.terminate(); @@ -523,7 +552,7 @@ fn run_interval( duration: Duration, mut closure: F ) - where F: FnMut(&mut GameServer, & ::Context) -> Result<()> + 'static, + where F: FnMut(&mut GameServer, &mut ::Context) -> Result<()> + 'static, { ctx.run_interval(duration, move |this, ctx| { let result = closure(this, ctx).map_err(ServerError::from); @@ -531,4 +560,4 @@ fn run_interval( println!("{:?}", result.err()); } }); -} \ No newline at end of file +} diff --git a/src/game/global.rs b/src/game/global.rs new file mode 100644 index 0000000..0f29759 --- /dev/null +++ b/src/game/global.rs @@ -0,0 +1,130 @@ +use crate::{ + lib::{self, Result, sync::SyncOnceCell}, + ws::{self, protocol}, + game::{ + player, + lobby, + game::{game as g, server::{GameEndMessage, GameServer}}, + }, +}; +use std::sync::RwLock; +use std::collections::HashMap; +use std::future::{Ready, ready}; +use sqlx::PgPool; +use gelf::Logger as GelfLogger; +use actix_web::{HttpRequest, FromRequest, dev::Payload}; + +/// Global state of the game, containing everything we need to access from everywhere. +/// Each attribute is between a [`RwLock`](https://doc.rust-lang.org/std/sync/struct.RwLock.html) +pub struct AppState { + pub db_pool: PgPool, + pub logger: Option, + clients: RwLock>>, + lobbies: RwLock>>, + games: RwLock>>, + missing_messages: RwLock>>, +} + +macro_rules! res_access { + { $name:ident , $name_mut:ident : $t:ty } => { + pub fn $name(&self) -> std::sync::RwLockReadGuard<$t> { + self.$name.read().expect(stringify!("AppState::", $name, "() RwLock poisoned")) + } + pub fn $name_mut(&self) -> std::sync::RwLockWriteGuard<$t> { + self.$name.write().expect(stringify!("AppState::", $name_mut, "() RwLock poisoned")) + } + }; +} + +impl AppState { + pub fn new(db_pool: PgPool, logger: Option) -> Self { + Self { + db_pool, + logger, + games: RwLock::new(HashMap::new()), + lobbies: RwLock::new(HashMap::new()), + clients: RwLock::new(HashMap::new()), + missing_messages: RwLock::new(HashMap::new()), + } + } + + pub fn ws_broadcast(&self, message: &ws::protocol::Message) { + self.clients().iter().for_each(|(_, c)| c.do_send(message.clone())); + } + + pub async fn clear_lobby(&self, lobby: lobby::Lobby, pid: player::PlayerID) -> lib::Result<()> { + let mut tx = self.db_pool.begin().await?; + lobby.remove(&mut tx).await?; + tx.commit().await?; + self.ws_broadcast(&ws::protocol::Message::new( + ws::protocol::Action::LobbyRemoved, + lobby, + Some(pid), + )); + Ok(()) + } + + pub async fn clear_game(&self, game: &g::Game) -> lib::Result<()> { + let game_server = { + let mut games = self.games_mut(); + games.remove(&game.id).unwrap() + }; + game_server.do_send(GameEndMessage{}); + let mut tx = self.db_pool.begin().await?; + game.remove(&mut tx).await?; + tx.commit().await?; + Ok(()) + } + + pub fn add_client(&self, pid: &player::PlayerID, client: actix::Addr) { + self.clients_mut().insert(pid.clone(), client); + } + + #[allow(clippy::or_fun_call)] + pub fn retrieve_client(&self, pid: &player::PlayerID) -> Result> { + let mut clients = self.clients_mut(); + clients.remove_entry(&pid) + .ok_or(lib::error::InternalError::PlayerUnknown.into()) + .map(|t| t.1) + } + + pub fn remove_client(&self, pid: &player::PlayerID) { + self.clients_mut().remove(pid); + } + + pub fn ws_send(&self, pid: &player::PlayerID, message: &protocol::Message) { + let msg = message.clone(); + if let Some(client) = self.clients().get(pid) { + client.do_send(msg); + } else { + self.missing_messages_mut().entry(*pid) + .or_default() + .push(msg) + } + } + + res_access!{ games, games_mut : HashMap> } + res_access!{ lobbies, lobbies_mut : HashMap> } + res_access!{ clients, clients_mut : HashMap> } + res_access!{ missing_messages, missing_messages_mut : HashMap> } +} + +static STATE : SyncOnceCell = SyncOnceCell::new(); + +pub fn init(state: AppState) { + STATE.set(state) +} + +pub fn state() -> & 'static AppState { + STATE.get().expect("Global state was not initialized") +} + +impl FromRequest for & 'static AppState { + type Error = (); + type Future = Ready>; + type Config = (); + + fn from_request(_req: &HttpRequest, _payload: &mut Payload) -> Self::Future { + ready(Ok(state())) + } +} diff --git a/src/game/lobby.rs b/src/game/lobby.rs index 18f0644..a53f451 100644 --- a/src/game/lobby.rs +++ b/src/game/lobby.rs @@ -1,5 +1,8 @@ use actix_web::{delete, get, patch, post, web, HttpResponse}; -use actix::prelude::*; +use actix::{ + fut, + prelude::*, +}; use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::{ @@ -14,9 +17,9 @@ use crate::{ }, game::player::{PlayerID, Player}, ws::{ client::ClientSession, protocol}, - AppState, + game::global::AppState, }; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::collections::{HashMap}; use sqlx::{PgPool, postgres::{PgRow, PgQueryAs}, FromRow, Executor, Error, Postgres}; use sqlx_core::row::Row; @@ -31,7 +34,7 @@ impl From for Uuid { pub struct LobbyServer { pub id: LobbyID, - pub clients: RwLock>>, + pub clients: HashMap>, } #[derive(Serialize, Deserialize, Clone)] @@ -64,30 +67,24 @@ impl<'a> FromRow<'a, PgRow<'a>> for Lobby { impl LobbyServer { pub fn ws_broadcast(&self, message: &protocol::Message) { - let clients = self.clients.read().expect("Poisoned lock on lobby clients"); - for c in clients.values() { + for c in self.clients.values() { c.do_send(message.clone()); } } pub fn is_empty(&self) -> bool { - let clients = self.clients.read().expect("Poisoned lock on lobby clients"); + let clients = &self.clients; clients.len() == 0 } pub fn add_player(&mut self, pid: PlayerID, client: actix::Addr) { - let mut clients = self.clients.write().expect("Poisoned lock on lobby clients"); - - clients.insert(pid, client); + self.clients.insert(pid, client); } // Remove the player from the lobby's list and notify all remaining players pub fn remove_player(&mut self, pid: PlayerID) -> actix::Addr { - let client = { - let mut clients = self.clients.write().expect("Poisoned lock on lobby clients"); - clients.remove(&pid).unwrap() - }; + let client = self.clients.remove(&pid).unwrap(); self.ws_broadcast(&protocol::Message::new( protocol::Action::PlayerLeft, pid.clone(), @@ -155,11 +152,11 @@ impl Actor for LobbyServer { pub struct LobbyAddClientMessage(pub PlayerID, pub actix::Addr); #[derive(actix::Message, Serialize, Clone)] -#[rtype(result="Arc<(actix::Addr, bool)>")] +#[rtype(result="Option<(actix::Addr, bool)>")] pub struct LobbyRemoveClientMessage(pub PlayerID); #[derive(actix::Message, Clone)] -#[rtype(result="Arc>>")] +#[rtype(result="Option>>")] pub struct LobbyGetClientsMessage(); impl Handler for LobbyServer { @@ -171,26 +168,24 @@ impl Handler for LobbyServer { } impl Handler for LobbyServer { - type Result = Arc<(actix::Addr, bool)>; + type Result = Option<(actix::Addr, bool)>; fn handle(&mut self, LobbyRemoveClientMessage(pid): LobbyRemoveClientMessage, ctx: &mut Self::Context) -> Self::Result { let client = self.remove_player(pid); if self.is_empty() { ctx.stop(); ctx.terminate(); - return Arc::new((client, true)); + return Some((client, true)); } - Arc::new((client, false)) + Some((client, false)) } } impl Handler for LobbyServer { - type Result = Arc>>; + type Result = Option>>; fn handle(&mut self, _msg: LobbyGetClientsMessage, _ctx: &mut Self::Context) -> Self::Result { - let clients = self.clients.read().expect("Poisoned lock on lobby players"); - - Arc::new(clients.clone()) + Some(self.clients.clone()) } } @@ -203,7 +198,7 @@ impl Handler for LobbyServer { } #[get("/")] -pub async fn get_lobbies(state: web::Data) -> Result { +pub async fn get_lobbies(state: &AppState) -> Result { #[derive(Serialize)] struct LobbyData{ id: LobbyID, @@ -237,7 +232,7 @@ pub async fn get_lobbies(state: web::Data) -> Result { #[allow(clippy::eval_order_dependence)] #[get("/{id}")] -pub async fn get_lobby(state: web::Data, info: web::Path<(LobbyID,)>) -> Result { +pub async fn get_lobby(state: &AppState, info: web::Path<(LobbyID,)>) -> Result { let lobby = Lobby::find(info.0, &state.db_pool).await?; #[derive(Serialize)] @@ -259,7 +254,7 @@ pub async fn get_lobby(state: web::Data, info: web::Path<(LobbyID,)>) } #[post("/")] -pub async fn create_lobby(state: web::Data, claims: Claims) -> Result { +pub async fn create_lobby(state: &AppState, claims: Claims) -> Result { // Get the requesting player identity let mut player = Player::find(claims.pid, &state.db_pool).await?; let mut lobby_servers = state.lobbies_mut(); @@ -278,7 +273,7 @@ pub async fn create_lobby(state: web::Data, claims: Claims) -> Result< }; let lobby_server = LobbyServer{ id: new_lobby.id.clone(), - clients: RwLock::new(HashMap::new()), + clients: HashMap::new(), }.start(); let client = state.retrieve_client(&claims.pid)?; lobby_server.do_send(LobbyAddClientMessage(player.id.clone(), client)); @@ -302,7 +297,7 @@ pub async fn create_lobby(state: web::Data, claims: Claims) -> Result< #[patch("/{id}/")] pub async fn update_lobby_options( - state: web::Data, + state: &AppState, info: web::Path<(LobbyID,)>, data: web::Json, claims: Claims @@ -331,7 +326,7 @@ pub async fn update_lobby_options( } #[post("/{id}/launch/")] -pub async fn launch_game(state: web::Data, claims:Claims, info: web::Path<(LobbyID,)>) +pub async fn launch_game(state: &AppState, claims:Claims, info: web::Path<(LobbyID,)>) -> Result { let mut games = state.games_mut(); @@ -341,11 +336,11 @@ pub async fn launch_game(state: web::Data, claims:Claims, info: web::P if lobby.owner != claims.pid.clone() { return Err(InternalError::AccessDenied.into()); } - let clients = Arc::try_unwrap({ + let clients = { let lobbies = state.lobbies(); let lobby_server = lobbies.get(&lobby.id).ok_or(InternalError::LobbyUnknown)?; lobby_server.send(LobbyGetClientsMessage{}) - }.await?).ok().unwrap(); + }.await?.unwrap(); let (game_id, game) = create_game(&lobby, state.clone(), clients).await?; games.insert(game_id, game); @@ -363,7 +358,7 @@ pub async fn launch_game(state: web::Data, claims:Claims, info: web::P } #[delete("/{id}/players/")] -pub async fn leave_lobby(state:web::Data, claims:Claims, info:web::Path<(LobbyID,)>) +pub async fn leave_lobby(state: &AppState, claims:Claims, info:web::Path<(LobbyID,)>) -> Result { let mut lobby = Lobby::find(info.0, &state.db_pool).await?; @@ -376,7 +371,7 @@ pub async fn leave_lobby(state:web::Data, claims:Claims, info:web::Pat let lobbies = state.lobbies(); let lobby_server = lobbies.get(&lobby.id).expect("Lobby exists in DB but not in HashMap"); - let (client, is_empty) = Arc::try_unwrap(lobby_server.send(LobbyRemoveClientMessage(player.id.clone())).await?).ok().unwrap(); + let (client, is_empty) = lobby_server.send(LobbyRemoveClientMessage(player.id.clone())).await?.unwrap(); state.add_client(&player.id, client.clone()); if is_empty { state.clear_lobby(lobby, player.id).await?; @@ -392,7 +387,7 @@ pub async fn leave_lobby(state:web::Data, claims:Claims, info:web::Pat } #[post("/{id}/players/")] -pub async fn join_lobby(info: web::Path<(LobbyID,)>, state: web::Data, claims: Claims) +pub async fn join_lobby(info: web::Path<(LobbyID,)>, state: &AppState, claims: Claims) -> Result { let lobby = Lobby::find(info.0, &state.db_pool).await?; diff --git a/src/game/mod.rs b/src/game/mod.rs index 3371c09..7123e01 100644 --- a/src/game/mod.rs +++ b/src/game/mod.rs @@ -5,3 +5,4 @@ pub mod game; pub mod fleet; pub mod ship; pub mod system; +pub mod global; diff --git a/src/game/player.rs b/src/game/player.rs index b4408a0..0905db4 100644 --- a/src/game/player.rs +++ b/src/game/player.rs @@ -4,7 +4,7 @@ use uuid::Uuid; use sqlx::{PgPool, postgres::{PgRow, PgQueryAs}, Executor, FromRow, Error, Postgres}; use sqlx_core::row::Row; use crate::{ - AppState, + game::global::AppState, game::game::{ game::{GameID, GAME_START_WALLET}, server::GameNotifyPlayerMessage, @@ -196,7 +196,7 @@ pub async fn init_player_wallets(players: &mut Vec, db_pool: &PgPool) -> } #[post("/login")] -pub async fn login(state:web::Data) +pub async fn login(state: &AppState) -> Result { let player = Player { @@ -217,7 +217,7 @@ pub async fn login(state:web::Data) } #[get("/count/")] -pub async fn get_nb_players(state:web::Data) +pub async fn get_nb_players(state: &AppState) -> Option { #[derive(Serialize)] @@ -230,14 +230,14 @@ pub async fn get_nb_players(state:web::Data) } #[get("/me/")] -pub async fn get_current_player(state:web::Data, claims: auth::Claims) +pub async fn get_current_player(state: &AppState, claims: auth::Claims) -> Result { Ok(HttpResponse::Ok().json(Player::find(claims.pid, &state.db_pool).await?)) } #[patch("/me/")] -pub async fn update_current_player(state: web::Data, json_data: web::Json, claims: auth::Claims) +pub async fn update_current_player(state: &AppState, json_data: web::Json, claims: auth::Claims) -> Result { let mut player = Player::find(claims.pid, &state.db_pool).await?; @@ -280,14 +280,14 @@ pub async fn update_current_player(state: web::Data, json_data: web::J } #[get("/players/")] -pub async fn get_faction_members(state: web::Data, info: web::Path<(GameID, FactionID)>) +pub async fn get_faction_members(state: &AppState, info: web::Path<(GameID, FactionID)>) -> Result { Ok(HttpResponse::Ok().json(Player::find_by_game_and_faction(info.0, info.1, &state.db_pool).await?)) } #[patch("/players/{player_id}/money/")] -pub async fn transfer_money(state: web::Data, info: web::Path<(GameID, FactionID, PlayerID)>, data: web::Json, claims: auth::Claims) +pub async fn transfer_money(state: &AppState, info: web::Path<(GameID, FactionID, PlayerID)>, data: web::Json, claims: auth::Claims) -> Result { let mut current_player = Player::find(claims.pid, &state.db_pool).await?; diff --git a/src/game/ship/queue.rs b/src/game/ship/queue.rs index c5d5015..58b45f8 100644 --- a/src/game/ship/queue.rs +++ b/src/game/ship/queue.rs @@ -1,4 +1,5 @@ use actix_web::{get, post, web, HttpResponse}; +use actix::AsyncContext; use sqlx::{PgPool, Executor, postgres::{PgRow, PgQueryAs}, FromRow, Error, Postgres}; use sqlx_core::row::Row; use serde::{Serialize, Deserialize}; @@ -33,10 +34,9 @@ use crate::{ }, }, ws::protocol, - AppState, + game::global::{AppState, state}, }; use futures::join; -use futures::executor::block_on; #[derive(Debug, Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Copy)] pub struct ShipQueueID(pub Uuid); @@ -136,9 +136,10 @@ impl ShipQueue { .execute(&mut *exec).await.map_err(ServerError::from) } - pub async fn produce(&self, server: &GameServer) -> Result<()> { - let player = Player::find_system_owner(self.system.clone(), &server.state.db_pool).await?; - let mut tx = server.state.db_pool.begin().await?; + pub async fn produce(&self, gid: GameID) -> Result<()> { + let state = state(); + let player = Player::find_system_owner(self.system.clone(), &state.db_pool).await?; + let mut tx = state.db_pool.begin().await?; if let Some(assigned_fleet) = self.assigned_fleet.clone() { let fleet_data: Vec<&str> = assigned_fleet.split(':').collect(); @@ -149,21 +150,21 @@ impl ShipQueue { formation, self.category, self.quantity, - &server.state.db_pool + &state.db_pool ).await?; } else { Squadron::assign_existing( self.system, self.category, self.quantity as i32, - &server.state.db_pool + &state.db_pool ).await?; } self.remove(&mut tx).await?; tx.commit().await?; - server.player_broadcast(&player.id, &protocol::Message::new( + GameServer::player_broadcast(&player.id, protocol::Message::new( protocol::Action::ShipQueueFinished, self.clone(), None, @@ -222,7 +223,7 @@ impl ShipQueue { #[post("/")] pub async fn add_ship_queue( - state: web::Data, + state: &AppState, info: web::Path<(GameID, SystemID)>, json_data: web::Json, claims: Claims @@ -251,13 +252,19 @@ pub async fn add_ship_queue( ).await?.unwrap(); let sq = ship_queue.clone(); - state.games().get(&info.0).unwrap().do_send(task!(sq -> move |gs: &GameServer| block_on(sq.produce(gs)))); + state.games().get(&info.0).unwrap().do_send(task!(sq -> move |gs, ctx| { + let gid = gs.id; + ctx.wait(actix::fut::wrap_future(async move { + sq.produce(gid).await; + })); + Ok(()) + })); Ok(HttpResponse::Created().json(ship_queue)) } #[get("/")] -pub async fn get_ship_queues(state: web::Data, info: web::Path<(GameID, SystemID)>, claims: Claims) +pub async fn get_ship_queues(state: &AppState, info: web::Path<(GameID, SystemID)>, claims: Claims) -> Result { let (s, p) = futures::join!( diff --git a/src/game/ship/squadron.rs b/src/game/ship/squadron.rs index 9cec98f..1213521 100644 --- a/src/game/ship/squadron.rs +++ b/src/game/ship/squadron.rs @@ -15,7 +15,7 @@ use crate::{ system::system::{SystemID, System}, ship::model::ShipModelCategory, }, - AppState, + game::global::AppState, }; #[derive(Serialize, Clone)] @@ -128,7 +128,7 @@ impl Squadron { } #[get("/")] -pub async fn get_system_squadrons(state: web::Data, info: web::Path<(GameID, SystemID)>, claims: Claims) +pub async fn get_system_squadrons(state: &AppState, info: web::Path<(GameID, SystemID)>, claims: Claims) -> Result { let (s, p) = futures::join!( diff --git a/src/game/system/building.rs b/src/game/system/building.rs index 1567b35..7201258 100644 --- a/src/game/system/building.rs +++ b/src/game/system/building.rs @@ -1,13 +1,13 @@ use actix_web::{get, post, web, HttpResponse}; +use actix::AsyncContext; use serde::{Serialize, Deserialize}; use uuid::Uuid; use chrono::{DateTime, Duration, Utc}; use sqlx::{PgPool, postgres::{PgRow, PgQueryAs}, FromRow, Executor, Error, Postgres}; use sqlx_core::row::Row; -use futures::executor::block_on; use crate::{ task, - AppState, + game::global::{AppState, state}, lib::{ Result, auth::Claims, @@ -194,16 +194,17 @@ impl Building { .execute(&mut *exec).await.map_err(ServerError::from) } - async fn construct(&mut self, server: &GameServer) -> Result<()> { - let player = Player::find_system_owner(self.system.clone(), &server.state.db_pool).await?; + async fn construct(&mut self, gid: GameID) -> Result<()> { + let state = state(); + let player = Player::find_system_owner(self.system.clone(), &state.db_pool).await?; self.status = BuildingStatus::Operational; - let mut tx = server.state.db_pool.begin().await?; + let mut tx = state.db_pool.begin().await?; self.update(&mut tx).await?; tx.commit().await?; - server.faction_broadcast(player.faction.unwrap(), protocol::Message::new( + GameServer::faction_broadcast(gid, player.faction.unwrap(), protocol::Message::new( protocol::Action::BuildingConstructed, self.clone(), None, @@ -214,7 +215,7 @@ impl Building { } #[get("/")] -pub async fn get_system_buildings(state: web::Data, info: web::Path<(GameID, SystemID)>) +pub async fn get_system_buildings(state: &AppState, info: web::Path<(GameID, SystemID)>) -> Result { Ok(HttpResponse::Ok().json(Building::find_by_system(info.1, &state.db_pool).await?)) @@ -222,7 +223,7 @@ pub async fn get_system_buildings(state: web::Data, info: web::Path<(G #[post("/")] pub async fn create_building( - state: web::Data, + state: &AppState, info: web::Path<(GameID,SystemID)>, data: web::Json, claims: Claims @@ -253,7 +254,13 @@ pub async fn create_building( tx.commit().await?; let mut b = building.clone(); - state.games().get(&info.0).unwrap().do_send(task!(building -> move |gs: &GameServer| block_on(b.construct(gs)))); + state.games().get(&info.0).unwrap().do_send(task!(building -> move |gs, ctx| { + let gid = gs.id; + ctx.wait(actix::fut::wrap_future(async move { + b.construct(gid).await; + })); + Ok(()) + })); Ok(HttpResponse::Created().json(building)) } diff --git a/src/game/system/system.rs b/src/game/system/system.rs index e5d789e..16a11d6 100644 --- a/src/game/system/system.rs +++ b/src/game/system/system.rs @@ -3,7 +3,7 @@ use uuid::Uuid; use serde::{Serialize, Deserialize}; use std::collections::HashMap; use crate::{ - AppState, + game::global::AppState, lib::{ Result, pagination::{Paginator, new_paginated_response}, @@ -403,7 +403,7 @@ async fn find_place<'a>( #[allow(clippy::eval_order_dependence)] // false positive ? #[get("/")] -pub async fn get_systems(state: web::Data, info: web::Path<(GameID,)>, pagination: web::Query) +pub async fn get_systems(state: &AppState, info: web::Path<(GameID,)>, pagination: web::Query) -> Result { Ok(new_paginated_response( diff --git a/src/lib/mod.rs b/src/lib/mod.rs index aabfb63..d6101be 100644 --- a/src/lib/mod.rs +++ b/src/lib/mod.rs @@ -3,6 +3,7 @@ pub mod error; pub mod log; pub mod pagination; pub mod time; +pub mod sync; /// Helper type used as a return type for HTTP handler. /// This type helps agregating multiple error types from this crate as well as different external diff --git a/src/lib/sync.rs b/src/lib/sync.rs new file mode 100644 index 0000000..a2bba87 --- /dev/null +++ b/src/lib/sync.rs @@ -0,0 +1,42 @@ +use std::{ + cell::UnsafeCell, + mem::MaybeUninit, + sync::atomic::{AtomicUsize, Ordering}, + marker::{Send, Sync}, +}; + +const UNINIT : usize = 0; +const INITING : usize = 1; +const INIT : usize = 3; + +pub struct SyncOnceCell { + state: AtomicUsize, + inner: UnsafeCell>, +} + +impl SyncOnceCell { + pub const fn new() -> Self { + Self { state: AtomicUsize::new(UNINIT), inner: UnsafeCell::new(MaybeUninit::uninit()) } + } + + pub fn set(&self, value: T) { + match self.state.fetch_or(INITING, Ordering::SeqCst) { + UNINIT => { + unsafe { self.inner.get().write_volatile(MaybeUninit::new(value)) }; + self.state.store(INIT, Ordering::Release); + }, + _ => { }, + } + } + + pub fn get(&self) -> Option<&T> { + if self.state.load(Ordering::Acquire) == INIT { + Some(unsafe { &*(self.inner.get() as *const T)}) + } else { + None + } + } +} + +unsafe impl Send for SyncOnceCell {} +unsafe impl Sync for SyncOnceCell {} diff --git a/src/main.rs b/src/main.rs index 67d6465..dad6cfe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,9 +23,11 @@ use actix_web::{web, App, HttpServer}; use actix_web::middleware::Logger; -use std::collections::HashMap; -use std::sync::RwLock; -use std::env; +use std::{ + sync::RwLock, + collections::HashMap, + env, +}; #[cfg(feature="ssl-secure")] use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use sqlx::PgPool; @@ -53,95 +55,12 @@ use game::{ system::system, ship::model, ship::queue, - ship::squadron + ship::squadron, + global::AppState, }; use lib::Result; use ws::protocol; -/// Global state of the game, containing everything we need to access from everywhere. -/// Each attribute is between a [`RwLock`](https://doc.rust-lang.org/std/sync/struct.RwLock.html) -pub struct AppState { - db_pool: PgPool, - logger: Option, - clients: RwLock>>, - lobbies: RwLock>>, - games: RwLock>>, - missing_messages: RwLock>>, -} - -macro_rules! res_access { - { $name:ident , $name_mut:ident : $t:ty } => { - pub fn $name(&self) -> std::sync::RwLockReadGuard<$t> { - self.$name.read().expect(stringify!("AppState::", $name, "() RwLock poisoned")) - } - pub fn $name_mut(&self) -> std::sync::RwLockWriteGuard<$t> { - self.$name.write().expect(stringify!("AppState::", $name_mut, "() RwLock poisoned")) - } - }; -} - -impl AppState { - pub fn ws_broadcast(&self, message: &ws::protocol::Message) { - self.clients().iter().for_each(|(_, c)| c.do_send(message.clone())); - } - - pub async fn clear_lobby(&self, lobby: lobby::Lobby, pid: player::PlayerID) -> lib::Result<()> { - let mut tx = self.db_pool.begin().await?; - lobby.remove(&mut tx).await?; - tx.commit().await?; - self.ws_broadcast(&ws::protocol::Message::new( - ws::protocol::Action::LobbyRemoved, - lobby, - Some(pid), - )); - Ok(()) - } - - pub async fn clear_game(&self, game: &g::Game) -> lib::Result<()> { - let game_server = { - let mut games = self.games_mut(); - games.remove(&game.id).unwrap() - }; - game_server.do_send(GameEndMessage{}); - let mut tx = self.db_pool.begin().await?; - game.remove(&mut tx).await?; - tx.commit().await?; - Ok(()) - } - - pub fn add_client(&self, pid: &player::PlayerID, client: actix::Addr) { - self.clients_mut().insert(pid.clone(), client); - } - - #[allow(clippy::or_fun_call)] - pub fn retrieve_client(&self, pid: &player::PlayerID) -> Result> { - let mut clients = self.clients_mut(); - clients.remove_entry(&pid) - .ok_or(lib::error::InternalError::PlayerUnknown.into()) - .map(|t| t.1) - } - - pub fn remove_client(&self, pid: &player::PlayerID) { - self.clients_mut().remove(pid); - } - - res_access!{ games, games_mut : HashMap> } - res_access!{ lobbies, lobbies_mut : HashMap> } - res_access!{ clients, clients_mut : HashMap> } - res_access!{ missing_messages, missing_messages_mut : HashMap> } -} - -async fn generate_state() -> AppState { - AppState { - db_pool: create_pool().await.unwrap(), - logger: create_logger(), - games: RwLock::new(HashMap::new()), - lobbies: RwLock::new(HashMap::new()), - clients: RwLock::new(HashMap::new()), - missing_messages: RwLock::new(HashMap::new()), - } -} - // this function could be located in different module fn config(cfg: &mut web::ServiceConfig) { cfg.service( @@ -260,16 +179,22 @@ fn create_logger() -> Option { None } +async fn generate_state() -> AppState { + AppState::new(create_pool().await.unwrap(), create_logger()) +} + #[actix_rt::main] async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "actix_web=info"); env_logger::init(); - let state = web::Data::new(generate_state().await); + + let state = generate_state().await; + game::global::init(state); let mut server = HttpServer::new(move || App::new() .wrap(Logger::default()) - .app_data(state.clone()).configure(config)); + .configure(config)); #[cfg(feature="ssl-secure")] { @@ -287,4 +212,4 @@ async fn main() -> std::io::Result<()> { server = server.bind(get_env("LISTENING_URL", "127.0.0.1:80"))?; } server.run().await -} \ No newline at end of file +} diff --git a/src/ws/client.rs b/src/ws/client.rs index ba32bb0..c703558 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -1,8 +1,7 @@ use std::time::{Duration, Instant}; -use actix::*; +use actix::{*, fut::wrap_future}; use actix_web::{web, HttpRequest, HttpResponse}; use actix_web_actors::ws; -use futures::executor::block_on; use crate::{ lib::{ Result, @@ -18,7 +17,7 @@ use crate::{ player::{Player, PlayerID}, }, ws::protocol, - AppState, + game::global::{AppState, state}, }; const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); @@ -29,14 +28,13 @@ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); pub async fn entrypoint( req: HttpRequest, stream: web::Payload, - state: web::Data, + state: &AppState, claims: Claims, ) -> Result { let player = Player::find(claims.pid, &state.db_pool).await?; // Creates the websocket client for the current player let (client, resp) = ws::start_with_addr(ClientSession{ hb: Instant::now(), - state: state.clone(), pid: player.id.clone() }, &req, stream)?; @@ -86,58 +84,61 @@ pub async fn entrypoint( /// WebSocket actor used to communicate with a player. pub struct ClientSession { hb: Instant, - state: web::Data, pid: PlayerID } impl ClientSession { - async fn logout(&self) -> Result<()> { - let player = Player::find(self.pid, &self.state.db_pool).await.unwrap(); - { - let mut clients = self.state.clients_mut(); - clients.remove(&self.pid); - }; - - log( - gelf::Level::Warning, - "Player disconnected", - &format!("{} has lost its websocket connection", player.username), - vec![], - &self.state.logger - ); - - if let Some(lobby_id) = player.lobby { - let mut lobby = Lobby::find(lobby_id, &self.state.db_pool).await.unwrap(); - let lobbies = self.state.lobbies(); - let lobby_server = lobbies.get(&lobby.id).expect("Lobby server not found"); - let (_, is_empty) = std::sync::Arc::try_unwrap(lobby_server.send(LobbyRemoveClientMessage(player.id.clone())).await?).ok().unwrap(); - if is_empty { - self.state.clear_lobby(lobby, player.id).await?; - } else if player.id == lobby.owner { - lobby.update_owner(&self.state.db_pool).await?; - lobby_server.do_send(protocol::Message::new( - protocol::Action::LobbyOwnerUpdated, - lobby.owner.clone(), - None, - )); - } - } else if let Some(game_id) = player.game { - let mut games = self.state.games_mut(); - let game = games.get_mut(&game_id).expect("Game not found"); - - let (_, is_empty) = std::sync::Arc::try_unwrap(game.send(GameRemovePlayerMessage(player.id.clone())).await?).ok().unwrap(); - if is_empty { - drop(games); - let game = Game::find(game_id, &self.state.db_pool).await?; - self.state.clear_game(&game).await?; + fn logout(&self) -> impl ActorFuture> { + let pid = self.pid; + wrap_future(async move { + let state = state(); + let player = Player::find(pid, &state.db_pool).await.unwrap(); + { + let mut clients = state.clients_mut(); + clients.remove(&pid); + }; + + log( + gelf::Level::Warning, + "Player disconnected", + &format!("{} has lost its websocket connection", player.username), + vec![], + &state.logger + ); + + if let Some(lobby_id) = player.lobby { + let mut lobby = Lobby::find(lobby_id, &state.db_pool).await.unwrap(); + let lobbies = state.lobbies(); + let lobby_server = lobbies.get(&lobby.id).expect("Lobby server not found"); + let (_, is_empty) = lobby_server.send(LobbyRemoveClientMessage(player.id.clone())).await?.unwrap(); + if is_empty { + state.clear_lobby(lobby, player.id).await?; + } else if player.id == lobby.owner { + lobby.update_owner(&state.db_pool).await?; + lobby_server.do_send(protocol::Message::new( + protocol::Action::LobbyOwnerUpdated, + lobby.owner.clone(), + None, + )); + } + } else if let Some(game_id) = player.game { + let mut games = state.games_mut(); + let game = games.get_mut(&game_id).expect("Game not found"); + + let (_, is_empty) = game.send(GameRemovePlayerMessage(player.id)).await?.unwrap(); + if is_empty { + drop(games); + let game = Game::find(game_id, &state.db_pool).await?; + state.clear_game(&game).await?; + } } - } - self.state.ws_broadcast(&protocol::Message::new( - protocol::Action::PlayerDisconnected, - player.clone(), - Some(self.pid), - )); - Ok(()) + state.ws_broadcast(&protocol::Message::new( + protocol::Action::PlayerDisconnected, + player.clone(), + Some(pid), + )); + Ok(()) + }) } } @@ -151,11 +152,12 @@ impl Actor for ClientSession { self.hb(ctx); } - fn stopping(&mut self, _: &mut Self::Context) -> Running { - let res = block_on(self.logout()); - if res.is_err() { - println!("Logout error : {:?}", res); - } + fn stopping(&mut self, ctx: &mut Self::Context) -> Running { + ctx.wait(self.logout().map(|res,_,_| { + if res.is_err() { + println!("Logout error : {:?}", res); + } + })); Running::Stop } }