subspace_networking/protocols/request_response/request_response_factory.rs
1// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Collection of request-response protocols.
18//!
19//! The [`RequestResponse`] struct defined in this module provides support for zero or more
20//! so-called "request-response" protocols.
21//!
22//! A request-response protocol works in the following way:
23//!
//! - For every emitted request, a new substream is opened and the protocol is negotiated. If the
//!   remote supports the protocol, the size of the request is sent as a LEB128 number, followed
//!   by the request itself. The remote then sends the size of the response as a LEB128 number,
//!   followed by the response.
28//!
29//! - Requests have a certain time limit before they time out. This time includes the time it
30//! takes to send/receive the request and response.
31//!
32//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
33//! is used to handle incoming requests.
34//!
35//! Original file commit: <https://github.com/paritytech/substrate/commit/c2fc4b3ca0d7a15cc3f9cb1e5f441d99ec8d6e0b>
36
37#[cfg(test)]
38mod tests;
39
40use async_trait::async_trait;
41use futures::channel::{mpsc, oneshot};
42use futures::prelude::*;
43use libp2p::StreamProtocol;
44use libp2p::core::transport::PortUse;
45use libp2p::core::{Endpoint, Multiaddr};
46use libp2p::identity::PeerId;
47use libp2p::request_response::{
48 Behaviour as RequestResponse, Codec as RequestResponseCodec, Config as RequestResponseConfig,
49 Event as RequestResponseEvent, InboundRequestId, Message as RequestResponseMessage,
50 OutboundRequestId, ProtocolSupport, ResponseChannel,
51};
52pub use libp2p::request_response::{InboundFailure, OutboundFailure};
53use libp2p::swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
54use libp2p::swarm::handler::multi::MultiHandler;
55use libp2p::swarm::{
56 ConnectionDenied, ConnectionId, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm,
57};
58use std::borrow::Cow;
59use std::collections::HashMap;
60use std::collections::hash_map::Entry;
61use std::pin::Pin;
62use std::task::{Context, Poll};
63use std::time::{Duration, Instant};
64use std::{io, iter};
65use tracing::{debug, error, warn};
66
/// Defines a handler for the request-response protocol factory.
///
/// [`RequestResponseFactoryBehaviour::new`] reads the handler's [`ProtocolConfig`] once and then
/// keeps polling the future returned by [`RequestHandler::run`] from the behaviour's `poll`.
#[async_trait]
pub trait RequestHandler: Send {
    /// Runs the underlying protocol handler.
    ///
    /// The behaviour drives this future itself, so the implementation is expected to be the
    /// long-running loop that serves the protocol.
    async fn run(&mut self);

    /// Returns a config for the request-response protocol factory.
    ///
    /// The returned value supplies the wire name, size limits, timeout and inbound queue used
    /// to set up the protocol.
    fn protocol_config(&self) -> ProtocolConfig;

    /// Returns a protocol name.
    fn protocol_name(&self) -> &'static str;

    /// Clone boxed value.
    ///
    /// Exists so that `Box<dyn RequestHandler>` can implement [`Clone`] even though the trait
    /// object itself cannot require `Clone`.
    fn clone_box(&self) -> Box<dyn RequestHandler>;
}
82
83impl Clone for Box<dyn RequestHandler> {
84 fn clone(&self) -> Self {
85 self.clone_box()
86 }
87}
88
/// Configuration for a single request-response protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
    /// Name of the protocol on the wire. Should be something like `/foo/bar`.
    pub name: &'static str,

    /// Maximum allowed size, in bytes, of a request.
    ///
    /// Any request larger than this value will be declined as a way to avoid allocating too
    /// much memory for it.
    pub max_request_size: u64,

    /// Maximum allowed size, in bytes, of a response.
    ///
    /// Any response larger than this value will be declined as a way to avoid allocating too
    /// much memory for it.
    pub max_response_size: u64,

    /// Duration after which emitted requests are considered timed out.
    ///
    /// If you expect the response to come back quickly, you should set this to a smaller duration.
    pub request_timeout: Duration,

    /// Channel on which the networking service will send incoming requests.
    ///
    /// Every time a peer sends a request to the local node using this protocol, the networking
    /// service will push an element on this channel. The receiving side of this channel then has
    /// to pull this element, process the request, and send the response for the peer back
    /// through the element's `pending_response` sender.
    ///
    /// The size of the channel has to be carefully chosen. If the channel is full, the networking
    /// service will discard the incoming request and send back an error to the peer.
    /// Consequently, the channel being full is an indicator that the node is overloaded.
    ///
    /// You can typically set the size of the channel to `T / d`, where `T` is the
    /// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
    /// build a response.
    ///
    /// Can be `None` if the local node does not support answering incoming requests.
    /// If this is `None`, then the local node will not advertise support for this protocol towards
    /// other peers. If this is `Some` but the channel is closed, then the local node will
    /// advertise support for this protocol, but any incoming request will lead to an error being
    /// sent back.
    pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
}
134
135impl ProtocolConfig {
136 /// Creates request-response protocol config.
137 pub fn new(protocol_name: &'static str) -> ProtocolConfig {
138 ProtocolConfig {
139 name: protocol_name,
140 max_request_size: 1024 * 1024,
141 max_response_size: 16 * 1024 * 1024,
142 request_timeout: Duration::from_secs(20),
143 inbound_queue: None,
144 }
145 }
146}
147
/// A single request received from a peer on a request-response protocol.
#[derive(Debug)]
pub struct IncomingRequest {
    /// Who sent the request.
    pub peer: PeerId,

