subspace_service/sync_from_dsn/
segment_header_downloader.rs

1use crate::sync_from_dsn::LOG_TARGET;
2use futures::StreamExt;
3use std::collections::{BTreeSet, HashMap, HashSet};
4use std::error::Error;
5use std::pin::pin;
6use std::sync::Arc;
7use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
8use subspace_networking::libp2p::PeerId;
9use subspace_networking::protocols::request_response::handlers::segment_header::{
10    SegmentHeaderRequest, SegmentHeaderResponse,
11};
12use subspace_networking::Node;
13use tracing::{debug, error, trace, warn};
14
/// Maximum number of segment headers requested from a peer in a single request
const SEGMENT_HEADER_NUMBER_PER_REQUEST: u64 = 1000;
/// Initial number of peers to query for last segment header
const SEGMENT_HEADER_CONSENSUS_INITIAL_NODES: usize = 20;
/// How many distinct peers to try when downloading segment headers
const SEGMENT_HEADER_PEERS_RETRIES: u32 = 10;
20
/// Helps download segment headers from DSN
pub struct SegmentHeaderDownloader {
    /// Owned handle of the networking node used for peer lookups, requests and peer bans
    dsn_node: Node,
}
25
26impl SegmentHeaderDownloader {
27    pub fn new(dsn_node: &Node) -> Self {
28        // TODO: Should not be necessary to store owned copy, but it results in confusing compiler
29        //  errors otherwise
30        Self {
31            dsn_node: dsn_node.clone(),
32        }
33    }
34
35    /// Returns new segment headers known to DSN, ordered from 0 to the last known, but newer than
36    /// `last_known_segment_index`
37    pub async fn get_segment_headers(
38        &self,
39        last_known_segment_header: &SegmentHeader,
40    ) -> Result<Vec<SegmentHeader>, Box<dyn Error>> {
41        let last_known_segment_index = last_known_segment_header.segment_index();
42        trace!(
43            target: LOG_TARGET,
44            %last_known_segment_index,
45            "Searching for latest segment header"
46        );
47
48        let Some(last_segment_header) = self.get_last_segment_header().await? else {
49            return Ok(Vec::new());
50        };
51
52        if last_segment_header.segment_index() <= last_known_segment_index {
53            debug!(
54                target: LOG_TARGET,
55                %last_known_segment_index,
56                last_found_segment_index = %last_segment_header.segment_index(),
57                "No new segment headers found, nothing to download"
58            );
59
60            return Ok(Vec::new());
61        }
62
63        debug!(
64            target: LOG_TARGET,
65            %last_known_segment_index,
66            last_segment_index = %last_segment_header.segment_index(),
67            "Downloading segment headers"
68        );
69
70        let new_segment_headers_count = last_segment_header
71            .segment_index()
72            .checked_sub(last_known_segment_index)
73            .expect("just checked last_segment_header is greater; qed");
74
75        let mut new_segment_headers =
76            Vec::with_capacity(u64::from(new_segment_headers_count) as usize);
77        new_segment_headers.push(last_segment_header);
78
79        let mut tried_peers = HashSet::<PeerId>::new();
80        let mut segment_to_download_to = last_segment_header;
81        'new_peer: for segment_headers_batch_retry in 0..SEGMENT_HEADER_PEERS_RETRIES {
82            let maybe_connected_peer = self
83                .dsn_node
84                .connected_servers()
85                .await?
86                .into_iter()
87                .find(|connected_peer| !tried_peers.contains(connected_peer));
88
89            let peer_id = if let Some(peer_id) = maybe_connected_peer {
90                peer_id
91            } else {
92                let random_peers = self
93                    .dsn_node
94                    .get_closest_peers(PeerId::random().into())
95                    .await?
96                    .filter(|connected_peer| {
97                        let new_peer = !tried_peers.contains(connected_peer);
98
99                        async move { new_peer }
100                    });
101                let mut random_peers = pin!(random_peers);
102
103                if let Some(peer_id) = random_peers.next().await {
104                    peer_id
105                } else {
106                    return Err("Can't find peers to download headers from".into());
107                }
108            };
109            tried_peers.insert(peer_id);
110
111            while segment_to_download_to.segment_index() - last_known_segment_index
112                > SegmentIndex::ONE
113            {
114                let segment_indexes = (last_known_segment_index + SegmentIndex::ONE
115                    ..segment_to_download_to.segment_index())
116                    .rev()
117                    .take(SEGMENT_HEADER_NUMBER_PER_REQUEST as usize)
118                    .collect::<Vec<_>>();
119
120                trace!(
121                    target: LOG_TARGET,
122                    %peer_id,
123                    %segment_headers_batch_retry,
124                    segment_indexes_count = %segment_indexes.len(),
125                    first_segment_index = ?segment_indexes.first(),
126                    last_segment_index = ?segment_indexes.last(),
127                    "Getting segment header batch...",
128                );
129
130                let segment_indexes = Arc::new(segment_indexes);
131
132                let segment_headers = match self
133                    .dsn_node
134                    .send_generic_request(
135                        peer_id,
136                        Vec::new(),
137                        SegmentHeaderRequest::SegmentIndexes {
138                            segment_indexes: Arc::clone(&segment_indexes),
139                        },
140                    )
141                    .await
142                {
143                    Ok(response) => response.segment_headers,
144                    Err(error) => {
145                        debug!(
146                            target: LOG_TARGET,
147                            %peer_id,
148                            %segment_headers_batch_retry,
149                            %error,
150                            %last_known_segment_index,
151                            segment_to_download_to = %segment_to_download_to.segment_index(),
152                            "Error getting segment headers from peer",
153                        );
154
155                        continue 'new_peer;
156                    }
157                };
158
159                if !self.is_segment_headers_response_valid(
160                    peer_id,
161                    &segment_indexes,
162                    &segment_headers,
163                ) {
164                    warn!(
165                        target: LOG_TARGET,
166                        %peer_id,
167                        %segment_headers_batch_retry,
168                        "Received segment headers were invalid"
169                    );
170
171                    let _ = self.dsn_node.ban_peer(peer_id).await;
172                }
173
174                for segment_header in segment_headers {
175                    if segment_header.hash() != segment_to_download_to.prev_segment_header_hash() {
176                        error!(
177                            target: LOG_TARGET,
178                            %peer_id,
179                            %segment_headers_batch_retry,
180                            segment_index = %segment_to_download_to.segment_index() - SegmentIndex::ONE,
181                            actual_hash = ?segment_header.hash(),
182                            expected_hash = ?segment_to_download_to.prev_segment_header_hash(),
183                            "Segment header hash doesn't match expected hash of the previous \
184                            segment"
185                        );
186
187                        return Err(
188                            "Segment header hash doesn't match expected hash of the previous \
189                            segment"
190                                .into(),
191                        );
192                    }
193
194                    segment_to_download_to = segment_header;
195                    new_segment_headers.push(segment_header);
196                }
197            }
198        }
199
200        new_segment_headers.reverse();
201
202        if new_segment_headers
203            .first()
204            .expect("Not empty; qed")
205            .prev_segment_header_hash()
206            != last_known_segment_header.hash()
207        {
208            return Err(
209                "Downloaded segment headers do not match last known segment header, ignoring \
210                downloaded headers"
211                    .into(),
212            );
213        }
214
215        Ok(new_segment_headers)
216    }
217
218    /// Return last segment header known to DSN and agreed on by majority of the peer set with
219    /// minimum initial size of [`SEGMENT_HEADER_CONSENSUS_INITIAL_NODES`] peers.
220    ///
221    /// `Ok(None)` is returned when no peers were found.
222    async fn get_last_segment_header(&self) -> Result<Option<SegmentHeader>, Box<dyn Error>> {
223        let mut peer_segment_headers = HashMap::<PeerId, Vec<SegmentHeader>>::default();
224        for (required_peers, retry_attempt) in (1..=SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
225            .rev()
226            .zip(1_usize..)
227        {
228            trace!(target: LOG_TARGET, %retry_attempt, "Downloading last segment headers");
229
230            // Get random peers and acquire segments from them. Some of them could be bootstrap
231            // nodes with no support for request-response protocol for segment commitment.
232            let new_last_known_segment_headers = self
233                .dsn_node
234                .get_closest_peers(PeerId::random().into())
235                .await
236                .inspect_err(|error| {
237                    warn!(target: LOG_TARGET, ?error, "get_closest_peers returned an error");
238                })?
239                .filter(|peer_id| {
240                    let known_peer = peer_segment_headers.contains_key(peer_id);
241
242                    async move { !known_peer }
243                })
244                .map(|peer_id| async move {
245                    let request_result = self
246                        .dsn_node
247                        .send_generic_request(
248                            peer_id,
249                            Vec::new(),
250                            SegmentHeaderRequest::LastSegmentHeaders {
251                                // Request 2 top segment headers, accounting for situations when new
252                                // segment header was just produced and not all nodes have it
253                                limit: 2,
254                            },
255                        )
256                        .await;
257
258                    match request_result {
259                        Ok(SegmentHeaderResponse { segment_headers }) => {
260                            trace!(
261                                target: LOG_TARGET,
262                                %peer_id,
263                                segment_headers_number=%segment_headers.len(),
264                                "Last segment headers request succeeded"
265                            );
266
267                            if !self
268                                .is_last_segment_headers_response_valid(peer_id, &segment_headers)
269                            {
270                                warn!(
271                                    target: LOG_TARGET,
272                                    %peer_id,
273                                    "Received last segment headers response was invalid"
274                                );
275
276                                let _ = self.dsn_node.ban_peer(peer_id).await;
277                                return None;
278                            }
279
280                            Some((peer_id, segment_headers))
281                        }
282                        Err(error) => {
283                            debug!(
284                                target: LOG_TARGET,
285                                %peer_id,
286                                ?error,
287                                "Last segment headers request failed"
288                            );
289                            None
290                        }
291                    }
292                })
293                .take(SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
294                .buffer_unordered(SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
295                .filter_map(|maybe_result| async move { maybe_result })
296                .collect::<Vec<(PeerId, Vec<SegmentHeader>)>>()
297                .await;
298
299            let last_peers_count = peer_segment_headers.len();
300            peer_segment_headers.extend(new_last_known_segment_headers);
301
302            let peer_count = peer_segment_headers.len();
303
304            if peer_count < required_peers {
305                // If we've got nothing, we have to retry
306                if last_peers_count == 0 {
307                    debug!(
308                        target: LOG_TARGET,
309                        %peer_count,
310                        %required_peers,
311                        %retry_attempt,
312                        "Segment headers consensus requires some peers, will retry"
313                    );
314
315                    continue;
316                }
317                // If there are still attempts left, do more attempts
318                if required_peers > 1 {
319                    debug!(
320                        target: LOG_TARGET,
321                        %peer_count,
322                        %required_peers,
323                        %retry_attempt,
324                        "Segment headers consensus requires more peers, will retry"
325                    );
326
327                    continue;
328                }
329
330                debug!(
331                    target: LOG_TARGET,
332                    %peer_count,
333                    %required_peers,
334                    %retry_attempt,
335                    "Segment headers consensus requires more peers, but no attempts left, so continue as is"
336                );
337            }
338
339            // Calculate votes
340            let mut segment_header_peers: HashMap<SegmentHeader, Vec<PeerId>> = HashMap::new();
341
342            for (peer_id, segment_headers) in peer_segment_headers {
343                for segment_header in segment_headers {
344                    segment_header_peers
345                        .entry(segment_header)
346                        .and_modify(|peers| {
347                            peers.push(peer_id);
348                        })
349                        .or_insert(vec![peer_id]);
350                }
351            }
352
353            let mut segment_header_peers_iter = segment_header_peers.into_iter();
354            let (mut best_segment_header, mut most_peers) =
355                segment_header_peers_iter.next().expect(
356                    "Not empty due to not empty list of peers with non empty list of segment \
357                    headers each; qed",
358                );
359
360            for (segment_header, peers) in segment_header_peers_iter {
361                if peers.len() > most_peers.len()
362                    || (peers.len() == most_peers.len()
363                        && segment_header.segment_index() > best_segment_header.segment_index())
364                {
365                    best_segment_header = segment_header;
366                    most_peers = peers;
367                }
368            }
369
370            return Ok(Some(best_segment_header));
371        }
372
373        Ok(None)
374    }
375
376    /// Validates segment headers and related segment indexes.
377    /// We assume `segment_indexes` to be a sorted collection (we create it manually).
378    fn is_segment_headers_response_valid(
379        &self,
380        peer_id: PeerId,
381        segment_indexes: &[SegmentIndex],
382        segment_headers: &[SegmentHeader],
383    ) -> bool {
384        if segment_headers.len() != segment_indexes.len() {
385            warn!(target: LOG_TARGET, %peer_id, "Segment header and segment indexes collection differ");
386
387            return false;
388        }
389
390        let returned_segment_indexes =
391            BTreeSet::from_iter(segment_headers.iter().map(|rb| rb.segment_index()));
392        if returned_segment_indexes.len() != segment_headers.len() {
393            warn!(target: LOG_TARGET, %peer_id, "Peer banned: it returned collection with duplicated segment headers");
394
395            return false;
396        }
397
398        let indexes_match = segment_indexes.iter().zip(segment_headers.iter()).all(
399            |(segment_index, segment_header)| *segment_index == segment_header.segment_index(),
400        );
401
402        if !indexes_match {
403            warn!(target: LOG_TARGET, %peer_id, "Segment header collection doesn't match segment indexes");
404
405            return false;
406        }
407
408        true
409    }
410
411    fn is_last_segment_headers_response_valid(
412        &self,
413        peer_id: PeerId,
414        segment_headers: &[SegmentHeader],
415    ) -> bool {
416        let segment_indexes = match segment_headers.last() {
417            None => {
418                // Empty collection is invalid, everyone has at least one segment header
419                return false;
420            }
421            Some(last_segment_header) => {
422                let last_segment_index = last_segment_header.segment_index();
423
424                let mut segment_indices = (SegmentIndex::ZERO..=last_segment_index)
425                    .rev()
426                    .take(segment_headers.len())
427                    .collect::<Vec<_>>();
428                segment_indices.reverse();
429                segment_indices
430            }
431        };
432
433        self.is_segment_headers_response_valid(peer_id, &segment_indexes, segment_headers)
434    }
435}