subspace_networking/utils/piece_provider.rs

//! Provides methods to retrieve pieces from DSN.

use crate::constructor::DummyRecordStore;
use crate::protocols::request_response::handlers::cached_piece_by_index::{
    CachedPieceByIndexRequest, CachedPieceByIndexResponse, PieceResult,
};
use crate::protocols::request_response::handlers::piece_by_index::{
    PieceByIndexRequest, PieceByIndexResponse,
};
use crate::utils::multihash::ToMultihash;
use crate::{Multihash, Node};
use async_lock::{Semaphore, SemaphoreGuard};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::future::FusedFuture;
use futures::stream::FuturesUnordered;
use futures::task::noop_waker_ref;
use futures::{FutureExt, Stream, StreamExt, stream};
use libp2p::kad::{Behaviour as Kademlia, KBucketKey, RecordKey};
use libp2p::swarm::NetworkBehaviour;
use libp2p::{Multiaddr, PeerId};
use rand::prelude::*;
use std::any::type_name;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use tokio_stream::StreamMap;
use tracing::{Instrument, debug, trace, warn};

/// Validates a piece using its commitment.
#[async_trait]
pub trait PieceValidator: Sync + Send {
    /// Validates a piece using its commitment.
    async fn validate_piece(
        &self,
        source_peer_id: PeerId,
        piece_index: PieceIndex,
        piece: Piece,
    ) -> Option<Piece>;
}

/// Stub implementation for piece validation.
#[derive(Debug, Clone, Copy)]
pub struct NoPieceValidator;

#[async_trait]
impl PieceValidator for NoPieceValidator {
    async fn validate_piece(&self, _: PeerId, _: PieceIndex, piece: Piece) -> Option<Piece> {
        Some(piece)
    }
}

/// Piece provider with cancellation and piece validator.
/// Use `NoPieceValidator` to disable validation.
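///
/// A minimal usage sketch (assumes an already-constructed [`Node`] and an arbitrary concurrency
/// limit; not a prescribed setup):
///
/// ```ignore
/// use std::sync::Arc;
/// use async_lock::Semaphore;
/// use subspace_core_primitives::pieces::PieceIndex;
///
/// async fn example(node: Node) {
///     // Allow up to 10 concurrent piece downloads (illustrative limit)
///     let provider = PieceProvider::new(node, NoPieceValidator, Arc::new(Semaphore::new(10)));
///     // Try to fetch a single piece from the distributed cache (L2)
///     let _maybe_piece = provider.get_piece_from_cache(PieceIndex::from(0)).await;
/// }
/// ```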
#[derive(Clone)]
pub struct PieceProvider<PV> {
    node: Node,
    piece_validator: PV,
    piece_downloading_semaphore: Arc<Semaphore>,
}

impl<PV> fmt::Debug for PieceProvider<PV> {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct(&format!("PieceProvider<{}>", type_name::<PV>()))
            .finish_non_exhaustive()
    }
}

impl<PV> PieceProvider<PV>
where
    PV: PieceValidator,
{
    /// Creates a new piece provider.
    pub fn new(
        node: Node,
        piece_validator: PV,
        piece_downloading_semaphore: Arc<Semaphore>,
    ) -> Self {
        Self {
            node,
            piece_validator,
            piece_downloading_semaphore,
        }
    }

    /// Get pieces with the provided indices from the cache.
    ///
    /// The number of elements in the returned stream is the same as the number of unique
    /// `piece_indices`.
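    ///
    /// A minimal consumption sketch (assumes `provider` and an iterator of piece indices are
    /// available and `futures::StreamExt` is in scope):
    ///
    /// ```ignore
    /// let mut pieces = provider.get_from_cache(piece_indices).await;
    /// while let Some((piece_index, maybe_piece)) = pieces.next().await {
    ///     // `None` means the piece could not be retrieved from the cache
    /// }
    /// ```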
    pub async fn get_from_cache<'a, PieceIndices>(
        &'a self,
        piece_indices: PieceIndices,
    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Unpin + 'a
    where
        PieceIndices: IntoIterator<Item = PieceIndex> + 'a,
    {
        let download_id = random::<u64>();
        let (tx, mut rx) = mpsc::unbounded();
        let fut = async move {
            let not_downloaded_pieces = download_cached_pieces(
                piece_indices,
                &self.node,
                &self.piece_validator,
                &tx,
                &self.piece_downloading_semaphore,
            )
            .await;

            if not_downloaded_pieces.is_empty() {
                debug!("Done");
                return;
            }

            for piece_index in not_downloaded_pieces {
                tx.unbounded_send((piece_index, None))
                    .expect("This future isn't polled after receiver is dropped; qed");
            }

            debug!("Done #2");
        };

        let mut fut = Box::pin(fut.instrument(tracing::info_span!("", %download_id)).fuse());

        // Drive the future above and stream back any pieces that were downloaded so far
        stream::poll_fn(move |cx| {
            if !fut.is_terminated() {
                // Result doesn't matter, we'll need to poll the stream below anyway
                let _ = fut.poll_unpin(cx);
            }

            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
                return Poll::Ready(maybe_result);
            }

            // Exit will be done by the stream above
            Poll::Pending
        })
    }

    /// Returns a piece by its index from a farmer's piece cache (L2).
    pub async fn get_piece_from_cache(&self, piece_index: PieceIndex) -> Option<Piece> {
        let key = RecordKey::from(piece_index.to_multihash());

        let request_batch = self.node.get_requests_batch_handle().await;
        let mut get_providers_stream = request_batch
            .get_providers(key.clone())
            .await
            .inspect_err(|err| warn!(%piece_index, ?key, ?err, "get_providers returned an error"))
            .ok()?;

        let key = hex::encode(&key);
        while let Some(provider_id) = get_providers_stream.next().await {
            trace!(%piece_index, key, %provider_id, "get_providers returned an item");

            let Ok(PieceByIndexResponse {
                piece,
                cached_pieces: _,
            }) = request_batch
                .send_generic_request(
                    provider_id,
                    Vec::new(),
                    PieceByIndexRequest {
                        piece_index,
                        cached_pieces: Arc::default(),
                    },
                )
                .await
                .inspect_err(
                    |error| debug!(%piece_index, key, %provider_id, ?error, "Piece request failed"),
                )
            else {
                continue;
            };

            if let Some(piece) = piece {
                trace!(%piece_index, key, %provider_id, "Piece request succeeded");

                return self
                    .piece_validator
                    .validate_piece(provider_id, piece_index, piece)
                    .await;
            } else {
                debug!(%piece_index, key, %provider_id, "Piece request returned empty piece");
            }
        }

        None
    }

    /// Get piece from a particular peer.
    pub async fn get_piece_from_peer(
        &self,
        peer_id: PeerId,
        piece_index: PieceIndex,
    ) -> Option<Piece> {
        // TODO: Take advantage of `cached_pieces`
        let PieceByIndexResponse {
            piece,
            cached_pieces: _,
        } = self
            .node
            .send_generic_request(
                peer_id,
                Vec::new(),
                PieceByIndexRequest {
                    piece_index,
                    cached_pieces: Arc::default(),
                },
            )
            .await
            .inspect_err(|error| debug!(%peer_id, %piece_index, ?error, "Piece request failed"))
            .ok()?;

        if let Some(piece) = piece {
            trace!(%peer_id, %piece_index, "Piece request succeeded");

            return self
                .piece_validator
                .validate_piece(peer_id, piece_index, piece)
                .await;
        } else {
            debug!(%peer_id, %piece_index, "Piece request returned empty piece");
        }

        None
    }

    /// Get piece from archival storage (L1). The algorithm tries to get a piece from currently
    /// connected peers and falls back to random walking.
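    ///
    /// A minimal call sketch (assumes `provider` exists; the number of random-walk rounds is
    /// illustrative):
    ///
    /// ```ignore
    /// let maybe_piece = provider
    ///     .get_piece_from_archival_storage(piece_index, 3)
    ///     .await;
    /// ```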
    pub async fn get_piece_from_archival_storage(
        &self,
        piece_index: PieceIndex,
        max_random_walking_rounds: usize,
    ) -> Option<Piece> {
        // TODO: consider using retry policy for L1 lookups as well.
        trace!(%piece_index, "Getting piece from archival storage...");

        let connected_servers = {
            let connected_servers = match self.node.connected_servers().await {
                Ok(connected_servers) => connected_servers,
                Err(err) => {
                    debug!(%piece_index, ?err, "Cannot get connected peers (DSN L1 lookup)");

                    Default::default()
                }
            };

            HashSet::<PeerId>::from_iter(connected_servers)
        };

        if connected_servers.is_empty() {
            debug!(%piece_index, "Cannot acquire piece: no connected peers (DSN L1 lookup)");
        } else {
            for peer_id in connected_servers.iter() {
                let maybe_piece = self.get_piece_from_peer(*peer_id, piece_index).await;

                if maybe_piece.is_some() {
                    trace!(%piece_index, %peer_id, "DSN L1 lookup from connected peers succeeded");

                    return maybe_piece;
                }
            }
        }

        trace!(%piece_index, "Getting piece from DSN L1 using random walk.");
        let random_walk_result = self
            .get_piece_by_random_walking(piece_index, max_random_walking_rounds)
            .await;

        if random_walk_result.is_some() {
            trace!(%piece_index, "DSN L1 lookup via random walk succeeded");

            return random_walk_result;
        } else {
            debug!(
                %piece_index,
                %max_random_walking_rounds,
                "Cannot acquire piece from DSN L1: random walk failed"
            );
        }

        None
    }

    /// Get piece from L1 by random walking
    async fn get_piece_by_random_walking(
        &self,
        piece_index: PieceIndex,
        walking_rounds: usize,
    ) -> Option<Piece> {
        for round in 0..walking_rounds {
            debug!(%piece_index, round, "Random walk round");

            let result = self
                .get_piece_by_random_walking_from_single_round(piece_index, round)
                .await;

            if result.is_some() {
                return result;
            }
        }

        debug!(%piece_index, "Random walking piece retrieval failed.");

        None
    }

    /// Get piece from L1 by random walking (single round)
    async fn get_piece_by_random_walking_from_single_round(
        &self,
        piece_index: PieceIndex,
        round: usize,
    ) -> Option<Piece> {
        // TODO: Take advantage of `cached_pieces`
        trace!(%piece_index, "get_piece_by_random_walking round");

        // Random walk key
        let key = PeerId::random();

        let request_batch = self.node.get_requests_batch_handle().await;
        let mut get_closest_peers_stream = request_batch
            .get_closest_peers(key.into())
            .await
            .inspect_err(|err| warn!(%piece_index, ?key, ?err, %round, "get_closest_peers returned an error"))
            .ok()?;

        while let Some(peer_id) = get_closest_peers_stream.next().await {
            trace!(%piece_index, %peer_id, %round, "get_closest_peers returned an item");

            let Ok(PieceByIndexResponse {
                piece,
                cached_pieces: _,
            }) = request_batch
                .send_generic_request(
                    peer_id,
                    Vec::new(),
                    PieceByIndexRequest {
                        piece_index,
                        cached_pieces: Arc::default(),
                    },
                )
                .await
                .inspect_err(
                    |error| debug!(%peer_id, %piece_index, ?key, %round, ?error, "Piece request failed."),
                )
            else {
                continue;
            };

            if let Some(piece) = piece {
                trace!(%peer_id, %piece_index, ?key, %round, "Piece request succeeded.");

                return self
                    .piece_validator
                    .validate_piece(peer_id, piece_index, piece)
                    .await;
            } else {
                debug!(%peer_id, %piece_index, ?key, %round, "Piece request returned empty piece.");
            }
        }

        None
    }
}

/// Kademlia wrapper to take advantage of its internal logic of selecting closest peers
struct KademliaWrapper {
    local_peer_id: PeerId,
    kademlia: Kademlia<DummyRecordStore>,
}

impl KademliaWrapper {
    fn new(local_peer_id: PeerId) -> Self {
        Self {
            local_peer_id,
            kademlia: Kademlia::new(local_peer_id, DummyRecordStore),
        }
    }

    fn add_peer(&mut self, peer_id: &PeerId, addresses: Vec<Multiaddr>) {
        for address in addresses {
            self.kademlia.add_address(peer_id, address);
        }
        while self
            .kademlia
            .poll(&mut Context::from_waker(noop_waker_ref()))
            .is_ready()
        {
            // Simply drain useless events generated by above calls
        }
    }

    /// Returned peers are already sorted in ascending distance order
    fn closest_peers(
        &mut self,
        key: &KBucketKey<Multihash>,
    ) -> impl Iterator<Item = (PeerId, Vec<Multiaddr>)> + 'static {
        let mut closest_peers = self
            .kademlia
            .find_closest_local_peers(key, &self.local_peer_id)
            .map(|peer| {
                (
                    KBucketKey::from(peer.node_id).distance(key),
                    peer.node_id,
                    peer.multiaddrs,
                )
            })
            .collect::<Vec<_>>();

        closest_peers.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        closest_peers
            .into_iter()
            .map(|(_distance, peer_id, addresses)| (peer_id, addresses))
    }
}

/// Takes pieces to download as input, sends successfully downloaded pieces via `results` and
/// returns the indices of pieces that were not downloaded
async fn download_cached_pieces<PV, PieceIndices>(
    piece_indices: PieceIndices,
    node: &Node,
    piece_validator: &PV,
    results: &mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
    semaphore: &Semaphore,
) -> impl ExactSizeIterator<Item = PieceIndex>
where
    PV: PieceValidator,
    PieceIndices: IntoIterator<Item = PieceIndex>,
{
    // Make sure every piece index has an entry since this will be the primary container for
    // tracking pieces to download going forward.
    //
    // At the end pieces that were not downloaded will remain with a collection of known closest
    // peers for them.
    let mut pieces_to_download = piece_indices
        .into_iter()
        .map(|piece_index| async move {
            let mut kademlia = KademliaWrapper::new(node.id());
            let key = piece_index.to_multihash();

            let local_closest_peers = node
                .get_closest_local_peers(key, None)
                .await
                .unwrap_or_default();

            // Seed with local closest peers
            for (peer_id, addresses) in local_closest_peers {
                kademlia.add_peer(&peer_id, addresses);
            }

            (piece_index, kademlia)
        })
        .collect::<FuturesUnordered<_>>()
        .collect::<HashMap<_, _>>()
        .await;

    let num_pieces = pieces_to_download.len();
    debug!(%num_pieces, "Starting");

    let mut checked_peers = HashSet::new();

    let Ok(connected_servers) = node.connected_servers().await else {
        trace!("Connected servers error");
        return pieces_to_download.into_keys();
    };

    let num_connected_servers = connected_servers.len();
    debug!(
        %num_connected_servers,
        %num_pieces,
        "Starting downloading"
    );

    // Dispatch the initial set of requests to peers, with pieces to check distributed uniformly
    // across them
    let mut downloading_stream = connected_servers
        .into_iter()
        .take(num_pieces)
        .enumerate()
        .map(|(peer_index, peer_id)| {
            checked_peers.insert(peer_id);

            // Computed inside the closure to avoid division by zero in case there are no
            // connected servers or pieces
            let step = num_pieces / num_connected_servers.min(num_pieces);

            // Take a unique first piece index for each connected peer and the rest just to check
            // cached pieces up to the recommended limit
            let mut check_cached_pieces = pieces_to_download
                .keys()
                .cycle()
                .skip(step * peer_index)
                // + 1 because the first index is removed below
                .take(num_pieces.min(CachedPieceByIndexRequest::RECOMMENDED_LIMIT + 1))
                .copied()
                .collect::<Vec<_>>();
            // Pick the first piece index as the piece we want to download
            let piece_index = check_cached_pieces.swap_remove(0);

            trace!(%peer_id, %piece_index, "Downloading piece from initially connected peer");

            let permit = semaphore.try_acquire();

            let fut = async move {
                let permit = match permit {
                    Some(permit) => permit,
                    None => semaphore.acquire().await,
                };

                download_cached_piece_from_peer(
                    node,
                    piece_validator,
                    peer_id,
                    Vec::new(),
                    Arc::new(check_cached_pieces),
                    piece_index,
                    HashSet::new(),
                    HashSet::new(),
                    permit,
                )
                .await
            };

            (piece_index, Box::pin(fut.into_stream()) as _)
        })
        .collect::<StreamMap<_, _>>();

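    // Main download loop: keep roughly half of the remaining pieces in flight by dispatching
    // requests to newly connected peers and to the closest known peers, then process responses
    // as they arrive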
    loop {
        // Process up to 50% of the pieces concurrently
        let mut additional_pieces_to_download =
            (num_pieces / 2).saturating_sub(downloading_stream.len());
        if additional_pieces_to_download > 0 {
            trace!(
                %additional_pieces_to_download,
                num_pieces,
                currently_downloading = %downloading_stream.len(),
                "Downloading additional pieces from closest peers"
            );
            // Pick up newly connected peers (if any)
            'outer: for peer_id in node
                .connected_servers()
                .await
                .unwrap_or_default()
                .into_iter()
                .filter(|peer_id| checked_peers.insert(*peer_id))
                .take(additional_pieces_to_download)
            {
                let permit = if downloading_stream.is_empty() {
                    semaphore.acquire().await
                } else if let Some(permit) = semaphore.try_acquire() {
                    permit
                } else {
                    break;
                };

                for &piece_index in pieces_to_download.keys() {
                    if downloading_stream.contains_key(&piece_index) {
                        continue;
                    }

                    trace!(%peer_id, %piece_index, "Downloading piece from newly connected peer");

                    let check_cached_pieces = sample_cached_piece_indices(
                        pieces_to_download.keys(),
                        &HashSet::new(),
                        &HashSet::new(),
                        piece_index,
                    );
                    let fut = download_cached_piece_from_peer(
                        node,
                        piece_validator,
                        peer_id,
                        Vec::new(),
                        Arc::new(check_cached_pieces),
                        piece_index,
                        HashSet::new(),
                        HashSet::new(),
                        permit,
                    );

                    downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _);
                    additional_pieces_to_download -= 1;

                    continue 'outer;
                }

                break;
            }

            // Pick up more pieces to download from the closest peers
            // Ideally we'd not allocate here, but it is hard to explain to the compiler that
            // entries are not removed otherwise
            let pieces_indices_to_download = pieces_to_download.keys().copied().collect::<Vec<_>>();
            for piece_index in pieces_indices_to_download {
                if additional_pieces_to_download == 0 {
                    break;
                }
                if downloading_stream.contains_key(&piece_index) {
                    continue;
                }
                let permit = if downloading_stream.is_empty() {
                    semaphore.acquire().await
                } else if let Some(permit) = semaphore.try_acquire() {
                    permit
                } else {
                    break;
                };

                let kbucket_key = KBucketKey::from(piece_index.to_multihash());
                let closest_peers_to_check = pieces_to_download
                    .get_mut(&piece_index)
                    .expect("Entries are not removed here; qed")
                    .closest_peers(&kbucket_key);
                for (peer_id, addresses) in closest_peers_to_check {
                    if !checked_peers.insert(peer_id) {
                        continue;
                    }

                    trace!(%peer_id, %piece_index, "Downloading piece from closest peer");

                    let check_cached_pieces = sample_cached_piece_indices(
                        pieces_to_download.keys(),
                        &HashSet::new(),
                        &HashSet::new(),
                        piece_index,
                    );
                    let fut = download_cached_piece_from_peer(
                        node,
                        piece_validator,
                        peer_id,
                        addresses,
                        Arc::new(check_cached_pieces),
                        piece_index,
                        HashSet::new(),
                        HashSet::new(),
                        permit,
                    );

                    downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _);
                    additional_pieces_to_download -= 1;
                    break;
                }
            }

            trace!(
                pieces_left = %additional_pieces_to_download,
                "Initiated downloading additional pieces from closest peers"
            );
        }

        let Some((piece_index, result)) = downloading_stream.next().await else {
            if !pieces_to_download.is_empty() {
                debug!(
                    %num_pieces,
                    to_download = %pieces_to_download.len(),
                    "Finished downloading early"
                );
                // Nothing more can be downloaded, we're done here
                break;
            }
            break;
        };
        process_downloading_result(
            piece_index,
            result,
            &mut pieces_to_download,
            &mut downloading_stream,
            node,
            piece_validator,
            results,
        );

        if pieces_to_download.is_empty() {
            break;
        }
    }

    pieces_to_download.into_keys()
}

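/// Handles a single response from a peer: records a downloaded piece or newly learned closest
/// peers and, when possible, dispatches the next request to the same peer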
fn process_downloading_result<'a, 'b, PV>(
    piece_index: PieceIndex,
    result: DownloadedPieceFromPeer<'a>,
    pieces_to_download: &'b mut HashMap<PieceIndex, KademliaWrapper>,
    downloading_stream: &'b mut StreamMap<
        PieceIndex,
        Pin<Box<dyn Stream<Item = DownloadedPieceFromPeer<'a>> + Send + 'a>>,
    >,
    node: &'a Node,
    piece_validator: &'a PV,
    results: &'a mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
) where
    PV: PieceValidator,
{
    let DownloadedPieceFromPeer {
        peer_id,
        result,
        mut cached_pieces,
        not_cached_pieces,
        permit,
    } = result;
    trace!(%piece_index, %peer_id, result = %result.is_some(), "Piece response");

    let Some(result) = result else {
        // Downloading failed, ignore this peer
        return;
    };

    match result {
        PieceResult::Piece(piece) => {
            trace!(%piece_index, %peer_id, "Got piece");

            // Downloaded successfully
            pieces_to_download.remove(&piece_index);

            results
                .unbounded_send((piece_index, Some(piece)))
                .expect("This future isn't polled after receiver is dropped; qed");

            if pieces_to_download.is_empty() {
                return;
            }

            cached_pieces.remove(&piece_index);
        }
        PieceResult::ClosestPeers(closest_peers) => {
            trace!(%piece_index, %peer_id, "Got closest peers");

            // Store closer peers in case the piece index was not downloaded yet
            if let Some(kademlia) = pieces_to_download.get_mut(&piece_index) {
                for (peer_id, addresses) in Vec::from(closest_peers) {
                    kademlia.add_peer(&peer_id, addresses);
                }
            }

            // No need to ask this peer again if they claimed to have this piece index earlier
            if cached_pieces.remove(&piece_index) {
                return;
            }
        }
    }

    let mut maybe_piece_index_to_download_next = None;
    // Clear useless entries in cached pieces and find something to download next
    cached_pieces.retain(|piece_index| {
        // Clear downloaded pieces
        if !pieces_to_download.contains_key(piece_index) {
            return false;
        }

        // Try to pick a piece to download that is not being downloaded already
        if maybe_piece_index_to_download_next.is_none()
            && !downloading_stream.contains_key(piece_index)
        {
            maybe_piece_index_to_download_next.replace(*piece_index);
            // We'll check it later when receiving the response
            return true;
        }

        // Retain everything else
        true
    });

    let piece_index_to_download_next = if let Some(piece_index) = maybe_piece_index_to_download_next
    {
        trace!(%piece_index, %peer_id, "Next piece to download from peer");
        piece_index
    } else {
        trace!(%peer_id, "Peer doesn't have anything else");
        // Nothing left to do with this peer
        return;
    };

    let fut = download_cached_piece_from_peer(
        node,
        piece_validator,
        peer_id,
        Vec::new(),
        // Sample more random cached piece indices for the connected peer; the algorithm can be
        // improved, but it has to be something simple and this should do for now
        Arc::new(sample_cached_piece_indices(
            pieces_to_download.keys(),
            &cached_pieces,
            &not_cached_pieces,
            piece_index_to_download_next,
        )),
        piece_index_to_download_next,
        cached_pieces,
        not_cached_pieces,
        permit,
    );
    downloading_stream.insert(piece_index_to_download_next, Box::pin(fut.into_stream()));
}

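/// Samples up to `CachedPieceByIndexRequest::RECOMMENDED_LIMIT` piece indices that still need to
/// be downloaded and whose cache status on the remote peer is not yet known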
fn sample_cached_piece_indices<'a, I>(
    pieces_to_download: I,
    cached_pieces: &HashSet<PieceIndex>,
    not_cached_pieces: &HashSet<PieceIndex>,
    piece_index_to_download_next: PieceIndex,
) -> Vec<PieceIndex>
where
    I: Iterator<Item = &'a PieceIndex>,
{
    pieces_to_download
        // Do a bit of work to filter out piece indices we already know the remote peer has or
        // doesn't have, to decrease the burden on them
        .filter_map(|piece_index| {
            if piece_index == &piece_index_to_download_next
                || cached_pieces.contains(piece_index)
                || not_cached_pieces.contains(piece_index)
            {
                None
            } else {
                Some(*piece_index)
            }
        })
        .choose_multiple(
            &mut thread_rng(),
            CachedPieceByIndexRequest::RECOMMENDED_LIMIT,
        )
}

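/// Result of a single cached-piece request to a peer, together with the piece indices the peer is
/// now known to have or not have and the semaphore permit held for this download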
struct DownloadedPieceFromPeer<'a> {
    peer_id: PeerId,
    result: Option<PieceResult>,
    cached_pieces: HashSet<PieceIndex>,
    not_cached_pieces: HashSet<PieceIndex>,
    permit: SemaphoreGuard<'a>,
}

/// `check_cached_pieces` contains a list of pieces for the peer to filter out according to its
/// locally cached pieces, while `cached_pieces` and `not_cached_pieces` contain piece indices the
/// peer is already known to have or not have
#[allow(clippy::too_many_arguments)]
async fn download_cached_piece_from_peer<'a, PV>(
    node: &'a Node,
    piece_validator: &'a PV,
    peer_id: PeerId,
    addresses: Vec<Multiaddr>,
    check_cached_pieces: Arc<Vec<PieceIndex>>,
    piece_index: PieceIndex,
    mut cached_pieces: HashSet<PieceIndex>,
    mut not_cached_pieces: HashSet<PieceIndex>,
    permit: SemaphoreGuard<'a>,
) -> DownloadedPieceFromPeer<'a>
where
    PV: PieceValidator,
{
    let result = match node
        .send_generic_request(
            peer_id,
            addresses,
            CachedPieceByIndexRequest {
                piece_index,
                cached_pieces: Arc::clone(&check_cached_pieces),
            },
        )
        .await
    {
        Ok(response) => {
            let CachedPieceByIndexResponse {
                result,
                cached_pieces,
            } = response;

            match result {
                PieceResult::Piece(piece) => piece_validator
                    .validate_piece(peer_id, piece_index, piece)
                    .await
                    .map(|piece| CachedPieceByIndexResponse {
                        result: PieceResult::Piece(piece),
                        cached_pieces,
                    }),
                PieceResult::ClosestPeers(closest_peers) => Some(CachedPieceByIndexResponse {
                    result: PieceResult::ClosestPeers(closest_peers),
                    cached_pieces,
                }),
            }
        }
        Err(error) => {
            debug!(%error, %peer_id, %piece_index, "Failed to download cached piece from peer");

            None
        }
    };

    match result {
        Some(result) => {
            cached_pieces.extend(result.cached_pieces);
            not_cached_pieces.extend(
                check_cached_pieces
                    .iter()
                    .filter(|piece_index| !cached_pieces.contains(piece_index))
                    .copied(),
            );

            DownloadedPieceFromPeer {
                peer_id,
                result: Some(result.result),
                cached_pieces,
                not_cached_pieces,
                permit,
            }
        }
        None => DownloadedPieceFromPeer {
            peer_id,
            result: None,
            cached_pieces,
            not_cached_pieces,
            permit,
        },
    }
}