subspace_networking/protocols/request_response/
request_response_factory.rs

1// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Collection of request-response protocols.
18//!
19//! The [`RequestResponse`] struct defined in this module provides support for zero or more
20//! so-called "request-response" protocols.
21//!
22//! A request-response protocol works in the following way:
23//!
24//! - For every emitted request, a new substream is open and the protocol is negotiated. If the
25//!   remote supports the protocol, the size of the request is sent as a LEB128 number, followed
26//!   with the request itself. The remote then sends the size of the response as a LEB128 number,
27//!   followed with the response.
28//!
29//! - Requests have a certain time limit before they time out. This time includes the time it
30//!   takes to send/receive the request and response.
31//!
32//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
33//!   is used to handle incoming requests.
34//!
35//! Original file commit: <https://github.com/paritytech/substrate/commit/c2fc4b3ca0d7a15cc3f9cb1e5f441d99ec8d6e0b>
36
37#[cfg(test)]
38mod tests;
39
40use async_trait::async_trait;
41use futures::channel::{mpsc, oneshot};
42use futures::prelude::*;
43use libp2p::StreamProtocol;
44use libp2p::core::transport::PortUse;
45use libp2p::core::{Endpoint, Multiaddr};
46use libp2p::identity::PeerId;
47use libp2p::request_response::{
48    Behaviour as RequestResponse, Codec as RequestResponseCodec, Config as RequestResponseConfig,
49    Event as RequestResponseEvent, InboundRequestId, Message as RequestResponseMessage,
50    OutboundRequestId, ProtocolSupport, ResponseChannel,
51};
52pub use libp2p::request_response::{InboundFailure, OutboundFailure};
53use libp2p::swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
54use libp2p::swarm::handler::multi::MultiHandler;
55use libp2p::swarm::{
56    ConnectionDenied, ConnectionId, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm,
57};
58use std::borrow::Cow;
59use std::collections::HashMap;
60use std::collections::hash_map::Entry;
61use std::pin::Pin;
62use std::task::{Context, Poll};
63use std::time::{Duration, Instant};
64use std::{io, iter};
65use tracing::{debug, error, warn};
66
67/// Defines a handler for the request-response protocol factory.
68#[async_trait]
69pub trait RequestHandler: Send {
70    /// Runs the underlying protocol handler.
71    async fn run(&mut self);
72
73    /// Returns a config for the request-response protocol factory.
74    fn protocol_config(&self) -> ProtocolConfig;
75
76    /// Returns a protocol name.
77    fn protocol_name(&self) -> &'static str;
78
79    /// Clone boxed value.
80    fn clone_box(&self) -> Box<dyn RequestHandler>;
81}
82
83impl Clone for Box<dyn RequestHandler> {
84    fn clone(&self) -> Self {
85        self.clone_box()
86    }
87}
88
89/// Configuration for a single request-response protocol.
90#[derive(Debug, Clone)]
91pub struct ProtocolConfig {
92    /// Name of the protocol on the wire. Should be something like `/foo/bar`.
93    pub name: &'static str,
94
95    /// Maximum allowed size, in bytes, of a request.
96    ///
97    /// Any request larger than this value will be declined as a way to avoid allocating too
98    /// much memory for it.
99    pub max_request_size: u64,
100
101    /// Maximum allowed size, in bytes, of a response.
102    ///
103    /// Any response larger than this value will be declined as a way to avoid allocating too
104    /// much memory for it.
105    pub max_response_size: u64,
106
107    /// Duration after which emitted requests are considered timed out.
108    ///
109    /// If you expect the response to come back quickly, you should set this to a smaller duration.
110    pub request_timeout: Duration,
111
112    /// Channel on which the networking service will send incoming requests.
113    ///
114    /// Every time a peer sends a request to the local node using this protocol, the networking
115    /// service will push an element on this channel. The receiving side of this channel then has
116    /// to pull this element, process the request, and send back the response to send back to the
117    /// peer.
118    ///
119    /// The size of the channel has to be carefully chosen. If the channel is full, the networking
120    /// service will discard the incoming request send back an error to the peer. Consequently,
121    /// the channel being full is an indicator that the node is overloaded.
122    ///
123    /// You can typically set the size of the channel to `T / d`, where `T` is the
124    /// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
125    /// build a response.
126    ///
127    /// Can be `None` if the local node does not support answering incoming requests.
128    /// If this is `None`, then the local node will not advertise support for this protocol towards
129    /// other peers. If this is `Some` but the channel is closed, then the local node will
130    /// advertise support for this protocol, but any incoming request will lead to an error being
131    /// sent back.
132    pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
133}
134
135impl ProtocolConfig {
136    /// Creates request-response protocol config.
137    pub fn new(protocol_name: &'static str) -> ProtocolConfig {
138        ProtocolConfig {
139            name: protocol_name,
140            max_request_size: 1024 * 1024,
141            max_response_size: 16 * 1024 * 1024,
142            request_timeout: Duration::from_secs(20),
143            inbound_queue: None,
144        }
145    }
146}
147
148/// A single request received by a peer on a request-response protocol.
149#[derive(Debug)]
150pub struct IncomingRequest {
151    /// Who sent the request.
152    pub peer: PeerId,
153
154    /// Request sent by the remote. Will always be smaller than
155    /// [`ProtocolConfig::max_request_size`].
156    pub payload: Vec<u8>,
157
158    /// Channel to send back the response.
159    ///
160    /// There are two ways to indicate that handling the request failed:
161    ///
162    /// 1. Drop `pending_response` and thus not changing the reputation of the peer.
163    ///
164    /// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
165    ///    the given peer.
166    pub pending_response: oneshot::Sender<OutgoingResponse>,
167}
168
169/// Response for an incoming request to be send by a request protocol handler.
170#[derive(Debug)]
171pub struct OutgoingResponse {
172    /// The payload of the response.
173    ///
174    /// `Err(())` if none is available e.g. due an error while handling the request.
175    pub result: Result<Vec<u8>, ()>,
176
177    /// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
178    /// peer.
179    ///
180    /// Note: Operating systems typically maintain a buffer of a few dozen kilobytes of
181    /// outgoing data for each TCP socket, and it is not possible for a user
182    /// application to inspect this buffer. This channel here is not actually notified
183    /// when the response has been fully sent out, but rather when it has fully been
184    /// written to the buffer managed by the operating system.
185    pub sent_feedback: Option<oneshot::Sender<()>>,
186}
187
188/// Event generated by the [`RequestResponseFactoryBehaviour`].
189#[derive(Debug)]
190// We are not reading these events in a meaningful way right now, but the fields in there are still
191// potentially useful
192#[allow(dead_code)]
193pub enum Event {
194    /// A remote sent a request and either we have successfully answered it or an error happened.
195    ///
196    /// This event is generated for statistics purposes.
197    InboundRequest {
198        /// Peer which has emitted the request.
199        peer: PeerId,
200        /// Name of the protocol in question.
201        protocol: Cow<'static, str>,
202        /// Whether handling the request was successful or unsuccessful.
203        ///
204        /// When successful contains the time elapsed between when we received the request and when
205        /// we sent back the response. When unsuccessful contains the failure reason.
206        result: Result<(), ResponseFailure>,
207    },
208
209    /// A request initiated using [`RequestResponseFactoryBehaviour::send_request`] has succeeded or
210    /// failed.
211    ///
212    /// This event is generated for statistics purposes.
213    RequestFinished {
214        /// Peer that we sent the request to, if one was chosen.
215        peer: Option<PeerId>,
216        /// Name of the protocol in question.
217        protocol: Cow<'static, str>,
218        /// Duration the request took.
219        duration: Duration,
220        /// Result of the request.
221        result: Result<(), String>,
222    },
223}
224
225/// Combination of a protocol name and a request id.
226///
227/// Uniquely identifies an inbound or outbound request among all handled protocols. Note however
228/// that uniqueness is only guaranteed between two inbound and likewise between two outbound
229/// requests. There is no uniqueness guarantee in a set of both inbound and outbound
230/// [`ProtocolRequestId`]s.
231#[derive(Debug, Clone, PartialEq, Eq, Hash)]
232struct ProtocolRequestId {
233    protocol: Cow<'static, str>,
234    request_id: OutboundRequestId,
235}
236
237impl From<(Cow<'static, str>, OutboundRequestId)> for ProtocolRequestId {
238    #[inline]
239    fn from((protocol, request_id): (Cow<'static, str>, OutboundRequestId)) -> Self {
240        Self {
241            protocol,
242            request_id,
243        }
244    }
245}
246
247/// When sending a request, what to do on a disconnected recipient.
248#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
249pub enum IfDisconnected {
250    /// Try to connect to the peer.
251    TryConnect,
252    /// Just fail if the destination is not yet connected.
253    #[allow(dead_code)] // reserved for the future logic or config change
254    ImmediateError,
255}
256
257/// Convenience functions for `IfDisconnected`.
258impl IfDisconnected {
259    /// Shall we connect to a disconnected peer?
260    pub fn should_connect(self) -> bool {
261        match self {
262            Self::TryConnect => true,
263            Self::ImmediateError => false,
264        }
265    }
266}
267
268/// Implementation of `NetworkBehaviour` that provides support for multiple request-response
269/// protocols.
270#[allow(clippy::type_complexity)] // to preserve compatibility with copied implementation
271pub struct RequestResponseFactoryBehaviour {
272    /// The multiple sub-protocols, by name.
273    /// Contains the underlying libp2p `RequestResponse` behaviour, plus an optional
274    /// "response builder" used to build responses for incoming requests.
275    protocols: HashMap<
276        Cow<'static, str>,
277        (
278            RequestResponse<GenericCodec>,
279            Option<mpsc::Sender<IncomingRequest>>,
280        ),
281    >,
282
283    /// Pending requests, passed down to a [`RequestResponse`] behaviour, awaiting a reply.
284    pending_requests:
285        HashMap<ProtocolRequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,
286
287    /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
288    /// start time and the response to send back to the remote.
289    pending_responses: stream::FuturesUnordered<
290        Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
291    >,
292
293    /// Pending message request, holds `MessageRequest` as a Future state to poll it
294    /// until we get a response from `Peerset`
295    message_request: Option<MessageRequest>,
296
297    /// Request handlers future collection.
298    request_handlers: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
299}
300
301// This is a state of processing incoming request Message.
302struct MessageRequest {
303    peer: PeerId,
304    request_id: InboundRequestId,
305    request: Vec<u8>,
306    channel: ResponseChannel<Result<Vec<u8>, ()>>,
307    protocol: String,
308    response_builder: Option<mpsc::Sender<IncomingRequest>>,
309}
310
311/// Generated by the response builder and waiting to be processed.
312struct RequestProcessingOutcome {
313    request_id: InboundRequestId,
314    protocol: Cow<'static, str>,
315    inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
316    response: OutgoingResponse,
317}
318
319impl RequestResponseFactoryBehaviour {
320    /// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
321    /// the same protocol is passed twice.
322    pub fn new(
323        list: impl IntoIterator<Item = Box<dyn RequestHandler>>,
324        max_concurrent_streams: usize,
325    ) -> Result<Self, RegisterError> {
326        let mut protocols = HashMap::new();
327        let mut request_handlers = Vec::new();
328        for mut handler in list {
329            let config = handler.protocol_config();
330
331            let protocol_support = if config.inbound_queue.is_some() {
332                ProtocolSupport::Full
333            } else {
334                ProtocolSupport::Outbound
335            };
336
337            let rq_rp = RequestResponse::with_codec(
338                GenericCodec {
339                    max_request_size: config.max_request_size,
340                    max_response_size: config.max_response_size,
341                },
342                iter::once(StreamProtocol::new(config.name)).zip(iter::repeat(protocol_support)),
343                RequestResponseConfig::default()
344                    .with_request_timeout(config.request_timeout)
345                    .with_max_concurrent_streams(max_concurrent_streams),
346            );
347
348            match protocols.entry(Cow::Borrowed(config.name)) {
349                Entry::Vacant(e) => e.insert((rq_rp, config.inbound_queue)),
350                Entry::Occupied(e) => {
351                    return Err(RegisterError::DuplicateProtocol(e.key().clone()));
352                }
353            };
354
355            let request_handler_run: Pin<Box<dyn Future<Output = ()> + Send>> =
356                Box::pin(async move { handler.run().await }.fuse());
357
358            request_handlers.push(request_handler_run);
359        }
360
361        Ok(Self {
362            protocols,
363            pending_requests: Default::default(),
364            pending_responses: Default::default(),
365            message_request: None,
366            request_handlers,
367        })
368    }
369
370    /// Initiates sending a request.
371    ///
372    /// If there is no established connection to the target peer, the behavior is determined by the
373    /// choice of `connect`.
374    ///
375    /// An error is returned if the protocol doesn't match one that has been registered.
376    pub fn send_request(
377        &mut self,
378        target: &PeerId,
379        protocol_name: &str,
380        request: Vec<u8>,
381        pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
382        connect: IfDisconnected,
383        addresses: Vec<Multiaddr>,
384    ) {
385        if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
386            if protocol.is_connected(target) || connect.should_connect() {
387                let request_id = protocol.send_request_with_addresses(target, request, addresses);
388                let prev_req_id = self.pending_requests.insert(
389                    (protocol_name.to_string().into(), request_id).into(),
390                    (Instant::now(), pending_response),
391                );
392                debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
393            } else if pending_response
394                .send(Err(RequestFailure::NotConnected))
395                .is_err()
396            {
397                debug!(
398                    "Not connected to peer {:?}. At the same time local \
399                     node is no longer interested in the result.",
400                    target,
401                );
402            }
403        } else if pending_response
404            .send(Err(RequestFailure::UnknownProtocol))
405            .is_err()
406        {
407            debug!(
408                "Unknown protocol {:?}. At the same time local \
409                 node is no longer interested in the result.",
410                protocol_name,
411            );
412        }
413    }
414}
415
416impl NetworkBehaviour for RequestResponseFactoryBehaviour {
417    type ConnectionHandler = MultiHandler<
418        String,
419        <RequestResponse<GenericCodec> as NetworkBehaviour>::ConnectionHandler,
420    >;
421    type ToSwarm = Event;
422
423    fn handle_established_inbound_connection(
424        &mut self,
425        connection_id: ConnectionId,
426        peer: PeerId,
427        local_addr: &Multiaddr,
428        remote_addr: &Multiaddr,
429    ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
430        let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
431            (
432                p.to_string(),
433                r.handle_established_inbound_connection(
434                    connection_id,
435                    peer,
436                    local_addr,
437                    remote_addr,
438                )
439                .expect(
440                    "Behaviours return handlers in these methods with the exception of \
441                    'connection management' behaviours like connection-limits or allow-black list. \
442                    So, inner request-response behaviour always returns Ok(handler).",
443                ),
444            )
445        });
446
447        let handler = MultiHandler::try_from_iter(iter).expect(
448            "Protocols are in a HashMap and there can be at most one handler per protocol name, \
449			 which is the only possible error; qed",
450        );
451
452        Ok(handler)
453    }
454
455    fn handle_established_outbound_connection(
456        &mut self,
457        connection_id: ConnectionId,
458        peer: PeerId,
459        addr: &Multiaddr,
460        role_override: Endpoint,
461        port_use: PortUse,
462    ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
463        let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
464            (
465                p.to_string(),
466                r.handle_established_outbound_connection(
467                    connection_id,
468                    peer,
469                    addr,
470                    role_override,
471                    port_use,
472                )
473                .expect(
474                    "Behaviours return handlers in these methods with the exception of \
475                        'connection management' behaviours like connection-limits or allow-black \
476                        list. So, inner request-response behaviour always returns Ok(handler).",
477                ),
478            )
479        });
480
481        let handler = MultiHandler::try_from_iter(iter).expect(
482            "Protocols are in a HashMap and there can be at most one handler per protocol name, \
483            which is the only possible error; qed",
484        );
485
486        Ok(handler)
487    }
488
489    /// Informs the behaviour about an event from the [`Swarm`](libp2p::Swarm).
490    fn on_swarm_event(&mut self, event: FromSwarm) {
491        match event {
492            FromSwarm::ConnectionEstablished(inner) => {
493                for (protocol, _) in self.protocols.values_mut() {
494                    protocol.on_swarm_event(FromSwarm::ConnectionEstablished(inner));
495                }
496            }
497            FromSwarm::ConnectionClosed(inner) => {
498                for (protocol, _) in self.protocols.values_mut() {
499                    protocol.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
500                        peer_id: inner.peer_id,
501                        connection_id: inner.connection_id,
502                        endpoint: inner.endpoint,
503                        cause: inner.cause,
504                        remaining_established: inner.remaining_established,
505                    }));
506                }
507            }
508            FromSwarm::AddressChange(inner) => {
509                for (protocol, _) in self.protocols.values_mut() {
510                    protocol.on_swarm_event(FromSwarm::AddressChange(inner));
511                }
512            }
513            FromSwarm::DialFailure(inner) => {
514                for (protocol, _) in self.protocols.values_mut() {
515                    protocol.on_swarm_event(FromSwarm::DialFailure(DialFailure {
516                        peer_id: inner.peer_id,
517                        error: inner.error,
518                        connection_id: inner.connection_id,
519                    }));
520                }
521            }
522            FromSwarm::ListenFailure(inner) => {
523                for (protocol, _) in self.protocols.values_mut() {
524                    protocol.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
525                        local_addr: inner.local_addr,
526                        send_back_addr: inner.send_back_addr,
527                        error: inner.error,
528                        connection_id: inner.connection_id,
529                        peer_id: inner.peer_id,
530                    }));
531                }
532            }
533            FromSwarm::NewListener(inner) => {
534                for (protocol, _) in self.protocols.values_mut() {
535                    protocol.on_swarm_event(FromSwarm::NewListener(inner));
536                }
537            }
538            FromSwarm::NewListenAddr(inner) => {
539                for (protocol, _) in self.protocols.values_mut() {
540                    protocol.on_swarm_event(FromSwarm::NewListenAddr(inner));
541                }
542            }
543            FromSwarm::ExpiredListenAddr(inner) => {
544                for (protocol, _) in self.protocols.values_mut() {
545                    protocol.on_swarm_event(FromSwarm::ExpiredListenAddr(inner));
546                }
547            }
548            FromSwarm::ListenerError(inner) => {
549                for (protocol, _) in self.protocols.values_mut() {
550                    protocol.on_swarm_event(FromSwarm::ListenerError(inner));
551                }
552            }
553            FromSwarm::ListenerClosed(inner) => {
554                for (protocol, _) in self.protocols.values_mut() {
555                    protocol.on_swarm_event(FromSwarm::ListenerClosed(inner));
556                }
557            }
558            FromSwarm::NewExternalAddrCandidate(inner) => {
559                for (protocol, _) in self.protocols.values_mut() {
560                    protocol.on_swarm_event(FromSwarm::NewExternalAddrCandidate(inner));
561                }
562            }
563            FromSwarm::ExternalAddrConfirmed(inner) => {
564                for (protocol, _) in self.protocols.values_mut() {
565                    protocol.on_swarm_event(FromSwarm::ExternalAddrConfirmed(inner));
566                }
567            }
568            FromSwarm::ExternalAddrExpired(inner) => {
569                for (protocol, _) in self.protocols.values_mut() {
570                    protocol.on_swarm_event(FromSwarm::ExternalAddrExpired(inner));
571                }
572            }
573            FromSwarm::NewExternalAddrOfPeer(inner) => {
574                for (protocol, _) in self.protocols.values_mut() {
575                    protocol.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(inner));
576                }
577            }
578            event => {
579                warn!(
580                    ?event,
581                    "New event must be forwarded to request response protocols"
582                );
583            }
584        };
585    }
586
587    fn on_connection_handler_event(
588        &mut self,
589        peer_id: PeerId,
590        connection: ConnectionId,
591        event: THandlerOutEvent<Self>,
592    ) {
593        let p_name = event.0;
594        if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
595            return proto.on_connection_handler_event(peer_id, connection, event.1);
596        }
597
598        warn!(
599            "inject_node_event: no request-response instance registered for protocol {:?}",
600            p_name
601        )
602    }
603
604    fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
605        'poll_all: loop {
606            if let Some(message_request) = self.message_request.take() {
607                let MessageRequest {
608                    peer,
609                    request_id,
610                    request,
611                    channel,
612                    protocol,
613                    response_builder,
614                } = message_request;
615
616                let (tx, rx) = oneshot::channel();
617
618                // Submit the request to the "response builder" passed by the user at
619                // initialization.
620                if let Some(mut response_builder) = response_builder {
621                    // If the response builder is too busy, silently drop `tx`. This
622                    // will be reported by the corresponding `RequestResponse` through
623                    // an `InboundFailure::Omission` event.
624                    let _ = response_builder.try_send(IncomingRequest {
625                        peer,
626                        payload: request,
627                        pending_response: tx,
628                    });
629                } else {
630                    debug_assert!(false, "Received message on outbound-only protocol.");
631                }
632
633                self.pending_responses.push(Box::pin(async move {
634                    // The `tx` created above can be dropped if we are not capable of
635                    // processing this request, which is reflected as a
636                    // `InboundFailure::Omission` event.
637                    if let Ok(response) = rx.await {
638                        Some(RequestProcessingOutcome {
639                            request_id,
640                            protocol: Cow::from(protocol),
641                            inner_channel: channel,
642                            response,
643                        })
644                    } else {
645                        None
646                    }
647                }));
648
649                // This `continue` makes sure that `pending_responses` gets polled
650                // after we have added the new element.
651                continue 'poll_all;
652            }
653            // Poll to see if any response is ready to be sent back.
654            while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
655                let RequestProcessingOutcome {
656                    request_id,
657                    protocol: protocol_name,
658                    inner_channel,
659                    response: OutgoingResponse { result, .. },
660                } = match outcome {
661                    Some(outcome) => outcome,
662                    // The response builder was too busy or handling the request failed. This is
663                    // later on reported as a `InboundFailure::Omission`.
664                    None => continue,
665                };
666
667                if let Ok(payload) = result
668                    && let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name)
669                    && protocol.send_response(inner_channel, Ok(payload)).is_err()
670                {
671                    // Note: Failure is handled further below when receiving
672                    // `InboundFailure` event from `RequestResponse` behaviour.
673                    debug!(
674                        %request_id,
675                        "Failed to send response for request on protocol {} due to a \
676                        timeout or due to the connection to the peer being closed. \
677                        Dropping response",
678                        protocol_name,
679                    );
680                }
681            }
682
683            for rq_rs_runner in &mut self.request_handlers {
684                // Future.Output == (), so we don't need a result here
685                let _ = rq_rs_runner.poll_unpin(cx);
686            }
687
688            // Poll request-responses protocols.
689            for (protocol, (behaviour, response_builder)) in &mut self.protocols {
690                while let Poll::Ready(event) = behaviour.poll(cx) {
691                    let event = match event {
692                        // Main events we are interested in.
693                        ToSwarm::GenerateEvent(event) => event,
694
695                        // Other events generated by the underlying behaviour are transparently
696                        // passed through.
697                        ToSwarm::Dial { opts } => {
698                            if opts.get_peer_id().is_none() {
699                                error!(
700                                    "The request-response isn't supposed to start dialing \
701                                    addresses"
702                                );
703                            }
704                            return Poll::Ready(ToSwarm::Dial { opts });
705                        }
706                        ToSwarm::NotifyHandler {
707                            peer_id,
708                            handler,
709                            event,
710                        } => {
711                            return Poll::Ready(ToSwarm::NotifyHandler {
712                                peer_id,
713                                handler,
714                                event: ((*protocol).to_string(), event),
715                            });
716                        }
717                        ToSwarm::CloseConnection {
718                            peer_id,
719                            connection,
720                        } => {
721                            return Poll::Ready(ToSwarm::CloseConnection {
722                                peer_id,
723                                connection,
724                            });
725                        }
726                        ToSwarm::NewExternalAddrCandidate(observed) => {
727                            return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed));
728                        }
729                        ToSwarm::ExternalAddrConfirmed(addr) => {
730                            return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr));
731                        }
732                        ToSwarm::ExternalAddrExpired(addr) => {
733                            return Poll::Ready(ToSwarm::ExternalAddrExpired(addr));
734                        }
735                        ToSwarm::ListenOn { opts } => {
736                            return Poll::Ready(ToSwarm::ListenOn { opts });
737                        }
738                        ToSwarm::RemoveListener { id } => {
739                            return Poll::Ready(ToSwarm::RemoveListener { id });
740                        }
741                        event => {
742                            warn!(
743                                ?event,
744                                "New event from request response protocol must be send up"
745                            );
746
747                            continue;
748                        }
749                    };
750
751                    match event {
752                        // Received a request from a remote.
753                        RequestResponseEvent::Message {
754                            peer,
755                            message:
756                                RequestResponseMessage::Request {
757                                    request_id,
758                                    request,
759                                    channel,
760                                },
761                            ..
762                        } => {
763                            self.message_request = Some(MessageRequest {
764                                peer,
765                                request_id,
766                                request,
767                                channel,
768                                protocol: protocol.to_string(),
769                                response_builder: response_builder.clone(),
770                            });
771
772                            // This `continue` makes sure that `message_request` gets polled
773                            // after we have added the new element.
774                            continue 'poll_all;
775                        }
776
777                        // Received a response from a remote to one of our requests.
778                        RequestResponseEvent::Message {
779                            peer,
780                            message:
781                                RequestResponseMessage::Response {
782                                    request_id,
783                                    response,
784                                },
785                            ..
786                        } => {
787                            let (started, delivered) = match self
788                                .pending_requests
789                                .remove(&(protocol.clone(), request_id).into())
790                            {
791                                Some((started, pending_response)) => {
792                                    let delivered = pending_response
793                                        .send(response.map_err(|()| RequestFailure::Refused))
794                                        .map_err(|_| RequestFailure::Obsolete.to_string());
795                                    (started, delivered)
796                                }
797                                None => {
798                                    warn!(
799                                        "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
800                                        request_id,
801                                    );
802                                    debug_assert!(false);
803                                    continue;
804                                }
805                            };
806
807                            let out = Event::RequestFinished {
808                                peer: Some(peer),
809                                protocol: protocol.clone(),
810                                duration: started.elapsed(),
811                                result: delivered,
812                            };
813
814                            return Poll::Ready(ToSwarm::GenerateEvent(out));
815                        }
816
817                        // One of our requests has failed.
818                        RequestResponseEvent::OutboundFailure {
819                            peer,
820                            request_id,
821                            error,
822                            ..
823                        } => {
824                            let error_string = error.to_string();
825                            let started = match self
826                                .pending_requests
827                                .remove(&(protocol.clone(), request_id).into())
828                            {
829                                Some((started, pending_response)) => {
830                                    if pending_response
831                                        .send(Err(RequestFailure::Network(error)))
832                                        .is_err()
833                                    {
834                                        debug!(
835                                            %request_id,
836                                            "Request failed. At the same time local node is no longer interested in \
837                                            the result",
838                                        );
839                                    }
840                                    started
841                                }
842                                None => {
843                                    warn!(
844                                        %request_id,
845                                        "Received `RequestResponseEvent::Message` with unexpected request",
846                                    );
847                                    debug_assert!(false);
848                                    continue;
849                                }
850                            };
851
852                            let out = Event::RequestFinished {
853                                peer: Some(peer),
854                                protocol: protocol.clone(),
855                                duration: started.elapsed(),
856                                result: Err(error_string),
857                            };
858
859                            return Poll::Ready(ToSwarm::GenerateEvent(out));
860                        }
861
862                        // An inbound request failed, either while reading the request or due to
863                        // failing to send a response.
864                        RequestResponseEvent::InboundFailure { peer, error, .. } => {
865                            debug!(?error, %peer, "Inbound request failed.");
866
867                            let out = Event::InboundRequest {
868                                peer,
869                                protocol: protocol.clone(),
870                                result: Err(ResponseFailure::Network(error)),
871                            };
872                            return Poll::Ready(ToSwarm::GenerateEvent(out));
873                        }
874
875                        // A response to an inbound request has been sent.
876                        RequestResponseEvent::ResponseSent { peer, .. } => {
877                            let out = Event::InboundRequest {
878                                peer,
879                                protocol: protocol.clone(),
880                                result: Ok(()),
881                            };
882
883                            return Poll::Ready(ToSwarm::GenerateEvent(out));
884                        }
885                    };
886                }
887            }
888
889            break Poll::Pending;
890        }
891    }
892}
893
894/// Error when registering a protocol.
895#[derive(Debug, thiserror::Error)]
896pub enum RegisterError {
897    /// A protocol has been specified multiple times.
898    #[error("{0}")]
899    DuplicateProtocol(Cow<'static, str>),
900}
901
902/// Error in a request.
903#[derive(Debug, thiserror::Error)]
904#[allow(missing_docs)]
905pub enum RequestFailure {
906    #[error("We are not currently connected to the requested peer.")]
907    NotConnected,
908    #[error("Given protocol hasn't been registered.")]
909    UnknownProtocol,
910    #[error(
911        "Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it."
912    )]
913    Refused,
914    #[error("The remote replied, but the local node is no longer interested in the response.")]
915    Obsolete,
916    /// Problem on the network.
917    #[error("Problem on the network: {0}")]
918    Network(OutboundFailure),
919}
920
921/// Error when processing a request sent by a remote.
922#[derive(Debug, thiserror::Error)]
923pub enum ResponseFailure {
924    /// Problem on the network.
925    #[error("Problem on the network: {0}")]
926    Network(InboundFailure),
927}
928
929/// Implements the libp2p [`RequestResponseCodec`] trait. Defines how streams of bytes are turned
930/// into requests and responses and vice-versa.
931#[derive(Debug, Clone)]
932#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler.
933pub struct GenericCodec {
934    max_request_size: u64,
935    max_response_size: u64,
936}
937
938#[async_trait::async_trait]
939impl RequestResponseCodec for GenericCodec {
940    type Protocol = StreamProtocol;
941    type Request = Vec<u8>;
942    type Response = Result<Vec<u8>, ()>;
943
944    async fn read_request<T>(
945        &mut self,
946        _: &Self::Protocol,
947        mut io: &mut T,
948    ) -> io::Result<Self::Request>
949    where
950        T: AsyncRead + Unpin + Send,
951    {
952        // Read the length.
953        let length = unsigned_varint::aio::read_usize(&mut io)
954            .await
955            .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
956        if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
957            return Err(io::Error::new(
958                io::ErrorKind::InvalidInput,
959                format!(
960                    "Request size exceeds limit: {} > {}",
961                    length, self.max_request_size
962                ),
963            ));
964        }
965
966        // Read the payload.
967        let mut buffer = vec![0; length];
968        io.read_exact(&mut buffer).await?;
969        Ok(buffer)
970    }
971
972    async fn read_response<T>(
973        &mut self,
974        _: &Self::Protocol,
975        mut io: &mut T,
976    ) -> io::Result<Self::Response>
977    where
978        T: AsyncRead + Unpin + Send,
979    {
980        // Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
981        // considered as a protocol error and will result in the entire connection being closed.
982        // Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
983        // that this response is an error.
984
985        // Read the length.
986        let length = match unsigned_varint::aio::read_usize(&mut io).await {
987            Ok(l) => l,
988            Err(unsigned_varint::io::ReadError::Io(err))
989                if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
990            {
991                return Ok(Err(()));
992            }
993            Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
994        };
995
996        if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
997            return Err(io::Error::new(
998                io::ErrorKind::InvalidInput,
999                format!(
1000                    "Response size exceeds limit: {} > {}",
1001                    length, self.max_response_size
1002                ),
1003            ));
1004        }
1005
1006        // Read the payload.
1007        let mut buffer = vec![0; length];
1008        io.read_exact(&mut buffer).await?;
1009        Ok(Ok(buffer))
1010    }
1011
1012    async fn write_request<T>(
1013        &mut self,
1014        _: &Self::Protocol,
1015        io: &mut T,
1016        req: Self::Request,
1017    ) -> io::Result<()>
1018    where
1019        T: AsyncWrite + Unpin + Send,
1020    {
1021        // Write the length.
1022        {
1023            let mut buffer = unsigned_varint::encode::usize_buffer();
1024            io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer))
1025                .await?;
1026        }
1027
1028        // Write the payload.
1029        io.write_all(&req).await?;
1030
1031        io.close().await?;
1032        Ok(())
1033    }
1034
1035    async fn write_response<T>(
1036        &mut self,
1037        _: &Self::Protocol,
1038        io: &mut T,
1039        res: Self::Response,
1040    ) -> io::Result<()>
1041    where
1042        T: AsyncWrite + Unpin + Send,
1043    {
1044        // If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1045        if let Ok(res) = res {
1046            // Write the length.
1047            {
1048                let mut buffer = unsigned_varint::encode::usize_buffer();
1049                io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer))
1050                    .await?;
1051            }
1052
1053            // Write the payload.
1054            io.write_all(&res).await?;
1055        }
1056
1057        io.close().await?;
1058        Ok(())
1059    }
1060}