subspace_service/sync_from_dsn/
segment_header_downloader.rs1use futures::StreamExt;
2use std::collections::{BTreeSet, HashMap, HashSet};
3use std::error::Error;
4use std::pin::pin;
5use std::sync::Arc;
6use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
7use subspace_networking::Node;
8use subspace_networking::libp2p::PeerId;
9use subspace_networking::protocols::request_response::handlers::segment_header::{
10 SegmentHeaderRequest, SegmentHeaderResponse,
11};
12use tracing::{debug, error, trace, warn};
13
/// Upper bound on the number of segment indexes requested from a peer in a single batch.
const SEGMENT_HEADER_NUMBER_PER_REQUEST: u64 = 1000;
/// Number of peers required by the first attempt of the last-segment-header vote; the
/// requirement drops by one with every further attempt. It is also the cap on how many peers are
/// queried (and how many requests are in flight) during a single attempt.
const SEGMENT_HEADER_CONSENSUS_INITIAL_NODES: usize = 20;
/// Maximum number of distinct peers tried while downloading batches of segment headers.
const SEGMENT_HEADER_PEERS_RETRIES: u32 = 20;
19
20pub struct SegmentHeaderDownloader {
22 dsn_node: Node,
23}
24
25impl SegmentHeaderDownloader {
26 pub fn new(dsn_node: &Node) -> Self {
27 Self {
30 dsn_node: dsn_node.clone(),
31 }
32 }
33
34 pub async fn get_segment_headers(
37 &self,
38 last_known_segment_header: &SegmentHeader,
39 ) -> Result<Vec<SegmentHeader>, Box<dyn Error>> {
40 let last_known_segment_index = last_known_segment_header.segment_index();
41 trace!(
42 %last_known_segment_index,
43 "Searching for latest segment header"
44 );
45
46 let Some(last_segment_header) = self.get_last_segment_header().await? else {
47 return Ok(Vec::new());
48 };
49
50 if last_segment_header.segment_index() <= last_known_segment_index {
51 debug!(
52 %last_known_segment_index,
53 last_found_segment_index = %last_segment_header.segment_index(),
54 "No new segment headers found, nothing to download"
55 );
56
57 return Ok(Vec::new());
58 }
59
60 debug!(
61 %last_known_segment_index,
62 last_segment_index = %last_segment_header.segment_index(),
63 "Downloading segment headers"
64 );
65
66 let new_segment_headers_count = last_segment_header
67 .segment_index()
68 .checked_sub(last_known_segment_index)
69 .expect("just checked last_segment_header is greater; qed");
70
71 let mut new_segment_headers =
72 Vec::with_capacity(u64::from(new_segment_headers_count) as usize);
73 new_segment_headers.push(last_segment_header);
74
75 let mut tried_peers = HashSet::<PeerId>::new();
76 let mut segment_to_download_to = last_segment_header;
77 'new_peer: for segment_headers_batch_retry in 0..SEGMENT_HEADER_PEERS_RETRIES {
78 let maybe_connected_peer = self
79 .dsn_node
80 .connected_servers()
81 .await?
82 .into_iter()
83 .find(|connected_peer| !tried_peers.contains(connected_peer));
84
85 let peer_id = if let Some(peer_id) = maybe_connected_peer {
86 peer_id
87 } else {
88 let random_peers = self
89 .dsn_node
90 .get_closest_peers(PeerId::random().into())
91 .await?
92 .filter(|connected_peer| {
93 let new_peer = !tried_peers.contains(connected_peer);
94
95 async move { new_peer }
96 });
97 let mut random_peers = pin!(random_peers);
98
99 if let Some(peer_id) = random_peers.next().await {
100 peer_id
101 } else {
102 return Err("Can't find peers to download headers from".into());
103 }
104 };
105 tried_peers.insert(peer_id);
106
107 while segment_to_download_to.segment_index() - last_known_segment_index
108 > SegmentIndex::ONE
109 {
110 let segment_indexes = (last_known_segment_index + SegmentIndex::ONE
111 ..segment_to_download_to.segment_index())
112 .rev()
113 .take(SEGMENT_HEADER_NUMBER_PER_REQUEST as usize)
114 .collect::<Vec<_>>();
115
116 trace!(
117 %peer_id,
118 %segment_headers_batch_retry,
119 segment_indexes_count = %segment_indexes.len(),
120 first_segment_index = ?segment_indexes.first(),
121 last_segment_index = ?segment_indexes.last(),
122 "Getting segment header batch...",
123 );
124
125 let segment_indexes = Arc::new(segment_indexes);
126
127 let segment_headers = match self
128 .dsn_node
129 .send_generic_request(
130 peer_id,
131 Vec::new(),
132 SegmentHeaderRequest::SegmentIndexes {
133 segment_indexes: Arc::clone(&segment_indexes),
134 },
135 )
136 .await
137 {
138 Ok(response) => response.segment_headers,
139 Err(error) => {
140 debug!(
141 %peer_id,
142 %segment_headers_batch_retry,
143 %error,
144 %last_known_segment_index,
145 segment_to_download_to = %segment_to_download_to.segment_index(),
146 "Error getting segment headers from peer",
147 );
148
149 continue 'new_peer;
150 }
151 };
152
153 if !self.is_segment_headers_response_valid(
154 peer_id,
155 &segment_indexes,
156 &segment_headers,
157 ) {
158 warn!(
159 %peer_id,
160 %segment_headers_batch_retry,
161 "Received segment headers were invalid"
162 );
163
164 let _ = self.dsn_node.ban_peer(peer_id).await;
165 }
166
167 for segment_header in segment_headers {
168 if segment_header.hash() != segment_to_download_to.prev_segment_header_hash() {
169 error!(
170 %peer_id,
171 %segment_headers_batch_retry,
172 segment_index = %segment_to_download_to.segment_index() - SegmentIndex::ONE,
173 actual_hash = ?segment_header.hash(),
174 expected_hash = ?segment_to_download_to.prev_segment_header_hash(),
175 "Segment header hash doesn't match expected hash of the previous \
176 segment"
177 );
178
179 return Err(
180 "Segment header hash doesn't match expected hash of the previous \
181 segment"
182 .into(),
183 );
184 }
185
186 segment_to_download_to = segment_header;
187 new_segment_headers.push(segment_header);
188 }
189 }
190 }
191
192 new_segment_headers.reverse();
193
194 if new_segment_headers
195 .first()
196 .expect("Not empty; qed")
197 .prev_segment_header_hash()
198 != last_known_segment_header.hash()
199 {
200 return Err(
201 "Downloaded segment headers do not match last known segment header, ignoring \
202 downloaded headers"
203 .into(),
204 );
205 }
206
207 Ok(new_segment_headers)
208 }
209
210 async fn get_last_segment_header(&self) -> Result<Option<SegmentHeader>, Box<dyn Error>> {
215 let mut peer_segment_headers = HashMap::<PeerId, Vec<SegmentHeader>>::default();
216 for (required_peers, retry_attempt) in (1..=SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
217 .rev()
218 .zip(1_usize..)
219 {
220 trace!( %retry_attempt, "Downloading last segment headers");
221
222 let new_last_known_segment_headers = self
225 .dsn_node
226 .get_closest_peers(PeerId::random().into())
227 .await
228 .inspect_err(|error| {
229 warn!(?error, "get_closest_peers returned an error");
230 })?
231 .filter(|peer_id| {
232 let known_peer = peer_segment_headers.contains_key(peer_id);
233
234 async move { !known_peer }
235 })
236 .map(|peer_id| async move {
237 let request_result = self
238 .dsn_node
239 .send_generic_request(
240 peer_id,
241 Vec::new(),
242 SegmentHeaderRequest::LastSegmentHeaders {
243 limit: 2,
246 },
247 )
248 .await;
249
250 match request_result {
251 Ok(SegmentHeaderResponse { segment_headers }) => {
252 trace!(
253 %peer_id,
254 segment_headers_number=%segment_headers.len(),
255 "Last segment headers request succeeded"
256 );
257
258 if !self
259 .is_last_segment_headers_response_valid(peer_id, &segment_headers)
260 {
261 warn!(
262 %peer_id,
263 "Received last segment headers response was invalid"
264 );
265
266 let _ = self.dsn_node.ban_peer(peer_id).await;
267 return None;
268 }
269
270 Some((peer_id, segment_headers))
271 }
272 Err(error) => {
273 debug!(
274 %peer_id,
275 ?error,
276 "Last segment headers request failed"
277 );
278 None
279 }
280 }
281 })
282 .take(SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
283 .buffer_unordered(SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
284 .filter_map(|maybe_result| async move { maybe_result })
285 .collect::<Vec<(PeerId, Vec<SegmentHeader>)>>()
286 .await;
287
288 let last_peers_count = peer_segment_headers.len();
289 peer_segment_headers.extend(new_last_known_segment_headers);
290
291 let peer_count = peer_segment_headers.len();
292
293 if peer_count < required_peers {
294 if last_peers_count == 0 {
296 debug!(
297 %peer_count,
298 %required_peers,
299 %retry_attempt,
300 "Segment headers consensus requires some peers, will retry"
301 );
302
303 continue;
304 }
305 if required_peers > 1 {
307 debug!(
308 %peer_count,
309 %required_peers,
310 %retry_attempt,
311 "Segment headers consensus requires more peers, will retry"
312 );
313
314 continue;
315 }
316
317 debug!(
318 %peer_count,
319 %required_peers,
320 %retry_attempt,
321 "Segment headers consensus requires more peers, but no attempts left, so continue as is"
322 );
323 }
324
325 let mut segment_header_peers: HashMap<SegmentHeader, Vec<PeerId>> = HashMap::new();
327
328 for (peer_id, segment_headers) in peer_segment_headers {
329 for segment_header in segment_headers {
330 segment_header_peers
331 .entry(segment_header)
332 .and_modify(|peers| {
333 peers.push(peer_id);
334 })
335 .or_insert(vec![peer_id]);
336 }
337 }
338
339 let mut segment_header_peers_iter = segment_header_peers.into_iter();
340 let (mut best_segment_header, mut most_peers) =
341 segment_header_peers_iter.next().expect(
342 "Not empty due to not empty list of peers with non empty list of segment \
343 headers each; qed",
344 );
345
346 for (segment_header, peers) in segment_header_peers_iter {
347 if peers.len() > most_peers.len()
348 || (peers.len() == most_peers.len()
349 && segment_header.segment_index() > best_segment_header.segment_index())
350 {
351 best_segment_header = segment_header;
352 most_peers = peers;
353 }
354 }
355
356 return Ok(Some(best_segment_header));
357 }
358
359 Ok(None)
360 }
361
362 fn is_segment_headers_response_valid(
365 &self,
366 peer_id: PeerId,
367 segment_indexes: &[SegmentIndex],
368 segment_headers: &[SegmentHeader],
369 ) -> bool {
370 if segment_headers.len() != segment_indexes.len() {
371 warn!( %peer_id, "Segment header and segment indexes collection differ");
372
373 return false;
374 }
375
376 let returned_segment_indexes =
377 BTreeSet::from_iter(segment_headers.iter().map(|rb| rb.segment_index()));
378 if returned_segment_indexes.len() != segment_headers.len() {
379 warn!( %peer_id, "Peer banned: it returned collection with duplicated segment headers");
380
381 return false;
382 }
383
384 let indexes_match = segment_indexes.iter().zip(segment_headers.iter()).all(
385 |(segment_index, segment_header)| *segment_index == segment_header.segment_index(),
386 );
387
388 if !indexes_match {
389 warn!( %peer_id, "Segment header collection doesn't match segment indexes");
390
391 return false;
392 }
393
394 true
395 }
396
397 fn is_last_segment_headers_response_valid(
398 &self,
399 peer_id: PeerId,
400 segment_headers: &[SegmentHeader],
401 ) -> bool {
402 let segment_indexes = match segment_headers.last() {
403 None => {
404 return false;
406 }
407 Some(last_segment_header) => {
408 let last_segment_index = last_segment_header.segment_index();
409
410 let mut segment_indices = (SegmentIndex::ZERO..=last_segment_index)
411 .rev()
412 .take(segment_headers.len())
413 .collect::<Vec<_>>();
414 segment_indices.reverse();
415 segment_indices
416 }
417 };
418
419 self.is_segment_headers_response_valid(peer_id, &segment_indexes, segment_headers)
420 }
421}