Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions migrations/010_drop_tasks_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Drop tasks table and its associated indexes and triggers
DROP TABLE IF EXISTS tasks CASCADE;
42 changes: 0 additions & 42 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ pub struct Config {
pub server: ServerConfig,
pub blockchain: BlockchainConfig,
pub candidates: CandidatesConfig,
pub task_generation: TaskGenerationConfig,
pub reverser: ReverserConfig,
pub data: DataConfig,
pub logging: LoggingConfig,
pub jwt: JwtConfig,
Expand Down Expand Up @@ -42,18 +40,6 @@ pub struct CandidatesConfig {
pub refresh_interval_minutes: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskGenerationConfig {
pub generation_interval_minutes: u64,
pub taskees_per_round: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReverserConfig {
pub early_reversal_minutes: u64,
pub check_interval_seconds: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataConfig {
pub database_url: String,
Expand Down Expand Up @@ -140,26 +126,6 @@ impl Config {
&self.server.base_api_url
}

pub fn get_candidates_refresh_duration(&self) -> tokio::time::Duration {
tokio::time::Duration::from_secs(self.candidates.refresh_interval_minutes * 60)
}

pub fn get_task_generation_duration(&self) -> tokio::time::Duration {
tokio::time::Duration::from_secs(self.task_generation.generation_interval_minutes * 60)
}

pub fn get_reverser_check_duration(&self) -> tokio::time::Duration {
tokio::time::Duration::from_secs(self.reverser.check_interval_seconds)
}

pub fn get_reversal_period_duration(&self) -> chrono::Duration {
chrono::Duration::hours(self.blockchain.reversal_period_hours as i64)
}

pub fn get_early_reversal_duration(&self) -> chrono::Duration {
chrono::Duration::minutes(self.reverser.early_reversal_minutes as i64)
}

pub fn get_jwt_expiration(&self) -> chrono::Duration {
chrono::Duration::hours(self.jwt.exp_in_hours)
}
Expand Down Expand Up @@ -200,14 +166,6 @@ impl Default for Config {
graphql_url: "http://localhost:4000/graphql".to_string(),
refresh_interval_minutes: 30,
},
task_generation: TaskGenerationConfig {
generation_interval_minutes: 60,
taskees_per_round: 5,
},
reverser: ReverserConfig {
early_reversal_minutes: 2,
check_interval_seconds: 30,
},
data: DataConfig {
database_url: "postgres://postgres:postgres@127.0.0.1:5432/task_master".to_string(),
},
Expand Down
48 changes: 2 additions & 46 deletions src/db_persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,16 @@ use crate::repositories::tweet_author::TweetAuthorRepository;
use crate::repositories::tweet_pull_usage::TweetPullUsageRepository;
use crate::repositories::x_association::XAssociationRepository;
use crate::repositories::DbResult;
use crate::repositories::{
address::AddressRepository, opt_in::OptInRepository, referral::ReferralRepository, task::TaskRepository,
};
use crate::repositories::{address::AddressRepository, opt_in::OptInRepository, referral::ReferralRepository};

#[derive(Debug, thiserror::Error)]
pub enum DbError {
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Migration error: {0}")]
Migration(#[from] sqlx::migrate::MigrateError),
#[error("Task not found: {0}")]
TaskNotFound(String),
#[error("Address not found: {0}")]
AddressNotFound(String),
#[error("Invalid task status: {0}")]
InvalidStatus(String),
#[error("Record not found: {0}")]
RecordNotFound(String),
#[error("Conflict error: {0}")]
Expand All @@ -34,7 +28,6 @@ pub enum DbError {

#[derive(Debug, Clone)]
pub struct DbPersistence {
pub tasks: TaskRepository,
pub addresses: AddressRepository,
pub referrals: ReferralRepository,
pub opt_ins: OptInRepository,
Expand All @@ -48,6 +41,7 @@ pub struct DbPersistence {
pub raid_leaderboards: RaidLeaderboardRepository,
pub tweet_pull_usage: TweetPullUsageRepository,

#[allow(dead_code)]
pub pool: PgPool,
}

Expand All @@ -57,7 +51,6 @@ impl DbPersistence {

sqlx::migrate!("./migrations").run(&pool).await?;

let tasks = TaskRepository::new(&pool);
let addresses = AddressRepository::new(&pool);
let referrals = ReferralRepository::new(&pool);
let opt_ins = OptInRepository::new(&pool);
Expand All @@ -73,43 +66,6 @@ impl DbPersistence {

Ok(Self {
pool,
tasks,
addresses,
referrals,
opt_ins,
x_associations,
eth_associations,
admin,
relevant_tweets,
tweet_authors,
raid_quests,
raid_submissions,
raid_leaderboards,
tweet_pull_usage,
})
}

#[cfg(test)]
pub async fn new_unmigrated(database_url: &str) -> DbResult<Self> {
let pool = PgPoolOptions::new().max_connections(5).connect(database_url).await?;

let tasks = TaskRepository::new(&pool);
let addresses = AddressRepository::new(&pool);
let referrals = ReferralRepository::new(&pool);
let opt_ins = OptInRepository::new(&pool);
let x_associations = XAssociationRepository::new(&pool);
let eth_associations = EthAssociationRepository::new(&pool);
let admin = AdminRepository::new(&pool);
let relevant_tweets = RelevantTweetRepository::new(&pool);
let tweet_authors = TweetAuthorRepository::new(&pool);
let raid_quests = RaidQuestRepository::new(&pool);
let raid_submissions = RaidSubmissionRepository::new(&pool);
let raid_leaderboards = RaidLeaderboardRepository::new(&pool);
let tweet_pull_usage = TweetPullUsageRepository::new(pool.clone());

Ok(Self {
pool,
tasks,
addresses,
referrals,
opt_ins,
Expand Down
33 changes: 5 additions & 28 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,9 @@ use tracing::error;

use crate::{
db_persistence::DbError,
handlers::{
address::AddressHandlerError, auth::AuthHandlerError, referral::ReferralHandlerError, task::TaskHandlerError,
HandlerError,
},
handlers::{address::AddressHandlerError, auth::AuthHandlerError, referral::ReferralHandlerError, HandlerError},
models::ModelError,
services::{
graphql_client::GraphqlError, reverser::ReverserError, task_generator::TaskGeneratorError,
transaction_manager::TransactionError,
},
services::graphql_client::GraphqlError,
};

#[derive(Debug, thiserror::Error)]
Expand All @@ -30,12 +24,6 @@ pub enum AppError {
Model(#[from] ModelError),
#[error("Database error: {0}")]
Database(#[from] DbError),
#[error("Transaction manager error: {0}")]
Transaction(#[from] TransactionError),
#[error("Task generator error: {0}")]
TaskGenerator(#[from] TaskGeneratorError),
#[error("Reverser error: {0}")]
Reverser(#[from] ReverserError),
#[error("Server error: {0}")]
Server(String),
#[error("Join error: {0}")]
Expand Down Expand Up @@ -74,10 +62,7 @@ impl IntoResponse for AppError {
AppError::Database(err) => map_db_error(err),

// --- Everything else ---
e @ (AppError::Transaction(_)
| AppError::TaskGenerator(_)
| AppError::Reverser(_)
| AppError::Join(_)
e @ (AppError::Join(_)
| AppError::Graphql(_)
| AppError::Config(_)
| AppError::Http(_)
Expand Down Expand Up @@ -150,21 +135,13 @@ fn map_handler_error(err: HandlerError) -> (StatusCode, String) {
ReferralHandlerError::InvalidReferral(err) => (StatusCode::BAD_REQUEST, err),
ReferralHandlerError::DuplicateReferral(err) => (StatusCode::CONFLICT, err),
},

HandlerError::Task(err) => match err {
TaskHandlerError::TaskNotFound(err) => (StatusCode::NOT_FOUND, err.message.clone()),
TaskHandlerError::InvalidTaskUrl(err) => (StatusCode::BAD_REQUEST, err.message.clone()),
TaskHandlerError::StatusConflict(err) => (StatusCode::CONFLICT, err.message.clone()),
},
}
}

fn map_db_error(err: DbError) -> (StatusCode, String) {
match err {
DbError::UniqueViolation(err) => (StatusCode::CONFLICT, err),
DbError::RecordNotFound(err) | DbError::AddressNotFound(err) | DbError::TaskNotFound(err) => {
(StatusCode::NOT_FOUND, err)
}
DbError::RecordNotFound(err) | DbError::AddressNotFound(err) => (StatusCode::NOT_FOUND, err),

DbError::Database(err) => {
error!("Database error: {}", err);
Expand All @@ -183,7 +160,7 @@ fn map_db_error(err: DbError) -> (StatusCode, String) {
}
}

DbError::InvalidStatus(_) | DbError::Migration(_) => (
DbError::Migration(_) => (
StatusCode::INTERNAL_SERVER_ERROR,
"An internal server error occurred".to_string(),
),
Expand Down
40 changes: 4 additions & 36 deletions src/handlers/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
address::{
Address, AddressFilter, AddressSortColumn, AddressStatsResponse, AddressWithOptInAndAssociations,
AddressWithRank, AggregateStatsQueryParams, AssociatedAccountsResponse, OptedInPositionResponse,
RewardProgramStatusPayload, SyncTransfersResponse,
RewardProgramStatusPayload,
},
admin::Admin,
eth_association::{
Expand Down Expand Up @@ -355,37 +355,6 @@ pub async fn retrieve_associated_accounts(
}))
}

pub async fn sync_transfers(State(state): State<AppState>) -> Result<Json<SyncTransfersResponse>, AppError> {
tracing::info!("Received request to sync transfers from GraphQL endpoint");

match state.graphql_client.sync_transfers_and_addresses().await {
Ok((transfer_count, address_count)) => {
tracing::info!(
"Transfer sync completed successfully: {} transfers, {} addresses",
transfer_count,
address_count
);

let response = SyncTransfersResponse {
success: true,
message: format!(
"Successfully processed {} transfers and stored {} addresses",
transfer_count, address_count
),
transfers_processed: Some(transfer_count),
addresses_stored: Some(address_count),
};

Ok(Json(response))
}
Err(e) => {
tracing::error!("Failed to sync transfers: {}", e);

Err(AppError::Graphql(e))
}
}
}

pub async fn handle_get_opted_in_users(
State(state): State<AppState>,
) -> Result<Json<SuccessResponse<Vec<crate::models::opt_in::OptIn>>>, AppError> {
Expand Down Expand Up @@ -905,10 +874,9 @@ mod tests {
assert_eq!(response.status(), StatusCode::NO_CONTENT);

// Verification in DB
let saved_assoc = state
.db
.x_associations
.find_by_username("twitter_pro_101")
let saved_assoc = sqlx::query_as::<_, XAssociation>("SELECT * FROM x_associations WHERE username = $1")
.bind(new_association.username)
.fetch_optional(&state.db.pool)
.await
.unwrap();

Expand Down
8 changes: 4 additions & 4 deletions src/handlers/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ mod tests {
use crate::{
handlers::auth::handle_x_oauth_callback,
http_server::AppState,
models::x_association::XAssociation,
routes::auth::auth_routes,
utils::{
test_app_state::create_test_app_state,
Expand Down Expand Up @@ -527,10 +528,9 @@ mod tests {
assert!(location.contains(&format!("payload={}", expected_username)));

// Check DB Side Effects
let saved_assoc = state
.db
.x_associations
.find_by_username(expected_username)
let saved_assoc = sqlx::query_as::<_, XAssociation>("SELECT * FROM x_associations WHERE username = $1")
.bind(expected_username)
.fetch_optional(&state.db.pool)
.await
.unwrap();

Expand Down
7 changes: 1 addition & 6 deletions src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use serde::{Deserialize, Serialize};
use std::fmt::Display;

use crate::{
handlers::{
address::AddressHandlerError, auth::AuthHandlerError, referral::ReferralHandlerError, task::TaskHandlerError,
},
handlers::{address::AddressHandlerError, auth::AuthHandlerError, referral::ReferralHandlerError},
AppError,
};

Expand All @@ -14,13 +12,10 @@ pub mod auth;
pub mod raid_quest;
pub mod referral;
pub mod relevant_tweet;
pub mod task;
pub mod tweet_author;

#[derive(Debug, thiserror::Error)]
pub enum HandlerError {
#[error("Task handler error")]
Task(#[from] TaskHandlerError),
#[error("Referral handler error")]
Referral(#[from] ReferralHandlerError),
#[error("Address handler error")]
Expand Down
Loading