1#[cfg(test)]
38mod tests;
39
40use async_trait::async_trait;
41use futures::channel::{mpsc, oneshot};
42use futures::prelude::*;
43use libp2p::StreamProtocol;
44use libp2p::core::transport::PortUse;
45use libp2p::core::{Endpoint, Multiaddr};
46use libp2p::identity::PeerId;
47use libp2p::request_response::{
48 Behaviour as RequestResponse, Codec as RequestResponseCodec, Config as RequestResponseConfig,
49 Event as RequestResponseEvent, InboundRequestId, Message as RequestResponseMessage,
50 OutboundRequestId, ProtocolSupport, ResponseChannel,
51};
52pub use libp2p::request_response::{InboundFailure, OutboundFailure};
53use libp2p::swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
54use libp2p::swarm::dial_opts::DialOpts;
55use libp2p::swarm::handler::multi::MultiHandler;
56use libp2p::swarm::{
57 ConnectionDenied, ConnectionId, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm,
58};
59use std::borrow::Cow;
60use std::collections::HashMap;
61use std::collections::hash_map::Entry;
62use std::pin::Pin;
63use std::task::{Context, Poll};
64use std::time::{Duration, Instant};
65use std::{io, iter};
66use tracing::{debug, error, warn};
67
68#[async_trait]
70pub trait RequestHandler: Send {
71 async fn run(&mut self);
73
74 fn protocol_config(&self) -> ProtocolConfig;
76
77 fn protocol_name(&self) -> &'static str;
79
80 fn clone_box(&self) -> Box<dyn RequestHandler>;
82}
83
84impl Clone for Box<dyn RequestHandler> {
85 fn clone(&self) -> Self {
86 self.clone_box()
87 }
88}
89
90#[derive(Debug, Clone)]
92pub struct ProtocolConfig {
93 pub name: &'static str,
95
96 pub max_request_size: u64,
101
102 pub max_response_size: u64,
107
108 pub request_timeout: Duration,
112
113 pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
134}
135
136impl ProtocolConfig {
137 pub fn new(protocol_name: &'static str) -> ProtocolConfig {
139 ProtocolConfig {
140 name: protocol_name,
141 max_request_size: 1024 * 1024,
142 max_response_size: 16 * 1024 * 1024,
143 request_timeout: Duration::from_secs(20),
144 inbound_queue: None,
145 }
146 }
147}
148
149#[derive(Debug)]
151pub struct IncomingRequest {
152 pub peer: PeerId,
154
155 pub payload: Vec<u8>,
158
159 pub pending_response: oneshot::Sender<OutgoingResponse>,
168}
169
170#[derive(Debug)]
172pub struct OutgoingResponse {
173 pub result: Result<Vec<u8>, ()>,
177
178 pub sent_feedback: Option<oneshot::Sender<()>>,
187}
188
189#[derive(Debug)]
191#[allow(dead_code)]
194pub enum Event {
195 InboundRequest {
199 peer: PeerId,
201 protocol: Cow<'static, str>,
203 result: Result<(), ResponseFailure>,
208 },
209
210 RequestFinished {
215 peer: Option<PeerId>,
217 protocol: Cow<'static, str>,
219 duration: Duration,
221 result: Result<(), String>,
223 },
224}
225
226#[derive(Debug, Clone, PartialEq, Eq, Hash)]
233struct ProtocolRequestId {
234 protocol: Cow<'static, str>,
235 request_id: OutboundRequestId,
236}
237
238impl From<(Cow<'static, str>, OutboundRequestId)> for ProtocolRequestId {
239 #[inline]
240 fn from((protocol, request_id): (Cow<'static, str>, OutboundRequestId)) -> Self {
241 Self {
242 protocol,
243 request_id,
244 }
245 }
246}
247
/// What to do when a request targets a peer we are not connected to.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
    /// Send the request anyway, letting the behaviour dial the peer.
    TryConnect,
    /// Fail the request immediately instead of dialing.
    #[allow(dead_code)]
    ImmediateError,
}

