subspace_farmer/farm/
plotted_pieces.rs1use crate::farm::{FarmError, PieceReader};
4use async_trait::async_trait;
5use rand::prelude::*;
6use rayon::prelude::*;
7use std::collections::hash_map::Entry;
8use std::collections::HashMap;
9use std::fmt;
10use std::future::Future;
11use std::hash::Hash;
12use std::sync::Arc;
13use subspace_core_primitives::pieces::{Piece, PieceIndex, PieceOffset};
14use subspace_core_primitives::sectors::SectorIndex;
15use subspace_farmer_components::plotting::PlottedSector;
16use tracing::{trace, warn};
17
/// Placeholder piece reader that holds no data.
///
/// Used within this file to occupy reader slots that have no real reader (gaps created when a
/// farm is added at a higher index, and slots of deleted farms).
#[derive(Debug)]
struct DummyReader;
20
21#[async_trait]
22impl PieceReader for DummyReader {
23 #[inline]
24 async fn read_piece(
25 &self,
26 _sector_index: SectorIndex,
27 _piece_offset: PieceOffset,
28 ) -> Result<Option<Piece>, FarmError> {
29 Ok(None)
30 }
31}
32
33#[derive(Debug, Copy, Clone, Eq, PartialEq)]
34struct PieceDetails<FarmIndex> {
35 farm_index: FarmIndex,
36 sector_index: SectorIndex,
37 piece_offset: PieceOffset,
38}
39
40#[derive(Debug, Default)]
42pub struct PlottedPieces<FarmIndex> {
43 readers: Vec<Arc<dyn PieceReader>>,
44 pieces: HashMap<PieceIndex, Vec<PieceDetails<FarmIndex>>>,
45}
46
47impl<FarmIndex> PlottedPieces<FarmIndex>
48where
49 FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
50 usize: From<FarmIndex>,
51{
52 pub fn contains_piece(&self, piece_index: &PieceIndex) -> bool {
54 self.pieces.contains_key(piece_index)
55 }
56
57 pub fn read_piece(
62 &self,
63 piece_index: PieceIndex,
64 ) -> Option<impl Future<Output = Option<Piece>> + 'static> {
65 let piece_details = match self.pieces.get(&piece_index) {
66 Some(piece_details) => piece_details
67 .choose(&mut thread_rng())
68 .copied()
69 .expect("Empty lists are not stored in the map; qed"),
70 None => {
71 trace!(
72 ?piece_index,
73 "Piece is not stored in any of the local plots"
74 );
75 return None;
76 }
77 };
78 let reader = match self.readers.get(usize::from(piece_details.farm_index)) {
79 Some(reader) => reader.clone(),
80 None => {
81 warn!(
82 ?piece_index,
83 ?piece_details,
84 "No piece reader for associated farm index"
85 );
86 return None;
87 }
88 };
89
90 Some(async move {
91 reader
92 .read_piece(piece_details.sector_index, piece_details.piece_offset)
93 .await
94 .unwrap_or_else(|error| {
95 warn!(
96 %error,
97 %piece_index,
98 farm_index = ?piece_details.farm_index,
99 sector_index = piece_details.sector_index,
100 "Failed to retrieve piece"
101 );
102 None
103 })
104 })
105 }
106
107 pub fn add_sector(&mut self, farm_index: FarmIndex, plotted_sector: &PlottedSector) {
109 for (piece_offset, &piece_index) in
110 (PieceOffset::ZERO..).zip(plotted_sector.piece_indexes.iter())
111 {
112 let piece_details = PieceDetails {
113 farm_index,
114 sector_index: plotted_sector.sector_index,
115 piece_offset,
116 };
117
118 self.pieces
119 .entry(piece_index)
120 .or_default()
121 .push(piece_details);
122 }
123 }
124
125 pub fn delete_sector(&mut self, farm_index: FarmIndex, plotted_sector: &PlottedSector) {
127 for (piece_offset, &piece_index) in
128 (PieceOffset::ZERO..).zip(plotted_sector.piece_indexes.iter())
129 {
130 let searching_piece_details = PieceDetails {
131 farm_index,
132 sector_index: plotted_sector.sector_index,
133 piece_offset,
134 };
135
136 if let Entry::Occupied(mut entry) = self.pieces.entry(piece_index) {
137 let piece_details = entry.get_mut();
138 if let Some(index) =
139 piece_details
140 .iter()
141 .enumerate()
142 .find_map(|(index, piece_details)| {
143 (piece_details == &searching_piece_details).then_some(index)
144 })
145 {
146 piece_details.swap_remove(index);
147 }
148
149 if piece_details.is_empty() {
151 entry.remove_entry();
152 }
153 }
154 }
155 }
156
157 pub fn add_farm(&mut self, farm_index: FarmIndex, piece_reader: Arc<dyn PieceReader>) {
159 let farm_index = usize::from(farm_index);
160
161 if self.readers.len() <= farm_index {
162 self.readers.resize(farm_index, Arc::new(DummyReader));
163 self.readers.push(piece_reader);
164 } else {
165 self.readers[farm_index] = piece_reader;
166 }
167 }
168
169 pub fn delete_farm(&mut self, farm_index: FarmIndex) {
171 if let Some(reader) = self.readers.get_mut(usize::from(farm_index)) {
172 *reader = Arc::new(DummyReader);
174
175 let piece_indices_to_remove = self
176 .pieces
177 .par_iter_mut()
178 .filter_map(|(&piece_index, piece_details)| {
179 piece_details.retain(|piece_details| piece_details.farm_index != farm_index);
180
181 piece_details.is_empty().then_some(piece_index)
182 })
183 .collect::<Vec<_>>();
184
185 for piece_index in piece_indices_to_remove {
187 self.pieces.remove(&piece_index);
188 }
189 }
190 }
191
192 pub fn piece_indices(&self) -> impl Iterator<Item = &PieceIndex> {
194 self.pieces.keys()
195 }
196}