Skip to content

Commit

Permalink
Use mix_id for display
Browse files Browse the repository at this point in the history
  • Loading branch information
durch committed May 17, 2024
1 parent d7af5f8 commit 5c4c7c3
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 129 deletions.
26 changes: 24 additions & 2 deletions common/topology/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use crate::filter::VersionFilterable;
pub use error::NymTopologyError;
use log::warn;
use mix::Node;
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, MixId};
use nym_sphinx_addressing::nodes::NodeIdentity;
Expand Down Expand Up @@ -237,7 +238,7 @@ impl NymTopology {
&self,
rng: &mut R,
num_mix_hops: u8,
) -> Result<Vec<SphinxNode>, NymTopologyError>
) -> Result<Vec<Node>, NymTopologyError>
where
R: Rng + CryptoRng + ?Sized,
{
Expand All @@ -262,12 +263,32 @@ impl NymTopology {
let random_mix = layer_mixes
.choose(rng)
.ok_or(NymTopologyError::EmptyMixLayer { layer })?;
route.push(random_mix.into());
route.push(random_mix.clone());
}

Ok(route)
}

pub fn random_path_to_gateway<R>(
&self,
rng: &mut R,
num_mix_hops: u8,
gateway_identity: &NodeIdentity,
) -> Result<(Vec<mix::Node>, gateway::Node), NymTopologyError>
where
R: Rng + CryptoRng + ?Sized,
{
let gateway = self.get_gateway(gateway_identity).ok_or(
NymTopologyError::NonExistentGatewayError {
identity_key: gateway_identity.to_base58_string(),
},
)?;

let path = self.random_mix_route(rng, num_mix_hops)?;

Ok((path, gateway.clone()))
}