    /// Request sent by the remote. Will never be larger than
    /// [`ProtocolConfig::max_request_size`].
    pub payload: Vec<u8>,

    /// Channel to send back the response.
    ///
    /// There are two ways to indicate that handling the request failed:
    ///
    /// 1. Drop `pending_response` without sending anything.
    ///
    /// 2. Send an [`OutgoingResponse`] whose `result` is `Err(())` via `pending_response`.
    ///
    /// In both cases the behaviour writes no response payload back to the peer.
    pub pending_response: oneshot::Sender<OutgoingResponse>,
}
168
/// Response for an incoming request to be sent by a request protocol handler.
#[derive(Debug)]
pub struct OutgoingResponse {
    /// The payload of the response.
    ///
    /// `Err(())` if none is available e.g. due to an error while handling the request.
    pub result: Result<Vec<u8>, ()>,

    /// If provided, the `oneshot::Sender` is meant to be notified when the response has been
    /// sent to the peer.
    ///
    /// Note: Operating systems typically maintain a buffer of a few dozen kilobytes of
    /// outgoing data for each TCP socket, and it is not possible for a user
    /// application to inspect this buffer. This channel here is not actually notified
    /// when the response has been fully sent out, but rather when it has fully been
    /// written to the buffer managed by the operating system.
    ///
    /// NOTE(review): the `poll` implementation in this file destructures the response with
    /// `..` and never reads this field, so the sender appears to be dropped without being
    /// signalled. Confirm whether feedback is still meant to be delivered.
    pub sent_feedback: Option<oneshot::Sender<()>>,
}
187
/// Event generated by the [`RequestResponseFactoryBehaviour`].
#[derive(Debug)]
// We are not reading these events in a meaningful way right now, but the fields in there are still
// potentially useful
#[allow(dead_code)]
pub enum Event {
    /// A remote sent a request and either we have successfully answered it or an error happened.
    ///
    /// This event is generated for statistics purposes.
    InboundRequest {
        /// Peer which has emitted the request.
        peer: PeerId,
        /// Name of the protocol in question.
        protocol: Cow<'static, str>,
        /// Whether handling the request was successful or unsuccessful.
        ///
        /// `Ok(())` once the response has been sent; when unsuccessful contains the failure
        /// reason. No timing information is carried by this event.
        result: Result<(), ResponseFailure>,
    },

