subspace_networking/protocols/request_response/request_response_factory.rs
1// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Collection of request-response protocols.
18//!
//! The [`RequestResponseFactoryBehaviour`] struct defined in this module provides support for
//! zero or more so-called "request-response" protocols.
21//!
22//! A request-response protocol works in the following way:
23//!
//! - For every emitted request, a new substream is opened and the protocol is negotiated. If the
//!   remote supports the protocol, the size of the request is sent as a LEB128 number, followed
//!   by the request itself. The remote then sends the size of the response as a LEB128 number,
//!   followed by the response.
28//!
29//! - Requests have a certain time limit before they time out. This time includes the time it
30//! takes to send/receive the request and response.
31//!
32//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
33//! is used to handle incoming requests.
34//!
35//! Original file commit: <https://github.com/paritytech/substrate/commit/c2fc4b3ca0d7a15cc3f9cb1e5f441d99ec8d6e0b>
36
37#[cfg(test)]
38mod tests;
39
40use async_trait::async_trait;
41use futures::channel::{mpsc, oneshot};
42use futures::prelude::*;
43use libp2p::core::transport::PortUse;
44use libp2p::core::{Endpoint, Multiaddr};
45use libp2p::identity::PeerId;
46use libp2p::request_response::{
47 Behaviour as RequestResponse, Codec as RequestResponseCodec, Config as RequestResponseConfig,
48 Event as RequestResponseEvent, InboundRequestId, Message as RequestResponseMessage,
49 OutboundRequestId, ProtocolSupport, ResponseChannel,
50};
51pub use libp2p::request_response::{InboundFailure, OutboundFailure};
52use libp2p::swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
53use libp2p::swarm::dial_opts::DialOpts;
54use libp2p::swarm::handler::multi::MultiHandler;
55use libp2p::swarm::{
56 ConnectionDenied, ConnectionId, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm,
57};
58use libp2p::StreamProtocol;
59use std::borrow::Cow;
60use std::collections::hash_map::Entry;
61use std::collections::HashMap;
62use std::pin::Pin;
63use std::task::{Context, Poll};
64use std::time::{Duration, Instant};
65use std::{io, iter};
66use tracing::{debug, error, warn};
67
/// `tracing` target attached to most of the log records emitted by this module.
const LOG_TARGET: &str = "request-response-protocols";
69
/// Defines a handler for the request-response protocol factory.
///
/// Handlers are passed to [`RequestResponseFactoryBehaviour::new`] as boxed trait objects. The
/// factory reads [`RequestHandler::protocol_config`] to register the protocol and then drives the
/// future returned by [`RequestHandler::run`] from its own `poll` implementation.
#[async_trait]
pub trait RequestHandler: Send {
    /// Runs the underlying protocol handler.
    async fn run(&mut self);

    /// Returns a config for the request-response protocol factory.
    fn protocol_config(&self) -> ProtocolConfig;

    /// Returns a protocol name.
    fn protocol_name(&self) -> &'static str;

    /// Clone boxed value.
    ///
    /// This is what the `Clone` implementation for `Box<dyn RequestHandler>` delegates to.
    fn clone_box(&self) -> Box<dyn RequestHandler>;
}
85
86impl Clone for Box<dyn RequestHandler> {
87 fn clone(&self) -> Self {
88 self.clone_box()
89 }
90}
91
/// Configuration for a single request-response protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
    /// Name of the protocol on the wire. Should be something like `/foo/bar`.
    pub name: &'static str,

    /// Maximum allowed size, in bytes, of a request.
    ///
    /// Any request larger than this value will be declined as a way to avoid allocating too
    /// much memory for it.
    pub max_request_size: u64,

    /// Maximum allowed size, in bytes, of a response.
    ///
    /// Any response larger than this value will be declined as a way to avoid allocating too
    /// much memory for it.
    pub max_response_size: u64,

    /// Duration after which emitted requests are considered timed out.
    ///
    /// If you expect the response to come back quickly, you should set this to a smaller duration.
    pub request_timeout: Duration,

    /// Channel on which the networking service will send incoming requests.
    ///
    /// Every time a peer sends a request to the local node using this protocol, the networking
    /// service will push an element on this channel. The receiving side of this channel then has
    /// to pull this element, process the request, and send the response back to the peer.
    ///
    /// The size of the channel has to be carefully chosen. If the channel is full, the networking
    /// service will discard the incoming request and send back an error to the peer.
    /// Consequently, the channel being full is an indicator that the node is overloaded.
    ///
    /// You can typically set the size of the channel to `T / d`, where `T` is the
    /// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
    /// build a response.
    ///
    /// Can be `None` if the local node does not support answering incoming requests.
    /// If this is `None`, then the local node will not advertise support for this protocol towards
    /// other peers. If this is `Some` but the channel is closed, then the local node will
    /// advertise support for this protocol, but any incoming request will lead to an error being
    /// sent back.
    pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
}
137
138impl ProtocolConfig {
139 /// Creates request-response protocol config.
140 pub fn new(protocol_name: &'static str) -> ProtocolConfig {
141 ProtocolConfig {
142 name: protocol_name,
143 max_request_size: 1024 * 1024,
144 max_response_size: 16 * 1024 * 1024,
145 request_timeout: Duration::from_secs(20),
146 inbound_queue: None,
147 }
148 }
149}
150
/// A single request received by a peer on a request-response protocol.
#[derive(Debug)]
pub struct IncomingRequest {
    /// Who sent the request.
    pub peer: PeerId,

