diff --git a/migrations/010_drop_tasks_table.sql b/migrations/010_drop_tasks_table.sql new file mode 100644 index 0000000..859252f --- /dev/null +++ b/migrations/010_drop_tasks_table.sql @@ -0,0 +1,2 @@ +-- Drop tasks table and its associated indexes and triggers +DROP TABLE IF EXISTS tasks CASCADE; diff --git a/src/config.rs b/src/config.rs index 244a4fa..7ff2ad5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,8 +7,6 @@ pub struct Config { pub server: ServerConfig, pub blockchain: BlockchainConfig, pub candidates: CandidatesConfig, - pub task_generation: TaskGenerationConfig, - pub reverser: ReverserConfig, pub data: DataConfig, pub logging: LoggingConfig, pub jwt: JwtConfig, @@ -42,18 +40,6 @@ pub struct CandidatesConfig { pub refresh_interval_minutes: u64, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TaskGenerationConfig { - pub generation_interval_minutes: u64, - pub taskees_per_round: usize, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ReverserConfig { - pub early_reversal_minutes: u64, - pub check_interval_seconds: u64, -} - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DataConfig { pub database_url: String, @@ -140,26 +126,6 @@ impl Config { &self.server.base_api_url } - pub fn get_candidates_refresh_duration(&self) -> tokio::time::Duration { - tokio::time::Duration::from_secs(self.candidates.refresh_interval_minutes * 60) - } - - pub fn get_task_generation_duration(&self) -> tokio::time::Duration { - tokio::time::Duration::from_secs(self.task_generation.generation_interval_minutes * 60) - } - - pub fn get_reverser_check_duration(&self) -> tokio::time::Duration { - tokio::time::Duration::from_secs(self.reverser.check_interval_seconds) - } - - pub fn get_reversal_period_duration(&self) -> chrono::Duration { - chrono::Duration::hours(self.blockchain.reversal_period_hours as i64) - } - - pub fn get_early_reversal_duration(&self) -> chrono::Duration { - chrono::Duration::minutes(self.reverser.early_reversal_minutes as i64) - } - pub fn get_jwt_expiration(&self) -> chrono::Duration { chrono::Duration::hours(self.jwt.exp_in_hours) } @@ -200,14 +166,6 @@ impl Default for Config { graphql_url: "http://localhost:4000/graphql".to_string(), refresh_interval_minutes: 30, }, - task_generation: TaskGenerationConfig { - generation_interval_minutes: 60, - taskees_per_round: 5, - }, - reverser: ReverserConfig { - early_reversal_minutes: 2, - check_interval_seconds: 30, - }, data: DataConfig { database_url: "postgres://postgres:postgres@127.0.0.1:5432/task_master".to_string(), }, diff --git a/src/db_persistence.rs b/src/db_persistence.rs index 77dc591..cf822bb 100644 --- a/src/db_persistence.rs +++ b/src/db_persistence.rs @@ -10,9 +10,7 @@ use crate::repositories::tweet_author::TweetAuthorRepository; use crate::repositories::tweet_pull_usage::TweetPullUsageRepository; use crate::repositories::x_association::XAssociationRepository; use crate::repositories::DbResult; -use crate::repositories::{ - address::AddressRepository, opt_in::OptInRepository, referral::ReferralRepository, task::TaskRepository, -}; +use crate::repositories::{address::AddressRepository, opt_in::OptInRepository, referral::ReferralRepository}; #[derive(Debug, thiserror::Error)] pub enum DbError { @@ -20,12 +18,8 @@ pub enum DbError { Database(#[from] sqlx::Error), #[error("Migration error: {0}")] Migration(#[from] sqlx::migrate::MigrateError), - #[error("Task not found: {0}")] - TaskNotFound(String), #[error("Address not found: {0}")] AddressNotFound(String), - #[error("Invalid task status: {0}")] - InvalidStatus(String), #[error("Record not found: {0}")] RecordNotFound(String), #[error("Conflict error: {0}")] @@ -34,7 +28,6 @@ pub enum DbError { #[derive(Debug, Clone)] pub struct DbPersistence { - pub tasks: TaskRepository, pub addresses: AddressRepository, pub referrals: ReferralRepository, pub opt_ins: OptInRepository, @@ -48,6 +41,7 @@ pub struct DbPersistence { pub raid_leaderboards: RaidLeaderboardRepository, pub tweet_pull_usage: TweetPullUsageRepository, + #[allow(dead_code)] pub pool: PgPool, } @@ -57,7 +51,6 @@ impl DbPersistence { sqlx::migrate!("./migrations").run(&pool).await?; - let tasks = TaskRepository::new(&pool); let addresses = AddressRepository::new(&pool); let referrals = ReferralRepository::new(&pool); let opt_ins = OptInRepository::new(&pool); @@ -73,43 +66,6 @@ impl DbPersistence { Ok(Self { pool, - tasks, - addresses, - referrals, - opt_ins, - x_associations, - eth_associations, - admin, - relevant_tweets, - tweet_authors, - raid_quests, - raid_submissions, - raid_leaderboards, - tweet_pull_usage, - }) - } - - #[cfg(test)] - pub async fn new_unmigrated(database_url: &str) -> DbResult { - let pool = PgPoolOptions::new().max_connections(5).connect(database_url).await?; - - let tasks = TaskRepository::new(&pool); - let addresses = AddressRepository::new(&pool); - let referrals = ReferralRepository::new(&pool); - let opt_ins = OptInRepository::new(&pool); - let x_associations = XAssociationRepository::new(&pool); - let eth_associations = EthAssociationRepository::new(&pool); - let admin = AdminRepository::new(&pool); - let relevant_tweets = RelevantTweetRepository::new(&pool); - let tweet_authors = TweetAuthorRepository::new(&pool); - let raid_quests = RaidQuestRepository::new(&pool); - let raid_submissions = RaidSubmissionRepository::new(&pool); - let raid_leaderboards = RaidLeaderboardRepository::new(&pool); - let tweet_pull_usage = TweetPullUsageRepository::new(pool.clone()); - - Ok(Self { - pool, - tasks, addresses, referrals, opt_ins, diff --git a/src/errors.rs b/src/errors.rs index 3cd734f..4c7b5f5 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -9,15 +9,9 @@ use tracing::error; use crate::{ db_persistence::DbError, - handlers::{ - address::AddressHandlerError, auth::AuthHandlerError, referral::ReferralHandlerError, task::TaskHandlerError, - HandlerError, - }, + handlers::{address::AddressHandlerError, auth::AuthHandlerError, referral::ReferralHandlerError, HandlerError}, models::ModelError, - services::{ - graphql_client::GraphqlError, reverser::ReverserError, task_generator::TaskGeneratorError, - transaction_manager::TransactionError, - }, + services::graphql_client::GraphqlError, }; #[derive(Debug, thiserror::Error)] @@ -30,12 +24,6 @@ pub enum AppError { Model(#[from] ModelError), #[error("Database error: {0}")] Database(#[from] DbError), - #[error("Transaction manager error: {0}")] - Transaction(#[from] TransactionError), - #[error("Task generator error: {0}")] - TaskGenerator(#[from] TaskGeneratorError), - #[error("Reverser error: {0}")] - Reverser(#[from] ReverserError), #[error("Server error: {0}")] Server(String), #[error("Join error: {0}")] @@ -74,10 +62,7 @@ impl IntoResponse for AppError { AppError::Database(err) => map_db_error(err), // --- Everything else --- - e @ (AppError::Transaction(_) - | AppError::TaskGenerator(_) - | AppError::Reverser(_) - | AppError::Join(_) + e @ (AppError::Join(_) | AppError::Graphql(_) | AppError::Config(_) | AppError::Http(_) @@ -150,21 +135,13 @@ fn map_handler_error(err: HandlerError) -> (StatusCode, String) { ReferralHandlerError::InvalidReferral(err) => (StatusCode::BAD_REQUEST, err), ReferralHandlerError::DuplicateReferral(err) => (StatusCode::CONFLICT, err), }, - - HandlerError::Task(err) => match err { - TaskHandlerError::TaskNotFound(err) => (StatusCode::NOT_FOUND, err.message.clone()), - TaskHandlerError::InvalidTaskUrl(err) => (StatusCode::BAD_REQUEST, err.message.clone()), - TaskHandlerError::StatusConflict(err) => (StatusCode::CONFLICT, err.message.clone()), - }, } } fn map_db_error(err: DbError) -> (StatusCode, String) { match err { DbError::UniqueViolation(err) => (StatusCode::CONFLICT, err), - DbError::RecordNotFound(err) | DbError::AddressNotFound(err) | DbError::TaskNotFound(err) => { - (StatusCode::NOT_FOUND, err) - } + DbError::RecordNotFound(err) | DbError::AddressNotFound(err) => (StatusCode::NOT_FOUND, err), DbError::Database(err) => { error!("Database error: {}", err); @@ -183,7 +160,7 @@ fn map_db_error(err: DbError) -> (StatusCode, String) { } } - DbError::InvalidStatus(_) | DbError::Migration(_) => ( + DbError::Migration(_) => ( StatusCode::INTERNAL_SERVER_ERROR, "An internal server error occurred".to_string(), ), diff --git a/src/handlers/address.rs b/src/handlers/address.rs index b3e58d2..0fced40 100644 --- a/src/handlers/address.rs +++ b/src/handlers/address.rs @@ -15,7 +15,7 @@ use crate::{ address::{ Address, AddressFilter, AddressSortColumn, AddressStatsResponse, AddressWithOptInAndAssociations, AddressWithRank, AggregateStatsQueryParams, AssociatedAccountsResponse, OptedInPositionResponse, - RewardProgramStatusPayload, SyncTransfersResponse, + RewardProgramStatusPayload, }, admin::Admin, eth_association::{ @@ -355,37 +355,6 @@ pub async fn retrieve_associated_accounts( })) } -pub async fn sync_transfers(State(state): State) -> Result, AppError> { - tracing::info!("Received request to sync transfers from GraphQL endpoint"); - - match state.graphql_client.sync_transfers_and_addresses().await { - Ok((transfer_count, address_count)) => { - tracing::info!( - "Transfer sync completed successfully: {} transfers, {} addresses", - transfer_count, - address_count - ); - - let response = SyncTransfersResponse { - success: true, - message: format!( - "Successfully processed {} transfers and stored {} addresses", - transfer_count, address_count - ), - transfers_processed: Some(transfer_count), - addresses_stored: Some(address_count), - }; - - Ok(Json(response)) - } - Err(e) => { - tracing::error!("Failed to sync transfers: {}", e); - - Err(AppError::Graphql(e)) - } - } -} - pub async fn handle_get_opted_in_users( State(state): State, ) -> Result>>, AppError> { @@ -905,10 +874,9 @@ mod tests { assert_eq!(response.status(), StatusCode::NO_CONTENT); // Verification in DB - let saved_assoc = state - .db - .x_associations - .find_by_username("twitter_pro_101") + let saved_assoc = sqlx::query_as::<_, XAssociation>("SELECT * FROM x_associations WHERE username = $1") + .bind(new_association.username) + .fetch_optional(&state.db.pool) .await .unwrap(); diff --git a/src/handlers/auth.rs b/src/handlers/auth.rs index 8037c5f..b0ab229 100644 --- a/src/handlers/auth.rs +++ b/src/handlers/auth.rs @@ -344,6 +344,7 @@ mod tests { use crate::{ handlers::auth::handle_x_oauth_callback, http_server::AppState, + models::x_association::XAssociation, routes::auth::auth_routes, utils::{ test_app_state::create_test_app_state, @@ -527,10 +528,9 @@ mod tests { assert!(location.contains(&format!("payload={}", expected_username))); // Check DB Side Effects - let saved_assoc = state - .db - .x_associations - .find_by_username(expected_username) + let saved_assoc = sqlx::query_as::<_, XAssociation>("SELECT * FROM x_associations WHERE username = $1") + .bind(expected_username) + .fetch_optional(&state.db.pool) .await .unwrap(); diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index d065696..a9340d4 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -3,9 +3,7 @@ use serde::{Deserialize, Serialize}; use std::fmt::Display; use crate::{ - handlers::{ - address::AddressHandlerError, auth::AuthHandlerError, referral::ReferralHandlerError, task::TaskHandlerError, - }, + handlers::{address::AddressHandlerError, auth::AuthHandlerError, referral::ReferralHandlerError}, AppError, }; @@ -14,13 +12,10 @@ pub mod auth; pub mod raid_quest; pub mod referral; pub mod relevant_tweet; -pub mod task; pub mod tweet_author; #[derive(Debug, thiserror::Error)] pub enum HandlerError { - #[error("Task handler error")] - Task(#[from] TaskHandlerError), #[error("Referral handler error")] Referral(#[from] ReferralHandlerError), #[error("Address handler error")] diff --git a/src/handlers/task.rs b/src/handlers/task.rs deleted file mode 100644 index 8ea11bb..0000000 --- a/src/handlers/task.rs +++ /dev/null @@ -1,139 +0,0 @@ -use axum::{ - extract::{Path, State}, - Json, -}; - -use crate::{ - db_persistence::DbError, - handlers::HandlerError, - http_server::AppState, - models::task::{CompleteTaskRequest, CompleteTaskResponse, Task, TaskStatus}, - AppError, -}; - -use super::SuccessResponse; - -#[derive(Debug, thiserror::Error)] -pub enum TaskHandlerError { - #[error("Task not found")] - TaskNotFound(Json), - #[error("Invalid task URL format")] - InvalidTaskUrl(Json), - #[error("Invalid task status")] - StatusConflict(Json), -} - -pub async fn list_all_tasks(State(state): State) -> Result>>, AppError> { - let tasks = state.db.tasks.get_all_tasks().await?; - - Ok(SuccessResponse::new(tasks)) -} - -pub async fn get_task( - State(state): State, - Path(task_id): Path, -) -> Result>, AppError> { - let task = state.db.tasks.get_task(&task_id).await?; - - match task { - Some(task) => Ok(SuccessResponse::new(task)), - None => Err(AppError::Database(DbError::TaskNotFound("".to_string()))), - } -} - -pub async fn complete_task( - State(state): State, - Json(payload): Json, -) -> Result, AppError> { - tracing::info!("Received task completion request for URL: {}", payload.task_url); - - // Validate task URL format (12 digits) - if payload.task_url.len() != 12 || !payload.task_url.chars().all(|c| c.is_ascii_digit()) { - let response = CompleteTaskResponse { - success: false, - message: format!("Invalid task URL format: {}", payload.task_url), - task_id: None, - }; - return Err(AppError::Handler(HandlerError::Task(TaskHandlerError::InvalidTaskUrl( - Json(response), - )))); - } - - // Find task by URL - let task = match state.db.tasks.find_task_by_url(&payload.task_url).await { - Ok(Some(task)) => task, - Ok(None) => { - let response = CompleteTaskResponse { - success: false, - message: format!("Task not found with URL: {}", payload.task_url), - task_id: None, - }; - return Err(AppError::Handler(HandlerError::Task(TaskHandlerError::TaskNotFound( - Json(response), - )))); - } - Err(db_err) => { - return Err(AppError::Database(db_err)); - } - }; - - // Check if task is in a valid state for completion - match task.status { - TaskStatus::Pending => { - // Task can be completed - } - TaskStatus::Completed => { - let response = CompleteTaskResponse { - success: false, - message: "Task is already completed".to_string(), - task_id: Some(task.task_id.clone()), - }; - return Err(AppError::Handler(HandlerError::Task(TaskHandlerError::StatusConflict( - Json(response), - )))); - } - TaskStatus::Reversed => { - let response = CompleteTaskResponse { - success: false, - message: "Task has already been reversed".to_string(), - task_id: Some(task.task_id.clone()), - }; - return Err(AppError::Handler(HandlerError::Task(TaskHandlerError::StatusConflict( - Json(response), - )))); - } - TaskStatus::Failed => { - let response = CompleteTaskResponse { - success: false, - message: "Task has failed and cannot be completed".to_string(), - task_id: Some(task.task_id.clone()), - }; - return Err(AppError::Handler(HandlerError::Task(TaskHandlerError::StatusConflict( - Json(response), - )))); - } - } - - // Mark task as completed - match state - .db - .tasks - .update_task_status(&task.task_id, TaskStatus::Completed) - .await - { - Ok(()) => { - tracing::info!("Task {} marked as completed", task.task_id); - let response = CompleteTaskResponse { - success: true, - message: "Task completed successfully".to_string(), - task_id: Some(task.task_id.clone()), - }; - Ok(Json(response)) - } - Err(e) => { - tracing::error!("Failed to update task {}: {}", task.task_id, e); - - return Err(AppError::Database(e)); - } - } -} diff --git a/src/http_server.rs b/src/http_server.rs index af4bc6d..2a00812 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -1,4 +1,4 @@ -use axum::{extract::State, http::StatusCode, middleware, response::Json, routing::get, Router}; +use axum::{middleware, response::Json, routing::get, Router}; use rusx::{PkceCodeVerifier, TwitterGateway}; use serde::{Deserialize, Serialize}; use std::{ @@ -12,9 +12,7 @@ use tower_http::{cors::CorsLayer, trace::TraceLayer}; use crate::{ db_persistence::DbPersistence, metrics::{metrics_handler, track_metrics, Metrics}, - models::task::TaskStatus, routes::api_routes, - services::alert_service::AlertService, Config, GraphqlClient, }; use chrono::{DateTime, Utc}; @@ -30,7 +28,6 @@ pub struct AppState { pub oauth_sessions: Arc>>, pub twitter_oauth_tokens: Arc>>, pub twitter_gateway: Arc, - pub alert_client: Arc, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -40,16 +37,6 @@ pub struct Challenge { pub created_at: DateTime, } -#[derive(Debug, Serialize)] -pub struct StatusResponse { - pub status: String, - pub total_tasks: usize, - pub pending_tasks: usize, - pub completed_tasks: usize, - pub reversed_tasks: usize, - pub failed_tasks: usize, -} - #[derive(Debug, Serialize)] pub struct HealthResponse { pub healthy: bool, @@ -62,7 +49,6 @@ pub struct HealthResponse { pub fn create_router(state: AppState) -> Router { Router::new() .route("/health", get(health_check)) - .route("/status", get(get_status)) .route("/metrics", get(metrics_handler)) .nest("/api", api_routes(state.clone())) .layer(middleware::from_fn(track_metrics)) @@ -85,39 +71,11 @@ async fn health_check() -> Json { }) } -/// Get service status and task counts -async fn get_status(State(state): State) -> Result, StatusCode> { - let status_counts = state - .db - .tasks - .status_counts() - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let total_tasks = state - .db - .tasks - .task_count() - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - let response = StatusResponse { - status: "running".to_string(), - total_tasks: total_tasks as usize, - pending_tasks: status_counts.get(&TaskStatus::Pending).copied().unwrap_or(0), - completed_tasks: status_counts.get(&TaskStatus::Completed).copied().unwrap_or(0), - reversed_tasks: status_counts.get(&TaskStatus::Reversed).copied().unwrap_or(0), - failed_tasks: status_counts.get(&TaskStatus::Failed).copied().unwrap_or(0), - }; - - Ok(Json(response)) -} - /// Start the HTTP server pub async fn start_server( db: Arc, graphql_client: Arc, twitter_gateway: Arc, - alert_client: Arc, bind_address: &str, config: Arc, ) -> Result<(), Box> { @@ -125,7 +83,6 @@ pub async fn start_server( db, metrics: Arc::new(Metrics::new()), graphql_client, - alert_client: alert_client, config, twitter_gateway, challenges: Arc::new(RwLock::new(HashMap::new())), @@ -141,16 +98,3 @@ pub async fn start_server( Ok(()) } - -#[cfg(test)] -mod tests { - use crate::utils::test_app_state::create_test_app_state; - - use super::*; - - async fn test_app() -> axum::Router { - let state = create_test_app_state().await; - - create_router(state) - } -} diff --git a/src/lib.rs b/src/lib.rs index 4ec8764..aea43e6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,9 @@ //! # TaskMaster Library //! -//! A task management server that creates reversible blockchain transactions -//! using the Quantus Network. This library provides the core functionality -//! for managing tasks, interacting with the blockchain, and handling -//! HTTP API requests. +//! A reward management server that monitors social media interactions and +//! integrates with the Quantus Network. This library provides the core +//! functionality for managing rewards, interacting with the blockchain, +//! and handling HTTP API requests. pub mod args; pub mod config; @@ -23,18 +23,12 @@ pub mod utils; pub use config::Config; pub use errors::{AppError, AppResult}; pub use http_server::AppState; -pub use services::graphql_client::{GraphqlClient, SyncStats, Transfer}; -pub use services::reverser::{ReversalStats, ReverserService}; -pub use services::task_generator::TaskGenerator; -pub use services::transaction_manager::TransactionManager; +pub use services::graphql_client::{GraphqlClient, Transfer}; // Re-export errors pub use db_persistence::DbError; -pub use services::ethereum_service::EthAddressAssociation; pub use services::graphql_client::GraphqlError; -pub use services::reverser::ReverserError; -pub use services::task_generator::TaskGeneratorError; -pub use services::transaction_manager::TransactionError; +pub use services::signature_service::SigServiceError; /// Library version pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/src/main.rs b/src/main.rs index 37a5fcb..77033d6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,11 +2,9 @@ use crate::{ args::Args, db_persistence::DbPersistence, errors::{AppError, AppResult}, - models::task::{Task, TaskInput}, services::{ alert_service::AlertService, graphql_client::GraphqlClient, raid_leaderboard_service::RaidLeaderboardService, - reverser::start_reverser_service, task_generator::TaskGenerator, telegram_service::TelegramService, - transaction_manager::TransactionManager, tweet_synchronizer_service::TweetSynchronizerService, + telegram_service::TelegramService, tweet_synchronizer_service::TweetSynchronizerService, }, }; @@ -14,8 +12,8 @@ use clap::Parser; use rusx::RusxGateway; use sp_core::crypto::{self, Ss58AddressFormat}; use std::sync::Arc; -use tokio::time::Duration; -use tracing::{error, info, warn}; + +use tracing::{error, info}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; mod args; @@ -66,9 +64,6 @@ async fn main() -> AppResult<()> { info!("Database URL: {}", db_url); let db = Arc::new(DbPersistence::new(db_url).await?); - let initial_task_count = db.tasks.task_count().await?; - info!("Loaded {} existing tasks from database", initial_task_count); - // Initialize graphql client let graphql_client = GraphqlClient::new((*db).clone(), config.candidates.graphql_url.clone()); @@ -82,186 +77,6 @@ async fn main() -> AppResult<()> { return Ok(()); } - if args.test_selection { - info!("Running in test-selection mode"); - let mut task_generator = TaskGenerator::new(db.clone()); - - // Load candidates from database - if let Err(e) = task_generator.refresh_candidates_from_db().await { - error!("Failed to refresh candidates from database: {}", e); - return Err(AppError::TaskGenerator(e)); - } - - info!("Loaded {} candidates from database", task_generator.candidates_count()); - - // Test generating tasks - let test_count = 5; // Generate 5 test tasks - match task_generator.generate_tasks(test_count).await { - Ok(tasks) => { - info!("Successfully generated {} test tasks:", tasks.len()); - for task in &tasks { - info!( - " Task {}: {} -> {} QUAN (URL: {})", - task.task_id, task.quan_address.0, task.quan_amount.0, task.task_url - ); - } - - // Optionally save the tasks to database - info!("Saving test tasks to database..."); - if let Err(e) = task_generator.save_tasks(tasks).await { - error!("Failed to save test tasks: {}", e); - return Err(AppError::TaskGenerator(e)); - } - info!("Test tasks saved successfully!"); - } - Err(e) => { - error!("Failed to generate test tasks: {}", e); - return Err(AppError::TaskGenerator(e)); - } - } - - return Ok(()); - } - - if args.test_transaction { - info!("Running in test-transaction mode"); - // Initialize transaction manager for testing - info!("Connecting to Quantus node..."); - let transaction_manager = Arc::new( - TransactionManager::new( - &config.blockchain.node_url, - &config.blockchain.wallet_name, - &config.blockchain.wallet_password, - db.clone(), - config.get_reversal_period_duration(), - ) - .await?, - ); - - // Perform health check - if let Err(e) = transaction_manager.health_check().await { - error!("Node health check failed: {}", e); - return Err(AppError::Transaction(e)); - } - - let node_info = transaction_manager.get_node_info().await?; - info!("โœ… Connected to: {}", node_info); - info!("Wallet address: {}", transaction_manager.get_wallet_address()); - - // Check wallet balance - match transaction_manager.get_wallet_balance().await { - Ok(balance) => info!("Wallet balance: {} units", balance), - Err(e) => warn!("Could not check wallet balance: {}", e), - } - - // Create or get test task - let (task_id, destination_address, amount) = if let (Some(dest), Some(amt)) = (&args.destination, args.amount) { - // Create a temporary task for testing with custom parameters - let task_input = TaskInput { - quan_address: dest.clone(), - quan_amount: amt, - task_url: format!("test-{}", rand::random::()), - }; - - let test_task = Task::new(task_input)?; - - info!( - "Creating temporary test task: {} -> {} (amount: {})", - test_task.task_id, dest, amt - ); - - // Add the task to database - db.tasks.create(&test_task).await?; - - (test_task.task_id, dest.clone(), amt) - } else { - // Use existing task from database - let tasks = db.tasks.get_all_tasks().await?; - if tasks.is_empty() { - error!("No tasks found in database. Run --test-selection first to create some tasks, or provide --destination and --amount arguments."); - return Err(AppError::Server("No tasks available for testing".to_string())); - } - - let test_task = &tasks[0]; - ( - test_task.task_id.clone(), - test_task.quan_address.0.clone(), - test_task.quan_amount.0 as u64, - ) - }; - - info!( - "Testing transaction with task: {} -> {} (amount: {})", - task_id, destination_address, amount - ); - - // Send a reversible transaction - match transaction_manager.send_reversible_transaction(&task_id).await { - Ok(tx_hash) => { - info!("โœ… Reversible transaction sent successfully!"); - info!("Transaction hash: {}", tx_hash); - info!("Task ID: {}", task_id); - info!("Recipient: {}", destination_address); - info!("Amount: {} QUAN", amount); - } - Err(e) => { - error!("โŒ Failed to send reversible transaction: {}", e); - return Err(AppError::Transaction(e)); - } - } - - return Ok(()); - } - - // Initialize transaction manager - info!("Connecting to Quantus node..."); - let transaction_manager = Arc::new( - TransactionManager::new( - &config.blockchain.node_url, - &config.blockchain.wallet_name, - &config.blockchain.wallet_password, - db.clone(), - config.get_reversal_period_duration(), - ) - .await?, - ); - - // Perform health check - if let Err(e) = transaction_manager.health_check().await { - error!("Node health check failed: {}", e); - return Err(AppError::Transaction(e)); - } - - let node_info = transaction_manager.get_node_info().await?; - info!("โœ… Connected to: {}", node_info); - info!("Wallet address: {}", transaction_manager.get_wallet_address()); - - // Check wallet balance - match transaction_manager.get_wallet_balance().await { - Ok(balance) => info!("Wallet balance: {} units", balance), - Err(e) => warn!("Could not check wallet balance: {}", e), - } - - // Initialize task generator - let mut task_generator = TaskGenerator::new(db.clone()); - - // Initial candidate refresh - info!("Fetching initial candidates..."); - if let Err(e) = task_generator.refresh_candidates(&config.candidates.graphql_url).await { - error!("Failed to fetch initial candidates: {}", e); - return Err(AppError::TaskGenerator(e)); - } - info!("Loaded {} candidates", task_generator.candidates_count()); - - if args.run_once { - info!("Running in single-run mode"); - return run_once(config, task_generator, transaction_manager).await; - } - - // Start the reverser service - info!("Starting reverser service..."); - // Tasks will be started directly in the tokio::select! macro - // Start HTTP server let server_address = config.get_server_address(); info!("Starting HTTP server on {}", server_address); @@ -277,13 +92,11 @@ async fn main() -> AppResult<()> { let server_addr_clone = server_address.clone(); let server_config = Arc::new(config.clone()); let server_twitter_gateway = twitter_gateway.clone(); - let server_alert_service = alert_service.clone(); let server_task = tokio::spawn(async move { http_server::start_server( server_db, graphql_client, server_twitter_gateway, - server_alert_service, &server_addr_clone, server_config, ) @@ -293,10 +106,7 @@ async fn main() -> AppResult<()> { info!("๐ŸŽฏ TaskMaster is now running!"); info!("HTTP API available at: http://{}", server_address); - info!( - "Task generation interval: {} minutes", - config.task_generation.generation_interval_minutes - ); + info!( "Candidates refresh interval: {} minutes", config.candidates.refresh_interval_minutes @@ -322,14 +132,6 @@ async fn main() -> AppResult<()> { error!("HTTP server exited: {:?}", result); result??; } - result = start_candidates_refresh_task( - task_generator.clone(), - config.candidates.graphql_url.clone(), - config.get_candidates_refresh_duration(), - ) => { - error!("Candidates refresh task exited: {:?}", result); - result.await??; - } result = tweet_synchronizer.spawn_tweet_synchronizer() => { error!("Tweet synchronizer exited: {:?}", result); result??; @@ -338,113 +140,11 @@ async fn main() -> AppResult<()> { error!("Raid leaderboard synchronizer exited: {:?}", result); result??; } - // result = start_task_generation_task( - // task_generator.clone(), - // transaction_manager.clone(), - // config.task_generation.taskees_per_round, - // config.get_task_generation_duration(), - // ) => { - // error!("Task generation task exited: {:?}", result); - // result.await??; - // } - result = start_reverser_service( - db.clone(), - transaction_manager.clone(), - config.get_reverser_check_duration(), - config.get_early_reversal_duration().num_minutes(), - ) => { - error!("Reverser service exited: {:?}", result); - result.await?.map_err(AppError::Reverser)?; - } } Ok(()) } -async fn run_once( - config: Config, - task_generator: TaskGenerator, - transaction_manager: Arc, -) -> AppResult<()> { - info!("Generating {} tasks...", config.task_generation.taskees_per_round); - - let tasks = task_generator - .generate_and_save_tasks(config.task_generation.taskees_per_round) - .await?; - - info!("Generated {} tasks", tasks.len()); - - info!("Processing transactions..."); - let processed = transaction_manager.process_task_batch(tasks).await?; - - info!("Successfully processed {} transactions", processed.len()); - info!("Single run completed successfully"); - - Ok(()) -} - -async fn start_candidates_refresh_task( - mut task_generator: TaskGenerator, - graphql_url: String, - refresh_interval: Duration, -) -> tokio::task::JoinHandle> { - tokio::spawn(async move { - let mut interval = tokio::time::interval(refresh_interval); - - loop { - interval.tick().await; - - info!("Refreshing candidates..."); - match task_generator.refresh_candidates(&graphql_url).await { - Ok(()) => { - info!("Candidates refreshed: {} available", task_generator.candidates_count()); - } - Err(e) => { - error!("Failed to refresh candidates: {}", e); - return Err(AppError::TaskGenerator(e)); - } - } - } - }) -} - -async fn start_task_generation_task( - task_generator: TaskGenerator, - transaction_manager: Arc, - taskees_per_round: usize, - generation_interval: Duration, -) -> tokio::task::JoinHandle> { - tokio::spawn(async move { - let mut interval = tokio::time::interval(generation_interval); - - loop { - interval.tick().await; - - info!("Generating new batch of {} tasks...", taskees_per_round); - - let tasks = match task_generator.generate_and_save_tasks(taskees_per_round).await { - Ok(tasks) => tasks, - Err(e) => { - error!("Failed to generate tasks: {}", e); - return Err(AppError::TaskGenerator(e)); - } - }; - - info!("Generated {} tasks, processing transactions...", tasks.len()); - - match transaction_manager.process_task_batch(tasks).await { - Ok(processed) => { - info!("Successfully processed {} transactions", processed.len()); - } - Err(e) => { - error!("Failed to process transaction batch: {}", e); - return Err(AppError::Transaction(e)); - } - } - } - }) -} - fn init_logging(level: &str) -> AppResult<()> { let log_level = match level.to_lowercase().as_str() { "error" => tracing::Level::ERROR, diff --git a/src/models/address.rs b/src/models/address.rs index b3d8004..f999c78 100644 --- a/src/models/address.rs +++ b/src/models/address.rs @@ -111,19 +111,6 @@ pub struct AddressInput { pub referral_code: String, } -#[derive(Debug, Clone, Deserialize)] -pub struct NewAddressPayload { - pub quan_address: String, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct SyncTransfersResponse { - pub success: bool, - pub message: String, - pub transfers_processed: Option, - pub addresses_stored: Option, -} - #[derive(Debug, Clone, Deserialize)] pub struct RewardProgramStatusPayload { pub new_status: bool, diff --git a/src/models/admin.rs b/src/models/admin.rs index 7f1fe96..8329535 100644 --- a/src/models/admin.rs +++ b/src/models/admin.rs @@ -29,12 +29,6 @@ impl<'r> FromRow<'r, PgRow> for Admin { } } -#[derive(Debug, Clone)] -pub struct CreateAdmin { - pub username: String, - pub password: String, -} - #[derive(Deserialize)] pub struct AdminLoginPayload { pub username: String, diff --git a/src/models/auth.rs b/src/models/auth.rs index bbc90fb..965a625 100644 --- a/src/models/auth.rs +++ b/src/models/auth.rs @@ -8,9 +8,7 @@ pub struct TokenClaims { } #[derive(Debug, Deserialize)] -pub struct RequestChallengeBody { - pub address: Option, -} +pub struct RequestChallengeBody {} #[derive(Debug, Serialize)] pub struct RequestChallengeResponse { diff --git a/src/models/mod.rs b/src/models/mod.rs index a5b3e32..bd4e57a 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -18,7 +18,6 @@ pub mod raid_quest; pub mod raid_submission; pub mod referrals; pub mod relevant_tweet; -pub mod task; pub mod tweet_author; pub mod tweet_pull_usage; pub mod x_association; diff --git a/src/models/task.rs b/src/models/task.rs deleted file mode 100644 index 513dcf9..0000000 --- a/src/models/task.rs +++ /dev/null @@ -1,194 +0,0 @@ -use chrono::{DateTime, Utc}; -use rand::Rng; -use serde::{Deserialize, Serialize}; -use sqlx::{postgres::PgRow, FromRow, Row}; -use uuid::Uuid; - -use crate::models::{address::QuanAddress, ModelError, ModelResult}; - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub enum TaskStatus { - #[serde(rename = "pending")] - Pending, - #[serde(rename = "completed")] - Completed, - #[serde(rename = "reversed")] - Reversed, - #[serde(rename = "failed")] - Failed, -} - -impl std::fmt::Display for TaskStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - TaskStatus::Pending => write!(f, "pending"), - TaskStatus::Completed => write!(f, "completed"), - TaskStatus::Reversed => write!(f, "reversed"), - TaskStatus::Failed => write!(f, "failed"), - } - } -} - -impl std::str::FromStr for TaskStatus { - type Err = String; - - fn from_str(s: &str) -> Result { - match s.to_lowercase().as_str() { - "pending" => Ok(TaskStatus::Pending), - "completed" => Ok(TaskStatus::Completed), - "reversed" => Ok(TaskStatus::Reversed), - "failed" => Ok(TaskStatus::Failed), - _ => Err(format!("Invalid task status: {}", s)), - } - } -} - -#[derive(Debug, Deserialize, Serialize, Clone, sqlx::Type)] -#[sqlx(transparent)] -pub struct TokenAmount(pub i64); -impl TokenAmount { - pub fn from(input: i64) -> Result { - if input <= 0 { - return Err(String::from("Token amount can't be less or equal 0.")); - } - - Ok(TokenAmount(input)) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Task { - pub id: Option, - pub task_id: String, - pub quan_address: QuanAddress, - pub quan_amount: TokenAmount, - pub usdc_amount: i64, - pub task_url: String, - pub status: TaskStatus, - pub reversible_tx_id: Option, - pub send_time: Option>, - pub end_time: Option>, - pub created_at: Option>, - pub updated_at: Option>, -} - -impl Task { - pub fn new(input: TaskInput) -> ModelResult { - let quan_address = match QuanAddress::from(&input.quan_address) { - Ok(name) => name, - Err(e) => { - tracing::error!(error = %e, "Invalid quan address input for task"); - return Err(ModelError::InvalidInput); - } - }; - - let quan_amount = match TokenAmount::from(input.quan_amount as i64) { - Ok(quan_amount) => quan_amount, - Err(e) => { - tracing::error!(error = %e, "Invalid token amount input for task"); - return Err(ModelError::InvalidInput); - } - }; - - let task_url = input.task_url; - - let mut rng = rand::rng(); - let usdc_amount = rng.random_range(1..=25); - let task_id = Uuid::new_v4().to_string(); - - Ok(Task { - id: None, - task_id, - quan_address, - quan_amount, - usdc_amount, - task_url, - status: TaskStatus::Pending, - reversible_tx_id: None, - send_time: None, - end_time: None, - created_at: None, - updated_at: None, - }) - } - - pub fn set_transaction_sent( - &mut self, - reversible_tx_id: String, - send_time: DateTime, - end_time: DateTime, - ) { - self.reversible_tx_id = Some(reversible_tx_id); - self.send_time = Some(send_time); - self.end_time = Some(end_time); - self.status = TaskStatus::Pending; - } - - pub fn mark_completed(&mut self) { - self.status = TaskStatus::Completed; - } - - pub fn mark_reversed(&mut self) { - self.status = TaskStatus::Reversed; - } - - pub fn mark_failed(&mut self) { - self.status = TaskStatus::Failed; - } - - pub fn is_ready_for_reversal(&self, early_minutes: i64) -> bool { - if self.status != TaskStatus::Pending { - return false; - } - - if let Some(end_time) = self.end_time { - let reversal_time = end_time - chrono::Duration::minutes(early_minutes); - Utc::now() >= reversal_time - } else { - false - } - } -} -impl<'r> FromRow<'r, PgRow> for Task { - fn from_row(row: &'r PgRow) -> Result { - let status_str: String = row.try_get("status")?; - let status = status_str - .parse::() - .map_err(|e| sqlx::Error::Decode(Box::new(std::io::Error::new(std::io::ErrorKind::InvalidData, e))))?; - - Ok(Task { - id: row.try_get("id")?, - task_id: row.try_get("task_id")?, - quan_address: row.try_get("quan_address")?, - quan_amount: row.try_get("quan_amount")?, - usdc_amount: row.try_get("usdc_amount")?, - task_url: row.try_get("task_url")?, - status, - reversible_tx_id: row.try_get("reversible_tx_id")?, - send_time: row.try_get("send_time")?, - end_time: row.try_get("end_time")?, - created_at: row.try_get("created_at")?, - updated_at: row.try_get("updated_at")?, - }) - } -} - -// An unvalidated version that we can deserialize directly from JSON -#[derive(Debug, Deserialize)] -pub struct TaskInput { - pub quan_address: String, - pub quan_amount: u64, - pub task_url: String, -} - -#[derive(Debug, Deserialize)] -pub struct CompleteTaskRequest { - pub task_url: String, -} - -#[derive(Debug, Serialize)] -pub struct CompleteTaskResponse { - pub success: bool, - pub message: String, - pub task_id: Option, -} diff --git a/src/repositories/address.rs b/src/repositories/address.rs index 0566202..1164143 100644 --- a/src/repositories/address.rs +++ b/src/repositories/address.rs @@ -200,6 +200,7 @@ impl AddressRepository { Ok(total_items) } + #[allow(dead_code)] pub async fn find_all(&self) -> DbResult> { let addresses = sqlx::query_as::<_, Address>("SELECT * FROM addresses") .fetch_all(&self.pool) diff --git a/src/repositories/admin.rs b/src/repositories/admin.rs index e262d17..d620d8e 100644 --- a/src/repositories/admin.rs +++ b/src/repositories/admin.rs @@ -1,11 +1,7 @@ use sqlx::{PgPool, Postgres, QueryBuilder}; use uuid::Uuid; -use crate::{ - db_persistence::DbError, - models::admin::{Admin, CreateAdmin}, - repositories::DbResult, -}; +use crate::{models::admin::Admin, repositories::DbResult}; #[derive(Clone, Debug)] pub struct AdminRepository { @@ -20,26 +16,6 @@ impl AdminRepository { Self { pool: pool.clone() } } - pub async fn create(&self, new_admin: &CreateAdmin) -> DbResult { - let created_id = sqlx::query_scalar::<_, String>( - " - INSERT INTO admins (username, password) - VALUES ($1, $2) - RETURNING id - ", - ) - .bind(new_admin.username.clone()) - .bind(new_admin.password.clone()) - .fetch_optional(&self.pool) - .await?; - - if let Some(id) = created_id { - Ok(id) - } else { - Err(DbError::RecordNotFound("Record id is generated".to_string())) - } - } - pub async fn find_by_id(&self, id: &Uuid) -> DbResult> { let mut qb = AdminRepository::create_select_base_query(); qb.push(" WHERE id = "); diff --git a/src/repositories/eth_association.rs b/src/repositories/eth_association.rs index 48df5aa..0de90ae 100644 --- a/src/repositories/eth_association.rs +++ b/src/repositories/eth_association.rs @@ -1,10 +1,7 @@ use sqlx::PgPool; use crate::{ - models::{ - address::QuanAddress, - eth_association::{EthAddress, EthAssociation}, - }, + models::{address::QuanAddress, eth_association::EthAssociation}, repositories::DbResult, }; @@ -43,15 +40,6 @@ impl EthAssociationRepository { Ok(association) } - pub async fn find_by_eth_address(&self, eth_address: &EthAddress) -> DbResult> { - let association = sqlx::query_as::<_, EthAssociation>("SELECT * FROM eth_associations WHERE eth_address = $1") - .bind(ð_address.0) - .fetch_optional(&self.pool) - .await?; - - Ok(association) - } - pub async fn update_eth_address(&self, new_association: &EthAssociation) -> DbResult { let association = sqlx::query_as::<_, EthAssociation>( r#" @@ -133,30 +121,6 @@ mod tests { assert_eq!(found.eth_address.0, "0x00000000219ab540356cBB839Cbe05303d7705Fa"); } - #[tokio::test] - async fn test_find_by_eth_address() { - let (address_repo, eth_repo) = setup_test_repositories().await; - - let address = create_persisted_address(&address_repo, "user_02").await; - - let input = EthAssociationInput { - quan_address: address.quan_address.0.clone(), - eth_address: "0x00000000219ab540356cBB839Cbe05303d7705Fa".to_string(), - }; - let new_association = EthAssociation::new(input).unwrap(); - eth_repo.create(&new_association).await.unwrap(); - - // Find by ETH Address - let found = eth_repo - .find_by_eth_address(&new_association.eth_address) - .await - .unwrap(); - - assert!(found.is_some()); - let found = found.unwrap(); - assert_eq!(found.quan_address.0, address.quan_address.0); - } - #[tokio::test] async fn test_update_eth_address() { let (address_repo, eth_repo) = setup_test_repositories().await; diff --git a/src/repositories/mod.rs b/src/repositories/mod.rs index 2d893df..547105c 100644 --- a/src/repositories/mod.rs +++ b/src/repositories/mod.rs @@ -13,7 +13,6 @@ pub mod raid_quest; pub mod raid_submission; pub mod referral; pub mod relevant_tweet; -pub mod task; pub mod tweet_author; pub mod tweet_pull_usage; pub mod x_association; diff --git a/src/repositories/opt_in.rs b/src/repositories/opt_in.rs index ca5b3d2..a7c59cd 100644 --- a/src/repositories/opt_in.rs +++ b/src/repositories/opt_in.rs @@ -55,14 +55,6 @@ impl OptInRepository { Ok(opt_ins) } - - pub async fn count(&self) -> DbResult { - let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM opt_ins") - .fetch_one(&self.pool) - .await?; - - Ok(count) - } } #[cfg(test)] @@ -75,6 +67,7 @@ mod tests { use crate::repositories::address::AddressRepository; use crate::utils::test_db::reset_database; use sqlx::{postgres::PgPoolOptions, PgPool}; + use sqlx::{Pool, Postgres}; use std::time::Duration; use tokio::time::sleep; @@ -94,6 +87,14 @@ mod tests { (opt_in_repo, address_repo, pool) } + async fn count_records(pool: &Pool) -> DbResult { + let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM opt_ins") + .fetch_one(pool) + .await?; + + Ok(count) + } + fn create_test_address(id: &str) -> Address { let input = AddressInput { quan_address: format!("qz_test_{}", id), @@ -108,7 +109,7 @@ mod tests { let address = create_test_address("test_create_001"); address_repo.create(&address).await.unwrap(); - let count = opt_in_repo.count().await.unwrap(); + let count = count_records(&_pool).await.unwrap(); let opt_in = opt_in_repo.create(&address.quan_address.0).await.unwrap(); assert_eq!(opt_in.quan_address.0, address.quan_address.0); @@ -129,7 +130,6 @@ mod tests { let address = create_test_address("test_delete_001"); address_repo.create(&address).await.unwrap(); - let count_before = opt_in_repo.count().await.unwrap(); opt_in_repo.create(&address.quan_address.0).await.unwrap(); assert!(opt_in_repo @@ -159,15 +159,12 @@ mod tests { address_repo.create(&addr2).await.unwrap(); address_repo.create(&addr3).await.unwrap(); - let count = opt_in_repo.count().await.unwrap(); opt_in_repo.create(&addr1.quan_address.0).await.unwrap(); sleep(Duration::from_millis(10)).await; - let count = opt_in_repo.count().await.unwrap(); opt_in_repo.create(&addr2.quan_address.0).await.unwrap(); sleep(Duration::from_millis(10)).await; - let count = opt_in_repo.count().await.unwrap(); opt_in_repo.create(&addr3.quan_address.0).await.unwrap(); let all = opt_in_repo.get_all_ordered(100).await.unwrap(); @@ -189,15 +186,12 @@ mod tests { address_repo.create(&addr2).await.unwrap(); address_repo.create(&addr3).await.unwrap(); - let count = opt_in_repo.count().await.unwrap(); opt_in_repo.create(&addr1.quan_address.0).await.unwrap(); sleep(Duration::from_millis(10)).await; - let count = opt_in_repo.count().await.unwrap(); opt_in_repo.create(&addr2.quan_address.0).await.unwrap(); sleep(Duration::from_millis(10)).await; - let count = opt_in_repo.count().await.unwrap(); opt_in_repo.create(&addr3.quan_address.0).await.unwrap(); let limited = opt_in_repo.get_all_ordered(2).await.unwrap(); @@ -210,7 +204,7 @@ mod tests { async fn test_count() { let (opt_in_repo, address_repo, _pool) = setup_test_repository().await; - assert_eq!(opt_in_repo.count().await.unwrap(), 0); + assert_eq!(count_records(&_pool).await.unwrap(), 0); let addr1 = create_test_address("test_count_001"); let addr2 = create_test_address("test_count_002"); @@ -220,20 +214,17 @@ mod tests { address_repo.create(&addr2).await.unwrap(); address_repo.create(&addr3).await.unwrap(); - let count = opt_in_repo.count().await.unwrap(); opt_in_repo.create(&addr1.quan_address.0).await.unwrap(); - assert_eq!(opt_in_repo.count().await.unwrap(), 1); + assert_eq!(count_records(&_pool).await.unwrap(), 1); - let count = opt_in_repo.count().await.unwrap(); opt_in_repo.create(&addr2.quan_address.0).await.unwrap(); - assert_eq!(opt_in_repo.count().await.unwrap(), 2); + assert_eq!(count_records(&_pool).await.unwrap(), 2); - let count = opt_in_repo.count().await.unwrap(); opt_in_repo.create(&addr3.quan_address.0).await.unwrap(); - assert_eq!(opt_in_repo.count().await.unwrap(), 3); + assert_eq!(count_records(&_pool).await.unwrap(), 3); opt_in_repo.delete(&addr2.quan_address.0).await.unwrap(); - assert_eq!(opt_in_repo.count().await.unwrap(), 2); + assert_eq!(count_records(&_pool).await.unwrap(), 2); } #[tokio::test] @@ -243,13 +234,11 @@ mod tests { address_repo.create(&address).await.unwrap(); - let count = opt_in_repo.count().await.unwrap(); let opt_in1 = opt_in_repo.create(&address.quan_address.0).await.unwrap(); let first_created_at = opt_in1.created_at; sleep(Duration::from_millis(10)).await; - let count = opt_in_repo.count().await.unwrap(); let opt_in2 = opt_in_repo.create(&address.quan_address.0).await.unwrap(); assert_eq!(opt_in2.quan_address.0, address.quan_address.0); @@ -285,22 +274,19 @@ mod tests { address_repo.create(&addr2).await.unwrap(); address_repo.create(&addr3).await.unwrap(); - assert_eq!(opt_in_repo.count().await.unwrap(), 0); + assert_eq!(count_records(&_pool).await.unwrap(), 0); - let count = opt_in_repo.count().await.unwrap(); let opt_in1 = opt_in_repo.create(&addr1.quan_address.0).await.unwrap(); assert_eq!(opt_in1.opt_in_number, 1); - assert_eq!(opt_in_repo.count().await.unwrap(), 1); + assert_eq!(count_records(&_pool).await.unwrap(), 1); - let count = opt_in_repo.count().await.unwrap(); let opt_in2 = opt_in_repo.create(&addr2.quan_address.0).await.unwrap(); assert_eq!(opt_in2.opt_in_number, 2); - assert_eq!(opt_in_repo.count().await.unwrap(), 2); + assert_eq!(count_records(&_pool).await.unwrap(), 2); - let count = opt_in_repo.count().await.unwrap(); let opt_in3 = opt_in_repo.create(&addr3.quan_address.0).await.unwrap(); assert_eq!(opt_in3.opt_in_number, 3); - assert_eq!(opt_in_repo.count().await.unwrap(), 3); + assert_eq!(count_records(&_pool).await.unwrap(), 3); let all = opt_in_repo.get_all_ordered(100).await.unwrap(); assert_eq!(all[0].opt_in_number, 1); @@ -314,7 +300,6 @@ mod tests { let address = create_test_address("test_timestamp_001"); address_repo.create(&address).await.unwrap(); - let count = opt_in_repo.count().await.unwrap(); let opt_in = opt_in_repo.create(&address.quan_address.0).await.unwrap(); assert!(!opt_in.created_at.to_rfc3339().is_empty()); diff --git a/src/repositories/task.rs b/src/repositories/task.rs deleted file mode 100644 index b299af0..0000000 --- a/src/repositories/task.rs +++ /dev/null @@ -1,342 +0,0 @@ -use std::{collections::HashMap, str::FromStr}; - -use chrono::{DateTime, Utc}; -use sqlx::{PgPool, Row}; - -use crate::{ - db_persistence::DbError, - models::task::{Task, TaskStatus}, - repositories::DbResult, -}; - -#[derive(Clone, Debug)] -pub struct TaskRepository { - pool: PgPool, -} -impl TaskRepository { - pub fn new(pool: &PgPool) -> Self { - Self { pool: pool.clone() } - } - - pub async fn create(&self, new_task: &Task) -> DbResult { - let created_task_id = sqlx::query_scalar::<_, String>( - " - INSERT INTO tasks ( - task_id, quan_address, quan_amount, usdc_amount, task_url, - status, reversible_tx_id, send_time, end_time - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - RETURNING task_id - ", - ) - .bind(&new_task.task_id) - .bind(&new_task.quan_address.0) - .bind(new_task.quan_amount.0) - .bind(new_task.usdc_amount) - .bind(&new_task.task_url) - .bind(new_task.status.to_string()) - .bind(&new_task.reversible_tx_id) - .bind(new_task.send_time) - .bind(new_task.end_time) - .fetch_one(&self.pool) - .await?; - - Ok(created_task_id) - } - - pub async fn get_task(&self, task_id: &str) -> DbResult> { - let task = sqlx::query_as::<_, Task>("SELECT * FROM tasks WHERE task_id = $1") - .bind(task_id) - .fetch_optional(&self.pool) - .await?; - Ok(task) - } - - pub async fn find_task_by_url(&self, task_url: &str) -> DbResult> { - let task = sqlx::query_as::<_, Task>("SELECT * FROM tasks WHERE task_url = $1") - .bind(task_url) - .fetch_optional(&self.pool) - .await?; - Ok(task) - } - - pub async fn update_task_status(&self, task_id: &str, status: TaskStatus) -> DbResult<()> { - let result = sqlx::query("UPDATE tasks SET status = $1, updated_at = NOW() WHERE task_id = $2") - .bind(status.to_string()) - .bind(task_id) - .execute(&self.pool) - .await?; - - if result.rows_affected() == 0 { - return Err(DbError::TaskNotFound(task_id.to_string())); - } - - Ok(()) - } - - pub async fn update_task_transaction( - &self, - task_id: &str, - reversible_tx_id: &str, - send_time: DateTime, - end_time: DateTime, - ) -> DbResult<()> { - let result = sqlx::query( - " - UPDATE tasks - SET reversible_tx_id = $1, send_time = $2, end_time = $3, - status = $4, updated_at = NOW() - WHERE task_id = $5 - ", - ) - .bind(reversible_tx_id) - .bind(send_time) - .bind(end_time) - .bind(TaskStatus::Pending.to_string()) // Assuming you want to set it to pending - .bind(task_id) - .execute(&self.pool) - .await?; - - if result.rows_affected() == 0 { - return Err(DbError::TaskNotFound(task_id.to_string())); - } - - Ok(()) - } - - pub async fn get_tasks_by_status(&self, status: TaskStatus) -> DbResult> { - let tasks = sqlx::query_as::<_, Task>("SELECT * FROM tasks WHERE status = $1 ORDER BY created_at") - .bind(status.to_string()) - .fetch_all(&self.pool) - .await?; - Ok(tasks) - } - - pub async fn get_tasks_ready_for_reversal(&self, early_minutes: i64) -> DbResult> { - let cutoff_time = Utc::now() + chrono::Duration::minutes(early_minutes); - - let tasks = sqlx::query_as::<_, Task>( - "SELECT * FROM tasks WHERE status = $1 AND end_time IS NOT NULL AND end_time <= $2", - ) - .bind(TaskStatus::Pending.to_string()) - .bind(cutoff_time) - .fetch_all(&self.pool) - .await?; - Ok(tasks) - } - - pub async fn get_all_tasks(&self) -> DbResult> { - let tasks = sqlx::query_as::<_, Task>("SELECT * FROM tasks ORDER BY created_at DESC") - .fetch_all(&self.pool) - .await?; - Ok(tasks) - } - - pub async fn task_count(&self) -> DbResult { - let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM tasks") - .fetch_one(&self.pool) - .await?; - Ok(count) - } - - pub async fn status_counts(&self) -> DbResult> { - let rows = sqlx::query("SELECT status, COUNT(*) as count FROM tasks GROUP BY status") - .fetch_all(&self.pool) - .await?; - - let mut counts = HashMap::new(); - for row in rows { - let status_str: String = row.get("status"); - let count: i64 = row.get("count"); - - if let Ok(status) = TaskStatus::from_str(&status_str) { - counts.insert(status, count as usize); - } - } - - Ok(counts) - } - - pub async fn get_address_stats(&self) -> DbResult> { - let stats = sqlx::query_as::<_, (String, i64)>( - r#" - SELECT - a.quan_address, - COUNT(t.id) as task_count - FROM - addresses a - LEFT JOIN - tasks t ON a.quan_address = t.quan_address - GROUP BY - a.quan_address - ORDER BY - a.quan_address - "#, - ) - .fetch_all(&self.pool) - .await?; - - Ok(stats) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - config::Config, - models::task::TaskInput, - repositories::address::AddressRepository, - utils::test_db::{create_persisted_address, reset_database}, - }; - use uuid::Uuid; - - // Helper to set up repositories and clean all tables. - async fn setup_test_repositories() -> (AddressRepository, TaskRepository) { - let config = Config::load_test_env().expect("Failed to load configuration for tests"); - let pool = PgPool::connect(config.get_database_url()) - .await - .expect("Failed to create pool."); - - reset_database(&pool).await; - - (AddressRepository::new(&pool), TaskRepository::new(&pool)) - } - - // Helper to create a mock Task object. - fn create_mock_task_object(quan_address: &str) -> Task { - let input = TaskInput { - quan_address: quan_address.to_string(), - quan_amount: 1000, - task_url: format!("http://example.com/task/{}", Uuid::new_v4()), - }; - Task::new(input).unwrap() - } - - #[tokio::test] - async fn test_create_and_get_task() { - let (address_repo, task_repo) = setup_test_repositories().await; - let address = create_persisted_address(&address_repo, "001").await; - let new_task = create_mock_task_object(&address.quan_address.0); - - let created_id = task_repo.create(&new_task).await.unwrap(); - assert_eq!(created_id, new_task.task_id); - - let fetched_task = task_repo.get_task(&created_id).await.unwrap().unwrap(); - assert_eq!(fetched_task.task_id, new_task.task_id); - assert_eq!(fetched_task.status, TaskStatus::Pending); - } - - #[tokio::test] - async fn test_update_task_status() { - let (address_repo, task_repo) = setup_test_repositories().await; - let address = create_persisted_address(&address_repo, "002").await; - let new_task = create_mock_task_object(&address.quan_address.0); - task_repo.create(&new_task).await.unwrap(); - - task_repo - .update_task_status(&new_task.task_id, TaskStatus::Completed) - .await - .unwrap(); - - let fetched_task = task_repo.get_task(&new_task.task_id).await.unwrap().unwrap(); - assert_eq!(fetched_task.status, TaskStatus::Completed); - } - - #[tokio::test] - async fn test_update_task_transaction() { - let (address_repo, task_repo) = setup_test_repositories().await; - let address = create_persisted_address(&address_repo, "003").await; - let new_task = create_mock_task_object(&address.quan_address.0); - task_repo.create(&new_task).await.unwrap(); - - let tx_id = "0x123abc"; - let send_time = Utc::now(); - let end_time = send_time + chrono::Duration::hours(1); - - task_repo - .update_task_transaction(&new_task.task_id, tx_id, send_time, end_time) - .await - .unwrap(); - - let updated_task = task_repo.get_task(&new_task.task_id).await.unwrap().unwrap(); - assert_eq!(updated_task.reversible_tx_id, Some(tx_id.to_string())); - assert!(updated_task.send_time.is_some()); - assert!(updated_task.end_time.is_some()); - } - - #[tokio::test] - async fn test_get_tasks_by_status() { - let (address_repo, task_repo) = setup_test_repositories().await; - let address = create_persisted_address(&address_repo, "004").await; - - let mut task1 = create_mock_task_object(&address.quan_address.0); - task1.status = TaskStatus::Pending; - task_repo.create(&task1).await.unwrap(); - - let mut task2 = create_mock_task_object(&address.quan_address.0); - task2.status = TaskStatus::Completed; - task_repo.create(&task2).await.unwrap(); - - let pending_tasks = task_repo.get_tasks_by_status(TaskStatus::Pending).await.unwrap(); - assert_eq!(pending_tasks.len(), 1); - assert_eq!(pending_tasks[0].task_id, task1.task_id); - } - - #[tokio::test] - async fn test_get_tasks_ready_for_reversal() { - let (address_repo, task_repo) = setup_test_repositories().await; - let address = create_persisted_address(&address_repo, "005").await; - - // This task's end time is soon, so it should be picked up - let task1 = create_mock_task_object(&address.quan_address.0); - task_repo.create(&task1).await.unwrap(); - let end_time1 = Utc::now() + chrono::Duration::minutes(5); - task_repo - .update_task_transaction(&task1.task_id, "tx1", Utc::now(), end_time1) - .await - .unwrap(); - - // This task's end time is far in the future - let task2 = create_mock_task_object(&address.quan_address.0); - task_repo.create(&task2).await.unwrap(); - let end_time2 = Utc::now() + chrono::Duration::minutes(30); - task_repo - .update_task_transaction(&task2.task_id, "tx2", Utc::now(), end_time2) - .await - .unwrap(); - - // Looking for tasks ending within the next 10 minutes - let reversible_tasks = task_repo.get_tasks_ready_for_reversal(10).await.unwrap(); - assert_eq!(reversible_tasks.len(), 1); - assert_eq!(reversible_tasks[0].task_id, task1.task_id); - } - - #[tokio::test] - async fn test_counts() { - let (address_repo, task_repo) = setup_test_repositories().await; - let address = create_persisted_address(&address_repo, "006").await; - - let mut task1 = create_mock_task_object(&address.quan_address.0); - task1.status = TaskStatus::Pending; - task_repo.create(&task1).await.unwrap(); - - let mut task2 = create_mock_task_object(&address.quan_address.0); - task2.status = TaskStatus::Pending; - task_repo.create(&task2).await.unwrap(); - - let mut task3 = create_mock_task_object(&address.quan_address.0); - task3.status = TaskStatus::Completed; - task_repo.create(&task3).await.unwrap(); - - // Test total count - let total = task_repo.task_count().await.unwrap(); - assert_eq!(total, 3); - - // Test status counts - let counts = task_repo.status_counts().await.unwrap(); - assert_eq!(counts.get(&TaskStatus::Pending), Some(&2)); - assert_eq!(counts.get(&TaskStatus::Completed), Some(&1)); - assert_eq!(counts.get(&TaskStatus::Failed), None); - } -} diff --git a/src/repositories/tweet_pull_usage.rs b/src/repositories/tweet_pull_usage.rs index 1813819..d1adcd8 100644 --- a/src/repositories/tweet_pull_usage.rs +++ b/src/repositories/tweet_pull_usage.rs @@ -53,32 +53,11 @@ impl TweetPullUsageRepository { } } - pub async fn get_current_usage(&self, reset_day: u32) -> Result { - let period = Self::get_current_period(reset_day); - self.get_usage_for_period(&period).await - } - pub async fn increment_usage(&self, amount: i32, reset_day: u32) -> Result { let period = Self::get_current_period(reset_day); self.increment_usage_for_period(amount, &period).await } - /// Internal helper to get usage for a specific period string. - async fn get_usage_for_period(&self, period: &str) -> Result { - let usage = sqlx::query_as::<_, TweetPullUsage>( - "INSERT INTO tweet_pull_usage (period, tweet_count) - VALUES ($1, 0) - ON CONFLICT (period) DO UPDATE SET period = EXCLUDED.period - RETURNING *", - ) - .bind(period) - .fetch_one(&self.pool) - .await - .map_err(DbError::Database)?; - - Ok(usage) - } - /// Internal helper to increment usage for a specific period string. async fn increment_usage_for_period(&self, amount: i32, period: &str) -> Result { let usage = sqlx::query_as::<_, TweetPullUsage>( @@ -119,21 +98,42 @@ mod tests { use crate::utils::test_app_state::create_test_app_state; use crate::utils::test_db::reset_database; use chrono::{TimeZone, Utc}; + use sqlx::{Pool, Postgres}; + + async fn get_current_usage(pool: &Pool, reset_day: u32) -> Result { + let period = TweetPullUsageRepository::get_current_period(reset_day); + get_usage_for_period(pool, &period).await + } + + /// Internal helper to get usage for a specific period string. + async fn get_usage_for_period(pool: &Pool, period: &str) -> Result { + let usage = sqlx::query_as::<_, TweetPullUsage>( + "INSERT INTO tweet_pull_usage (period, tweet_count) + VALUES ($1, 0) + ON CONFLICT (period) DO UPDATE SET period = EXCLUDED.period + RETURNING *", + ) + .bind(period) + .fetch_one(pool) + .await + .map_err(DbError::Database)?; + + Ok(usage) + } #[tokio::test] async fn test_get_current_usage_integration() { let state = create_test_app_state().await; reset_database(&state.db.pool).await; - let repo = &state.db.tweet_pull_usage; let reset_day = 1; // 1. Initial call should create a record with 0 - let usage = repo.get_current_usage(reset_day).await.unwrap(); + let usage = get_current_usage(&state.db.pool, reset_day).await.unwrap(); assert_eq!(usage.tweet_count, 0); // 2. Subsequent call should return the same record - let usage2 = repo.get_current_usage(reset_day).await.unwrap(); + let usage2 = get_current_usage(&state.db.pool, reset_day).await.unwrap(); assert_eq!(usage2.tweet_count, 0); assert_eq!(usage.period, usage2.period); } @@ -171,8 +171,8 @@ mod tests { repo.increment_usage_for_period(50, period_b).await.unwrap(); // 3. Verify they are separate - let usage_a = repo.get_usage_for_period(period_a).await.unwrap(); - let usage_b = repo.get_usage_for_period(period_b).await.unwrap(); + let usage_a = get_usage_for_period(&state.db.pool, period_a).await.unwrap(); + let usage_b = get_usage_for_period(&state.db.pool, period_b).await.unwrap(); assert_eq!(usage_a.tweet_count, 100); assert_eq!(usage_b.tweet_count, 50); diff --git a/src/repositories/x_association.rs b/src/repositories/x_association.rs index d95c63b..d99a88b 100644 --- a/src/repositories/x_association.rs +++ b/src/repositories/x_association.rs @@ -42,32 +42,6 @@ impl XAssociationRepository { Ok(association) } - pub async fn find_by_username(&self, username: &str) -> DbResult> { - let association = sqlx::query_as::<_, XAssociation>("SELECT * FROM x_associations WHERE username = $1") - .bind(username) - .fetch_optional(&self.pool) - .await?; - - Ok(association) - } - - pub async fn update_username(&self, quan_address: &QuanAddress, new_username: &str) -> DbResult { - let association = sqlx::query_as::<_, XAssociation>( - r#" - UPDATE x_associations - SET username = $2 - WHERE quan_address = $1 - RETURNING * - "#, - ) - .bind(&quan_address.0) - .bind(new_username) - .fetch_one(&self.pool) - .await?; - - Ok(association) - } - pub async fn delete(&self, quan_address: &QuanAddress) -> DbResult<()> { sqlx::query("DELETE FROM x_associations WHERE quan_address = $1") .bind(&quan_address.0) @@ -128,54 +102,6 @@ mod tests { assert_eq!(found.username, "x_user_01"); } - #[tokio::test] - async fn test_find_by_username() { - let (address_repo, x_repo) = setup_test_repositories().await; - - let address = create_persisted_address(&address_repo, "user_02").await; - - let input = XAssociationInput { - quan_address: address.quan_address.0.clone(), - username: "unique_handler_123".to_string(), - }; - let new_association = XAssociation::new(input).unwrap(); - x_repo.create(&new_association).await.unwrap(); - - // Find by Username - let found = x_repo.find_by_username("unique_handler_123").await.unwrap(); - - assert!(found.is_some()); - let found = found.unwrap(); - assert_eq!(found.quan_address.0, address.quan_address.0); - } - - #[tokio::test] - async fn test_update_username() { - let (address_repo, x_repo) = setup_test_repositories().await; - - let address = create_persisted_address(&address_repo, "user_03").await; - - // Initial Create - let input = XAssociationInput { - quan_address: address.quan_address.0.clone(), - username: "old_username".to_string(), - }; - let new_association = XAssociation::new(input).unwrap(); - x_repo.create(&new_association).await.unwrap(); - - // Update - let updated = x_repo - .update_username(&address.quan_address, "new_cool_username") - .await - .unwrap(); - - assert_eq!(updated.username, "new_cool_username"); - - // Verify in DB - let found = x_repo.find_by_address(&address.quan_address).await.unwrap().unwrap(); - assert_eq!(found.username, "new_cool_username"); - } - #[tokio::test] async fn test_delete_association() { let (address_repo, x_repo) = setup_test_repositories().await; @@ -199,12 +125,4 @@ mod tests { let found = x_repo.find_by_address(&address.quan_address).await.unwrap(); assert!(found.is_none()); } - - #[tokio::test] - async fn test_find_non_existent() { - let (_address_repo, x_repo) = setup_test_repositories().await; - - let result = x_repo.find_by_username("ghost_user").await.unwrap(); - assert!(result.is_none()); - } } diff --git a/src/routes/address.rs b/src/routes/address.rs index a69b8f4..41f45f1 100644 --- a/src/routes/address.rs +++ b/src/routes/address.rs @@ -10,7 +10,7 @@ use crate::{ associate_eth_address, associate_x_handle, dissociate_eth_address, dissociate_x_account, handle_aggregate_address_stats, handle_get_address_reward_status_by_id, handle_get_address_stats, handle_get_addresses, handle_get_leaderboard, handle_get_opted_in_position, handle_get_opted_in_users, - handle_update_reward_program_status, retrieve_associated_accounts, sync_transfers, update_eth_address, + handle_update_reward_program_status, retrieve_associated_accounts, update_eth_address, }, http_server::AppState, middlewares::jwt_auth, @@ -62,5 +62,4 @@ pub fn address_routes(state: AppState) -> Router { post(associate_x_handle.layer(middleware::from_fn_with_state(state.clone(), jwt_auth::jwt_auth))) .delete(dissociate_x_account.layer(middleware::from_fn_with_state(state, jwt_auth::jwt_auth))), ) - .route("/addresses/sync-transfers", post(sync_transfers)) } diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 00e12e0..2b8c773 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -6,7 +6,7 @@ use crate::{ http_server::AppState, routes::{ address::address_routes, raid_quest::raid_quest_routes, relevant_tweet::relevant_tweet_routes, - task::task_routes, tweet_author::tweet_author_routes, + tweet_author::tweet_author_routes, }, }; @@ -15,7 +15,6 @@ pub mod auth; pub mod raid_quest; pub mod referral; pub mod relevant_tweet; -pub mod task; pub mod tweet_author; pub fn api_routes(state: AppState) -> Router { @@ -23,7 +22,6 @@ pub fn api_routes(state: AppState) -> Router { .merge(referral_routes(state.clone())) .merge(address_routes(state.clone())) .merge(auth_routes(state.clone())) - .merge(task_routes(state.clone())) .merge(relevant_tweet_routes(state.clone())) .merge(tweet_author_routes(state.clone())) .merge(raid_quest_routes(state)) diff --git a/src/routes/task.rs b/src/routes/task.rs deleted file mode 100644 index 3c73607..0000000 --- a/src/routes/task.rs +++ /dev/null @@ -1,16 +0,0 @@ -use axum::{ - routing::{get, put}, - Router, -}; - -use crate::{ - handlers::task::{complete_task, get_task, list_all_tasks}, - http_server::AppState, -}; - -pub fn task_routes(_: AppState) -> Router { - Router::new() - .route("/tasks", get(list_all_tasks)) - .route("/tasks/complete", put(complete_task)) - .route("/tasks/:task_id", get(get_task)) -} diff --git a/src/services/ethereum_service.rs b/src/services/ethereum_service.rs deleted file mode 100644 index 5382087..0000000 --- a/src/services/ethereum_service.rs +++ /dev/null @@ -1,7 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct EthAddressAssociation { - pub quan_address: String, - pub eth_address: String, -} diff --git a/src/services/graphql_client.rs b/src/services/graphql_client.rs index b3c319f..16b8481 100644 --- a/src/services/graphql_client.rs +++ b/src/services/graphql_client.rs @@ -270,21 +270,6 @@ impl GraphqlClient { Ok((transfer_count, address_count as usize)) } - /// Get statistics about stored transfers and addresses - pub async fn get_sync_stats(&self) -> GraphqlResult { - // Note: This would require additional database queries to get counts - // For now, we'll return basic stats from the current sync - let transfers = self.fetch_transfers().await?; - let unique_addresses: std::collections::HashSet<&String> = - transfers.iter().flat_map(|t| [&t.from.id, &t.to.id]).collect(); - - Ok(SyncStats { - total_transfers: transfers.len(), - unique_addresses: unique_addresses.len(), - last_sync_time: chrono::Utc::now(), - }) - } - pub async fn get_address_stats(&self, id: String) -> GraphqlResult { const GET_STATS_QUERY: &str = r#" query GetStatsById($id: String!) { @@ -460,13 +445,6 @@ query GetEventCountByIds($ids: [String!]!) { } } -#[derive(Debug, Serialize, Deserialize)] -pub struct SyncStats { - pub total_transfers: usize, - pub unique_addresses: usize, - pub last_sync_time: chrono::DateTime, -} - #[derive(Debug, Serialize, Deserialize)] pub struct AddressStats { pub total_transactions: u64, @@ -906,17 +884,6 @@ query GetEventCountByIds($ids: [String!]!) { assert_eq!(err.to_string(), "GraphQL response error: Query failed"); } - #[test] - fn test_graphql_error_from_db_error() { - let db_err = DbError::TaskNotFound("task-123".to_string()); - let graphql_err: GraphqlError = db_err.into(); - - match graphql_err { - GraphqlError::DatabaseError(_) => (), - _ => panic!("Expected DatabaseError conversion"), - } - } - #[test] fn test_graphql_error_from_json_error() { let json_err = serde_json::from_str::("invalid json").unwrap_err(); @@ -1099,37 +1066,6 @@ query GetEventCountByIds($ids: [String!]!) { assert!(debug_str.contains("1000")); } - // ============================================================================ - // SyncStats Tests - // ============================================================================ - - #[test] - fn test_sync_stats_serialization() { - let stats = SyncStats { - total_transfers: 10, - unique_addresses: 15, - last_sync_time: chrono::Utc::now(), - }; - - let json = serde_json::to_string(&stats).unwrap(); - assert!(json.contains("total_transfers")); - assert!(json.contains("unique_addresses")); - assert!(json.contains("last_sync_time")); - } - - #[test] - fn test_sync_stats_deserialization() { - let json = r#"{ - "total_transfers": 5, - "unique_addresses": 8, - "last_sync_time": "2024-01-01T00:00:00Z" - }"#; - - let stats: SyncStats = serde_json::from_str(json).unwrap(); - assert_eq!(stats.total_transfers, 5); - assert_eq!(stats.unique_addresses, 8); - } - // ============================================================================ // Edge Cases // ============================================================================ diff --git a/src/services/mod.rs b/src/services/mod.rs index 3c674ec..dc7bd65 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,10 +1,6 @@ pub mod alert_service; -pub mod ethereum_service; pub mod graphql_client; pub mod raid_leaderboard_service; -pub mod reverser; pub mod signature_service; -pub mod task_generator; pub mod telegram_service; -pub mod transaction_manager; pub mod tweet_synchronizer_service; diff --git a/src/services/reverser.rs b/src/services/reverser.rs deleted file mode 100644 index 4fd3b6d..0000000 --- a/src/services/reverser.rs +++ /dev/null @@ -1,383 +0,0 @@ -use crate::{ - db_persistence::{DbError, DbPersistence}, - models::task::TaskStatus, - services::transaction_manager::TransactionManager, -}; -use std::sync::Arc; -use subxt::error::TransactionError; -use tokio::time::{interval, Duration}; - -#[derive(Debug, thiserror::Error)] -pub enum ReverserError { - #[error("Database error: {0}")] - Database(#[from] DbError), - #[error("Transaction error: {0}")] - Transaction(#[from] TransactionError), - #[error("Reverser service error: {0}")] - Service(String), -} - -pub type ReverserResult = Result; - -pub struct ReverserService { - db: Arc, - transaction_manager: Arc, - check_interval: Duration, - early_reversal_minutes: i64, -} - -impl ReverserService { - pub fn new( - db: Arc, - transaction_manager: Arc, - check_interval: Duration, - early_reversal_minutes: i64, - ) -> Self { - Self { - db, - transaction_manager, - check_interval, - early_reversal_minutes, - } - } - - /// Start the reverser service monitoring loop - pub async fn start(&self) -> ReverserResult<()> { - tracing::info!( - "Starting reverser service with {} minute early reversal and {} second check interval", - self.early_reversal_minutes, - self.check_interval.as_secs() - ); - - let mut interval_timer = interval(self.check_interval); - - loop { - interval_timer.tick().await; - - if let Err(e) = self.check_and_reverse_tasks().await { - tracing::error!("Error in reverser service: {}", e); - // For now, log and die as requested - return Err(e); - } - } - } - - /// Check for tasks that need to be reversed and reverse them - async fn check_and_reverse_tasks(&self) -> ReverserResult<()> { - let tasks_to_reverse = self - .db - .tasks - .get_tasks_ready_for_reversal(self.early_reversal_minutes) - .await?; - - if tasks_to_reverse.is_empty() { - tracing::debug!("No tasks ready for reversal"); - return Ok(()); - } - - tracing::info!("Found {} tasks ready for reversal", tasks_to_reverse.len()); - - let mut reversal_count = 0; - let mut error_count = 0; - - for task in tasks_to_reverse { - tracing::info!( - "Reversing task {} (quan_address: {}, quan_amount: {}, usdc_amount: {}, tx: {})", - task.task_id, - task.quan_address.0, - task.quan_amount.0, - task.usdc_amount, - task.reversible_tx_id.as_deref().unwrap_or("none") - ); - - match self.transaction_manager.reverse_transaction(&task.task_id).await { - Ok(()) => { - reversal_count += 1; - tracing::info!("Successfully reversed task {}", task.task_id); - } - Err(e) => { - error_count += 1; - tracing::error!("Failed to reverse task {}: {}", task.task_id, e); - - // Mark task as failed if reversal failed - if let Err(db_err) = self - .db - .tasks - .update_task_status(&task.task_id, TaskStatus::Failed) - .await - { - tracing::error!( - "Failed to mark task {} as failed after reversal error: {}", - task.task_id, - db_err - ); - } - } - } - } - - tracing::info!( - "Reversal batch completed: {} successful, {} errors", - reversal_count, - error_count - ); - - // If there were any errors, return an error to trigger the "log and die" behavior - if error_count > 0 { - return Err(ReverserError::Service(format!( - "Failed to reverse {} out of {} tasks", - error_count, - reversal_count + error_count - ))); - } - - Ok(()) - } - - /// Get statistics about tasks that need attention - pub async fn get_reversal_stats(&self) -> ReverserResult { - let pending_tasks = self.db.tasks.get_tasks_by_status(TaskStatus::Pending).await?; - let tasks_ready_for_reversal = self - .db - .tasks - .get_tasks_ready_for_reversal(self.early_reversal_minutes) - .await?; - - let mut tasks_expiring_soon = 0; - let mut tasks_already_expired = 0; - let now = chrono::Utc::now(); - - for task in &pending_tasks { - if let Some(end_time) = task.end_time { - if end_time <= now { - tasks_already_expired += 1; - } else if end_time <= now + chrono::Duration::minutes(self.early_reversal_minutes) { - tasks_expiring_soon += 1; - } - } - } - - Ok(ReversalStats { - total_pending: pending_tasks.len(), - ready_for_reversal: tasks_ready_for_reversal.len(), - expiring_soon: tasks_expiring_soon, - already_expired: tasks_already_expired, - }) - } - - /// Manual trigger for reversal check (useful for testing or admin endpoints) - pub async fn trigger_reversal_check(&self) -> ReverserResult { - let tasks_to_reverse = self - .db - .tasks - .get_tasks_ready_for_reversal(self.early_reversal_minutes) - .await?; - - let count = tasks_to_reverse.len(); - - if count > 0 { - self.check_and_reverse_tasks().await?; - } - - Ok(count) - } -} - -#[derive(Debug, Clone, serde::Serialize)] -pub struct ReversalStats { - pub total_pending: usize, - pub ready_for_reversal: usize, - pub expiring_soon: usize, - pub already_expired: usize, -} - -/// Start the reverser service in a background task -pub async fn start_reverser_service( - db: Arc, - transaction_manager: Arc, - check_interval: Duration, - early_reversal_minutes: i64, -) -> tokio::task::JoinHandle> { - let reverser = ReverserService::new(db, transaction_manager, check_interval, early_reversal_minutes); - - tokio::spawn(async move { reverser.start().await }) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - config::Config, - db_persistence::DbPersistence, - models::address::{Address, AddressInput}, - models::task::{Task, TaskInput, TaskStatus}, - services::transaction_manager::TransactionManager, - utils::generate_referral_code::generate_referral_code, - utils::test_db::reset_database, - }; - use chrono::{Duration as ChronoDuration, Utc}; - use quantus_cli::wallet::WalletManager; - use uuid::Uuid; - - // Helper to set up a full test environment with a DB, TransactionManager, and ReverserService. - // NOTE: Requires a local Quantus node running. - async fn setup_test_reverser() -> (ReverserService, Arc, Arc) { - let config = Config::load_test_env().expect("Failed to load test configuration"); - std::env::set_var("TASKMASTER_USE_DEV_ALICE", "1"); - let db = Arc::new(DbPersistence::new(config.get_database_url()).await.unwrap()); - - reset_database(&db.pool).await; - - let wallet_name = "//Alice"; - let transaction_manager = Arc::new( - TransactionManager::new( - &config.blockchain.node_url, - &wallet_name, - "password", - db.clone(), - ChronoDuration::seconds(60), - ) - .await - .unwrap(), - ); - - let reverser = ReverserService::new( - db.clone(), - transaction_manager.clone(), - Duration::from_secs(10), - 5, // 5 minute early reversal window for tests - ); - - (reverser, transaction_manager, db) - } - - // Helper to create a task that is ready for reversal - async fn create_reversable_task( - db: &DbPersistence, - tm: &TransactionManager, - id: &str, // Used to keep task_url unique - ) -> Task { - let wallet_manager = WalletManager::new().unwrap(); - let recipient_wallet_name = format!("test_recipient_{}", Uuid::new_v4()); - let recipient_info = wallet_manager - .create_wallet(&recipient_wallet_name, Some("password")) - .await - .unwrap(); - // This is a real, valid SS58 address that the node will accept. - let quan_address = recipient_info.address; - - // Create and save the Address and Task objects using the valid address. - let referral_code = generate_referral_code(quan_address.clone()).await.unwrap(); - let address = Address::new(AddressInput { - quan_address, - referral_code, - }) - .unwrap(); - db.addresses.create(&address).await.unwrap(); - - let task = Task::new(TaskInput { - quan_address: address.quan_address.0, - quan_amount: 1000, - task_url: format!("http://example.com/{}", id), - }) - .unwrap(); - let task_id = db.tasks.create(&task).await.unwrap(); - - tm.send_reversible_transaction(&task_id).await.unwrap(); - - // Manually update the task's end_time to be within the reversal window. - let new_end_time = Utc::now() + ChronoDuration::minutes(2); - sqlx::query("UPDATE tasks SET end_time = $1 WHERE task_id = $2") - .bind(new_end_time) - .bind(&task.task_id) - .execute(&db.pool) - .await - .unwrap(); - - // Return the fully prepared task. - db.tasks.get_task(&task_id).await.unwrap().unwrap() - } - - #[tokio::test] - async fn chain_test_check_and_reverse_tasks_success() { - let (reverser, tm, db) = setup_test_reverser().await; - - // Arrange: Create a task that is ready to be reversed. - let task = create_reversable_task(&db, &tm, "001").await; - assert_eq!(task.status, TaskStatus::Pending); - - // Act: Run the reversal check. - reverser.check_and_reverse_tasks().await.unwrap(); - - // Assert: The task status in the DB should now be 'Reversed'. - let reversed_task = db.tasks.get_task(&task.task_id).await.unwrap().unwrap(); - assert_eq!(reversed_task.status, TaskStatus::Reversed); - } - - #[tokio::test] - async fn chain_test_check_and_reverse_does_nothing_if_no_tasks_ready() { - let (reverser, tm, db) = setup_test_reverser().await; - - // Arrange: Create a task, send its transaction, but its end_time is far in the future. - let task = create_reversable_task(&db, &tm, "002").await; - let future_end_time = Utc::now() + ChronoDuration::hours(1); - sqlx::query("UPDATE tasks SET end_time = $1 WHERE task_id = $2") - .bind(future_end_time) - .bind(&task.task_id) - .execute(&db.pool) - .await - .unwrap(); - - // Act: Run the reversal check. - reverser.check_and_reverse_tasks().await.unwrap(); - - // Assert: The task should not have been reversed. - let not_reversed_task = db.tasks.get_task(&task.task_id).await.unwrap().unwrap(); - assert_eq!(not_reversed_task.status, TaskStatus::Pending); - } - - #[tokio::test] - async fn chain_test_get_reversal_stats() { - let (reverser, _tm, db) = setup_test_reverser().await; - - // We will manually create tasks with specific timings for this test. - let now = Utc::now(); - let early_reversal_window = ChronoDuration::minutes(reverser.early_reversal_minutes); - - // Task 1: Already expired (should be ready for reversal) - let task1 = create_reversable_task(&db, &reverser.transaction_manager, "stats_01").await; - sqlx::query("UPDATE tasks SET end_time = $1 WHERE task_id = $2") - .bind(now - ChronoDuration::minutes(10)) - .bind(&task1.task_id) - .execute(&db.pool) - .await - .unwrap(); - - // Task 2: Expiring soon (inside the window, also ready for reversal) - let task2 = create_reversable_task(&db, &reverser.transaction_manager, "stats_02").await; - sqlx::query("UPDATE tasks SET end_time = $1 WHERE task_id = $2") - .bind(now + early_reversal_window - ChronoDuration::minutes(1)) - .bind(&task2.task_id) - .execute(&db.pool) - .await - .unwrap(); - - // Task 3: Pending, but not expiring soon (outside the window) - let task3 = create_reversable_task(&db, &reverser.transaction_manager, "stats_03").await; - sqlx::query("UPDATE tasks SET end_time = $1 WHERE task_id = $2") - .bind(now + early_reversal_window + ChronoDuration::minutes(10)) - .bind(&task3.task_id) - .execute(&db.pool) - .await - .unwrap(); - - // Act: Get the stats - let stats = reverser.get_reversal_stats().await.unwrap(); - - // Assert - assert_eq!(stats.total_pending, 3); - assert_eq!(stats.ready_for_reversal, 2); // Expired + Expiring Soon - assert_eq!(stats.expiring_soon, 1); // Only task2 - assert_eq!(stats.already_expired, 1); // Only task1 - } -} diff --git a/src/services/task_generator.rs b/src/services/task_generator.rs deleted file mode 100644 index 47d6282..0000000 --- a/src/services/task_generator.rs +++ /dev/null @@ -1,365 +0,0 @@ -use crate::{ - db_persistence::{DbError, DbPersistence}, - models::{ - address::{Address, AddressInput, QuanAddress}, - task::{Task, TaskInput}, - }, - utils::generate_referral_code::generate_referral_code, -}; -use rand::prelude::*; - -#[derive(Debug, thiserror::Error)] -pub enum TaskGeneratorError { - #[error("Task input data contain one of more invalid value")] - ValidationError, - #[error("No candidates available")] - NoCandidates, - #[error("Not enough candidates for selection")] - InsufficientCandidates, - #[error("CSV error: {0}")] - Database(#[from] DbError), - #[error("HTTP error: {0}")] - Http(#[from] reqwest::Error), - #[error("JSON parsing error: {0}")] - Json(#[from] serde_json::Error), -} - -pub type TaskGeneratorResult = Result; - -#[derive(Debug, Clone)] -pub struct TaskGenerator { - candidates: Vec, - db: std::sync::Arc, - http_client: reqwest::Client, -} - -impl TaskGenerator { - pub fn new(db: std::sync::Arc) -> Self { - Self { - candidates: Vec::new(), - db, - http_client: reqwest::Client::new(), - } - } - - /// Fetch candidates from GraphQL endpoint - pub async fn refresh_candidates(&mut self, graphql_url: &str) -> TaskGeneratorResult<()> { - tracing::info!("Refreshing candidates from: {}", graphql_url); - - // Simple GraphQL query - adjust this based on your actual schema - let query = serde_json::json!({ - "query": "{ - accounts { - id - } - }"}); - - let response = self.http_client.post(graphql_url).json(&query).send().await?; - - if !response.status().is_success() { - tracing::error!("GraphQL request failed with status: {}", response.status()); - return Err(TaskGeneratorError::Http(reqwest::Error::from( - response.error_for_status().unwrap_err(), - ))); - } - - let response_json: serde_json::Value = response.json().await?; - - // Extract candidates array from GraphQL response - let candidates = response_json - .get("data") - .and_then(|data| data.get("accounts")) - .and_then(|accounts| accounts.as_array()) - .ok_or_else(|| { - TaskGeneratorError::Json(serde_json::Error::io(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Invalid GraphQL response format", - ))) - })?; - - let mut new_candidates = Vec::new(); - for candidate in candidates { - if let Some(address) = candidate.get("id").and_then(|id| id.as_str()) { - // Validate that it's a proper quantus address (starts with qz) - if let Ok(valid_address) = QuanAddress::from(address) { - new_candidates.push(valid_address.0.to_string()); - } else { - tracing::warn!("Invalid candidate address format: {}", address); - } - } - } - - self.candidates = new_candidates; - tracing::info!("Refreshed {} candidates", self.candidates.len()); - Ok(()) - } - - /// Refresh candidates from local database addresses - pub async fn refresh_candidates_from_db(&mut self) -> TaskGeneratorResult<()> { - tracing::info!("Refreshing candidates from local database"); - - let addresses = self.db.addresses.find_all().await?; - - let mut new_candidates = Vec::new(); - for address in addresses { - // Validate that it's a proper quantus address (starts with qz) - - new_candidates.push(address.quan_address.0); - } - - self.candidates = new_candidates; - tracing::info!("Refreshed {} candidates from database", self.candidates.len()); - Ok(()) - } - - /// Generate tasks by randomly selecting taskees - pub async fn generate_tasks(&self, count: usize) -> TaskGeneratorResult> { - if self.candidates.is_empty() { - return Err(TaskGeneratorError::NoCandidates); - } - - if self.candidates.len() < count { - tracing::warn!( - "Requested {} taskees but only have {} candidates", - count, - self.candidates.len() - ); - } - - let mut rng = rand::rng(); - let selection_count = count.min(self.candidates.len()); - - // Randomly select unique candidates - let selected_candidates: Vec = self - .candidates - .choose_multiple(&mut rng, selection_count) - .cloned() - .collect(); - - let mut tasks = Vec::new(); - - for quan_address in selected_candidates { - let quan_amount = self.generate_random_quan_amount(); - let task_url = self.generate_task_url(); - let task_input = TaskInput { - quan_address, - quan_amount, - task_url, - }; - - if let Ok(task) = Task::new(task_input) { - tasks.push(task); - } else { - return Err(TaskGeneratorError::ValidationError); - }; - } - - tracing::info!("Generated {} new tasks", tasks.len()); - Ok(tasks) - } - - /// Save generated tasks to database - pub async fn save_tasks(&self, tasks: Vec) -> TaskGeneratorResult<()> { - for task in tasks { - tracing::debug!( - "Saving task: {} -> {} (quan_amount: {}, usdc_amount: {}, url: {})", - task.task_id, - task.quan_address.0, - task.quan_amount.0, - task.usdc_amount, - task.task_url - ); - - if let Ok(referral_code) = generate_referral_code(task.quan_address.0.clone()).await { - if let Ok(address) = Address::new(AddressInput { - quan_address: task.quan_address.0.clone(), - referral_code, - }) { - // Ensure address exists in database - self.db.addresses.create(&address).await?; - self.db.tasks.create(&task).await?; - } else { - return Err(TaskGeneratorError::ValidationError); - } - } - } - Ok(()) - } - - /// Generate and save tasks in one operation - pub async fn generate_and_save_tasks(&self, count: usize) -> TaskGeneratorResult> { - let mut tasks = self.generate_tasks(count).await?; - self.ensure_unique_task_urls(&mut tasks).await?; - self.save_tasks(tasks.clone()).await?; - Ok(tasks) - } - - /// Get current candidates count - pub fn candidates_count(&self) -> usize { - self.candidates.len() - } - - /// Get current candidates list (for debugging/status) - pub fn get_candidates(&self) -> &[String] { - &self.candidates - } - - /// Check for duplicate task URLs to avoid collisions - pub async fn ensure_unique_task_urls(&self, tasks: &mut [Task]) -> TaskGeneratorResult<()> { - for task in tasks { - // Keep checking if URL exists and regenerate if needed - while let Some(_existing_task) = self.db.tasks.find_task_by_url(&task.task_url).await? { - tracing::warn!("Task URL collision detected, regenerating: {}", task.task_url); - task.task_url = self.generate_task_url(); - } - } - - Ok(()) - } - - fn generate_random_quan_amount(&self) -> u64 { - let mut rng = rand::rng(); - rng.random_range(1000..=9999) - } - - fn generate_task_url(&self) -> String { - let mut rng = rand::rng(); - // Generate 12 digit random number - let task_url: u64 = rng.random_range(100_000_000_000..=999_999_999_999); - task_url.to_string() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::config::Config; - use crate::utils::test_db::reset_database; - use std::sync::Arc; - use wiremock::{matchers::method, Mock, MockServer, ResponseTemplate}; - - // Helper to set up a test generator with a real PostgreSQL test database. - async fn setup_test_generator() -> TaskGenerator { - let config = Config::load_test_env().expect("Failed to load test configuration"); - let db = Arc::new(DbPersistence::new(config.get_database_url()).await.unwrap()); - - reset_database(&db.pool).await; - - TaskGenerator::new(db) - } - - #[tokio::test] - async fn test_generate_random_quan_amount() { - let generator = setup_test_generator().await; - for _ in 0..100 { - let amount = generator.generate_random_quan_amount(); - assert!((1000..=9999).contains(&amount)); - } - } - - #[tokio::test] - async fn test_generate_task_url() { - let generator = setup_test_generator().await; - for _ in 0..100 { - let url = generator.generate_task_url(); - assert_eq!(url.len(), 12); - assert!(url.chars().all(|c| c.is_ascii_digit())); - } - } - - #[tokio::test] - async fn test_refresh_candidates_from_db() { - let mut generator = setup_test_generator().await; - - // Create and save some addresses to the DB. - // The dummy addresses must be > 10 characters to pass validation. - let addr1 = Address::new(AddressInput { - quan_address: "qz_a_valid_test_address_1".to_string(), - referral_code: "REF1".to_string(), - }) - .unwrap(); - let addr2 = Address::new(AddressInput { - quan_address: "qz_a_valid_test_address_2".to_string(), - referral_code: "REF2".to_string(), - }) - .unwrap(); - generator.db.addresses.create(&addr1).await.unwrap(); - generator.db.addresses.create(&addr2).await.unwrap(); - - // Refresh candidates from the database. - generator.refresh_candidates_from_db().await.unwrap(); - - assert_eq!(generator.candidates_count(), 2); - assert!(generator.get_candidates().contains(&addr1.quan_address.0)); - assert!(generator.get_candidates().contains(&addr2.quan_address.0)); - } - - #[tokio::test] - async fn test_refresh_candidates_with_mock_server() { - // Start a mock server. - let server = MockServer::start().await; - let mut generator = setup_test_generator().await; - - // Create a mock GraphQL response. - let mock_response = serde_json::json!({ - "data": { - "accounts": [ - { "id": "qz_a_valid_test_address_1" }, - { "id": "invalid_addr" }, // Should be filtered out - { "id": "qz_a_valid_test_address_2" } - ] - } - }); - Mock::given(method("POST")) - .respond_with(ResponseTemplate::new(200).set_body_json(mock_response)) - .mount(&server) - .await; - - // Call the function with the mock server's URI. - generator.refresh_candidates(&server.uri()).await.unwrap(); - - // Assert that only valid candidates were added. - assert_eq!(generator.candidates_count(), 2); - assert!(generator - .get_candidates() - .contains(&"qz_a_valid_test_address_1".to_string())); - assert!(generator - .get_candidates() - .contains(&"qz_a_valid_test_address_2".to_string())); - } - - #[tokio::test] - async fn test_generate_tasks_and_save() { - let mut generator = setup_test_generator().await; - - // Populate candidates manually for the test. - generator.candidates = vec![ - "qz_a_valid_test_address_1".to_string(), - "qz_a_valid_test_address_2".to_string(), - "qz_a_valid_test_address_3".to_string(), - ]; - - // Generate and save 2 tasks. - let tasks = generator.generate_and_save_tasks(2).await.unwrap(); - assert_eq!(tasks.len(), 2); - - // Verify the state after the first call. - let db_tasks = generator.db.tasks.get_all_tasks().await.unwrap(); - assert_eq!(db_tasks.len(), 2); - - // Generate and save 3 more tasks (capped by the 3 candidates). - generator.generate_and_save_tasks(5).await.unwrap(); - let db_tasks_total = generator.db.tasks.get_all_tasks().await.unwrap(); - - // The database now contains the original 2 tasks PLUS the 3 new ones. - // The total should be 5. - assert_eq!(db_tasks_total.len(), 5); - } - - #[tokio::test] - async fn test_no_candidates_error() { - let generator = setup_test_generator().await; // Candidates list is empty. - let result = generator.generate_tasks(1).await; - assert!(matches!(result, Err(TaskGeneratorError::NoCandidates))); - } -} diff --git a/src/services/transaction_manager.rs b/src/services/transaction_manager.rs deleted file mode 100644 index 3ff3eb3..0000000 --- a/src/services/transaction_manager.rs +++ /dev/null @@ -1,298 +0,0 @@ -use crate::db_persistence::DbPersistence; -use crate::models::task::{Task, TaskStatus}; -use chrono::Utc; -use quantus_cli::chain::client::QuantusClient; -use quantus_cli::cli::reversible::{cancel_transaction, schedule_transfer}; -use quantus_cli::qp_dilithium_crypto::crystal_alice; -use quantus_cli::wallet::{QuantumKeyPair, WalletManager}; -use std::sync::Arc; -use tokio::sync::RwLock; - -#[derive(Debug, thiserror::Error)] -pub enum TransactionError { - #[error("Quantus client error: {0}")] - QuantusClient(#[from] quantus_cli::error::QuantusError), - #[error("Wallet error: {0}")] - Wallet(#[from] quantus_cli::error::WalletError), - #[error("CSV error: {0}")] - Database(#[from] crate::db_persistence::DbError), - #[error("Transaction not found: {0}")] - TransactionNotFound(String), - #[error("Invalid transaction state: {0}")] - InvalidState(String), -} - -pub type TransactionResult = Result; - -pub struct TransactionManager { - client: Arc>, - keypair: QuantumKeyPair, - db: Arc, - reversal_period: chrono::Duration, -} - -impl TransactionManager { - pub async fn new( - node_url: &str, - wallet_name: &str, - wallet_password: &str, - db: Arc, - reversal_period: chrono::Duration, - ) -> TransactionResult { - // Connect to Quantus node - let client = QuantusClient::new(node_url).await?; - let client = Arc::new(RwLock::new(client)); - - // Initialize wallet manager - let wallet_manager = WalletManager::new()?; - - // Support dev keypair - let keypair = if wallet_name.starts_with("//Alice") { - tracing::info!("Using dev URI keypair: {}", wallet_name); - QuantumKeyPair::from_resonance_pair(&crystal_alice()) - } else { - // Load or create wallet - match wallet_manager.load_wallet(wallet_name, wallet_password) { - Ok(wallet_data) => { - tracing::info!("Loaded existing wallet: {}", wallet_name); - wallet_data.keypair - } - Err(_) => { - tracing::info!("Creating new wallet: {}", wallet_name); - let wallet_info = wallet_manager.create_wallet(wallet_name, Some(wallet_password)).await?; - tracing::info!("Created wallet with address: {}", wallet_info.address); - - // Load the newly created wallet - wallet_manager.load_wallet(wallet_name, wallet_password)?.keypair - } - } - }; - - tracing::info!( - "Transaction manager initialized with wallet address: {}", - keypair.to_account_id_ss58check() - ); - - Ok(Self { - client, - keypair, - db, - reversal_period, - }) - } - - /// Send a reversible transaction for a task - pub async fn send_reversible_transaction(&self, task_id: &str) -> TransactionResult { - let task = self - .db - .tasks - .get_task(task_id) - .await? - .ok_or_else(|| TransactionError::TransactionNotFound(task_id.to_string()))?; - - tracing::info!( - "Sending reversible transaction for task {} to {} (quan_amount: {})", - task_id, - task.quan_address.0, - task.quan_amount.0 - ); - - // Send the transaction - let client = self.client.read().await; - let tx_hash = schedule_transfer( - &*client, - &self.keypair, - &task.quan_address.0, - task.quan_amount.0 as u128, // Convert to u128 for quantus-cli - false, - ) - .await?; - - drop(client); - - // Calculate end time (current time + reversal period) - let send_time = Utc::now(); - let end_time = send_time + self.reversal_period; - - // Update task with transaction details - self.db - .tasks - .update_task_transaction(task_id, &format!("0x{:x}", tx_hash), send_time, end_time) - .await?; - - let tx_hash_string = format!("0x{:x}", tx_hash); - - tracing::info!( - "Transaction sent successfully. Hash: {}, End time: {}", - tx_hash_string, - end_time.format("%Y-%m-%d %H:%M:%S UTC") - ); - - Ok(tx_hash_string) - } - - /// Cancel/reverse a transaction - pub async fn reverse_transaction(&self, task_id: &str) -> TransactionResult<()> { - let task = self - .db - .tasks - .get_task(task_id) - .await? - .ok_or_else(|| TransactionError::TransactionNotFound(task_id.to_string()))?; - - let reversible_tx_id = task.reversible_tx_id.as_ref().ok_or_else(|| { - TransactionError::InvalidState("Task has no reversible transaction ID to reverse".to_string()) - })?; - - // Remove "0x" prefix if present for the cancel call - let tx_hash_str = reversible_tx_id.strip_prefix("0x").unwrap_or(reversible_tx_id); - - tracing::info!("Reversing transaction for task {} (tx: {})", task_id, reversible_tx_id); - - let client = self.client.read().await; - let cancel_tx_hash = cancel_transaction(&*client, &self.keypair, tx_hash_str, false).await?; - - drop(client); - - // Update task status - self.db.tasks.update_task_status(task_id, TaskStatus::Reversed).await?; - - tracing::info!("Transaction reversed successfully. Cancel tx: 0x{:x}", cancel_tx_hash); - - Ok(()) - } - - /// Process a batch of tasks for transaction sending - pub async fn process_task_batch(&self, tasks: Vec) -> TransactionResult> { - let mut processed_tasks = Vec::new(); - let task_count = tasks.len(); - - for task in tasks { - match self.send_reversible_transaction(&task.task_id).await { - Ok(tx_hash) => { - processed_tasks.push(task.task_id.clone()); - tracing::info!("Successfully processed task: {} with tx: {}", task.task_id, tx_hash); - } - Err(e) => { - tracing::error!("Failed to process task {}: {}", task.task_id, e); - - // Mark task as failed - if let Err(db_err) = self - .db - .tasks - .update_task_status(&task.task_id, TaskStatus::Failed) - .await - { - tracing::error!("Failed to mark task as failed: {}", db_err); - } - } - } - } - - tracing::info!( - "Batch processing completed. {}/{} tasks processed successfully", - processed_tasks.len(), - task_count - ); - - Ok(processed_tasks) - } - - /// Get wallet balance - pub async fn get_wallet_balance(&self) -> TransactionResult { - let client = self.client.read().await; - let account_id = self.keypair.to_account_id_32(); - - // Convert to subxt AccountId32 - let account_bytes: [u8; 32] = *account_id.as_ref(); - let subxt_account_id = subxt::utils::AccountId32::from(account_bytes); - - use quantus_cli::chain::quantus_subxt::api; - let storage_addr = api::storage().system().account(subxt_account_id); - let account_info = client - .client() - .storage() - .at_latest() - .await - .map_err(|e| { - TransactionError::QuantusClient(quantus_cli::error::QuantusError::NetworkError(e.to_string())) - })? - .fetch_or_default(&storage_addr) - .await - .map_err(|e| { - TransactionError::QuantusClient(quantus_cli::error::QuantusError::NetworkError(e.to_string())) - })?; - - Ok(account_info.data.free) - } - - /// Get wallet address - pub fn get_wallet_address(&self) -> String { - self.keypair.to_account_id_ss58check() - } - - /// Health check - verify connection to node - pub async fn health_check(&self) -> TransactionResult { - let client = self.client.read().await; - match client.get_runtime_version().await { - Ok(_) => { - tracing::debug!("Health check passed - connected to Quantus node"); - Ok(true) - } - Err(e) => { - tracing::error!("Health check failed: {}", e); - Err(TransactionError::QuantusClient(e)) - } - } - } - - /// Get node info for debugging - pub async fn get_node_info(&self) -> TransactionResult { - let client = self.client.read().await; - let runtime_version = client.get_runtime_version().await?; - Ok(format!( - "Quantus Node - Runtime Version: {}.{}", - runtime_version.0, runtime_version.1 - )) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::config::Config; - - #[tokio::test] - async fn chain_test_new_manager_creates_and_loads_wallet() { - // This test requires filesystem access to create a wallet. - let config = Config::load_test_env().expect("Failed to load test configuration"); - let db = Arc::new(DbPersistence::new(config.get_database_url()).await.unwrap()); - let wallet_name = "//Alice"; // use dev key for local node - - // First, create the manager, which should create a new wallet. - let manager1 = TransactionManager::new( - &config.blockchain.node_url, - &wallet_name, - "password", - db.clone(), - chrono::Duration::hours(12), - ) - .await - .unwrap(); - let addr1 = manager1.get_wallet_address(); - - // Now, create another manager with the same name to ensure it loads the existing wallet. - let manager2 = TransactionManager::new( - &config.blockchain.node_url, - &wallet_name, - "password", - db.clone(), - chrono::Duration::hours(12), - ) - .await - .unwrap(); - let addr2 = manager2.get_wallet_address(); - - assert_eq!(addr1, addr2); - } -} diff --git a/src/services/tweet_synchronizer_service.rs b/src/services/tweet_synchronizer_service.rs index 416041c..16063e7 100644 --- a/src/services/tweet_synchronizer_service.rs +++ b/src/services/tweet_synchronizer_service.rs @@ -164,7 +164,7 @@ impl TweetSynchronizerService { tracing::info!("๐Ÿ”„ Background Worker: Starting Twitter Sync..."); match service.sync_relevant_tweets().await { - Ok(_) => tracing::info!("โœ… Sync Complete."), + Ok(_) => tracing::info!("โœ… Sync Complete. Relevant tweets synced."), Err(e) => tracing::error!("โŒ Sync Failed: {:?}", e), } } diff --git a/src/utils/test_app_state.rs b/src/utils/test_app_state.rs index 8b94b84..8ffe053 100644 --- a/src/utils/test_app_state.rs +++ b/src/utils/test_app_state.rs @@ -1,6 +1,6 @@ use crate::{ - db_persistence::DbPersistence, http_server::AppState, metrics::Metrics, models::auth::TokenClaims, - services::alert_service::AlertService, Config, GraphqlClient, + db_persistence::DbPersistence, http_server::AppState, metrics::Metrics, models::auth::TokenClaims, Config, + GraphqlClient, }; use jsonwebtoken::{encode, EncodingKey, Header}; use rusx::RusxGateway; @@ -13,13 +13,11 @@ pub async fn create_test_app_state() -> AppState { let graphql_client = GraphqlClient::new(db.clone(), config.candidates.graphql_url.clone()); let db = Arc::new(db); - let alert_client = Arc::new(AlertService::new(config.clone(), db.tweet_pull_usage.clone())); return AppState { db, metrics: Arc::new(Metrics::new()), graphql_client: Arc::new(graphql_client), - alert_client, config: Arc::new(config), twitter_gateway: Arc::new(twitter_gateway), oauth_sessions: Arc::new(Mutex::new(std::collections::HashMap::new())), diff --git a/src/utils/test_db.rs b/src/utils/test_db.rs index 8ea2673..5ad3044 100644 --- a/src/utils/test_db.rs +++ b/src/utils/test_db.rs @@ -15,7 +15,7 @@ use crate::{ }; pub async fn reset_database(pool: &PgPool) { - sqlx::query("TRUNCATE tasks, referrals, opt_ins, addresses, admins, eth_associations, x_associations, relevant_tweets, tweet_authors, raid_quests, raid_submissions, tweet_pull_usage RESTART IDENTITY CASCADE") + sqlx::query("TRUNCATE referrals, opt_ins, addresses, admins, eth_associations, x_associations, relevant_tweets, tweet_authors, raid_quests, raid_submissions, tweet_pull_usage RESTART IDENTITY CASCADE") .execute(pool) .await .expect("Failed to truncate tables for tests"); diff --git a/test.db b/test.db deleted file mode 100644 index 78d3b25..0000000 Binary files a/test.db and /dev/null differ diff --git a/test_endpoints.sh b/test_endpoints.sh deleted file mode 100755 index afd0ec2..0000000 --- a/test_endpoints.sh +++ /dev/null @@ -1,123 +0,0 @@ -#!/bin/bash - -# TaskMaster API Test Script -# Tests the HTTP endpoints of the TaskMaster server - -set -e - -BASE_URL="http://localhost:3000" - -echo "๐Ÿงช Testing TaskMaster API Endpoints" -echo "==================================" - -# Colors for output -RED='\033[0;31m' -GREEN='\033[0;32m' -YELLOW='\033[1;33m' -NC='\033[0m' # No Color - -# Function to make HTTP requests and check response -test_endpoint() { - local method=$1 - local endpoint=$2 - local data=$3 - local expected_status=$4 - - echo -e "${YELLOW}Testing: $method $endpoint${NC}" - - if [ -z "$data" ]; then - response=$(curl -s -w "HTTPSTATUS:%{http_code}" -X "$method" "$BASE_URL$endpoint") - else - response=$(curl -s -w "HTTPSTATUS:%{http_code}" -X "$method" "$BASE_URL$endpoint" \ - -H "Content-Type: application/json" \ - -d "$data") - fi - - body=$(echo "$response" | sed -E 's/HTTPSTATUS\:[0-9]{3}$//') - status=$(echo "$response" | tr -d '\n' | sed -E 's/.*HTTPSTATUS:([0-9]{3})$/\1/') - - if [ "$status" -eq "$expected_status" ]; then - echo -e "${GREEN}โœ… Status: $status (expected $expected_status)${NC}" - if [ -n "$body" ] && [ "$body" != "null" ]; then - echo "๐Ÿ“„ Response: $(echo "$body" | jq -C . 2>/dev/null || echo "$body")" - fi - else - echo -e "${RED}โŒ Status: $status (expected $expected_status)${NC}" - echo "๐Ÿ“„ Response: $body" - fi - echo "" -} - -# Wait for server to be ready -echo "โณ Checking if TaskMaster server is running..." -max_attempts=10 -attempt=1 - -while [ $attempt -le $max_attempts ]; do - if curl -s "$BASE_URL/health" > /dev/null 2>&1; then - echo -e "${GREEN}โœ… Server is ready!${NC}" - break - else - echo " Attempt $attempt/$max_attempts - waiting for server..." - sleep 2 - ((attempt++)) - fi -done - -if [ $attempt -gt $max_attempts ]; then - echo -e "${RED}โŒ Server is not responding. Please start TaskMaster first.${NC}" - echo " Run: cargo run" - exit 1 -fi - -echo "" - -# Test 1: Health Check -test_endpoint "GET" "/health" "" 200 - -# Test 2: Status Check -test_endpoint "GET" "/status" "" 200 - -# Test 3: List All Tasks -test_endpoint "GET" "/tasks" "" 200 - -# Test 4: Complete Task (should fail - task doesn't exist) -task_completion_data='{"task_url": "999999999999"}' -test_endpoint "POST" "/complete" "$task_completion_data" 404 - -# Test 5: Complete Task with invalid format -invalid_task_data='{"task_url": "invalid"}' -test_endpoint "POST" "/complete" "$invalid_task_data" 400 - -# Test 6: Get Non-existent Task -test_endpoint "GET" "/tasks/nonexistent-task-id" "" 404 - -echo "๐ŸŽ‰ API endpoint testing completed!" -echo "" - -# Additional functionality tests -echo "๐Ÿ“Š Additional Server Information" -echo "===============================" - -# Get current status -echo "Current Status:" -curl -s "$BASE_URL/status" | jq -C . 2>/dev/null || curl -s "$BASE_URL/status" -echo "" - -echo "Health Check:" -curl -s "$BASE_URL/health" | jq -C . 2>/dev/null || curl -s "$BASE_URL/health" -echo "" - -# Show example of how to complete a task when one exists -echo "๐Ÿ’ก To complete a task when one exists:" -echo " curl -X POST $BASE_URL/complete \\" -echo ' -H "Content-Type: application/json" \' -echo ' -d '"'"'{"task_url": "123456789012"}'"'" -echo "" - -echo "๐Ÿ“ To monitor the CSV file:" -echo " tail -f tasks.csv" -echo "" - -echo "๐Ÿ” To check logs with debug level:" -echo " TASKMASTER_LOGGING__LEVEL=debug cargo run"