1use crate::constructor::DummyRecordStore;
4use crate::protocols::request_response::handlers::cached_piece_by_index::{
5 CachedPieceByIndexRequest, CachedPieceByIndexResponse, PieceResult,
6};
7use crate::protocols::request_response::handlers::piece_by_index::{
8 PieceByIndexRequest, PieceByIndexResponse,
9};
10use crate::utils::multihash::ToMultihash;
11use crate::{Multihash, Node};
12use async_lock::{Semaphore, SemaphoreGuard};
13use async_trait::async_trait;
14use futures::channel::mpsc;
15use futures::future::FusedFuture;
16use futures::stream::FuturesUnordered;
17use futures::task::noop_waker_ref;
18use futures::{stream, FutureExt, Stream, StreamExt};
19use libp2p::kad::{Behaviour as Kademlia, KBucketKey, RecordKey};
20use libp2p::swarm::NetworkBehaviour;
21use libp2p::{Multiaddr, PeerId};
22use rand::prelude::*;
23use std::any::type_name;
24use std::collections::{HashMap, HashSet};
25use std::fmt;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29use subspace_core_primitives::pieces::{Piece, PieceIndex};
30use tokio_stream::StreamMap;
31use tracing::{debug, trace, warn, Instrument};
32
/// Validation hook applied to pieces received from remote peers.
#[async_trait]
pub trait PieceValidator: Sync + Send {
    /// Checks `piece` that was received from `source_peer_id` for `piece_index`.
    ///
    /// Returns the piece to hand to the caller, or `None`; callers in this file treat `None` as
    /// "no usable piece was obtained from this peer".
    async fn validate_piece(
        &self,
        source_peer_id: PeerId,
        piece_index: PieceIndex,
        piece: Piece,
    ) -> Option<Piece>;
}
44
/// Stub [`PieceValidator`] implementation that accepts every piece unchanged.
#[derive(Debug, Clone, Copy)]
pub struct NoPieceValidator;
48
49#[async_trait]
50impl PieceValidator for NoPieceValidator {
51 async fn validate_piece(&self, _: PeerId, _: PieceIndex, piece: Piece) -> Option<Piece> {
52 Some(piece)
53 }
54}
55
/// Piece provider that retrieves pieces from the network using the wrapped [`Node`] and passes
/// every received piece through the configured piece validator.
#[derive(Clone)]
pub struct PieceProvider<PV> {
    // Networking node used to issue all requests
    node: Node,
    // Validator applied to every piece received from a peer
    piece_validator: PV,
    // Semaphore whose permits are acquired before each cached piece download request
    piece_downloading_semaphore: Arc<Semaphore>,
}
64
65impl<PV> fmt::Debug for PieceProvider<PV> {
66 #[inline]
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 f.debug_struct(&format!("PieceProvider<{}>", type_name::<PV>()))
69 .finish_non_exhaustive()
70 }
71}
72
73impl<PV> PieceProvider<PV>
74where
75 PV: PieceValidator,
76{
    /// Creates a new piece provider.
    ///
    /// * `node` - networking node used for all requests
    /// * `piece_validator` - validator applied to every received piece
    /// * `piece_downloading_semaphore` - semaphore whose permits are acquired before cached piece
    ///   download requests
    pub fn new(
        node: Node,
        piece_validator: PV,
        piece_downloading_semaphore: Arc<Semaphore>,
    ) -> Self {
        Self {
            node,
            piece_validator,
            piece_downloading_semaphore,
        }
    }
89
90 pub async fn get_from_cache<'a, PieceIndices>(
94 &'a self,
95 piece_indices: PieceIndices,
96 ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Unpin + 'a
97 where
98 PieceIndices: IntoIterator<Item = PieceIndex> + 'a,
99 {
100 let download_id = random::<u64>();
101 let (tx, mut rx) = mpsc::unbounded();
102 let fut = async move {
103 let not_downloaded_pieces = download_cached_pieces(
104 piece_indices.into_iter(),
105 &self.node,
106 &self.piece_validator,
107 &tx,
108 &self.piece_downloading_semaphore,
109 )
110 .await;
111
112 if not_downloaded_pieces.is_empty() {
113 debug!("Done");
114 return;
115 }
116
117 for piece_index in not_downloaded_pieces {
118 tx.unbounded_send((piece_index, None))
119 .expect("This future isn't polled after receiver is dropped; qed");
120 }
121
122 debug!("Done #2");
123 };
124
125 let mut fut = Box::pin(fut.instrument(tracing::info_span!("", %download_id)).fuse());
126
127 stream::poll_fn(move |cx| {
129 if !fut.is_terminated() {
130 let _ = fut.poll_unpin(cx);
132 }
133
134 if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
135 return Poll::Ready(maybe_result);
136 }
137
138 Poll::Pending
140 })
141 }
142
143 pub async fn get_piece_from_cache(&self, piece_index: PieceIndex) -> Option<Piece> {
145 let key = RecordKey::from(piece_index.to_multihash());
146
147 let request_batch = self.node.get_requests_batch_handle().await;
148 let mut get_providers_stream = request_batch
149 .get_providers(key.clone())
150 .await
151 .inspect_err(|err| warn!(%piece_index,?key, ?err, "get_providers returned an error"))
152 .ok()?;
153
154 let key = hex::encode(&key);
155 while let Some(provider_id) = get_providers_stream.next().await {
156 trace!(%piece_index, key, %provider_id, "get_providers returned an item");
157
158 let Ok(PieceByIndexResponse {
159 piece,
160 cached_pieces: _,
161 }) = request_batch
162 .send_generic_request(
163 provider_id,
164 Vec::new(),
165 PieceByIndexRequest {
166 piece_index,
167 cached_pieces: Arc::default(),
168 },
169 )
170 .await
171 .inspect_err(
172 |error| debug!(%piece_index, key, %provider_id, ?error, "Piece request failed"),
173 )
174 else {
175 continue;
176 };
177
178 if let Some(piece) = piece {
179 trace!(%piece_index, key, %provider_id, "Piece request succeeded");
180
181 return self
182 .piece_validator
183 .validate_piece(provider_id, piece_index, piece)
184 .await;
185 } else {
186 debug!(%piece_index, key, %provider_id, "Piece request returned empty piece");
187 }
188 }
189
190 None
191 }
192
193 pub async fn get_piece_from_peer(
195 &self,
196 peer_id: PeerId,
197 piece_index: PieceIndex,
198 ) -> Option<Piece> {
199 let PieceByIndexResponse {
201 piece,
202 cached_pieces: _,
203 } = self
204 .node
205 .send_generic_request(
206 peer_id,
207 Vec::new(),
208 PieceByIndexRequest {
209 piece_index,
210 cached_pieces: Arc::default(),
211 },
212 )
213 .await
214 .inspect_err(|error| debug!(%peer_id, %piece_index, ?error, "Piece request failed"))
215 .ok()?;
216
217 if let Some(piece) = piece {
218 trace!(%peer_id, %piece_index, "Piece request succeeded");
219
220 return self
221 .piece_validator
222 .validate_piece(peer_id, piece_index, piece)
223 .await;
224 } else {
225 debug!(%peer_id, %piece_index, "Piece request returned empty piece");
226 }
227
228 None
229 }
230
231 pub async fn get_piece_from_archival_storage(
234 &self,
235 piece_index: PieceIndex,
236 max_random_walking_rounds: usize,
237 ) -> Option<Piece> {
238 trace!(%piece_index, "Getting piece from archival storage..");
240
241 let connected_servers = {
242 let connected_servers = match self.node.connected_servers().await {
243 Ok(connected_servers) => connected_servers,
244 Err(err) => {
245 debug!(%piece_index, ?err, "Cannot get connected peers (DSN L1 lookup)");
246
247 Default::default()
248 }
249 };
250
251 HashSet::<PeerId>::from_iter(connected_servers)
252 };
253
254 if connected_servers.is_empty() {
255 debug!(%piece_index, "Cannot acquire piece from no connected peers (DSN L1 lookup)");
256 } else {
257 for peer_id in connected_servers.iter() {
258 let maybe_piece = self.get_piece_from_peer(*peer_id, piece_index).await;
259
260 if maybe_piece.is_some() {
261 trace!(%piece_index, %peer_id, "DSN L1 lookup from connected peers succeeded");
262
263 return maybe_piece;
264 }
265 }
266 }
267
268 trace!(%piece_index, "Getting piece from DSN L1 using random walk.");
269 let random_walk_result = self
270 .get_piece_by_random_walking(piece_index, max_random_walking_rounds)
271 .await;
272
273 if random_walk_result.is_some() {
274 trace!(%piece_index, "DSN L1 lookup via random walk succeeded");
275
276 return random_walk_result;
277 } else {
278 debug!(
279 %piece_index,
280 %max_random_walking_rounds,
281 "Cannot acquire piece from DSN L1: random walk failed"
282 );
283 }
284
285 None
286 }
287
288 async fn get_piece_by_random_walking(
290 &self,
291 piece_index: PieceIndex,
292 walking_rounds: usize,
293 ) -> Option<Piece> {
294 for round in 0..walking_rounds {
295 debug!(%piece_index, round, "Random walk round");
296
297 let result = self
298 .get_piece_by_random_walking_from_single_round(piece_index, round)
299 .await;
300
301 if result.is_some() {
302 return result;
303 }
304 }
305
306 debug!(%piece_index, "Random walking piece retrieval failed.");
307
308 None
309 }
310
311 async fn get_piece_by_random_walking_from_single_round(
313 &self,
314 piece_index: PieceIndex,
315 round: usize,
316 ) -> Option<Piece> {
317 trace!(%piece_index, "get_piece_by_random_walking round");
319
320 let key = PeerId::random();
322
323 let request_batch = self.node.get_requests_batch_handle().await;
324 let mut get_closest_peers_stream = request_batch
325 .get_closest_peers(key.into())
326 .await
327 .inspect_err(|err| warn!(%piece_index, ?key, ?err, %round, "get_closest_peers returned an error"))
328 .ok()?;
329
330 while let Some(peer_id) = get_closest_peers_stream.next().await {
331 trace!(%piece_index, %peer_id, %round, "get_closest_peers returned an item");
332
333 let Ok(PieceByIndexResponse {
334 piece,
335 cached_pieces: _,
336 }) = request_batch
337 .send_generic_request(
338 peer_id,
339 Vec::new(),
340 PieceByIndexRequest {
341 piece_index,
342 cached_pieces: Arc::default(),
343 },
344 )
345 .await
346 .inspect_err(
347 |error| debug!(%peer_id, %piece_index, ?key, %round, ?error, "Piece request failed."),
348 )
349 else {
350 continue;
351 };
352
353 if let Some(piece) = piece {
354 trace!(%peer_id, %piece_index, ?key, %round, "Piece request succeeded.");
355
356 return self
357 .piece_validator
358 .validate_piece(peer_id, piece_index, piece)
359 .await;
360 } else {
361 debug!(%peer_id, %piece_index, ?key, %round, "Piece request returned empty piece.");
362 }
363 }
364
365 None
366 }
367}
368
/// Local Kademlia instance used purely as a container for known peers, so that peers closest to
/// a key can be looked up locally.
struct KademliaWrapper {
    // Peer ID the Kademlia instance was created with; passed as the source peer when looking up
    // closest local peers
    local_peer_id: PeerId,
    // Kademlia behaviour backed by a dummy record store
    kademlia: Kademlia<DummyRecordStore>,
}
374
375impl KademliaWrapper {
376 fn new(local_peer_id: PeerId) -> Self {
377 Self {
378 local_peer_id,
379 kademlia: Kademlia::new(local_peer_id, DummyRecordStore),
380 }
381 }
382
383 fn add_peer(&mut self, peer_id: &PeerId, addresses: Vec<Multiaddr>) {
384 for address in addresses {
385 self.kademlia.add_address(peer_id, address);
386 }
387 while self
388 .kademlia
389 .poll(&mut Context::from_waker(noop_waker_ref()))
390 .is_ready()
391 {
392 }
394 }
395
396 fn closest_peers(
398 &mut self,
399 key: &KBucketKey<Multihash>,
400 ) -> impl Iterator<Item = (PeerId, Vec<Multiaddr>)> + 'static {
401 let mut closest_peers = self
402 .kademlia
403 .find_closest_local_peers(key, &self.local_peer_id)
404 .map(|peer| {
405 (
406 KBucketKey::from(peer.node_id).distance(key),
407 peer.node_id,
408 peer.multiaddrs,
409 )
410 })
411 .collect::<Vec<_>>();
412
413 closest_peers.sort_unstable_by(|a, b| a.0.cmp(&b.0));
414 closest_peers
415 .into_iter()
416 .map(|(_distance, peer_id, addresses)| (peer_id, addresses))
417 }
418}
419
/// Downloads pieces with the given indices from caches of connected servers and of peers that
/// are locally known (or reported by other peers) to be closest to each piece.
///
/// Every successfully downloaded piece is sent through `results` as `(piece_index, Some(piece))`
/// (this happens inside `process_downloading_result`). A permit from `semaphore` is acquired for
/// every request that is started.
///
/// Returns an iterator over the piece indices that were NOT downloaded.
async fn download_cached_pieces<PV, PieceIndices>(
    piece_indices: PieceIndices,
    node: &Node,
    piece_validator: &PV,
    results: &mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
    semaphore: &Semaphore,
) -> impl ExactSizeIterator<Item = PieceIndex>
where
    PV: PieceValidator,
    PieceIndices: Iterator<Item = PieceIndex>,
{
    // Every piece index gets an entry here, this map is the primary container for tracking what
    // is left to download. Entries are removed as pieces are downloaded, whatever remains at the
    // end was not downloaded.
    let mut pieces_to_download = piece_indices
        .map(|piece_index| async move {
            let mut kademlia = KademliaWrapper::new(node.id());
            let key = piece_index.to_multihash();

            let local_closest_peers = node
                .get_closest_local_peers(key, None)
                .await
                .unwrap_or_default();

            // Seed with locally known closest peers
            for (peer_id, addresses) in local_closest_peers {
                kademlia.add_peer(&peer_id, addresses);
            }

            (piece_index, kademlia)
        })
        .collect::<FuturesUnordered<_>>()
        .collect::<HashMap<_, _>>()
        .await;

    let num_pieces = pieces_to_download.len();
    debug!(%num_pieces, "Starting");

    // Peers that a request sequence was already started with, they are not picked again below
    let mut checked_peers = HashSet::new();

    let Ok(connected_servers) = node.connected_servers().await else {
        trace!("Connected servers error");
        return pieces_to_download.into_keys();
    };

    let num_connected_servers = connected_servers.len();
    debug!(
        %num_connected_servers,
        %num_pieces,
        "Starting downloading"
    );

    // Dispatch initial set of requests to connected servers (at most one per piece), each
    // starting at a different offset into the pieces to download
    let mut downloading_stream = connected_servers
        .into_iter()
        .take(num_pieces)
        .enumerate()
        .map(|(peer_index, peer_id)| {
            checked_peers.insert(peer_id);

            // Computed inside of the closure, which only runs when there is at least one
            // connected server and at least one piece, so the divisor is never zero
            let step = num_pieces / num_connected_servers.min(num_pieces);

            // First piece index is what will be downloaded from this peer, the rest (up to the
            // recommended limit) are only checked for being cached by the peer
            let mut check_cached_pieces = pieces_to_download
                .keys()
                .cycle()
                .skip(step * peer_index)
                // `+ 1` because the first element is removed right below
                .take(num_pieces.min(CachedPieceByIndexRequest::RECOMMENDED_LIMIT + 1))
                .copied()
                .collect::<Vec<_>>();
            // Never empty: the closure only runs when `num_pieces > 0`
            let piece_index = check_cached_pieces.swap_remove(0);

            trace!(%peer_id, %piece_index, "Downloading piece from initially connected peer");

            // Try to get a permit right away, otherwise wait for it inside of the future
            let permit = semaphore.try_acquire();

            let fut = async move {
                let permit = match permit {
                    Some(permit) => permit,
                    None => semaphore.acquire().await,
                };

                download_cached_piece_from_peer(
                    node,
                    piece_validator,
                    peer_id,
                    Vec::new(),
                    Arc::new(check_cached_pieces),
                    piece_index,
                    HashSet::new(),
                    HashSet::new(),
                    permit,
                )
                .await
            };

            (piece_index, Box::pin(fut.into_stream()) as _)
        })
        .collect::<StreamMap<_, _>>();

    loop {
        // Keep up to half of the originally requested pieces downloading concurrently
        let mut additional_pieces_to_download =
            (num_pieces / 2).saturating_sub(downloading_stream.len());
        if additional_pieces_to_download > 0 {
            trace!(
                %additional_pieces_to_download,
                num_pieces,
                currently_downloading = %downloading_stream.len(),
                "Downloading additional pieces from closest peers"
            );
            // Pick up connected servers that were not checked yet (if any)
            'outer: for peer_id in node
                .connected_servers()
                .await
                .unwrap_or_default()
                .into_iter()
                .filter(|peer_id| checked_peers.insert(*peer_id))
                .take(additional_pieces_to_download)
            {
                // Only wait for a permit when nothing is being downloaded, otherwise give up on
                // starting more requests if no permit is immediately available
                let permit = if downloading_stream.is_empty() {
                    semaphore.acquire().await
                } else if let Some(permit) = semaphore.try_acquire() {
                    permit
                } else {
                    break;
                };

                // Find the first piece that is not being downloaded right now
                for &piece_index in pieces_to_download.keys() {
                    if downloading_stream.contains_key(&piece_index) {
                        continue;
                    }

                    trace!(%peer_id, %piece_index, "Downloading piece from newly connected peer");

                    let check_cached_pieces = sample_cached_piece_indices(
                        pieces_to_download.keys(),
                        &HashSet::new(),
                        &HashSet::new(),
                        piece_index,
                    );
                    let fut = download_cached_piece_from_peer(
                        node,
                        piece_validator,
                        peer_id,
                        Vec::new(),
                        Arc::new(check_cached_pieces),
                        piece_index,
                        HashSet::new(),
                        HashSet::new(),
                        permit,
                    );

                    downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _);
                    additional_pieces_to_download -= 1;

                    continue 'outer;
                }

                // Every remaining piece is already being downloaded
                break;
            }

            // Pick up more pieces to download from the closest known peers. Keys are copied
            // because `pieces_to_download` is borrowed mutably inside of the loop.
            let pieces_indices_to_download = pieces_to_download.keys().copied().collect::<Vec<_>>();
            for piece_index in pieces_indices_to_download {
                if additional_pieces_to_download == 0 {
                    break;
                }
                if downloading_stream.contains_key(&piece_index) {
                    continue;
                }
                // Same permit acquisition policy as for newly connected servers above
                let permit = if downloading_stream.is_empty() {
                    semaphore.acquire().await
                } else if let Some(permit) = semaphore.try_acquire() {
                    permit
                } else {
                    break;
                };

                let kbucket_key = KBucketKey::from(piece_index.to_multihash());
                let closest_peers_to_check = pieces_to_download
                    .get_mut(&piece_index)
                    .expect("Entries are not removed here; qed")
                    .closest_peers(&kbucket_key);
                // Use the closest peer that was not checked yet; if there is none, the permit is
                // dropped without starting a request
                for (peer_id, addresses) in closest_peers_to_check {
                    if !checked_peers.insert(peer_id) {
                        continue;
                    }

                    trace!(%peer_id, %piece_index, "Downloading piece from closest peer");

                    let check_cached_pieces = sample_cached_piece_indices(
                        pieces_to_download.keys(),
                        &HashSet::new(),
                        &HashSet::new(),
                        piece_index,
                    );
                    let fut = download_cached_piece_from_peer(
                        node,
                        piece_validator,
                        peer_id,
                        addresses,
                        Arc::new(check_cached_pieces),
                        piece_index,
                        HashSet::new(),
                        HashSet::new(),
                        permit,
                    );

                    downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _);
                    additional_pieces_to_download -= 1;
                    break;
                }
            }

            trace!(
                pieces_left = %additional_pieces_to_download,
                "Initiated downloading additional pieces from closest peers"
            );
        }

        // Wait for the next finished request; `None` means nothing is in flight anymore
        let Some((piece_index, result)) = downloading_stream.next().await else {
            if !pieces_to_download.is_empty() {
                debug!(
                    %num_pieces,
                    to_download = %pieces_to_download.len(),
                    "Finished downloading early"
                );
                // No more requests in flight, we're done here
                break;
            }
            break;
        };
        process_downloading_result(
            piece_index,
            result,
            &mut pieces_to_download,
            &mut downloading_stream,
            node,
            piece_validator,
            results,
        );

        if pieces_to_download.is_empty() {
            break;
        }
    }

    pieces_to_download.into_keys()
}
680
681fn process_downloading_result<'a, 'b, PV>(
682 piece_index: PieceIndex,
683 result: DownloadedPieceFromPeer<'a>,
684 pieces_to_download: &'b mut HashMap<PieceIndex, KademliaWrapper>,
685 downloading_stream: &'b mut StreamMap<
686 PieceIndex,
687 Pin<Box<dyn Stream<Item = DownloadedPieceFromPeer<'a>> + Send + 'a>>,
688 >,
689 node: &'a Node,
690 piece_validator: &'a PV,
691 results: &'a mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
692) where
693 PV: PieceValidator,
694{
695 let DownloadedPieceFromPeer {
696 peer_id,
697 result,
698 mut cached_pieces,
699 not_cached_pieces,
700 permit,
701 } = result;
702 trace!(%piece_index, %peer_id, result = %result.is_some(), "Piece response");
703
704 let Some(result) = result else {
705 return;
707 };
708
709 match result {
710 PieceResult::Piece(piece) => {
711 trace!(%piece_index, %peer_id, "Got piece");
712
713 pieces_to_download.remove(&piece_index);
715
716 results
717 .unbounded_send((piece_index, Some(piece)))
718 .expect("This future isn't polled after receiver is dropped; qed");
719
720 if pieces_to_download.is_empty() {
721 return;
722 }
723
724 cached_pieces.remove(&piece_index);
725 }
726 PieceResult::ClosestPeers(closest_peers) => {
727 trace!(%piece_index, %peer_id, "Got closest peers");
728
729 if let Some(kademlia) = pieces_to_download.get_mut(&piece_index) {
731 for (peer_id, addresses) in Vec::from(closest_peers) {
732 kademlia.add_peer(&peer_id, addresses);
733 }
734 }
735
736 if cached_pieces.remove(&piece_index) {
738 return;
739 }
740 }
741 }
742
743 let mut maybe_piece_index_to_download_next = None;
744 cached_pieces.retain(|piece_index| {
746 if !pieces_to_download.contains_key(piece_index) {
748 return false;
749 }
750
751 if maybe_piece_index_to_download_next.is_none()
753 && !downloading_stream.contains_key(piece_index)
754 {
755 maybe_piece_index_to_download_next.replace(*piece_index);
756 return true;
758 }
759
760 true
762 });
763
764 let piece_index_to_download_next = if let Some(piece_index) = maybe_piece_index_to_download_next
765 {
766 trace!(%piece_index, %peer_id, "Next piece to download from peer");
767 piece_index
768 } else {
769 trace!(%peer_id, "Peer doesn't have anything else");
770 return;
772 };
773
774 let fut = download_cached_piece_from_peer(
775 node,
776 piece_validator,
777 peer_id,
778 Vec::new(),
779 Arc::new(sample_cached_piece_indices(
782 pieces_to_download.keys(),
783 &cached_pieces,
784 ¬_cached_pieces,
785 piece_index_to_download_next,
786 )),
787 piece_index_to_download_next,
788 cached_pieces,
789 not_cached_pieces,
790 permit,
791 );
792 downloading_stream.insert(piece_index_to_download_next, Box::pin(fut.into_stream()));
793}
794
795fn sample_cached_piece_indices<'a, I>(
796 pieces_to_download: I,
797 cached_pieces: &HashSet<PieceIndex>,
798 not_cached_pieces: &HashSet<PieceIndex>,
799 piece_index_to_download_next: PieceIndex,
800) -> Vec<PieceIndex>
801where
802 I: Iterator<Item = &'a PieceIndex>,
803{
804 pieces_to_download
805 .filter_map(|piece_index| {
808 if piece_index == &piece_index_to_download_next
809 || cached_pieces.contains(piece_index)
810 || not_cached_pieces.contains(piece_index)
811 {
812 None
813 } else {
814 Some(*piece_index)
815 }
816 })
817 .choose_multiple(
818 &mut thread_rng(),
819 CachedPieceByIndexRequest::RECOMMENDED_LIMIT,
820 )
821}
822
/// Outcome of a single cached piece request to a peer, together with the state that is carried
/// over to the next request to the same peer.
struct DownloadedPieceFromPeer<'a> {
    // Peer the request was sent to
    peer_id: PeerId,
    // `None` if the request failed or the received piece did not pass validation
    result: Option<PieceResult>,
    // Piece indices the peer reported as cached, accumulated across requests
    cached_pieces: HashSet<PieceIndex>,
    // Piece indices the peer was asked about, but did not report as cached
    not_cached_pieces: HashSet<PieceIndex>,
    // Semaphore permit held for the duration of the request, released on drop
    permit: SemaphoreGuard<'a>,
}
830
831#[allow(clippy::too_many_arguments)]
835async fn download_cached_piece_from_peer<'a, PV>(
836 node: &'a Node,
837 piece_validator: &'a PV,
838 peer_id: PeerId,
839 addresses: Vec<Multiaddr>,
840 check_cached_pieces: Arc<Vec<PieceIndex>>,
841 piece_index: PieceIndex,
842 mut cached_pieces: HashSet<PieceIndex>,
843 mut not_cached_pieces: HashSet<PieceIndex>,
844 permit: SemaphoreGuard<'a>,
845) -> DownloadedPieceFromPeer<'a>
846where
847 PV: PieceValidator,
848{
849 let result = match node
850 .send_generic_request(
851 peer_id,
852 addresses,
853 CachedPieceByIndexRequest {
854 piece_index,
855 cached_pieces: Arc::clone(&check_cached_pieces),
856 },
857 )
858 .await
859 {
860 Ok(response) => {
861 let CachedPieceByIndexResponse {
862 result,
863 cached_pieces,
864 } = response;
865
866 match result {
867 PieceResult::Piece(piece) => piece_validator
868 .validate_piece(peer_id, piece_index, piece)
869 .await
870 .map(|piece| CachedPieceByIndexResponse {
871 result: PieceResult::Piece(piece),
872 cached_pieces,
873 }),
874 PieceResult::ClosestPeers(closest_peers) => Some(CachedPieceByIndexResponse {
875 result: PieceResult::ClosestPeers(closest_peers),
876 cached_pieces,
877 }),
878 }
879 }
880 Err(error) => {
881 debug!(%error, %peer_id, %piece_index, "Failed to download cached piece from peer");
882
883 None
884 }
885 };
886
887 match result {
888 Some(result) => {
889 cached_pieces.extend(result.cached_pieces);
890 not_cached_pieces.extend(
891 check_cached_pieces
892 .iter()
893 .filter(|piece_index| !cached_pieces.contains(piece_index))
894 .copied(),
895 );
896
897 DownloadedPieceFromPeer {
898 peer_id,
899 result: Some(result.result),
900 cached_pieces: { cached_pieces },
901 not_cached_pieces,
902 permit,
903 }
904 }
905 None => DownloadedPieceFromPeer {
906 peer_id,
907 result: None,
908 cached_pieces,
909 not_cached_pieces,
910 permit,
911 },
912 }
913}