    /// Request sent by the remote. Will never be larger than
    /// [`ProtocolConfig::max_request_size`].
    pub payload: Vec<u8>,

    /// Channel to send back the response.
    ///
    /// There are two ways to indicate that handling the request failed:
    ///
    /// 1. Drop `pending_response`.
    ///
    /// 2. Send an [`OutgoingResponse`] whose `result` is `Err(())` via `pending_response`.
    ///
    /// In both cases the behaviour does not send a response payload to the remote.
    pub pending_response: oneshot::Sender<OutgoingResponse>,
}
171
/// Response for an incoming request to be sent by a request protocol handler.
#[derive(Debug)]
pub struct OutgoingResponse {
    /// The payload of the response.
    ///
    /// `Err(())` if none is available e.g. due to an error while handling the request.
    pub result: Result<Vec<u8>, ()>,

    /// If provided, the `oneshot::Sender` will be notified when the response has been sent to the
    /// peer.
    ///
    /// Note: Operating systems typically maintain a buffer of a few dozen kilobytes of
    /// outgoing data for each TCP socket, and it is not possible for a user
    /// application to inspect this buffer. This channel here is not actually notified
    /// when the response has been fully sent out, but rather when it has fully been
    /// written to the buffer managed by the operating system.
    ///
    /// NOTE(review): the `poll` implementation in this file only reads `result` when it unpacks
    /// an `OutgoingResponse`, so this sender currently appears to be dropped without ever being
    /// notified — confirm before relying on it.
    pub sent_feedback: Option<oneshot::Sender<()>>,
}
190
/// Event generated by the [`RequestResponseFactoryBehaviour`].
#[derive(Debug)]
// We are not reading these events in a meaningful way right now, but the fields in there are still
// potentially useful
#[allow(dead_code)]
pub enum Event {
    /// A remote sent a request and either we have successfully answered it or an error happened.
    ///
    /// This event is generated for statistics purposes.
    InboundRequest {
        /// Peer which has emitted the request.
        peer: PeerId,
        /// Name of the protocol in question.
        protocol: Cow<'static, str>,
        /// Whether handling the request was successful or unsuccessful.
        ///
        /// `Ok(())` once the response has been sent. When unsuccessful contains the failure
        /// reason.
        result: Result<(), ResponseFailure>,
    },