    /// A request initiated using [`RequestResponseFactoryBehaviour::send_request`] has succeeded or
    /// failed.
    ///
    /// This event is generated for statistics purposes.
    RequestFinished {
        /// Peer that we sent the request to, if one was chosen.
        peer: Option<PeerId>,
        /// Name of the protocol in question.
        protocol: Cow<'static, str>,
        /// Duration the request took, measured from the moment it was handed to the underlying
        /// behaviour.
        duration: Duration,
        /// Result of the request. The error is the textual form of the failure.
        result: Result<(), String>,
    },
}
224
/// Combination of a protocol name and a request id.
///
/// Uniquely identifies an outbound request among all handled protocols: the id alone is issued
/// per protocol behaviour, so the protocol name is needed to tell requests of different
/// protocols apart. The `request_id` field is an [`OutboundRequestId`], so this key only
/// describes outbound requests; it is used as the key of the pending-requests map.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ProtocolRequestId {
    /// Name of the protocol the request was sent on.
    protocol: Cow<'static, str>,
    /// Id handed out by the underlying request-response behaviour of that protocol.
    request_id: OutboundRequestId,
}
236
237impl From<(Cow<'static, str>, OutboundRequestId)> for ProtocolRequestId {
238 #[inline]
239 fn from((protocol, request_id): (Cow<'static, str>, OutboundRequestId)) -> Self {
240 Self {
241 protocol,
242 request_id,
243 }
244 }
245}
246
/// Policy applied when a request targets a peer we have no connection to.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
    /// Attempt to establish a connection to the peer first.
    TryConnect,
    /// Fail right away when the destination is not connected yet.
    #[allow(dead_code)] // reserved for the future logic or config change
    ImmediateError,
}

/// Helper predicates for [`IfDisconnected`].
impl IfDisconnected {
    /// Tells whether a disconnected peer should be dialed before sending.
    ///
    /// Returns `true` only for [`IfDisconnected::TryConnect`].
    pub fn should_connect(self) -> bool {
        match self {
            IfDisconnected::ImmediateError => false,
            IfDisconnected::TryConnect => true,
        }
    }
}
267
/// Implementation of `NetworkBehaviour` that provides support for multiple request-response
/// protocols.
#[allow(clippy::type_complexity)] // to preserve compatibility with copied implementation
pub struct RequestResponseFactoryBehaviour {
    /// The multiple sub-protocols, by name.
    /// Contains the underlying libp2p `RequestResponse` behaviour, plus an optional
    /// "response builder" used to build responses for incoming requests.
    protocols: HashMap<
        Cow<'static, str>,
        (
            RequestResponse<GenericCodec>,
            Option<mpsc::Sender<IncomingRequest>>,
        ),
    >,

    /// Pending requests, passed down to a [`RequestResponse`] behaviour, awaiting a reply.
    ///
    /// The value holds the instant the request was submitted and the channel on which the
    /// caller expects the outcome.
    pending_requests:
        HashMap<ProtocolRequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,

    /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
    /// response to send back to the remote, or `None` if no response was produced.
    pending_responses: stream::FuturesUnordered<
        Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
    >,

    /// Incoming request that was received from an inner behaviour during `poll` but has not
    /// yet been handed to its response builder; it is consumed at the top of the next `poll`
    /// loop iteration.
    message_request: Option<MessageRequest>,

