subspace_farmer_components/
lib.rs#![feature(
array_chunks,
const_trait_impl,
exact_size_is_empty,
int_roundings,
iter_collect_into,
never_type,
new_zeroed_alloc,
portable_simd,
try_blocks
)]
#![warn(rust_2018_idioms, missing_debug_implementations, missing_docs)]
pub mod auditing;
pub mod file_ext;
pub mod plotting;
pub mod proving;
pub mod reading;
pub mod sector;
mod segment_reconstruction;
use crate::file_ext::FileExt;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::Stream;
use parity_scale_codec::{Decode, Encode};
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::fs::File;
use std::future::Future;
use std::io;
use std::sync::Arc;
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_core_primitives::segments::{ArchivedHistorySegment, HistorySize};
/// Abstraction over a source of pieces addressed by [`PieceIndex`].
///
/// The trait is declared through `#[async_trait]`, so both methods are asynchronous.
#[async_trait]
pub trait PieceGetter {
/// Retrieves a single piece by its index.
///
/// The outer `anyhow::Result` carries failures of the retrieval itself, the inner `Option`
/// allows an implementation to report that no piece is available for `piece_index`.
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;
/// Retrieves several pieces at once.
///
/// On success returns a boxed stream that yields `(piece_index, result)` pairs, where `result`
/// has the same shape as the return value of [`PieceGetter::get_piece`]. The stream may borrow
/// from `self` and from `piece_indices` for the lifetime `'a`.
///
/// NOTE(review): the signature does not promise that items arrive in the order the indices
/// were supplied — confirm with the concrete implementation before relying on ordering.
async fn get_pieces<'a, PieceIndices>(
&'a self,
piece_indices: PieceIndices,
) -> anyhow::Result<
Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
>
where
PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a;
}
/// Forwarding implementation: an `Arc<T>` gets pieces by asking the `T` it points to.
#[async_trait]
impl<T> PieceGetter for Arc<T>
where
    T: PieceGetter + Send + Sync,
{
    /// Delegates to `T::get_piece` on the shared value.
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        // Deref-coerce `&Arc<T>` to `&T` so the call below resolves to the inner implementation
        let getter: &T = self;
        getter.get_piece(piece_index).await
    }

    /// Delegates to `T::get_pieces` on the shared value.
    async fn get_pieces<'a, PieceIndices>(
        &'a self,
        piece_indices: PieceIndices,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    >
    where
        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
    {
        // Deref-coerce `&Arc<T>` to `&T` so the call below resolves to the inner implementation
        let getter: &'a T = self;
        getter.get_pieces(piece_indices).await
    }
}
/// Serves pieces directly out of an in-memory archived history segment.
#[async_trait]
impl PieceGetter for ArchivedHistorySegment {
    /// Returns the piece found at position `piece_index` among this segment's pieces.
    ///
    /// The piece index is converted to `usize` and used as-is as the position in the sequence
    /// returned by `pieces()`; `Ok(None)` is returned when that position does not exist, and an
    /// error is returned if the index does not fit into `usize`.
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        let raw_index = u64::from(piece_index);
        let position = usize::try_from(raw_index)?;
        let piece = self.pieces().nth(position);

        Ok(piece)
    }

    /// Looks up every requested index through [`PieceGetter::get_piece`].
    ///
    /// The lookups are collected into a `FuturesUnordered`, which is returned as the stream of
    /// `(piece_index, result)` pairs.
    async fn get_pieces<'a, PieceIndices>(
        &'a self,
        piece_indices: PieceIndices,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    >
    where
        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
    {
        let pending_pieces: FuturesUnordered<_> = piece_indices
            .into_iter()
            .map(|piece_index| async move {
                let piece = self.get_piece(piece_index).await;
                (piece_index, piece)
            })
            .collect();

        Ok(Box::new(pending_pieces) as Box<_>)
    }
}
/// Container holding either a synchronous or an asynchronous reader.
///
/// `S` must implement [`ReadAtSync`] and `A` must implement [`ReadAtAsync`]. Both traits are
/// implemented for the never type `!` in this file, which allows the unused side to be `!`.
#[derive(Debug, Copy, Clone)]
pub enum ReadAt<S, A>
where
S: ReadAtSync,
A: ReadAtAsync,
{
/// Synchronous reader
Sync(S),
/// Asynchronous reader
Async(A),
}
impl<S> ReadAt<S, !>
where
S: ReadAtSync,
{
pub fn from_sync(value: S) -> Self {
Self::Sync(value)
}
}
impl<A> ReadAt<!, A>
where
A: ReadAtAsync,
{
pub fn from_async(value: A) -> Self {
Self::Async(value)
}
}
/// Synchronous positional reads from some source of bytes.
pub trait ReadAtSync: Send + Sync {
    /// Creates a [`ReadAtOffset`] that borrows `self` and remembers `offset`.
    ///
    /// Only available on sized implementors.
    fn offset(&self, offset: u64) -> ReadAtOffset<'_, Self>
    where
        Self: Sized,
    {
        let inner = self;

        ReadAtOffset { inner, offset }
    }

