Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions examples/redis/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "redis"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.65"
mega_etl = {path = "../../mega_etl"}
tokio_wasi = {version = '1.21', features = ["rt", "macros"]}
env_logger = "0.9"
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
http_req_wasi = "0.10"
lazy_static = "1.4.0"
59 changes: 59 additions & 0 deletions examples/redis/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use mega_etl::{async_trait, Pipe, Transformer, TransformerError, TransformerResult};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Order {
order_id: i32,
product_id: i32,
quantity: i32,
amount: f32,
shipping: f32,
tax: f32,
shipping_address: String,
}

#[async_trait]
impl Transformer for Order{
async fn transform(inbound_data: &Vec<u8>) -> TransformerResult<Vec<String>>{
let s = std::str::from_utf8(&inbound_data)
.map_err(|e| TransformerError::Custom(e.to_string()))?;
let order: Order = serde_json::from_str(String::from(s).as_str())
.map_err(|e| TransformerError::Custom(e.to_string()))?;
log::info!("{:?}", &order);
let mut ret = vec![];
let sql_string = format!(
r"INSERT INTO orders VALUES ({:?}, {:?}, {:?}, {:?}, {:?}, {:?}, {:?}, CURRENT_TIMESTAMP);",
order.order_id,
order.product_id,
order.quantity,
order.amount,
order.shipping,
order.tax,
order.shipping_address,
);
dbg!(sql_string.clone());
ret.push(sql_string);
Ok(ret)
}
async fn init() -> TransformerResult<String> {
Ok(String::from(
r"CREATE TABLE IF NOT EXISTS orders (order_id INT, product_id INT, quantity INT, amount FLOAT, shipping FLOAT, tax FLOAT, shipping_address VARCHAR(50), date_registered TIMESTAMP DEFAULT CURRENT_TIMESTAMP);",
))
}
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()>{
//env_logger::init();

//let database_uri = std::env::var("DATABASE_URL")?;
let database_uri = "mysql://hashkat:something@localhost:3306/mysql";
//let redis_uri = std::env::var("REDIS_URL")?;
let redis_uri = "redis://localhost:6379".to_string();
let mut pipe = Pipe::new(database_uri, redis_uri).await;

// Async
pipe.start::<Order>().await?;

Ok(())
}
1 change: 1 addition & 0 deletions mega_etl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ tokio_wasi = {version = "1", features = ["net"]}
log = "0.4.17"
rskafka_wasi = "0.3"
futures-util = "0.3"
redis = "0.23.0"
27 changes: 24 additions & 3 deletions mega_etl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl_from_transformer_error!(hyper::Error);

enum DataSource {
Hyper(String, u16),
Redis,
Redis(String, String),
Kafka(String, u16, String),
Unknown,
}
Expand All @@ -71,7 +71,22 @@ impl DataSource {
};
Ok(DataSource::Hyper(host.to_string(), port))
}
"redis" => Ok(DataSource::Redis),
//"redis" => Ok(DataSource::Redis(Host,Port)),
"redis" => {
let host= if let Some(host) = url.host_str(){
host
}
else{
return Err(url::ParseError::EmptyHost);
};
let password = if let Some(password) = url.password(){
password
}
else{
"Incorrect Password Provided!"
};
Ok(DataSource::Redis(host.to_string(), password.to_string()))
}
"kafka" => {
let host = if let Some(host) = url.host_str() {
host
Expand Down Expand Up @@ -312,7 +327,13 @@ impl Pipe {
}
}
}
DataSource::Redis => Err(TransformerError::Unimplemented),
DataSource::Redis(host,password) => {
let redis_hostname = host;
let redis_password = password;
let redis_conn_url = format!("redis://:{}@{}", redis_password, redis_hostname);
redis::Client::open(redis_conn_url).expect("Invalid Connection URL").get_connection().expect("Failed to connect to Redis");
Ok(())
}
DataSource::Unknown => Err(TransformerError::Custom("Unknown data source".to_string())),
}
}
Expand Down