diff --git a/src/main.rs b/src/main.rs index 35377ad..93d7a27 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use std::fs::read_to_string; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use anyhow::Result; use databend_driver::new_connection; @@ -69,35 +69,33 @@ async fn execute(dsn: &str, iterations: u32) -> Result { num_of_success += 1; } - if (batch_id + 1) % 7 == 0 { - // introduce more conflicts if possible in on replace into stmt - let ids = vec![batch_id, batch_id / 2, batch_id / 3]; - exec_replace_conflict(&dsn, &ids).await?; - } + // introduce more conflicts if possible in on replace into stmt + let ids = vec![batch_id]; + exec_replace_conflict(&dsn, &ids).await?; } Ok::<_, anyhow::Error>(num_of_success) } }); - let shutdown = Arc::new(AtomicBool::new(false)); + // let shutdown = Arc::new(AtomicBool::new(false)); // background tasks to maintain the table - let maintain_handle = tokio::spawn({ - let dsn = dsn.to_string(); - let shutdown = shutdown.clone(); - async move { - let mut batch_id = 0; - loop { - if shutdown.load(Ordering::Relaxed) { - break; - } - // we do not care if this fails - let _ = exec_table_maintenance(&dsn, batch_id).await; - batch_id += 1; - } - Ok::<_, anyhow::Error>(()) - } - }); + // let maintain_handle = tokio::spawn({ + // let dsn = dsn.to_string(); + // let shutdown = shutdown.clone(); + // async move { + // let mut batch_id = 0; + // loop { + // if shutdown.load(Ordering::Relaxed) { + // break; + // } + // // we do not care if this fails + // let _ = exec_table_maintenance(&dsn, batch_id).await; + // batch_id += 1; + // } + // Ok::<_, anyhow::Error>(()) + // } + // }); // join all the join handles @@ -105,9 +103,9 @@ async fn execute(dsn: &str, iterations: u32) -> Result { let success_replace_stmts = replace_handle.await??; // then we shutdown the table maintenance tasks - shutdown.store(true, Ordering::Relaxed); + // shutdown.store(true, Ordering::Relaxed); - maintain_handle.await??; + // maintain_handle.await??; Ok(success_replace_stmts) } @@ -119,7 +117,7 @@ async fn exec_replace_conflict(dsn: &str, batch_ids: &[u32]) -> Result { .map(|id| id.to_string()) .collect::>() .join(","); - info!("executing replace (with conflict) : [{}]", ids); + info!("executing merge-into (with conflict) : [{}]", ids); // generate sub query which combine all the data that generated by history batch ids @@ -133,17 +131,23 @@ async fn exec_replace_conflict(dsn: &str, batch_ids: &[u32]) -> Result { // replace these history data into the table (itself). while table being compacted and re-clustered // this may lead to partial and total block update. - let sql = format!("replace into test_order on(id, insert_time) ({sub_query})"); - + let sql = format!( + "merge into test_order as t + using ({sub_query}) as s + on t.id = s.id and t.insert_time = s.insert_time + when matched then delete + when not matched then insert * + " + ); match conn.exec(&sql).await { Ok(_) => { - info!("Ok. replace batch (with conflict) : [{}]", ids); + info!("Ok. merge-into batch (with conflict) : [{}]", ids); Ok(true) } Err(e) => { // replace may be failed due to concurrent mutations (compact, purge, recluster) - info!("Err. replace batch (with conflict) : [{}]. {e}", ids); + info!("Err. merge-into batch (with conflict) : [{}]. {e}", ids); Ok(false) } } @@ -152,61 +156,88 @@ async fn exec_replace_conflict(dsn: &str, batch_ids: &[u32]) -> Result { async fn exec_replace(dsn: &str, batch_id: u32) -> Result { let conn = new_connection(dsn)?; - info!("executing replace batch : {}", batch_id); + info!("executing merge-into batch : {}", batch_id); let batch_correlated_value = batch_id * 7; + let truncate_sql = "truncate table random_source_store"; + match conn.exec(&truncate_sql).await { + Ok(_) => {} + Err(e) => { + panic!("{:?}", e); + } + }; + + let insert_sql = format!( + "insert into random_source_store (select + id, + {batch_id} as id1, + {batch_correlated_value} as id2, + id3, id4, id5, id6, id7, + s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12, s13, + d1, d2, d3, d4, d5, d6, d7, d8, d9, d10, + insert_time, + insert_time1, + insert_time2, + insert_time3, + i + from random_source limit 1000)" + ); + + match conn.exec(&insert_sql).await { + Ok(_) => {} + Err(e) => { + panic!("{:?}", e); + } + }; + //on(id, insert_time) let sql = format!( " - replace into test_order on(id, insert_time) - select - id, - {batch_id} as id1, - {batch_correlated_value} as id2, - id3, id4, id5, id6, id7, - s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12, s13, - d1, d2, d3, d4, d5, d6, d7, d8, d9, d10, - insert_time, - insert_time1, - insert_time2, - insert_time3, - i - from random_source limit 1000 + merge into test_order as t + using ( + select * from random_source_store + ) as s + + on t.id = s.id and t.insert_time = s.insert_time + + when matched then update * + when not matched then insert * " ); + match conn.exec(&sql).await { Ok(_) => { - info!("Ok. replace batch : {}", batch_id); + info!("Ok. merge-into batch : {}", batch_id); Ok(true) } Err(e) => { // replace may be failed due to concurrent mutations (compact, purge, recluster) - info!("Err. replace batch : {}. {e}", batch_id); + info!("Err. merge-into batch : {}. {e}", batch_id); Ok(false) } } } -async fn exec_table_maintenance(dsn: &str, batch_id: i32) -> Result<()> { - info!("executing table maintenance batch : {}", batch_id); - let conn = new_connection(dsn)?; - let sqls = vec![ - //"select * from test_order ignore_result", - "optimize table test_order compact segment", - "optimize table test_order compact", - "optimize table test_order purge", - "alter table test_order recluster", - ]; - for sql in sqls { - match conn.exec(sql).await { - Ok(_) => { - info!("Ok. maintenance batch : {}", batch_id); - } - Err(e) => { - info!("Err. maintenance batch : {}. {e}", batch_id); - } - } - } - Ok(()) -} +// async fn exec_table_maintenance(dsn: &str, batch_id: i32) -> Result<()> { +// info!("executing table maintenance batch : {}", batch_id); +// let conn = new_connection(dsn)?; +// let sqls = vec![ +// //"select * from test_order ignore_result", +// "optimize table test_order compact segment", +// "optimize table test_order compact", +// "optimize table test_order purge", +// "alter table test_order recluster", +// ]; +// for sql in sqls { +// match conn.exec(sql).await { +// Ok(_) => { +// info!("Ok. maintenance batch : {}", batch_id); +// } +// Err(e) => { +// info!("Err. maintenance batch : {}. {e}", batch_id); +// } +// } +// } +// Ok(()) +// } async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> { info!("=========================="); @@ -217,7 +248,7 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> { info!(" "); info!(" "); info!( - "number of successfully executed replace-into statements : {}", + "number of successfully executed merge-into statements : {}", success_replace_stmts ); info!(" "); @@ -225,7 +256,7 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> { // - check the table data match the number of successfully executed replace into statements { - info!("CHECK: value of successfully executed replace into statements"); + info!("CHECK: value of successfully executed merge-into statements"); // For most of the cases, there should be 1000 * success_replace_stmts rows // @@ -236,9 +267,9 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> { let mut rows = conn.query_iter("select count() from test_order").await?; let r = rows.next().await.unwrap().unwrap(); - let count: (u32, ) = r.try_into()?; + let count: (u32,) = r.try_into()?; info!( - "CHECK: value of successfully executed replace into statements: client {}, server {}", + "CHECK: value of successfully executed merge-into statements: client {}, server {}", success_replace_stmts * 1000, count.0 ); @@ -249,43 +280,43 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> { let mut rows = conn .query_iter( " - select count() from - (select count() a, id1 from test_order group by id1) - where a != 1000", + select count() from test_order + ", ) .await?; let r = rows.next().await.unwrap().unwrap(); - let count: (u32, ) = r.try_into()?; + let count: (u32,) = r.try_into()?; + // conflict test deleted all data assert_eq!(0, count.0); // show the number of distinct value of id2 // not required to be equal, since there might be communication failures - let mut rows = conn - .query_iter("select count(distinct(id2)) from test_order") - .await?; - let r = rows.next().await.unwrap().unwrap(); - let count: (u32, ) = r.try_into()?; - - assert_eq!(success_replace_stmts, count.0); - info!( - "CHECK: distinct ids: client {}, server {}", - success_replace_stmts, count.0 - ); + // let mut rows = conn + // .query_iter("select count(distinct(id2)) from test_order") + // .await?; + // let r = rows.next().await.unwrap().unwrap(); + // let count: (u32,) = r.try_into()?; + + // assert_eq!(success_replace_stmts, count.0); + // info!( + // "CHECK: distinct ids: client {}, server {}", + // success_replace_stmts, count.0 + // ); } // - check the value of correlated column // for all the rows, id2 should be equal to id1 * 7 - { - let mut rows = conn - .query_iter("select count() from test_order where id2 != id1 * 7") - .await?; - let r = rows.next().await.unwrap().unwrap(); - let count: (i64, ) = r.try_into()?; + // { + // let mut rows = conn + // .query_iter("select count() from test_order where id2 != id1 * 7") + // .await?; + // let r = rows.next().await.unwrap().unwrap(); + // let count: (i64,) = r.try_into()?; - info!("CHECK: value of correlated column"); + // info!("CHECK: value of correlated column"); - assert_eq!(0, count.0); - } + // assert_eq!(0, count.0); + // } // - full table scan, ensure that the table data is not damaged info!("CHECK: full table scanning"); @@ -306,7 +337,7 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> { info!(" "); info!("========METRICS============"); - let mut rows = conn.query_iter("select metric, value from system.metrics where metric like '%replace%' or metric like '%conflict%' order by metric") + let mut rows = conn.query_iter("select metric, value from system.metrics where metric like '%merge%' or metric like '%conflict%' order by metric") .await?; while let Some(r) = rows.next().await { let (metric, value): (String, String) = r.unwrap().try_into()?; diff --git a/tests/sql/setup.sql b/tests/sql/setup.sql index a901a24..a5b0e14 100644 --- a/tests/sql/setup.sql +++ b/tests/sql/setup.sql @@ -1,5 +1,7 @@ drop table if exists test_order; drop table if exists random_source; +drop table if exists random_source_store; + create table test_order ( id bigint, id1 bigint, @@ -91,6 +93,49 @@ create table random_source( +create table random_source_store( + id bigint not null, + id1 bigint, + id2 bigint, + id3 bigint, + id4 bigint, + id5 bigint, + id6 bigint, + id7 bigint, + + s1 varchar, + s2 varchar, + s3 varchar, + s4 varchar, + s5 varchar, + s6 varchar, + s7 varchar, + s8 varchar, + s9 varchar, + s10 varchar, + s11 varchar, + s12 varchar, + s13 varchar, + + d1 DECIMAL(20, 8), + d2 DECIMAL(20, 8), + d3 DECIMAL(20, 8), + d4 DECIMAL(20, 8), + d5 DECIMAL(20, 8), + d6 DECIMAL(30, 8), + d7 DECIMAL(30, 8), + d8 DECIMAL(30, 8), + d9 DECIMAL(30, 8), + d10 DECIMAL(30, 8), + + insert_time datetime not null, + insert_time1 datetime, + insert_time2 datetime, + insert_time3 datetime, + + i int + +); truncate table system.metrics;