subspace_networking/protocols/request_response/
request_response_factory.rs

1// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Collection of request-response protocols.
18//!
19//! The [`RequestResponse`] struct defined in this module provides support for zero or more
20//! so-called "request-response" protocols.
21//!
22//! A request-response protocol works in the following way:
23//!
24//! - For every emitted request, a new substream is open and the protocol is negotiated. If the
25//!   remote supports the protocol, the size of the request is sent as a LEB128 number, followed
26//!   with the request itself. The remote then sends the size of the response as a LEB128 number,
27//!   followed with the response.
28//!
29//! - Requests have a certain time limit before they time out. This time includes the time it
30//!   takes to send/receive the request and response.
31//!
32//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
33//!   is used to handle incoming requests.
34//!
35//! Original file commit: <https://github.com/paritytech/substrate/commit/c2fc4b3ca0d7a15cc3f9cb1e5f441d99ec8d6e0b>
36
37#[cfg(test)]
38mod tests;
39
40use async_trait::async_trait;
41use futures::channel::{mpsc, oneshot};
42use futures::prelude::*;
43use libp2p::core::transport::PortUse;
44use libp2p::core::{Endpoint, Multiaddr};
45use libp2p::identity::PeerId;
46use libp2p::request_response::{
47    Behaviour as RequestResponse, Codec as RequestResponseCodec, Config as RequestResponseConfig,
48    Event as RequestResponseEvent, InboundRequestId, Message as RequestResponseMessage,
49    OutboundRequestId, ProtocolSupport, ResponseChannel,
50};
51pub use libp2p::request_response::{InboundFailure, OutboundFailure};
52use libp2p::swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
53use libp2p::swarm::dial_opts::DialOpts;
54use libp2p::swarm::handler::multi::MultiHandler;
55use libp2p::swarm::{
56    ConnectionDenied, ConnectionId, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm,
57};
58use libp2p::StreamProtocol;
59use std::borrow::Cow;
60use std::collections::hash_map::Entry;
61use std::collections::HashMap;
62use std::pin::Pin;
63use std::task::{Context, Poll};
64use std::time::{Duration, Instant};
65use std::{io, iter};
66use tracing::{debug, error, warn};
67
68const LOG_TARGET: &str = "request-response-protocols";
69
70/// Defines a handler for the request-response protocol factory.
71#[async_trait]
72pub trait RequestHandler: Send {
73    /// Runs the underlying protocol handler.
74    async fn run(&mut self);
75
76    /// Returns a config for the request-response protocol factory.
77    fn protocol_config(&self) -> ProtocolConfig;
78
79    /// Returns a protocol name.
80    fn protocol_name(&self) -> &'static str;
81
82    /// Clone boxed value.
83    fn clone_box(&self) -> Box<dyn RequestHandler>;
84}
85
86impl Clone for Box<dyn RequestHandler> {
87    fn clone(&self) -> Self {
88        self.clone_box()
89    }
90}
91
92/// Configuration for a single request-response protocol.
93#[derive(Debug, Clone)]
94pub struct ProtocolConfig {
95    /// Name of the protocol on the wire. Should be something like `/foo/bar`.
96    pub name: &'static str,
97
98    /// Maximum allowed size, in bytes, of a request.
99    ///
100    /// Any request larger than this value will be declined as a way to avoid allocating too
101    /// much memory for it.
102    pub max_request_size: u64,
103
104    /// Maximum allowed size, in bytes, of a response.
105    ///
106    /// Any response larger than this value will be declined as a way to avoid allocating too
107    /// much memory for it.
108    pub max_response_size: u64,
109
110    /// Duration after which emitted requests are considered timed out.
111    ///
112    /// If you expect the response to come back quickly, you should set this to a smaller duration.
113    pub request_timeout: Duration,
114
115    /// Channel on which the networking service will send incoming requests.
116    ///
117    /// Every time a peer sends a request to the local node using this protocol, the networking
118    /// service will push an element on this channel. The receiving side of this channel then has
119    /// to pull this element, process the request, and send back the response to send back to the
120    /// peer.
121    ///
122    /// The size of the channel has to be carefully chosen. If the channel is full, the networking
123    /// service will discard the incoming request send back an error to the peer. Consequently,
124    /// the channel being full is an indicator that the node is overloaded.
125    ///
126    /// You can typically set the size of the channel to `T / d`, where `T` is the
127    /// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
128    /// build a response.
129    ///
130    /// Can be `None` if the local node does not support answering incoming requests.
131    /// If this is `None`, then the local node will not advertise support for this protocol towards
132    /// other peers. If this is `Some` but the channel is closed, then the local node will
133    /// advertise support for this protocol, but any incoming request will lead to an error being
134    /// sent back.
135    pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
136}
137
138impl ProtocolConfig {
139    /// Creates request-response protocol config.
140    pub fn new(protocol_name: &'static str) -> ProtocolConfig {
141        ProtocolConfig {
142            name: protocol_name,
143            max_request_size: 1024 * 1024,
144            max_response_size: 16 * 1024 * 1024,
145            request_timeout: Duration::from_secs(20),
146            inbound_queue: None,
147        }
148    }
149}
150
151/// A single request received by a peer on a request-response protocol.
152#[derive(Debug)]
153pub struct IncomingRequest {
154    /// Who sent the request.
155    pub peer: PeerId,
156
157    /// Request sent by the remote. Will always be smaller than
158    /// [`ProtocolConfig::max_request_size`].
159    pub payload: Vec<u8>,
160
161    /// Channel to send back the response.
162    ///
163    /// There are two ways to indicate that handling the request failed:
164    ///
165    /// 1. Drop `pending_response` and thus not changing the reputation of the peer.
166    ///
167    /// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
168    ///    the given peer.
169    pub pending_response: oneshot::Sender<OutgoingResponse>,
170}
171
172/// Response for an incoming request to be send by a request protocol handler.
173#[derive(Debug)]
174pub struct OutgoingResponse {
175    /// The payload of the response.
176    ///
177    /// `Err(())` if none is available e.g. due an error while handling the request.
178    pub result: Result<Vec<u8>, ()>,
179
180    /// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
181    /// peer.
182    ///
183    /// Note: Operating systems typically maintain a buffer of a few dozen kilobytes of
184    /// outgoing data for each TCP socket, and it is not possible for a user
185    /// application to inspect this buffer. This channel here is not actually notified
186    /// when the response has been fully sent out, but rather when it has fully been
187    /// written to the buffer managed by the operating system.
188    pub sent_feedback: Option<oneshot::Sender<()>>,
189}
190
191/// Event generated by the [`RequestResponseFactoryBehaviour`].
192#[derive(Debug)]
193// We are not reading these events in a meaningful way right now, but the fields in there are still
194// potentially useful
195#[allow(dead_code)]
196pub enum Event {
197    /// A remote sent a request and either we have successfully answered it or an error happened.
198    ///
199    /// This event is generated for statistics purposes.
200    InboundRequest {
201        /// Peer which has emitted the request.
202        peer: PeerId,
203        /// Name of the protocol in question.
204        protocol: Cow<'static, str>,
205        /// Whether handling the request was successful or unsuccessful.
206        ///
207        /// When successful contains the time elapsed between when we received the request and when
208        /// we sent back the response. When unsuccessful contains the failure reason.
209        result: Result<(), ResponseFailure>,
210    },
211
212    /// A request initiated using [`RequestResponseFactoryBehaviour::send_request`] has succeeded or
213    /// failed.
214    ///
215    /// This event is generated for statistics purposes.
216    RequestFinished {
217        /// Peer that we sent the request to, if one was chosen.
218        peer: Option<PeerId>,
219        /// Name of the protocol in question.
220        protocol: Cow<'static, str>,
221        /// Duration the request took.
222        duration: Duration,
223        /// Result of the request.
224        result: Result<(), String>,
225    },
226}
227
228/// Combination of a protocol name and a request id.
229///
230/// Uniquely identifies an inbound or outbound request among all handled protocols. Note however
231/// that uniqueness is only guaranteed between two inbound and likewise between two outbound
232/// requests. There is no uniqueness guarantee in a set of both inbound and outbound
233/// [`ProtocolRequestId`]s.
234#[derive(Debug, Clone, PartialEq, Eq, Hash)]
235struct ProtocolRequestId {
236    protocol: Cow<'static, str>,
237    request_id: OutboundRequestId,
238}
239
240impl From<(Cow<'static, str>, OutboundRequestId)> for ProtocolRequestId {
241    #[inline]
242    fn from((protocol, request_id): (Cow<'static, str>, OutboundRequestId)) -> Self {
243        Self {
244            protocol,
245            request_id,
246        }
247    }
248}
249
250/// When sending a request, what to do on a disconnected recipient.
251#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
252pub enum IfDisconnected {
253    /// Try to connect to the peer.
254    TryConnect,
255    /// Just fail if the destination is not yet connected.
256    #[allow(dead_code)] // reserved for the future logic or config change
257    ImmediateError,
258}
259
260/// Convenience functions for `IfDisconnected`.
261impl IfDisconnected {
262    /// Shall we connect to a disconnected peer?
263    pub fn should_connect(self) -> bool {
264        match self {
265            Self::TryConnect => true,
266            Self::ImmediateError => false,
267        }
268    }
269}
270
271/// Implementation of `NetworkBehaviour` that provides support for multiple request-response
272/// protocols.
273#[allow(clippy::type_complexity)] // to preserve compatibility with copied implementation
274pub struct RequestResponseFactoryBehaviour {
275    /// The multiple sub-protocols, by name.
276    /// Contains the underlying libp2p `RequestResponse` behaviour, plus an optional
277    /// "response builder" used to build responses for incoming requests.
278    protocols: HashMap<
279        Cow<'static, str>,
280        (
281            RequestResponse<GenericCodec>,
282            Option<mpsc::Sender<IncomingRequest>>,
283        ),
284    >,
285
286    /// Pending requests, passed down to a [`RequestResponse`] behaviour, awaiting a reply.
287    pending_requests:
288        HashMap<ProtocolRequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,
289
290    /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
291    /// start time and the response to send back to the remote.
292    pending_responses: stream::FuturesUnordered<
293        Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
294    >,
295
296    /// Pending message request, holds `MessageRequest` as a Future state to poll it
297    /// until we get a response from `Peerset`
298    message_request: Option<MessageRequest>,
299
300    /// Request handlers future collection.
301    request_handlers: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
302}
303
304// This is a state of processing incoming request Message.
305struct MessageRequest {
306    peer: PeerId,
307    request_id: InboundRequestId,
308    request: Vec<u8>,
309    channel: ResponseChannel<Result<Vec<u8>, ()>>,
310    protocol: String,
311    response_builder: Option<mpsc::Sender<IncomingRequest>>,
312}
313
314/// Generated by the response builder and waiting to be processed.
315struct RequestProcessingOutcome {
316    request_id: InboundRequestId,
317    protocol: Cow<'static, str>,
318    inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
319    response: OutgoingResponse,
320}
321
322impl RequestResponseFactoryBehaviour {
323    /// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
324    /// the same protocol is passed twice.
325    pub fn new(
326        list: impl IntoIterator<Item = Box<dyn RequestHandler>>,
327        max_concurrent_streams: usize,
328    ) -> Result<Self, RegisterError> {
329        let mut protocols = HashMap::new();
330        let mut request_handlers = Vec::new();
331        for mut handler in list {
332            let config = handler.protocol_config();
333
334            let protocol_support = if config.inbound_queue.is_some() {
335                ProtocolSupport::Full
336            } else {
337                ProtocolSupport::Outbound
338            };
339
340            let rq_rp = RequestResponse::with_codec(
341                GenericCodec {
342                    max_request_size: config.max_request_size,
343                    max_response_size: config.max_response_size,
344                },
345                iter::once(StreamProtocol::new(config.name)).zip(iter::repeat(protocol_support)),
346                RequestResponseConfig::default()
347                    .with_request_timeout(config.request_timeout)
348                    .with_max_concurrent_streams(max_concurrent_streams),
349            );
350
351            match protocols.entry(Cow::Borrowed(config.name)) {
352                Entry::Vacant(e) => e.insert((rq_rp, config.inbound_queue)),
353                Entry::Occupied(e) => {
354                    return Err(RegisterError::DuplicateProtocol(e.key().clone()))
355                }
356            };
357
358            let request_handler_run: Pin<Box<dyn Future<Output = ()> + Send>> =
359                Box::pin(async move { handler.run().await }.fuse());
360
361            request_handlers.push(request_handler_run);
362        }
363
364        Ok(Self {
365            protocols,
366            pending_requests: Default::default(),
367            pending_responses: Default::default(),
368            message_request: None,
369            request_handlers,
370        })
371    }
372
373    /// Initiates sending a request.
374    ///
375    /// If there is no established connection to the target peer, the behavior is determined by the
376    /// choice of `connect`.
377    ///
378    /// An error is returned if the protocol doesn't match one that has been registered.
379    pub fn send_request(
380        &mut self,
381        target: &PeerId,
382        protocol_name: &str,
383        request: Vec<u8>,
384        pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
385        connect: IfDisconnected,
386        addresses: Vec<Multiaddr>,
387    ) {
388        if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
389            if protocol.is_connected(target) || connect.should_connect() {
390                let opts = DialOpts::peer_id(*target).addresses(addresses).build();
391                let request_id = protocol.send_request(opts, request);
392                let prev_req_id = self.pending_requests.insert(
393                    (protocol_name.to_string().into(), request_id).into(),
394                    (Instant::now(), pending_response),
395                );
396                debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
397            } else if pending_response
398                .send(Err(RequestFailure::NotConnected))
399                .is_err()
400            {
401                debug!(
402                    target: LOG_TARGET,
403                    "Not connected to peer {:?}. At the same time local \
404                     node is no longer interested in the result.",
405                    target,
406                );
407            }
408        } else if pending_response
409            .send(Err(RequestFailure::UnknownProtocol))
410            .is_err()
411        {
412            debug!(
413                target: LOG_TARGET,
414                "Unknown protocol {:?}. At the same time local \
415                 node is no longer interested in the result.",
416                protocol_name,
417            );
418        }
419    }
420}
421
422impl NetworkBehaviour for RequestResponseFactoryBehaviour {
423    type ConnectionHandler = MultiHandler<
424        String,
425        <RequestResponse<GenericCodec> as NetworkBehaviour>::ConnectionHandler,
426    >;
427    type ToSwarm = Event;
428
429    fn handle_established_inbound_connection(
430        &mut self,
431        connection_id: ConnectionId,
432        peer: PeerId,
433        local_addr: &Multiaddr,
434        remote_addr: &Multiaddr,
435    ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
436        let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
437            (
438                p.to_string(),
439                r.handle_established_inbound_connection(
440                    connection_id,
441                    peer,
442                    local_addr,
443                    remote_addr,
444                )
445                .expect(
446                    "Behaviours return handlers in these methods with the exception of \
447                    'connection management' behaviours like connection-limits or allow-black list. \
448                    So, inner request-response behaviour always returns Ok(handler).",
449                ),
450            )
451        });
452
453        let handler = MultiHandler::try_from_iter(iter).expect(
454            "Protocols are in a HashMap and there can be at most one handler per protocol name, \
455			 which is the only possible error; qed",
456        );
457
458        Ok(handler)
459    }
460
461    fn handle_established_outbound_connection(
462        &mut self,
463        connection_id: ConnectionId,
464        peer: PeerId,
465        addr: &Multiaddr,
466        role_override: Endpoint,
467        port_use: PortUse,
468    ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
469        let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
470            (
471                p.to_string(),
472                r.handle_established_outbound_connection(
473                    connection_id,
474                    peer,
475                    addr,
476                    role_override,
477                    port_use,
478                )
479                .expect(
480                    "Behaviours return handlers in these methods with the exception of \
481                        'connection management' behaviours like connection-limits or allow-black \
482                        list. So, inner request-response behaviour always returns Ok(handler).",
483                ),
484            )
485        });
486
487        let handler = MultiHandler::try_from_iter(iter).expect(
488            "Protocols are in a HashMap and there can be at most one handler per protocol name, \
489            which is the only possible error; qed",
490        );
491
492        Ok(handler)
493    }
494
495    /// Informs the behaviour about an event from the [`Swarm`](libp2p::Swarm).
496    fn on_swarm_event(&mut self, event: FromSwarm) {
497        match event {
498            FromSwarm::ConnectionEstablished(inner) => {
499                for (protocol, _) in self.protocols.values_mut() {
500                    protocol.on_swarm_event(FromSwarm::ConnectionEstablished(inner));
501                }
502            }
503            FromSwarm::ConnectionClosed(inner) => {
504                for (protocol, _) in self.protocols.values_mut() {
505                    protocol.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
506                        peer_id: inner.peer_id,
507                        connection_id: inner.connection_id,
508                        endpoint: inner.endpoint,
509                        cause: inner.cause,
510                        remaining_established: inner.remaining_established,
511                    }));
512                }
513            }
514            FromSwarm::AddressChange(inner) => {
515                for (protocol, _) in self.protocols.values_mut() {
516                    protocol.on_swarm_event(FromSwarm::AddressChange(inner));
517                }
518            }
519            FromSwarm::DialFailure(inner) => {
520                for (protocol, _) in self.protocols.values_mut() {
521                    protocol.on_swarm_event(FromSwarm::DialFailure(DialFailure {
522                        peer_id: inner.peer_id,
523                        error: inner.error,
524                        connection_id: inner.connection_id,
525                    }));
526                }
527            }
528            FromSwarm::ListenFailure(inner) => {
529                for (protocol, _) in self.protocols.values_mut() {
530                    protocol.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
531                        local_addr: inner.local_addr,
532                        send_back_addr: inner.send_back_addr,
533                        error: inner.error,
534                        connection_id: inner.connection_id,
535                        peer_id: inner.peer_id,
536                    }));
537                }
538            }
539            FromSwarm::NewListener(inner) => {
540                for (protocol, _) in self.protocols.values_mut() {
541                    protocol.on_swarm_event(FromSwarm::NewListener(inner));
542                }
543            }
544            FromSwarm::NewListenAddr(inner) => {
545                for (protocol, _) in self.protocols.values_mut() {
546                    protocol.on_swarm_event(FromSwarm::NewListenAddr(inner));
547                }
548            }
549            FromSwarm::ExpiredListenAddr(inner) => {
550                for (protocol, _) in self.protocols.values_mut() {
551                    protocol.on_swarm_event(FromSwarm::ExpiredListenAddr(inner));
552                }
553            }
554            FromSwarm::ListenerError(inner) => {
555                for (protocol, _) in self.protocols.values_mut() {
556                    protocol.on_swarm_event(FromSwarm::ListenerError(inner));
557                }
558            }
559            FromSwarm::ListenerClosed(inner) => {
560                for (protocol, _) in self.protocols.values_mut() {
561                    protocol.on_swarm_event(FromSwarm::ListenerClosed(inner));
562                }
563            }
564            FromSwarm::NewExternalAddrCandidate(inner) => {
565                for (protocol, _) in self.protocols.values_mut() {
566                    protocol.on_swarm_event(FromSwarm::NewExternalAddrCandidate(inner));
567                }
568            }
569            FromSwarm::ExternalAddrConfirmed(inner) => {
570                for (protocol, _) in self.protocols.values_mut() {
571                    protocol.on_swarm_event(FromSwarm::ExternalAddrConfirmed(inner));
572                }
573            }
574            FromSwarm::ExternalAddrExpired(inner) => {
575                for (protocol, _) in self.protocols.values_mut() {
576                    protocol.on_swarm_event(FromSwarm::ExternalAddrExpired(inner));
577                }
578            }
579            FromSwarm::NewExternalAddrOfPeer(inner) => {
580                for (protocol, _) in self.protocols.values_mut() {
581                    protocol.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(inner));
582                }
583            }
584            event => {
585                warn!(
586                    ?event,
587                    "New event must be forwarded to request response protocols"
588                );
589            }
590        };
591    }
592
593    fn on_connection_handler_event(
594        &mut self,
595        peer_id: PeerId,
596        connection: ConnectionId,
597        event: THandlerOutEvent<Self>,
598    ) {
599        let p_name = event.0;
600        if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
601            return proto.on_connection_handler_event(peer_id, connection, event.1);
602        }
603
604        warn!(
605            target: LOG_TARGET,
606            "inject_node_event: no request-response instance registered for protocol {:?}", p_name
607        )
608    }
609
610    fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
611        'poll_all: loop {
612            if let Some(message_request) = self.message_request.take() {
613                let MessageRequest {
614                    peer,
615                    request_id,
616                    request,
617                    channel,
618                    protocol,
619                    response_builder,
620                } = message_request;
621
622                let (tx, rx) = oneshot::channel();
623
624                // Submit the request to the "response builder" passed by the user at
625                // initialization.
626                if let Some(mut response_builder) = response_builder {
627                    // If the response builder is too busy, silently drop `tx`. This
628                    // will be reported by the corresponding `RequestResponse` through
629                    // an `InboundFailure::Omission` event.
630                    let _ = response_builder.try_send(IncomingRequest {
631                        peer,
632                        payload: request,
633                        pending_response: tx,
634                    });
635                } else {
636                    debug_assert!(false, "Received message on outbound-only protocol.");
637                }
638
639                self.pending_responses.push(Box::pin(async move {
640                    // The `tx` created above can be dropped if we are not capable of
641                    // processing this request, which is reflected as a
642                    // `InboundFailure::Omission` event.
643                    if let Ok(response) = rx.await {
644                        Some(RequestProcessingOutcome {
645                            request_id,
646                            protocol: Cow::from(protocol),
647                            inner_channel: channel,
648                            response,
649                        })
650                    } else {
651                        None
652                    }
653                }));
654
655                // This `continue` makes sure that `pending_responses` gets polled
656                // after we have added the new element.
657                continue 'poll_all;
658            }
659            // Poll to see if any response is ready to be sent back.
660            while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
661                let RequestProcessingOutcome {
662                    request_id,
663                    protocol: protocol_name,
664                    inner_channel,
665                    response: OutgoingResponse { result, .. },
666                } = match outcome {
667                    Some(outcome) => outcome,
668                    // The response builder was too busy or handling the request failed. This is
669                    // later on reported as a `InboundFailure::Omission`.
670                    None => continue,
671                };
672
673                if let Ok(payload) = result {
674                    if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
675                        if protocol.send_response(inner_channel, Ok(payload)).is_err() {
676                            // Note: Failure is handled further below when receiving
677                            // `InboundFailure` event from `RequestResponse` behaviour.
678                            debug!(
679                                target: LOG_TARGET,
680                                %request_id,
681                                "Failed to send response for request on protocol {} due to a \
682                                timeout or due to the connection to the peer being closed. \
683                                Dropping response",
684                                protocol_name,
685                            );
686                        }
687                    }
688                }
689            }
690
691            for rq_rs_runner in &mut self.request_handlers {
692                // Future.Output == (), so we don't need a result here
693                let _ = rq_rs_runner.poll_unpin(cx);
694            }
695
696            // Poll request-responses protocols.
697            for (protocol, (behaviour, response_builder)) in &mut self.protocols {
698                while let Poll::Ready(event) = behaviour.poll(cx) {
699                    let event = match event {
700                        // Main events we are interested in.
701                        ToSwarm::GenerateEvent(event) => event,
702
703                        // Other events generated by the underlying behaviour are transparently
704                        // passed through.
705                        ToSwarm::Dial { opts } => {
706                            if opts.get_peer_id().is_none() {
707                                error!(
708                                    "The request-response isn't supposed to start dialing \
709                                    addresses"
710                                );
711                            }
712                            return Poll::Ready(ToSwarm::Dial { opts });
713                        }
714                        ToSwarm::NotifyHandler {
715                            peer_id,
716                            handler,
717                            event,
718                        } => {
719                            return Poll::Ready(ToSwarm::NotifyHandler {
720                                peer_id,
721                                handler,
722                                event: ((*protocol).to_string(), event),
723                            })
724                        }
725                        ToSwarm::CloseConnection {
726                            peer_id,
727                            connection,
728                        } => {
729                            return Poll::Ready(ToSwarm::CloseConnection {
730                                peer_id,
731                                connection,
732                            })
733                        }
734                        ToSwarm::NewExternalAddrCandidate(observed) => {
735                            return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed))
736                        }
737                        ToSwarm::ExternalAddrConfirmed(addr) => {
738                            return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr))
739                        }
740                        ToSwarm::ExternalAddrExpired(addr) => {
741                            return Poll::Ready(ToSwarm::ExternalAddrExpired(addr))
742                        }
743                        ToSwarm::ListenOn { opts } => {
744                            return Poll::Ready(ToSwarm::ListenOn { opts })
745                        }
746                        ToSwarm::RemoveListener { id } => {
747                            return Poll::Ready(ToSwarm::RemoveListener { id })
748                        }
749                        event => {
750                            warn!(
751                                ?event,
752                                "New event from request response protocol must be send up"
753                            );
754
755                            continue;
756                        }
757                    };
758
759                    match event {
760                        // Received a request from a remote.
761                        RequestResponseEvent::Message {
762                            peer,
763                            message:
764                                RequestResponseMessage::Request {
765                                    request_id,
766                                    request,
767                                    channel,
768                                },
769                        } => {
770                            self.message_request = Some(MessageRequest {
771                                peer,
772                                request_id,
773                                request,
774                                channel,
775                                protocol: protocol.to_string(),
776                                response_builder: response_builder.clone(),
777                            });
778
779                            // This `continue` makes sure that `message_request` gets polled
780                            // after we have added the new element.
781                            continue 'poll_all;
782                        }
783
784                        // Received a response from a remote to one of our requests.
785                        RequestResponseEvent::Message {
786                            peer,
787                            message:
788                                RequestResponseMessage::Response {
789                                    request_id,
790                                    response,
791                                },
792                        } => {
793                            let (started, delivered) = match self
794                                .pending_requests
795                                .remove(&(protocol.clone(), request_id).into())
796                            {
797                                Some((started, pending_response)) => {
798                                    let delivered = pending_response
799                                        .send(response.map_err(|()| RequestFailure::Refused))
800                                        .map_err(|_| RequestFailure::Obsolete.to_string());
801                                    (started, delivered)
802                                }
803                                None => {
804                                    warn!(
805                                        target: LOG_TARGET,
806                                        "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
807                                        request_id,
808                                    );
809                                    debug_assert!(false);
810                                    continue;
811                                }
812                            };
813
814                            let out = Event::RequestFinished {
815                                peer: Some(peer),
816                                protocol: protocol.clone(),
817                                duration: started.elapsed(),
818                                result: delivered,
819                            };
820
821                            return Poll::Ready(ToSwarm::GenerateEvent(out));
822                        }
823
824                        // One of our requests has failed.
825                        RequestResponseEvent::OutboundFailure {
826                            peer,
827                            request_id,
828                            error,
829                            ..
830                        } => {
831                            let error_string = error.to_string();
832                            let started = match self
833                                .pending_requests
834                                .remove(&(protocol.clone(), request_id).into())
835                            {
836                                Some((started, pending_response)) => {
837                                    if pending_response
838                                        .send(Err(RequestFailure::Network(error)))
839                                        .is_err()
840                                    {
841                                        debug!(
842                                            target: LOG_TARGET,
843                                            %request_id,
844                                            "Request failed. At the same time local node is no longer interested in \
845                                            the result",
846                                        );
847                                    }
848                                    started
849                                }
850                                None => {
851                                    warn!(
852                                        target: LOG_TARGET,
853                                        %request_id,
854                                        "Received `RequestResponseEvent::Message` with unexpected request",
855                                    );
856                                    debug_assert!(false);
857                                    continue;
858                                }
859                            };
860
861                            let out = Event::RequestFinished {
862                                peer,
863                                protocol: protocol.clone(),
864                                duration: started.elapsed(),
865                                result: Err(error_string),
866                            };
867
868                            return Poll::Ready(ToSwarm::GenerateEvent(out));
869                        }
870
871                        // An inbound request failed, either while reading the request or due to
872                        // failing to send a response.
873                        RequestResponseEvent::InboundFailure { peer, error, .. } => {
874                            debug!(?error, %peer, "Inbound request failed.");
875
876                            let out = Event::InboundRequest {
877                                peer,
878                                protocol: protocol.clone(),
879                                result: Err(ResponseFailure::Network(error)),
880                            };
881                            return Poll::Ready(ToSwarm::GenerateEvent(out));
882                        }
883
884                        // A response to an inbound request has been sent.
885                        RequestResponseEvent::ResponseSent { peer, .. } => {
886                            let out = Event::InboundRequest {
887                                peer,
888                                protocol: protocol.clone(),
889                                result: Ok(()),
890                            };
891
892                            return Poll::Ready(ToSwarm::GenerateEvent(out));
893                        }
894                    };
895                }
896            }
897
898            break Poll::Pending;
899        }
900    }
901}
902
903/// Error when registering a protocol.
904#[derive(Debug, thiserror::Error)]
905pub enum RegisterError {
906    /// A protocol has been specified multiple times.
907    #[error("{0}")]
908    DuplicateProtocol(Cow<'static, str>),
909}
910
911/// Error in a request.
912#[derive(Debug, thiserror::Error)]
913#[allow(missing_docs)]
914pub enum RequestFailure {
915    #[error("We are not currently connected to the requested peer.")]
916    NotConnected,
917    #[error("Given protocol hasn't been registered.")]
918    UnknownProtocol,
919    #[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
920    Refused,
921    #[error("The remote replied, but the local node is no longer interested in the response.")]
922    Obsolete,
923    /// Problem on the network.
924    #[error("Problem on the network: {0}")]
925    Network(OutboundFailure),
926}
927
928/// Error when processing a request sent by a remote.
929#[derive(Debug, thiserror::Error)]
930pub enum ResponseFailure {
931    /// Problem on the network.
932    #[error("Problem on the network: {0}")]
933    Network(InboundFailure),
934}
935
936/// Implements the libp2p [`RequestResponseCodec`] trait. Defines how streams of bytes are turned
937/// into requests and responses and vice-versa.
938#[derive(Debug, Clone)]
939#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler.
940pub struct GenericCodec {
941    max_request_size: u64,
942    max_response_size: u64,
943}
944
945#[async_trait::async_trait]
946impl RequestResponseCodec for GenericCodec {
947    type Protocol = StreamProtocol;
948    type Request = Vec<u8>;
949    type Response = Result<Vec<u8>, ()>;
950
951    async fn read_request<T>(
952        &mut self,
953        _: &Self::Protocol,
954        mut io: &mut T,
955    ) -> io::Result<Self::Request>
956    where
957        T: AsyncRead + Unpin + Send,
958    {
959        // Read the length.
960        let length = unsigned_varint::aio::read_usize(&mut io)
961            .await
962            .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
963        if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
964            return Err(io::Error::new(
965                io::ErrorKind::InvalidInput,
966                format!(
967                    "Request size exceeds limit: {} > {}",
968                    length, self.max_request_size
969                ),
970            ));
971        }
972
973        // Read the payload.
974        let mut buffer = vec![0; length];
975        io.read_exact(&mut buffer).await?;
976        Ok(buffer)
977    }
978
979    async fn read_response<T>(
980        &mut self,
981        _: &Self::Protocol,
982        mut io: &mut T,
983    ) -> io::Result<Self::Response>
984    where
985        T: AsyncRead + Unpin + Send,
986    {
987        // Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
988        // considered as a protocol error and will result in the entire connection being closed.
989        // Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
990        // that this response is an error.
991
992        // Read the length.
993        let length = match unsigned_varint::aio::read_usize(&mut io).await {
994            Ok(l) => l,
995            Err(unsigned_varint::io::ReadError::Io(err))
996                if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
997            {
998                return Ok(Err(()))
999            }
1000            Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1001        };
1002
1003        if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1004            return Err(io::Error::new(
1005                io::ErrorKind::InvalidInput,
1006                format!(
1007                    "Response size exceeds limit: {} > {}",
1008                    length, self.max_response_size
1009                ),
1010            ));
1011        }
1012
1013        // Read the payload.
1014        let mut buffer = vec![0; length];
1015        io.read_exact(&mut buffer).await?;
1016        Ok(Ok(buffer))
1017    }
1018
1019    async fn write_request<T>(
1020        &mut self,
1021        _: &Self::Protocol,
1022        io: &mut T,
1023        req: Self::Request,
1024    ) -> io::Result<()>
1025    where
1026        T: AsyncWrite + Unpin + Send,
1027    {
1028        // Write the length.
1029        {
1030            let mut buffer = unsigned_varint::encode::usize_buffer();
1031            io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer))
1032                .await?;
1033        }
1034
1035        // Write the payload.
1036        io.write_all(&req).await?;
1037
1038        io.close().await?;
1039        Ok(())
1040    }
1041
1042    async fn write_response<T>(
1043        &mut self,
1044        _: &Self::Protocol,
1045        io: &mut T,
1046        res: Self::Response,
1047    ) -> io::Result<()>
1048    where
1049        T: AsyncWrite + Unpin + Send,
1050    {
1051        // If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1052        if let Ok(res) = res {
1053            // Write the length.
1054            {
1055                let mut buffer = unsigned_varint::encode::usize_buffer();
1056                io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer))
1057                    .await?;
1058            }
1059
1060            // Write the payload.
1061            io.write_all(&res).await?;
1062        }
1063
1064        io.close().await?;
1065        Ok(())
1066    }
1067}