subspace_data_retrieval/
segment_fetcher.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Copyright (C) 2024 Subspace Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Fetching segments of the archived history of Subspace Network.

use crate::piece_getter::ObjectPieceGetter;
use async_lock::Semaphore;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use subspace_archiving::archiver::Segment;
use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError};
use subspace_core_primitives::pieces::Piece;
use subspace_core_primitives::segments::{
    ArchivedHistorySegment, RecordedHistorySegment, SegmentIndex,
};
use subspace_erasure_coding::ErasureCoding;
use tokio::task::spawn_blocking;
use tracing::{debug, trace};

/// Segment getter errors.
#[derive(Debug, thiserror::Error)]
pub enum SegmentGetterError {
    /// Piece getter error
    #[error("Failed to get enough segment pieces")]
    PieceGetter { segment_index: SegmentIndex },

    /// Segment reconstruction error
    #[error("Segment reconstruction error: {source:?}")]
    SegmentReconstruction {
        #[from]
        source: ReconstructorError,
    },

    /// Segment decoding error
    #[error("Segment data decoding error: {source:?}")]
    SegmentDecoding {
        #[from]
        source: parity_scale_codec::Error,
    },
}

/// Concurrently downloads the pieces for `segment_index`, and reconstructs the segment.
pub async fn download_segment<PG>(
    segment_index: SegmentIndex,
    piece_getter: &PG,
    erasure_coding: ErasureCoding,
) -> Result<Segment, SegmentGetterError>
where
    PG: ObjectPieceGetter,
{
    let reconstructor = Reconstructor::new(erasure_coding);

    let segment_pieces = download_segment_pieces(segment_index, piece_getter).await?;

    let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces))
        .await
        .expect("Panic if blocking task panicked")?;

    Ok(segment)
}

/// Concurrently downloads the pieces for `segment_index`.
// This code was copied and modified from subspace_service::sync_from_dsn::download_and_reconstruct_blocks():
// <https://github.com/autonomys/subspace/blob/d71ca47e45e1b53cd2e472413caa23472a91cd74/crates/subspace-service/src/sync_from_dsn/import_blocks.rs#L236-L322>
//
// TODO: pass a lower concurrency limit into this function, to avoid overwhelming residential routers or slow connections
pub async fn download_segment_pieces<PG>(
    segment_index: SegmentIndex,
    piece_getter: &PG,
) -> Result<Vec<Option<Piece>>, SegmentGetterError>
where
    PG: ObjectPieceGetter,
{
    debug!(%segment_index, "Retrieving pieces of the segment");

    let semaphore = &Semaphore::new(RecordedHistorySegment::NUM_RAW_RECORDS);

    let mut received_segment_pieces = segment_index
        .segment_piece_indexes_source_first()
        .into_iter()
        .map(|piece_index| {
            // Source pieces will acquire permit here right away
            let maybe_permit = semaphore.try_acquire();

            async move {
                let permit = match maybe_permit {
                    Some(permit) => permit,
                    None => {
                        // Other pieces will acquire permit here instead
                        semaphore.acquire().await
                    }
                };
                let piece = match piece_getter.get_piece(piece_index).await {
                    Ok(Some(piece)) => piece,
                    Ok(None) => {
                        trace!(?piece_index, "Piece not found");
                        return None;
                    }
                    Err(error) => {
                        trace!(
                            %error,
                            ?piece_index,
                            "Piece request failed",
                        );
                        return None;
                    }
                };

                trace!(?piece_index, "Piece request succeeded");

                // Piece was received successfully, "remove" this slot from semaphore
                permit.forget();
                Some((piece_index, piece))
            }
        })
        .collect::<FuturesUnordered<_>>();

    let mut segment_pieces = vec![None::<Piece>; ArchivedHistorySegment::NUM_PIECES];
    let mut pieces_received = 0;

    while let Some(maybe_piece) = received_segment_pieces.next().await {
        let Some((piece_index, piece)) = maybe_piece else {
            continue;
        };

        segment_pieces
            .get_mut(piece_index.position() as usize)
            .expect("Piece position is by definition within segment; qed")
            .replace(piece);

        pieces_received += 1;

        if pieces_received >= RecordedHistorySegment::NUM_RAW_RECORDS {
            trace!(%segment_index, "Received half of the segment.");
            break;
        }
    }

    if pieces_received < RecordedHistorySegment::NUM_RAW_RECORDS {
        debug!(
            %segment_index,
            pieces_received,
            pieces_needed = RecordedHistorySegment::NUM_RAW_RECORDS,
            "Failed to get half of the pieces in the segment"
        );

        Err(SegmentGetterError::PieceGetter { segment_index })
    } else {
        trace!(
            %segment_index,
            pieces_received,
            pieces_needed = RecordedHistorySegment::NUM_RAW_RECORDS,
            "Successfully retrieved enough pieces of the segment"
        );

        Ok(segment_pieces)
    }
}