diff --git a/examples/redis/Cargo.toml b/examples/redis/Cargo.toml new file mode 100644 index 0000000..bd82145 --- /dev/null +++ b/examples/redis/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "redis" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.65" +mega_etl = {path = "../../mega_etl"} +tokio_wasi = {version = '1.21', features = ["rt", "macros"]} +env_logger = "0.9" +log = "0.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +http_req_wasi = "0.10" +lazy_static = "1.4.0" diff --git a/examples/redis/src/main.rs b/examples/redis/src/main.rs new file mode 100644 index 0000000..728288e --- /dev/null +++ b/examples/redis/src/main.rs @@ -0,0 +1,59 @@ +use mega_etl::{async_trait, Pipe, Transformer, TransformerError, TransformerResult}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +struct Order { + order_id: i32, + product_id: i32, + quantity: i32, + amount: f32, + shipping: f32, + tax: f32, + shipping_address: String, +} + +#[async_trait] +impl Transformer for Order{ + async fn transform(inbound_data: &Vec) -> TransformerResult>{ + let s = std::str::from_utf8(&inbound_data) + .map_err(|e| TransformerError::Custom(e.to_string()))?; + let order: Order = serde_json::from_str(String::from(s).as_str()) + .map_err(|e| TransformerError::Custom(e.to_string()))?; + log::info!("{:?}", &order); + let mut ret = vec![]; + let sql_string = format!( + r"INSERT INTO orders VALUES ({:?}, {:?}, {:?}, {:?}, {:?}, {:?}, {:?}, CURRENT_TIMESTAMP);", + order.order_id, + order.product_id, + order.quantity, + order.amount, + order.shipping, + order.tax, + order.shipping_address, + ); + dbg!(sql_string.clone()); + ret.push(sql_string); + Ok(ret) + } + async fn init() -> TransformerResult { + Ok(String::from( + r"CREATE TABLE IF NOT EXISTS orders (order_id INT, product_id INT, quantity INT, amount FLOAT, shipping FLOAT, tax FLOAT, shipping_address VARCHAR(50), date_registered TIMESTAMP DEFAULT CURRENT_TIMESTAMP);", + )) + } +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()>{ + //env_logger::init(); + + //let database_uri = std::env::var("DATABASE_URL")?; + let database_uri = "mysql://hashkat:something@localhost:3306/mysql"; + //let redis_uri = std::env::var("REDIS_URL")?; + let redis_uri = "redis://localhost:6379".to_string(); + let mut pipe = Pipe::new(database_uri, redis_uri).await; + + // Async + pipe.start::().await?; + + Ok(()) +} diff --git a/mega_etl/Cargo.toml b/mega_etl/Cargo.toml index 729d86e..0f7d450 100644 --- a/mega_etl/Cargo.toml +++ b/mega_etl/Cargo.toml @@ -25,3 +25,4 @@ tokio_wasi = {version = "1", features = ["net"]} log = "0.4.17" rskafka_wasi = "0.3" futures-util = "0.3" +redis = "0.23.0" diff --git a/mega_etl/src/lib.rs b/mega_etl/src/lib.rs index 7977b2c..0334957 100644 --- a/mega_etl/src/lib.rs +++ b/mega_etl/src/lib.rs @@ -49,7 +49,7 @@ impl_from_transformer_error!(hyper::Error); enum DataSource { Hyper(String, u16), - Redis, + Redis(String, String), Kafka(String, u16, String), Unknown, } @@ -71,7 +71,22 @@ impl DataSource { }; Ok(DataSource::Hyper(host.to_string(), port)) } - "redis" => Ok(DataSource::Redis), + //"redis" => Ok(DataSource::Redis(Host,Port)), + "redis" => { + let host= if let Some(host) = url.host_str(){ + host + } + else{ + return Err(url::ParseError::EmptyHost); + }; + let password = if let Some(password) = url.password(){ + password + } + else{ + "Incorrect Password Provided!" + }; + Ok(DataSource::Redis(host.to_string(), password.to_string())) + } "kafka" => { let host = if let Some(host) = url.host_str() { host @@ -312,7 +327,13 @@ impl Pipe { } } } - DataSource::Redis => Err(TransformerError::Unimplemented), + DataSource::Redis(host,password) => { + let redis_hostname = host; + let redis_password = password; + let redis_conn_url = format!("redis://:{}@{}", redis_password, redis_hostname); + redis::Client::open(redis_conn_url).expect("Invalid Connection URL").get_connection().expect("Failed to connect to Redis"); + Ok(()) + } DataSource::Unknown => Err(TransformerError::Custom("Unknown data source".to_string())), } }