From 1d09b0b34518d6daa7e30066cf29602c4b2619ae Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Tue, 20 Feb 2024 15:12:07 -0300 Subject: [PATCH] feat: implement dlt monitor --- Cargo.lock | 188 +++++++++++++++++++++++++++------ Cargo.toml | 1 + src/domain/events.rs | 1 - src/domain/mod.rs | 1 - src/driven/fabric_state/mod.rs | 6 +- src/drivers/dlt_monitor/mod.rs | 72 +++++++++++++ src/drivers/mod.rs | 1 + 7 files changed, 233 insertions(+), 37 deletions(-) create mode 100644 src/drivers/dlt_monitor/mod.rs diff --git a/Cargo.lock b/Cargo.lock index b27622d..781f5e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,6 +144,28 @@ dependencies = [ "password-hash", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.50", +] + [[package]] name = "async-trait" version = "0.1.73" @@ -152,7 +174,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", ] [[package]] @@ -392,7 +414,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", ] [[package]] @@ -432,6 +454,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.4" @@ -594,6 +626,7 @@ dependencies = [ "tonic-reflection", "tracing", "tracing-subscriber", + "utxorpc", "uuid", ] @@ -789,7 +822,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", ] [[package]] @@ -1423,9 +1456,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "wasi", @@ -1565,9 +1598,15 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", ] +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "openssl-sys" version = "0.9.91" @@ -1742,7 +1781,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", ] [[package]] @@ -1783,7 +1822,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", ] [[package]] @@ -1843,9 +1882,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.66" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -1912,9 +1951,9 @@ checksum = "ae1d0fdc23ea945d58449259496ba12d3b520da1a45ce5011f631c990add9029" [[package]] name = "quote" -version = "1.0.33" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ "proc-macro2", ] @@ -2076,11 +2115,24 @@ version = "0.21.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" dependencies = [ + "log", "ring", "rustls-webpki", "sct", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -2112,6 +2164,15 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "schemars" version = "0.8.12" @@ -2162,6 +2223,29 @@ dependencies = [ "zeroize", ] +[[package]] +name = "security-framework" +version = "2.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.183" @@ -2189,7 +2273,7 @@ checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" dependencies = [ "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", ] [[package]] @@ -2305,9 +2389,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.3" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", "windows-sys 0.48.0", @@ -2584,9 +2668,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.32" +version = "2.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "239814284fd6f1a4ffe4ca893952cdd93c224b6a1571c9a9eadd670295c0c9e2" +checksum = "74f1bdc9872430ce9b75da68329d1c1746faf50ffac5f19e02b71e37ff881ffb" dependencies = [ "proc-macro2", "quote", @@ -2614,22 +2698,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.47" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" +checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.47" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" +checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" dependencies = [ "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", ] [[package]] @@ -2659,9 +2743,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -2670,7 +2754,7 @@ dependencies = [ "num_cpus", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.3", + "socket2 0.5.5", "tokio-macros", "windows-sys 0.48.0", ] @@ -2687,13 +2771,13 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", ] [[package]] @@ -2708,6 +2792,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.14" @@ -2749,6 +2843,7 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ + "async-stream", "async-trait", "axum", "base64 0.21.2", @@ -2763,7 +2858,10 @@ dependencies = [ "percent-encoding", "pin-project", "prost", + "rustls-native-certs", + "rustls-pemfile", "tokio", + "tokio-rustls", "tokio-stream", "tower", "tower-layer", @@ -2858,7 +2956,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", ] [[package]] @@ -2991,6 +3089,32 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "utxorpc" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a98857edfd2d93f425c531a047030b118a293e17596a22d6bdf31f11300bf8e" +dependencies = [ + "thiserror", + "tokio", + "tonic", + "utxorpc-spec", +] + +[[package]] +name = "utxorpc-spec" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88f80e24bfe310d0972406d15c0892ff09b6c81ded2cdefc0183aac35cf0514f" +dependencies = [ + "bytes", + "pbjson", + "pbjson-types", + "prost", + "serde", + "tonic", +] + [[package]] name = "uuid" version = "1.6.1" @@ -3054,7 +3178,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", "wasm-bindgen-shared", ] @@ -3076,7 +3200,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3300,7 +3424,7 @@ checksum = "be912bf68235a88fbefd1b73415cb218405958d1655b2ece9035a19920bdf6ba" dependencies = [ "proc-macro2", "quote", - "syn 2.0.32", + "syn 2.0.50", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 25ac6b4..c533a17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,3 +33,4 @@ uuid = { version = "1.6.1", features = ["v4"] } # dmtri = { version = "0.1.0", git = "https://github.com/demeter-run/specs.git" } dmtri = { version = "0.1.0", path = "../specs/gen/rust" } +utxorpc = "0.1.0" diff --git a/src/domain/events.rs b/src/domain/events.rs index ca848ce..1f045cb 100644 --- a/src/domain/events.rs +++ b/src/domain/events.rs @@ -68,7 +68,6 @@ pub struct UsagePaymentV1 { pub entry: Blob, pub epoch: Epoch, pub namespace: NamespaceName, - pub cluster: ClusterUuid, pub units: DCU, } diff --git a/src/domain/mod.rs b/src/domain/mod.rs index 0359128..18cd16f 100644 --- a/src/domain/mod.rs +++ b/src/domain/mod.rs @@ -444,7 +444,6 @@ mod tests { entry: b"1".into(), epoch: 123, namespace: "ns1".into(), - cluster: b"cluster1".into(), units: 400, }) .unwrap(); diff --git a/src/driven/fabric_state/mod.rs b/src/driven/fabric_state/mod.rs index 8fa2d06..80c95cd 100644 --- a/src/driven/fabric_state/mod.rs +++ b/src/driven/fabric_state/mod.rs @@ -140,7 +140,7 @@ VALUES ($1, $2, $3, $4, $5) &self, epoch: i64, entry: &[u8], - cluster: &[u8], + cluster: Option<&[u8]>, namespace: &str, resource: Option<&[u8]>, deltas: Vec, @@ -266,7 +266,7 @@ mod tests { db.insert_accounting( 1, b"entry1", - b"cluster1", + Some(b"cluster1"), "ns1", Some(b"resource1"), vec![ @@ -288,7 +288,7 @@ mod tests { db.insert_accounting( 1, b"entry1", - b"cluster1", + None, "ns1", None, vec![ diff --git a/src/drivers/dlt_monitor/mod.rs b/src/drivers/dlt_monitor/mod.rs new file mode 100644 index 0000000..a2f73aa --- /dev/null +++ b/src/drivers/dlt_monitor/mod.rs @@ -0,0 +1,72 @@ +use anyhow::Result; +use std::sync::Arc; +use tokio::sync::Mutex; +use utxorpc::spec::cardano::{Tx, TxOutput}; + +use crate::{ + domain::{Domain, UsagePaymentV1}, + driven::event_dispatch::EventWrapper, +}; + +struct Config { + u5c_endpoint: String, + dcu_policy: String, +} + +fn is_dcu_payment(tx: &Tx, config: &Config) -> bool { + tx.outputs + .iter() + .flat_map(|txo| txo.assets.iter()) + .any(|x| x.policy_id == &config.dcu_policy) +} + +pub async fn run(domain: Arc>, config: Config) -> Result<()> { + let mut subscription = { domain.lock().await.event_dispatch.subscribe() }; + + while let Ok(EventWrapper(evt, _)) = subscription.recv().await { + let mut domain = domain.lock().await; + domain.handle(evt).await?; + } + + let mut client = utxorpc::ClientBuilder::new() + .uri(&config.u5c_endpoint)? + .build::() + .await; + + let mut tip = client.follow_tip(vec![]).await?; + + while let Ok(event) = tip.event().await { + let txs: Vec<_> = match event { + utxorpc::TipEvent::Apply(x) => x + .body + .unwrap() + .tx + .into_iter() + .filter(|x| is_dcu_payment(x, &config)) + .collect(), + utxorpc::TipEvent::Undo(_) => todo!(), + utxorpc::TipEvent::Reset(_) => todo!(), + }; + + for tx in txs { + domain.lock().await.handle( + UsagePaymentV1 { + entry: tx.hash.into(), + epoch: 1, + namespace: tx + .auxiliary + .unwrap() + .metadata + .iter() + .find(|x| x.label == 1791) + .unwrap() + .value, + units: tx.outputs.first().unwrap().coin, + } + .into(), + ); + } + } + + Ok(()) +} diff --git a/src/drivers/mod.rs b/src/drivers/mod.rs index cebb5c0..09af09f 100644 --- a/src/drivers/mod.rs +++ b/src/drivers/mod.rs @@ -1,2 +1,3 @@ +pub mod dlt_monitor; pub mod fabric_monitor; pub mod rpc;