// subspace_service/sync_from_dsn/segment_header_downloader.rs
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::pin::pin;
use std::sync::Arc;
use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
use subspace_networking::libp2p::PeerId;
use subspace_networking::protocols::request_response::handlers::segment_header::{
    SegmentHeaderRequest, SegmentHeaderResponse,
};
use subspace_networking::Node;
use tracing::{debug, error, trace, warn};

/// Maximum number of segment indexes requested from a peer in a single batch
const SEGMENT_HEADER_NUMBER_PER_REQUEST: u64 = 1000;
/// Initial number of peers that must answer before the last segment header is voted on; the
/// requirement is lowered by one with every retry attempt
const SEGMENT_HEADER_CONSENSUS_INITIAL_NODES: usize = 20;

/// Helps downloading segment headers from DSN
pub struct SegmentHeaderDownloader<'a> {
    /// DSN node used for peer discovery, segment header requests and banning misbehaving peers
    dsn_node: &'a Node,
}

impl<'a> SegmentHeaderDownloader<'a> {
    /// Creates a downloader that performs all of its network operations through `dsn_node`.
    pub fn new(dsn_node: &'a Node) -> Self {
        Self { dsn_node }
    }

    /// Downloads the segment headers that follow `last_known_segment_header` from DSN.
    ///
    /// The newest header is the one selected by `get_last_segment_header`. Older headers are then
    /// fetched backwards, in batches of at most `SEGMENT_HEADER_NUMBER_PER_REQUEST`, from the
    /// peers that voted for it. Every downloaded header must hash to the
    /// `prev_segment_header_hash()` of the header that follows it.
    ///
    /// Returns the new headers ordered by ascending segment index. The result is empty when no
    /// peers were found or when DSN knows nothing newer than `last_known_segment_header`.
    ///
    /// # Errors
    ///
    /// Fails when looking up the last segment header or downloading a batch fails, when the
    /// downloaded headers do not form a hash chain, or when the oldest downloaded header does not
    /// point at `last_known_segment_header`.
    pub async fn get_segment_headers(
        &self,
        last_known_segment_header: &SegmentHeader,
    ) -> Result<Vec<SegmentHeader>, Box<dyn Error>> {
        let last_known_segment_index = last_known_segment_header.segment_index();
        trace!(
            %last_known_segment_index,
            "Searching for latest segment header"
        );

        let (last_segment_header, peers) = match self.get_last_segment_header().await? {
            Some(found) => found,
            None => return Ok(Vec::new()),
        };
        let last_segment_index = last_segment_header.segment_index();

        if last_segment_index <= last_known_segment_index {
            debug!(
                %last_known_segment_index,
                last_found_segment_index = %last_segment_index,
                "No new segment headers found, nothing to download"
            );

            return Ok(Vec::new());
        }

        debug!(
            %last_known_segment_index,
            last_segment_index = %last_segment_index,
            "Downloading segment headers"
        );

        let new_segment_headers_count =
            match last_segment_index.checked_sub(last_known_segment_index) {
                Some(count) => count,
                None => return Ok(Vec::new()),
            };
        let mut new_segment_headers =
            Vec::with_capacity(u64::from(new_segment_headers_count) as usize);
        new_segment_headers.push(last_segment_header);

        // Headers are collected newest-first; `oldest_verified` is the lowest header whose place
        // in the chain has been confirmed so far.
        let mut oldest_verified = last_segment_header;
        loop {
            let remaining_gap = oldest_verified.segment_index() - last_known_segment_index;
            if remaining_gap <= SegmentIndex::ONE {
                break;
            }

            // Next batch: the highest still-missing indexes, in descending order
            let first_missing_index = last_known_segment_index + SegmentIndex::ONE;
            let segment_indexes = (first_missing_index..oldest_verified.segment_index())
                .rev()
                .take(SEGMENT_HEADER_NUMBER_PER_REQUEST as usize)
                .collect();

            let (peer_id, batch) = self
                .get_segment_headers_batch(&peers, segment_indexes)
                .await?;

            for segment_header in batch {
                let actual_hash = segment_header.hash();
                let expected_hash = oldest_verified.prev_segment_header_hash();

                if actual_hash != expected_hash {
                    error!(
                        %peer_id,
                        segment_index=%oldest_verified.segment_index() - SegmentIndex::ONE,
                        actual_hash=?actual_hash,
                        expected_hash=?expected_hash,
                        "Segment header hash doesn't match expected hash from the last block"
                    );

                    return Err(
                        "Segment header hash doesn't match expected hash from the last block"
                            .into(),
                    );
                }

                oldest_verified = segment_header;
                new_segment_headers.push(segment_header);
            }
        }

        // Switch from newest-first to ascending order
        new_segment_headers.reverse();

        // The oldest new header must be the direct successor of what the caller already has
        let oldest_new_header = new_segment_headers.first().expect("Not empty; qed");
        if oldest_new_header.prev_segment_header_hash() != last_known_segment_header.hash() {
            return Err(
                "Downloaded segment headers do not match last known segment header, ignoring \
                downloaded headers"
                    .into(),
            );
        }

        Ok(new_segment_headers)
    }

