Skip to content

Commit 9de9cfd

Browse files
committed
feat: simulation create tag an and read tag
1 parent 4d9b794 commit 9de9cfd

File tree

5 files changed

+210
-25
lines changed

5 files changed

+210
-25
lines changed

cli/fledger/src/simulation.rs

Lines changed: 177 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
1-
use crate::{metrics::Metrics, Fledger};
1+
use std::any::type_name;
2+
3+
use crate::Fledger;
24
use clap::{arg, Args, Subcommand};
35
use flarch::{nodeids::U256, tasks::wait_ms};
4-
use flmodules::gossip_events::core::Event;
6+
use flcrypto::tofrombytes::ToFromBytes;
7+
use flmodules::{
8+
dht_storage::realm_view::RealmView,
9+
flo::blob::{BlobAccess, BlobTag},
10+
gossip_events::core::Event,
11+
};
512
use metrics::{absolute_counter, increment_counter};
613

714
#[derive(Args, Debug, Clone)]
@@ -28,6 +35,16 @@ pub enum SimulationSubcommand {
2835
},
2936

3037
DhtJoinRealm {},
38+
39+
CreateTag {
40+
#[arg(long)]
41+
tag: String,
42+
},
43+
44+
FetchTag {
45+
#[arg(long)]
46+
tag: String,
47+
},
3148
}
3249

3350
pub struct SimulationHandler {}
@@ -39,6 +56,8 @@ impl SimulationHandler {
3956
Self::run_chat(f, command, send_msg, recv_msg).await
4057
}
4158
SimulationSubcommand::DhtJoinRealm {} => Self::run_dht_join_realm(f).await,
59+
SimulationSubcommand::CreateTag { tag } => Self::run_dht_create_tag(f, tag).await,
60+
SimulationSubcommand::FetchTag { tag } => Self::run_dht_fetch_tag(f, tag).await,
4261
}
4362
}
4463