impl IfDisconnected {
    /// Returns `true` when a connection attempt should be made for a disconnected peer.
    pub fn should_connect(self) -> bool {
        matches!(self, Self::TryConnect)
    }
}
268
269#[allow(clippy::type_complexity)] pub struct RequestResponseFactoryBehaviour {
273 protocols: HashMap<
277 Cow<'static, str>,
278 (
279 RequestResponse<GenericCodec>,
280 Option<mpsc::Sender<IncomingRequest>>,
281 ),
282 >,
283
284 pending_requests:
286 HashMap<ProtocolRequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,
287
288 pending_responses: stream::FuturesUnordered<
291 Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
292 >,
293
294 message_request: Option<MessageRequest>,
297
298 request_handlers: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
300}
301
302struct MessageRequest {
304 peer: PeerId,
305 request_id: InboundRequestId,
306 request: Vec<u8>,
307 channel: ResponseChannel<Result<Vec<u8>, ()>>,
308 protocol: String,
309 response_builder: Option<mpsc::Sender<IncomingRequest>>,
310}
311
312struct RequestProcessingOutcome {
314 request_id: InboundRequestId,
315 protocol: Cow<'static, str>,
316 inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
317 response: OutgoingResponse,
318}
319
320impl RequestResponseFactoryBehaviour {
321 pub fn new(
324 list: impl IntoIterator<Item = Box<dyn RequestHandler>>,
325 max_concurrent_streams: usize,
326 ) -> Result<Self, RegisterError> {
327 let mut protocols = HashMap::new();
328 let mut request_handlers = Vec::new();
329 for mut handler in list {
330 let config = handler.protocol_config();
331
332 let protocol_support = if config.inbound_queue.is_some() {
333 ProtocolSupport::Full
334 } else {
335 ProtocolSupport::Outbound
336 };
337
338 let rq_rp = RequestResponse::with_codec(
339 GenericCodec {
340 max_request_size: config.max_request_size,
341 max_response_size: config.max_response_size,
342 },
343 iter::once(StreamProtocol::new(config.name)).zip(iter::repeat(protocol_support)),
344 RequestResponseConfig::default()
345 .with_request_timeout(config.request_timeout)
346 .with_max_concurrent_streams(max_concurrent_streams),
347 );
348
349 match protocols.entry(Cow::Borrowed(config.name)) {
350 Entry::Vacant(e) => e.insert((rq_rp, config.inbound_queue)),
351 Entry::Occupied(e) => {
352 return Err(RegisterError::DuplicateProtocol(e.key().clone()));
353 }
354 };
355
356 let request_handler_run: Pin<Box<dyn Future<Output = ()> + Send>> =
357 Box::pin(async move { handler.run().await }.fuse());
358
359 request_handlers.push(request_handler_run);
360 }
361
362 Ok(Self {
363 protocols,
364 pending_requests: Default::default(),
365 pending_responses: Default::default(),
366 message_request: None,
367 request_handlers,
368 })
369 }
370
371 pub fn send_request(
378 &mut self,
379 target: &PeerId,
380 protocol_name: &str,
381 request: Vec<u8>,
382 pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
383 connect: IfDisconnected,
384 addresses: Vec<Multiaddr>,
385 ) {
386 if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
387 if protocol.is_connected(target) || connect.should_connect() {
388 let opts = DialOpts::peer_id(*target).addresses(addresses).build();
389 let request_id = protocol.send_request(opts, request);
390 let prev_req_id = self.pending_requests.insert(
391 (protocol_name.to_string().into(), request_id).into(),
392 (Instant::now(), pending_response),
393 );
394 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
395 } else if pending_response
396 .send(Err(RequestFailure::NotConnected))
397 .is_err()
398 {
399 debug!(
400 "Not connected to peer {:?}. At the same time local \
401 node is no longer interested in the result.",
402 target,
403 );
404 }
405 } else if pending_response
406 .send(Err(RequestFailure::UnknownProtocol))
407 .is_err()
408 {
409 debug!(
410 "Unknown protocol {:?}. At the same time local \
411 node is no longer interested in the result.",
412 protocol_name,
413 );
414 }
415 }
416}
417
418impl NetworkBehaviour for RequestResponseFactoryBehaviour {
419 type ConnectionHandler = MultiHandler<
420 String,
421 <RequestResponse<GenericCodec> as NetworkBehaviour>::ConnectionHandler,
422 >;
423 type ToSwarm = Event;
424
425 fn handle_established_inbound_connection(
426 &mut self,
427 connection_id: ConnectionId,
428 peer: PeerId,
429 local_addr: &Multiaddr,
430 remote_addr: &Multiaddr,
431 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
432 let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
433 (
434 p.to_string(),
435 r.handle_established_inbound_connection(
436 connection_id,
437 peer,
438 local_addr,
439 remote_addr,
440 )
441 .expect(
442 "Behaviours return handlers in these methods with the exception of \
443 'connection management' behaviours like connection-limits or allow-black list. \
444 So, inner request-response behaviour always returns Ok(handler).",
445 ),
446 )
447 });
448
449 let handler = MultiHandler::try_from_iter(iter).expect(
450 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
451 which is the only possible error; qed",
452 );
453
454 Ok(handler)
455 }
456
457 fn handle_established_outbound_connection(
458 &mut self,
459 connection_id: ConnectionId,
460 peer: PeerId,
461 addr: &Multiaddr,
462 role_override: Endpoint,
463 port_use: PortUse,
464 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
465 let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
466 (
467 p.to_string(),
468 r.handle_established_outbound_connection(
469 connection_id,
470 peer,
471 addr,
472 role_override,
473 port_use,
474 )
475 .expect(
476 "Behaviours return handlers in these methods with the exception of \
477 'connection management' behaviours like connection-limits or allow-black \
478 list. So, inner request-response behaviour always returns Ok(handler).",
479 ),
480 )
481 });
482
483 let handler = MultiHandler::try_from_iter(iter).expect(
484 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
485 which is the only possible error; qed",
486 );
487
488 Ok(handler)
489 }
490
491 fn on_swarm_event(&mut self, event: FromSwarm) {
493 match event {
494 FromSwarm::ConnectionEstablished(inner) => {
495 for (protocol, _) in self.protocols.values_mut() {
496 protocol.on_swarm_event(FromSwarm::ConnectionEstablished(inner));
497 }
498 }
499 FromSwarm::ConnectionClosed(inner) => {
500 for (protocol, _) in self.protocols.values_mut() {
501 protocol.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
502 peer_id: inner.peer_id,
503 connection_id: inner.connection_id,
504 endpoint: inner.endpoint,
505 cause: inner.cause,
506 remaining_established: inner.remaining_established,
507 }));
508 }
509 }
510 FromSwarm::AddressChange(inner) => {
511 for (protocol, _) in self.protocols.values_mut() {
512 protocol.on_swarm_event(FromSwarm::AddressChange(inner));
513 }
514 }
515 FromSwarm::DialFailure(inner) => {
516 for (protocol, _) in self.protocols.values_mut() {
517 protocol.on_swarm_event(FromSwarm::DialFailure(DialFailure {
518 peer_id: inner.peer_id,
519 error: inner.error,
520 connection_id: inner.connection_id,
521 }));
522 }
523 }
524 FromSwarm::ListenFailure(inner) => {
525 for (protocol, _) in self.protocols.values_mut() {
526 protocol.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
527 local_addr: inner.local_addr,
528 send_back_addr: inner.send_back_addr,
529 error: inner.error,
530 connection_id: inner.connection_id,
531 peer_id: inner.peer_id,
532 }));
533 }
534 }
535 FromSwarm::NewListener(inner) => {
536 for (protocol, _) in self.protocols.values_mut() {
537 protocol.on_swarm_event(FromSwarm::NewListener(inner));
538 }
539 }
540 FromSwarm::NewListenAddr(inner) => {
541 for (protocol, _) in self.protocols.values_mut() {
542 protocol.on_swarm_event(FromSwarm::NewListenAddr(inner));
543 }
544 }
545 FromSwarm::ExpiredListenAddr(inner) => {
546 for (protocol, _) in self.protocols.values_mut() {
547 protocol.on_swarm_event(FromSwarm::ExpiredListenAddr(inner));
548 }
549 }
550 FromSwarm::ListenerError(inner) => {
551 for (protocol, _) in self.protocols.values_mut() {
552 protocol.on_swarm_event(FromSwarm::ListenerError(inner));
553 }
554 }
555 FromSwarm::ListenerClosed(inner) => {
556 for (protocol, _) in self.protocols.values_mut() {
557 protocol.on_swarm_event(FromSwarm::ListenerClosed(inner));
558 }
559 }
560 FromSwarm::NewExternalAddrCandidate(inner) => {
561 for (protocol, _) in self.protocols.values_mut() {
562 protocol.on_swarm_event(FromSwarm::NewExternalAddrCandidate(inner));
563 }
564 }
565 FromSwarm::ExternalAddrConfirmed(inner) => {
566 for (protocol, _) in self.protocols.values_mut() {
567 protocol.on_swarm_event(FromSwarm::ExternalAddrConfirmed(inner));
568 }
569 }
570 FromSwarm::ExternalAddrExpired(inner) => {
571 for (protocol, _) in self.protocols.values_mut() {
572 protocol.on_swarm_event(FromSwarm::ExternalAddrExpired(inner));
573 }
574 }
575 FromSwarm::NewExternalAddrOfPeer(inner) => {
576 for (protocol, _) in self.protocols.values_mut() {
577 protocol.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(inner));
578 }
579 }
580 event => {
581 warn!(
582 ?event,
583 "New event must be forwarded to request response protocols"
584 );
585 }
586 };
587 }
588
589 fn on_connection_handler_event(
590 &mut self,
591 peer_id: PeerId,
592 connection: ConnectionId,
593 event: THandlerOutEvent<Self>,
594 ) {
595 let p_name = event.0;
596 if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
597 return proto.on_connection_handler_event(peer_id, connection, event.1);
598 }
599
600 warn!(
601 "inject_node_event: no request-response instance registered for protocol {:?}",
602 p_name
603 )
604 }
605
606 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
607 'poll_all: loop {
608 if let Some(message_request) = self.message_request.take() {
609 let MessageRequest {
610 peer,
611 request_id,
612 request,
613 channel,
614 protocol,
615 response_builder,
616 } = message_request;
617
618 let (tx, rx) = oneshot::channel();
619
620 if let Some(mut response_builder) = response_builder {
623 let _ = response_builder.try_send(IncomingRequest {
627 peer,
628 payload: request,
629 pending_response: tx,
630 });
631 } else {
632 debug_assert!(false, "Received message on outbound-only protocol.");
633 }
634
635 self.pending_responses.push(Box::pin(async move {
636 if let Ok(response) = rx.await {
640 Some(RequestProcessingOutcome {
641 request_id,
642 protocol: Cow::from(protocol),
643 inner_channel: channel,
644 response,
645 })
646 } else {
647 None
648 }
649 }));
650
651 continue 'poll_all;
654 }
655 while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
657 let RequestProcessingOutcome {
658 request_id,
659 protocol: protocol_name,
660 inner_channel,
661 response: OutgoingResponse { result, .. },
662 } = match outcome {
663 Some(outcome) => outcome,
664 None => continue,
667 };
668
669 if let Ok(payload) = result
670 && let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name)
671 && protocol.send_response(inner_channel, Ok(payload)).is_err()
672 {
673 debug!(
676 %request_id,
677 "Failed to send response for request on protocol {} due to a \
678 timeout or due to the connection to the peer being closed. \
679 Dropping response",
680 protocol_name,
681 );
682 }
683 }
684
685 for rq_rs_runner in &mut self.request_handlers {
686 let _ = rq_rs_runner.poll_unpin(cx);
688 }
689
690 for (protocol, (behaviour, response_builder)) in &mut self.protocols {
692 while let Poll::Ready(event) = behaviour.poll(cx) {
693 let event = match event {
694 ToSwarm::GenerateEvent(event) => event,
696
697 ToSwarm::Dial { opts } => {
700 if opts.get_peer_id().is_none() {
701 error!(
702 "The request-response isn't supposed to start dialing \
703 addresses"
704 );
705 }
706 return Poll::Ready(ToSwarm::Dial { opts });
707 }
708 ToSwarm::NotifyHandler {
709 peer_id,
710 handler,
711 event,
712 } => {
713 return Poll::Ready(ToSwarm::NotifyHandler {
714 peer_id,
715 handler,
716 event: ((*protocol).to_string(), event),
717 });
718 }
719 ToSwarm::CloseConnection {
720 peer_id,
721 connection,
722 } => {
723 return Poll::Ready(ToSwarm::CloseConnection {
724 peer_id,
725 connection,
726 });
727 }
728 ToSwarm::NewExternalAddrCandidate(observed) => {
729 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed));
730 }
731 ToSwarm::ExternalAddrConfirmed(addr) => {
732 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr));
733 }
734 ToSwarm::ExternalAddrExpired(addr) => {
735 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr));
736 }
737 ToSwarm::ListenOn { opts } => {
738 return Poll::Ready(ToSwarm::ListenOn { opts });
739 }
740 ToSwarm::RemoveListener { id } => {
741 return Poll::Ready(ToSwarm::RemoveListener { id });
742 }
743 event => {
744 warn!(
745 ?event,
746 "New event from request response protocol must be send up"
747 );
748
749 continue;
750 }
751 };
752
753 match event {
754 RequestResponseEvent::Message {
756 peer,
757 message:
758 RequestResponseMessage::Request {
759 request_id,
760 request,
761 channel,
762 },
763 } => {
764 self.message_request = Some(MessageRequest {
765 peer,
766 request_id,
767 request,
768 channel,
769 protocol: protocol.to_string(),
770 response_builder: response_builder.clone(),
771 });
772
773 continue 'poll_all;
776 }
777
778 RequestResponseEvent::Message {
780 peer,
781 message:
782 RequestResponseMessage::Response {
783 request_id,
784 response,
785 },
786 } => {
787 let (started, delivered) = match self
788 .pending_requests
789 .remove(&(protocol.clone(), request_id).into())
790 {
791 Some((started, pending_response)) => {
792 let delivered = pending_response
793 .send(response.map_err(|()| RequestFailure::Refused))
794 .map_err(|_| RequestFailure::Obsolete.to_string());
795 (started, delivered)
796 }
797 None => {
798 warn!(
799 "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
800 request_id,
801 );
802 debug_assert!(false);
803 continue;
804 }
805 };
806
807 let out = Event::RequestFinished {
808 peer: Some(peer),
809 protocol: protocol.clone(),
810 duration: started.elapsed(),
811 result: delivered,
812 };
813
814 return Poll::Ready(ToSwarm::GenerateEvent(out));
815 }
816
817 RequestResponseEvent::OutboundFailure {
819 peer,
820 request_id,
821 error,
822 ..
823 } => {
824 let error_string = error.to_string();
825 let started = match self
826 .pending_requests
827 .remove(&(protocol.clone(), request_id).into())
828 {
829 Some((started, pending_response)) => {
830 if pending_response
831 .send(Err(RequestFailure::Network(error)))
832 .is_err()
833 {
834 debug!(
835 %request_id,
836 "Request failed. At the same time local node is no longer interested in \
837 the result",
838 );
839 }
840 started
841 }
842 None => {
843 warn!(
844 %request_id,
845 "Received `RequestResponseEvent::Message` with unexpected request",
846 );
847 debug_assert!(false);
848 continue;
849 }
850 };
851
852 let out = Event::RequestFinished {
853 peer,
854 protocol: protocol.clone(),
855 duration: started.elapsed(),
856 result: Err(error_string),
857 };
858
859 return Poll::Ready(ToSwarm::GenerateEvent(out));
860 }
861
862 RequestResponseEvent::InboundFailure { peer, error, .. } => {
865 debug!(?error, %peer, "Inbound request failed.");
866
867 let out = Event::InboundRequest {
868 peer,
869 protocol: protocol.clone(),
870 result: Err(ResponseFailure::Network(error)),
871 };
872 return Poll::Ready(ToSwarm::GenerateEvent(out));
873 }
874
875 RequestResponseEvent::ResponseSent { peer, .. } => {
877 let out = Event::InboundRequest {
878 peer,
879 protocol: protocol.clone(),
880 result: Ok(()),
881 };
882
883 return Poll::Ready(ToSwarm::GenerateEvent(out));
884 }
885 };
886 }
887 }
888
889 break Poll::Pending;
890 }
891 }
892}
893
894#[derive(Debug, thiserror::Error)]
896pub enum RegisterError {
897 #[error("{0}")]
899 DuplicateProtocol(Cow<'static, str>),
900}
901
902#[derive(Debug, thiserror::Error)]
904#[allow(missing_docs)]
905pub enum RequestFailure {
906 #[error("We are not currently connected to the requested peer.")]
907 NotConnected,
908 #[error("Given protocol hasn't been registered.")]
909 UnknownProtocol,
910 #[error(
911 "Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it."
912 )]
913 Refused,
914 #[error("The remote replied, but the local node is no longer interested in the response.")]
915 Obsolete,
916 #[error("Problem on the network: {0}")]
918 Network(OutboundFailure),
919}
920
921#[derive(Debug, thiserror::Error)]
923pub enum ResponseFailure {
924 #[error("Problem on the network: {0}")]
926 Network(InboundFailure),
927}
928
/// Codec that frames requests and responses as a varint length prefix followed by the raw
/// bytes, rejecting incoming messages larger than the configured limits.
#[derive(Debug, Clone)]
#[doc(hidden)]
pub struct GenericCodec {
    /// Largest request, in bytes, accepted when reading.
    max_request_size: u64,
    /// Largest response, in bytes, accepted when reading.
    max_response_size: u64,
}
937
938#[async_trait::async_trait]
939impl RequestResponseCodec for GenericCodec {
940 type Protocol = StreamProtocol;
941 type Request = Vec<u8>;
942 type Response = Result<Vec<u8>, ()>;
943
944 async fn read_request<T>(
945 &mut self,
946 _: &Self::Protocol,
947 mut io: &mut T,
948 ) -> io::Result<Self::Request>
949 where
950 T: AsyncRead + Unpin + Send,
951 {
952 let length = unsigned_varint::aio::read_usize(&mut io)
954 .await
955 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
956 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
957 return Err(io::Error::new(
958 io::ErrorKind::InvalidInput,
959 format!(
960 "Request size exceeds limit: {} > {}",
961 length, self.max_request_size
962 ),
963 ));
964 }
965
966 let mut buffer = vec![0; length];
968 io.read_exact(&mut buffer).await?;
969 Ok(buffer)
970 }
971
972 async fn read_response<T>(
973 &mut self,
974 _: &Self::Protocol,
975 mut io: &mut T,
976 ) -> io::Result<Self::Response>
977 where
978 T: AsyncRead + Unpin + Send,
979 {
980 let length = match unsigned_varint::aio::read_usize(&mut io).await {
987 Ok(l) => l,
988 Err(unsigned_varint::io::ReadError::Io(err))
989 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
990 {
991 return Ok(Err(()));
992 }
993 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
994 };
995
996 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
997 return Err(io::Error::new(
998 io::ErrorKind::InvalidInput,
999 format!(
1000 "Response size exceeds limit: {} > {}",
1001 length, self.max_response_size
1002 ),
1003 ));
1004 }
1005
1006 let mut buffer = vec![0; length];
1008 io.read_exact(&mut buffer).await?;
1009 Ok(Ok(buffer))
1010 }
1011
1012 async fn write_request<T>(
1013 &mut self,
1014 _: &Self::Protocol,
1015 io: &mut T,
1016 req: Self::Request,
1017 ) -> io::Result<()>
1018 where
1019 T: AsyncWrite + Unpin + Send,
1020 {
1021 {
1023 let mut buffer = unsigned_varint::encode::usize_buffer();
1024 io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer))
1025 .await?;
1026 }
1027
1028 io.write_all(&req).await?;
1030
1031 io.close().await?;
1032 Ok(())
1033 }
1034
1035 async fn write_response<T>(
1036 &mut self,
1037 _: &Self::Protocol,
1038 io: &mut T,
1039 res: Self::Response,
1040 ) -> io::Result<()>
1041 where
1042 T: AsyncWrite + Unpin + Send,
1043 {
1044 if let Ok(res) = res {
1046 {
1048 let mut buffer = unsigned_varint::encode::usize_buffer();
1049 io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer))
1050 .await?;
1051 }
1052
1053 io.write_all(&res).await?;
1055 }
1056
1057 io.close().await?;
1058 Ok(())
1059 }
1060}