subspace_networking/utils/piece_provider.rs

//! Provides methods to retrieve pieces from DSN.

use crate::constructor::DummyRecordStore;
use crate::protocols::request_response::handlers::cached_piece_by_index::{
    CachedPieceByIndexRequest, CachedPieceByIndexResponse, PieceResult,
};
use crate::protocols::request_response::handlers::piece_by_index::{
    PieceByIndexRequest, PieceByIndexResponse,
};
use crate::utils::multihash::ToMultihash;
use crate::{Multihash, Node};
use async_lock::{Semaphore, SemaphoreGuard};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::future::FusedFuture;
use futures::stream::FuturesUnordered;
use futures::task::noop_waker_ref;
use futures::{stream, FutureExt, Stream, StreamExt};
use libp2p::kad::{Behaviour as Kademlia, KBucketKey, RecordKey};
use libp2p::swarm::NetworkBehaviour;
use libp2p::{Multiaddr, PeerId};
use rand::prelude::*;
use std::any::type_name;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use tokio_stream::StreamMap;
use tracing::{debug, trace, warn, Instrument};

/// Validates a piece using its commitment.
#[async_trait]
pub trait PieceValidator: Sync + Send {
    /// Validates a piece using its commitment.
    async fn validate_piece(
        &self,
        source_peer_id: PeerId,
        piece_index: PieceIndex,
        piece: Piece,
    ) -> Option<Piece>;
}

/// Stub implementation for piece validation.
#[derive(Debug, Clone, Copy)]
pub struct NoPieceValidator;

#[async_trait]
impl PieceValidator for NoPieceValidator {
    async fn validate_piece(&self, _: PeerId, _: PieceIndex, piece: Piece) -> Option<Piece> {
        Some(piece)
    }
}
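
// A hypothetical sketch (not part of this module) of what a real commitment-checking validator
// could look like, assuming some `verify_commitment(piece_index, &piece) -> bool` helper exists:
//
//     struct CommitmentPieceValidator;
//
//     #[async_trait]
//     impl PieceValidator for CommitmentPieceValidator {
//         async fn validate_piece(
//             &self,
//             source_peer_id: PeerId,
//             piece_index: PieceIndex,
//             piece: Piece,
//         ) -> Option<Piece> {
//             if verify_commitment(piece_index, &piece) {
//                 Some(piece)
//             } else {
//                 debug!(%source_peer_id, %piece_index, "Received invalid piece");
//                 None
//             }
//         }
//     }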

/// Piece provider with cancellation and piece validator.
/// Use `NoPieceValidator` to disable validation.
#[derive(Clone)]
pub struct PieceProvider<PV> {
    node: Node,
    piece_validator: PV,
    piece_downloading_semaphore: Arc<Semaphore>,
}

impl<PV> fmt::Debug for PieceProvider<PV> {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct(&format!("PieceProvider<{}>", type_name::<PV>()))
            .finish_non_exhaustive()
    }
}

impl<PV> PieceProvider<PV>
where
    PV: PieceValidator,
{
    /// Creates a new piece provider.
    pub fn new(
        node: Node,
        piece_validator: PV,
        piece_downloading_semaphore: Arc<Semaphore>,
    ) -> Self {
        Self {
            node,
            piece_validator,
            piece_downloading_semaphore,
        }
    }

    /// Get pieces with provided indices from cache.
    ///
    /// The number of elements in the returned stream is the same as the number of unique
    /// `piece_indices`.
    pub async fn get_from_cache<'a, PieceIndices>(
        &'a self,
        piece_indices: PieceIndices,
    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Unpin + 'a
    where
        PieceIndices: IntoIterator<Item = PieceIndex> + 'a,
    {
        let download_id = random::<u64>();
        let (tx, mut rx) = mpsc::unbounded();
        let fut = async move {
            let not_downloaded_pieces = download_cached_pieces(
                piece_indices.into_iter(),
                &self.node,
                &self.piece_validator,
                &tx,
                &self.piece_downloading_semaphore,
            )
            .await;

            if not_downloaded_pieces.is_empty() {
                debug!("Done");
                return;
            }

            for piece_index in not_downloaded_pieces {
                tx.unbounded_send((piece_index, None))
                    .expect("This future isn't polled after receiver is dropped; qed");
            }

            debug!("Done #2");
        };

        let mut fut = Box::pin(fut.instrument(tracing::info_span!("", %download_id)).fuse());

        // Drive above future and stream back any pieces that were downloaded so far
        stream::poll_fn(move |cx| {
            if !fut.is_terminated() {
                // Result doesn't matter, we'll need to poll the stream below anyway
                let _ = fut.poll_unpin(cx);
            }

            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
                return Poll::Ready(maybe_result);
            }

            // Exit will be done by the stream above
            Poll::Pending
        })
    }
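
    // A minimal usage sketch (hypothetical; assumes a `node` handle and an iterator of
    // `PieceIndex` values called `piece_indices` are already available elsewhere):
    //
    //     let piece_provider =
    //         PieceProvider::new(node, NoPieceValidator, Arc::new(Semaphore::new(16)));
    //     let mut pieces = piece_provider.get_from_cache(piece_indices).await;
    //     while let Some((piece_index, maybe_piece)) = pieces.next().await {
    //         // `None` means the piece could not be retrieved from the cache (L2)
    //     }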

    /// Returns a piece by its index from the farmer's piece cache (L2)
    pub async fn get_piece_from_cache(&self, piece_index: PieceIndex) -> Option<Piece> {
        let key = RecordKey::from(piece_index.to_multihash());

        let request_batch = self.node.get_requests_batch_handle().await;
        let mut get_providers_stream = request_batch
            .get_providers(key.clone())
            .await
            .inspect_err(|err| warn!(%piece_index, ?key, ?err, "get_providers returned an error"))
            .ok()?;

        let key = hex::encode(&key);
        while let Some(provider_id) = get_providers_stream.next().await {
            trace!(%piece_index, key, %provider_id, "get_providers returned an item");

            let Ok(PieceByIndexResponse {
                piece,
                cached_pieces: _,
            }) = request_batch
                .send_generic_request(
                    provider_id,
                    Vec::new(),
                    PieceByIndexRequest {
                        piece_index,
                        cached_pieces: Arc::default(),
                    },
                )
                .await
                .inspect_err(
                    |error| debug!(%piece_index, key, %provider_id, ?error, "Piece request failed"),
                )
            else {
                continue;
            };

            if let Some(piece) = piece {
                trace!(%piece_index, key, %provider_id, "Piece request succeeded");

                return self
                    .piece_validator
                    .validate_piece(provider_id, piece_index, piece)
                    .await;
            } else {
                debug!(%piece_index, key, %provider_id, "Piece request returned empty piece");
            }
        }

        None
    }

    /// Get piece from a particular peer.
    pub async fn get_piece_from_peer(
        &self,
        peer_id: PeerId,
        piece_index: PieceIndex,
    ) -> Option<Piece> {
        // TODO: Take advantage of `cached_pieces`
        let PieceByIndexResponse {
            piece,
            cached_pieces: _,
        } = self
            .node
            .send_generic_request(
                peer_id,
                Vec::new(),
                PieceByIndexRequest {
                    piece_index,
                    cached_pieces: Arc::default(),
                },
            )
            .await
            .inspect_err(|error| debug!(%peer_id, %piece_index, ?error, "Piece request failed"))
            .ok()?;

        if let Some(piece) = piece {
            trace!(%peer_id, %piece_index, "Piece request succeeded");

            return self
                .piece_validator
                .validate_piece(peer_id, piece_index, piece)
                .await;
        } else {
            debug!(%peer_id, %piece_index, "Piece request returned empty piece");
        }

        None
    }

    /// Get piece from archival storage (L1). The algorithm tries to get a piece from currently
    /// connected peers and falls back to random walking.
    pub async fn get_piece_from_archival_storage(
        &self,
        piece_index: PieceIndex,
        max_random_walking_rounds: usize,
    ) -> Option<Piece> {
        // TODO: consider using retry policy for L1 lookups as well.
        trace!(%piece_index, "Getting piece from archival storage...");

        let connected_servers = {
            let connected_servers = match self.node.connected_servers().await {
                Ok(connected_servers) => connected_servers,
                Err(err) => {
                    debug!(%piece_index, ?err, "Cannot get connected peers (DSN L1 lookup)");

                    Default::default()
                }
            };

            HashSet::<PeerId>::from_iter(connected_servers)
        };

        if connected_servers.is_empty() {
            debug!(%piece_index, "Cannot acquire piece: no connected peers (DSN L1 lookup)");
        } else {
            for peer_id in connected_servers.iter() {
                let maybe_piece = self.get_piece_from_peer(*peer_id, piece_index).await;

                if maybe_piece.is_some() {
                    trace!(%piece_index, %peer_id, "DSN L1 lookup from connected peers succeeded");

                    return maybe_piece;
                }
            }
        }

        trace!(%piece_index, "Getting piece from DSN L1 using random walk.");
        let random_walk_result = self
            .get_piece_by_random_walking(piece_index, max_random_walking_rounds)
            .await;

        if random_walk_result.is_some() {
            trace!(%piece_index, "DSN L1 lookup via random walk succeeded");

            return random_walk_result;
        } else {
            debug!(
                %piece_index,
                %max_random_walking_rounds,
                "Cannot acquire piece from DSN L1: random walk failed"
            );
        }

        None
    }

    /// Get piece from L1 by random walking
    async fn get_piece_by_random_walking(
        &self,
        piece_index: PieceIndex,
        walking_rounds: usize,
    ) -> Option<Piece> {
        for round in 0..walking_rounds {
            debug!(%piece_index, round, "Random walk round");

            let result = self
                .get_piece_by_random_walking_from_single_round(piece_index, round)
                .await;

            if result.is_some() {
                return result;
            }
        }

        debug!(%piece_index, "Random walking piece retrieval failed.");

        None
    }

    /// Get piece from L1 by random walking (single round)
    async fn get_piece_by_random_walking_from_single_round(
        &self,
        piece_index: PieceIndex,
        round: usize,
    ) -> Option<Piece> {
        // TODO: Take advantage of `cached_pieces`
        trace!(%piece_index, "get_piece_by_random_walking round");

        // Random walk key
        let key = PeerId::random();

        let request_batch = self.node.get_requests_batch_handle().await;
        let mut get_closest_peers_stream = request_batch
            .get_closest_peers(key.into())
            .await
            .inspect_err(|err| warn!(%piece_index, ?key, ?err, %round, "get_closest_peers returned an error"))
            .ok()?;

        while let Some(peer_id) = get_closest_peers_stream.next().await {
            trace!(%piece_index, %peer_id, %round, "get_closest_peers returned an item");

            let Ok(PieceByIndexResponse {
                piece,
                cached_pieces: _,
            }) = request_batch
                .send_generic_request(
                    peer_id,
                    Vec::new(),
                    PieceByIndexRequest {
                        piece_index,
                        cached_pieces: Arc::default(),
                    },
                )
                .await
                .inspect_err(
                    |error| debug!(%peer_id, %piece_index, ?key, %round, ?error, "Piece request failed."),
                )
            else {
                continue;
            };

            if let Some(piece) = piece {
                trace!(%peer_id, %piece_index, ?key, %round, "Piece request succeeded.");

                return self
                    .piece_validator
                    .validate_piece(peer_id, piece_index, piece)
                    .await;
            } else {
                debug!(%peer_id, %piece_index, ?key, %round, "Piece request returned empty piece.");
            }
        }

        None
    }
}

/// Kademlia wrapper to take advantage of its internal logic of selecting closest peers
struct KademliaWrapper {
    local_peer_id: PeerId,
    kademlia: Kademlia<DummyRecordStore>,
}

impl KademliaWrapper {
    fn new(local_peer_id: PeerId) -> Self {
        Self {
            local_peer_id,
            kademlia: Kademlia::new(local_peer_id, DummyRecordStore),
        }
    }

    fn add_peer(&mut self, peer_id: &PeerId, addresses: Vec<Multiaddr>) {
        for address in addresses {
            self.kademlia.add_address(peer_id, address);
        }
        while self
            .kademlia
            .poll(&mut Context::from_waker(noop_waker_ref()))
            .is_ready()
        {
            // Simply drain useless events generated by above calls
        }
    }

    /// Returned peers are already sorted in ascending distance order
    fn closest_peers(
        &mut self,
        key: &KBucketKey<Multihash>,
    ) -> impl Iterator<Item = (PeerId, Vec<Multiaddr>)> + 'static {
        let mut closest_peers = self
            .kademlia
            .find_closest_local_peers(key, &self.local_peer_id)
            .map(|peer| {
                (
                    KBucketKey::from(peer.node_id).distance(key),
                    peer.node_id,
                    peer.multiaddrs,
                )
            })
            .collect::<Vec<_>>();

        closest_peers.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        closest_peers
            .into_iter()
            .map(|(_distance, peer_id, addresses)| (peer_id, addresses))
    }
}
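
// A minimal sketch of how this wrapper is intended to be used (the calls mirror the
// `download_cached_pieces` logic below; the concrete values are hypothetical):
//
//     let mut kademlia = KademliaWrapper::new(local_peer_id);
//     kademlia.add_peer(&some_peer_id, some_addresses);
//     let key = KBucketKey::from(piece_index.to_multihash());
//     for (peer_id, addresses) in kademlia.closest_peers(&key) {
//         // Peers arrive in ascending XOR-distance order relative to `key`
//     }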

/// Takes pieces to download as an input, sends results with pieces that were downloaded
/// successfully and returns those that were not downloaded
async fn download_cached_pieces<PV, PieceIndices>(
    piece_indices: PieceIndices,
    node: &Node,
    piece_validator: &PV,
    results: &mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
    semaphore: &Semaphore,
) -> impl ExactSizeIterator<Item = PieceIndex>
where
    PV: PieceValidator,
    PieceIndices: Iterator<Item = PieceIndex>,
{
    // Make sure every piece index has an entry since this will be the primary container for
    // tracking pieces to download going forward.
    //
    // At the end pieces that were not downloaded will remain with a collection of known closest
    // peers for them.
    let mut pieces_to_download = piece_indices
        .map(|piece_index| async move {
            let mut kademlia = KademliaWrapper::new(node.id());
            let key = piece_index.to_multihash();

            let local_closest_peers = node
                .get_closest_local_peers(key, None)
                .await
                .unwrap_or_default();

            // Seed with local closest peers
            for (peer_id, addresses) in local_closest_peers {
                kademlia.add_peer(&peer_id, addresses);
            }

            (piece_index, kademlia)
        })
        .collect::<FuturesUnordered<_>>()
        .collect::<HashMap<_, _>>()
        .await;

    let num_pieces = pieces_to_download.len();
    debug!(%num_pieces, "Starting");

    let mut checked_peers = HashSet::new();

    let Ok(connected_servers) = node.connected_servers().await else {
        trace!("Connected servers error");
        return pieces_to_download.into_keys();
    };

    let num_connected_servers = connected_servers.len();
    debug!(
        %num_connected_servers,
        %num_pieces,
        "Starting downloading"
    );

    // Dispatch initial set of requests to peers with checked pieces distributed uniformly
    let mut downloading_stream = connected_servers
        .into_iter()
        .take(num_pieces)
        .enumerate()
        .map(|(peer_index, peer_id)| {
            checked_peers.insert(peer_id);

            // Computed inside the closure to avoid division by zero in case there are no
            // connected servers or pieces
            let step = num_pieces / num_connected_servers.min(num_pieces);

            // Take unique first piece index for each connected peer and the rest just to check
            // cached pieces up to recommended limit
            let mut check_cached_pieces = pieces_to_download
                .keys()
                .cycle()
                .skip(step * peer_index)
                // + 1 because the first index is removed just below
                .take(num_pieces.min(CachedPieceByIndexRequest::RECOMMENDED_LIMIT + 1))
                .copied()
                .collect::<Vec<_>>();
            // Pick first piece index as the piece we want to download
            let piece_index = check_cached_pieces.swap_remove(0);

            trace!(%peer_id, %piece_index, "Downloading piece from initially connected peer");

            let permit = semaphore.try_acquire();

            let fut = async move {
                let permit = match permit {
                    Some(permit) => permit,
                    None => semaphore.acquire().await,
                };

                download_cached_piece_from_peer(
                    node,
                    piece_validator,
                    peer_id,
                    Vec::new(),
                    Arc::new(check_cached_pieces),
                    piece_index,
                    HashSet::new(),
                    HashSet::new(),
                    permit,
                )
                .await
            };

            (piece_index, Box::pin(fut.into_stream()) as _)
        })
        .collect::<StreamMap<_, _>>();

    loop {
        // Process up to 50% of the pieces concurrently
        let mut additional_pieces_to_download =
            (num_pieces / 2).saturating_sub(downloading_stream.len());
        if additional_pieces_to_download > 0 {
            trace!(
                %additional_pieces_to_download,
                num_pieces,
                currently_downloading = %downloading_stream.len(),
                "Downloading additional pieces from closest peers"
            );
            // Pick up newly connected peers (if any)
            'outer: for peer_id in node
                .connected_servers()
                .await
                .unwrap_or_default()
                .into_iter()
                .filter(|peer_id| checked_peers.insert(*peer_id))
                .take(additional_pieces_to_download)
            {
                let permit = if downloading_stream.is_empty() {
                    semaphore.acquire().await
                } else if let Some(permit) = semaphore.try_acquire() {
                    permit
                } else {
                    break;
                };

                for &piece_index in pieces_to_download.keys() {
                    if downloading_stream.contains_key(&piece_index) {
                        continue;
                    }

                    trace!(%peer_id, %piece_index, "Downloading piece from newly connected peer");

                    let check_cached_pieces = sample_cached_piece_indices(
                        pieces_to_download.keys(),
                        &HashSet::new(),
                        &HashSet::new(),
                        piece_index,
                    );
                    let fut = download_cached_piece_from_peer(
                        node,
                        piece_validator,
                        peer_id,
                        Vec::new(),
                        Arc::new(check_cached_pieces),
                        piece_index,
                        HashSet::new(),
                        HashSet::new(),
                        permit,
                    );

                    downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _);
                    additional_pieces_to_download -= 1;

                    continue 'outer;
                }

                break;
            }

            // Pick up more pieces to download from the closest peers
            // Ideally we'd not allocate here, but it is hard to explain to the compiler that
            // entries are not removed otherwise
            let pieces_indices_to_download = pieces_to_download.keys().copied().collect::<Vec<_>>();
            for piece_index in pieces_indices_to_download {
                if additional_pieces_to_download == 0 {
                    break;
                }
                if downloading_stream.contains_key(&piece_index) {
                    continue;
                }
                let permit = if downloading_stream.is_empty() {
                    semaphore.acquire().await
                } else if let Some(permit) = semaphore.try_acquire() {
                    permit
                } else {
                    break;
                };

                let kbucket_key = KBucketKey::from(piece_index.to_multihash());
                let closest_peers_to_check = pieces_to_download
                    .get_mut(&piece_index)
                    .expect("Entries are not removed here; qed")
                    .closest_peers(&kbucket_key);
                for (peer_id, addresses) in closest_peers_to_check {
                    if !checked_peers.insert(peer_id) {
                        continue;
                    }

                    trace!(%peer_id, %piece_index, "Downloading piece from closest peer");

                    let check_cached_pieces = sample_cached_piece_indices(
                        pieces_to_download.keys(),
                        &HashSet::new(),
                        &HashSet::new(),
                        piece_index,
                    );
                    let fut = download_cached_piece_from_peer(
                        node,
                        piece_validator,
                        peer_id,
                        addresses,
                        Arc::new(check_cached_pieces),
                        piece_index,
                        HashSet::new(),
                        HashSet::new(),
                        permit,
                    );

                    downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _);
                    additional_pieces_to_download -= 1;
                    break;
                }
            }

            trace!(
                pieces_left = %additional_pieces_to_download,
                "Initiated downloading additional pieces from closest peers"
            );
        }

        let Some((piece_index, result)) = downloading_stream.next().await else {
            if !pieces_to_download.is_empty() {
                debug!(
                    %num_pieces,
                    to_download = %pieces_to_download.len(),
                    "Finished downloading early"
                );
                // Nothing else can be downloaded, we're done here
                break;
            }
            break;
        };
        process_downloading_result(
            piece_index,
            result,
            &mut pieces_to_download,
            &mut downloading_stream,
            node,
            piece_validator,
            results,
        );

        if pieces_to_download.is_empty() {
            break;
        }
    }

    pieces_to_download.into_keys()
}

fn process_downloading_result<'a, 'b, PV>(
    piece_index: PieceIndex,
    result: DownloadedPieceFromPeer<'a>,
    pieces_to_download: &'b mut HashMap<PieceIndex, KademliaWrapper>,
    downloading_stream: &'b mut StreamMap<
        PieceIndex,
        Pin<Box<dyn Stream<Item = DownloadedPieceFromPeer<'a>> + Send + 'a>>,
    >,
    node: &'a Node,
    piece_validator: &'a PV,
    results: &'a mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
) where
    PV: PieceValidator,
{
    let DownloadedPieceFromPeer {
        peer_id,
        result,
        mut cached_pieces,
        not_cached_pieces,
        permit,
    } = result;
    trace!(%piece_index, %peer_id, result = %result.is_some(), "Piece response");

    let Some(result) = result else {
        // Downloading failed, ignore peer
        return;
    };

    match result {
        PieceResult::Piece(piece) => {
            trace!(%piece_index, %peer_id, "Got piece");

            // Downloaded successfully
            pieces_to_download.remove(&piece_index);

            results
                .unbounded_send((piece_index, Some(piece)))
                .expect("This future isn't polled after receiver is dropped; qed");

            if pieces_to_download.is_empty() {
                return;
            }

            cached_pieces.remove(&piece_index);
        }
        PieceResult::ClosestPeers(closest_peers) => {
            trace!(%piece_index, %peer_id, "Got closest peers");

            // Store closer peers in case piece index was not downloaded yet
            if let Some(kademlia) = pieces_to_download.get_mut(&piece_index) {
                for (peer_id, addresses) in Vec::from(closest_peers) {
                    kademlia.add_peer(&peer_id, addresses);
                }
            }

            // No need to ask this peer again if they claimed to have this piece index earlier
            if cached_pieces.remove(&piece_index) {
                return;
            }
        }
    }

    let mut maybe_piece_index_to_download_next = None;
    // Clear useless entries in cached pieces and find something to download next
    cached_pieces.retain(|piece_index| {
        // Clear downloaded pieces
        if !pieces_to_download.contains_key(piece_index) {
            return false;
        }

        // Try to pick a piece to download that is not being downloaded already
        if maybe_piece_index_to_download_next.is_none()
            && !downloading_stream.contains_key(piece_index)
        {
            maybe_piece_index_to_download_next.replace(*piece_index);
            // We'll check it later when receiving response
            return true;
        }

        // Retain everything else
        true
    });

    let piece_index_to_download_next = if let Some(piece_index) = maybe_piece_index_to_download_next
    {
        trace!(%piece_index, %peer_id, "Next piece to download from peer");
        piece_index
    } else {
        trace!(%peer_id, "Peer doesn't have anything else");
        // Nothing left to do with this peer
        return;
    };

    let fut = download_cached_piece_from_peer(
        node,
        piece_validator,
        peer_id,
        Vec::new(),
        // Sample more random cached piece indices for connected peer, algorithm can be
        // improved, but has to be something simple and this should do it for now
        Arc::new(sample_cached_piece_indices(
            pieces_to_download.keys(),
            &cached_pieces,
            &not_cached_pieces,
            piece_index_to_download_next,
        )),
        piece_index_to_download_next,
        cached_pieces,
        not_cached_pieces,
        permit,
    );
    downloading_stream.insert(piece_index_to_download_next, Box::pin(fut.into_stream()));
}

fn sample_cached_piece_indices<'a, I>(
    pieces_to_download: I,
    cached_pieces: &HashSet<PieceIndex>,
    not_cached_pieces: &HashSet<PieceIndex>,
    piece_index_to_download_next: PieceIndex,
) -> Vec<PieceIndex>
where
    I: Iterator<Item = &'a PieceIndex>,
{
    pieces_to_download
        // Do a bit of work to filter out piece indices we already know the remote peer has or
        // doesn't have, to decrease the burden on them
        .filter_map(|piece_index| {
            if piece_index == &piece_index_to_download_next
                || cached_pieces.contains(piece_index)
                || not_cached_pieces.contains(piece_index)
            {
                None
            } else {
                Some(*piece_index)
            }
        })
        .choose_multiple(
            &mut thread_rng(),
            CachedPieceByIndexRequest::RECOMMENDED_LIMIT,
        )
}
822
823struct DownloadedPieceFromPeer<'a> {
824    peer_id: PeerId,
825    result: Option<PieceResult>,
826    cached_pieces: HashSet<PieceIndex>,
827    not_cached_pieces: HashSet<PieceIndex>,
828    permit: SemaphoreGuard<'a>,
829}

/// `check_cached_pieces` contains a list of piece indices for the peer to filter against its
/// locally cached pieces; `cached_pieces` and `not_cached_pieces` contain piece indices the peer
/// has already claimed to have or not have
#[allow(clippy::too_many_arguments)]
async fn download_cached_piece_from_peer<'a, PV>(
    node: &'a Node,
    piece_validator: &'a PV,
    peer_id: PeerId,
    addresses: Vec<Multiaddr>,
    check_cached_pieces: Arc<Vec<PieceIndex>>,
    piece_index: PieceIndex,
    mut cached_pieces: HashSet<PieceIndex>,
    mut not_cached_pieces: HashSet<PieceIndex>,
    permit: SemaphoreGuard<'a>,
) -> DownloadedPieceFromPeer<'a>
where
    PV: PieceValidator,
{
    let result = match node
        .send_generic_request(
            peer_id,
            addresses,
            CachedPieceByIndexRequest {
                piece_index,
                cached_pieces: Arc::clone(&check_cached_pieces),
            },
        )
        .await
    {
        Ok(response) => {
            let CachedPieceByIndexResponse {
                result,
                cached_pieces,
            } = response;

            match result {
                PieceResult::Piece(piece) => piece_validator
                    .validate_piece(peer_id, piece_index, piece)
                    .await
                    .map(|piece| CachedPieceByIndexResponse {
                        result: PieceResult::Piece(piece),
                        cached_pieces,
                    }),
                PieceResult::ClosestPeers(closest_peers) => Some(CachedPieceByIndexResponse {
                    result: PieceResult::ClosestPeers(closest_peers),
                    cached_pieces,
                }),
            }
        }
        Err(error) => {
            debug!(%error, %peer_id, %piece_index, "Failed to download cached piece from peer");

            None
        }
    };

    match result {
        Some(result) => {
            cached_pieces.extend(result.cached_pieces);
            not_cached_pieces.extend(
                check_cached_pieces
                    .iter()
                    .filter(|piece_index| !cached_pieces.contains(piece_index))
                    .copied(),
            );

            DownloadedPieceFromPeer {
                peer_id,
                result: Some(result.result),
                cached_pieces,
                not_cached_pieces,
                permit,
            }
        }
        None => DownloadedPieceFromPeer {
            peer_id,
            result: None,
            cached_pieces,
            not_cached_pieces,
            permit,
        },
    }
}