    /// Return last segment header known to DSN: the header backed by the largest number of
    /// responding peers, out of a peer set with minimum initial size of
    /// [`SEGMENT_HEADER_CONSENSUS_INITIAL_NODES`] peers.
    ///
    /// Up to [`SEGMENT_HEADER_CONSENSUS_INITIAL_NODES`] attempts are made. Each attempt queries
    /// peers close to a random peer ID that have not answered yet and adds their valid responses
    /// to the ones collected so far; the number of required peers drops by one per attempt. As
    /// soon as enough peers have answered, every header a peer returned counts as one vote and the
    /// header with the most votes wins (ties are resolved in favor of the higher segment index).
    ///
    /// Returns the winning header together with the peers that voted for it.
    ///
    /// `Ok(None)` is returned when no peers were found.
    ///
    /// An error is returned when `get_closest_peers` fails.
    async fn get_last_segment_header(
        &self,
    ) -> Result<Option<(SegmentHeader, Vec<PeerId>)>, Box<dyn Error>> {
        // Valid responses collected so far, accumulated across retry attempts
        let mut peer_segment_headers = HashMap::<PeerId, Vec<SegmentHeader>>::default();
        // `required_peers` counts down from the initial number to 1 while `retry_attempt` counts
        // up from 1
        for (required_peers, retry_attempt) in (1..=SEGMENT_HEADER_CONSENSUS_INITIAL_NODES)
            .rev()
            .zip(1_usize..)
        {
            trace!(%retry_attempt, "Downloading last segment headers");

            // Get random peers. Some of them could be bootstrap nodes with no support for
            // request-response protocol for segment commitment.
            let get_peers_result = self
                .dsn_node
                .get_closest_peers(PeerId::random().into())
                .await;

            // Acquire segment headers from peers, skipping those that already answered during
            // an earlier attempt.
            let peers = match get_peers_result {
                Ok(get_peers_stream) => {
                    get_peers_stream
                        .filter(|peer_id| {
                            let known_peer = peer_segment_headers.contains_key(peer_id);

                            async move { !known_peer }
                        })
                        .collect::<Vec<_>>()
                        .await
                }
                Err(err) => {
                    warn!(?err, "get_closest_peers returned an error");

                    return Err(err.into());
                }
            };

            trace!(peers_count = %peers.len(), "Found closest peers");

            // One request per peer, all driven concurrently; failed requests and invalid
            // responses resolve to `None` and are filtered out of the resulting stream
            let new_last_known_segment_headers = peers
                .into_iter()
                .map(|peer_id| async move {
                    let request_result = self
                        .dsn_node
                        .send_generic_request(
                            peer_id,
                            Vec::new(),
                            SegmentHeaderRequest::LastSegmentHeaders {
                                // Request 2 top segment headers, accounting for situations when new
                                // segment header was just produced and not all nodes have it
                                limit: 2,
                            },
                        )
                        .await;

                    match request_result {
                        Ok(SegmentHeaderResponse { segment_headers }) => {
                            trace!(
                                %peer_id,
                                segment_headers_number=%segment_headers.len(),
                                "Last segment headers request succeeded"
                            );

                            if !self
                                .is_last_segment_headers_response_valid(peer_id, &segment_headers)
                            {
                                warn!(
                                    %peer_id,
                                    "Received last segment headers response was invalid"
                                );

                                // Result of the ban is deliberately ignored, the response is
                                // dropped either way
                                let _ = self.dsn_node.ban_peer(peer_id).await;
                                return None;
                            }

                            Some((peer_id, segment_headers))
                        }
                        Err(error) => {
                            debug!(%peer_id, ?error, "Last segment headers request failed");
                            None
                        }
                    }
                })
                .collect::<FuturesUnordered<_>>()
                .filter_map(|maybe_result| async move { maybe_result });
            let mut new_last_known_segment_headers = pin!(new_last_known_segment_headers);

            // Number of peers that had answered before this attempt
            let last_peers_count = peer_segment_headers.len();

            while let Some((peer_id, segment_headers)) = new_last_known_segment_headers.next().await
            {
                peer_segment_headers.insert(peer_id, segment_headers);
            }

            let peer_count = peer_segment_headers.len();

            if peer_count < required_peers {
                // If we've got nothing, we have to retry
                if last_peers_count == 0 {
                    debug!(
                        %peer_count,
                        %required_peers,
                        %retry_attempt,
                        "Segment headers consensus requires some peers, will retry"
                    );

                    continue;
                }
                // If there are still attempts left, do more attempts
                if required_peers > 1 {
                    debug!(
                        %peer_count,
                        %required_peers,
                        %retry_attempt,
                        "Segment headers consensus requires more peers, will retry"
                    );

                    continue;
                }

                // NOTE(review): this point looks unreachable. Getting here needs
                // `required_peers == 1`, hence `peer_count == 0`, and since entries are only ever
                // added to the map that implies `last_peers_count == 0`, which already `continue`d
                // above.
                debug!(
                    %peer_count,
                    %required_peers,
                    %retry_attempt,
                    "Segment headers consensus requires more peers, but no attempts left, so continue as is"
                );
            }

            // Calculate votes: every header returned by a peer is one vote of that peer for that
            // header
            let mut segment_header_peers: HashMap<SegmentHeader, Vec<PeerId>> = HashMap::new();

            for (peer_id, segment_headers) in peer_segment_headers {
                for segment_header in segment_headers {
                    segment_header_peers
                        .entry(segment_header)
                        .and_modify(|peers| {
                            peers.push(peer_id);
                        })
                        .or_insert(vec![peer_id]);
                }
            }

            let mut segment_header_peers_iter = segment_header_peers.into_iter();
            let (mut best_segment_header, mut most_peers) =
                segment_header_peers_iter.next().expect(
                    "Not empty due to not empty list of peers with non empty list of segment \
                    headers each; qed",
                );

            // Pick the header with the most votes, preferring the higher segment index when the
            // number of votes is equal
            for (segment_header, peers) in segment_header_peers_iter {
                if peers.len() > most_peers.len()
                    || (peers.len() == most_peers.len()
                        && segment_header.segment_index() > best_segment_header.segment_index())
                {
                    best_segment_header = segment_header;
                    most_peers = peers;
                }
            }

            return Ok(Some((best_segment_header, most_peers)));
        }

        // All attempts were used up without a single valid response
        Ok(None)
    }

