use crate::sector::{
    sector_record_chunks_size, sector_size, EncodedChunksUsed, RawSector, RecordMetadata,
    SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
};
use crate::segment_reconstruction::recover_missing_piece;
use crate::FarmerProtocolInfo;
use async_lock::{Mutex as AsyncMutex, Semaphore};
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesUnordered;
use futures::{select, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use rayon::prelude::*;
use std::collections::HashMap;
use std::simd::Simd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::hashes::{blake3_hash, blake3_hash_parallel, Blake3Hash};
use subspace_core_primitives::pieces::{Piece, PieceIndex, PieceOffset, Record};
use subspace_core_primitives::pos::PosSeed;
use subspace_core_primitives::sectors::{SBucket, SectorId, SectorIndex};
use subspace_core_primitives::segments::HistorySize;
use subspace_core_primitives::{PublicKey, ScalarBytes};
use subspace_data_retrieval::piece_getter::PieceGetter;
use subspace_erasure_coding::ErasureCoding;
use subspace_kzg::{Kzg, Scalar};
use subspace_proof_of_space::{Table, TableGenerator};
use thiserror::Error;
use tracing::{debug, trace, warn};

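/// Maximum number of missing pieces reconstructed concurrently during sector downloading, see
/// `download_sector_internal`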
const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;

fn default_backoff() -> ExponentialBackoff {
    ExponentialBackoff {
        initial_interval: Duration::from_secs(15),
        max_interval: Duration::from_secs(10 * 60),
        // No overall time limit, retry until the operation succeeds
        max_elapsed_time: None,
        ..ExponentialBackoff::default()
    }
}

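/// Information about the sector that was plotted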
#[derive(Debug, Clone, Encode, Decode)]
pub struct PlottedSector {
    /// Sector ID
    pub sector_id: SectorId,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Sector metadata
    pub sector_metadata: SectorMetadataChecksummed,
    /// Indexes of pieces that were plotted
    pub piece_indexes: Vec<PieceIndex>,
}

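/// Errors that can happen during plotting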
#[derive(Debug, Error)]
pub enum PlottingError {
    /// Records encoder error
    #[error("Records encoder error: {error}")]
    RecordsEncoderError {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Bad sector output size
    #[error("Bad sector output size: provided {provided}, expected {expected}")]
    BadSectorOutputSize {
        /// Actual size
        provided: usize,
        /// Expected size
        expected: usize,
    },
    /// Failed to recover a missing piece
    #[error("Can't recover missing piece {piece_index}: {error}")]
    PieceRecoveryFailed {
        /// Index of the piece that could not be recovered
        piece_index: PieceIndex,
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Failed to retrieve pieces
    #[error("Failed to retrieve pieces: {error}")]
    FailedToRetrievePieces {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Plotting was aborted early
    #[error("Abort early")]
    AbortEarly,
}

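/// Options for plotting a sector, see [`plot_sector`]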
#[derive(Debug)]
pub struct PlotSectorOptions<'a, RE, PG> {
    /// Public key corresponding to the sector
    pub public_key: &'a PublicKey,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Getter for pieces of archival history
    pub piece_getter: &'a PG,
    /// Farmer protocol info
    pub farmer_protocol_info: FarmerProtocolInfo,
    /// KZG instance
    pub kzg: &'a Kzg,
    /// Erasure coding instance
    pub erasure_coding: &'a ErasureCoding,
    /// How many pieces the sector should contain
    pub pieces_in_sector: u16,
    /// Where the plotted sector should be written; the vector must either be empty (in which case
    /// it will be resized to the correct size automatically) or already correctly sized
    pub sector_output: &'a mut Vec<u8>,
    /// Optional semaphore limiting how many sectors download pieces concurrently; the permit is
    /// acquired before downloading starts and held until the end of plotting
    pub downloading_semaphore: Option<Arc<Semaphore>>,
    /// Optional semaphore limiting how many sectors are encoded concurrently
    pub encoding_semaphore: Option<&'a Semaphore>,
    /// Records encoder to use
    pub records_encoder: &'a mut RE,
    /// Flag that signals plotting should be aborted as soon as possible
    pub abort_early: &'a AtomicBool,
}

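/// Plot a single sector: download all of its pieces, encode the records and write the sector to
/// `sector_output`.
///
/// This is a thin wrapper that runs [`download_sector`], [`encode_sector`] and [`write_sector`]
/// one after another, while optionally limiting concurrency with the provided semaphores.
///
/// NOTE: Even though this function is async, it contains CPU-intensive blocking code (record
/// encoding), so it should be driven from a context where blocking is acceptable.
///
/// A minimal usage sketch (assuming a piece getter, KZG, erasure coding and records encoder are
/// already set up; all bindings here are illustrative, not part of this API):
///
/// ```ignore
/// let plotted_sector = plot_sector(PlotSectorOptions {
///     public_key: &public_key,
///     sector_index,
///     piece_getter: &piece_getter,
///     farmer_protocol_info,
///     kzg: &kzg,
///     erasure_coding: &erasure_coding,
///     pieces_in_sector,
///     sector_output: &mut sector_bytes,
///     downloading_semaphore: None,
///     encoding_semaphore: None,
///     records_encoder: &mut records_encoder,
///     abort_early: &AtomicBool::new(false),
/// })
/// .await?;
/// ```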
pub async fn plot_sector<RE, PG>(
    options: PlotSectorOptions<'_, RE, PG>,
) -> Result<PlottedSector, PlottingError>
where
    RE: RecordsEncoder,
    PG: PieceGetter + Send + Sync,
{
    let PlotSectorOptions {
        public_key,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        kzg,
        erasure_coding,
        pieces_in_sector,
        sector_output,
        downloading_semaphore,
        encoding_semaphore,
        records_encoder,
        abort_early,
    } = options;

    let _downloading_permit = match downloading_semaphore {
        Some(downloading_semaphore) => Some(downloading_semaphore.acquire_arc().await),
        None => None,
    };

    let download_sector_fut = download_sector(DownloadSectorOptions {
        public_key,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        kzg,
        erasure_coding,
        pieces_in_sector,
    });

    let _encoding_permit = match encoding_semaphore {
        Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
        None => None,
    };

    let encoded_sector = encode_sector(
        download_sector_fut.await?,
        EncodeSectorOptions::<RE> {
            sector_index,
            records_encoder,
            abort_early,
        },
    )?;

    if abort_early.load(Ordering::Acquire) {
        return Err(PlottingError::AbortEarly);
    }

    write_sector(&encoded_sector, sector_output)?;

    Ok(encoded_sector.plotted_sector)
}

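/// A sector that was downloaded from the network and is ready to be encoded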
#[derive(Debug)]
pub struct DownloadedSector {
    sector_id: SectorId,
    piece_indices: Vec<PieceIndex>,
    raw_sector: RawSector,
    history_size: HistorySize,
}

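/// Options for downloading a sector, see [`download_sector`]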
#[derive(Debug)]
pub struct DownloadSectorOptions<'a, PG> {
    /// Public key corresponding to the sector
    pub public_key: &'a PublicKey,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Getter for pieces of archival history
    pub piece_getter: &'a PG,
    /// Farmer protocol info
    pub farmer_protocol_info: FarmerProtocolInfo,
    /// KZG instance
    pub kzg: &'a Kzg,
    /// Erasure coding instance
    pub erasure_coding: &'a ErasureCoding,
    /// How many pieces the sector should contain
    pub pieces_in_sector: u16,
}

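/// Download all pieces of a sector.
///
/// Pieces that can't be retrieved directly are reconstructed from other pieces of the same
/// segment, and failed downloading attempts are retried with exponential backoff.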
pub async fn download_sector<PG>(
    options: DownloadSectorOptions<'_, PG>,
) -> Result<DownloadedSector, PlottingError>
where
    PG: PieceGetter + Send + Sync,
{
    let DownloadSectorOptions {
        public_key,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        kzg,
        erasure_coding,
        pieces_in_sector,
    } = options;

    let sector_id = SectorId::new(
        public_key.hash(),
        sector_index,
        farmer_protocol_info.history_size,
    );

    let piece_indices = (PieceOffset::ZERO..)
        .take(pieces_in_sector.into())
        .map(|piece_offset| {
            sector_id.derive_piece_index(
                piece_offset,
                farmer_protocol_info.history_size,
                farmer_protocol_info.max_pieces_in_sector,
                farmer_protocol_info.recent_segments,
                farmer_protocol_info.recent_history_fraction,
            )
        })
        .collect::<Vec<_>>();

    let raw_sector = {
        let mut raw_sector = RawSector::new(pieces_in_sector);
        let mut pieces_to_download =
            HashMap::<PieceIndex, Vec<_>>::with_capacity(usize::from(pieces_in_sector));
        for (piece_index, (record, metadata)) in piece_indices
            .iter()
            .copied()
            .zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
        {
            pieces_to_download
                .entry(piece_index)
                .or_default()
                .push((record, metadata));
        }
        // The same piece index can appear in a sector more than once, hence a `Vec` of
        // destinations for each index
        let pieces_to_download = AsyncMutex::new(pieces_to_download);

        // Retry failed attempts with exponential backoff; only pieces that are still missing are
        // requested on each retry
        retry(default_backoff(), || async {
            let mut pieces_to_download = pieces_to_download.lock().await;

            if let Err(error) =
                download_sector_internal(&mut pieces_to_download, piece_getter, kzg, erasure_coding)
                    .await
            {
                warn!(
                    %sector_index,
                    %error,
                    %pieces_in_sector,
                    remaining_pieces = %pieces_to_download.len(),
                    "Sector downloading attempt failed, will retry later"
                );

                return Err(BackoffError::transient(error));
            }

            debug!(%sector_index, "Sector downloaded successfully");

            Ok(())
        })
        .await?;

        raw_sector
    };

    Ok(DownloadedSector {
        sector_id,
        piece_indices,
        raw_sector,
        history_size: farmer_protocol_info.history_size,
    })
}

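/// Abstraction for encoding the records of a sector, allowing alternative implementations to be
/// plugged in ([`CpuRecordsEncoder`] is the CPU-based implementation in this module)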
pub trait RecordsEncoder {
    /// Encode the provided sector records
    fn encode_records(
        &mut self,
        sector_id: &SectorId,
        records: &mut [Record],
        abort_early: &AtomicBool,
    ) -> anyhow::Result<SectorContentsMap>;
}

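/// CPU implementation of [`RecordsEncoder`] that runs one proof-of-space table generator per
/// rayon task, with records pulled from a shared iterator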
#[derive(Debug)]
pub struct CpuRecordsEncoder<'a, PosTable>
where
    PosTable: Table,
{
    table_generators: &'a mut [PosTable::Generator],
    erasure_coding: &'a ErasureCoding,
    global_mutex: &'a AsyncMutex<()>,
}

impl<PosTable> RecordsEncoder for CpuRecordsEncoder<'_, PosTable>
where
    PosTable: Table,
{
    fn encode_records(
        &mut self,
        sector_id: &SectorId,
        records: &mut [Record],
        abort_early: &AtomicBool,
    ) -> anyhow::Result<SectorContentsMap> {
        if self.erasure_coding.max_shards() < Record::NUM_S_BUCKETS {
            return Err(anyhow::anyhow!(
                "Invalid erasure coding instance: {} shards needed, {} supported",
                Record::NUM_S_BUCKETS,
                self.erasure_coding.max_shards()
            ));
        }

        if self.table_generators.is_empty() {
            return Err(anyhow::anyhow!("No table generators"));
        }

        let pieces_in_sector = records
            .len()
            .try_into()
            .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
        let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);

        {
            let table_generators = &mut *self.table_generators;
            let global_mutex = self.global_mutex;
            let erasure_coding = self.erasure_coding;

            let iter = Mutex::new(
                (PieceOffset::ZERO..)
                    .zip(records.iter_mut())
                    .zip(sector_contents_map.iter_record_bitfields_mut()),
            );

            rayon::scope(|scope| {
                for table_generator in table_generators {
                    scope.spawn(|_scope| {
                        let mut chunks_scratch = Vec::with_capacity(Record::NUM_S_BUCKETS);

                        loop {
                            // Take the global mutex briefly to make sure encoding is allowed
                            // right now
                            global_mutex.lock_blocking();

                            // `loop` with explicit `let ... else` instead of `while let` so that
                            // the lock on the shared iterator is not held for the duration of the
                            // loop body
                            let Some(((piece_offset, record), encoded_chunks_used)) =
                                iter.lock().next()
                            else {
                                return;
                            };
                            let pos_seed = sector_id.derive_evaluation_seed(piece_offset);

                            record_encoding::<PosTable>(
                                &pos_seed,
                                record,
                                encoded_chunks_used,
                                table_generator,
                                erasure_coding,
                                &mut chunks_scratch,
                            );

                            if abort_early.load(Ordering::Relaxed) {
                                return;
                            }
                        }
                    });
                }
            });
        }

        Ok(sector_contents_map)
    }
}

impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable>
where
    PosTable: Table,
{
    /// Create a new instance
    pub fn new(
        table_generators: &'a mut [PosTable::Generator],
        erasure_coding: &'a ErasureCoding,
        global_mutex: &'a AsyncMutex<()>,
    ) -> Self {
        Self {
            table_generators,
            erasure_coding,
            global_mutex,
        }
    }
}

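/// Options for encoding a sector, see [`encode_sector`]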
#[derive(Debug)]
pub struct EncodeSectorOptions<'a, RE>
where
    RE: RecordsEncoder,
{
    /// Sector index
    pub sector_index: SectorIndex,
    /// Records encoder to use
    pub records_encoder: &'a mut RE,
    /// Flag that signals encoding should be aborted as soon as possible
    pub abort_early: &'a AtomicBool,
}

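/// A sector that was encoded and is ready to be written to disk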
#[derive(Debug)]
pub struct EncodedSector {
    /// Information about the sector that was plotted
    pub plotted_sector: PlottedSector,
    raw_sector: RawSector,
    sector_contents_map: SectorContentsMap,
}

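/// Encode the records of a downloaded sector.
///
/// NOTE: This is a CPU-intensive blocking call.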
pub fn encode_sector<RE>(
    downloaded_sector: DownloadedSector,
    encoding_options: EncodeSectorOptions<'_, RE>,
) -> Result<EncodedSector, PlottingError>
where
    RE: RecordsEncoder,
{
    let DownloadedSector {
        sector_id,
        piece_indices,
        mut raw_sector,
        history_size,
    } = downloaded_sector;
    let EncodeSectorOptions {
        sector_index,
        records_encoder,
        abort_early,
    } = encoding_options;

    let pieces_in_sector = raw_sector.records.len().try_into().expect(
        "Raw sector can only be created in this crate and it is always done correctly; qed",
    );

    let sector_contents_map = records_encoder
        .encode_records(&sector_id, &mut raw_sector.records, abort_early)
        .map_err(|error| PlottingError::RecordsEncoderError { error })?;

    let sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
        sector_index,
        pieces_in_sector,
        s_bucket_sizes: sector_contents_map.s_bucket_sizes(),
        history_size,
    });

    Ok(EncodedSector {
        plotted_sector: PlottedSector {
            sector_id,
            sector_index,
            sector_metadata,
            piece_indexes: piece_indices,
        },
        raw_sector,
        sector_contents_map,
    })
}

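/// Write an encoded sector into `sector_output`.
///
/// `sector_output` must either be empty (it will be resized automatically) or already have the
/// exact sector size, otherwise [`PlottingError::BadSectorOutputSize`] is returned.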
pub fn write_sector(
    encoded_sector: &EncodedSector,
    sector_output: &mut Vec<u8>,
) -> Result<(), PlottingError> {
    let EncodedSector {
        plotted_sector: _,
        raw_sector,
        sector_contents_map,
    } = encoded_sector;

    let pieces_in_sector = raw_sector.records.len().try_into().expect(
        "Raw sector can only be created in this crate and it is always done correctly; qed",
    );

    let sector_size = sector_size(pieces_in_sector);

    if !sector_output.is_empty() && sector_output.len() != sector_size {
        return Err(PlottingError::BadSectorOutputSize {
            provided: sector_output.len(),
            expected: sector_size,
        });
    }

    sector_output.resize(sector_size, 0);

    // The sector is stored as a concatenation of the following regions:
    // * sector contents map
    // * record chunks, grouped by s-bucket
    // * record metadata
    // * checksum of everything above
    {
        let (sector_contents_map_region, remaining_bytes) =
            sector_output.split_at_mut(SectorContentsMap::encoded_size(pieces_in_sector));
        // Slice the remaining memory into regions for s-buckets and record metadata
        let (s_buckets_region, metadata_region) =
            remaining_bytes.split_at_mut(sector_record_chunks_size(pieces_in_sector));

        // Write the sector contents map so that records can be decoded later
        sector_contents_map
            .encode_into(sector_contents_map_region)
            .expect("Chunked into correct size above; qed");

        let num_encoded_record_chunks = sector_contents_map.num_encoded_record_chunks();
        let mut next_encoded_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
        let mut next_unencoded_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
        // Write record chunks, one s-bucket at a time
        for ((piece_offset, encoded_chunk_used), output) in (SBucket::ZERO..=SBucket::MAX)
            .flat_map(|s_bucket| {
                sector_contents_map
                    .iter_s_bucket_records(s_bucket)
                    .expect("S-bucket guaranteed to be in range; qed")
            })
            .zip(s_buckets_region.array_chunks_mut::<{ ScalarBytes::FULL_BYTES }>())
        {
            let num_encoded_record_chunks =
                usize::from(num_encoded_record_chunks[usize::from(piece_offset)]);
            let next_encoded_record_chunks_offset =
                &mut next_encoded_record_chunks_offset[usize::from(piece_offset)];
            let next_unencoded_record_chunks_offset =
                &mut next_unencoded_record_chunks_offset[usize::from(piece_offset)];

            // Within a record, encoded chunks are stored first, followed by unencoded chunks
            let chunk_position;
            if encoded_chunk_used {
                chunk_position = *next_encoded_record_chunks_offset;
                *next_encoded_record_chunks_offset += 1;
            } else {
                chunk_position = num_encoded_record_chunks + *next_unencoded_record_chunks_offset;
                *next_unencoded_record_chunks_offset += 1;
            }
            output.copy_from_slice(&raw_sector.records[usize::from(piece_offset)][chunk_position]);
        }

        let metadata_chunks =
            metadata_region.array_chunks_mut::<{ RecordMetadata::encoded_size() }>();
        for (record_metadata, output) in raw_sector.metadata.iter().zip(metadata_chunks) {
            record_metadata.encode_to(&mut output.as_mut_slice());
        }

        // Checksum everything except the checksum region itself
        let (sector_contents, sector_checksum) =
            sector_output.split_at_mut(sector_size - Blake3Hash::SIZE);
        sector_checksum.copy_from_slice(blake3_hash_parallel(sector_contents).as_ref());
    }

    Ok(())
}

fn record_encoding<PosTable>(
    pos_seed: &PosSeed,
    record: &mut Record,
    mut encoded_chunks_used: EncodedChunksUsed<'_>,
    table_generator: &mut PosTable::Generator,
    erasure_coding: &ErasureCoding,
    chunks_scratch: &mut Vec<[u8; ScalarBytes::FULL_BYTES]>,
) where
    PosTable: Table,
{
    // Derive the proof-of-space table
    let pos_table = table_generator.generate_parallel(pos_seed);

    // Erasure code the source record chunks to get parity chunks
    let parity_record_chunks = erasure_coding
        .extend(
            &record
                .iter()
                .map(|scalar_bytes| {
                    Scalar::try_from(scalar_bytes).expect(
                        "Piece getter must return valid pieces of history that contain \
                        proper scalar bytes; qed",
                    )
                })
                .collect::<Vec<_>>(),
        )
        .expect("Instance was verified to be able to work with this many values earlier; qed")
        .into_iter()
        .map(<[u8; ScalarBytes::FULL_BYTES]>::from)
        .collect::<Vec<_>>();
    let source_record_chunks = record.to_vec();

    chunks_scratch.clear();
    // For every s-bucket, XOR the corresponding chunk with the proof hash if a proof exists
    (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
        .into_par_iter()
        .map(SBucket::from)
        .zip(
            source_record_chunks
                .par_iter()
                .interleave(&parity_record_chunks),
        )
        .map(|(s_bucket, record_chunk)| {
            if let Some(proof) = pos_table.find_proof(s_bucket.into()) {
                (Simd::from(*record_chunk) ^ Simd::from(*proof.hash())).to_array()
            } else {
                // A zero chunk stands for "no proof for this s-bucket"
                [0; ScalarBytes::FULL_BYTES]
            }
        })
        .collect_into_vec(chunks_scratch);
    let num_successfully_encoded_chunks = chunks_scratch
        .drain(..)
        .zip(encoded_chunks_used.iter_mut())
        .filter_map(|(maybe_encoded_chunk, mut encoded_chunk_used)| {
            // No proof for this s-bucket, see above
            if maybe_encoded_chunk == [0; ScalarBytes::FULL_BYTES] {
                None
            } else {
                *encoded_chunk_used = true;

                Some(maybe_encoded_chunk)
            }
        })
        // Consume at most `record.len()` chunks (and corresponding `encoded_chunk_used` updates),
        // otherwise one extra iteration could mark an extra chunk as used
        .take(record.len())
        .zip(record.iter_mut())
        // Write encoded chunks back into the record to reuse its allocation
        .map(|(input_chunk, output_chunk)| {
            *output_chunk = input_chunk;
        })
        .count();

    // If not enough proofs were found, fill the remainder of the record with unencoded chunks
    source_record_chunks
        .iter()
        .zip(&parity_record_chunks)
        .flat_map(|(a, b)| [a, b])
        .zip(encoded_chunks_used.iter())
        // Skip chunks that were encoded above
        .filter_map(|(record_chunk, encoded_chunk_used)| {
            if *encoded_chunk_used {
                None
            } else {
                Some(record_chunk)
            }
        })
        .zip(record.iter_mut().skip(num_successfully_encoded_chunks))
        .for_each(|(input_chunk, output_chunk)| {
            *output_chunk = *input_chunk;
        });
}

async fn download_sector_internal<PG>(
    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
    piece_getter: &PG,
    kzg: &Kzg,
    erasure_coding: &ErasureCoding,
) -> Result<(), PlottingError>
where
    PG: PieceGetter + Send + Sync,
{
    // Restrict how many pieces are reconstructed concurrently, reconstruction is CPU-intensive
    let recovery_semaphore = &Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);

    // Collect indices into an owned `Vec`, since `pieces_to_download` is mutated as pieces arrive
    let piece_indices = pieces_to_download.keys().copied().collect::<Vec<_>>();
    let mut downloaded_pieces = piece_getter
        .get_pieces(piece_indices)
        .await
        .map_err(|error| PlottingError::FailedToRetrievePieces { error })?
        .fuse();
    let mut reconstructed_pieces = FuturesUnordered::new();

    let mut final_result = Ok(());

    loop {
        let (piece_index, result) = select! {
            (piece_index, result) = downloaded_pieces.select_next_some() => {
                match result {
                    Ok(Some(piece)) => (piece_index, Ok(piece)),
                    Ok(None) => {
                        trace!(%piece_index, "Piece was not found, trying reconstruction");

                        reconstructed_pieces.push(reconstruct_piece(
                            piece_index,
                            recovery_semaphore,
                            piece_getter,
                            kzg,
                            erasure_coding,
                        ));
                        continue;
                    }
                    Err(error) => {
                        trace!(
                            %error,
                            %piece_index,
                            "Failed to download piece, trying reconstruction"
                        );

                        reconstructed_pieces.push(reconstruct_piece(
                            piece_index,
                            recovery_semaphore,
                            piece_getter,
                            kzg,
                            erasure_coding,
                        ));
                        continue;
                    }
                }
            },
            (piece_index, result) = reconstructed_pieces.select_next_some() => {
                (piece_index, result)
            },
            complete => {
                break;
            }
        };

        match result {
            Ok(piece) => {
                process_piece(piece_index, piece, pieces_to_download);
            }
            Err(error) => {
                trace!(%error, %piece_index, "Failed to download piece");

                if final_result.is_ok() {
                    final_result = Err(error);
                }
            }
        }
    }

    if final_result.is_ok() && !pieces_to_download.is_empty() {
        return Err(PlottingError::FailedToRetrievePieces {
            error: anyhow::anyhow!(
                "Successful result, but not all pieces were downloaded, this is likely a piece \
                getter implementation bug"
            ),
        });
    }

    final_result
}

async fn reconstruct_piece<PG>(
    piece_index: PieceIndex,
    recovery_semaphore: &Semaphore,
    piece_getter: &PG,
    kzg: &Kzg,
    erasure_coding: &ErasureCoding,
) -> (PieceIndex, Result<Piece, PlottingError>)
where
    PG: PieceGetter + Send + Sync,
{
    let _permit = recovery_semaphore.acquire().await;
    let recovered_piece_fut = recover_missing_piece(
        piece_getter,
        kzg.clone(),
        erasure_coding.clone(),
        piece_index,
    );

    (
        piece_index,
        recovered_piece_fut
            .await
            .map_err(|error| PlottingError::PieceRecoveryFailed {
                piece_index,
                error: error.into(),
            }),
    )
}

fn process_piece(
    piece_index: PieceIndex,
    piece: Piece,
    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
) {
    for (record, metadata) in pieces_to_download.remove(&piece_index).unwrap_or_default() {
        // Copy the record payload and store the piece's commitment, witness and checksum
        record
            .as_flattened_mut()
            .copy_from_slice(piece.record().as_flattened());
        *metadata = RecordMetadata {
            commitment: *piece.commitment(),
            witness: *piece.witness(),
            piece_checksum: blake3_hash(piece.as_ref()),
        };
    }
}