Skip to content

Commit 0b5f043

Browse files
authored
refactor: census peer discovery algorithm (#1553)
1 parent 96a2152 commit 0b5f043

File tree

9 files changed

+387
-155
lines changed

9 files changed

+387
-155
lines changed

Cargo.lock

+2-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ ethereum_ssz_derive = "0.7.1"
9494
ethportal-api = { path = "ethportal-api" }
9595
futures = "0.3.23"
9696
hex = "0.4.3"
97+
itertools = "0.13.0"
9798
jsonrpsee = "0.24.4"
9899
keccak-hash = "0.10.0"
99100
lazy_static = "1.4.0"

ethportal-api/src/types/node_id.rs

+33
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,36 @@ pub fn generate_random_node_id(target_bucket_idx: u8, local_node_id: NodeId) ->
1616

1717
raw_bytes.into()
1818
}
19+
20+
/// Generates `2 ^ unique_bits` random `NodeId`s.
21+
///
22+
/// The most significant bits of each `NodeId` will be unique.
23+
///
24+
/// The `unique_bits` has to be in `(0..=8)` range, panics otherwise. Can be easily upgraded to
25+
/// support wider range.
26+
pub fn generate_random_node_ids(unique_bits: u32) -> Vec<NodeId> {
27+
assert!(
28+
(0..=8).contains(&unique_bits),
29+
"Invalid bits value: {unique_bits}"
30+
);
31+
32+
let insignificant_bits = u8::BITS - unique_bits;
33+
let insignificant_bits_mask = u8::MAX.checked_shr(unique_bits).unwrap_or_default();
34+
35+
(0usize..1 << unique_bits)
36+
.map(|index| {
37+
// shift bits to most significant positions
38+
let unique_bits = (index as u8)
39+
.checked_shl(insignificant_bits)
40+
.unwrap_or_default();
41+
42+
let mut node_id = rand::random::<[u8; 32]>();
43+
44+
// set most significant bits of the first byte
45+
node_id[0] &= insignificant_bits_mask;
46+
node_id[0] |= unique_bits;
47+
48+
NodeId::from(node_id)
49+
})
50+
.collect()
51+
}

portal-bridge/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ eth_trie.workspace = true
2424
ethereum_ssz.workspace = true
2525
ethportal-api.workspace = true
2626
futures.workspace = true
27+
itertools.workspace = true
2728
jsonrpsee = { workspace = true, features = [
2829
"async-client",
2930
"client",

portal-bridge/src/census/mod.rs

+51-24
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use tokio::task::JoinHandle;
66
use tracing::{error, info, Instrument};
77

88
use crate::cli::BridgeConfig;
9-
use network::Network;
9+
use network::{Network, NetworkAction, NetworkInitializationConfig, NetworkManager};
1010

1111
mod network;
1212
mod peers;
@@ -40,6 +40,9 @@ pub struct Census {
4040
}
4141

4242
impl Census {
43+
const SUPPORTED_SUBNETWORKS: [Subnetwork; 3] =
44+
[Subnetwork::Beacon, Subnetwork::History, Subnetwork::State];
45+
4346
pub fn new(client: HttpClient, bridge_config: &BridgeConfig) -> Self {
4447
Self {
4548
history: Network::new(client.clone(), Subnetwork::History, bridge_config),
@@ -71,50 +74,74 @@ impl Census {
7174
&mut self,
7275
subnetworks: impl IntoIterator<Item = Subnetwork>,
7376
) -> Result<JoinHandle<()>, CensusError> {
77+
info!("Initializing census");
78+
7479
if self.initialized {
7580
return Err(CensusError::AlreadyInitialized);
7681
}
7782
self.initialized = true;
7883

79-
let subnetworks = HashSet::from_iter(subnetworks);
84+
let subnetworks = HashSet::<Subnetwork>::from_iter(subnetworks);
8085
if subnetworks.is_empty() {
8186
return Err(CensusError::FailedInitialization("No subnetwork"));
8287
}
8388
for subnetwork in &subnetworks {
84-
info!("Initializing {subnetwork} subnetwork");
85-
match subnetwork {
86-
Subnetwork::History => self.history.init().await?,
87-
Subnetwork::State => self.state.init().await?,
88-
Subnetwork::Beacon => self.beacon.init().await?,
89-
_ => return Err(CensusError::UnsupportedSubnetwork(*subnetwork)),
89+
if !Self::SUPPORTED_SUBNETWORKS.contains(subnetwork) {
90+
return Err(CensusError::UnsupportedSubnetwork(*subnetwork));
9091
}
9192
}
9293

93-
Ok(self.start_background_service(subnetworks))
94-
}
94+
let initialization_config = NetworkInitializationConfig::default();
95+
96+
let mut beacon_manager = if subnetworks.contains(&Subnetwork::Beacon) {
97+
self.beacon.init(&initialization_config).await?;
98+
Some(self.beacon.create_manager())
99+
} else {
100+
None
101+
};
102+
let mut history_manager = if subnetworks.contains(&Subnetwork::History) {
103+
self.history.init(&initialization_config).await?;
104+
Some(self.history.create_manager())
105+
} else {
106+
None
107+
};
108+
let mut state_manager = if subnetworks.contains(&Subnetwork::State) {
109+
self.state.init(&initialization_config).await?;
110+
Some(self.state.create_manager())
111+
} else {
112+
None
113+
};
95114

96-
/// Starts background service that is responsible for keeping view of the network up to date.
97-
///
98-
/// Selects available tasks and runs them. Tasks are provided by enabled subnetworks.
99-
fn start_background_service(&self, subnetworks: HashSet<Subnetwork>) -> JoinHandle<()> {
100-
let mut history_network = self.history.clone();
101-
let mut state_network = self.state.clone();
102-
let mut beacon_network = self.beacon.clone();
103115
let service = async move {
104116
loop {
105117
tokio::select! {
106-
peer = history_network.peer_to_process(), if subnetworks.contains(&Subnetwork::History) => {
107-
history_network.process_peer(peer).await;
118+
Some(action) = next_action(&mut beacon_manager) => {
119+
if let Some(manager) = &mut beacon_manager {
120+
manager.execute_action(action).await;
121+
}
108122
}
109-
peer = state_network.peer_to_process(), if subnetworks.contains(&Subnetwork::State) => {
110-
state_network.process_peer(peer).await;
123+
Some(action) = next_action(&mut history_manager) => {
124+
if let Some(manager) = &mut history_manager {
125+
manager.execute_action(action).await;
126+
}
111127
}
112-
peer = beacon_network.peer_to_process(), if subnetworks.contains(&Subnetwork::Beacon) => {
113-
beacon_network.process_peer(peer).await;
128+
Some(action) = next_action(&mut state_manager) => {
129+
if let Some(manager) = &mut state_manager {
130+
manager.execute_action(action).await;
131+
}
114132
}
115133
}
116134
}
117135
};
118-
tokio::spawn(service.instrument(tracing::trace_span!("census").or_current()))
136+
Ok(tokio::spawn(
137+
service.instrument(tracing::trace_span!("census").or_current()),
138+
))
139+
}
140+
}
141+
142+
async fn next_action(manager: &mut Option<NetworkManager>) -> Option<NetworkAction> {
143+
match manager {
144+
Some(manager) => Some(manager.next_action().await),
145+
None => None,
119146
}
120147
}

0 commit comments

Comments
 (0)