    /// Checks that `segment_headers` is exactly the answer expected for `segment_indexes`.
    ///
    /// A response is accepted only when it contains one header per requested index, no two
    /// headers share a segment index, and the headers appear in the same order as the requested
    /// indexes. Every rejection is logged together with `peer_id`; this function itself does not
    /// ban anyone.
    ///
    /// `segment_indexes` is expected to be an ordered collection (callers in this file build it
    /// from a range).
    fn is_segment_headers_response_valid(
        &self,
        peer_id: PeerId,
        segment_indexes: &[SegmentIndex],
        segment_headers: &[SegmentHeader],
    ) -> bool {
        let headers_count = segment_headers.len();

        if headers_count != segment_indexes.len() {
            warn!(%peer_id, "Segment header and segment indexes collection differ");

            return false;
        }

        // A set drops repeated indexes, so a smaller set means the response had duplicates
        let distinct_indexes = segment_headers
            .iter()
            .map(|header| header.segment_index())
            .collect::<BTreeSet<_>>();
        if distinct_indexes.len() != headers_count {
            warn!(%peer_id, "Peer banned: it returned collection with duplicated segment headers");

            return false;
        }

        // Position by position, the header must carry the index that was asked for
        let has_mismatch = segment_indexes
            .iter()
            .zip(segment_headers)
            .any(|(expected_index, header)| *expected_index != header.segment_index());
        if has_mismatch {
            warn!(%peer_id, "Segment header collection doesn't match segment indexes");

            return false;
        }

        true
    }

