subspace_networking/protocols/request_response/
request_response_factory.rs

1// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Collection of request-response protocols.
18//!
19//! The [`RequestResponse`] struct defined in this module provides support for zero or more
20//! so-called "request-response" protocols.
21//!
22//! A request-response protocol works in the following way:
23//!
24//! - For every emitted request, a new substream is open and the protocol is negotiated. If the
25//!   remote supports the protocol, the size of the request is sent as a LEB128 number, followed
26//!   with the request itself. The remote then sends the size of the response as a LEB128 number,
27//!   followed with the response.
28//!
29//! - Requests have a certain time limit before they time out. This time includes the time it
30//!   takes to send/receive the request and response.
31//!
32//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
33//!   is used to handle incoming requests.
34//!
35//! Original file commit: <https://github.com/paritytech/substrate/commit/c2fc4b3ca0d7a15cc3f9cb1e5f441d99ec8d6e0b>
36
37#[cfg(test)]
38mod tests;
39
40use async_trait::async_trait;
41use futures::channel::{mpsc, oneshot};
42use futures::prelude::*;
43use libp2p::StreamProtocol;
44use libp2p::core::transport::PortUse;
45use libp2p::core::{Endpoint, Multiaddr};
46use libp2p::identity::PeerId;
47use libp2p::request_response::{
48    Behaviour as RequestResponse, Codec as RequestResponseCodec, Config as RequestResponseConfig,
49    Event as RequestResponseEvent, InboundRequestId, Message as RequestResponseMessage,
50    OutboundRequestId, ProtocolSupport, ResponseChannel,
51};
52pub use libp2p::request_response::{InboundFailure, OutboundFailure};
53use libp2p::swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
54use libp2p::swarm::dial_opts::DialOpts;
55use libp2p::swarm::handler::multi::MultiHandler;
56use libp2p::swarm::{
57    ConnectionDenied, ConnectionId, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm,
58};
59use std::borrow::Cow;
60use std::collections::HashMap;
61use std::collections::hash_map::Entry;
62use std::pin::Pin;
63use std::task::{Context, Poll};
64use std::time::{Duration, Instant};
65use std::{io, iter};
66use tracing::{debug, error, warn};
67
68/// Defines a handler for the request-response protocol factory.
69#[async_trait]
70pub trait RequestHandler: Send {
71    /// Runs the underlying protocol handler.
72    async fn run(&mut self);
73
74    /// Returns a config for the request-response protocol factory.
75    fn protocol_config(&self) -> ProtocolConfig;
76
77    /// Returns a protocol name.
78    fn protocol_name(&self) -> &'static str;
79
80    /// Clone boxed value.
81    fn clone_box(&self) -> Box<dyn RequestHandler>;
82}
83
84impl Clone for Box<dyn RequestHandler> {
85    fn clone(&self) -> Self {
86        self.clone_box()
87    }
88}
89
90/// Configuration for a single request-response protocol.
91#[derive(Debug, Clone)]
92pub struct ProtocolConfig {
93    /// Name of the protocol on the wire. Should be something like `/foo/bar`.
94    pub name: &'static str,
95
96    /// Maximum allowed size, in bytes, of a request.
97    ///
98    /// Any request larger than this value will be declined as a way to avoid allocating too
99    /// much memory for it.
100    pub max_request_size: u64,
101
102    /// Maximum allowed size, in bytes, of a response.
103    ///
104    /// Any response larger than this value will be declined as a way to avoid allocating too
105    /// much memory for it.
106    pub max_response_size: u64,
107
108    /// Duration after which emitted requests are considered timed out.
109    ///
110    /// If you expect the response to come back quickly, you should set this to a smaller duration.
111    pub request_timeout: Duration,
112
113    /// Channel on which the networking service will send incoming requests.
114    ///
115    /// Every time a peer sends a request to the local node using this protocol, the networking
116    /// service will push an element on this channel. The receiving side of this channel then has
117    /// to pull this element, process the request, and send back the response to send back to the
118    /// peer.
119    ///
120    /// The size of the channel has to be carefully chosen. If the channel is full, the networking
121    /// service will discard the incoming request send back an error to the peer. Consequently,
122    /// the channel being full is an indicator that the node is overloaded.
123    ///
124    /// You can typically set the size of the channel to `T / d`, where `T` is the
125    /// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
126    /// build a response.
127    ///
128    /// Can be `None` if the local node does not support answering incoming requests.
129    /// If this is `None`, then the local node will not advertise support for this protocol towards
130    /// other peers. If this is `Some` but the channel is closed, then the local node will
131    /// advertise support for this protocol, but any incoming request will lead to an error being
132    /// sent back.
133    pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
134}
135
136impl ProtocolConfig {
137    /// Creates request-response protocol config.
138    pub fn new(protocol_name: &'static str) -> ProtocolConfig {
139        ProtocolConfig {
140            name: protocol_name,
141            max_request_size: 1024 * 1024,
142            max_response_size: 16 * 1024 * 1024,
143            request_timeout: Duration::from_secs(20),
144            inbound_queue: None,
145        }
146    }
147}
148
149/// A single request received by a peer on a request-response protocol.
150#[derive(Debug)]
151pub struct IncomingRequest {
152    /// Who sent the request.
153    pub peer: PeerId,
154
155    /// Request sent by the remote. Will always be smaller than
156    /// [`ProtocolConfig::max_request_size`].
157    pub payload: Vec<u8>,
158
159    /// Channel to send back the response.
160    ///
161    /// There are two ways to indicate that handling the request failed:
162    ///
163    /// 1. Drop `pending_response` and thus not changing the reputation of the peer.
164    ///
165    /// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
166    ///    the given peer.
167    pub pending_response: oneshot::Sender<OutgoingResponse>,
168}
169
170/// Response for an incoming request to be send by a request protocol handler.
171#[derive(Debug)]
172pub struct OutgoingResponse {
173    /// The payload of the response.
174    ///
175    /// `Err(())` if none is available e.g. due an error while handling the request.
176    pub result: Result<Vec<u8>, ()>,
177
178    /// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
179    /// peer.
180    ///
181    /// Note: Operating systems typically maintain a buffer of a few dozen kilobytes of
182    /// outgoing data for each TCP socket, and it is not possible for a user
183    /// application to inspect this buffer. This channel here is not actually notified
184    /// when the response has been fully sent out, but rather when it has fully been
185    /// written to the buffer managed by the operating system.
186    pub sent_feedback: Option<oneshot::Sender<()>>,
187}
188
189/// Event generated by the [`RequestResponseFactoryBehaviour`].
190#[derive(Debug)]
191// We are not reading these events in a meaningful way right now, but the fields in there are still
192// potentially useful
193#[allow(dead_code)]
194pub enum Event {
195    /// A remote sent a request and either we have successfully answered it or an error happened.
196    ///
197    /// This event is generated for statistics purposes.
198    InboundRequest {
199        /// Peer which has emitted the request.
200        peer: PeerId,
201        /// Name of the protocol in question.
202        protocol: Cow<'static, str>,
203        /// Whether handling the request was successful or unsuccessful.
204        ///
205        /// When successful contains the time elapsed between when we received the request and when
206        /// we sent back the response. When unsuccessful contains the failure reason.
207        result: Result<(), ResponseFailure>,
208    },
209
210    /// A request initiated using [`RequestResponseFactoryBehaviour::send_request`] has succeeded or
211    /// failed.
212    ///
213    /// This event is generated for statistics purposes.
214    RequestFinished {
215        /// Peer that we sent the request to, if one was chosen.
216        peer: Option<PeerId>,
217        /// Name of the protocol in question.
218        protocol: Cow<'static, str>,
219        /// Duration the request took.
220        duration: Duration,
221        /// Result of the request.
222        result: Result<(), String>,
223    },
224}
225
226/// Combination of a protocol name and a request id.
227///
228/// Uniquely identifies an inbound or outbound request among all handled protocols. Note however
229/// that uniqueness is only guaranteed between two inbound and likewise between two outbound
230/// requests. There is no uniqueness guarantee in a set of both inbound and outbound
231/// [`ProtocolRequestId`]s.
232#[derive(Debug, Clone, PartialEq, Eq, Hash)]
233struct ProtocolRequestId {
234    protocol: Cow<'static, str>,
235    request_id: OutboundRequestId,
236}
237
238impl From<(Cow<'static, str>, OutboundRequestId)> for ProtocolRequestId {
239    #[inline]
240    fn from((protocol, request_id): (Cow<'static, str>, OutboundRequestId)) -> Self {
241        Self {
242            protocol,
243            request_id,
244        }
245    }
246}
247
248/// When sending a request, what to do on a disconnected recipient.
249#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
250pub enum IfDisconnected {
251    /// Try to connect to the peer.
252    TryConnect,
253    /// Just fail if the destination is not yet connected.
254    #[allow(dead_code)] // reserved for the future logic or config change
255    ImmediateError,
256}
257
258/// Convenience functions for `IfDisconnected`.
259impl IfDisconnected {
260    /// Shall we connect to a disconnected peer?
261    pub fn should_connect(self) -> bool {
262        match self {
263            Self::TryConnect => true,
264            Self::ImmediateError => false,
265        }
266    }
267}
268
269/// Implementation of `NetworkBehaviour` that provides support for multiple request-response
270/// protocols.
271#[allow(clippy::type_complexity)] // to preserve compatibility with copied implementation
272pub struct RequestResponseFactoryBehaviour {
273    /// The multiple sub-protocols, by name.
274    /// Contains the underlying libp2p `RequestResponse` behaviour, plus an optional
275    /// "response builder" used to build responses for incoming requests.
276    protocols: HashMap<
277        Cow<'static, str>,
278        (
279            RequestResponse<GenericCodec>,
280            Option<mpsc::Sender<IncomingRequest>>,
281        ),
282    >,
283
284    /// Pending requests, passed down to a [`RequestResponse`] behaviour, awaiting a reply.
285    pending_requests:
286        HashMap<ProtocolRequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,
287
288    /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
289    /// start time and the response to send back to the remote.
290    pending_responses: stream::FuturesUnordered<
291        Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
292    >,
293
294    /// Pending message request, holds `MessageRequest` as a Future state to poll it
295    /// until we get a response from `Peerset`
296    message_request: Option<MessageRequest>,
297
298    /// Request handlers future collection.
299    request_handlers: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
300}
301
302// This is a state of processing incoming request Message.
303struct MessageRequest {
304    peer: PeerId,
305    request_id: InboundRequestId,
306    request: Vec<u8>,
307    channel: ResponseChannel<Result<Vec<u8>, ()>>,
308    protocol: String,
309    response_builder: Option<mpsc::Sender<IncomingRequest>>,
310}
311
312/// Generated by the response builder and waiting to be processed.
313struct RequestProcessingOutcome {
314    request_id: InboundRequestId,
315    protocol: Cow<'static, str>,
316    inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
317    response: OutgoingResponse,
318}
319
320impl RequestResponseFactoryBehaviour {
321    /// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
322    /// the same protocol is passed twice.
323    pub fn new(
324        list: impl IntoIterator<Item = Box<dyn RequestHandler>>,
325        max_concurrent_streams: usize,
326    ) -> Result<Self, RegisterError> {
327        let mut protocols = HashMap::new();
328        let mut request_handlers = Vec::new();
329        for mut handler in list {
330            let config = handler.protocol_config();
331
332            let protocol_support = if config.inbound_queue.is_some() {
333                ProtocolSupport::Full
334            } else {
335                ProtocolSupport::Outbound
336            };
337
338            let rq_rp = RequestResponse::with_codec(
339                GenericCodec {
340                    max_request_size: config.max_request_size,
341                    max_response_size: config.max_response_size,
342                },
343                iter::once(StreamProtocol::new(config.name)).zip(iter::repeat(protocol_support)),
344                RequestResponseConfig::default()
345                    .with_request_timeout(config.request_timeout)
346                    .with_max_concurrent_streams(max_concurrent_streams),
347            );
348
349            match protocols.entry(Cow::Borrowed(config.name)) {
350                Entry::Vacant(e) => e.insert((rq_rp, config.inbound_queue)),
351                Entry::Occupied(e) => {
352                    return Err(RegisterError::DuplicateProtocol(e.key().clone()));
353                }
354            };
355
356            let request_handler_run: Pin<Box<dyn Future<Output = ()> + Send>> =
357                Box::pin(async move { handler.run().await }.fuse());
358
359            request_handlers.push(request_handler_run);
360        }
361
362        Ok(Self {
363            protocols,
364            pending_requests: Default::default(),
365            pending_responses: Default::default(),
366            message_request: None,
367            request_handlers,
368        })
369    }
370
371    /// Initiates sending a request.
372    ///
373    /// If there is no established connection to the target peer, the behavior is determined by the
374    /// choice of `connect`.
375    ///
376    /// An error is returned if the protocol doesn't match one that has been registered.
377    pub fn send_request(
378        &mut self,
379        target: &PeerId,
380        protocol_name: &str,
381        request: Vec<u8>,
382        pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
383        connect: IfDisconnected,
384        addresses: Vec<Multiaddr>,
385    ) {
386        if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
387            if protocol.is_connected(target) || connect.should_connect() {
388                let opts = DialOpts::peer_id(*target).addresses(addresses).build();
389                let request_id = protocol.send_request(opts, request);
390                let prev_req_id = self.pending_requests.insert(
391                    (protocol_name.to_string().into(), request_id).into(),
392                    (Instant::now(), pending_response),
393                );
394                debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
395            } else if pending_response
396                .send(Err(RequestFailure::NotConnected))
397                .is_err()
398            {
399                debug!(
400                    "Not connected to peer {:?}. At the same time local \
401                     node is no longer interested in the result.",
402                    target,
403                );
404            }
405        } else if pending_response
406            .send(Err(RequestFailure::UnknownProtocol))
407            .is_err()
408        {
409            debug!(
410                "Unknown protocol {:?}. At the same time local \
411                 node is no longer interested in the result.",
412                protocol_name,
413            );
414        }
415    }
416}
417
418impl NetworkBehaviour for RequestResponseFactoryBehaviour {
419    type ConnectionHandler = MultiHandler<
420        String,
421        <RequestResponse<GenericCodec> as NetworkBehaviour>::ConnectionHandler,
422    >;
423    type ToSwarm = Event;
424
425    fn handle_established_inbound_connection(
426        &mut self,
427        connection_id: ConnectionId,
428        peer: PeerId,
429        local_addr: &Multiaddr,
430        remote_addr: &Multiaddr,
431    ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
432        let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
433            (
434                p.to_string(),
435                r.handle_established_inbound_connection(
436                    connection_id,
437                    peer,
438                    local_addr,
439                    remote_addr,
440                )
441                .expect(
442                    "Behaviours return handlers in these methods with the exception of \
443                    'connection management' behaviours like connection-limits or allow-black list. \
444                    So, inner request-response behaviour always returns Ok(handler).",
445                ),
446            )
447        });
448
449        let handler = MultiHandler::try_from_iter(iter).expect(
450            "Protocols are in a HashMap and there can be at most one handler per protocol name, \
451			 which is the only possible error; qed",
452        );
453
454        Ok(handler)
455    }
456
457    fn handle_established_outbound_connection(
458        &mut self,
459        connection_id: ConnectionId,
460        peer: PeerId,
461        addr: &Multiaddr,
462        role_override: Endpoint,
463        port_use: PortUse,
464    ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
465        let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
466            (
467                p.to_string(),
468                r.handle_established_outbound_connection(
469                    connection_id,
470                    peer,
471                    addr,
472                    role_override,
473                    port_use,
474                )
475                .expect(
476                    "Behaviours return handlers in these methods with the exception of \
477                        'connection management' behaviours like connection-limits or allow-black \
478                        list. So, inner request-response behaviour always returns Ok(handler).",
479                ),
480            )
481        });
482
483        let handler = MultiHandler::try_from_iter(iter).expect(
484            "Protocols are in a HashMap and there can be at most one handler per protocol name, \
485            which is the only possible error; qed",
486        );
487
488        Ok(handler)
489    }
490
491    /// Informs the behaviour about an event from the [`Swarm`](libp2p::Swarm).
492    fn on_swarm_event(&mut self, event: FromSwarm) {
493        match event {
494            FromSwarm::ConnectionEstablished(inner) => {
495                for (protocol, _) in self.protocols.values_mut() {
496                    protocol.on_swarm_event(FromSwarm::ConnectionEstablished(inner));
497                }
498            }
499            FromSwarm::ConnectionClosed(inner) => {
500                for (protocol, _) in self.protocols.values_mut() {
501                    protocol.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
502                        peer_id: inner.peer_id,
503                        connection_id: inner.connection_id,
504                        endpoint: inner.endpoint,
505                        cause: inner.cause,
506                        remaining_established: inner.remaining_established,
507                    }));
508                }
509            }
510            FromSwarm::AddressChange(inner) => {
511                for (protocol, _) in self.protocols.values_mut() {
512                    protocol.on_swarm_event(FromSwarm::AddressChange(inner));
513                }
514            }
515            FromSwarm::DialFailure(inner) => {
516                for (protocol, _) in self.protocols.values_mut() {
517                    protocol.on_swarm_event(FromSwarm::DialFailure(DialFailure {
518                        peer_id: inner.peer_id,
519                        error: inner.error,
520                        connection_id: inner.connection_id,
521                    }));
522                }
523            }
524            FromSwarm::ListenFailure(inner) => {
525                for (protocol, _) in self.protocols.values_mut() {
526                    protocol.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
527                        local_addr: inner.local_addr,
528                        send_back_addr: inner.send_back_addr,
529                        error: inner.error,
530                        connection_id: inner.connection_id,
531                        peer_id: inner.peer_id,
532                    }));
533                }
534            }
535            FromSwarm::NewListener(inner) => {
536                for (protocol, _) in self.protocols.values_mut() {
537                    protocol.on_swarm_event(FromSwarm::NewListener(inner));
538                }
539            }
540            FromSwarm::NewListenAddr(inner) => {
541                for (protocol, _) in self.protocols.values_mut() {
542                    protocol.on_swarm_event(FromSwarm::NewListenAddr(inner));
543                }
544            }
545            FromSwarm::ExpiredListenAddr(inner) => {
546                for (protocol, _) in self.protocols.values_mut() {
547                    protocol.on_swarm_event(FromSwarm::ExpiredListenAddr(inner));
548                }
549            }
550            FromSwarm::ListenerError(inner) => {
551                for (protocol, _) in self.protocols.values_mut() {
552                    protocol.on_swarm_event(FromSwarm::ListenerError(inner));
553                }
554            }
555            FromSwarm::ListenerClosed(inner) => {
556                for (protocol, _) in self.protocols.values_mut() {
557                    protocol.on_swarm_event(FromSwarm::ListenerClosed(inner));
558                }
559            }
560            FromSwarm::NewExternalAddrCandidate(inner) => {
561                for (protocol, _) in self.protocols.values_mut() {
562                    protocol.on_swarm_event(FromSwarm::NewExternalAddrCandidate(inner));
563                }
564            }
565            FromSwarm::ExternalAddrConfirmed(inner) => {
566                for (protocol, _) in self.protocols.values_mut() {
567                    protocol.on_swarm_event(FromSwarm::ExternalAddrConfirmed(inner));
568                }
569            }
570            FromSwarm::ExternalAddrExpired(inner) => {
571                for (protocol, _) in self.protocols.values_mut() {
572                    protocol.on_swarm_event(FromSwarm::ExternalAddrExpired(inner));
573                }
574            }
575            FromSwarm::NewExternalAddrOfPeer(inner) => {
576                for (protocol, _) in self.protocols.values_mut() {
577                    protocol.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(inner));
578                }
579            }
580            event => {
581                warn!(
582                    ?event,
583                    "New event must be forwarded to request response protocols"
584                );
585            }
586        };
587    }
588
589    fn on_connection_handler_event(
590        &mut self,
591        peer_id: PeerId,
592        connection: ConnectionId,
593        event: THandlerOutEvent<Self>,
594    ) {
595        let p_name = event.0;
596        if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
597            return proto.on_connection_handler_event(peer_id, connection, event.1);
598        }
599
600        warn!(
601            "inject_node_event: no request-response instance registered for protocol {:?}",
602            p_name
603        )
604    }
605
606    fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
607        'poll_all: loop {
608            if let Some(message_request) = self.message_request.take() {
609                let MessageRequest {
610                    peer,
611                    request_id,
612                    request,
613                    channel,
614                    protocol,
615                    response_builder,
616                } = message_request;
617
618                let (tx, rx) = oneshot::channel();
619
620                // Submit the request to the "response builder" passed by the user at
621                // initialization.
622                if let Some(mut response_builder) = response_builder {
623                    // If the response builder is too busy, silently drop `tx`. This
624                    // will be reported by the corresponding `RequestResponse` through
625                    // an `InboundFailure::Omission` event.
626                    let _ = response_builder.try_send(IncomingRequest {
627                        peer,
628                        payload: request,
629                        pending_response: tx,
630                    });
631                } else {
632                    debug_assert!(false, "Received message on outbound-only protocol.");
633                }
634
635                self.pending_responses.push(Box::pin(async move {
636                    // The `tx` created above can be dropped if we are not capable of
637                    // processing this request, which is reflected as a
638                    // `InboundFailure::Omission` event.
639                    if let Ok(response) = rx.await {
640                        Some(RequestProcessingOutcome {
641                            request_id,
642                            protocol: Cow::from(protocol),
643                            inner_channel: channel,
644                            response,
645                        })
646                    } else {
647                        None
648                    }
649                }));
650
651                // This `continue` makes sure that `pending_responses` gets polled
652                // after we have added the new element.
653                continue 'poll_all;
654            }
655            // Poll to see if any response is ready to be sent back.
656            while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
657                let RequestProcessingOutcome {
658                    request_id,
659                    protocol: protocol_name,
660                    inner_channel,
661                    response: OutgoingResponse { result, .. },
662                } = match outcome {
663                    Some(outcome) => outcome,
664                    // The response builder was too busy or handling the request failed. This is
665                    // later on reported as a `InboundFailure::Omission`.
666                    None => continue,
667                };
668
669                if let Ok(payload) = result
670                    && let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name)
671                    && protocol.send_response(inner_channel, Ok(payload)).is_err()
672                {
673                    // Note: Failure is handled further below when receiving
674                    // `InboundFailure` event from `RequestResponse` behaviour.
675                    debug!(
676                        %request_id,
677                        "Failed to send response for request on protocol {} due to a \
678                        timeout or due to the connection to the peer being closed. \
679                        Dropping response",
680                        protocol_name,
681                    );
682                }
683            }
684
685            for rq_rs_runner in &mut self.request_handlers {
686                // Future.Output == (), so we don't need a result here
687                let _ = rq_rs_runner.poll_unpin(cx);
688            }
689
690            // Poll request-responses protocols.
691            for (protocol, (behaviour, response_builder)) in &mut self.protocols {
692                while let Poll::Ready(event) = behaviour.poll(cx) {
693                    let event = match event {
694                        // Main events we are interested in.
695                        ToSwarm::GenerateEvent(event) => event,
696
697                        // Other events generated by the underlying behaviour are transparently
698                        // passed through.
699                        ToSwarm::Dial { opts } => {
700                            if opts.get_peer_id().is_none() {
701                                error!(
702                                    "The request-response isn't supposed to start dialing \
703                                    addresses"
704                                );
705                            }
706                            return Poll::Ready(ToSwarm::Dial { opts });
707                        }
708                        ToSwarm::NotifyHandler {
709                            peer_id,
710                            handler,
711                            event,
712                        } => {
713                            return Poll::Ready(ToSwarm::NotifyHandler {
714                                peer_id,
715                                handler,
716                                event: ((*protocol).to_string(), event),
717                            });
718                        }
719                        ToSwarm::CloseConnection {
720                            peer_id,
721                            connection,
722                        } => {
723                            return Poll::Ready(ToSwarm::CloseConnection {
724                                peer_id,
725                                connection,
726                            });
727                        }
728                        ToSwarm::NewExternalAddrCandidate(observed) => {
729                            return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed));
730                        }
731                        ToSwarm::ExternalAddrConfirmed(addr) => {
732                            return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr));
733                        }
734                        ToSwarm::ExternalAddrExpired(addr) => {
735                            return Poll::Ready(ToSwarm::ExternalAddrExpired(addr));
736                        }
737                        ToSwarm::ListenOn { opts } => {
738                            return Poll::Ready(ToSwarm::ListenOn { opts });
739                        }
740                        ToSwarm::RemoveListener { id } => {
741                            return Poll::Ready(ToSwarm::RemoveListener { id });
742                        }
743                        event => {
744                            warn!(
745                                ?event,
746                                "New event from request response protocol must be send up"
747                            );
748
749                            continue;
750                        }
751                    };
752
753                    match event {
754                        // Received a request from a remote.
755                        RequestResponseEvent::Message {
756                            peer,
757                            message:
758                                RequestResponseMessage::Request {
759                                    request_id,
760                                    request,
761                                    channel,
762                                },
763                        } => {
764                            self.message_request = Some(MessageRequest {
765                                peer,
766                                request_id,
767                                request,
768                                channel,
769                                protocol: protocol.to_string(),
770                                response_builder: response_builder.clone(),
771                            });
772
773                            // This `continue` makes sure that `message_request` gets polled
774                            // after we have added the new element.
775                            continue 'poll_all;
776                        }
777
778                        // Received a response from a remote to one of our requests.
779                        RequestResponseEvent::Message {
780                            peer,
781                            message:
782                                RequestResponseMessage::Response {
783                                    request_id,
784                                    response,
785                                },
786                        } => {
787                            let (started, delivered) = match self
788                                .pending_requests
789                                .remove(&(protocol.clone(), request_id).into())
790                            {
791                                Some((started, pending_response)) => {
792                                    let delivered = pending_response
793                                        .send(response.map_err(|()| RequestFailure::Refused))
794                                        .map_err(|_| RequestFailure::Obsolete.to_string());
795                                    (started, delivered)
796                                }
797                                None => {
798                                    warn!(
799                                        "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
800                                        request_id,
801                                    );
802                                    debug_assert!(false);
803                                    continue;
804                                }
805                            };
806
807                            let out = Event::RequestFinished {
808                                peer: Some(peer),
809                                protocol: protocol.clone(),
810                                duration: started.elapsed(),
811                                result: delivered,
812                            };
813
814                            return Poll::Ready(ToSwarm::GenerateEvent(out));
815                        }
816
817                        // One of our requests has failed.
818                        RequestResponseEvent::OutboundFailure {
819                            peer,
820                            request_id,
821                            error,
822                            ..
823                        } => {
824                            let error_string = error.to_string();
825                            let started = match self
826                                .pending_requests
827                                .remove(&(protocol.clone(), request_id).into())
828                            {
829                                Some((started, pending_response)) => {
830                                    if pending_response
831                                        .send(Err(RequestFailure::Network(error)))
832                                        .is_err()
833                                    {
834                                        debug!(
835                                            %request_id,
836                                            "Request failed. At the same time local node is no longer interested in \
837                                            the result",
838                                        );
839                                    }
840                                    started
841                                }
842                                None => {
843                                    warn!(
844                                        %request_id,
845                                        "Received `RequestResponseEvent::Message` with unexpected request",
846                                    );
847                                    debug_assert!(false);
848                                    continue;
849                                }
850                            };
851
852                            let out = Event::RequestFinished {
853                                peer,
854                                protocol: protocol.clone(),
855                                duration: started.elapsed(),
856                                result: Err(error_string),
857                            };
858
859                            return Poll::Ready(ToSwarm::GenerateEvent(out));
860                        }
861
862                        // An inbound request failed, either while reading the request or due to
863                        // failing to send a response.
864                        RequestResponseEvent::InboundFailure { peer, error, .. } => {
865                            debug!(?error, %peer, "Inbound request failed.");
866
867                            let out = Event::InboundRequest {
868                                peer,
869                                protocol: protocol.clone(),
870                                result: Err(ResponseFailure::Network(error)),
871                            };
872                            return Poll::Ready(ToSwarm::GenerateEvent(out));
873                        }
874
875                        // A response to an inbound request has been sent.
876                        RequestResponseEvent::ResponseSent { peer, .. } => {
877                            let out = Event::InboundRequest {
878                                peer,
879                                protocol: protocol.clone(),
880                                result: Ok(()),
881                            };
882
883                            return Poll::Ready(ToSwarm::GenerateEvent(out));
884                        }
885                    };
886                }
887            }
888
889            break Poll::Pending;
890        }
891    }
892}
893
894/// Error when registering a protocol.
895#[derive(Debug, thiserror::Error)]
896pub enum RegisterError {
897    /// A protocol has been specified multiple times.
898    #[error("{0}")]
899    DuplicateProtocol(Cow<'static, str>),
900}
901
902/// Error in a request.
903#[derive(Debug, thiserror::Error)]
904#[allow(missing_docs)]
905pub enum RequestFailure {
906    #[error("We are not currently connected to the requested peer.")]
907    NotConnected,
908    #[error("Given protocol hasn't been registered.")]
909    UnknownProtocol,
910    #[error(
911        "Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it."
912    )]
913    Refused,
914    #[error("The remote replied, but the local node is no longer interested in the response.")]
915    Obsolete,
916    /// Problem on the network.
917    #[error("Problem on the network: {0}")]
918    Network(OutboundFailure),
919}
920
921/// Error when processing a request sent by a remote.
922#[derive(Debug, thiserror::Error)]
923pub enum ResponseFailure {
924    /// Problem on the network.
925    #[error("Problem on the network: {0}")]
926    Network(InboundFailure),
927}
928
929/// Implements the libp2p [`RequestResponseCodec`] trait. Defines how streams of bytes are turned
930/// into requests and responses and vice-versa.
931#[derive(Debug, Clone)]
932#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler.
933pub struct GenericCodec {
934    max_request_size: u64,
935    max_response_size: u64,
936}
937
938#[async_trait::async_trait]
939impl RequestResponseCodec for GenericCodec {
940    type Protocol = StreamProtocol;
941    type Request = Vec<u8>;
942    type Response = Result<Vec<u8>, ()>;
943
944    async fn read_request<T>(
945        &mut self,
946        _: &Self::Protocol,
947        mut io: &mut T,
948    ) -> io::Result<Self::Request>
949    where
950        T: AsyncRead + Unpin + Send,
951    {
952        // Read the length.
953        let length = unsigned_varint::aio::read_usize(&mut io)
954            .await
955            .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
956        if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
957            return Err(io::Error::new(
958                io::ErrorKind::InvalidInput,
959                format!(
960                    "Request size exceeds limit: {} > {}",
961                    length, self.max_request_size
962                ),
963            ));
964        }
965
966        // Read the payload.
967        let mut buffer = vec![0; length];
968        io.read_exact(&mut buffer).await?;
969        Ok(buffer)
970    }
971
972    async fn read_response<T>(
973        &mut self,
974        _: &Self::Protocol,
975        mut io: &mut T,
976    ) -> io::Result<Self::Response>
977    where
978        T: AsyncRead + Unpin + Send,
979    {
980        // Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
981        // considered as a protocol error and will result in the entire connection being closed.
982        // Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
983        // that this response is an error.
984
985        // Read the length.
986        let length = match unsigned_varint::aio::read_usize(&mut io).await {
987            Ok(l) => l,
988            Err(unsigned_varint::io::ReadError::Io(err))
989                if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
990            {
991                return Ok(Err(()));
992            }
993            Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
994        };
995
996        if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
997            return Err(io::Error::new(
998                io::ErrorKind::InvalidInput,
999                format!(
1000                    "Response size exceeds limit: {} > {}",
1001                    length, self.max_response_size
1002                ),
1003            ));
1004        }
1005
1006        // Read the payload.
1007        let mut buffer = vec![0; length];
1008        io.read_exact(&mut buffer).await?;
1009        Ok(Ok(buffer))
1010    }
1011
1012    async fn write_request<T>(
1013        &mut self,
1014        _: &Self::Protocol,
1015        io: &mut T,
1016        req: Self::Request,
1017    ) -> io::Result<()>
1018    where
1019        T: AsyncWrite + Unpin + Send,
1020    {
1021        // Write the length.
1022        {
1023            let mut buffer = unsigned_varint::encode::usize_buffer();
1024            io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer))
1025                .await?;
1026        }
1027
1028        // Write the payload.
1029        io.write_all(&req).await?;
1030
1031        io.close().await?;
1032        Ok(())
1033    }
1034
1035    async fn write_response<T>(
1036        &mut self,
1037        _: &Self::Protocol,
1038        io: &mut T,
1039        res: Self::Response,
1040    ) -> io::Result<()>
1041    where
1042        T: AsyncWrite + Unpin + Send,
1043    {
1044        // If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1045        if let Ok(res) = res {
1046            // Write the length.
1047            {
1048                let mut buffer = unsigned_varint::encode::usize_buffer();
1049                io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer))
1050                    .await?;
1051            }
1052
1053            // Write the payload.
1054            io.write_all(&res).await?;
1055        }
1056
1057        io.close().await?;
1058        Ok(())
1059    }
1060}