    /// A request initiated using [`RequestResponseFactoryBehaviour::send_request`] has succeeded or
    /// failed.
    ///
    /// This event is generated for statistics purposes.
    RequestFinished {
        /// Peer that we sent the request to, if one was chosen.
        peer: Option<PeerId>,
        /// Name of the protocol in question.
        protocol: Cow<'static, str>,
        /// Duration the request took.
        duration: Duration,
        /// Result of the request.
        ///
        /// On failure this carries the failure rendered as a string.
        result: Result<(), String>,
    },
}
227
/// Combination of a protocol name and an outbound request id.
///
/// Used as the key of the pending-requests map, i.e. it identifies an outbound request among
/// all handled protocols. The protocol name is part of the key because each protocol is backed by
/// its own libp2p behaviour, which hands out its own request ids.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ProtocolRequestId {
    /// Name of the protocol the request was sent on.
    protocol: Cow<'static, str>,
    /// Id returned by the underlying behaviour when the request was sent.
    request_id: OutboundRequestId,
}
239
240impl From<(Cow<'static, str>, OutboundRequestId)> for ProtocolRequestId {
241 #[inline]
242 fn from((protocol, request_id): (Cow<'static, str>, OutboundRequestId)) -> Self {
243 Self {
244 protocol,
245 request_id,
246 }
247 }
248}
249
/// Policy applied by `send_request` when the recipient is not currently connected.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
    /// Attempt to establish a connection to the peer.
    TryConnect,
    /// Fail right away if the destination is not connected yet.
    #[allow(dead_code)] // reserved for the future logic or config change
    ImmediateError,
}

/// Helper methods for `IfDisconnected`.
impl IfDisconnected {
    /// Returns `true` if a disconnected peer should be dialed, `false` if the request should
    /// fail immediately.
    pub fn should_connect(self) -> bool {
        matches!(self, Self::TryConnect)
    }
}
270
/// Implementation of `NetworkBehaviour` that provides support for multiple request-response
/// protocols.
#[allow(clippy::type_complexity)] // to preserve compatibility with copied implementation
pub struct RequestResponseFactoryBehaviour {
    /// The multiple sub-protocols, by name.
    /// Contains the underlying libp2p `RequestResponse` behaviour, plus an optional
    /// "response builder" used to build responses for incoming requests.
    protocols: HashMap<
        Cow<'static, str>,
        (
            RequestResponse<GenericCodec>,
            Option<mpsc::Sender<IncomingRequest>>,
        ),
    >,

    /// Pending requests, passed down to a [`RequestResponse`] behaviour, awaiting a reply.
    ///
    /// Each entry stores the instant the request was sent and the channel on which the result
    /// is delivered to the caller.
    pending_requests:
        HashMap<ProtocolRequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,

    /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
    /// response to send back to the remote, or `None` if no response was produced.
    pending_responses: stream::FuturesUnordered<
        Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
    >,

    /// Incoming request that was received from one of the protocols but has not been handed to
    /// its response builder yet. It is consumed at the start of the next `poll` iteration.
    message_request: Option<MessageRequest>,

