// subspace_farmer/single_disk_farm/piece_reader.rs
use crate::farm::{FarmError, PieceReader};
use crate::single_disk_farm::direct_io_file::DirectIoFile;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
use std::collections::HashSet;
use std::future::Future;
use std::sync::Arc;
use subspace_core_primitives::pieces::{Piece, PieceOffset};
use subspace_core_primitives::sectors::{SectorId, SectorIndex};
use subspace_core_primitives::PublicKey;
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_farmer_components::sector::{sector_size, SectorMetadataChecksummed};
use subspace_farmer_components::{reading, ReadAt, ReadAtAsync, ReadAtSync};
use subspace_proof_of_space::Table;
use tracing::{error, warn};
/// A single piece read request passed over the internal channel to the reading loop.
///
/// It names the piece by sector index and offset within the sector and carries the one-shot
/// sender on which the outcome is reported back to the requester.
#[derive(Debug)]
struct ReadPieceRequest {
/// Index of the sector the piece should be read from
sector_index: SectorIndex,
/// Offset of the piece within that sector
piece_offset: PieceOffset,
/// One-shot sender for the result; the reading loop sends `None` when the piece can't be read
response_sender: oneshot::Sender<Option<Piece>>,
}
/// Cloneable handle for requesting pieces stored in the plot file.
///
/// The handle only holds the sending side of the request channel. Requests are actually served
/// by the future returned alongside the handle from `DiskPieceReader::new`.
#[derive(Debug, Clone)]
pub struct DiskPieceReader {
/// Sending side of the bounded channel that feeds the reading loop
read_piece_sender: mpsc::Sender<ReadPieceRequest>,
}
/// [`PieceReader`] implementation that delegates to the inherent `DiskPieceReader::read_piece`.
#[async_trait]
impl PieceReader for DiskPieceReader {
    /// Reads a piece and wraps the outcome in `Ok`.
    ///
    /// The inherent method reports every failure as `None`, so this never returns `Err`.
    #[inline]
    async fn read_piece(
        &self,
        sector_index: SectorIndex,
        piece_offset: PieceOffset,
    ) -> Result<Option<Piece>, FarmError> {
        // Method resolution picks the inherent method over the trait method of the same name,
        // so this is a delegation and not a recursive call
        let maybe_piece = self.read_piece(sector_index, piece_offset).await;

        Ok(maybe_piece)
    }
}
impl DiskPieceReader {
    /// Creates a piece reader handle together with the future that serves its requests.
    ///
    /// All arguments are forwarded unchanged to `read_pieces`, which consumes the receiving side
    /// of an internal bounded channel. The returned handle (and its clones) hold the sending
    /// side. Requests are only processed while the returned future is being polled.
    // The argument list mirrors what `read_pieces` needs, hence its length
    #[allow(clippy::too_many_arguments)]
    pub(super) fn new<PosTable>(
        public_key: PublicKey,
        pieces_in_sector: u16,
        plot_file: Arc<DirectIoFile>,
        sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
        erasure_coding: ErasureCoding,
        sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
        read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
        global_mutex: Arc<AsyncMutex<()>>,
    ) -> (Self, impl Future<Output = ()>)
    where
        PosTable: Table,
    {
        let (read_piece_sender, read_piece_receiver) = mpsc::channel(10);
        let piece_reader = Self { read_piece_sender };

        // `plot_file` is moved into the future so that the reference handed to `read_pieces`
        // remains valid for as long as the future exists
        let reading_fut = async move {
            let plot_file_ref: &DirectIoFile = &plot_file;

            read_pieces::<PosTable, _>(
                public_key,
                pieces_in_sector,
                plot_file_ref,
                sectors_metadata,
                erasure_coding,
                sectors_being_modified,
                read_piece_receiver,
                read_sector_record_chunks_mode,
                global_mutex,
            )
            .await
        };

        (piece_reader, reading_fut)
    }

    /// Closes the request channel from the sending side so that no new requests are accepted.
    pub(super) fn close_all_readers(&mut self) {
        self.read_piece_sender.close_channel();
    }

