subspace_farmer/single_disk_farm/
piece_reader.rs1use crate::farm::{FarmError, PieceReader};
4use crate::single_disk_farm::direct_io_file::DirectIoFile;
5use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
6use async_trait::async_trait;
7use futures::channel::{mpsc, oneshot};
8use futures::{SinkExt, StreamExt};
9use std::collections::HashSet;
10use std::future::Future;
11use std::sync::Arc;
12use subspace_core_primitives::PublicKey;
13use subspace_core_primitives::pieces::{Piece, PieceOffset};
14use subspace_core_primitives::sectors::{SectorId, SectorIndex};
15use subspace_erasure_coding::ErasureCoding;
16use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
17use subspace_farmer_components::sector::{SectorMetadataChecksummed, sector_size};
18use subspace_farmer_components::{ReadAt, ReadAtAsync, ReadAtSync, reading};
19use subspace_proof_of_space::Table;
20use tracing::{error, warn};
21
22#[derive(Debug)]
23struct ReadPieceRequest {
24 sector_index: SectorIndex,
25 piece_offset: PieceOffset,
26 response_sender: oneshot::Sender<Option<Piece>>,
27}
28
29#[derive(Debug, Clone)]
31pub struct DiskPieceReader {
32 read_piece_sender: mpsc::Sender<ReadPieceRequest>,
33}
34
35#[async_trait]
36impl PieceReader for DiskPieceReader {
37 #[inline]
38 async fn read_piece(
39 &self,
40 sector_index: SectorIndex,
41 piece_offset: PieceOffset,
42 ) -> Result<Option<Piece>, FarmError> {
43 Ok(self.read_piece(sector_index, piece_offset).await)
44 }
45}
46
47impl DiskPieceReader {
48 #[allow(clippy::too_many_arguments)]
53 pub(super) fn new<PosTable>(
54 public_key: PublicKey,
55 pieces_in_sector: u16,
56 plot_file: Arc<DirectIoFile>,
57 sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
58 erasure_coding: ErasureCoding,
59 sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
60 read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
61 global_mutex: Arc<AsyncMutex<()>>,
62 ) -> (Self, impl Future<Output = ()>)
63 where
64 PosTable: Table,
65 {
66 let (read_piece_sender, read_piece_receiver) = mpsc::channel(10);
67
68 let reading_fut = async move {
69 read_pieces::<PosTable, _>(
70 public_key,
71 pieces_in_sector,
72 &*plot_file,
73 sectors_metadata,
74 erasure_coding,
75 sectors_being_modified,
76 read_piece_receiver,
77 read_sector_record_chunks_mode,
78 global_mutex,
79 )
80 .await
81 };
82
83 (Self { read_piece_sender }, reading_fut)
84 }
85
86 pub(super) fn close_all_readers(&mut self) {
87 self.read_piece_sender.close_channel();
88 }
89
90 pub async fn read_piece(
93 &self,
94 sector_index: SectorIndex,
95 piece_offset: PieceOffset,
96 ) -> Option<Piece> {
97 let (response_sender, response_receiver) = oneshot::channel();
98 self.read_piece_sender
99 .clone()
100 .send(ReadPieceRequest {
101 sector_index,
102 piece_offset,
103 response_sender,
104 })
105 .await
106 .ok()?;
107 response_receiver.await.ok()?
108 }
109}
110
111#[allow(clippy::too_many_arguments)]
112async fn read_pieces<PosTable, S>(
113 public_key: PublicKey,
114 pieces_in_sector: u16,
115 plot_file: S,
116 sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
117 erasure_coding: ErasureCoding,
118 sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
119 mut read_piece_receiver: mpsc::Receiver<ReadPieceRequest>,
120 mode: ReadSectorRecordChunksMode,
121 global_mutex: Arc<AsyncMutex<()>>,
122) where
123 PosTable: Table,
124 S: ReadAtSync,
125{
126 let mut table_generator = PosTable::generator();
127
128 while let Some(read_piece_request) = read_piece_receiver.next().await {
129 let ReadPieceRequest {
130 sector_index,
131 piece_offset,
132 response_sender,
133 } = read_piece_request;
134
135 if response_sender.is_canceled() {
136 continue;
137 }
138
139 let sectors_being_modified = &*sectors_being_modified.read().await;
140
141 if sectors_being_modified.contains(§or_index) {
142 continue;
144 }
145
146 let (sector_metadata, sector_count) = {
147 let sectors_metadata = sectors_metadata.read().await;
148
149 let sector_count = sectors_metadata.len() as SectorIndex;
150
151 let sector_metadata = match sectors_metadata.get(sector_index as usize) {
152 Some(sector_metadata) => sector_metadata.clone(),
153 None => {
154 error!(
155 %sector_index,
156 %sector_count,
157 "Tried to read piece from sector that is not yet plotted"
158 );
159 continue;
160 }
161 };
162
163 (sector_metadata, sector_count)
164 };
165
166 if sector_index >= sector_count {
168 warn!(
169 %sector_index,
170 %piece_offset,
171 %sector_count,
172 "Incorrect sector offset"
173 );
174 let _ = response_sender.send(None);
176 continue;
177 }
178 if u16::from(piece_offset) >= pieces_in_sector {
180 warn!(
181 %sector_index,
182 %piece_offset,
183 %sector_count,
184 "Incorrect piece offset"
185 );
186 let _ = response_sender.send(None);
188 continue;
189 }
190
191 let sector_size = sector_size(pieces_in_sector);
192 let sector = plot_file.offset(u64::from(sector_index) * sector_size as u64);
193
194 global_mutex.lock().await;
196
197 let maybe_piece = read_piece::<PosTable, _, _>(
198 &public_key,
199 piece_offset,
200 §or_metadata,
201 &ReadAt::from_sync(§or),
203 &erasure_coding,
204 mode,
205 &mut table_generator,
206 )
207 .await;
208
209 let _ = response_sender.send(maybe_piece);
211 }
212}
213
214async fn read_piece<PosTable, S, A>(
215 public_key: &PublicKey,
216 piece_offset: PieceOffset,
217 sector_metadata: &SectorMetadataChecksummed,
218 sector: &ReadAt<S, A>,
219 erasure_coding: &ErasureCoding,
220 mode: ReadSectorRecordChunksMode,
221 table_generator: &mut PosTable::Generator,
222) -> Option<Piece>
223where
224 PosTable: Table,
225 S: ReadAtSync,
226 A: ReadAtAsync,
227{
228 let sector_index = sector_metadata.sector_index;
229
230 let sector_id = SectorId::new(
231 public_key.hash(),
232 sector_index,
233 sector_metadata.history_size,
234 );
235
236 let piece = match reading::read_piece::<PosTable, _, _>(
237 piece_offset,
238 §or_id,
239 sector_metadata,
240 sector,
241 erasure_coding,
242 mode,
243 table_generator,
244 )
245 .await
246 {
247 Ok(piece) => piece,
248 Err(error) => {
249 error!(
250 %sector_index,
251 %piece_offset,
252 %error,
253 "Failed to read piece from sector"
254 );
255 return None;
256 }
257 };
258
259 Some(piece)
260}