subspace_farmer/farm/
plotted_pieces.rs

1//! Wrapper for pieces plotted under multiple plots
2
3use crate::farm::{FarmError, PieceReader};
4use async_trait::async_trait;
5use rand::prelude::*;
6use rayon::prelude::*;
7use std::collections::hash_map::Entry;
8use std::collections::HashMap;
9use std::fmt;
10use std::future::Future;
11use std::hash::Hash;
12use std::sync::Arc;
13use subspace_core_primitives::pieces::{Piece, PieceIndex, PieceOffset};
14use subspace_core_primitives::sectors::SectorIndex;
15use subspace_farmer_components::plotting::PlottedSector;
16use tracing::{trace, warn};
17
18#[derive(Debug)]
19struct DummyReader;
20
21#[async_trait]
22impl PieceReader for DummyReader {
23    #[inline]
24    async fn read_piece(
25        &self,
26        _sector_index: SectorIndex,
27        _piece_offset: PieceOffset,
28    ) -> Result<Option<Piece>, FarmError> {
29        Ok(None)
30    }
31}
32
33#[derive(Debug, Copy, Clone, Eq, PartialEq)]
34struct PieceDetails<FarmIndex> {
35    farm_index: FarmIndex,
36    sector_index: SectorIndex,
37    piece_offset: PieceOffset,
38}
39
40/// Wrapper data structure for pieces plotted under multiple plots
41#[derive(Debug, Default)]
42pub struct PlottedPieces<FarmIndex> {
43    readers: Vec<Arc<dyn PieceReader>>,
44    pieces: HashMap<PieceIndex, Vec<PieceDetails<FarmIndex>>>,
45}
46
47impl<FarmIndex> PlottedPieces<FarmIndex>
48where
49    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
50    usize: From<FarmIndex>,
51{
52    /// Check if piece is known and can be retrieved
53    pub fn contains_piece(&self, piece_index: &PieceIndex) -> bool {
54        self.pieces.contains_key(piece_index)
55    }
56
57    /// Read plotted piece from one of the farms.
58    ///
59    /// If piece doesn't exist `None` is returned, if by the time future is polled piece is no
60    /// longer in the plot, future will resolve with `None`.
61    pub fn read_piece(
62        &self,
63        piece_index: PieceIndex,
64    ) -> Option<impl Future<Output = Option<Piece>> + 'static> {
65        let piece_details = match self.pieces.get(&piece_index) {
66            Some(piece_details) => piece_details
67                .choose(&mut thread_rng())
68                .copied()
69                .expect("Empty lists are not stored in the map; qed"),
70            None => {
71                trace!(
72                    ?piece_index,
73                    "Piece is not stored in any of the local plots"
74                );
75                return None;
76            }
77        };
78        let reader = match self.readers.get(usize::from(piece_details.farm_index)) {
79            Some(reader) => reader.clone(),
80            None => {
81                warn!(
82                    ?piece_index,
83                    ?piece_details,
84                    "No piece reader for associated farm index"
85                );
86                return None;
87            }
88        };
89
90        Some(async move {
91            reader
92                .read_piece(piece_details.sector_index, piece_details.piece_offset)
93                .await
94                .unwrap_or_else(|error| {
95                    warn!(
96                        %error,
97                        %piece_index,
98                        farm_index = ?piece_details.farm_index,
99                        sector_index = piece_details.sector_index,
100                        "Failed to retrieve piece"
101                    );
102                    None
103                })
104        })
105    }
106
107    /// Add new sector to collect plotted pieces
108    pub fn add_sector(&mut self, farm_index: FarmIndex, plotted_sector: &PlottedSector) {
109        for (piece_offset, &piece_index) in
110            (PieceOffset::ZERO..).zip(plotted_sector.piece_indexes.iter())
111        {
112            let piece_details = PieceDetails {
113                farm_index,
114                sector_index: plotted_sector.sector_index,
115                piece_offset,
116            };
117
118            self.pieces
119                .entry(piece_index)
120                .or_default()
121                .push(piece_details);
122        }
123    }
124
125    /// Add old sector from plotted pieces (happens on replotting)
126    pub fn delete_sector(&mut self, farm_index: FarmIndex, plotted_sector: &PlottedSector) {
127        for (piece_offset, &piece_index) in
128            (PieceOffset::ZERO..).zip(plotted_sector.piece_indexes.iter())
129        {
130            let searching_piece_details = PieceDetails {
131                farm_index,
132                sector_index: plotted_sector.sector_index,
133                piece_offset,
134            };
135
136            if let Entry::Occupied(mut entry) = self.pieces.entry(piece_index) {
137                let piece_details = entry.get_mut();
138                if let Some(index) =
139                    piece_details
140                        .iter()
141                        .enumerate()
142                        .find_map(|(index, piece_details)| {
143                            (piece_details == &searching_piece_details).then_some(index)
144                        })
145                {
146                    piece_details.swap_remove(index);
147                }
148
149                // We do not store empty lists
150                if piece_details.is_empty() {
151                    entry.remove_entry();
152                }
153            }
154        }
155    }
156
157    /// Add new farm with corresponding piece reader
158    pub fn add_farm(&mut self, farm_index: FarmIndex, piece_reader: Arc<dyn PieceReader>) {
159        let farm_index = usize::from(farm_index);
160
161        if self.readers.len() <= farm_index {
162            self.readers.resize(farm_index, Arc::new(DummyReader));
163            self.readers.push(piece_reader);
164        } else {
165            self.readers[farm_index] = piece_reader;
166        }
167    }
168
169    /// Add all sectors of the farm
170    pub fn delete_farm(&mut self, farm_index: FarmIndex) {
171        if let Some(reader) = self.readers.get_mut(usize::from(farm_index)) {
172            // Replace reader with a dummy one to maintain farm order
173            *reader = Arc::new(DummyReader);
174
175            let piece_indices_to_remove = self
176                .pieces
177                .par_iter_mut()
178                .filter_map(|(&piece_index, piece_details)| {
179                    piece_details.retain(|piece_details| piece_details.farm_index != farm_index);
180
181                    piece_details.is_empty().then_some(piece_index)
182                })
183                .collect::<Vec<_>>();
184
185            // Remove pieces for which this was the only farm storing them
186            for piece_index in piece_indices_to_remove {
187                self.pieces.remove(&piece_index);
188            }
189        }
190    }
191
192    /// Iterator over all unique piece indices plotted
193    pub fn piece_indices(&self) -> impl Iterator<Item = &PieceIndex> {
194        self.pieces.keys()
195    }
196}