1use crate::constructor::DummyRecordStore;
4use crate::protocols::request_response::handlers::cached_piece_by_index::{
5 CachedPieceByIndexRequest, CachedPieceByIndexResponse, PieceResult,
6};
7use crate::protocols::request_response::handlers::piece_by_index::{
8 PieceByIndexRequest, PieceByIndexResponse,
9};
10use crate::utils::multihash::ToMultihash;
11use crate::{Multihash, Node};
12use async_lock::{Semaphore, SemaphoreGuard};
13use async_trait::async_trait;
14use futures::channel::mpsc;
15use futures::future::FusedFuture;
16use futures::stream::FuturesUnordered;
17use futures::task::noop_waker_ref;
18use futures::{FutureExt, Stream, StreamExt, stream};
19use libp2p::kad::{Behaviour as Kademlia, KBucketKey, RecordKey};
20use libp2p::swarm::NetworkBehaviour;
21use libp2p::{Multiaddr, PeerId};
22use rand::prelude::*;
23use std::any::type_name;
24use std::collections::{HashMap, HashSet};
25use std::fmt;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29use subspace_core_primitives::pieces::{Piece, PieceIndex};
30use tokio_stream::StreamMap;
31use tracing::{Instrument, debug, trace, warn};
32
33#[async_trait]
35pub trait PieceValidator: Sync + Send {
36 async fn validate_piece(
38 &self,
39 source_peer_id: PeerId,
40 piece_index: PieceIndex,
41 piece: Piece,
42 ) -> Option<Piece>;
43}
44
/// [`PieceValidator`] implementation that accepts every piece without inspecting it.
#[derive(Debug, Clone, Copy)]
pub struct NoPieceValidator;
48
49#[async_trait]
50impl PieceValidator for NoPieceValidator {
51 async fn validate_piece(&self, _: PeerId, _: PieceIndex, piece: Piece) -> Option<Piece> {
52 Some(piece)
53 }
54}
55
56#[derive(Clone)]
59pub struct PieceProvider<PV> {
60 node: Node,
61 piece_validator: PV,
62 piece_downloading_semaphore: Arc<Semaphore>,
63}
64
65impl<PV> fmt::Debug for PieceProvider<PV> {
66 #[inline]
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 f.debug_struct(&format!("PieceProvider<{}>", type_name::<PV>()))
69 .finish_non_exhaustive()
70 }
71}
72
73impl<PV> PieceProvider<PV>
74where
75 PV: PieceValidator,
76{
77 pub fn new(
79 node: Node,
80 piece_validator: PV,
81 piece_downloading_semaphore: Arc<Semaphore>,
82 ) -> Self {
83 Self {
84 node,
85 piece_validator,
86 piece_downloading_semaphore,
87 }
88 }
89
90 pub async fn get_from_cache<'a, PieceIndices>(
94 &'a self,
95 piece_indices: PieceIndices,
96 ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Unpin + 'a
97 where
98 PieceIndices: IntoIterator<Item = PieceIndex> + 'a,
99 {
100 let download_id = random::<u64>();
101 let (tx, mut rx) = mpsc::unbounded();
102 let fut = async move {
103 let not_downloaded_pieces = download_cached_pieces(
104 piece_indices,
105 &self.node,
106 &self.piece_validator,
107 &tx,
108 &self.piece_downloading_semaphore,
109 )
110 .await;
111
112 if not_downloaded_pieces.is_empty() {
113 debug!("Done");
114 return;
115 }
116
117 for piece_index in not_downloaded_pieces {
118 tx.unbounded_send((piece_index, None))
119 .expect("This future isn't polled after receiver is dropped; qed");
120 }
121
122 debug!("Done #2");
123 };
124
125 let mut fut = Box::pin(fut.instrument(tracing::info_span!("", %download_id)).fuse());
126
127 stream::poll_fn(move |cx| {
129 if !fut.is_terminated() {
130 let _ = fut.poll_unpin(cx);
132 }
133
134 if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
135 return Poll::Ready(maybe_result);
136 }
137
138 Poll::Pending
140 })
141 }
142
143 pub async fn get_piece_from_cache(&self, piece_index: PieceIndex) -> Option<Piece> {
145 let key = RecordKey::from(piece_index.to_multihash());
146
147 let request_batch = self.node.get_requests_batch_handle().await;
148 let mut get_providers_stream = request_batch
149 .get_providers(key.clone())
150 .await
151 .inspect_err(|err| warn!(%piece_index,?key, ?err, "get_providers returned an error"))
152 .ok()?;
153
154 let key = hex::encode(&key);
155 while let Some(provider_id) = get_providers_stream.next().await {
156 trace!(%piece_index, key, %provider_id, "get_providers returned an item");
157
158 let Ok(PieceByIndexResponse {
159 piece,
160 cached_pieces: _,
161 }) = request_batch
162 .send_generic_request(
163 provider_id,
164 Vec::new(),
165 PieceByIndexRequest {
166 piece_index,
167 cached_pieces: Arc::default(),
168 },
169 )
170 .await
171 .inspect_err(
172 |error| debug!(%piece_index, key, %provider_id, ?error, "Piece request failed"),
173 )
174 else {
175 continue;
176 };
177
178 if let Some(piece) = piece {
179 trace!(%piece_index, key, %provider_id, "Piece request succeeded");
180
181 return self
182 .piece_validator
183 .validate_piece(provider_id, piece_index, piece)
184 .await;
185 } else {
186 debug!(%piece_index, key, %provider_id, "Piece request returned empty piece");
187 }
188 }
189
190 None
191 }
192
193 pub async fn get_piece_from_peer(
195 &self,
196 peer_id: PeerId,
197 piece_index: PieceIndex,
198 ) -> Option<Piece> {
199 let PieceByIndexResponse {
201 piece,
202 cached_pieces: _,
203 } = self
204 .node
205 .send_generic_request(
206 peer_id,
207 Vec::new(),
208 PieceByIndexRequest {
209 piece_index,
210 cached_pieces: Arc::default(),
211 },
212 )
213 .await
214 .inspect_err(|error| debug!(%peer_id, %piece_index, ?error, "Piece request failed"))
215 .ok()?;
216
217 if let Some(piece) = piece {
218 trace!(%peer_id, %piece_index, "Piece request succeeded");
219
220 return self
221 .piece_validator
222 .validate_piece(peer_id, piece_index, piece)
223 .await;
224 } else {
225 debug!(%peer_id, %piece_index, "Piece request returned empty piece");
226 }
227
228 None
229 }
230
231 pub async fn get_piece_from_archival_storage(
234 &self,
235 piece_index: PieceIndex,
236 max_random_walking_rounds: usize,
237 ) -> Option<Piece> {
238 trace!(%piece_index, "Getting piece from archival storage..");
240
241 let connected_servers = {
242 let connected_servers = match self.node.connected_servers().await {
243 Ok(connected_servers) => connected_servers,
244 Err(err) => {
245 debug!(%piece_index, ?err, "Cannot get connected peers (DSN L1 lookup)");
246
247 Default::default()
248 }
249 };
250
251 HashSet::<PeerId>::from_iter(connected_servers)
252 };
253
254 if connected_servers.is_empty() {
255 debug!(%piece_index, "Cannot acquire piece from no connected peers (DSN L1 lookup)");
256 } else {
257 for peer_id in connected_servers.iter() {
258 let maybe_piece = self.get_piece_from_peer(*peer_id, piece_index).await;
259
260 if maybe_piece.is_some() {
261 trace!(%piece_index, %peer_id, "DSN L1 lookup from connected peers succeeded");
262
263 return maybe_piece;
264 }
265 }
266 }
267
268 trace!(%piece_index, "Getting piece from DSN L1 using random walk.");
269 let random_walk_result = self
270 .get_piece_by_random_walking(piece_index, max_random_walking_rounds)
271 .await;
272
273 if random_walk_result.is_some() {
274 trace!(%piece_index, "DSN L1 lookup via random walk succeeded");
275
276 return random_walk_result;
277 } else {
278 debug!(
279 %piece_index,
280 %max_random_walking_rounds,
281 "Cannot acquire piece from DSN L1: random walk failed"
282 );
283 }
284
285 None
286 }
287
288 async fn get_piece_by_random_walking(
290 &self,
291 piece_index: PieceIndex,
292 walking_rounds: usize,
293 ) -> Option<Piece> {
294 for round in 0..walking_rounds {
295 debug!(%piece_index, round, "Random walk round");
296
297 let result = self
298 .get_piece_by_random_walking_from_single_round(piece_index, round)
299 .await;
300
301 if result.is_some() {
302 return result;
303 }
304 }
305
306 debug!(%piece_index, "Random walking piece retrieval failed.");
307
308 None
309 }
310
311 async fn get_piece_by_random_walking_from_single_round(
313 &self,
314 piece_index: PieceIndex,
315 round: usize,
316 ) -> Option<Piece> {
317 trace!(%piece_index, "get_piece_by_random_walking round");
319
320 let key = PeerId::random();
322
323 let request_batch = self.node.get_requests_batch_handle().await;
324 let mut get_closest_peers_stream = request_batch
325 .get_closest_peers(key.into())
326 .await
327 .inspect_err(|err| warn!(%piece_index, ?key, ?err, %round, "get_closest_peers returned an error"))
328 .ok()?;
329
330 while let Some(peer_id) = get_closest_peers_stream.next().await {
331 trace!(%piece_index, %peer_id, %round, "get_closest_peers returned an item");
332
333 let Ok(PieceByIndexResponse {
334 piece,
335 cached_pieces: _,
336 }) = request_batch
337 .send_generic_request(
338 peer_id,
339 Vec::new(),
340 PieceByIndexRequest {
341 piece_index,
342 cached_pieces: Arc::default(),
343 },
344 )
345 .await
346 .inspect_err(
347 |error| debug!(%peer_id, %piece_index, ?key, %round, ?error, "Piece request failed."),
348 )
349 else {
350 continue;
351 };
352
353 if let Some(piece) = piece {
354 trace!(%peer_id, %piece_index, ?key, %round, "Piece request succeeded.");
355
356 return self
357 .piece_validator
358 .validate_piece(peer_id, piece_index, piece)
359 .await;
360 } else {
361 debug!(%peer_id, %piece_index, ?key, %round, "Piece request returned empty piece.");
362 }
363 }
364
365 None
366 }
367}
368
369struct KademliaWrapper {
371 local_peer_id: PeerId,
372 kademlia: Kademlia<DummyRecordStore>,
373}
374
375impl KademliaWrapper {
376 fn new(local_peer_id: PeerId) -> Self {
377 Self {
378 local_peer_id,
379 kademlia: Kademlia::new(local_peer_id, DummyRecordStore),
380 }
381 }
382
383 fn add_peer(&mut self, peer_id: &PeerId, addresses: Vec<Multiaddr>) {
384 for address in addresses {
385 self.kademlia.add_address(peer_id, address);
386 }
387 while self
388 .kademlia
389 .poll(&mut Context::from_waker(noop_waker_ref()))
390 .is_ready()
391 {
392 }
394 }
395
396 fn closest_peers(
398 &mut self,
399 key: &KBucketKey<Multihash>,
400 ) -> impl Iterator<Item = (PeerId, Vec<Multiaddr>)> + 'static {
401 let mut closest_peers = self
402 .kademlia
403 .find_closest_local_peers(key, &self.local_peer_id)
404 .map(|peer| {
405 (
406 KBucketKey::from(peer.node_id).distance(key),
407 peer.node_id,
408 peer.multiaddrs,
409 )
410 })
411 .collect::<Vec<_>>();
412
413 closest_peers.sort_unstable_by(|a, b| a.0.cmp(&b.0));
414 closest_peers
415 .into_iter()
416 .map(|(_distance, peer_id, addresses)| (peer_id, addresses))
417 }
418}
419
420async fn download_cached_pieces<PV, PieceIndices>(
423 piece_indices: PieceIndices,
424 node: &Node,
425 piece_validator: &PV,
426 results: &mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
427 semaphore: &Semaphore,
428) -> impl ExactSizeIterator<Item = PieceIndex>
429where
430 PV: PieceValidator,
431 PieceIndices: IntoIterator<Item = PieceIndex>,
432{
433 let mut pieces_to_download = piece_indices
439 .into_iter()
440 .map(|piece_index| async move {
441 let mut kademlia = KademliaWrapper::new(node.id());
442 let key = piece_index.to_multihash();
443
444 let local_closest_peers = node
445 .get_closest_local_peers(key, None)
446 .await
447 .unwrap_or_default();
448
449 for (peer_id, addresses) in local_closest_peers {
451 kademlia.add_peer(&peer_id, addresses);
452 }
453
454 (piece_index, kademlia)
455 })
456 .collect::<FuturesUnordered<_>>()
457 .collect::<HashMap<_, _>>()
458 .await;
459
460 let num_pieces = pieces_to_download.len();
461 debug!(%num_pieces, "Starting");
462
463 let mut checked_peers = HashSet::new();
464
465 let Ok(connected_servers) = node.connected_servers().await else {
466 trace!("Connected servers error");
467 return pieces_to_download.into_keys();
468 };
469
470 let num_connected_servers = connected_servers.len();
471 debug!(
472 %num_connected_servers,
473 %num_pieces,
474 "Starting downloading"
475 );
476
477 let mut downloading_stream = connected_servers
479 .into_iter()
480 .take(num_pieces)
481 .enumerate()
482 .map(|(peer_index, peer_id)| {
483 checked_peers.insert(peer_id);
484
485 let step = num_pieces / num_connected_servers.min(num_pieces);
487
488 let mut check_cached_pieces = pieces_to_download
491 .keys()
492 .cycle()
493 .skip(step * peer_index)
494 .take(num_pieces.min(CachedPieceByIndexRequest::RECOMMENDED_LIMIT + 1))
496 .copied()
497 .collect::<Vec<_>>();
498 let piece_index = check_cached_pieces.swap_remove(0);
500
501 trace!(%peer_id, %piece_index, "Downloading piece from initially connected peer");
502
503 let permit = semaphore.try_acquire();
504
505 let fut = async move {
506 let permit = match permit {
507 Some(permit) => permit,
508 None => semaphore.acquire().await,
509 };
510
511 download_cached_piece_from_peer(
512 node,
513 piece_validator,
514 peer_id,
515 Vec::new(),
516 Arc::new(check_cached_pieces),
517 piece_index,
518 HashSet::new(),
519 HashSet::new(),
520 permit,
521 )
522 .await
523 };
524
525 (piece_index, Box::pin(fut.into_stream()) as _)
526 })
527 .collect::<StreamMap<_, _>>();
528
529 loop {
530 let mut additional_pieces_to_download =
532 (num_pieces / 2).saturating_sub(downloading_stream.len());
533 if additional_pieces_to_download > 0 {
534 trace!(
535 %additional_pieces_to_download,
536 num_pieces,
537 currently_downloading = %downloading_stream.len(),
538 "Downloading additional pieces from closest peers"
539 );
540 'outer: for peer_id in node
542 .connected_servers()
543 .await
544 .unwrap_or_default()
545 .into_iter()
546 .filter(|peer_id| checked_peers.insert(*peer_id))
547 .take(additional_pieces_to_download)
548 {
549 let permit = if downloading_stream.is_empty() {
550 semaphore.acquire().await
551 } else if let Some(permit) = semaphore.try_acquire() {
552 permit
553 } else {
554 break;
555 };
556
557 for &piece_index in pieces_to_download.keys() {
558 if downloading_stream.contains_key(&piece_index) {
559 continue;
560 }
561
562 trace!(%peer_id, %piece_index, "Downloading piece from newly connected peer");
563
564 let check_cached_pieces = sample_cached_piece_indices(
565 pieces_to_download.keys(),
566 &HashSet::new(),
567 &HashSet::new(),
568 piece_index,
569 );
570 let fut = download_cached_piece_from_peer(
571 node,
572 piece_validator,
573 peer_id,
574 Vec::new(),
575 Arc::new(check_cached_pieces),
576 piece_index,
577 HashSet::new(),
578 HashSet::new(),
579 permit,
580 );
581
582 downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _);
583 additional_pieces_to_download -= 1;
584
585 continue 'outer;
586 }
587
588 break;
589 }
590
591 let pieces_indices_to_download = pieces_to_download.keys().copied().collect::<Vec<_>>();
595 for piece_index in pieces_indices_to_download {
596 if additional_pieces_to_download == 0 {
597 break;
598 }
599 if downloading_stream.contains_key(&piece_index) {
600 continue;
601 }
602 let permit = if downloading_stream.is_empty() {
603 semaphore.acquire().await
604 } else if let Some(permit) = semaphore.try_acquire() {
605 permit
606 } else {
607 break;
608 };
609
610 let kbucket_key = KBucketKey::from(piece_index.to_multihash());
611 let closest_peers_to_check = pieces_to_download
612 .get_mut(&piece_index)
613 .expect("Entries are not removed here; qed")
614 .closest_peers(&kbucket_key);
615 for (peer_id, addresses) in closest_peers_to_check {
616 if !checked_peers.insert(peer_id) {
617 continue;
618 }
619
620 trace!(%peer_id, %piece_index, "Downloading piece from closest peer");
621
622 let check_cached_pieces = sample_cached_piece_indices(
623 pieces_to_download.keys(),
624 &HashSet::new(),
625 &HashSet::new(),
626 piece_index,
627 );
628 let fut = download_cached_piece_from_peer(
629 node,
630 piece_validator,
631 peer_id,
632 addresses,
633 Arc::new(check_cached_pieces),
634 piece_index,
635 HashSet::new(),
636 HashSet::new(),
637 permit,
638 );
639
640 downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _);
641 additional_pieces_to_download -= 1;
642 break;
643 }
644 }
645
646 trace!(
647 pieces_left = %additional_pieces_to_download,
648 "Initiated downloading additional pieces from closest peers"
649 );
650 }
651
652 let Some((piece_index, result)) = downloading_stream.next().await else {
653 if !pieces_to_download.is_empty() {
654 debug!(
655 %num_pieces,
656 to_download = %pieces_to_download.len(),
657 "Finished downloading early"
658 );
659 break;
661 }
662 break;
663 };
664 process_downloading_result(
665 piece_index,
666 result,
667 &mut pieces_to_download,
668 &mut downloading_stream,
669 node,
670 piece_validator,
671 results,
672 );
673
674 if pieces_to_download.is_empty() {
675 break;
676 }
677 }
678
679 pieces_to_download.into_keys()
680}
681
682fn process_downloading_result<'a, 'b, PV>(
683 piece_index: PieceIndex,
684 result: DownloadedPieceFromPeer<'a>,
685 pieces_to_download: &'b mut HashMap<PieceIndex, KademliaWrapper>,
686 downloading_stream: &'b mut StreamMap<
687 PieceIndex,
688 Pin<Box<dyn Stream<Item = DownloadedPieceFromPeer<'a>> + Send + 'a>>,
689 >,
690 node: &'a Node,
691 piece_validator: &'a PV,
692 results: &'a mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
693) where
694 PV: PieceValidator,
695{
696 let DownloadedPieceFromPeer {
697 peer_id,
698 result,
699 mut cached_pieces,
700 not_cached_pieces,
701 permit,
702 } = result;
703 trace!(%piece_index, %peer_id, result = %result.is_some(), "Piece response");
704
705 let Some(result) = result else {
706 return;
708 };
709
710 match result {
711 PieceResult::Piece(piece) => {
712 trace!(%piece_index, %peer_id, "Got piece");
713
714 pieces_to_download.remove(&piece_index);
716
717 results
718 .unbounded_send((piece_index, Some(piece)))
719 .expect("This future isn't polled after receiver is dropped; qed");
720
721 if pieces_to_download.is_empty() {
722 return;
723 }
724
725 cached_pieces.remove(&piece_index);
726 }
727 PieceResult::ClosestPeers(closest_peers) => {
728 trace!(%piece_index, %peer_id, "Got closest peers");
729
730 if let Some(kademlia) = pieces_to_download.get_mut(&piece_index) {
732 for (peer_id, addresses) in Vec::from(closest_peers) {
733 kademlia.add_peer(&peer_id, addresses);
734 }
735 }
736
737 if cached_pieces.remove(&piece_index) {
739 return;
740 }
741 }
742 }
743
744 let mut maybe_piece_index_to_download_next = None;
745 cached_pieces.retain(|piece_index| {
747 if !pieces_to_download.contains_key(piece_index) {
749 return false;
750 }
751
752 if maybe_piece_index_to_download_next.is_none()
754 && !downloading_stream.contains_key(piece_index)
755 {
756 maybe_piece_index_to_download_next.replace(*piece_index);
757 return true;
759 }
760
761 true
763 });
764
765 let piece_index_to_download_next = if let Some(piece_index) = maybe_piece_index_to_download_next
766 {
767 trace!(%piece_index, %peer_id, "Next piece to download from peer");
768 piece_index
769 } else {
770 trace!(%peer_id, "Peer doesn't have anything else");
771 return;
773 };
774
775 let fut = download_cached_piece_from_peer(
776 node,
777 piece_validator,
778 peer_id,
779 Vec::new(),
780 Arc::new(sample_cached_piece_indices(
783 pieces_to_download.keys(),
784 &cached_pieces,
785 ¬_cached_pieces,
786 piece_index_to_download_next,
787 )),
788 piece_index_to_download_next,
789 cached_pieces,
790 not_cached_pieces,
791 permit,
792 );
793 downloading_stream.insert(piece_index_to_download_next, Box::pin(fut.into_stream()));
794}
795
796fn sample_cached_piece_indices<'a, I>(
797 pieces_to_download: I,
798 cached_pieces: &HashSet<PieceIndex>,
799 not_cached_pieces: &HashSet<PieceIndex>,
800 piece_index_to_download_next: PieceIndex,
801) -> Vec<PieceIndex>
802where
803 I: Iterator<Item = &'a PieceIndex>,
804{
805 pieces_to_download
806 .filter_map(|piece_index| {
809 if piece_index == &piece_index_to_download_next
810 || cached_pieces.contains(piece_index)
811 || not_cached_pieces.contains(piece_index)
812 {
813 None
814 } else {
815 Some(*piece_index)
816 }
817 })
818 .choose_multiple(
819 &mut thread_rng(),
820 CachedPieceByIndexRequest::RECOMMENDED_LIMIT,
821 )
822}
823
824struct DownloadedPieceFromPeer<'a> {
825 peer_id: PeerId,
826 result: Option<PieceResult>,
827 cached_pieces: HashSet<PieceIndex>,
828 not_cached_pieces: HashSet<PieceIndex>,
829 permit: SemaphoreGuard<'a>,
830}
831
832#[allow(clippy::too_many_arguments)]
836async fn download_cached_piece_from_peer<'a, PV>(
837 node: &'a Node,
838 piece_validator: &'a PV,
839 peer_id: PeerId,
840 addresses: Vec<Multiaddr>,
841 check_cached_pieces: Arc<Vec<PieceIndex>>,
842 piece_index: PieceIndex,
843 mut cached_pieces: HashSet<PieceIndex>,
844 mut not_cached_pieces: HashSet<PieceIndex>,
845 permit: SemaphoreGuard<'a>,
846) -> DownloadedPieceFromPeer<'a>
847where
848 PV: PieceValidator,
849{
850 let result = match node
851 .send_generic_request(
852 peer_id,
853 addresses,
854 CachedPieceByIndexRequest {
855 piece_index,
856 cached_pieces: Arc::clone(&check_cached_pieces),
857 },
858 )
859 .await
860 {
861 Ok(response) => {
862 let CachedPieceByIndexResponse {
863 result,
864 cached_pieces,
865 } = response;
866
867 match result {
868 PieceResult::Piece(piece) => piece_validator
869 .validate_piece(peer_id, piece_index, piece)
870 .await
871 .map(|piece| CachedPieceByIndexResponse {
872 result: PieceResult::Piece(piece),
873 cached_pieces,
874 }),
875 PieceResult::ClosestPeers(closest_peers) => Some(CachedPieceByIndexResponse {
876 result: PieceResult::ClosestPeers(closest_peers),
877 cached_pieces,
878 }),
879 }
880 }
881 Err(error) => {
882 debug!(%error, %peer_id, %piece_index, "Failed to download cached piece from peer");
883
884 None
885 }
886 };
887
888 match result {
889 Some(result) => {
890 cached_pieces.extend(result.cached_pieces);
891 not_cached_pieces.extend(
892 check_cached_pieces
893 .iter()
894 .filter(|piece_index| !cached_pieces.contains(piece_index))
895 .copied(),
896 );
897
898 DownloadedPieceFromPeer {
899 peer_id,
900 result: Some(result.result),
901 cached_pieces: { cached_pieces },
902 not_cached_pieces,
903 permit,
904 }
905 }
906 None => DownloadedPieceFromPeer {
907 peer_id,
908 result: None,
909 cached_pieces,
910 not_cached_pieces,
911 permit,
912 },
913 }
914}