    fn is_last_segment_headers_response_valid(
        &self,
        peer_id: PeerId,
        segment_headers: &[SegmentHeader],
    ) -> bool {
        let segment_indexes = match segment_headers.last() {
            None => {
                // Empty collection is invalid, everyone has at least one segment header
                return false;
            }
            Some(last_segment_header) => {
                let last_segment_index = last_segment_header.segment_index();

                let mut segment_indices = (SegmentIndex::ZERO..=last_segment_index)
                    .rev()
                    .take(segment_headers.len())
                    .collect::<Vec<_>>();
                segment_indices.reverse();
                segment_indices
            }
        };

        self.is_segment_headers_response_valid(peer_id, &segment_indexes, segment_headers)
    }

    /// Requests the headers for `segment_indexes` from `peers`, one peer at a time and in the
    /// given order, until one of them returns a response that passes
    /// `is_segment_headers_response_valid`.
    ///
    /// A peer whose request fails is skipped. A peer whose response fails validation is banned
    /// and skipped as well, so headers that failed validation are never handed to the caller.
    ///
    /// Returns the peer that answered together with its headers, which correspond to
    /// `segment_indexes` one-to-one and in the same order.
    ///
    /// # Errors
    ///
    /// Returns an error when none of `peers` produced a valid response.
    async fn get_segment_headers_batch(
        &self,
        peers: &[PeerId],
        segment_indexes: Vec<SegmentIndex>,
    ) -> Result<(PeerId, Vec<SegmentHeader>), Box<dyn Error>> {
        trace!(?segment_indexes, "Getting segment header batch..");

        // Shared so that every request can reuse the same index list without copying it
        let segment_indexes = Arc::new(segment_indexes);

        for &peer_id in peers {
            trace!(%peer_id, "get_closest_peers returned an item");

            let request_result = self
                .dsn_node
                .send_generic_request(
                    peer_id,
                    Vec::new(),
                    SegmentHeaderRequest::SegmentIndexes {
                        segment_indexes: Arc::clone(&segment_indexes),
                    },
                )
                .await;

            match request_result {
                Ok(SegmentHeaderResponse { segment_headers }) => {
                    trace!(%peer_id, ?segment_indexes, "Segment header request succeeded");

                    if !self.is_segment_headers_response_valid(
                        peer_id,
                        &segment_indexes,
                        &segment_headers,
                    ) {
                        warn!(%peer_id, "Received segment headers were invalid");

                        // The outcome of the ban does not matter here: the response is discarded
                        // regardless and the next peer gets a chance to answer.
                        let _ = self.dsn_node.ban_peer(peer_id).await;

                        continue;
                    }

                    return Ok((peer_id, segment_headers));
                }
                Err(error) => {
                    debug!(%peer_id, ?segment_indexes, ?error, "Segment header request failed");
                }
            };
        }

        Err("No more peers for segment headers.".into())
    }
}