subspace_farmer_components/plotting.rs

//! Plotting utilities
//!
//! This module contains functions and data structures that can be used for plotting purposes
//! (primarily on the CPU).
//!
//! Plotted sectors can be written to the plot and later [`read`](crate::reading) and/or
//! [`audited`](crate::auditing)/[`proven`](crate::proving) using other modules of this crate.
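//!
//! At a high level, plotting a sector is a three-step pipeline: [`download_sector`],
//! [`encode_sector`] and [`write_sector`], with [`plot_sector`] as a convenience wrapper driving
//! all three. A minimal sketch of the manual pipeline (options structs elided, error handling via
//! `?`; not a runnable doctest):
//!
//! ```ignore
//! let downloaded_sector = download_sector(DownloadSectorOptions { /* ... */ }).await?;
//! let encoded_sector = encode_sector(downloaded_sector, EncodeSectorOptions { /* ... */ })?;
//! let mut sector_output = Vec::new();
//! write_sector(&encoded_sector, &mut sector_output)?;
//! ```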

use crate::sector::{
    sector_record_chunks_size, sector_size, EncodedChunksUsed, RawSector, RecordMetadata,
    SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
};
use crate::segment_reconstruction::recover_missing_piece;
use crate::FarmerProtocolInfo;
use async_lock::{Mutex as AsyncMutex, Semaphore};
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesUnordered;
use futures::{select, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use rayon::prelude::*;
use std::collections::HashMap;
use std::simd::Simd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::hashes::{blake3_hash, blake3_hash_parallel, Blake3Hash};
use subspace_core_primitives::pieces::{Piece, PieceIndex, PieceOffset, Record};
use subspace_core_primitives::pos::PosSeed;
use subspace_core_primitives::sectors::{SBucket, SectorId, SectorIndex};
use subspace_core_primitives::segments::HistorySize;
use subspace_core_primitives::{PublicKey, ScalarBytes};
use subspace_data_retrieval::piece_getter::PieceGetter;
use subspace_erasure_coding::ErasureCoding;
use subspace_kzg::{Kzg, Scalar};
use subspace_proof_of_space::{Table, TableGenerator};
use thiserror::Error;
use tracing::{debug, trace, warn};

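/// How many pieces are reconstructed concurrently when they can't be downloaded directly.
/// Reconstruction is memory-intensive, hence the conservative limit (see the TODO in
/// `download_sector_internal`).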
const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;

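/// Backoff policy for retrying sector downloads: starts at 15 seconds between attempts, grows
/// exponentially up to 10 minutes and never gives up.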
fn default_backoff() -> ExponentialBackoff {
    ExponentialBackoff {
        initial_interval: Duration::from_secs(15),
        max_interval: Duration::from_secs(10 * 60),
        // Try until we get a valid piece
        max_elapsed_time: None,
        ..ExponentialBackoff::default()
    }
}

/// Information about a sector that was plotted
#[derive(Debug, Clone, Encode, Decode)]
pub struct PlottedSector {
    /// Sector ID
    pub sector_id: SectorId,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Sector metadata
    pub sector_metadata: SectorMetadataChecksummed,
    /// Indexes of pieces that were plotted
    pub piece_indexes: Vec<PieceIndex>,
}

/// Plotting error
#[derive(Debug, Error)]
pub enum PlottingError {
    /// Records encoder error
    #[error("Records encoder error: {error}")]
    RecordsEncoderError {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Bad sector output size
    #[error("Bad sector output size: provided {provided}, expected {expected}")]
    BadSectorOutputSize {
        /// Actual size
        provided: usize,
        /// Expected size
        expected: usize,
    },
    /// Can't recover missing piece
    #[error("Can't recover missing piece {piece_index}: {error}")]
    PieceRecoveryFailed {
        /// Piece index
        piece_index: PieceIndex,
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Failed to retrieve pieces
    #[error("Failed to retrieve pieces: {error}")]
    FailedToRetrievePieces {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Abort early
    #[error("Abort early")]
    AbortEarly,
}

/// Options for plotting a sector.
///
/// Sector output should be either empty (in which case it'll be resized to the correct size
/// automatically) or correctly sized from the beginning, otherwise an error will be returned.
#[derive(Debug)]
pub struct PlotSectorOptions<'a, RE, PG> {
    /// Public key corresponding to sector
    pub public_key: &'a PublicKey,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Getter for pieces of archival history
    pub piece_getter: &'a PG,
    /// Farmer protocol info
    pub farmer_protocol_info: FarmerProtocolInfo,
    /// KZG instance
    pub kzg: &'a Kzg,
    /// Erasure coding instance
    pub erasure_coding: &'a ErasureCoding,
    /// How many pieces the sector should contain
    pub pieces_in_sector: u16,
    /// Where the plotted sector should be written; the vector must either be empty (in which case
    /// it'll be resized to the correct size automatically) or correctly sized from the beginning
    pub sector_output: &'a mut Vec<u8>,
    /// Semaphore for the part of plotting when the farmer downloads a new sector; allows limiting
    /// memory usage of the plotting process. The permit will be held until the end of the
    /// plotting process.
    pub downloading_semaphore: Option<Arc<Semaphore>>,
    /// Semaphore for the part of plotting when the farmer encodes a downloaded sector; should
    /// typically allow one permit at a time for efficient CPU utilization
    pub encoding_semaphore: Option<&'a Semaphore>,
    /// Records encoder instance
    pub records_encoder: &'a mut RE,
    /// Whether encoding should be aborted early
    pub abort_early: &'a AtomicBool,
}

/// Plot a single sector.
///
/// This is a convenience wrapper around the [`download_sector`], [`encode_sector`] and
/// [`write_sector`] functions.
///
/// NOTE: Even though this function is async, it has blocking code inside and must be run in a
/// separate thread in order to prevent blocking an executor.
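///
/// A minimal call sketch (surrounding values such as `public_key` and `records_encoder` are
/// assumed to be in scope, semaphores are omitted; not a runnable doctest):
///
/// ```ignore
/// let plotted_sector = plot_sector(PlotSectorOptions {
///     public_key: &public_key,
///     sector_index,
///     piece_getter: &piece_getter,
///     farmer_protocol_info,
///     kzg: &kzg,
///     erasure_coding: &erasure_coding,
///     pieces_in_sector,
///     sector_output: &mut sector_output,
///     downloading_semaphore: None,
///     encoding_semaphore: None,
///     records_encoder: &mut records_encoder,
///     abort_early: &AtomicBool::new(false),
/// })
/// .await?;
/// ```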
pub async fn plot_sector<RE, PG>(
    options: PlotSectorOptions<'_, RE, PG>,
) -> Result<PlottedSector, PlottingError>
where
    RE: RecordsEncoder,
    PG: PieceGetter + Send + Sync,
{
    let PlotSectorOptions {
        public_key,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        kzg,
        erasure_coding,
        pieces_in_sector,
        sector_output,
        downloading_semaphore,
        encoding_semaphore,
        records_encoder,
        abort_early,
    } = options;

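    // The downloading permit, when a semaphore is provided, is held until the end of this
    // function, bounding how many sectors can be in memory simultaneously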
    let _downloading_permit = match downloading_semaphore {
        Some(downloading_semaphore) => Some(downloading_semaphore.acquire_arc().await),
        None => None,
    };

    let download_sector_fut = download_sector(DownloadSectorOptions {
        public_key,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        kzg,
        erasure_coding,
        pieces_in_sector,
    });

    let _encoding_permit = match encoding_semaphore {
        Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
        None => None,
    };

    let encoded_sector = encode_sector(
        download_sector_fut.await?,
        EncodeSectorOptions::<RE> {
            sector_index,
            records_encoder,
            abort_early,
        },
    )?;

    if abort_early.load(Ordering::Acquire) {
        return Err(PlottingError::AbortEarly);
    }

    write_sector(&encoded_sector, sector_output)?;

    Ok(encoded_sector.plotted_sector)
}

/// Opaque sector download result, ready for encoding
#[derive(Debug)]
pub struct DownloadedSector {
    sector_id: SectorId,
    piece_indices: Vec<PieceIndex>,
    raw_sector: RawSector,
    history_size: HistorySize,
}

/// Options for sector downloading
#[derive(Debug)]
pub struct DownloadSectorOptions<'a, PG> {
    /// Public key corresponding to sector
    pub public_key: &'a PublicKey,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Getter for pieces of archival history
    pub piece_getter: &'a PG,
    /// Farmer protocol info
    pub farmer_protocol_info: FarmerProtocolInfo,
    /// KZG instance
    pub kzg: &'a Kzg,
    /// Erasure coding instance
    pub erasure_coding: &'a ErasureCoding,
    /// How many pieces the sector should contain
    pub pieces_in_sector: u16,
}

/// Download a sector for plotting.
///
/// This will identify the necessary pieces and download them using the provided piece getter,
/// after which they can be encoded using [`encode_sector`] and written to the plot.
pub async fn download_sector<PG>(
    options: DownloadSectorOptions<'_, PG>,
) -> Result<DownloadedSector, PlottingError>
where
    PG: PieceGetter + Send + Sync,
{
    let DownloadSectorOptions {
        public_key,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        kzg,
        erasure_coding,
        pieces_in_sector,
    } = options;

    let sector_id = SectorId::new(
        public_key.hash(),
        sector_index,
        farmer_protocol_info.history_size,
    );

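    // Every piece offset in the sector deterministically maps to a piece index in archival
    // history, so the full list of pieces to download is known up front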
    let piece_indices = (PieceOffset::ZERO..)
        .take(pieces_in_sector.into())
        .map(|piece_offset| {
            sector_id.derive_piece_index(
                piece_offset,
                farmer_protocol_info.history_size,
                farmer_protocol_info.max_pieces_in_sector,
                farmer_protocol_info.recent_segments,
                farmer_protocol_info.recent_history_fraction,
            )
        })
        .collect::<Vec<_>>();

    let raw_sector = {
        let mut raw_sector = RawSector::new(pieces_in_sector);
        let mut pieces_to_download =
            HashMap::<PieceIndex, Vec<_>>::with_capacity(usize::from(pieces_in_sector));
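        // The same piece index may be assigned to more than one offset within a sector, hence
        // each entry holds a list of (record, metadata) targets to fill from a single download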
        for (piece_index, (record, metadata)) in piece_indices
            .iter()
            .copied()
            .zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
        {
            pieces_to_download
                .entry(piece_index)
                .or_default()
                .push((record, metadata));
        }
        // This map will be mutated, removing piece indices we have already processed
        let pieces_to_download = AsyncMutex::new(pieces_to_download);

        retry(default_backoff(), || async {
            let mut pieces_to_download = pieces_to_download.lock().await;

            if let Err(error) =
                download_sector_internal(&mut pieces_to_download, piece_getter, kzg, erasure_coding)
                    .await
            {
                warn!(
                    %sector_index,
                    %error,
                    %pieces_in_sector,
                    remaining_pieces = %pieces_to_download.len(),
                    "Sector downloading attempt failed, will retry later"
                );

                return Err(BackoffError::transient(error));
            }

            debug!(%sector_index, "Sector downloaded successfully");

            Ok(())
        })
        .await?;

        raw_sector
    };

    Ok(DownloadedSector {
        sector_id,
        piece_indices,
        raw_sector,
        history_size: farmer_protocol_info.history_size,
    })
}

/// Records encoder for plotting purposes
pub trait RecordsEncoder {
    /// Encode the provided sector records in place, returning a map describing the contents of
    /// the sector
    fn encode_records(
        &mut self,
        sector_id: &SectorId,
        records: &mut [Record],
        abort_early: &AtomicBool,
    ) -> anyhow::Result<SectorContentsMap>;
}

/// CPU implementation of [`RecordsEncoder`]
#[derive(Debug)]
pub struct CpuRecordsEncoder<'a, PosTable>
where
    PosTable: Table,
{
    table_generators: &'a mut [PosTable::Generator],
    erasure_coding: &'a ErasureCoding,
    global_mutex: &'a AsyncMutex<()>,
}

impl<PosTable> RecordsEncoder for CpuRecordsEncoder<'_, PosTable>
where
    PosTable: Table,
{
    fn encode_records(
        &mut self,
        sector_id: &SectorId,
        records: &mut [Record],
        abort_early: &AtomicBool,
    ) -> anyhow::Result<SectorContentsMap> {
        if self.erasure_coding.max_shards() < Record::NUM_S_BUCKETS {
            return Err(anyhow::anyhow!(
                "Invalid erasure coding instance: {} shards needed, {} supported",
                Record::NUM_S_BUCKETS,
                self.erasure_coding.max_shards()
            ));
        }

        if self.table_generators.is_empty() {
            return Err(anyhow::anyhow!("No table generators"));
        }

        let pieces_in_sector = records
            .len()
            .try_into()
            .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
        let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);

        {
            let table_generators = &mut *self.table_generators;
            let global_mutex = self.global_mutex;
            let erasure_coding = self.erasure_coding;

            let iter = Mutex::new(
                (PieceOffset::ZERO..)
                    .zip(records.iter_mut())
                    .zip(sector_contents_map.iter_record_bitfields_mut()),
            );

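            // One worker per table generator; each worker repeatedly pulls the next record from
            // the shared iterator, so concurrency is bounded by the number of generators provided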
            rayon::scope(|scope| {
                for table_generator in table_generators {
                    scope.spawn(|_scope| {
                        let mut chunks_scratch = Vec::with_capacity(Record::NUM_S_BUCKETS);

                        loop {
                            // Take the mutex briefly to make sure encoding is allowed right now
                            global_mutex.lock_blocking();

                            // A `loop` with an inner `let ... else` is used instead of `while let`
                            // because otherwise the mutex would be held for the duration of the
                            // loop body and limit concurrency to one record
                            let Some(((piece_offset, record), encoded_chunks_used)) =
                                iter.lock().next()
                            else {
                                return;
                            };
                            let pos_seed = sector_id.derive_evaluation_seed(piece_offset);

                            record_encoding::<PosTable>(
                                &pos_seed,
                                record,
                                encoded_chunks_used,
                                table_generator,
                                erasure_coding,
                                &mut chunks_scratch,
                            );

                            if abort_early.load(Ordering::Relaxed) {
                                return;
                            }
                        }
                    });
                }
            });
        }

        Ok(sector_contents_map)
    }
}

impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable>
where
    PosTable: Table,
{
    /// Create a new instance
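    ///
    /// A construction sketch (the number of generators and the assumption that
    /// `PosTable::Generator` implements `Default` are illustrative; not a runnable doctest):
    ///
    /// ```ignore
    /// let mut table_generators = [
    ///     PosTable::Generator::default(),
    ///     PosTable::Generator::default(),
    /// ];
    /// let global_mutex = AsyncMutex::new(());
    /// let mut records_encoder =
    ///     CpuRecordsEncoder::<PosTable>::new(&mut table_generators, &erasure_coding, &global_mutex);
    /// ```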
    pub fn new(
        table_generators: &'a mut [PosTable::Generator],
        erasure_coding: &'a ErasureCoding,
        global_mutex: &'a AsyncMutex<()>,
    ) -> Self {
        Self {
            table_generators,
            erasure_coding,
            global_mutex,
        }
    }
}

/// Options for encoding a sector.
#[derive(Debug)]
pub struct EncodeSectorOptions<'a, RE>
where
    RE: RecordsEncoder,
{
    /// Sector index
    pub sector_index: SectorIndex,
    /// Records encoding instance
    pub records_encoder: &'a mut RE,
    /// Whether encoding should be aborted early
    pub abort_early: &'a AtomicBool,
}

/// Mostly opaque sector encoding result, ready for writing
#[derive(Debug)]
pub struct EncodedSector {
    /// Information about the sector that was plotted
    pub plotted_sector: PlottedSector,
    raw_sector: RawSector,
    sector_contents_map: SectorContentsMap,
}

/// Encode a downloaded sector.
///
/// This function encodes downloaded sector records and returns a sector encoding result that can
/// be written using [`write_sector`].
pub fn encode_sector<RE>(
    downloaded_sector: DownloadedSector,
    encoding_options: EncodeSectorOptions<'_, RE>,
) -> Result<EncodedSector, PlottingError>
where
    RE: RecordsEncoder,
{
    let DownloadedSector {
        sector_id,
        piece_indices,
        mut raw_sector,
        history_size,
    } = downloaded_sector;
    let EncodeSectorOptions {
        sector_index,
        records_encoder,
        abort_early,
    } = encoding_options;

    let pieces_in_sector = raw_sector.records.len().try_into().expect(
        "Raw sector can only be created in this crate and it is always done correctly; qed",
    );

    let sector_contents_map = records_encoder
        .encode_records(&sector_id, &mut raw_sector.records, abort_early)
        .map_err(|error| PlottingError::RecordsEncoderError { error })?;

    let sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
        sector_index,
        pieces_in_sector,
        s_bucket_sizes: sector_contents_map.s_bucket_sizes(),
        history_size,
    });

    Ok(EncodedSector {
        plotted_sector: PlottedSector {
            sector_id,
            sector_index,
            sector_metadata,
            piece_indexes: piece_indices,
        },
        raw_sector,
        sector_contents_map,
    })
}

/// Write an encoded sector into the sector output
pub fn write_sector(
    encoded_sector: &EncodedSector,
    sector_output: &mut Vec<u8>,
) -> Result<(), PlottingError> {
    let EncodedSector {
        plotted_sector: _,
        raw_sector,
        sector_contents_map,
    } = encoded_sector;

    let pieces_in_sector = raw_sector.records.len().try_into().expect(
        "Raw sector can only be created in this crate and it is always done correctly; qed",
    );

    let sector_size = sector_size(pieces_in_sector);

    if !sector_output.is_empty() && sector_output.len() != sector_size {
        return Err(PlottingError::BadSectorOutputSize {
            provided: sector_output.len(),
            expected: sector_size,
        });
    }

    sector_output.resize(sector_size, 0);

    // Write sector to disk in the form of the following regions:
    // * sector contents map
    // * record chunks as s-buckets
    // * record metadata
    // * checksum
    {
        let (sector_contents_map_region, remaining_bytes) =
            sector_output.split_at_mut(SectorContentsMap::encoded_size(pieces_in_sector));
        // Slice the remaining memory into the s-buckets and metadata regions
        let (s_buckets_region, metadata_region) =
            remaining_bytes.split_at_mut(sector_record_chunks_size(pieces_in_sector));

        // Write sector contents map so we can decode it later
        sector_contents_map
            .encode_into(sector_contents_map_region)
            .expect("Chunked into correct size above; qed");

        let num_encoded_record_chunks = sector_contents_map.num_encoded_record_chunks();
        let mut next_encoded_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
        let mut next_unencoded_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
        // Write record chunks, one s-bucket at a time
        for ((piece_offset, encoded_chunk_used), output) in (SBucket::ZERO..=SBucket::MAX)
            .flat_map(|s_bucket| {
                sector_contents_map
                    .iter_s_bucket_records(s_bucket)
                    .expect("S-bucket guaranteed to be in range; qed")
            })
            .zip(s_buckets_region.array_chunks_mut::<{ ScalarBytes::FULL_BYTES }>())
        {
            let num_encoded_record_chunks =
                usize::from(num_encoded_record_chunks[usize::from(piece_offset)]);
            let next_encoded_record_chunks_offset =
                &mut next_encoded_record_chunks_offset[usize::from(piece_offset)];
            let next_unencoded_record_chunks_offset =
                &mut next_unencoded_record_chunks_offset[usize::from(piece_offset)];

            // We know that s-buckets in `raw_sector.records` are stored in order (encoded first,
            // then unencoded), hence we don't need to calculate the position, we can just store a
            // few cursors and know the position that way
            let chunk_position;
            if encoded_chunk_used {
                chunk_position = *next_encoded_record_chunks_offset;
                *next_encoded_record_chunks_offset += 1;
            } else {
                chunk_position = num_encoded_record_chunks + *next_unencoded_record_chunks_offset;
                *next_unencoded_record_chunks_offset += 1;
            }
            output.copy_from_slice(&raw_sector.records[usize::from(piece_offset)][chunk_position]);
        }

        let metadata_chunks =
            metadata_region.array_chunks_mut::<{ RecordMetadata::encoded_size() }>();
        for (record_metadata, output) in raw_sector.metadata.iter().zip(metadata_chunks) {
            record_metadata.encode_to(&mut output.as_mut_slice());
        }

        // It would be more efficient to not re-read the whole sector again, but that would make
        // the above code significantly more convoluted and is most likely not worth it
        let (sector_contents, sector_checksum) =
            sector_output.split_at_mut(sector_size - Blake3Hash::SIZE);
        sector_checksum.copy_from_slice(blake3_hash_parallel(sector_contents).as_ref());
    }

    Ok(())
}

fn record_encoding<PosTable>(
    pos_seed: &PosSeed,
    record: &mut Record,
    mut encoded_chunks_used: EncodedChunksUsed<'_>,
    table_generator: &mut PosTable::Generator,
    erasure_coding: &ErasureCoding,
    chunks_scratch: &mut Vec<[u8; ScalarBytes::FULL_BYTES]>,
) where
    PosTable: Table,
{
    // Derive PoSpace table
    let pos_table = table_generator.generate_parallel(pos_seed);

    // Erasure code source record chunks
    let parity_record_chunks = erasure_coding
        .extend(
            &record
                .iter()
                .map(|scalar_bytes| {
                    Scalar::try_from(scalar_bytes).expect(
                        "Piece getter must return valid pieces of history that contain \
                        proper scalar bytes; qed",
                    )
                })
                .collect::<Vec<_>>(),
        )
        .expect("Instance was verified to be able to work with this many values earlier; qed")
        .into_iter()
        .map(<[u8; ScalarBytes::FULL_BYTES]>::from)
        .collect::<Vec<_>>();
    let source_record_chunks = record.to_vec();

    chunks_scratch.clear();
    // For every erasure coded chunk check if there is a proof present; if so, encode the chunk
    // with PoSpace proof bytes and set the corresponding `encoded_chunks_used` bit to `true`.
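    // Source and parity chunks are interleaved, so s-bucket `2 * i` corresponds to source chunk
    // `i` and s-bucket `2 * i + 1` to parity chunk `i`.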
    (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
        .into_par_iter()
        .map(SBucket::from)
        .zip(
            source_record_chunks
                .par_iter()
                .interleave(&parity_record_chunks),
        )
        .map(|(s_bucket, record_chunk)| {
            if let Some(proof) = pos_table.find_proof(s_bucket.into()) {
                (Simd::from(*record_chunk) ^ Simd::from(*proof.hash())).to_array()
            } else {
                // Dummy value indicating no proof
                [0; ScalarBytes::FULL_BYTES]
            }
        })
        .collect_into_vec(chunks_scratch);
    let num_successfully_encoded_chunks = chunks_scratch
        .drain(..)
        .zip(encoded_chunks_used.iter_mut())
        .filter_map(|(maybe_encoded_chunk, mut encoded_chunk_used)| {
            // No proof, see above
            if maybe_encoded_chunk == [0; ScalarBytes::FULL_BYTES] {
                None
            } else {
                *encoded_chunk_used = true;

                Some(maybe_encoded_chunk)
            }
        })
        // Make sure the above filter function (and the corresponding `encoded_chunk_used` update)
        // happens at most as many times as there are chunks in the record, otherwise `n+1`
        // iterations could happen and update an extra `encoded_chunk_used` unnecessarily, causing
        // issues down the line
        .take(record.len())
        .zip(record.iter_mut())
        // Write each encoded chunk back so we can reuse the original allocation
        .map(|(input_chunk, output_chunk)| {
            *output_chunk = input_chunk;
        })
        .count();

    // In some cases there are not enough PoSpace proofs available, in which case we add the
    // remaining unencoded erasure coded record chunks at the end
    source_record_chunks
        .iter()
        .zip(&parity_record_chunks)
        .flat_map(|(a, b)| [a, b])
        .zip(encoded_chunks_used.iter())
        // Skip chunks that were used previously
        .filter_map(|(record_chunk, encoded_chunk_used)| {
            if *encoded_chunk_used {
                None
            } else {
                Some(record_chunk)
            }
        })
        // The first `num_successfully_encoded_chunks` chunks are encoded
        .zip(record.iter_mut().skip(num_successfully_encoded_chunks))
        // Write the necessary number of unencoded chunks at the end
        .for_each(|(input_chunk, output_chunk)| {
            *output_chunk = *input_chunk;
        });
}

async fn download_sector_internal<PG>(
    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
    piece_getter: &PG,
    kzg: &Kzg,
    erasure_coding: &ErasureCoding,
) -> Result<(), PlottingError>
where
    PG: PieceGetter + Send + Sync,
{
    // TODO: Make configurable, likely allowing user to specify RAM usage expectations and
    //  inferring concurrency from there
    let recovery_semaphore = &Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);

    // Allocate a separate list of indices to decouple its lifetime from `pieces_to_download`,
    // which will be modified below
    let piece_indices = pieces_to_download.keys().copied().collect::<Vec<_>>();
    let mut downloaded_pieces = piece_getter
        .get_pieces(piece_indices)
        .await
        .map_err(|error| PlottingError::FailedToRetrievePieces { error })?
        .fuse();
    let mut reconstructed_pieces = FuturesUnordered::new();

    let mut final_result = Ok(());

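    // Drain the download stream and any in-flight reconstructions together: pieces that fail to
    // download (or are simply not found) are queued for reconstruction rather than failing the
    // attempt immediately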
    loop {
        let (piece_index, result) = select! {
            (piece_index, result) = downloaded_pieces.select_next_some() => {
                match result {
                    Ok(Some(piece)) => (piece_index, Ok(piece)),
                    Ok(None) => {
                        trace!(%piece_index, "Piece was not found, trying reconstruction");

                        reconstructed_pieces.push(reconstruct_piece(
                            piece_index,
                            recovery_semaphore,
                            piece_getter,
                            kzg,
                            erasure_coding,
                        ));
                        continue;
                    }
                    Err(error) => {
                        trace!(
                            %error,
                            %piece_index,
                            "Failed to download piece, trying reconstruction"
                        );

                        reconstructed_pieces.push(reconstruct_piece(
                            piece_index,
                            recovery_semaphore,
                            piece_getter,
                            kzg,
                            erasure_coding,
                        ));
                        continue;
                    }
                }
            },
            (piece_index, result) = reconstructed_pieces.select_next_some() => {
                (piece_index, result)
            },
            complete => {
                break;
            }
        };

        match result {
            Ok(piece) => {
                process_piece(piece_index, piece, pieces_to_download);
            }
            Err(error) => {
                trace!(%error, %piece_index, "Failed to download piece");

                if final_result.is_ok() {
                    final_result = Err(error);
                }
            }
        }
    }

    if final_result.is_ok() && !pieces_to_download.is_empty() {
        return Err(PlottingError::FailedToRetrievePieces {
            error: anyhow::anyhow!(
                "Successful result, but not all pieces were downloaded, this is likely a piece \
                getter implementation bug"
            ),
        });
    }

    final_result
}

async fn reconstruct_piece<PG>(
    piece_index: PieceIndex,
    recovery_semaphore: &Semaphore,
    piece_getter: &PG,
    kzg: &Kzg,
    erasure_coding: &ErasureCoding,
) -> (PieceIndex, Result<Piece, PlottingError>)
where
    PG: PieceGetter + Send + Sync,
{
    let _permit = recovery_semaphore.acquire().await;
    let recovered_piece_fut = recover_missing_piece(
        piece_getter,
        kzg.clone(),
        erasure_coding.clone(),
        piece_index,
    );

    (
        piece_index,
        recovered_piece_fut
            .await
            .map_err(|error| PlottingError::PieceRecoveryFailed {
                piece_index,
                error: error.into(),
            }),
    )
}

fn process_piece(
    piece_index: PieceIndex,
    piece: Piece,
    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
) {
    for (record, metadata) in pieces_to_download.remove(&piece_index).unwrap_or_default() {
        // Fancy way to insert the value in order to avoid going through the stack (as naive
        // dereferencing would) and potentially causing a stack overflow as the result
        record
            .as_flattened_mut()
            .copy_from_slice(piece.record().as_flattened());
        *metadata = RecordMetadata {
            commitment: *piece.commitment(),
            witness: *piece.witness(),
            piece_checksum: blake3_hash(piece.as_ref()),
        };
    }
}