    /// Reads bytes located at `offset` into `buf`.
    ///
    /// The implementations provided in this file either fill `buf` completely or return an
    /// error.
    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()>;
}
/// Placeholder implementation for the never type, which lets `!` satisfy the [`ReadAtSync`] bound
/// (for example as the synchronous type parameter of [`ReadAt`]).
impl ReadAtSync for ! {
fn read_at(&self, _buf: &mut [u8], _offset: u64) -> io::Result<()> {
// `!` has no values, hence there is never a `self` to call this method on
unreachable!("Is never called")
}
}
/// Transparent wrapper around a byte container `B`.
///
/// `B` must expose its contents as a mutable byte slice, be `Unpin` and `'static`.
/// [`ReadAtAsync::read_at`] requires `AsyncReadBytes<B>: From<B>`, so only buffer types for which
/// such a conversion exists (in this file: `Vec<u8>` and `Box<[u8]>`) can be used there.
#[repr(transparent)]
#[derive(Debug)]
pub struct AsyncReadBytes<B>(B)
where
B: AsMut<[u8]> + Unpin + 'static;
impl From<Vec<u8>> for AsyncReadBytes<Vec<u8>> {
fn from(value: Vec<u8>) -> Self {
Self(value)
}
}
impl From<Box<[u8]>> for AsyncReadBytes<Box<[u8]>> {
fn from(value: Box<[u8]>) -> Self {
Self(value)
}
}
impl<B> AsMut<[u8]> for AsyncReadBytes<B>
where
B: AsMut<[u8]> + Unpin + 'static,
{
fn as_mut(&mut self) -> &mut [u8] {
self.0.as_mut()
}
}
impl<B> AsyncReadBytes<B>
where
B: AsMut<[u8]> + Unpin + 'static,
{
pub fn into_inner(self) -> B {
self.0
}
}
/// Asynchronous positional reads from some source of bytes.
pub trait ReadAtAsync {
    /// Creates a [`ReadAtOffset`] that borrows `self` and remembers `offset`.
    ///
    /// Only available on sized implementors.
    fn offset(&self, offset: u64) -> ReadAtOffset<'_, Self>
    where
        Self: Sized,
    {
        let inner = self;

        ReadAtOffset { inner, offset }
    }

    /// Reads bytes located at `offset` into the owned buffer `buf`.
    ///
    /// The buffer is taken by value and handed back through the successful result of the
    /// returned future. Only buffer types `B` for which `AsyncReadBytes<B>: From<B>` holds are
    /// accepted.
    fn read_at<B>(&self, buf: B, offset: u64) -> impl Future<Output = io::Result<B>>
    where
        AsyncReadBytes<B>: From<B>,
        B: AsMut<[u8]> + Unpin + 'static;
}
/// Placeholder implementation for the never type, which lets `!` satisfy the [`ReadAtAsync`]
/// bound (for example as the asynchronous type parameter of [`ReadAt`]).
impl ReadAtAsync for ! {
async fn read_at<B>(&self, _buf: B, _offset: u64) -> io::Result<B>
where
AsyncReadBytes<B>: From<B>,
B: AsMut<[u8]> + Unpin + 'static,
{
// `!` has no values, hence there is never a `self` to call this method on
unreachable!("Is never called")
}
}
impl ReadAtSync for [u8] {
fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
if buf.len() as u64 + offset > self.len() as u64 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Buffer length with offset exceeds own length",
));
}
buf.copy_from_slice(&self[offset as usize..][..buf.len()]);
Ok(())
}
}
impl ReadAtSync for &[u8] {
fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
if buf.len() as u64 + offset > self.len() as u64 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Buffer length with offset exceeds own length",
));
}
buf.copy_from_slice(&self[offset as usize..][..buf.len()]);
Ok(())
}
}
impl ReadAtSync for Vec<u8> {
fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
self.as_slice().read_at(buf, offset)
}
}
impl ReadAtSync for &Vec<u8> {
fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
self.as_slice().read_at(buf, offset)
}
}
/// Reads from a file.
impl ReadAtSync for File {
/// Forwards to `read_exact_at` on the file with the same buffer and offset.
///
/// NOTE(review): `read_exact_at` is presumably provided by the crate's `FileExt` trait imported
/// at the top of this file — confirm.
fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
self.read_exact_at(buf, offset)
}
}
/// Reads from a borrowed file.
impl ReadAtSync for &File {
/// Forwards to `read_exact_at` on the file with the same buffer and offset.
///
/// NOTE(review): `read_exact_at` is presumably provided by the crate's `FileExt` trait imported
/// at the top of this file — confirm.
fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
self.read_exact_at(buf, offset)
}
}
/// View over a borrowed reader `T` combined with a base offset.
///
/// Instances are created by the `offset()` methods of [`ReadAtSync`] and [`ReadAtAsync`]. Reads
/// performed through this type are forwarded to the inner reader with the base offset added to
/// the requested one.
#[derive(Debug, Copy, Clone)]
pub struct ReadAtOffset<'a, T> {
// Reader all reads are forwarded to
inner: &'a T,
// Base offset added to the offset of every read
offset: u64,
}
/// Synchronous reads relative to the base offset of the view.
impl<T> ReadAtSync for ReadAtOffset<'_, T>
where
    T: ReadAtSync,
{
    /// Reads into `buf` from the inner reader at `offset` plus the base offset of this view.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] when the sum of both offsets does not fit into
    /// `u64`; otherwise returns whatever the inner reader returns.
    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
        // Unchecked addition would panic in debug builds and silently wrap around in release
        // builds, in which case the read would happen at an unrelated position
        let absolute_offset = offset.checked_add(self.offset).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "Offset overflows when combined with base offset",
            )
        })?;

        self.inner.read_at(buf, absolute_offset)
    }
}

