subspace_farmer_components/
segment_reconstruction.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
use subspace_archiving::piece_reconstructor::{PiecesReconstructor, ReconstructorError};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_data_retrieval::piece_getter::PieceGetter;
use subspace_data_retrieval::segment_downloading::{
    download_segment_pieces, SegmentDownloadingError,
};
use subspace_erasure_coding::ErasureCoding;
use subspace_kzg::Kzg;
use thiserror::Error;
use tokio::task::JoinError;
use tracing::{error, info};

#[derive(Debug, Error)]
pub(crate) enum SegmentReconstructionError {
    /// Segment downloading failed
    #[error("Segment downloading failed: {0}")]
    SegmentDownloadingFailed(#[from] SegmentDownloadingError),

    /// Internal piece retrieval process failed
    #[error("Piece reconstruction failed: {0}")]
    ReconstructionFailed(#[from] ReconstructorError),

    /// Join error
    #[error("Join error: {0}")]
    JoinError(#[from] JoinError),
}

pub(crate) async fn recover_missing_piece<PG>(
    piece_getter: &PG,
    kzg: Kzg,
    erasure_coding: ErasureCoding,
    missing_piece_index: PieceIndex,
) -> Result<Piece, SegmentReconstructionError>
where
    PG: PieceGetter + Send + Sync,
{
    info!(%missing_piece_index, "Recovering missing piece...");
    let segment_index = missing_piece_index.segment_index();
    let position = missing_piece_index.position();

    let segment_pieces = download_segment_pieces(segment_index, piece_getter).await?;

    let result = tokio::task::spawn_blocking(move || {
        let reconstructor = PiecesReconstructor::new(kzg, erasure_coding);

        reconstructor.reconstruct_piece(&segment_pieces, position as usize)
    })
    .await??;

    info!(%missing_piece_index, "Recovering missing piece succeeded.");

    Ok(result)
}