    /// Request handlers future collection.
    ///
    /// One future per registered handler; all of them are polled on every `poll` call.
    request_handlers: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
303
// This is a state of processing incoming request Message: everything `poll` needs in order to
// hand a freshly received request to the protocol's response builder.
struct MessageRequest {
    // Peer that sent the request.
    peer: PeerId,
    // Id assigned to the inbound request by the underlying behaviour.
    request_id: InboundRequestId,
    // Raw request payload.
    request: Vec<u8>,
    // Channel used to send the response back through the underlying behaviour.
    channel: ResponseChannel<Result<Vec<u8>, ()>>,
    // Name of the protocol the request arrived on.
    protocol: String,
    // Inbound queue of the protocol, `None` for outbound-only protocols.
    response_builder: Option<mpsc::Sender<IncomingRequest>>,
}
313
/// Generated by the response builder and waiting to be processed.
struct RequestProcessingOutcome {
    /// Id of the inbound request this outcome belongs to.
    request_id: InboundRequestId,
    /// Name of the protocol the request arrived on.
    protocol: Cow<'static, str>,
    /// Channel on which the response is sent through the underlying behaviour.
    inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
    /// Response produced by the request handler.
    response: OutgoingResponse,
}
321
322impl RequestResponseFactoryBehaviour {
323 /// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
324 /// the same protocol is passed twice.
325 pub fn new(
326 list: impl IntoIterator<Item = Box<dyn RequestHandler>>,
327 max_concurrent_streams: usize,
328 ) -> Result<Self, RegisterError> {
329 let mut protocols = HashMap::new();
330 let mut request_handlers = Vec::new();
331 for mut handler in list {
332 let config = handler.protocol_config();
333
334 let protocol_support = if config.inbound_queue.is_some() {
335 ProtocolSupport::Full
336 } else {
337 ProtocolSupport::Outbound
338 };
339
340 let rq_rp = RequestResponse::with_codec(
341 GenericCodec {
342 max_request_size: config.max_request_size,
343 max_response_size: config.max_response_size,
344 },
345 iter::once(StreamProtocol::new(config.name)).zip(iter::repeat(protocol_support)),
346 RequestResponseConfig::default()
347 .with_request_timeout(config.request_timeout)
348 .with_max_concurrent_streams(max_concurrent_streams),
349 );
350
351 match protocols.entry(Cow::Borrowed(config.name)) {
352 Entry::Vacant(e) => e.insert((rq_rp, config.inbound_queue)),
353 Entry::Occupied(e) => {
354 return Err(RegisterError::DuplicateProtocol(e.key().clone()))
355 }
356 };
357
358 let request_handler_run: Pin<Box<dyn Future<Output = ()> + Send>> =
359 Box::pin(async move { handler.run().await }.fuse());
360
361 request_handlers.push(request_handler_run);
362 }
363
364 Ok(Self {
365 protocols,
366 pending_requests: Default::default(),
367 pending_responses: Default::default(),
368 message_request: None,
369 request_handlers,
370 })
371 }
372
373 /// Initiates sending a request.
374 ///
375 /// If there is no established connection to the target peer, the behavior is determined by the
376 /// choice of `connect`.
377 ///
378 /// An error is returned if the protocol doesn't match one that has been registered.
379 pub fn send_request(
380 &mut self,
381 target: &PeerId,
382 protocol_name: &str,
383 request: Vec<u8>,
384 pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
385 connect: IfDisconnected,
386 addresses: Vec<Multiaddr>,
387 ) {
388 if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
389 if protocol.is_connected(target) || connect.should_connect() {
390 let opts = DialOpts::peer_id(*target).addresses(addresses).build();
391 let request_id = protocol.send_request(opts, request);
392 let prev_req_id = self.pending_requests.insert(
393 (protocol_name.to_string().into(), request_id).into(),
394 (Instant::now(), pending_response),
395 );
396 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
397 } else if pending_response
398 .send(Err(RequestFailure::NotConnected))
399 .is_err()
400 {
401 debug!(
402 target: LOG_TARGET,
403 "Not connected to peer {:?}. At the same time local \
404 node is no longer interested in the result.",
405 target,
406 );
407 }
408 } else if pending_response
409 .send(Err(RequestFailure::UnknownProtocol))
410 .is_err()
411 {
412 debug!(
413 target: LOG_TARGET,
414 "Unknown protocol {:?}. At the same time local \
415 node is no longer interested in the result.",
416 protocol_name,
417 );
418 }
419 }
420}
421
422impl NetworkBehaviour for RequestResponseFactoryBehaviour {
423 type ConnectionHandler = MultiHandler<
424 String,
425 <RequestResponse<GenericCodec> as NetworkBehaviour>::ConnectionHandler,
426 >;
427 type ToSwarm = Event;
428
429 fn handle_established_inbound_connection(
430 &mut self,
431 connection_id: ConnectionId,
432 peer: PeerId,
433 local_addr: &Multiaddr,
434 remote_addr: &Multiaddr,
435 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
436 let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
437 (
438 p.to_string(),
439 r.handle_established_inbound_connection(
440 connection_id,
441 peer,
442 local_addr,
443 remote_addr,
444 )
445 .expect(
446 "Behaviours return handlers in these methods with the exception of \
447 'connection management' behaviours like connection-limits or allow-black list. \
448 So, inner request-response behaviour always returns Ok(handler).",
449 ),
450 )
451 });
452
453 let handler = MultiHandler::try_from_iter(iter).expect(
454 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
455 which is the only possible error; qed",
456 );
457
458 Ok(handler)
459 }
460
461 fn handle_established_outbound_connection(
462 &mut self,
463 connection_id: ConnectionId,
464 peer: PeerId,
465 addr: &Multiaddr,
466 role_override: Endpoint,
467 port_use: PortUse,
468 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
469 let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
470 (
471 p.to_string(),
472 r.handle_established_outbound_connection(
473 connection_id,
474 peer,
475 addr,
476 role_override,
477 port_use,
478 )
479 .expect(
480 "Behaviours return handlers in these methods with the exception of \
481 'connection management' behaviours like connection-limits or allow-black \
482 list. So, inner request-response behaviour always returns Ok(handler).",
483 ),
484 )
485 });
486
487 let handler = MultiHandler::try_from_iter(iter).expect(
488 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
489 which is the only possible error; qed",
490 );
491
492 Ok(handler)
493 }
494
    /// Informs the behaviour about an event from the [`Swarm`](libp2p::Swarm).
    ///
    /// Every event variant handled below is forwarded, unchanged, to each registered protocol's
    /// underlying behaviour. Variants that are not listed are only logged and NOT forwarded.
    fn on_swarm_event(&mut self, event: FromSwarm) {
        match event {
            FromSwarm::ConnectionEstablished(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    protocol.on_swarm_event(FromSwarm::ConnectionEstablished(inner));
                }
            }
            FromSwarm::ConnectionClosed(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    // The event is rebuilt field by field for every protocol.
                    protocol.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
                        peer_id: inner.peer_id,
                        connection_id: inner.connection_id,
                        endpoint: inner.endpoint,
                        cause: inner.cause,
                        remaining_established: inner.remaining_established,
                    }));
                }
            }
            FromSwarm::AddressChange(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    protocol.on_swarm_event(FromSwarm::AddressChange(inner));
                }
            }
            FromSwarm::DialFailure(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    // The event is rebuilt field by field for every protocol.
                    protocol.on_swarm_event(FromSwarm::DialFailure(DialFailure {
                        peer_id: inner.peer_id,
                        error: inner.error,
                        connection_id: inner.connection_id,
                    }));
                }
            }
            FromSwarm::ListenFailure(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    // The event is rebuilt field by field for every protocol.
                    protocol.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
                        local_addr: inner.local_addr,
                        send_back_addr: inner.send_back_addr,
                        error: inner.error,
                        connection_id: inner.connection_id,
                        peer_id: inner.peer_id,
                    }));
                }
            }
            FromSwarm::NewListener(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    protocol.on_swarm_event(FromSwarm::NewListener(inner));
                }
            }
            FromSwarm::NewListenAddr(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    protocol.on_swarm_event(FromSwarm::NewListenAddr(inner));
                }
            }
            FromSwarm::ExpiredListenAddr(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    protocol.on_swarm_event(FromSwarm::ExpiredListenAddr(inner));
                }
            }
            FromSwarm::ListenerError(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    protocol.on_swarm_event(FromSwarm::ListenerError(inner));
                }
            }
            FromSwarm::ListenerClosed(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    protocol.on_swarm_event(FromSwarm::ListenerClosed(inner));
                }
            }
            FromSwarm::NewExternalAddrCandidate(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    protocol.on_swarm_event(FromSwarm::NewExternalAddrCandidate(inner));
                }
            }
            FromSwarm::ExternalAddrConfirmed(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    protocol.on_swarm_event(FromSwarm::ExternalAddrConfirmed(inner));
                }
            }
            FromSwarm::ExternalAddrExpired(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    protocol.on_swarm_event(FromSwarm::ExternalAddrExpired(inner));
                }
            }
            FromSwarm::NewExternalAddrOfPeer(inner) => {
                for (protocol, _) in self.protocols.values_mut() {
                    protocol.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(inner));
                }
            }
            // Catch-all for variants this code does not know about: they are dropped after
            // logging, so a new variant needs an explicit arm above to reach the protocols.
            event => {
                warn!(
                    ?event,
                    "New event must be forwarded to request response protocols"
                );
            }
        };
    }
