subspace_service/sync_from_dsn/
segment_header_downloader.rs1use crate::sync_from_dsn::LOG_TARGET;
2use futures::StreamExt;
3use std::collections::{BTreeSet, HashMap, HashSet};
4use std::error::Error;
5use std::pin::pin;
6use std::sync::Arc;
7use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
8use subspace_networking::libp2p::PeerId;
9use subspace_networking::protocols::request_response::handlers::segment_header::{
10 SegmentHeaderRequest, SegmentHeaderResponse,
11};
12use subspace_networking::Node;
13use tracing::{debug, error, trace, warn};
14
/// Upper bound on the number of segment indexes put into a single segment header request.
const SEGMENT_HEADER_NUMBER_PER_REQUEST: u64 = 1_000;
/// Number of responding peers required on the first attempt to determine the last segment
/// header; it is also used as the cap on peers queried (and requests in flight) per attempt.
const SEGMENT_HEADER_CONSENSUS_INITIAL_NODES: usize = 20;
/// Maximum number of different peers tried while downloading segment header batches.
const SEGMENT_HEADER_PEERS_RETRIES: u32 = 10;
20
21pub struct SegmentHeaderDownloader {
23 dsn_node: Node,
24}
25
26impl SegmentHeaderDownloader {
27 pub fn new(dsn_node: &Node) -> Self {
28 Self {
31 dsn_node: dsn_node.clone(),
32 }
33 }
34
35 pub async fn get_segment_headers(
38 &self,
39 last_known_segment_header: &SegmentHeader,
40 ) -> Result<Vec<SegmentHeader>, Box<dyn Error>> {
41 let last_known_segment_index = last_known_segment_header.segment_index();
42 trace!(
43 target: LOG_TARGET,
44 %last_known_segment_index,
45 "Searching for latest segment header"
46 );
47
48 let Some(last_segment_header) = self.get_last_segment_header().await? else {
49 return Ok(Vec::new());
50 };
51
52 if last_segment_header.segment_index() <= last_known_segment_index {
53 debug!(
54 target: LOG_TARGET,
55 %last_known_segment_index,
56 last_found_segment_index = %last_segment_header.segment_index(),
57 "No new segment headers found, nothing to download"
58 );
59
60 return Ok(Vec::new());
61 }
62
63 debug!(
64 target: LOG_TARGET,
65 %last_known_segment_index,
66 last_segment_index = %last_segment_header.segment_index(),
67 "Downloading segment headers"
68 );
69
70 let new_segment_headers_count = last_segment_header
71 .segment_index()
72 .checked_sub(last_known_segment_index)
73 .expect("just checked last_segment_header is greater; qed");
74
75 let mut new_segment_headers =
76 Vec::with_capacity(u64::from(new_segment_headers_count) as usize);
77 new_segment_headers.push(last_segment_header);
78
79 let mut tried_peers = HashSet::<PeerId>::new();
80 let mut segment_to_download_to = last_segment_header;
81 'new_peer: for segment_headers_batch_retry in 0..SEGMENT_HEADER_PEERS_RETRIES {
82 let maybe_connected_peer = self
83 .dsn_node
84 .connected_servers()
85 .await?
86 .into_iter()
87 .find(|connected_peer| !tried_peers.contains(connected_peer));
88
89 let peer_id = if let Some(peer_id) = maybe_connected_peer {
90 peer_id
91 } else {
92 let random_peers = self
93 .dsn_node
94 .get_closest_peers(PeerId::random().into())
95 .await?
96 .filter(|connected_peer| {
97 let new_peer = !tried_peers.contains(connected_peer);
98
99 async move { new_peer }
100 });
101 let mut random_peers = pin!(random_peers);
102
103 if let Some(peer_id) = random_peers.next().await {
104 peer_id
105 } else {
106 return Err("Can't find peers to download headers from".into());
107 }
108 };
109 tried_peers.insert(peer_id);
110
111 while segment_to_download_to.segment_index() - last_known_segment_index
112 > SegmentIndex::ONE
113 {
114 let segment_indexes = (last_known_segment_index + SegmentIndex::ONE
115 ..segment_to_download_to.segment_index())
116 .rev()
117 .take(SEGMENT_HEADER_NUMBER_PER_REQUEST as usize)
118 .collect::<Vec<_>>();
119
120 trace!(
121 target: LOG_TARGET,
122 %peer_id,
123 %segment_headers_batch_retry,
124 segment_indexes_count = %segment_indexes.len(),
125 first_segment_index = ?segment_indexes.first(),
126 last_segment_index = ?segment_indexes.last(),
127 "Getting segment header batch...",
128 );
129
130 let segment_indexes = Arc::new(segment_indexes);
131
132 let segment_headers = match self
133 .dsn_node
134 .send_generic_request(
135 peer_id,
136 Vec::new(),
137 SegmentHeaderRequest::SegmentIndexes {
138 segment_indexes: Arc::clone(&segment_indexes),
139 },
140 )
141 .await
142 {
143 Ok(response) => response.segment_headers,
144 Err(error) => {
145 debug!(
146 target: LOG_TARGET,
147 %peer_id,
148 %segment_headers_batch_retry,
149 %error,
150 %last_known_segment_index,
151 segment_to_download_to = %segment_to_download_to.segment_index(),
152 "Error getting segment headers from peer",
153 );
154
155 continue 'new_peer;
156 }
157 };
158
159 if !self.is_segment_headers_response_valid(
160 peer_id,
161 &segment_indexes,
162 &segment_headers,
163 ) {
164 warn!(
165 target: LOG_TARGET,
166 %peer_id,
167 %segment_headers_batch_retry,
168 "Received segment headers were invalid"
169 );
170
171 let _ = self.dsn_node.ban_peer(peer_id).await;
172 }
173
174 for segment_header in segment_headers {
175 if segment_header.hash() != segment_to_download_to.prev_segment_header_hash() {
176 error!(
177 target: LOG_TARGET,
178 %peer_id,
179 %segment_headers_batch_retry,
180 segment_index = %segment_to_download_to.segment_index() - SegmentIndex::ONE,
181 actual_hash = ?segment_header.hash(),
182 expected_hash = ?segment_to_download_to.prev_segment_header_hash(),
183 "Segment header hash doesn't match expected hash of the previous \
184 segment"
185 );
186
187 return Err(
188 "Segment header hash doesn't match expected hash of the previous \
189 segment"
190 .into(),
191 );
192 }
193
194 segment_to_download_to = segment_header;
195 new_segment_headers.push(segment_header);
196 }
197 }
198 }
199
200 new_segment_headers.reverse();
201
202 if new_segment_headers
203 .first()
204 .expect("Not empty; qed")
205 .prev_segment_header_hash()
206 != last_known_segment_header.hash()
207 {
208 return Err(
209 "Downloaded segment headers do not match last known segment header, ignoring \
210 downloaded headers"
211 .into(),
212 );
213 }
214
215 Ok(new_segment_headers)
216 }
217
218 async fn get_last_segment_header(&self) -> Result<Option<SegmentHeader>, Box<dyn Error>> {
223 let mut peer_segment_headers = HashMap::<PeerId, Vec<SegmentHeader>>::default();
224 for (required_peers, retry_attempt) in (1..=SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
225 .rev()
226 .zip(1_usize..)
227 {
228 trace!(target: LOG_TARGET, %retry_attempt, "Downloading last segment headers");
229
230 let new_last_known_segment_headers = self
233 .dsn_node
234 .get_closest_peers(PeerId::random().into())
235 .await
236 .inspect_err(|error| {
237 warn!(target: LOG_TARGET, ?error, "get_closest_peers returned an error");
238 })?
239 .filter(|peer_id| {
240 let known_peer = peer_segment_headers.contains_key(peer_id);
241
242 async move { !known_peer }
243 })
244 .map(|peer_id| async move {
245 let request_result = self
246 .dsn_node
247 .send_generic_request(
248 peer_id,
249 Vec::new(),
250 SegmentHeaderRequest::LastSegmentHeaders {
251 limit: 2,
254 },
255 )
256 .await;
257
258 match request_result {
259 Ok(SegmentHeaderResponse { segment_headers }) => {
260 trace!(
261 target: LOG_TARGET,
262 %peer_id,
263 segment_headers_number=%segment_headers.len(),
264 "Last segment headers request succeeded"
265 );
266
267 if !self
268 .is_last_segment_headers_response_valid(peer_id, &segment_headers)
269 {
270 warn!(
271 target: LOG_TARGET,
272 %peer_id,
273 "Received last segment headers response was invalid"
274 );
275
276 let _ = self.dsn_node.ban_peer(peer_id).await;
277 return None;
278 }
279
280 Some((peer_id, segment_headers))
281 }
282 Err(error) => {
283 debug!(
284 target: LOG_TARGET,
285 %peer_id,
286 ?error,
287 "Last segment headers request failed"
288 );
289 None
290 }
291 }
292 })
293 .take(SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
294 .buffer_unordered(SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
295 .filter_map(|maybe_result| async move { maybe_result })
296 .collect::<Vec<(PeerId, Vec<SegmentHeader>)>>()
297 .await;
298
299 let last_peers_count = peer_segment_headers.len();
300 peer_segment_headers.extend(new_last_known_segment_headers);
301
302 let peer_count = peer_segment_headers.len();
303
304 if peer_count < required_peers {
305 if last_peers_count == 0 {
307 debug!(
308 target: LOG_TARGET,
309 %peer_count,
310 %required_peers,
311 %retry_attempt,
312 "Segment headers consensus requires some peers, will retry"
313 );
314
315 continue;
316 }
317 if required_peers > 1 {
319 debug!(
320 target: LOG_TARGET,
321 %peer_count,
322 %required_peers,
323 %retry_attempt,
324 "Segment headers consensus requires more peers, will retry"
325 );
326
327 continue;
328 }
329
330 debug!(
331 target: LOG_TARGET,
332 %peer_count,
333 %required_peers,
334 %retry_attempt,
335 "Segment headers consensus requires more peers, but no attempts left, so continue as is"
336 );
337 }
338
339 let mut segment_header_peers: HashMap<SegmentHeader, Vec<PeerId>> = HashMap::new();
341
342 for (peer_id, segment_headers) in peer_segment_headers {
343 for segment_header in segment_headers {
344 segment_header_peers
345 .entry(segment_header)
346 .and_modify(|peers| {
347 peers.push(peer_id);
348 })
349 .or_insert(vec![peer_id]);
350 }
351 }
352
353 let mut segment_header_peers_iter = segment_header_peers.into_iter();
354 let (mut best_segment_header, mut most_peers) =
355 segment_header_peers_iter.next().expect(
356 "Not empty due to not empty list of peers with non empty list of segment \
357 headers each; qed",
358 );
359
360 for (segment_header, peers) in segment_header_peers_iter {
361 if peers.len() > most_peers.len()
362 || (peers.len() == most_peers.len()
363 && segment_header.segment_index() > best_segment_header.segment_index())
364 {
365 best_segment_header = segment_header;
366 most_peers = peers;
367 }
368 }
369
370 return Ok(Some(best_segment_header));
371 }
372
373 Ok(None)
374 }
375
376 fn is_segment_headers_response_valid(
379 &self,
380 peer_id: PeerId,
381 segment_indexes: &[SegmentIndex],
382 segment_headers: &[SegmentHeader],
383 ) -> bool {
384 if segment_headers.len() != segment_indexes.len() {
385 warn!(target: LOG_TARGET, %peer_id, "Segment header and segment indexes collection differ");
386
387 return false;
388 }
389
390 let returned_segment_indexes =
391 BTreeSet::from_iter(segment_headers.iter().map(|rb| rb.segment_index()));
392 if returned_segment_indexes.len() != segment_headers.len() {
393 warn!(target: LOG_TARGET, %peer_id, "Peer banned: it returned collection with duplicated segment headers");
394
395 return false;
396 }
397
398 let indexes_match = segment_indexes.iter().zip(segment_headers.iter()).all(
399 |(segment_index, segment_header)| *segment_index == segment_header.segment_index(),
400 );
401
402 if !indexes_match {
403 warn!(target: LOG_TARGET, %peer_id, "Segment header collection doesn't match segment indexes");
404
405 return false;
406 }
407
408 true
409 }
410
411 fn is_last_segment_headers_response_valid(
412 &self,
413 peer_id: PeerId,
414 segment_headers: &[SegmentHeader],
415 ) -> bool {
416 let segment_indexes = match segment_headers.last() {
417 None => {
418 return false;
420 }
421 Some(last_segment_header) => {
422 let last_segment_index = last_segment_header.segment_index();
423
424 let mut segment_indices = (SegmentIndex::ZERO..=last_segment_index)
425 .rev()
426 .take(segment_headers.len())
427 .collect::<Vec<_>>();
428 segment_indices.reverse();
429 segment_indices
430 }
431 };
432
433 self.is_segment_headers_response_valid(peer_id, &segment_indexes, segment_headers)
434 }
435}