use crate::object_fetcher::partial_object::{PartialObject, RawPieceData};
use crate::object_fetcher::segment_header::{
    MAX_SEGMENT_PADDING, max_segment_header_encoded_size, min_segment_header_encoded_size,
};
use crate::piece_fetcher::download_pieces;
use crate::piece_getter::PieceGetter;
use parity_scale_codec::{Compact, CompactLen, Decode};
use std::sync::Arc;
use subspace_archiving::archiver::SegmentItem;
use subspace_core_primitives::hashes::Blake3Hash;
use subspace_core_primitives::objects::{GlobalObject, GlobalObjectMapping};
use subspace_core_primitives::pieces::{Piece, PieceIndex, RawRecord};
use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentIndex};
use tracing::{debug, trace, warn};

mod partial_object;
mod segment_header;
#[cfg(test)]
mod tests;

pub use segment_header::MAX_BLOCK_LENGTH;

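/// Returns the maximum object length this fetcher implementation supports.
///
/// The limit is one whole recorded history segment, minus the segment's fixed
/// overheads: padding, the encoded segment header, single-byte variant markers, and
/// room for two compact length encodings.
///
/// # Example
///
/// A minimal usage sketch (illustrative, not from the original docs): clamping a
/// configured limit to what the implementation supports.
///
/// ```ignore
/// let configured_limit = 10 * 1024 * 1024;
/// let limit = configured_limit.min(max_supported_object_length());
/// ```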
#[inline]
pub fn max_supported_object_length() -> usize {
    RecordedHistorySegment::SIZE
        // Maximum padding at the end of the segment
        - MAX_SEGMENT_PADDING
        // Single-byte encoding overheads
        - 1
        - 1
        // The largest encoded segment header
        - max_segment_header_encoded_size()
        // Another single-byte overhead
        - 1
        // Two compact length prefixes, at most `MAX_ENCODED_LENGTH_SIZE` bytes each
        - MAX_ENCODED_LENGTH_SIZE * 2
}

/// The maximum encoded size of an object's length prefix, in bytes.
///
/// Supported object lengths fit in a four-byte SCALE compact encoding.
const MAX_ENCODED_LENGTH_SIZE: usize = 4;

/// A cache holding the most recently downloaded piece, used to avoid re-downloading it
/// for the next object or object part.
pub type LastPieceCache = (PieceIndex, Piece);

/// Errors that can occur while fetching an object.
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
pub enum Error {
    /// The mapping's piece index is not a source piece.
    #[error("Piece index is not a source piece, object: {mapping:?}")]
    NotSourcePiece { mapping: GlobalObject },

    /// The mapping's piece offset doesn't fit inside a raw record.
    #[error(
        "Piece offset is too large, must be less than {}, object: {mapping:?}",
        RawRecord::SIZE
    )]
    PieceOffsetTooLarge { mapping: GlobalObject },

    /// The object's decoded data length exceeds the fetcher's size limit.
    #[error(
        "Data length {data_length} exceeds maximum object size {max_object_len} \
        for object: {mapping:?}"
    )]
    ObjectTooLarge {
        data_length: usize,
        max_object_len: usize,
        mapping: GlobalObject,
    },

    /// The object's length prefix encodes a length above the fetcher's size limit.
    #[error(
        "Length prefix length {length_prefix_len} exceeds maximum object size {max_object_len} \
        for object: {mapping:?}"
    )]
    LengthPrefixTooLarge {
        length_prefix_len: usize,
        max_object_len: usize,
        mapping: GlobalObject,
    },

    /// The fetched object data doesn't hash to the mapping's hash.
    #[error("Incorrect data hash {data_hash:?} for {data_length} byte object: {mapping:?}")]
    InvalidDataHash {
        data_hash: Blake3Hash,
        data_length: usize,
        mapping: GlobalObject,
        /// The fetched object data, only available in tests
        #[cfg(test)]
        data: String,
    },

    /// The piece getter returned an error.
    #[error("Getting piece caused an error: {error}, object: {mapping:?}")]
    PieceGetterError {
        /// A debug format of the piece getter's error
        error: String,
        mapping: GlobalObject,
    },

    /// The piece getter couldn't find the piece.
    #[error("Piece {piece_index:?} was not found by piece getter")]
    PieceNotFound { piece_index: PieceIndex },

    /// The mapping's offset points inside the segment header.
    #[error(
        "Piece offset is inside the segment header, min size of segment header: {}, object: {mapping:?}",
        min_segment_header_encoded_size()
    )]
    PieceOffsetInSegmentHeader { mapping: GlobalObject },

    /// Decoding the segment data failed.
    #[error("Segment {segment_index:?} data decoding error: {source:?}, object: {mapping:?}")]
    SegmentDecoding {
        source: parity_scale_codec::Error,
        segment_index: SegmentIndex,
        mapping: GlobalObject,
    },

    /// The segment has an unknown encoding variant.
    #[error(
        "Decoding segment {segment_index:?} failed: unknown variant: {segment_variant}, \
        object: {mapping:?}"
    )]
    UnknownSegmentVariant {
        segment_variant: u8,
        segment_index: SegmentIndex,
        mapping: GlobalObject,
    },

    /// The segment contains an item that is unexpected at the current decoding progress.
    #[error(
        "Segment {segment_index:?} has unexpected item, current progress: {segment_progress}, \
        object: {mapping:?}, item: {segment_item:?}"
    )]
    UnexpectedSegmentItem {
        segment_progress: usize,
        segment_index: SegmentIndex,
        segment_item: Box<SegmentItem>,
        mapping: GlobalObject,
    },

    /// The segment contains an item variant that is unexpected at the current decoding
    /// progress.
    #[error(
        "Segment {segment_index:?} has unexpected item, current progress: {segment_progress}, \
        object: {mapping:?}, item: {segment_item_variant:?}, item size and data lengths: \
        {segment_item_lengths:?}"
    )]
    UnexpectedSegmentItemVariant {
        segment_progress: usize,
        segment_index: SegmentIndex,
        segment_item_variant: u8,
        segment_item_lengths: Option<(usize, usize)>,
        mapping: GlobalObject,
    },

    /// The object couldn't be assembled from the fetched pieces.
    #[error(
        "Invalid object: next source piece: {next_source_piece_index:?}, segment data length: \
        {segment_data_length:?}, object: {mapping:?}"
    )]
    InvalidObject {
        /// The next source piece index that would have been fetched
        next_source_piece_index: PieceIndex,
        /// The length of the segment data, if available
        segment_data_length: Option<usize>,
        mapping: GlobalObject,
    },

    /// The mapping is inconsistent with the data that was fetched for it.
    #[error(
        "Invalid mapping: data length: {object_data_length:?}, next source piece: \
        {next_source_piece_index:?}, remaining_piece_count: {remaining_piece_count}, object: \
        {mapping:?}"
    )]
    InvalidMapping {
        /// The next source piece index that would have been fetched
        next_source_piece_index: PieceIndex,
        /// The number of pieces that remained to be downloaded
        remaining_piece_count: usize,
        /// The length of the object data fetched so far
        object_data_length: usize,
        mapping: GlobalObject,
    },
}

/// Fetches objects from the network's archived history, using the supplied piece
/// getter.
pub struct ObjectFetcher<PG>
where
    PG: PieceGetter + Send + Sync,
{
    /// The piece getter used to download the pieces containing object data.
    piece_getter: Arc<PG>,

    /// The maximum object length this fetcher will accept, in bytes.
    max_object_len: usize,
}

impl<PG> ObjectFetcher<PG>
where
    PG: PieceGetter + Send + Sync,
{
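    /// Creates a new object fetcher with the given piece getter and size limit.
    ///
    /// `max_object_len` is clamped to [`max_supported_object_length()`], with a warning
    /// logged if the supplied limit was larger.
    ///
    /// # Example
    ///
    /// A minimal construction sketch (illustrative, not from the original docs);
    /// `piece_getter` stands for any value implementing [`PieceGetter`]:
    ///
    /// ```ignore
    /// let fetcher = ObjectFetcher::new(Arc::new(piece_getter), max_supported_object_length());
    /// ```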
    pub fn new(piece_getter: Arc<PG>, mut max_object_len: usize) -> Self {
        if max_object_len > max_supported_object_length() {
            warn!(
                max_object_len,
                max_supported_object_length = ?max_supported_object_length(),
                "Object fetcher size limit exceeds maximum supported object size, \
                limiting to implementation-supported size"
            );

            max_object_len = max_supported_object_length();
        }

        Self {
            piece_getter,
            max_object_len,
        }
    }

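    /// Assembles the objects in `mappings` from network pieces and returns their data.
    ///
    /// Each mapping is validated (source piece, offset outside the segment header and
    /// inside the raw record) before any pieces are fetched for it. Returns the object
    /// data in the same order as the mappings, or the first error encountered.
    ///
    /// # Example
    ///
    /// A minimal usage sketch (illustrative, not from the original docs); how
    /// `mappings` is obtained is assumed here:
    ///
    /// ```ignore
    /// let object_data = fetcher.fetch_objects(mappings).await?;
    /// for data in object_data {
    ///     println!("fetched {} bytes", data.len());
    /// }
    /// ```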
    pub async fn fetch_objects(
        &self,
        mappings: GlobalObjectMapping,
    ) -> Result<Vec<Vec<u8>>, Error> {
        let mut objects = Vec::with_capacity(mappings.objects().len());
        let mut piece_cache = None;

        // Validate each mapping, then fetch and assemble its object.
        for &mapping in mappings.objects() {
            let GlobalObject {
                piece_index,
                offset,
                ..
            } = mapping;

            // Objects are only mapped to source pieces.
            if !piece_index.is_source() {
                debug!(
                    ?mapping,
                    "Invalid piece index for object: must be a source piece",
                );

                return Err(Error::NotSourcePiece { mapping });
            }

            // Objects can't start inside the segment header at the beginning of the
            // first piece in a segment.
            if piece_index.source_position() == 0
                && offset < min_segment_header_encoded_size() as u32
            {
                debug!(
                    ?mapping,
                    min_segment_header_encoded_size = ?min_segment_header_encoded_size(),
                    "Invalid offset for object: must not be inside the segment header",
                );

                return Err(Error::PieceOffsetInSegmentHeader { mapping });
            }

            // Offsets are relative to the raw record, so they must fit inside one.
            if offset >= RawRecord::SIZE as u32 {
                debug!(
                    ?mapping,
                    RawRecord_SIZE = RawRecord::SIZE,
                    "Invalid piece offset for object: must be less than the size of a raw record",
                );

                return Err(Error::PieceOffsetTooLarge { mapping });
            }

            let data = self.fetch_object(mapping, &mut piece_cache).await?;

            objects.push(data);
        }

        Ok(objects)
    }

    /// Fetches and assembles a single object from `mapping`, using (and updating)
    /// `piece_cache` to avoid re-downloading the previous piece.
    async fn fetch_object(
        &self,
        mapping: GlobalObject,
        piece_cache: &mut Option<LastPieceCache>,
    ) -> Result<Vec<u8>, Error> {
        let GlobalObject {
            piece_index,
            offset,
            ..
        } = mapping;

        let mut next_source_piece_index = piece_index;

        let mut raw_data = RawPieceData::new_for_first_piece(mapping);

        // Download the piece the object starts in, and keep its data from the mapped
        // offset onwards.
        let piece = self
            .read_piece(next_source_piece_index, mapping, piece_cache)
            .await?;

        let piece_data = piece
            .record()
            .to_raw_record_chunks()
            .flatten()
            .skip(offset as usize)
            .copied()
            .collect::<Vec<u8>>();

        raw_data.add_piece_data(next_source_piece_index, piece_data, mapping)?;
        next_source_piece_index = next_source_piece_index.next_source_index();

        // Try to decode the object's length prefix from the first piece; if the prefix
        // spans into the next piece, fetch that piece and try again.
        let mut partial_object = if let Some(partial_object) =
            PartialObject::new_with_padding(&raw_data, self.max_object_len, mapping)?
        {
            // The raw data has been moved into the partial object, so it's no longer
            // needed.
            std::mem::drop(raw_data);

            trace!(
                %next_source_piece_index,
                ?mapping,
                ?partial_object,
                "Successfully decoded partial object length from first piece",
            );

            partial_object
        } else {
            trace!(
                %next_source_piece_index,
                ?mapping,
                ?raw_data,
                "Part of object length bytes are in next piece, fetching",
            );

            let piece = self
                .read_piece(next_source_piece_index, mapping, piece_cache)
                .await?;
            let piece_data = piece
                .record()
                .to_raw_record_chunks()
                .flatten()
                .copied()
                .collect::<Vec<u8>>();

            raw_data.add_piece_data(next_source_piece_index, piece_data, mapping)?;
            next_source_piece_index = next_source_piece_index.next_source_index();

            if let Some(partial_object) =
                PartialObject::new_with_padding(&raw_data, self.max_object_len, mapping)?
            {
                std::mem::drop(raw_data);

                trace!(
                    %next_source_piece_index,
                    ?mapping,
                    ?partial_object,
                    "Successfully decoded partial object length from first two pieces",
                );

                partial_object
            } else {
                // Two pieces are always enough to decode a length prefix, so the
                // mapping must be invalid.
                return Err(Error::InvalidObject {
                    next_source_piece_index,
                    segment_data_length: raw_data.segment_data_length(),
                    mapping,
                });
            }
        };

        // The object might already be complete after the first piece(s).
        if let Some(data) = partial_object.try_reconstruct_object(mapping)? {
            return Ok(data);
        }

        // Work out how many more source pieces the rest of the object could need, based
        // on the maximum amount of data left to download.
        let remaining_piece_count = partial_object
            .max_remaining_download_length()
            .div_ceil(RawRecord::SIZE);
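        // Worked example for the calculation above (illustrative): with two and a half
        // raw records of data remaining, `div_ceil` rounds up to three more source
        // pieces; with nothing remaining, the count is zero and no pieces are fetched.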

        if remaining_piece_count > 0 {
            // Source pieces are interleaved with other pieces, so skip non-source
            // indexes when listing the pieces to download.
            let remaining_piece_indexes = (next_source_piece_index..)
                .filter(|i| i.is_source())
                .take(remaining_piece_count)
                .collect::<Arc<[PieceIndex]>>();
            let pieces = self
                .read_pieces(remaining_piece_indexes.clone(), mapping, piece_cache)
                .await?
                .into_iter()
                .zip(remaining_piece_indexes.iter().copied())
                .map(|(piece, piece_index)| {
                    (
                        piece_index,
                        piece
                            .record()
                            .to_raw_record_chunks()
                            .flatten()
                            .copied()
                            .collect::<Vec<u8>>(),
                    )
                });

            for (piece_index, piece_data) in pieces {
                let mut new_data = RawPieceData::new_for_next_piece(
                    partial_object.max_remaining_download_length(),
                    piece_index,
                );
                new_data.add_piece_data(piece_index, piece_data, mapping)?;
                partial_object.add_piece_data_with_padding(new_data);

                // Stop as soon as the object is complete.
                if let Some(data) = partial_object.try_reconstruct_object(mapping)? {
                    return Ok(data);
                }
            }
        }

        // All the pieces the mapping could span were downloaded, but they didn't yield
        // a complete object, so the mapping must be invalid.
        Err(Error::InvalidMapping {
            next_source_piece_index,
            remaining_piece_count,
            object_data_length: partial_object.fetched_data_length(),
            mapping,
        })
    }

    /// Downloads `piece_indexes` as a batch, caching the last downloaded piece in
    /// `piece_cache` for subsequent calls.
    async fn read_pieces(
        &self,
        piece_indexes: Arc<[PieceIndex]>,
        mapping: GlobalObject,
        piece_cache: &mut Option<LastPieceCache>,
    ) -> Result<Vec<Piece>, Error> {
        download_pieces(
            piece_indexes.clone(),
            // Try the cached piece first, falling back to the real piece getter.
            &piece_cache.clone().with_fallback(self.piece_getter.clone()),
        )
        .await
        .inspect(|pieces| {
            if let (Some(piece_index), Some(piece)) = (piece_indexes.last(), pieces.last()) {
                *piece_cache = Some((*piece_index, piece.clone()))
            }
        })
        .map_err(|source| {
            debug!(
                ?piece_indexes,
                error = ?source,
                ?mapping,
                "Error fetching pieces during object assembling"
            );

            Error::PieceGetterError {
                error: format!("{source:?}"),
                mapping,
            }
        })
    }

    /// Downloads a single piece, caching it in `piece_cache` for subsequent calls.
    async fn read_piece(
        &self,
        piece_index: PieceIndex,
        mapping: GlobalObject,
        piece_cache: &mut Option<LastPieceCache>,
    ) -> Result<Piece, Error> {
        let piece_indexes = Arc::<[PieceIndex]>::from(vec![piece_index]);
        download_pieces(
            piece_indexes.clone(),
            &piece_cache.clone().with_fallback(self.piece_getter.clone()),
        )
        .await
        .inspect(|pieces| *piece_cache = Some((piece_index, pieces[0].clone())))
        .map(|pieces| {
            pieces
                .first()
                .expect("download_pieces always returns exact pieces or error")
                .clone()
        })
        .map_err(|source| {
            debug!(
                %piece_index,
                error = ?source,
                ?mapping,
                "Error fetching piece during object assembling"
            );

            Error::PieceGetterError {
                error: format!("{source:?}"),
                mapping,
            }
        })
    }
}

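/// Decodes the object's SCALE compact length prefix from the start of `data`.
///
/// Returns `Ok(Some((length_prefix_len, data_length)))` on success, `Ok(None)` if
/// `data` doesn't contain enough bytes to decode the prefix yet, and an error if the
/// decoded length (or the prefix itself) exceeds `max_object_len`.
///
/// # Example
///
/// A minimal sketch (illustrative, not from the original docs); `mapping` stands for
/// any valid [`GlobalObject`]. The byte `0x04` is the SCALE compact encoding of 1, so
/// a one-byte prefix announcing a one-byte object is decoded:
///
/// ```ignore
/// let decoded = decode_data_length(&[0x04, b'x'], 1024, mapping)?;
/// assert_eq!(decoded, Some((1, 1)));
/// ```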
fn decode_data_length(
    mut data: &[u8],
    max_object_len: usize,
    mapping: GlobalObject,
) -> Result<Option<(usize, usize)>, Error> {
    let data_length = match Compact::<u32>::decode(&mut data) {
        Ok(Compact(data_length)) => {
            let data_length = data_length as usize;
            if data_length > max_object_len {
                debug!(
                    data_length,
                    max_object_len,
                    ?mapping,
                    "Data length exceeds object size limit for object fetcher"
                );

                return Err(Error::ObjectTooLarge {
                    data_length,
                    max_object_len,
                    mapping,
                });
            }

            data_length
        }
        Err(err) => {
            // If there were enough bytes to decode any length up to the limit, but
            // decoding still failed, the prefix must encode a length above the limit.
            if data.len() >= Compact::<u32>::compact_len(&(max_object_len as u32)) {
                debug!(
                    length_prefix_len = data.len(),
                    max_object_len,
                    ?mapping,
                    "Length prefix exceeds object size limit for object fetcher"
                );

                return Err(Error::LengthPrefixTooLarge {
                    length_prefix_len: data.len(),
                    max_object_len,
                    mapping,
                });
            }

            // Otherwise, the rest of the length prefix is in the next piece.
            debug!(
                ?err,
                ?mapping,
                "Not enough bytes to decode data length for object"
            );

            return Ok(None);
        }
    };

    let data_length_encoded_length = Compact::<u32>::compact_len(&(data_length as u32));

    trace!(
        data_length,
        data_length_encoded_length,
        ?mapping,
        "Decoded data length for object"
    );

    Ok(Some((data_length_encoded_length, data_length)))
}