@@ -50,8 +69,6 @@ impl SimulationHandler {
5069
) -> anyhow::Result<()> {
5170
f.loop_node(crate::FledgerState::Connected(1)).await?;
5271

53-
let node_name = f.args.name.clone().unwrap_or("unknown".into());
54-
5572
if let Some(ref msg) = recv_msg {
5673
log::info!("Waiting for chat message {}.", msg);
5774
}
@@ -63,9 +80,6 @@ impl SimulationHandler {
6380

6481
let mut acked_msg_ids: Vec<U256> = Vec::new();
6582

66-
// necessary to grab the variable for lifetime purposes.
67-
let _influx = Metrics::setup(node_name);
68-
6983
loop {
7084
wait_ms(1000).await;
7185

@@ -90,7 +104,7 @@ impl SimulationHandler {
90104
.iter()
91105
.any(|ev| ev.msg.eq(msg))
92106
{
93-
log::info!("RECV_CHAT_MSG TRIGGERED");
107+
log::info!("SIMULATION END");
94108
f.loop_node(crate::FledgerState::Forever).await?;
95109
return Ok(());
96110
}
@@ -102,10 +116,165 @@ impl SimulationHandler {
102116
f.loop_node(crate::FledgerState::DHTAvailable).await?;
103117
log::info!("SIMULATION END");
104118

119+
absolute_counter!("fledger_realms_total", 1);
120+
121+
f.loop_node(crate::FledgerState::Forever).await?;
122+
return Ok(());
123+
}
124+
125+
async fn run_dht_create_tag(mut f: Fledger, tag: String) -> anyhow::Result<()> {
126+
f.loop_node(crate::FledgerState::DHTAvailable).await?;
127+
absolute_counter!("fledger_dht_connected", 1);
128+
129+
log::info!("DHT CONNECTED");
130+
131+
//let router = f.node.dht_router.unwrap();
132+
let ds = f.node.dht_storage.as_mut().unwrap();
133+
let mut rv = RealmView::new_first(ds.clone()).await?;
134+
135+
// Send a Flo tag blob
136+
log::info!("Storing tag in DHT {}.", tag);
137+
let flo_tag = rv
138+
.create_tag(&tag, None, flcrypto::access::Condition::Pass, &[])
139+
.unwrap();
140+
log::info!(
141+
"tag {}/{}/{} | {}",
142+
flo_tag.flo_id(),
143+
flo_tag.realm_id(),
144+
flo_tag.version(),
145+
flo_tag.values().iter().next().unwrap().1,
146+
);
147+
148+
let _ = ds.store_flo(flo_tag.flo().clone());
149+
let _ = ds.propagate();
150+
ds.broker.settle(Vec::new()).await?;
151+
152+
log::info!("SIMULATION END");
153+
absolute_counter!("fledger_simulation_end", 1);
154+
155+
ds.get_flos(&rv.realm.realm_id())
156+
.await
157+
.unwrap()
158+
.iter()
159+
.for_each(|flo| {
160+
let flo_type = type_name::<BlobTag>();
161+
if flo.flo_type() == flo_type {
162+
let tag = BlobTag::from_rmp_bytes(flo_type, &flo.data()).unwrap();
163+
log::info!(
164+
"tag found {}/{}/{} | {}",
165+
flo.flo_id(),
166+
flo.realm_id(),
167+
flo.version(),
168+
tag.0.values().iter().next().unwrap().1,
169+
)
170+
}
171+
});
172+
173+
// if let Some(ref tags) = rv.tags {
174+
// log::info!("storage amt: {}", tags.storage.iter().count());
175+
//
176+
// let tagname = tags
177+
// .storage
178+
// .iter()
179+
// .next()
180+
// .unwrap()
181+
// .1 // first tag stored
182+
// .values()
183+
// .iter()
184+
// .next()
185+
// .unwrap()
186+
// .1; // name of tag
187+
// log::info!("tag found: {}", tagname);
188+
// } else {
189+
// log::info!("NOTICE: tag not found.")
190+
// }
191+
105192
f.loop_node(crate::FledgerState::Forever).await?;
106193
return Ok(());
107194
}
108195

196+
async fn run_dht_fetch_tag(mut f: Fledger, tag: String) -> anyhow::Result<()> {
197+
f.loop_node(crate::FledgerState::DHTAvailable).await?;
198+
absolute_counter!("fledger_dht_connected", 1);
199+
200+
log::info!("DHT CONNECTED");
201+
202+
let ds = f.node.dht_storage.as_mut().unwrap();
203+
let mut rv = RealmView::new_first(ds.clone()).await?;
204+
205+
loop {
206+
wait_ms(1000).await;
207+
208+
// let fledger_connected_total = f.node.nodes_connected()?.len(); // TODO: does not
209+
// compile.
210+
//absolute_counter!("fledger_connected_total", fledger_connected_total as u64);
211+
increment_counter!("fledger_iterations_total");
212+
213+
rv.update_tags().await?;
214+
215+
let flos = ds.get_flos(&rv.realm.realm_id()).await.unwrap().clone();
216+
// flos.iter().for_each(|flo| {
217+
// log::info!(
218+
// "flo found {}/{}/{} [{}]",
219+
// flo.flo_id(),
220+
// flo.realm_id(),
221+
// flo.version(),
222+
// flo.flo_type(),
223+
// )
224+
// });
225+
226+
let mut tags = flos
227+
.iter()
228+
.filter(|flo| flo.flo_type() == type_name::<BlobTag>())
229+
.map(|flo| BlobTag::from_rmp_bytes(&flo.flo_type(), &flo.data()).unwrap());
230+
231+
tags.clone().for_each(|tag| {
232+
log::info!("tag found {}", tag.0.values().iter().next().unwrap().1)
233+
});
234+
235+
if tags.any(|flotag| {
236+
flotag
237+
.0
238+
.values()
239+
.iter()
240+
.next()
241+
.is_some_and(|tagname| *tagname.1 == tag)
242+
}) {
243+
log::info!("SIMULATION END");
244+
absolute_counter!("fledger_simulation_end", 1);
245+
f.loop_node(crate::FledgerState::Forever).await?;
246+
247+
return Ok(());
248+
} else {
249+
log::info!("Tag not found...");
250+
}
251+
// tags.storage
252+
// .iter()
253+
// .any(|flotag| flotag.1.values().get(&tag).is_some());
254+
// //.any(|flotag| flotag.1.values().iter().next().unwrap().1.eq(&tag));
255+
// //.any(|tag| tag.1.values() tag.1.cache().0.get_blob_mut().values.get(tag).is_some());
256+
// {
257+
// log::info!("SIMULATION END");
258+
// absolute_counter!("fledger_simulation_end", 1);
259+
// f.loop_node(crate::FledgerState::Forever).await?;
260+
//
261+
// return Ok(());
262+
// }
263+
}
264+
}
265+
266+
// async fn run_dht_request_random_flow(mut f: Fledger) -> anyhow::Result<()> {
267+
// let ds = f.node.dht_storage.unwrap();
268+
// let rv = RealmView::new_first(ds.clone()).await?;
269+
//
270+
// // To send requests for random floID
271+
// // let realm_id = ds.get_realm_ids().await?.first().unwrap();
272+
// let realm_id = rv.realm.realm_id();
273+
// let res = ds.get_flo(&GlobalID::new(realm_id, FloID::rnd())).await;
274+
//
275+
// return Ok(());
276+
// }
277+
109278
fn log_new_messages(f: &Fledger, acked_msg_ids: &mut Vec<U256>) {
110279
let chat_events = f.node.gossip.as_ref().unwrap().chat_events();
111280
let chats: Vec<&Event> = chat_events

flmodules/src/dht_storage/broker.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub(super) const MODULE_NAME: &str = "DHTStorage";
3636

3737
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
3838
pub enum DHTStorageIn {
39+
ReadFlos(RealmID),
3940
StoreFlo(Flo),
4041
ReadFlo(GlobalID),
4142
ReadCuckooIDs(GlobalID),
@@ -46,6 +47,7 @@ pub enum DHTStorageIn {
4647

4748
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
4849
pub enum DHTStorageOut {
50+
Flos(Vec<Flo>),
4951
FloValue(FloCuckoo),
5052
RealmIDs(Vec<RealmID>),
5153
CuckooIDs(GlobalID, Vec<FloID>),
@@ -186,6 +188,15 @@ impl DHTStorage {
186188
.await?)
187189
}
188190

191+
pub async fn get_flos(&mut self, realm_id: &RealmID) -> anyhow::Result<Vec<Flo>> {
192+
Ok(self
193+
.send_wait(DHTStorageIn::ReadFlos(realm_id.clone()), &|msg| match msg {
194+
DHTStorageOut::Flos(flos) => Some(flos.clone()),
195+
_ => None,
196+
})
197+
.await?)
198+
}
199+
189200
pub fn sync(&mut self) -> anyhow::Result<()> {
190201
Ok(self.broker.emit_msg_in(DHTStorageIn::SyncFromNeighbors)?)
191202
}
@@ -434,14 +445,8 @@ mod tests {
434445
router.simul.send_node_info().await?;
435446
ds_2.sync()?;
436447
router.settle_all().await;
437-
assert!(ds_2
438-
.get_flo::<Testing>(&t0.flo().global_id())
439-
.await
440-
.is_ok());
441-
assert!(ds_2
442-
.get_flo::<Testing>(&t1.flo().global_id())
443-
.await
444-
.is_ok());
448+
assert!(ds_2.get_flo::<Testing>(&t0.flo().global_id()).await.is_ok());
449+
assert!(ds_2.get_flo::<Testing>(&t1.flo().global_id()).await.is_ok());
445450

446451
Ok(())
447452
}

flmodules/src/dht_storage/core.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ impl RealmStorage {
168168
.collect()
169169
}
170170

171+
pub fn get_flos(&self) -> Vec<Flo> {
172+
return self.flos.iter().map(|flo| flo.1.flo.clone()).collect_vec();
173+
}
174+
171175
pub fn store_cuckoo_ids(&mut self, parent: &FloID, cuckoos: Vec<FloID>) {
172176
for cuckoo in cuckoos {
173177
self.store_cuckoo_id(parent, cuckoo);

flmodules/src/dht_storage/messages.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ impl Messages {
183183
DHTStorageIn::GetRealms => {
184184
vec![DHTStorageOut::RealmIDs(self.realms.keys().cloned().collect()).into()]
185185
}
186+
DHTStorageIn::ReadFlos(realm_id) => {
187+
vec![DHTStorageOut::Flos(self.realms.get(&realm_id).unwrap().get_flos()).into()]
188+
}
186189
}
187190
}
188191

@@ -363,14 +366,14 @@ impl Messages {
363366
// Either its realm is already known, or it is a new realm.
364367
// When 'true' is returned, then the flo has been stored.
365368
fn upsert_flo(&mut self, flo: Flo) -> bool {
366-
// log::trace!(
367-
// "{} store_flo {}({}/{}) {}",
368-
// self.our_id,
369-
// flo.flo_type(),
370-
// flo.flo_id(),
371-
// flo.realm_id(),
372-
// flo.version()
373-
// );
369+
log::info!(
370+
"{} store_flo {}({}/{}) {}",
371+
self.our_id,
372+
flo.flo_type(),
373+
flo.flo_id(),
374+
flo.realm_id(),
375+
flo.version()
376+
);
374377
// log::info!(
375378
// "{} has realm: {}",
376379
// self.our_id,
@@ -445,7 +448,7 @@ impl SubsystemHandler<InternIn, InternOut> for Messages {
445448
.map_or(vec![], |msg| vec![msg])
446449
}
447450
})
448-
// .inspect(|msg| log::debug!("{_id}: Out: {msg:?}"))
451+
//.inspect(|msg| log::debug!("{_id}: Out: {msg:?}"))
449452
.collect()
450453
}
451454
}

flmodules/src/flo/blob.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,10 @@ impl FloBlobTag {
174174
pub fn blob_id(&self) -> BlobID {
175175
(*self.flo_id()).into()
176176
}
177+
178+
pub fn values(&self) -> &HashMap<String, String> {
179+
(&(*self.get_blob().values())).into()
180+
}
177181
}
178182

179183
impl BlobAccess for FloBlobTag {

0 commit comments

Comments
 (0)