use crate::object_fetcher::partial_object::{PartialObject, RawPieceData};
use crate::object_fetcher::segment_header::{
    max_segment_header_encoded_size, min_segment_header_encoded_size, MAX_SEGMENT_PADDING,
};
use crate::piece_fetcher::download_pieces;
use crate::piece_getter::PieceGetter;
use parity_scale_codec::{Compact, CompactLen, Decode};
use std::sync::Arc;
use subspace_archiving::archiver::SegmentItem;
use subspace_core_primitives::hashes::Blake3Hash;
use subspace_core_primitives::objects::{GlobalObject, GlobalObjectMapping};
use subspace_core_primitives::pieces::{Piece, PieceIndex, RawRecord};
use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentIndex};
use tracing::{debug, trace, warn};

mod partial_object;
mod segment_header;
#[cfg(test)]
mod tests;

pub use segment_header::MAX_BLOCK_LENGTH;

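/// The largest object size this implementation can reliably fetch: one recorded history
/// segment, minus segment and block encoding overheads (see the function body for the
/// breakdown).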
#[inline]
pub fn max_supported_object_length() -> usize {
    // One recorded history segment, minus the maximum segment padding, the maximum
    // segment header size, and room for two SCALE length prefixes. The three single-byte
    // subtractions are presumably SCALE enum variant bytes in the segment encoding.
    RecordedHistorySegment::SIZE
        - MAX_SEGMENT_PADDING
        - 1
        - 1
        - max_segment_header_encoded_size()
        - 1
        - MAX_ENCODED_LENGTH_SIZE * 2
}

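/// The maximum encoded size of a SCALE `Compact<u32>` length prefix for a supported
/// object length: 4 bytes, which covers lengths up to `2^30 - 1`.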
const MAX_ENCODED_LENGTH_SIZE: usize = 4;

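/// The most recently downloaded piece, kept so that consecutive reads which land on the
/// same piece don't download it again.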
pub type LastPieceCache = (PieceIndex, Piece);

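/// Errors that can occur while validating an object mapping, or while fetching and
/// reconstructing the object it points to.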
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
pub enum Error {
    #[error("Piece index is not a source piece, object: {mapping:?}")]
    NotSourcePiece { mapping: GlobalObject },

    #[error(
        "Piece offset is too large, must be less than {}, object: {mapping:?}",
        RawRecord::SIZE
    )]
    PieceOffsetTooLarge { mapping: GlobalObject },

    #[error(
        "Data length {data_length} exceeds maximum object size {max_object_len} \
        for object: {mapping:?}"
    )]
    ObjectTooLarge {
        data_length: usize,
        max_object_len: usize,
        mapping: GlobalObject,
    },

    #[error(
        "Length prefix length {length_prefix_len} exceeds maximum object size {max_object_len} \
        for object: {mapping:?}"
    )]
    LengthPrefixTooLarge {
        length_prefix_len: usize,
        max_object_len: usize,
        mapping: GlobalObject,
    },

    #[error("Incorrect data hash {data_hash:?} for {data_length} byte object: {mapping:?}")]
    InvalidDataHash {
        data_hash: Blake3Hash,
        data_length: usize,
        mapping: GlobalObject,
        #[cfg(test)]
        data: String,
    },

    #[error("Getting piece caused an error: {error}, object: {mapping:?}")]
    PieceGetterError {
        error: String,
        mapping: GlobalObject,
    },

    #[error("Piece {piece_index:?} was not found by piece getter")]
    PieceNotFound { piece_index: PieceIndex },

    #[error(
        "Piece offset is inside the segment header, min size of segment header: {}, object: {mapping:?}",
        min_segment_header_encoded_size(),
    )]
    PieceOffsetInSegmentHeader { mapping: GlobalObject },

    #[error("Segment {segment_index:?} data decoding error: {source:?}, object: {mapping:?}")]
    SegmentDecoding {
        source: parity_scale_codec::Error,
        segment_index: SegmentIndex,
        mapping: GlobalObject,
    },

    #[error(
        "Decoding segment {segment_index:?} failed: unknown variant: {segment_variant}, \
        object: {mapping:?}"
    )]
    UnknownSegmentVariant {
        segment_variant: u8,
        segment_index: SegmentIndex,
        mapping: GlobalObject,
    },

    #[error(
        "Segment {segment_index:?} has unexpected item, current progress: {segment_progress}, \
        object: {mapping:?}, item: {segment_item:?}"
    )]
    UnexpectedSegmentItem {
        segment_progress: usize,
        segment_index: SegmentIndex,
        segment_item: Box<SegmentItem>,
        mapping: GlobalObject,
    },

    #[error(
        "Segment {segment_index:?} has unexpected item, current progress: {segment_progress}, \
        object: {mapping:?}, item: {segment_item_variant:?}, item size and data lengths: \
        {segment_item_lengths:?}"
    )]
    UnexpectedSegmentItemVariant {
        segment_progress: usize,
        segment_index: SegmentIndex,
        segment_item_variant: u8,
        segment_item_lengths: Option<(usize, usize)>,
        mapping: GlobalObject,
    },

    #[error(
        "Invalid object: next source piece: {next_source_piece_index:?}, segment data length: \
        {segment_data_length:?}, object: {mapping:?}"
    )]
    InvalidObject {
        next_source_piece_index: PieceIndex,
        segment_data_length: Option<usize>,
        mapping: GlobalObject,
    },

    #[error(
        "Invalid mapping: data length: {object_data_length:?}, next source piece: \
        {next_source_piece_index:?}, remaining_piece_count: {remaining_piece_count}, object: \
        {mapping:?}"
    )]
    InvalidMapping {
        next_source_piece_index: PieceIndex,
        remaining_piece_count: usize,
        object_data_length: usize,
        mapping: GlobalObject,
    },
}

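/// Fetches the objects identified by global object mappings, using the supplied piece
/// getter.
///
/// A minimal usage sketch, assuming you already have a `PieceGetter` implementation
/// `piece_getter` and a `GlobalObjectMapping` value `mappings`:
///
/// ```ignore
/// let fetcher = ObjectFetcher::new(Arc::new(piece_getter), max_supported_object_length());
/// let objects = fetcher.fetch_objects(mappings).await?;
/// ```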
pub struct ObjectFetcher<PG>
where
    PG: PieceGetter + Send + Sync,
{
    /// The piece getter used to download pieces.
    piece_getter: Arc<PG>,

    /// The maximum object length this fetcher will accept.
    max_object_len: usize,
}

impl<PG> ObjectFetcher<PG>
where
    PG: PieceGetter + Send + Sync,
{
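    /// Creates an object fetcher with the given piece getter.
    ///
    /// `max_object_len` is the largest object this fetcher will return; anything above
    /// [`max_supported_object_length()`] is capped to that value, with a warning.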
    pub fn new(piece_getter: Arc<PG>, mut max_object_len: usize) -> Self {
        if max_object_len > max_supported_object_length() {
            warn!(
                max_object_len,
                max_supported_object_length = ?max_supported_object_length(),
                "Object fetcher size limit exceeds maximum supported object size, \
                limiting to implementation-supported size"
            );

            max_object_len = max_supported_object_length();
        }

        Self {
            piece_getter,
            max_object_len,
        }
    }

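    /// Fetches and reconstructs the objects in `mappings`, returning their data in the
    /// same order as the mappings.
    ///
    /// Each mapping is validated just before its object is fetched, and the reconstructed
    /// data is checked against the mapping's hash; the first invalid mapping or failed
    /// fetch aborts the whole batch with an error.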
    pub async fn fetch_objects(
        &self,
        mappings: GlobalObjectMapping,
    ) -> Result<Vec<Vec<u8>>, Error> {
        let mut objects = Vec::with_capacity(mappings.objects().len());
        let mut piece_cache = None;

        for &mapping in mappings.objects() {
            let GlobalObject {
                piece_index,
                offset,
                ..
            } = mapping;

            // Validate the mapping before downloading any pieces for it.
            if !piece_index.is_source() {
                debug!(
                    ?mapping,
                    "Invalid piece index for object: must be a source piece",
                );

                return Err(Error::NotSourcePiece { mapping });
            }

            if piece_index.source_position() == 0
                && offset < min_segment_header_encoded_size() as u32
            {
                debug!(
                    ?mapping,
                    min_segment_header_encoded_size = ?min_segment_header_encoded_size(),
                    "Invalid offset for object: must not be inside the segment header",
                );

                return Err(Error::PieceOffsetInSegmentHeader { mapping });
            }

            if offset >= RawRecord::SIZE as u32 {
                debug!(
                    ?mapping,
                    RawRecord_SIZE = RawRecord::SIZE,
                    "Invalid piece offset for object: must be less than the size of a raw record",
                );

                return Err(Error::PieceOffsetTooLarge { mapping });
            }

            let data = self.fetch_object(mapping, &mut piece_cache).await?;

            objects.push(data);
        }

        Ok(objects)
    }

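    /// Fetches and reconstructs a single object, reading pieces through `piece_cache`
    /// first and falling back to the piece getter.
    ///
    /// Assumes the mapping has already been validated by [`Self::fetch_objects`].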
    async fn fetch_object(
        &self,
        mapping: GlobalObject,
        piece_cache: &mut Option<LastPieceCache>,
    ) -> Result<Vec<u8>, Error> {
        let GlobalObject {
            piece_index,
            offset,
            ..
        } = mapping;

        let mut next_source_piece_index = piece_index;

        let mut raw_data = RawPieceData::new_for_first_piece(mapping);

        // Read the first piece, keeping only the data from the object's offset onwards.
        let piece = self
            .read_piece(next_source_piece_index, mapping, piece_cache)
            .await?;

        let piece_data = piece
            .record()
            .to_raw_record_chunks()
            .flatten()
            .skip(offset as usize)
            .copied()
            .collect::<Vec<u8>>();

        raw_data.add_piece_data(next_source_piece_index, piece_data, mapping)?;
        next_source_piece_index = next_source_piece_index.next_source_index();

        // Try to decode the object's length prefix from the first piece.
        let mut partial_object = if let Some(partial_object) =
            PartialObject::new_with_padding(&raw_data, self.max_object_len, mapping)?
        {
            // The partial object keeps its own copy of the data, so free the raw data early.
            std::mem::drop(raw_data);

            trace!(
                %next_source_piece_index,
                ?mapping,
                ?partial_object,
                "Successfully decoded partial object length from first piece",
            );

            partial_object
        } else {
            // The length prefix can straddle a piece boundary, so fetch the next source
            // piece and retry.
            trace!(
                %next_source_piece_index,
                ?mapping,
                ?raw_data,
                "Part of object length bytes are in next piece, fetching",
            );

            let piece = self
                .read_piece(next_source_piece_index, mapping, piece_cache)
                .await?;
            let piece_data = piece
                .record()
                .to_raw_record_chunks()
                .flatten()
                .copied()
                .collect::<Vec<u8>>();

            raw_data.add_piece_data(next_source_piece_index, piece_data, mapping)?;
            next_source_piece_index = next_source_piece_index.next_source_index();

            if let Some(partial_object) =
                PartialObject::new_with_padding(&raw_data, self.max_object_len, mapping)?
            {
                std::mem::drop(raw_data);

                trace!(
                    %next_source_piece_index,
                    ?mapping,
                    ?partial_object,
                    "Successfully decoded partial object length from first two pieces",
                );

                partial_object
            } else {
                // A valid length prefix always fits in two pieces, so this object is invalid.
                return Err(Error::InvalidObject {
                    next_source_piece_index,
                    segment_data_length: raw_data.segment_data_length(),
                    mapping,
                });
            }
        };

        // Fast path: the object fits in the pieces we've already downloaded.
        if let Some(data) = partial_object.try_reconstruct_object(mapping)? {
            return Ok(data);
        }

        // Download the rest of the object's pieces in one batch.
        let remaining_piece_count = partial_object
            .max_remaining_download_length()
            .div_ceil(RawRecord::SIZE);

        if remaining_piece_count > 0 {
            let remaining_piece_indexes = (next_source_piece_index..)
                .filter(|i| i.is_source())
                .take(remaining_piece_count)
                .collect::<Arc<[PieceIndex]>>();
            let pieces = self
                .read_pieces(remaining_piece_indexes.clone(), mapping, piece_cache)
                .await?
                .into_iter()
                .zip(remaining_piece_indexes.iter().copied())
                .map(|(piece, piece_index)| {
                    (
                        piece_index,
                        piece
                            .record()
                            .to_raw_record_chunks()
                            .flatten()
                            .copied()
                            .collect::<Vec<u8>>(),
                    )
                });

            for (piece_index, piece_data) in pieces {
                let mut new_data = RawPieceData::new_for_next_piece(
                    partial_object.max_remaining_download_length(),
                    piece_index,
                );
                new_data.add_piece_data(piece_index, piece_data, mapping)?;
                partial_object.add_piece_data_with_padding(new_data);

                if let Some(data) = partial_object.try_reconstruct_object(mapping)? {
                    return Ok(data);
                }
            }
        }

        // We downloaded every piece the length prefix implied, but never assembled a
        // complete object, so the mapping itself must be invalid.
        Err(Error::InvalidMapping {
            next_source_piece_index,
            remaining_piece_count,
            object_data_length: partial_object.fetched_data_length(),
            mapping,
        })
    }

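    /// Downloads the pieces at `piece_indexes`, caching the last downloaded piece in
    /// `piece_cache` for reuse by the next read.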
    async fn read_pieces(
        &self,
        piece_indexes: Arc<[PieceIndex]>,
        mapping: GlobalObject,
        piece_cache: &mut Option<LastPieceCache>,
    ) -> Result<Vec<Piece>, Error> {
        download_pieces(
            piece_indexes.clone(),
            &piece_cache.clone().with_fallback(self.piece_getter.clone()),
        )
        .await
        .inspect(|pieces| {
            // Cache the most recently downloaded piece for the next read.
            if let (Some(piece_index), Some(piece)) = (piece_indexes.last(), pieces.last()) {
                *piece_cache = Some((*piece_index, piece.clone()))
            }
        })
        .map_err(|source| {
            debug!(
                ?piece_indexes,
                error = ?source,
                ?mapping,
                "Error fetching pieces during object assembling"
            );

            Error::PieceGetterError {
                error: format!("{source:?}"),
                mapping,
            }
        })
    }

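    /// Downloads a single piece, caching it in `piece_cache` for reuse by the next read.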
    async fn read_piece(
        &self,
        piece_index: PieceIndex,
        mapping: GlobalObject,
        piece_cache: &mut Option<LastPieceCache>,
    ) -> Result<Piece, Error> {
        let piece_indexes = Arc::<[PieceIndex]>::from(vec![piece_index]);
        download_pieces(
            piece_indexes.clone(),
            &piece_cache.clone().with_fallback(self.piece_getter.clone()),
        )
        .await
        .inspect(|pieces| *piece_cache = Some((piece_index, pieces[0].clone())))
        .map(|pieces| {
            pieces
                .first()
                .expect("download_pieces always returns exact pieces or error")
                .clone()
        })
        .map_err(|source| {
            debug!(
                %piece_index,
                error = ?source,
                ?mapping,
                "Error fetching piece during object assembling"
            );

            Error::PieceGetterError {
                error: format!("{source:?}"),
                mapping,
            }
        })
    }
}

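/// Decodes the SCALE compact length prefix at the start of `data`.
///
/// Returns `Ok(Some((prefix_length, data_length)))` on success, `Ok(None)` if `data` is
/// too short to hold the whole prefix, or an error if the decoded length (or the prefix
/// itself) exceeds `max_object_len`.
///
/// For example, an object length of 42 encodes as the single byte `0xA8` (`42 << 2` in
/// SCALE's single-byte mode), giving a prefix length of 1 and a data length of 42.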
fn decode_data_length(
    mut data: &[u8],
    max_object_len: usize,
    mapping: GlobalObject,
) -> Result<Option<(usize, usize)>, Error> {
    let data_length = match Compact::<u32>::decode(&mut data) {
        Ok(Compact(data_length)) => {
            let data_length = data_length as usize;
            if data_length > max_object_len {
                debug!(
                    data_length,
                    max_object_len,
                    ?mapping,
                    "Data length exceeds object size limit for object fetcher"
                );

                return Err(Error::ObjectTooLarge {
                    data_length,
                    max_object_len,
                    mapping,
                });
            }

            data_length
        }
        Err(err) => {
            // If there were enough bytes to encode any supported length, but decoding
            // still failed, the prefix itself must be larger than the limit.
            if data.len() >= Compact::<u32>::compact_len(&(max_object_len as u32)) {
                debug!(
                    length_prefix_len = data.len(),
                    max_object_len,
                    ?mapping,
                    "Length prefix exceeds object size limit for object fetcher"
                );

                return Err(Error::LengthPrefixTooLarge {
                    length_prefix_len: data.len(),
                    max_object_len,
                    mapping,
                });
            }

            // Otherwise we just don't have the whole prefix yet.
            debug!(
                ?err,
                ?mapping,
                "Not enough bytes to decode data length for object"
            );

            return Ok(None);
        }
    };

    let data_length_encoded_length = Compact::<u32>::compact_len(&(data_length as u32));

    trace!(
        data_length,
        data_length_encoded_length,
        ?mapping,
        "Decoded data length for object"
    );

    Ok(Some((data_length_encoded_length, data_length)))
}