/// Synchronous reads through a reference to the view.
impl<T> ReadAtSync for &ReadAtOffset<'_, T>
where
    T: ReadAtSync,
{
    /// Same as the implementation for `ReadAtOffset` itself, to which this one delegates.
    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
        <ReadAtOffset<'_, T> as ReadAtSync>::read_at(*self, buf, offset)
    }
}
/// Asynchronous reads relative to the base offset of the view.
impl<T> ReadAtAsync for ReadAtOffset<'_, T>
where
    T: ReadAtAsync,
{
    /// Reads into `buf` from the inner reader at `offset` plus the base offset of this view.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] when the sum of both offsets does not fit into
    /// `u64`; otherwise returns whatever the inner reader returns.
    async fn read_at<B>(&self, buf: B, offset: u64) -> io::Result<B>
    where
        AsyncReadBytes<B>: From<B>,
        B: AsMut<[u8]> + Unpin + 'static,
    {
        // Unchecked addition would panic in debug builds and silently wrap around in release
        // builds, in which case the read would happen at an unrelated position
        let absolute_offset = offset.checked_add(self.offset).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "Offset overflows when combined with base offset",
            )
        })?;

        self.inner.read_at(buf, absolute_offset).await
    }
}

/// Asynchronous reads through a reference to the view.
impl<T> ReadAtAsync for &ReadAtOffset<'_, T>
where
    T: ReadAtAsync,
{
    /// Same as the implementation for `ReadAtOffset` itself, to which this one delegates.
    async fn read_at<B>(&self, buf: B, offset: u64) -> io::Result<B>
    where
        AsyncReadBytes<B>: From<B>,
        B: AsMut<[u8]> + Unpin + 'static,
    {
        <ReadAtOffset<'_, T> as ReadAtAsync>::read_at(*self, buf, offset).await
    }
}
// Compile-time check that `usize` is at least as wide as `u64`, meaning any `u64` value (such as
// a read offset) can be converted to `usize` without losing information
const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());
/// Bundle of protocol parameters for the farmer.
///
/// Can be SCALE-encoded/decoded (`Encode`/`Decode`) and (de)serialized with serde, where field
/// names are rendered in camelCase.
#[derive(Debug, Copy, Clone, Encode, Decode, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FarmerProtocolInfo {
/// History size.
///
/// NOTE(review): presumably the current size of the blockchain history — confirm.
pub history_size: HistorySize,
/// Maximum number of pieces in a sector
pub max_pieces_in_sector: u16,
/// Recent segments, expressed as a [`HistorySize`].
///
/// NOTE(review): presumably the number of latest segments treated as "recent" — confirm.
pub recent_segments: HistorySize,
/// Recent history fraction, stored as a pair of [`HistorySize`] values.
///
/// NOTE(review): presumably a `(numerator, denominator)` pair — confirm.
pub recent_history_fraction: (HistorySize, HistorySize),
/// Minimum sector lifetime, expressed as a [`HistorySize`].
///
/// NOTE(review): unit is presumably segments of history — confirm.
pub min_sector_lifetime: HistorySize,
}