592
593 fn on_connection_handler_event(
594 &mut self,
595 peer_id: PeerId,
596 connection: ConnectionId,
597 event: THandlerOutEvent<Self>,
598 ) {
599 let p_name = event.0;
600 if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
601 return proto.on_connection_handler_event(peer_id, connection, event.1);
602 }
603
604 warn!(
605 target: LOG_TARGET,
606 "inject_node_event: no request-response instance registered for protocol {:?}", p_name
607 )
608 }
609
610 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
611 'poll_all: loop {
612 if let Some(message_request) = self.message_request.take() {
613 let MessageRequest {
614 peer,
615 request_id,
616 request,
617 channel,
618 protocol,
619 response_builder,
620 } = message_request;
621
622 let (tx, rx) = oneshot::channel();
623
624 // Submit the request to the "response builder" passed by the user at
625 // initialization.
626 if let Some(mut response_builder) = response_builder {
627 // If the response builder is too busy, silently drop `tx`. This
628 // will be reported by the corresponding `RequestResponse` through
629 // an `InboundFailure::Omission` event.
630 let _ = response_builder.try_send(IncomingRequest {
631 peer,
632 payload: request,
633 pending_response: tx,
634 });
635 } else {
636 debug_assert!(false, "Received message on outbound-only protocol.");
637 }
638
639 self.pending_responses.push(Box::pin(async move {
640 // The `tx` created above can be dropped if we are not capable of
641 // processing this request, which is reflected as a
642 // `InboundFailure::Omission` event.
643 if let Ok(response) = rx.await {
644 Some(RequestProcessingOutcome {
645 request_id,
646 protocol: Cow::from(protocol),
647 inner_channel: channel,
648 response,
649 })
650 } else {
651 None
652 }
653 }));
654
655 // This `continue` makes sure that `pending_responses` gets polled
656 // after we have added the new element.
657 continue 'poll_all;
658 }
659 // Poll to see if any response is ready to be sent back.
660 while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
661 let RequestProcessingOutcome {
662 request_id,
663 protocol: protocol_name,
664 inner_channel,
665 response: OutgoingResponse { result, .. },
666 } = match outcome {
667 Some(outcome) => outcome,
668 // The response builder was too busy or handling the request failed. This is
669 // later on reported as a `InboundFailure::Omission`.
670 None => continue,
671 };
672
673 if let Ok(payload) = result {
674 if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
675 if protocol.send_response(inner_channel, Ok(payload)).is_err() {
676 // Note: Failure is handled further below when receiving
677 // `InboundFailure` event from `RequestResponse` behaviour.
678 debug!(
679 target: LOG_TARGET,
680 %request_id,
681 "Failed to send response for request on protocol {} due to a \
682 timeout or due to the connection to the peer being closed. \
683 Dropping response",
684 protocol_name,
685 );
686 }
687 }
688 }
689 }
690
691 for rq_rs_runner in &mut self.request_handlers {
692 // Future.Output == (), so we don't need a result here
693 let _ = rq_rs_runner.poll_unpin(cx);
694 }
695
696 // Poll request-responses protocols.
697 for (protocol, (behaviour, response_builder)) in &mut self.protocols {
698 while let Poll::Ready(event) = behaviour.poll(cx) {
699 let event = match event {
700 // Main events we are interested in.
701 ToSwarm::GenerateEvent(event) => event,
702
703 // Other events generated by the underlying behaviour are transparently
704 // passed through.
705 ToSwarm::Dial { opts } => {
706 if opts.get_peer_id().is_none() {
707 error!(
708 "The request-response isn't supposed to start dialing \
709 addresses"
710 );
711 }
712 return Poll::Ready(ToSwarm::Dial { opts });
713 }
714 ToSwarm::NotifyHandler {
715 peer_id,
716 handler,
717 event,
718 } => {
719 return Poll::Ready(ToSwarm::NotifyHandler {
720 peer_id,
721 handler,
722 event: ((*protocol).to_string(), event),
723 })
724 }
725 ToSwarm::CloseConnection {
726 peer_id,
727 connection,
728 } => {
729 return Poll::Ready(ToSwarm::CloseConnection {
730 peer_id,
731 connection,
732 })
733 }
734 ToSwarm::NewExternalAddrCandidate(observed) => {
735 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed))
736 }
737 ToSwarm::ExternalAddrConfirmed(addr) => {
738 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr))
739 }
740 ToSwarm::ExternalAddrExpired(addr) => {
741 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr))
742 }
743 ToSwarm::ListenOn { opts } => {
744 return Poll::Ready(ToSwarm::ListenOn { opts })
745 }
746 ToSwarm::RemoveListener { id } => {
747 return Poll::Ready(ToSwarm::RemoveListener { id })
748 }
749 event => {
750 warn!(
751 ?event,
752 "New event from request response protocol must be send up"
753 );
754
755 continue;
756 }
757 };
758
759 match event {
760 // Received a request from a remote.
761 RequestResponseEvent::Message {
762 peer,
763 message:
764 RequestResponseMessage::Request {
765 request_id,
766 request,
767 channel,
768 },
769 } => {
770 self.message_request = Some(MessageRequest {
771 peer,
772 request_id,
773 request,
774 channel,
775 protocol: protocol.to_string(),
776 response_builder: response_builder.clone(),
777 });
778
779 // This `continue` makes sure that `message_request` gets polled
780 // after we have added the new element.
781 continue 'poll_all;
782 }
783
784 // Received a response from a remote to one of our requests.
785 RequestResponseEvent::Message {
786 peer,
787 message:
788 RequestResponseMessage::Response {
789 request_id,
790 response,
791 },
792 } => {
793 let (started, delivered) = match self
794 .pending_requests
795 .remove(&(protocol.clone(), request_id).into())
796 {
797 Some((started, pending_response)) => {
798 let delivered = pending_response
799 .send(response.map_err(|()| RequestFailure::Refused))
800 .map_err(|_| RequestFailure::Obsolete.to_string());
801 (started, delivered)
802 }
803 None => {
804 warn!(
805 target: LOG_TARGET,
806 "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
807 request_id,
808 );
809 debug_assert!(false);
810 continue;
811 }
812 };
813
814 let out = Event::RequestFinished {
815 peer: Some(peer),
816 protocol: protocol.clone(),
817 duration: started.elapsed(),
818 result: delivered,
819 };
820
821 return Poll::Ready(ToSwarm::GenerateEvent(out));
822 }
823
824 // One of our requests has failed.
825 RequestResponseEvent::OutboundFailure {
826 peer,
827 request_id,
828 error,
829 ..
830 } => {
831 let error_string = error.to_string();
832 let started = match self
833 .pending_requests
834 .remove(&(protocol.clone(), request_id).into())
835 {
836 Some((started, pending_response)) => {
837 if pending_response
838 .send(Err(RequestFailure::Network(error)))
839 .is_err()
840 {
841 debug!(
842 target: LOG_TARGET,
843 %request_id,
844 "Request failed. At the same time local node is no longer interested in \
845 the result",
846 );
847 }
848 started
849 }
850 None => {
851 warn!(
852 target: LOG_TARGET,
853 %request_id,
854 "Received `RequestResponseEvent::Message` with unexpected request",
855 );
856 debug_assert!(false);
857 continue;
858 }
859 };
860
861 let out = Event::RequestFinished {
862 peer,
863 protocol: protocol.clone(),
864 duration: started.elapsed(),
865 result: Err(error_string),
866 };
867
868 return Poll::Ready(ToSwarm::GenerateEvent(out));
869 }
870
871 // An inbound request failed, either while reading the request or due to
872 // failing to send a response.
873 RequestResponseEvent::InboundFailure { peer, error, .. } => {
874 debug!(?error, %peer, "Inbound request failed.");
875
876 let out = Event::InboundRequest {
877 peer,
878 protocol: protocol.clone(),
879 result: Err(ResponseFailure::Network(error)),
880 };
881 return Poll::Ready(ToSwarm::GenerateEvent(out));
882 }
883
884 // A response to an inbound request has been sent.
885 RequestResponseEvent::ResponseSent { peer, .. } => {
886 let out = Event::InboundRequest {
887 peer,
888 protocol: protocol.clone(),
889 result: Ok(()),
890 };
891
892 return Poll::Ready(ToSwarm::GenerateEvent(out));
893 }
894 };
895 }
896 }
897
898 break Poll::Pending;
899 }
900 }
901}
902
/// Error when registering a protocol.
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
    /// A protocol has been specified multiple times.
    ///
    /// Carries the name of the offending protocol; the `Display` output is that name only.
    #[error("{0}")]
    DuplicateProtocol(Cow<'static, str>),
}
910
/// Error in a request.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestFailure {
    /// The target peer is not connected and the caller asked not to dial it.
    #[error("We are not currently connected to the requested peer.")]
    NotConnected,
    /// The protocol name passed to `send_request` is not registered.
    #[error("Given protocol hasn't been registered.")]
    UnknownProtocol,
    /// The remote closed the substream without writing a response.
    #[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
    Refused,
    /// A response arrived, but the receiving end of the response channel was already dropped.
    #[error("The remote replied, but the local node is no longer interested in the response.")]
    Obsolete,
    /// Problem on the network.
    #[error("Problem on the network: {0}")]
    Network(OutboundFailure),
}
927
/// Error when processing a request sent by a remote.
#[derive(Debug, thiserror::Error)]
pub enum ResponseFailure {
    /// Problem on the network.
    ///
    /// Wraps the `InboundFailure` reported by the underlying libp2p behaviour.
    #[error("Problem on the network: {0}")]
    Network(InboundFailure),
}
935
/// Implements the libp2p [`RequestResponseCodec`] trait. Defines how streams of bytes are turned
/// into requests and responses and vice-versa.
///
/// Messages are framed as an unsigned-varint length prefix followed by the raw payload.
#[derive(Debug, Clone)]
#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler.
pub struct GenericCodec {
    /// Largest request length, in bytes, accepted when reading a request.
    max_request_size: u64,
    /// Largest response length, in bytes, accepted when reading a response.
    max_response_size: u64,
}
944
945#[async_trait::async_trait]
946impl RequestResponseCodec for GenericCodec {
947 type Protocol = StreamProtocol;
948 type Request = Vec<u8>;
949 type Response = Result<Vec<u8>, ()>;
950
951 async fn read_request<T>(
952 &mut self,
953 _: &Self::Protocol,
954 mut io: &mut T,
955 ) -> io::Result<Self::Request>
956 where
957 T: AsyncRead + Unpin + Send,
958 {
959 // Read the length.
960 let length = unsigned_varint::aio::read_usize(&mut io)
961 .await
962 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
963 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
964 return Err(io::Error::new(
965 io::ErrorKind::InvalidInput,
966 format!(
967 "Request size exceeds limit: {} > {}",
968 length, self.max_request_size
969 ),
970 ));
971 }
972
973 // Read the payload.
974 let mut buffer = vec![0; length];
975 io.read_exact(&mut buffer).await?;
976 Ok(buffer)
977 }
978
979 async fn read_response<T>(
980 &mut self,
981 _: &Self::Protocol,
982 mut io: &mut T,
983 ) -> io::Result<Self::Response>
984 where
985 T: AsyncRead + Unpin + Send,
986 {
987 // Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
988 // considered as a protocol error and will result in the entire connection being closed.
989 // Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
990 // that this response is an error.
991
992 // Read the length.
993 let length = match unsigned_varint::aio::read_usize(&mut io).await {
994 Ok(l) => l,
995 Err(unsigned_varint::io::ReadError::Io(err))
996 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
997 {
998 return Ok(Err(()))
999 }
1000 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1001 };
1002
1003 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1004 return Err(io::Error::new(
1005 io::ErrorKind::InvalidInput,
1006 format!(
1007 "Response size exceeds limit: {} > {}",
1008 length, self.max_response_size
1009 ),
1010 ));
1011 }
1012
1013 // Read the payload.
1014 let mut buffer = vec![0; length];
1015 io.read_exact(&mut buffer).await?;
1016 Ok(Ok(buffer))
1017 }
1018
1019 async fn write_request<T>(
1020 &mut self,
1021 _: &Self::Protocol,
1022 io: &mut T,
1023 req: Self::Request,
1024 ) -> io::Result<()>
1025 where
1026 T: AsyncWrite + Unpin + Send,
1027 {
1028 // Write the length.
1029 {
1030 let mut buffer = unsigned_varint::encode::usize_buffer();
1031 io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer))
1032 .await?;
1033 }
1034
1035 // Write the payload.
1036 io.write_all(&req).await?;
1037
1038 io.close().await?;
1039 Ok(())
1040 }
1041
1042 async fn write_response<T>(
1043 &mut self,
1044 _: &Self::Protocol,
1045 io: &mut T,
1046 res: Self::Response,
1047 ) -> io::Result<()>
1048 where
1049 T: AsyncWrite + Unpin + Send,
1050 {
1051 // If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1052 if let Ok(res) = res {
1053 // Write the length.
1054 {
1055 let mut buffer = unsigned_varint::encode::usize_buffer();
1056 io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer))
1057 .await?;
1058 }
1059
1060 // Write the payload.
1061 io.write_all(&res).await?;
1062 }
1063
1064 io.close().await?;
1065 Ok(())
1066 }
1067}