subspace_service/sync_from_dsn/
segment_header_downloader.rs

1use futures::StreamExt;
2use std::collections::{BTreeSet, HashMap, HashSet};
3use std::error::Error;
4use std::pin::pin;
5use std::sync::Arc;
6use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
7use subspace_networking::Node;
8use subspace_networking::libp2p::PeerId;
9use subspace_networking::protocols::request_response::handlers::segment_header::{
10    SegmentHeaderRequest, SegmentHeaderResponse,
11};
12use tracing::{debug, error, trace, warn};
13
/// Maximum number of segment headers requested from a peer in a single batch.
const SEGMENT_HEADER_NUMBER_PER_REQUEST: u64 = 1000;
/// Initial number of peers to query for last segment header.
const SEGMENT_HEADER_CONSENSUS_INITIAL_NODES: usize = 20;
/// How many distinct peers to try before giving up on downloading segment headers batches.
const SEGMENT_HEADER_PEERS_RETRIES: u32 = 20;
19
/// Helps download segment headers from DSN
pub struct SegmentHeaderDownloader {
    /// Owned handle of the DSN node used to look up peers and send segment header requests.
    dsn_node: Node,
}
24
25impl SegmentHeaderDownloader {
26    pub fn new(dsn_node: &Node) -> Self {
27        // TODO: Should not be necessary to store owned copy, but it results in confusing compiler
28        //  errors otherwise
29        Self {
30            dsn_node: dsn_node.clone(),
31        }
32    }
33
34    /// Returns new segment headers known to DSN, ordered from 0 to the last known, but newer than
35    /// `last_known_segment_index`
36    pub async fn get_segment_headers(
37        &self,
38        last_known_segment_header: &SegmentHeader,
39    ) -> Result<Vec<SegmentHeader>, Box<dyn Error>> {
40        let last_known_segment_index = last_known_segment_header.segment_index();
41        trace!(
42            %last_known_segment_index,
43            "Searching for latest segment header"
44        );
45
46        let Some(last_segment_header) = self.get_last_segment_header().await? else {
47            return Ok(Vec::new());
48        };
49
50        if last_segment_header.segment_index() <= last_known_segment_index {
51            debug!(
52                %last_known_segment_index,
53                last_found_segment_index = %last_segment_header.segment_index(),
54                "No new segment headers found, nothing to download"
55            );
56
57            return Ok(Vec::new());
58        }
59
60        debug!(
61            %last_known_segment_index,
62            last_segment_index = %last_segment_header.segment_index(),
63            "Downloading segment headers"
64        );
65
66        let new_segment_headers_count = last_segment_header
67            .segment_index()
68            .checked_sub(last_known_segment_index)
69            .expect("just checked last_segment_header is greater; qed");
70
71        let mut new_segment_headers =
72            Vec::with_capacity(u64::from(new_segment_headers_count) as usize);
73        new_segment_headers.push(last_segment_header);
74
75        let mut tried_peers = HashSet::<PeerId>::new();
76        let mut segment_to_download_to = last_segment_header;
77        'new_peer: for segment_headers_batch_retry in 0..SEGMENT_HEADER_PEERS_RETRIES {
78            let maybe_connected_peer = self
79                .dsn_node
80                .connected_servers()
81                .await?
82                .into_iter()
83                .find(|connected_peer| !tried_peers.contains(connected_peer));
84
85            let peer_id = if let Some(peer_id) = maybe_connected_peer {
86                peer_id
87            } else {
88                let random_peers = self
89                    .dsn_node
90                    .get_closest_peers(PeerId::random().into())
91                    .await?
92                    .filter(|connected_peer| {
93                        let new_peer = !tried_peers.contains(connected_peer);
94
95                        async move { new_peer }
96                    });
97                let mut random_peers = pin!(random_peers);
98
99                if let Some(peer_id) = random_peers.next().await {
100                    peer_id
101                } else {
102                    return Err("Can't find peers to download headers from".into());
103                }
104            };
105            tried_peers.insert(peer_id);
106
107            while segment_to_download_to.segment_index() - last_known_segment_index
108                > SegmentIndex::ONE
109            {
110                let segment_indexes = (last_known_segment_index + SegmentIndex::ONE
111                    ..segment_to_download_to.segment_index())
112                    .rev()
113                    .take(SEGMENT_HEADER_NUMBER_PER_REQUEST as usize)
114                    .collect::<Vec<_>>();
115
116                trace!(
117                    %peer_id,
118                    %segment_headers_batch_retry,
119                    segment_indexes_count = %segment_indexes.len(),
120                    first_segment_index = ?segment_indexes.first(),
121                    last_segment_index = ?segment_indexes.last(),
122                    "Getting segment header batch...",
123                );
124
125                let segment_indexes = Arc::new(segment_indexes);
126
127                let segment_headers = match self
128                    .dsn_node
129                    .send_generic_request(
130                        peer_id,
131                        Vec::new(),
132                        SegmentHeaderRequest::SegmentIndexes {
133                            segment_indexes: Arc::clone(&segment_indexes),
134                        },
135                    )
136                    .await
137                {
138                    Ok(response) => response.segment_headers,
139                    Err(error) => {
140                        debug!(
141                            %peer_id,
142                            %segment_headers_batch_retry,
143                            %error,
144                            %last_known_segment_index,
145                            segment_to_download_to = %segment_to_download_to.segment_index(),
146                            "Error getting segment headers from peer",
147                        );
148
149                        continue 'new_peer;
150                    }
151                };
152
153                if !self.is_segment_headers_response_valid(
154                    peer_id,
155                    &segment_indexes,
156                    &segment_headers,
157                ) {
158                    warn!(
159                        %peer_id,
160                        %segment_headers_batch_retry,
161                        "Received segment headers were invalid"
162                    );
163
164                    let _ = self.dsn_node.ban_peer(peer_id).await;
165                }
166
167                for segment_header in segment_headers {
168                    if segment_header.hash() != segment_to_download_to.prev_segment_header_hash() {
169                        error!(
170                            %peer_id,
171                            %segment_headers_batch_retry,
172                            segment_index = %segment_to_download_to.segment_index() - SegmentIndex::ONE,
173                            actual_hash = ?segment_header.hash(),
174                            expected_hash = ?segment_to_download_to.prev_segment_header_hash(),
175                            "Segment header hash doesn't match expected hash of the previous \
176                            segment"
177                        );
178
179                        return Err(
180                            "Segment header hash doesn't match expected hash of the previous \
181                            segment"
182                                .into(),
183                        );
184                    }
185
186                    segment_to_download_to = segment_header;
187                    new_segment_headers.push(segment_header);
188                }
189            }
190        }
191
192        new_segment_headers.reverse();
193
194        if new_segment_headers
195            .first()
196            .expect("Not empty; qed")
197            .prev_segment_header_hash()
198            != last_known_segment_header.hash()
199        {
200            return Err(
201                "Downloaded segment headers do not match last known segment header, ignoring \
202                downloaded headers"
203                    .into(),
204            );
205        }
206
207        Ok(new_segment_headers)
208    }
209
210    /// Return last segment header known to DSN and agreed on by majority of the peer set with
211    /// minimum initial size of [`SEGMENT_HEADER_CONSENSUS_INITIAL_NODES`] peers.
212    ///
213    /// `Ok(None)` is returned when no peers were found.
214    async fn get_last_segment_header(&self) -> Result<Option<SegmentHeader>, Box<dyn Error>> {
215        let mut peer_segment_headers = HashMap::<PeerId, Vec<SegmentHeader>>::default();
216        for (required_peers, retry_attempt) in (1..=SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
217            .rev()
218            .zip(1_usize..)
219        {
220            trace!( %retry_attempt, "Downloading last segment headers");
221
222            // Get random peers and acquire segments from them. Some of them could be bootstrap
223            // nodes with no support for request-response protocol for segment commitment.
224            let new_last_known_segment_headers = self
225                .dsn_node
226                .get_closest_peers(PeerId::random().into())
227                .await
228                .inspect_err(|error| {
229                    warn!(?error, "get_closest_peers returned an error");
230                })?
231                .filter(|peer_id| {
232                    let known_peer = peer_segment_headers.contains_key(peer_id);
233
234                    async move { !known_peer }
235                })
236                .map(|peer_id| async move {
237                    let request_result = self
238                        .dsn_node
239                        .send_generic_request(
240                            peer_id,
241                            Vec::new(),
242                            SegmentHeaderRequest::LastSegmentHeaders {
243                                // Request 2 top segment headers, accounting for situations when new
244                                // segment header was just produced and not all nodes have it
245                                limit: 2,
246                            },
247                        )
248                        .await;
249
250                    match request_result {
251                        Ok(SegmentHeaderResponse { segment_headers }) => {
252                            trace!(
253                                %peer_id,
254                                segment_headers_number=%segment_headers.len(),
255                                "Last segment headers request succeeded"
256                            );
257
258                            if !self
259                                .is_last_segment_headers_response_valid(peer_id, &segment_headers)
260                            {
261                                warn!(
262                                    %peer_id,
263                                    "Received last segment headers response was invalid"
264                                );
265
266                                let _ = self.dsn_node.ban_peer(peer_id).await;
267                                return None;
268                            }
269
270                            Some((peer_id, segment_headers))
271                        }
272                        Err(error) => {
273                            debug!(
274                                %peer_id,
275                                ?error,
276                                "Last segment headers request failed"
277                            );
278                            None
279                        }
280                    }
281                })
282                .take(SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
283                .buffer_unordered(SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
284                .filter_map(|maybe_result| async move { maybe_result })
285                .collect::<Vec<(PeerId, Vec<SegmentHeader>)>>()
286                .await;
287
288            let last_peers_count = peer_segment_headers.len();
289            peer_segment_headers.extend(new_last_known_segment_headers);
290
291            let peer_count = peer_segment_headers.len();
292
293            if peer_count < required_peers {
294                // If we've got nothing, we have to retry
295                if last_peers_count == 0 {
296                    debug!(
297                        %peer_count,
298                        %required_peers,
299                        %retry_attempt,
300                        "Segment headers consensus requires some peers, will retry"
301                    );
302
303                    continue;
304                }
305                // If there are still attempts left, do more attempts
306                if required_peers > 1 {
307                    debug!(
308                        %peer_count,
309                        %required_peers,
310                        %retry_attempt,
311                        "Segment headers consensus requires more peers, will retry"
312                    );
313
314                    continue;
315                }
316
317                debug!(
318                    %peer_count,
319                    %required_peers,
320                    %retry_attempt,
321                    "Segment headers consensus requires more peers, but no attempts left, so continue as is"
322                );
323            }
324
325            // Calculate votes
326            let mut segment_header_peers: HashMap<SegmentHeader, Vec<PeerId>> = HashMap::new();
327
328            for (peer_id, segment_headers) in peer_segment_headers {
329                for segment_header in segment_headers {
330                    segment_header_peers
331                        .entry(segment_header)
332                        .and_modify(|peers| {
333                            peers.push(peer_id);
334                        })
335                        .or_insert(vec![peer_id]);
336                }
337            }
338
339            let mut segment_header_peers_iter = segment_header_peers.into_iter();
340            let (mut best_segment_header, mut most_peers) =
341                segment_header_peers_iter.next().expect(
342                    "Not empty due to not empty list of peers with non empty list of segment \
343                    headers each; qed",
344                );
345
346            for (segment_header, peers) in segment_header_peers_iter {
347                if peers.len() > most_peers.len()
348                    || (peers.len() == most_peers.len()
349                        && segment_header.segment_index() > best_segment_header.segment_index())
350                {
351                    best_segment_header = segment_header;
352                    most_peers = peers;
353                }
354            }
355
356            return Ok(Some(best_segment_header));
357        }
358
359        Ok(None)
360    }
361
362    /// Validates segment headers and related segment indexes.
363    /// We assume `segment_indexes` to be a sorted collection (we create it manually).
364    fn is_segment_headers_response_valid(
365        &self,
366        peer_id: PeerId,
367        segment_indexes: &[SegmentIndex],
368        segment_headers: &[SegmentHeader],
369    ) -> bool {
370        if segment_headers.len() != segment_indexes.len() {
371            warn!( %peer_id, "Segment header and segment indexes collection differ");
372
373            return false;
374        }
375
376        let returned_segment_indexes =
377            BTreeSet::from_iter(segment_headers.iter().map(|rb| rb.segment_index()));
378        if returned_segment_indexes.len() != segment_headers.len() {
379            warn!( %peer_id, "Peer banned: it returned collection with duplicated segment headers");
380
381            return false;
382        }
383
384        let indexes_match = segment_indexes.iter().zip(segment_headers.iter()).all(
385            |(segment_index, segment_header)| *segment_index == segment_header.segment_index(),
386        );
387
388        if !indexes_match {
389            warn!( %peer_id, "Segment header collection doesn't match segment indexes");
390
391            return false;
392        }
393
394        true
395    }
396
397    fn is_last_segment_headers_response_valid(
398        &self,
399        peer_id: PeerId,
400        segment_headers: &[SegmentHeader],
401    ) -> bool {
402        let segment_indexes = match segment_headers.last() {
403            None => {
404                // Empty collection is invalid, everyone has at least one segment header
405                return false;
406            }
407            Some(last_segment_header) => {
408                let last_segment_index = last_segment_header.segment_index();
409
410                let mut segment_indices = (SegmentIndex::ZERO..=last_segment_index)
411                    .rev()
412                    .take(segment_headers.len())
413                    .collect::<Vec<_>>();
414                segment_indices.reverse();
415                segment_indices
416            }
417        };
418
419        self.is_segment_headers_response_valid(peer_id, &segment_indexes, segment_headers)
420    }
421}