subspace_farmer/single_disk_farm/piece_reader.rs

//! Piece reader for a single disk farm

use crate::farm::{FarmError, PieceReader};
use crate::single_disk_farm::direct_io_file::DirectIoFile;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
use std::collections::HashSet;
use std::future::Future;
use std::sync::Arc;
use subspace_core_primitives::PublicKey;
use subspace_core_primitives::pieces::{Piece, PieceOffset};
use subspace_core_primitives::sectors::{SectorId, SectorIndex};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_farmer_components::sector::{SectorMetadataChecksummed, sector_size};
use subspace_farmer_components::{ReadAt, ReadAtAsync, ReadAtSync, reading};
use subspace_proof_of_space::Table;
use tracing::{error, warn};

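/// Internal request for the background reading task: identifies the piece to read and carries a
/// one-shot channel for sending the result back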
#[derive(Debug)]
struct ReadPieceRequest {
    sector_index: SectorIndex,
    piece_offset: PieceOffset,
    response_sender: oneshot::Sender<Option<Piece>>,
}

/// Wrapper data structure that can be used to read pieces from a single disk farm
#[derive(Debug, Clone)]
pub struct DiskPieceReader {
    read_piece_sender: mpsc::Sender<ReadPieceRequest>,
}

#[async_trait]
impl PieceReader for DiskPieceReader {
    #[inline]
    async fn read_piece(
        &self,
        sector_index: SectorIndex,
        piece_offset: PieceOffset,
    ) -> Result<Option<Piece>, FarmError> {
        Ok(self.read_piece(sector_index, piece_offset).await)
    }
}

impl DiskPieceReader {
    /// Creates a new piece reader instance and a background future that handles reads internally.
    ///
    /// NOTE: The background future is async, but performs blocking operations and should be run
    /// on a dedicated thread.
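    ///
    /// A minimal usage sketch; the surrounding farm context that provides `plot_file`,
    /// `sectors_metadata` and the other arguments is illustrative, not part of this API:
    ///
    /// ```ignore
    /// let (piece_reader, background_fut) = DiskPieceReader::new::<PosTable>(
    ///     public_key,
    ///     pieces_in_sector,
    ///     plot_file,
    ///     sectors_metadata,
    ///     erasure_coding,
    ///     sectors_being_modified,
    ///     read_sector_record_chunks_mode,
    ///     global_mutex,
    /// );
    /// // Blocking reads happen inside, so drive the future on a dedicated thread
    /// std::thread::spawn(move || futures::executor::block_on(background_fut));
    /// ```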
    #[allow(clippy::too_many_arguments)]
    pub(super) fn new<PosTable>(
        public_key: PublicKey,
        pieces_in_sector: u16,
        plot_file: Arc<DirectIoFile>,
        sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
        erasure_coding: ErasureCoding,
        sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
        read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
        global_mutex: Arc<AsyncMutex<()>>,
    ) -> (Self, impl Future<Output = ()>)
    where
        PosTable: Table,
    {
        let (read_piece_sender, read_piece_receiver) = mpsc::channel(10);

        let reading_fut = async move {
            read_pieces::<PosTable, _>(
                public_key,
                pieces_in_sector,
                &*plot_file,
                sectors_metadata,
                erasure_coding,
                sectors_being_modified,
                read_piece_receiver,
                read_sector_record_chunks_mode,
                global_mutex,
            )
            .await
        };

        (Self { read_piece_sender }, reading_fut)
    }

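    /// Close the request channel; the background reading future exits once buffered requests are
    /// drained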
    pub(super) fn close_all_readers(&mut self) {
        self.read_piece_sender.close_channel();
    }

    /// Read a piece from a sector by offset; `None` means the input parameters are incorrect or
    /// the piece reader was shut down
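    ///
    /// Sketch of a call site (the surrounding context is illustrative):
    ///
    /// ```ignore
    /// if let Some(piece) = piece_reader.read_piece(sector_index, piece_offset).await {
    ///     // Piece was read successfully
    /// }
    /// ```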
    pub async fn read_piece(
        &self,
        sector_index: SectorIndex,
        piece_offset: PieceOffset,
    ) -> Option<Piece> {
        let (response_sender, response_receiver) = oneshot::channel();
        self.read_piece_sender
            .clone()
            .send(ReadPieceRequest {
                sector_index,
                piece_offset,
                response_sender,
            })
            .await
            .ok()?;
        response_receiver.await.ok()?
    }
}

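/// Background processing loop: receives piece read requests from the channel and serves them
/// sequentially from the plot file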
#[allow(clippy::too_many_arguments)]
async fn read_pieces<PosTable, S>(
    public_key: PublicKey,
    pieces_in_sector: u16,
    plot_file: S,
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    erasure_coding: ErasureCoding,
    sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
    mut read_piece_receiver: mpsc::Receiver<ReadPieceRequest>,
    mode: ReadSectorRecordChunksMode,
    global_mutex: Arc<AsyncMutex<()>>,
) where
    PosTable: Table,
    S: ReadAtSync,
{
    let mut table_generator = PosTable::generator();

    while let Some(read_piece_request) = read_piece_receiver.next().await {
        let ReadPieceRequest {
            sector_index,
            piece_offset,
            response_sender,
        } = read_piece_request;

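        // Requester is no longer interested in the result, skip the read entirely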
        if response_sender.is_canceled() {
            continue;
        }

        let sectors_being_modified = &*sectors_being_modified.read().await;

        if sectors_being_modified.contains(&sector_index) {
            // Skip sector that is being modified right now
            continue;
        }

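        // Snapshot sector metadata and the total number of plotted sectors under a short-lived
        // read lock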
        let (sector_metadata, sector_count) = {
            let sectors_metadata = sectors_metadata.read().await;

            let sector_count = sectors_metadata.len() as SectorIndex;

            let sector_metadata = match sectors_metadata.get(sector_index as usize) {
                Some(sector_metadata) => sector_metadata.clone(),
                None => {
                    error!(
                        %sector_index,
                        %sector_count,
                        "Tried to read piece from sector that is not yet plotted"
                    );
                    continue;
                }
            };

            (sector_metadata, sector_count)
        };

        // Sector must be plotted
        if sector_index >= sector_count {
            warn!(
                %sector_index,
                %piece_offset,
                %sector_count,
                "Incorrect sector offset"
            );
            // Doesn't matter if receiver still cares about it
            let _ = response_sender.send(None);
            continue;
        }
        // Piece must be within sector
        if u16::from(piece_offset) >= pieces_in_sector {
            warn!(
                %sector_index,
                %piece_offset,
                %sector_count,
                "Incorrect piece offset"
            );
            // Doesn't matter if receiver still cares about it
            let _ = response_sender.send(None);
            continue;
        }

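        // Sectors are stored back to back in the plot file, so the sector's byte offset is its
        // index multiplied by the fixed sector size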
        let sector_size = sector_size(pieces_in_sector);
        let sector = plot_file.offset(u64::from(sector_index) * sector_size as u64);

        // Take mutex briefly to make sure piece reading is allowed right now
        global_mutex.lock().await;

        let maybe_piece = read_piece::<PosTable, _, _>(
            &public_key,
            piece_offset,
            &sector_metadata,
            // TODO: Async
            &ReadAt::from_sync(&sector),
            &erasure_coding,
            mode,
            &mut table_generator,
        )
        .await;

        // Doesn't matter if receiver still cares about it
        let _ = response_sender.send(maybe_piece);
    }
}

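/// Read a single piece from the given sector, logging an error and returning `None` on failure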
async fn read_piece<PosTable, S, A>(
    public_key: &PublicKey,
    piece_offset: PieceOffset,
    sector_metadata: &SectorMetadataChecksummed,
    sector: &ReadAt<S, A>,
    erasure_coding: &ErasureCoding,
    mode: ReadSectorRecordChunksMode,
    table_generator: &mut PosTable::Generator,
) -> Option<Piece>
where
    PosTable: Table,
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let sector_index = sector_metadata.sector_index;

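    // Derive the sector ID from the public key hash, sector index and the history size recorded
    // in the sector metadata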
    let sector_id = SectorId::new(
        public_key.hash(),
        sector_index,
        sector_metadata.history_size,
    );

    let piece = match reading::read_piece::<PosTable, _, _>(
        piece_offset,
        &sector_id,
        sector_metadata,
        sector,
        erasure_coding,
        mode,
        table_generator,
    )
    .await
    {
        Ok(piece) => piece,
        Err(error) => {
            error!(
                %sector_index,
                %piece_offset,
                %error,
                "Failed to read piece from sector"
            );
            return None;
        }
    };

    Some(piece)
}