    /// Request handlers future collection.
    ///
    /// Each entry is the `run` future of a registered handler; all of them are polled on every
    /// call to `poll`.
    request_handlers: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
300
// Intermediate state of an incoming request: everything captured from the inner behaviour's
// `Message::Request` event, kept until the request is forwarded to the response builder.
struct MessageRequest {
    // Peer that sent the request.
    peer: PeerId,
    // Id assigned to the inbound request by the inner behaviour.
    request_id: InboundRequestId,
    // Raw request payload.
    request: Vec<u8>,
    // Channel of the inner behaviour on which the response must eventually be sent.
    channel: ResponseChannel<Result<Vec<u8>, ()>>,
    // Name of the protocol the request arrived on.
    protocol: String,
    // Queue of the handler that builds responses; `None` for outbound-only protocols.
    response_builder: Option<mpsc::Sender<IncomingRequest>>,
}
310
/// Generated by the response builder and waiting to be processed.
struct RequestProcessingOutcome {
    /// Id of the inbound request this outcome answers.
    request_id: InboundRequestId,
    /// Name of the protocol the request arrived on; used to find the behaviour to reply through.
    protocol: Cow<'static, str>,
    /// Channel of the inner behaviour on which the response is sent.
    inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
    /// Response produced by the handler.
    response: OutgoingResponse,
}
318
319impl RequestResponseFactoryBehaviour {
320 /// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
321 /// the same protocol is passed twice.
322 pub fn new(
323 list: impl IntoIterator<Item = Box<dyn RequestHandler>>,
324 max_concurrent_streams: usize,
325 ) -> Result<Self, RegisterError> {
326 let mut protocols = HashMap::new();
327 let mut request_handlers = Vec::new();
328 for mut handler in list {
329 let config = handler.protocol_config();
330
331 let protocol_support = if config.inbound_queue.is_some() {
332 ProtocolSupport::Full
333 } else {
334 ProtocolSupport::Outbound
335 };
336
337 let rq_rp = RequestResponse::with_codec(
338 GenericCodec {
339 max_request_size: config.max_request_size,
340 max_response_size: config.max_response_size,
341 },
342 iter::once(StreamProtocol::new(config.name)).zip(iter::repeat(protocol_support)),
343 RequestResponseConfig::default()
344 .with_request_timeout(config.request_timeout)
345 .with_max_concurrent_streams(max_concurrent_streams),
346 );
347
348 match protocols.entry(Cow::Borrowed(config.name)) {
349 Entry::Vacant(e) => e.insert((rq_rp, config.inbound_queue)),
350 Entry::Occupied(e) => {
351 return Err(RegisterError::DuplicateProtocol(e.key().clone()));
352 }
353 };
354
355 let request_handler_run: Pin<Box<dyn Future<Output = ()> + Send>> =
356 Box::pin(async move { handler.run().await }.fuse());
357
358 request_handlers.push(request_handler_run);
359 }
360
361 Ok(Self {
362 protocols,
363 pending_requests: Default::default(),
364 pending_responses: Default::default(),
365 message_request: None,
366 request_handlers,
367 })
368 }
369
370 /// Initiates sending a request.
371 ///
372 /// If there is no established connection to the target peer, the behavior is determined by the
373 /// choice of `connect`.
374 ///
375 /// An error is returned if the protocol doesn't match one that has been registered.
376 pub fn send_request(
377 &mut self,
378 target: &PeerId,
379 protocol_name: &str,
380 request: Vec<u8>,
381 pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
382 connect: IfDisconnected,
383 addresses: Vec<Multiaddr>,
384 ) {
385 if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
386 if protocol.is_connected(target) || connect.should_connect() {
387 let request_id = protocol.send_request_with_addresses(target, request, addresses);
388 let prev_req_id = self.pending_requests.insert(
389 (protocol_name.to_string().into(), request_id).into(),
390 (Instant::now(), pending_response),
391 );
392 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
393 } else if pending_response
394 .send(Err(RequestFailure::NotConnected))
395 .is_err()
396 {
397 debug!(
398 "Not connected to peer {:?}. At the same time local \
399 node is no longer interested in the result.",
400 target,
401 );
402 }
403 } else if pending_response
404 .send(Err(RequestFailure::UnknownProtocol))
405 .is_err()
406 {
407 debug!(
408 "Unknown protocol {:?}. At the same time local \
409 node is no longer interested in the result.",
410 protocol_name,
411 );
412 }
413 }
414}
415
416impl NetworkBehaviour for RequestResponseFactoryBehaviour {
417 type ConnectionHandler = MultiHandler<
418 String,
419 <RequestResponse<GenericCodec> as NetworkBehaviour>::ConnectionHandler,
420 >;
421 type ToSwarm = Event;
422
423 fn handle_established_inbound_connection(
424 &mut self,
425 connection_id: ConnectionId,
426 peer: PeerId,
427 local_addr: &Multiaddr,
428 remote_addr: &Multiaddr,
429 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
430 let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
431 (
432 p.to_string(),
433 r.handle_established_inbound_connection(
434 connection_id,
435 peer,
436 local_addr,
437 remote_addr,
438 )
439 .expect(
440 "Behaviours return handlers in these methods with the exception of \
441 'connection management' behaviours like connection-limits or allow-black list. \
442 So, inner request-response behaviour always returns Ok(handler).",
443 ),
444 )
445 });
446
447 let handler = MultiHandler::try_from_iter(iter).expect(
448 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
449 which is the only possible error; qed",
450 );
451
452 Ok(handler)
453 }
454
455 fn handle_established_outbound_connection(
456 &mut self,
457 connection_id: ConnectionId,
458 peer: PeerId,
459 addr: &Multiaddr,
460 role_override: Endpoint,
461 port_use: PortUse,
462 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
463 let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
464 (
465 p.to_string(),
466 r.handle_established_outbound_connection(
467 connection_id,
468 peer,
469 addr,
470 role_override,
471 port_use,
472 )
473 .expect(
474 "Behaviours return handlers in these methods with the exception of \
475 'connection management' behaviours like connection-limits or allow-black \
476 list. So, inner request-response behaviour always returns Ok(handler).",
477 ),
478 )
479 });
480
481 let handler = MultiHandler::try_from_iter(iter).expect(
482 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
483 which is the only possible error; qed",
484 );
485
486 Ok(handler)
487 }
488
489 /// Informs the behaviour about an event from the [`Swarm`](libp2p::Swarm).
490 fn on_swarm_event(&mut self, event: FromSwarm) {
491 match event {
492 FromSwarm::ConnectionEstablished(inner) => {
493 for (protocol, _) in self.protocols.values_mut() {
494 protocol.on_swarm_event(FromSwarm::ConnectionEstablished(inner));
495 }
496 }
497 FromSwarm::ConnectionClosed(inner) => {
498 for (protocol, _) in self.protocols.values_mut() {
499 protocol.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
500 peer_id: inner.peer_id,
501 connection_id: inner.connection_id,
502 endpoint: inner.endpoint,
503 cause: inner.cause,
504 remaining_established: inner.remaining_established,
505 }));
506 }
507 }
508 FromSwarm::AddressChange(inner) => {
509 for (protocol, _) in self.protocols.values_mut() {
510 protocol.on_swarm_event(FromSwarm::AddressChange(inner));
511 }
512 }
513 FromSwarm::DialFailure(inner) => {
514 for (protocol, _) in self.protocols.values_mut() {
515 protocol.on_swarm_event(FromSwarm::DialFailure(DialFailure {
516 peer_id: inner.peer_id,
517 error: inner.error,
518 connection_id: inner.connection_id,
519 }));
520 }
521 }
522 FromSwarm::ListenFailure(inner) => {
523 for (protocol, _) in self.protocols.values_mut() {
524 protocol.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
525 local_addr: inner.local_addr,
526 send_back_addr: inner.send_back_addr,
527 error: inner.error,
528 connection_id: inner.connection_id,
529 peer_id: inner.peer_id,
530 }));
531 }
532 }
533 FromSwarm::NewListener(inner) => {
534 for (protocol, _) in self.protocols.values_mut() {
535 protocol.on_swarm_event(FromSwarm::NewListener(inner));
536 }
537 }
538 FromSwarm::NewListenAddr(inner) => {
539 for (protocol, _) in self.protocols.values_mut() {
540 protocol.on_swarm_event(FromSwarm::NewListenAddr(inner));
541 }
542 }
543 FromSwarm::ExpiredListenAddr(inner) => {
544 for (protocol, _) in self.protocols.values_mut() {
545 protocol.on_swarm_event(FromSwarm::ExpiredListenAddr(inner));
546 }
547 }
548 FromSwarm::ListenerError(inner) => {
549 for (protocol, _) in self.protocols.values_mut() {
550 protocol.on_swarm_event(FromSwarm::ListenerError(inner));
551 }
552 }
553 FromSwarm::ListenerClosed(inner) => {
554 for (protocol, _) in self.protocols.values_mut() {
555 protocol.on_swarm_event(FromSwarm::ListenerClosed(inner));
556 }
557 }
558 FromSwarm::NewExternalAddrCandidate(inner) => {
559 for (protocol, _) in self.protocols.values_mut() {
560 protocol.on_swarm_event(FromSwarm::NewExternalAddrCandidate(inner));
561 }
562 }
563 FromSwarm::ExternalAddrConfirmed(inner) => {
564 for (protocol, _) in self.protocols.values_mut() {
565 protocol.on_swarm_event(FromSwarm::ExternalAddrConfirmed(inner));
566 }
567 }
568 FromSwarm::ExternalAddrExpired(inner) => {
569 for (protocol, _) in self.protocols.values_mut() {
570 protocol.on_swarm_event(FromSwarm::ExternalAddrExpired(inner));
571 }
572 }
573 FromSwarm::NewExternalAddrOfPeer(inner) => {
574 for (protocol, _) in self.protocols.values_mut() {
575 protocol.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(inner));
576 }
577 }
578 event => {
579 warn!(
580 ?event,
581 "New event must be forwarded to request response protocols"
582 );
583 }
584 };
585 }
586
587 fn on_connection_handler_event(
588 &mut self,
589 peer_id: PeerId,
590 connection: ConnectionId,
591 event: THandlerOutEvent<Self>,
592 ) {
593 let p_name = event.0;
594 if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
595 return proto.on_connection_handler_event(peer_id, connection, event.1);
596 }
597
598 warn!(
599 "inject_node_event: no request-response instance registered for protocol {:?}",
600 p_name
601 )
602 }
603
604 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
605 'poll_all: loop {
606 if let Some(message_request) = self.message_request.take() {
607 let MessageRequest {
608 peer,
609 request_id,
610 request,
611 channel,
612 protocol,
613 response_builder,
614 } = message_request;
615
616 let (tx, rx) = oneshot::channel();
617
618 // Submit the request to the "response builder" passed by the user at
619 // initialization.
620 if let Some(mut response_builder) = response_builder {
621 // If the response builder is too busy, silently drop `tx`. This
622 // will be reported by the corresponding `RequestResponse` through
623 // an `InboundFailure::Omission` event.
624 let _ = response_builder.try_send(IncomingRequest {
625 peer,
626 payload: request,
627 pending_response: tx,
628 });
629 } else {
630 debug_assert!(false, "Received message on outbound-only protocol.");
631 }
632
633 self.pending_responses.push(Box::pin(async move {
634 // The `tx` created above can be dropped if we are not capable of
635 // processing this request, which is reflected as a
636 // `InboundFailure::Omission` event.
637 if let Ok(response) = rx.await {
638 Some(RequestProcessingOutcome {
639 request_id,
640 protocol: Cow::from(protocol),
641 inner_channel: channel,
642 response,
643 })
644 } else {
645 None
646 }
647 }));
648
649 // This `continue` makes sure that `pending_responses` gets polled
650 // after we have added the new element.
651 continue 'poll_all;
652 }
653 // Poll to see if any response is ready to be sent back.
654 while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
655 let RequestProcessingOutcome {
656 request_id,
657 protocol: protocol_name,
658 inner_channel,
659 response: OutgoingResponse { result, .. },
660 } = match outcome {
661 Some(outcome) => outcome,
662 // The response builder was too busy or handling the request failed. This is
663 // later on reported as a `InboundFailure::Omission`.
664 None => continue,
665 };
666
667 if let Ok(payload) = result
668 && let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name)
669 && protocol.send_response(inner_channel, Ok(payload)).is_err()
670 {
671 // Note: Failure is handled further below when receiving
672 // `InboundFailure` event from `RequestResponse` behaviour.
673 debug!(
674 %request_id,
675 "Failed to send response for request on protocol {} due to a \
676 timeout or due to the connection to the peer being closed. \
677 Dropping response",
678 protocol_name,
679 );
680 }
681 }
682
683 for rq_rs_runner in &mut self.request_handlers {
684 // Future.Output == (), so we don't need a result here
685 let _ = rq_rs_runner.poll_unpin(cx);
686 }
687
688 // Poll request-responses protocols.
689 for (protocol, (behaviour, response_builder)) in &mut self.protocols {
690 while let Poll::Ready(event) = behaviour.poll(cx) {
691 let event = match event {
692 // Main events we are interested in.
693 ToSwarm::GenerateEvent(event) => event,
694
695 // Other events generated by the underlying behaviour are transparently
696 // passed through.
697 ToSwarm::Dial { opts } => {
698 if opts.get_peer_id().is_none() {
699 error!(
700 "The request-response isn't supposed to start dialing \
701 addresses"
702 );
703 }
704 return Poll::Ready(ToSwarm::Dial { opts });
705 }
706 ToSwarm::NotifyHandler {
707 peer_id,
708 handler,
709 event,
710 } => {
711 return Poll::Ready(ToSwarm::NotifyHandler {
712 peer_id,
713 handler,
714 event: ((*protocol).to_string(), event),
715 });
716 }
717 ToSwarm::CloseConnection {
718 peer_id,
719 connection,
720 } => {
721 return Poll::Ready(ToSwarm::CloseConnection {
722 peer_id,
723 connection,
724 });
725 }
726 ToSwarm::NewExternalAddrCandidate(observed) => {
727 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed));
728 }
729 ToSwarm::ExternalAddrConfirmed(addr) => {
730 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr));
731 }
732 ToSwarm::ExternalAddrExpired(addr) => {
733 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr));
734 }
735 ToSwarm::ListenOn { opts } => {
736 return Poll::Ready(ToSwarm::ListenOn { opts });
737 }
738 ToSwarm::RemoveListener { id } => {
739 return Poll::Ready(ToSwarm::RemoveListener { id });
740 }
741 event => {
742 warn!(
743 ?event,
744 "New event from request response protocol must be send up"
745 );
746
747 continue;
748 }
749 };
750
751 match event {
752 // Received a request from a remote.
753 RequestResponseEvent::Message {
754 peer,
755 message:
756 RequestResponseMessage::Request {
757 request_id,
758 request,
759 channel,
760 },
761 ..
762 } => {
763 self.message_request = Some(MessageRequest {
764 peer,
765 request_id,
766 request,
767 channel,
768 protocol: protocol.to_string(),
769 response_builder: response_builder.clone(),
770 });
771
772 // This `continue` makes sure that `message_request` gets polled
773 // after we have added the new element.
774 continue 'poll_all;
775 }
776
777 // Received a response from a remote to one of our requests.
778 RequestResponseEvent::Message {
779 peer,
780 message:
781 RequestResponseMessage::Response {
782 request_id,
783 response,
784 },
785 ..
786 } => {
787 let (started, delivered) = match self
788 .pending_requests
789 .remove(&(protocol.clone(), request_id).into())
790 {
791 Some((started, pending_response)) => {
792 let delivered = pending_response
793 .send(response.map_err(|()| RequestFailure::Refused))
794 .map_err(|_| RequestFailure::Obsolete.to_string());
795 (started, delivered)
796 }
797 None => {
798 warn!(
799 "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
800 request_id,
801 );
802 debug_assert!(false);
803 continue;
804 }
805 };
806
807 let out = Event::RequestFinished {
808 peer: Some(peer),
809 protocol: protocol.clone(),
810 duration: started.elapsed(),
811 result: delivered,
812 };
813
814 return Poll::Ready(ToSwarm::GenerateEvent(out));
815 }
816
817 // One of our requests has failed.
818 RequestResponseEvent::OutboundFailure {
819 peer,
820 request_id,
821 error,
822 ..
823 } => {
824 let error_string = error.to_string();
825 let started = match self
826 .pending_requests
827 .remove(&(protocol.clone(), request_id).into())
828 {
829 Some((started, pending_response)) => {
830 if pending_response
831 .send(Err(RequestFailure::Network(error)))
832 .is_err()
833 {
834 debug!(
835 %request_id,
836 "Request failed. At the same time local node is no longer interested in \
837 the result",
838 );
839 }
840 started
841 }
842 None => {
843 warn!(
844 %request_id,
845 "Received `RequestResponseEvent::Message` with unexpected request",
846 );
847 debug_assert!(false);
848 continue;
849 }
850 };
851
852 let out = Event::RequestFinished {
853 peer: Some(peer),
854 protocol: protocol.clone(),
855 duration: started.elapsed(),
856 result: Err(error_string),
857 };
858
859 return Poll::Ready(ToSwarm::GenerateEvent(out));
860 }
861
862 // An inbound request failed, either while reading the request or due to
863 // failing to send a response.
864 RequestResponseEvent::InboundFailure { peer, error, .. } => {
865 debug!(?error, %peer, "Inbound request failed.");
866
867 let out = Event::InboundRequest {
868 peer,
869 protocol: protocol.clone(),
870 result: Err(ResponseFailure::Network(error)),
871 };
872 return Poll::Ready(ToSwarm::GenerateEvent(out));
873 }
874
875 // A response to an inbound request has been sent.
876 RequestResponseEvent::ResponseSent { peer, .. } => {
877 let out = Event::InboundRequest {
878 peer,
879 protocol: protocol.clone(),
880 result: Ok(()),
881 };
882
883 return Poll::Ready(ToSwarm::GenerateEvent(out));
884 }
885 };
886 }
887 }
888
889 break Poll::Pending;
890 }
891 }
892}
893
/// Error when registering a protocol.
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
    /// A protocol has been specified multiple times.
    ///
    /// Carries the duplicated protocol name; the `Display` output is that name alone.
    #[error("{0}")]
    DuplicateProtocol(Cow<'static, str>),
}
901
/// Error in a request.
///
/// Delivered to the requester through the `pending_response` channel passed to
/// `send_request`.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestFailure {
    // The target peer is not connected and the request was sent with a policy that forbids
    // dialing.
    #[error("We are not currently connected to the requested peer.")]
    NotConnected,
    // The protocol name passed to `send_request` was never registered.
    #[error("Given protocol hasn't been registered.")]
    UnknownProtocol,
    // The codec decoded an error response (`Err(())`) from the remote.
    #[error(
        "Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it."
    )]
    Refused,
    // The response arrived after the requester dropped its receiving end.
    #[error("The remote replied, but the local node is no longer interested in the response.")]
    Obsolete,
    /// Problem on the network.
    #[error("Problem on the network: {0}")]
    Network(OutboundFailure),
}
920
/// Error when processing a request sent by a remote.
///
/// Reported in the `result` of [`Event::InboundRequest`].
#[derive(Debug, thiserror::Error)]
pub enum ResponseFailure {
    /// Problem on the network.
    ///
    /// Wraps the inbound failure reported by the underlying request-response behaviour.
    #[error("Problem on the network: {0}")]
    Network(InboundFailure),
}
928
/// Implements the libp2p [`RequestResponseCodec`] trait. Defines how streams of bytes are turned
/// into requests and responses and vice-versa.
///
/// Messages are framed as an unsigned-varint length prefix followed by the raw payload.
#[derive(Debug, Clone)]
#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler.
pub struct GenericCodec {
    // Largest request payload, in bytes, accepted when reading a request.
    max_request_size: u64,
    // Largest response payload, in bytes, accepted when reading a response.
    max_response_size: u64,
}
937
938#[async_trait::async_trait]
939impl RequestResponseCodec for GenericCodec {
940 type Protocol = StreamProtocol;
941 type Request = Vec<u8>;
942 type Response = Result<Vec<u8>, ()>;
943
944 async fn read_request<T>(
945 &mut self,
946 _: &Self::Protocol,
947 mut io: &mut T,
948 ) -> io::Result<Self::Request>
949 where
950 T: AsyncRead + Unpin + Send,
951 {
952 // Read the length.
953 let length = unsigned_varint::aio::read_usize(&mut io)
954 .await
955 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
956 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
957 return Err(io::Error::new(
958 io::ErrorKind::InvalidInput,
959 format!(
960 "Request size exceeds limit: {} > {}",
961 length, self.max_request_size
962 ),
963 ));
964 }
965
966 // Read the payload.
967 let mut buffer = vec![0; length];
968 io.read_exact(&mut buffer).await?;
969 Ok(buffer)
970 }
971
972 async fn read_response<T>(
973 &mut self,
974 _: &Self::Protocol,
975 mut io: &mut T,
976 ) -> io::Result<Self::Response>
977 where
978 T: AsyncRead + Unpin + Send,
979 {
980 // Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
981 // considered as a protocol error and will result in the entire connection being closed.
982 // Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
983 // that this response is an error.
984
985 // Read the length.
986 let length = match unsigned_varint::aio::read_usize(&mut io).await {
987 Ok(l) => l,
988 Err(unsigned_varint::io::ReadError::Io(err))
989 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
990 {
991 return Ok(Err(()));
992 }
993 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
994 };
995
996 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
997 return Err(io::Error::new(
998 io::ErrorKind::InvalidInput,
999 format!(
1000 "Response size exceeds limit: {} > {}",
1001 length, self.max_response_size
1002 ),
1003 ));
1004 }
1005
1006 // Read the payload.
1007 let mut buffer = vec![0; length];
1008 io.read_exact(&mut buffer).await?;
1009 Ok(Ok(buffer))
1010 }
1011
1012 async fn write_request<T>(
1013 &mut self,
1014 _: &Self::Protocol,
1015 io: &mut T,
1016 req: Self::Request,
1017 ) -> io::Result<()>
1018 where
1019 T: AsyncWrite + Unpin + Send,
1020 {
1021 // Write the length.
1022 {
1023 let mut buffer = unsigned_varint::encode::usize_buffer();
1024 io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer))
1025 .await?;
1026 }
1027
1028 // Write the payload.
1029 io.write_all(&req).await?;
1030
1031 io.close().await?;
1032 Ok(())
1033 }
1034
1035 async fn write_response<T>(
1036 &mut self,
1037 _: &Self::Protocol,
1038 io: &mut T,
1039 res: Self::Response,
1040 ) -> io::Result<()>
1041 where
1042 T: AsyncWrite + Unpin + Send,
1043 {
1044 // If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1045 if let Ok(res) = res {
1046 // Write the length.
1047 {
1048 let mut buffer = unsigned_varint::encode::usize_buffer();
1049 io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer))
1050 .await?;
1051 }
1052
1053 // Write the payload.
1054 io.write_all(&res).await?;
1055 }
1056
1057 io.close().await?;
1058 Ok(())
1059 }
1060}