    /// Reads the piece at `piece_offset` of sector `sector_index`.
    ///
    /// Returns `None` when the request could not be queued, when the reading loop dropped the
    /// request without answering, or when the reading loop answered with `None`.
    pub async fn read_piece(
        &self,
        sector_index: SectorIndex,
        piece_offset: PieceOffset,
    ) -> Option<Piece> {
        let (response_sender, response_receiver) = oneshot::channel();
        let request = ReadPieceRequest {
            sector_index,
            piece_offset,
            response_sender,
        };

        // `send` needs `&mut Sender`, so a temporary clone is used; it is dropped again as soon
        // as the condition has been evaluated, before the response is awaited
        if self.read_piece_sender.clone().send(request).await.is_err() {
            return None;
        }

        match response_receiver.await {
            Ok(maybe_piece) => maybe_piece,
            // Response sender was dropped without an answer
            Err(_) => None,
        }
    }
}
/// Serves piece read requests from `read_piece_receiver` until the channel yields no more items.
///
/// For every request:
/// * it is skipped if the requester already dropped the receiving end of the response channel
/// * it is dropped without a response if the sector is listed in `sectors_being_modified` or has
///   no entry in `sectors_metadata` (the requester then observes a cancelled response channel)
/// * `None` is sent back if the sector index or piece offset is out of range
/// * otherwise the piece is read from the sector's region of `plot_file` and the outcome of
///   `read_piece` is sent back
///
/// Failure to deliver a response (requester gone in the meantime) is ignored.
// The loop needs every one of these inputs, hence the long argument list
#[allow(clippy::too_many_arguments)]
async fn read_pieces<PosTable, S>(
    public_key: PublicKey,
    pieces_in_sector: u16,
    plot_file: S,
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    erasure_coding: ErasureCoding,
    sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
    mut read_piece_receiver: mpsc::Receiver<ReadPieceRequest>,
    mode: ReadSectorRecordChunksMode,
    global_mutex: Arc<AsyncMutex<()>>,
) where
    PosTable: Table,
    S: ReadAtSync,
{
    // A single generator is created up front and reused for all requests
    let mut table_generator = PosTable::generator();

    while let Some(read_piece_request) = read_piece_receiver.next().await {
        let ReadPieceRequest {
            sector_index,
            piece_offset,
            response_sender,
        } = read_piece_request;

        // Nobody is waiting for the answer anymore, don't do the work
        if response_sender.is_canceled() {
            continue;
        }

        // The read guard is a temporary whose lifetime is extended by the `let` binding, so the
        // read lock is held until the end of this loop iteration, including the read itself
        let sectors_being_modified = &*sectors_being_modified.read().await;

        if sectors_being_modified.contains(&sector_index) {
            // `response_sender` is dropped without sending, the requester sees a cancellation
            continue;
        }

        let (sector_metadata, sector_count) = {
            let sectors_metadata = sectors_metadata.read().await;

            let sector_count = sectors_metadata.len() as SectorIndex;

            let sector_metadata = match sectors_metadata.get(sector_index as usize) {
                // Cloned so that the metadata lock is released at the end of this block
                Some(sector_metadata) => sector_metadata.clone(),
                None => {
                    error!(
                        %sector_index,
                        %sector_count,
                        "Tried to read piece from sector that is not yet plotted"
                    );
                    continue;
                }
            };

            (sector_metadata, sector_count)
        };

        // Defensive check, normally already implied by the successful lookup above
        if sector_index >= sector_count {
            warn!(
                %sector_index,
                %piece_offset,
                %sector_count,
                "Incorrect sector offset"
            );
            let _ = response_sender.send(None);
            continue;
        }

        // Piece must be within the sector
        if u16::from(piece_offset) >= pieces_in_sector {
            warn!(
                %sector_index,
                %piece_offset,
                %sector_count,
                "Incorrect piece offset"
            );
            let _ = response_sender.send(None);
            continue;
        }

        // Sectors are addressed in the plot file by index times the per-sector size
        let sector_size = sector_size(pieces_in_sector);
        let sector = plot_file.offset(u64::from(sector_index) * sector_size as u64);

        // The guard is dropped at the end of this statement: this only waits until the mutex
        // is free and does not hold it during the read below
        global_mutex.lock().await;

        let maybe_piece = read_piece::<PosTable, _, _>(
            &public_key,
            piece_offset,
            &sector_metadata,
            &ReadAt::from_sync(&sector),
            &erasure_coding,
            mode,
            &mut table_generator,
        )
        .await;

        // Requester may be gone by now, nothing to do about it
        let _ = response_sender.send(maybe_piece);
    }
}
/// Reads a single piece from `sector`.
///
/// The sector ID is derived from the hash of `public_key`, the sector index stored in
/// `sector_metadata` and the history size stored in `sector_metadata`; the actual work is
/// delegated to `reading::read_piece`.
///
/// Returns `Some(piece)` on success. On failure the error is logged and `None` is returned.
async fn read_piece<PosTable, S, A>(
    public_key: &PublicKey,
    piece_offset: PieceOffset,
    sector_metadata: &SectorMetadataChecksummed,
    sector: &ReadAt<S, A>,
    erasure_coding: &ErasureCoding,
    mode: ReadSectorRecordChunksMode,
    table_generator: &mut PosTable::Generator,
) -> Option<Piece>
where
    PosTable: Table,
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let sector_index = sector_metadata.sector_index;

    let sector_id = SectorId::new(
        public_key.hash(),
        sector_index,
        sector_metadata.history_size,
    );

    let read_result = reading::read_piece::<PosTable, _, _>(
        piece_offset,
        &sector_id,
        sector_metadata,
        sector,
        erasure_coding,
        mode,
        table_generator,
    )
    .await;

    match read_result {
        Ok(piece) => Some(piece),
        Err(error) => {
            error!(
                %sector_index,
                %piece_offset,
                %error,
                "Failed to read piece from sector"
            );

            None
        }
    }
}