/// Tries to create a route to the specified gateway, such that it goes through mixnode on layer 1,
/// mixnode on layer2, .... mixnode on layer n and finally the target gateway
pub fn random_route_to_gateway<R>(
Expand All @@ -288,6 +309,7 @@ impl NymTopology {
Ok(self
.random_mix_route(rng, num_mix_hops)?
.into_iter()
.map(|node| SphinxNode::from(&node))
.chain(std::iter::once(gateway.into()))
.collect())
}
Expand Down
163 changes: 70 additions & 93 deletions nnm/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use axum::extract::{Path, State};
use axum::http::Response;
use axum::http::StatusCode;
use axum::routing::get;
use axum::{Json, Router};
use dashmap::DashMap;
use futures::StreamExt;
use log::debug;
use log::{debug, error, warn};
use nym_sdk::mixnet::MixnetMessageSender;
use nym_sphinx::chunking::{ReceivedFragment, SentFragment, FRAGMENTS_RECEIVED, FRAGMENTS_SENT};
use nym_sphinx::Node;
use nym_topology::NymTopology;
use nym_topology::{gateway, mix, NymTopology};
use petgraph::dot::Dot;
use petgraph::Graph;
use rand::distributions::Alphanumeric;
Expand All @@ -21,11 +20,10 @@ use std::future::IntoFuture;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;

use crate::ClientWrapper;
use crate::ClientsWrapper;

pub struct HttpServer {
listener: SocketAddr,
Expand All @@ -34,18 +32,15 @@ pub struct HttpServer {

#[derive(Clone)]
struct AppState {
clients: Arc<RwLock<Vec<Arc<RwLock<ClientWrapper>>>>>,
clients: ClientsWrapper,
}

impl HttpServer {
pub fn new(listener: SocketAddr, cancel: CancellationToken) -> Self {
HttpServer { listener, cancel }
}

pub async fn run(
self,
clients: Arc<RwLock<Vec<Arc<RwLock<ClientWrapper>>>>>,
) -> anyhow::Result<()> {
pub async fn run(self, clients: ClientsWrapper) -> anyhow::Result<()> {
let n_clients = clients.read().await.len();
let state = AppState { clients };
let app = Router::new()
Expand Down Expand Up @@ -110,11 +105,11 @@ struct NetworkAccount {
complete_fragment_sets: Vec<i32>,
incomplete_fragment_sets: Vec<i32>,
missing_fragments: HashMap<i32, Vec<u8>>,
complete_routes: Vec<Vec<String>>,
incomplete_routes: Vec<Vec<String>>,
complete_routes: Vec<Vec<u32>>,
incomplete_routes: Vec<Vec<u32>>,
#[serde(skip)]
topology: NymTopology,
tested_nodes: HashSet<String>,
tested_nodes: HashSet<u32>,
}

impl NetworkAccount {
Expand Down Expand Up @@ -162,10 +157,10 @@ impl NetworkAccount {
account
}

pub fn hydrate_route(&self, fragment: SentFragment) -> Vec<Node> {
pub fn hydrate_route(&self, fragment: SentFragment) -> (Vec<mix::Node>, gateway::Node) {
let mut rng = ChaCha8Rng::seed_from_u64(fragment.seed() as u64);
self.topology
.random_route_to_gateway(
.random_path_to_gateway(
&mut rng,
fragment.mixnet_params().hops(),
fragment.mixnet_params().destination(),
Expand All @@ -176,13 +171,12 @@ impl NetworkAccount {
fn hydrate_all_fragments(&mut self) {
for fragment_id in &self.complete_fragment_sets {
let fragment_set = FRAGMENTS_SENT.get(fragment_id).unwrap();
let route = self
.hydrate_route(fragment_set.value().first().unwrap().clone())
.iter()
.map(|n| n.address.as_base58_string())
.collect::<Vec<String>>();
let path = self.hydrate_route(fragment_set.value().first().unwrap().clone());

let route = path.0.iter().map(|n| n.mix_id).collect::<Vec<u32>>();

for node in &route {
self.tested_nodes.insert(node.clone());
self.tested_nodes.insert(*node);
}
self.complete_routes.push(route);
}
Expand All @@ -191,9 +185,10 @@ impl NetworkAccount {
let fragment_set = FRAGMENTS_SENT.get(fragment_id).unwrap();
let route = self
.hydrate_route(fragment_set.value().first().unwrap().clone())
.0
.iter()
.map(|n| n.address.as_base58_string())
.collect::<Vec<String>>();
.map(|n| n.mix_id)
.collect::<Vec<u32>>();
self.incomplete_routes.push(route);
}
}
Expand Down Expand Up @@ -238,41 +233,41 @@ async fn accounting_handler() -> Json<NetworkAccount> {
Json(account)
}

async fn graph_handler(Path(node_address): Path<String>) -> String {
async fn graph_handler(Path(mix_id): Path<u32>) -> String {
let account = NetworkAccount::finalize();
let mut nodes = HashSet::new();
let mut edges: Vec<(String, String)> = vec![];
let mut broken_edges: Vec<(String, String)> = vec![];
let mut edges: Vec<(u32, u32)> = vec![];
let mut broken_edges: Vec<(u32, u32)> = vec![];

for route in &account.complete_routes {
if !route.contains(&node_address) {
if !route.contains(&mix_id) {
continue;
}

for chunk in route.windows(2) {
nodes.insert(chunk[0].clone());
nodes.insert(chunk[1].clone());
edges.push((chunk[0].clone(), chunk[1].clone()));
nodes.insert(chunk[0]);
nodes.insert(chunk[1]);
edges.push((chunk[0], chunk[1]));
}
}

for route in &account.incomplete_routes {
if !route.contains(&node_address) {
if !route.contains(&mix_id) {
continue;
}

for chunk in route.windows(2) {
nodes.insert(chunk[0].clone());
nodes.insert(chunk[1].clone());
broken_edges.push((chunk[0].clone(), chunk[1].clone()));
nodes.insert(chunk[0]);
nodes.insert(chunk[1]);
broken_edges.push((chunk[0], chunk[1]));
}
}

let mut graph = Graph::new();

let node_indices: HashMap<String, _> = nodes
let node_indices: HashMap<u32, _> = nodes
.iter()
.map(|node| (node.clone(), graph.add_node(node.clone())))
.map(|node| (*node, graph.add_node(*node)))
.collect();

for (from, to) in edges {
Expand All @@ -292,17 +287,31 @@ async fn mermaid_handler() -> String {
let mut mermaid = String::new();
mermaid.push_str("flowchart LR;\n");
for route in account.complete_routes {
mermaid.push_str(route.join("-->").as_str());
mermaid.push_str(
route
.iter()
.map(|n| n.to_string())
.collect::<Vec<String>>()
.join("-->")
.as_str(),
);
mermaid.push('\n')
}
for route in account.incomplete_routes {
mermaid.push_str(route.join("-- ❌ -->").as_str());
mermaid.push_str(
route
.iter()
.map(|n| n.to_string())
.collect::<Vec<String>>()
.join("-- ❌ -->")
.as_str(),
);
mermaid.push('\n')
}
mermaid
}

async fn handler(State(state): State<AppState>) -> Response<String> {
async fn handler(State(state): State<AppState>) -> Result<String, StatusCode> {
send_receive_mixnet(state).await
}

Expand All @@ -314,17 +323,7 @@ async fn recv_handler() -> Json<DashMap<i32, Vec<ReceivedFragment>>> {
Json((*FRAGMENTS_RECEIVED).clone())
}

async fn send_receive_mixnet(state: AppState) -> Response<String> {
// let mut client = match make_client().await {
// Ok(client) => client,
// Err(e) => {
// return response
// .status(500)
// .body(format!("Failed to create mixnet client: {e}"))
// .unwrap();
// }
// };

async fn send_receive_mixnet(state: AppState) -> Result<String, StatusCode> {
let msg: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(32)
Expand All @@ -333,35 +332,39 @@ async fn send_receive_mixnet(state: AppState) -> Response<String> {
let sent_msg = msg.clone();

let client = {
let clients = state.clients.read().await;
Arc::clone(clients.choose(&mut rand::thread_rng()).unwrap())
let mut clients = state.clients.write().await;
Arc::clone(
clients
.make_contiguous()
.choose(&mut rand::thread_rng())
.expect("Empty client vec"),
)
};
// Be able to get our client address
// println!("Our client nym address is: {our_address}");

let recv = Arc::clone(&client);
let sender = Arc::clone(&client);

let recv_handle = tokio::spawn(async move {
match timeout(Duration::from_secs(10), recv.write().await.client.next()).await {
match timeout(Duration::from_secs(10), recv.write().await.next()).await {
Ok(Some(received)) => {
println!("Received: {}", String::from_utf8_lossy(&received.message));
debug!("Received: {}", String::from_utf8_lossy(&received.message));
}
Ok(None) => println!("No message received"),
Err(e) => println!("Failed to receive message: {e}"),
Ok(None) => debug!("No message received"),
Err(e) => warn!("Failed to receive message: {e}"),
}
});

let send_handle = tokio::spawn(async move {
let mixnet_sender = sender.read().await.client.split_sender();
let our_address = *sender.read().await.client.nym_address();
let mixnet_sender = sender.read().await.split_sender();
let our_address = *sender.read().await.nym_address();
match timeout(
Duration::from_secs(5),
mixnet_sender.send_plain_message(our_address, &msg),
)
.await
{
Ok(_) => println!("Sent message: {msg}"),
Err(e) => println!("Failed to send message: {e}"),
Ok(_) => debug!("Sent message: {msg}"),
Err(e) => warn!("Failed to send message: {e}"),
};
});

Expand All @@ -370,37 +373,11 @@ async fn send_receive_mixnet(state: AppState) -> Response<String> {
match result {
Ok(_) => {}
Err(e) => {
let response = Response::builder();
return response
.status(500)
.body(format!("Failed to send or receive message: {e}"))
.unwrap();
error!("Failed to send/receive message: {e}");
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
}
}
// wait for both tasks to be done
// println!("waiting for shutdown");

// match sending_task_handle.await {
// Ok(_) => {}
// Err(e) => {
// let response = Response::builder();
// return response
// .status(500)
// .body(format!("Failed to send message: {e}"))
// .unwrap();
// }
// };
// match recv_handle.await {
// Ok(_) => {}
// Err(e) => {
// let response = Response::builder();
// return response
// .status(500)
// .body(format!("Failed to receive message: {e}"))
// .unwrap();
// }
// };
let response = Response::builder();
response.status(200).body(sent_msg).unwrap()

Ok(sent_msg)
}

0 comments on commit 5c4c7c3

Please sign in to comment.