subspace_farmer/cluster/nats_client.rs

1//! NATS client
2//!
3//! [`NatsClient`] provided here is a wrapper around [`Client`] that provides convenient methods
4//! using domain-specific traits.
5//!
6//! Before reading code, make sure to familiarize yourself with NATS documentation, especially with
7//! [subjects](https://docs.nats.io/nats-concepts/subjects) and
8//! [Core NATS](https://docs.nats.io/nats-concepts/core-nats) features.
9//!
10//! Abstractions provided here cover a few use cases:
11//! * request/response (for example piece request)
12//! * request/stream of responses (for example a stream of plotted sectors of the farmer)
13//! * notifications (typically targeting a particular instance of an app) and corresponding subscriptions (for example solution notification)
14//! * broadcasts and corresponding subscriptions (for example slot info broadcast)
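//!
//! A minimal request/response sketch (the `PingRequest` type and its subject are made up for
//! illustration and are not part of this module):
//!
//! ```ignore
//! use parity_scale_codec::{Decode, Encode};
//!
//! #[derive(Debug, Encode, Decode)]
//! struct PingRequest;
//!
//! impl GenericRequest for PingRequest {
//!     const SUBJECT: &'static str = "example.*.ping";
//!     type Response = u64;
//! }
//!
//! # async fn example(nats_client: &NatsClient) -> anyhow::Result<()> {
//! // Send the request to a concrete instance and wait for a single response
//! let response = nats_client.request(&PingRequest, Some("instance-1")).await?;
//! # Ok(())
//! # }
//! ```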
15
16use crate::utils::AsyncJoinOnDrop;
17use anyhow::anyhow;
18use async_nats::{
19    Client, ConnectOptions, HeaderMap, HeaderValue, Message, PublishError, RequestError,
20    RequestErrorKind, Subject, SubscribeError, Subscriber, ToServerAddrs,
21};
22use backoff::backoff::Backoff;
23use backoff::ExponentialBackoff;
24use futures::channel::mpsc;
25use futures::stream::FuturesUnordered;
26use futures::{select, FutureExt, Stream, StreamExt};
27use parity_scale_codec::{Decode, Encode};
28use std::any::type_name;
29use std::collections::VecDeque;
30use std::future::Future;
31use std::marker::PhantomData;
32use std::ops::Deref;
33use std::pin::Pin;
34use std::sync::Arc;
35use std::task::{Context, Poll};
36use std::time::Duration;
37use std::{fmt, mem};
38use thiserror::Error;
39use tracing::{debug, error, trace, warn, Instrument};
40use ulid::Ulid;
41
42const EXPECTED_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
43const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(1);
44/// Requests should time out eventually, but we should set a larger timeout to allow for spikes in
45/// load to be absorbed gracefully
46const REQUEST_TIMEOUT: Duration = Duration::from_mins(5);
47
48/// Generic request with associated response.
49///
50/// Used for cases where request/response pattern is needed and response contains a single small
51/// message. For large messages or multiple messages chunking with [`GenericStreamRequest`] can be
52/// used instead.
53pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
54    /// Request subject with optional `*` in place of application instance to receive the request
55    const SUBJECT: &'static str;
56    /// Response type that corresponds to this request
57    type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static;
58}
59
60/// Generic stream request where response is streamed using
61/// [`NatsClient::stream_request_responder`].
62///
63/// Used for cases where a large payload that doesn't fit into NATS message needs to be sent or
64/// there is a very large number of messages to send. For a simple request/response pattern
65/// [`GenericRequest`] can be used instead.
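///
/// A hedged implementation sketch (the `ListItemsRequest` type and its subject are made up for
/// illustration):
///
/// ```ignore
/// use parity_scale_codec::{Decode, Encode};
///
/// #[derive(Debug, Encode, Decode)]
/// struct ListItemsRequest;
///
/// impl GenericStreamRequest for ListItemsRequest {
///     const SUBJECT: &'static str = "example.*.list-items";
///     // Each individual response must fit into a NATS message; batching and chunking of the
///     // stream is handled by `NatsClient` internally
///     type Response = u64;
/// }
/// ```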
66pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
67    /// Request subject with optional `*` in place of application instance to receive the request
68    const SUBJECT: &'static str;
69    /// Response type that corresponds to this stream request.
70    ///
71    /// These responses are sent as a stream of messages; each message must fit into a NATS message.
72    /// [`NatsClient::approximate_max_message_size()`] can be used to estimate appropriate message
73    /// size in case chunking is needed.
74    type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static;
75}
76
77/// Messages sent in response to [`GenericStreamRequest`].
78///
79/// Empty list of responses means the end of the stream.
80#[derive(Debug, Encode, Decode)]
81enum GenericStreamResponses<Response> {
82    /// Some responses, but the stream didn't end yet
83    Continue {
84        /// Monotonically increasing index of responses in a stream
85        index: u32,
86        /// Individual responses
87        responses: VecDeque<Response>,
88        /// Subject where to send acknowledgement of received stream response indices, which acts as
89        /// a backpressure mechanism
90        ack_subject: String,
91    },
92    /// Remaining responses and this is the end of the stream.
93    Last {
94        /// Monotonically increasing index of responses in a stream
95        index: u32,
96        /// Individual responses
97        responses: VecDeque<Response>,
98    },
99}
100
101impl<Response> From<GenericStreamResponses<Response>> for VecDeque<Response> {
102    #[inline]
103    fn from(value: GenericStreamResponses<Response>) -> Self {
104        match value {
105            GenericStreamResponses::Continue { responses, .. } => responses,
106            GenericStreamResponses::Last { responses, .. } => responses,
107        }
108    }
109}
110
111impl<Response> GenericStreamResponses<Response> {
112    fn next(&mut self) -> Option<Response> {
113        match self {
114            GenericStreamResponses::Continue { responses, .. } => responses.pop_front(),
115            GenericStreamResponses::Last { responses, .. } => responses.pop_front(),
116        }
117    }
118
119    fn index(&self) -> u32 {
120        match self {
121            GenericStreamResponses::Continue { index, .. } => *index,
122            GenericStreamResponses::Last { index, .. } => *index,
123        }
124    }
125
126    fn ack_subject(&self) -> Option<&str> {
127        if let GenericStreamResponses::Continue { ack_subject, .. } = self {
128            Some(ack_subject)
129        } else {
130            None
131        }
132    }
133
134    fn is_last(&self) -> bool {
135        matches!(self, Self::Last { .. })
136    }
137}
138
139/// Stream request error
140#[derive(Debug, Error)]
141pub enum StreamRequestError {
142    /// Subscribe error
143    #[error("Subscribe error: {0}")]
144    Subscribe(#[from] SubscribeError),
145    /// Publish error
146    #[error("Publish error: {0}")]
147    Publish(#[from] PublishError),
148}
149
150/// Wrapper around subscription that transforms stream of wrapped response messages into a normal
151/// `Response` stream.
152#[derive(Debug)]
153#[pin_project::pin_project]
154pub struct StreamResponseSubscriber<Response> {
155    #[pin]
156    subscriber: Subscriber,
157    response_subject: String,
158    buffered_responses: Option<GenericStreamResponses<Response>>,
159    next_index: u32,
160    acknowledgement_sender: mpsc::UnboundedSender<(String, u32)>,
161    _background_task: AsyncJoinOnDrop<()>,
162    _phantom: PhantomData<Response>,
163}
164
165impl<Response> Stream for StreamResponseSubscriber<Response>
166where
167    Response: Decode,
168{
169    type Item = Response;
170
171    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172        if let Some(buffered_responses) = self.buffered_responses.as_mut() {
173            if let Some(response) = buffered_responses.next() {
174                return Poll::Ready(Some(response));
175            } else if buffered_responses.is_last() {
176                return Poll::Ready(None);
177            }
178
179            self.buffered_responses.take();
180            self.next_index += 1;
181        }
182
183        let mut projected = self.project();
184        match projected.subscriber.poll_next_unpin(cx) {
185            Poll::Ready(Some(message)) => {
186                match GenericStreamResponses::<Response>::decode(&mut message.payload.as_ref()) {
187                    Ok(mut responses) => {
188                        if responses.index() != *projected.next_index {
189                            warn!(
190                                actual_index = %responses.index(),
191                                expected_index = %*projected.next_index,
192                                message_type = %type_name::<Response>(),
193                                response_subject = %projected.response_subject,
194                                "Received unexpected response stream index, aborting stream"
195                            );
196
197                            return Poll::Ready(None);
198                        }
199
200                        if let Some(ack_subject) = responses.ack_subject() {
201                            let index = responses.index();
202                            let ack_subject = ack_subject.to_string();
203
204                            if let Err(error) = projected
205                                .acknowledgement_sender
206                                .unbounded_send((ack_subject.clone(), index))
207                            {
208                                warn!(
209                                    %error,
210                                    %index,
211                                    message_type = %type_name::<Response>(),
212                                    response_subject = %projected.response_subject,
213                                    %ack_subject,
214                                    "Failed to send acknowledgement for stream response"
215                                );
216                            }
217                        }
218
219                        if let Some(response) = responses.next() {
220                            *projected.buffered_responses = Some(responses);
221                            Poll::Ready(Some(response))
222                        } else {
223                            Poll::Ready(None)
224                        }
225                    }
226                    Err(error) => {
227                        warn!(
228                            %error,
229                            response_type = %type_name::<Response>(),
230                            response_subject = %projected.response_subject,
231                            message = %hex::encode(message.payload),
232                            "Failed to decode stream response"
233                        );
234
235                        Poll::Ready(None)
236                    }
237                }
238            }
239            Poll::Ready(None) => Poll::Ready(None),
240            Poll::Pending => Poll::Pending,
241        }
242    }
243}
244
245impl<Response> StreamResponseSubscriber<Response> {
246    fn new(subscriber: Subscriber, response_subject: String, nats_client: NatsClient) -> Self {
247        let (acknowledgement_sender, mut acknowledgement_receiver) =
248            mpsc::unbounded::<(String, u32)>();
249
250        let ack_publisher_fut = {
251            let response_subject = response_subject.clone();
252
253            async move {
254                while let Some((subject, index)) = acknowledgement_receiver.next().await {
255                    trace!(
256                        %subject,
257                        %index,
258                        %response_subject,
260                        "Sending stream response acknowledgement"
261                    );
262                    if let Err(error) = nats_client
263                        .publish(subject.clone(), index.to_le_bytes().to_vec().into())
264                        .await
265                    {
266                        warn!(
267                            %error,
268                            %subject,
269                            %index,
270                            %response_subject,
272                            "Failed to send stream response acknowledgement"
273                        );
274                        return;
275                    }
276                }
277            }
278        };
279        let background_task =
280            AsyncJoinOnDrop::new(tokio::spawn(ack_publisher_fut.in_current_span()), true);
281
282        Self {
283            response_subject,
284            subscriber,
285            buffered_responses: None,
286            next_index: 0,
287            acknowledgement_sender,
288            _background_task: background_task,
289            _phantom: PhantomData,
290        }
291    }
292}
293
294/// Generic one-off notification
295pub trait GenericNotification: Encode + Decode + fmt::Debug + Send + Sync + 'static {
296    /// Notification subject with optional `*` in place of application instance receiving the
297    /// request
298    const SUBJECT: &'static str;
299}
300
301/// Generic broadcast message.
302///
303/// Broadcast messages are sent by an instance to (potentially) an instance-specific subject that
304/// any other app can subscribe to. The same broadcast message can also originate from multiple
305/// places and be de-duplicated using [`Self::deterministic_message_id`].
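///
/// A hedged implementation sketch (the `ExampleSlotBroadcast` type and its subject are made up for
/// illustration):
///
/// ```ignore
/// use async_nats::HeaderValue;
/// use parity_scale_codec::{Decode, Encode};
///
/// #[derive(Debug, Encode, Decode)]
/// struct ExampleSlotBroadcast {
///     slot: u64,
/// }
///
/// impl GenericBroadcast for ExampleSlotBroadcast {
///     const SUBJECT: &'static str = "example.*.slot-info";
///
///     fn deterministic_message_id(&self) -> Option<HeaderValue> {
///         // The same slot broadcast from multiple sources de-duplicates into a single message
///         Some(HeaderValue::from(format!("slot-info-{}", self.slot).as_str()))
///     }
/// }
/// ```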
306pub trait GenericBroadcast: Encode + Decode + fmt::Debug + Send + Sync + 'static {
307    /// Broadcast subject with optional `*` in place of application instance sending broadcast
308    const SUBJECT: &'static str;
309
310    /// Deterministic message ID that is used for de-duplicating messages broadcast by different
311    /// instances
312    fn deterministic_message_id(&self) -> Option<HeaderValue> {
313        None
314    }
315}
316
317/// Subscriber wrapper that decodes messages automatically and skips messages that can't be decoded
318#[derive(Debug)]
319#[pin_project::pin_project]
320pub struct SubscriberWrapper<Message> {
321    #[pin]
322    subscriber: Subscriber,
323    _phantom: PhantomData<Message>,
324}
325
326impl<Message> Stream for SubscriberWrapper<Message>
327where
328    Message: Decode,
329{
330    type Item = Message;
331
332    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
333        match self.project().subscriber.poll_next_unpin(cx) {
334            Poll::Ready(Some(message)) => match Message::decode(&mut message.payload.as_ref()) {
335                Ok(message) => Poll::Ready(Some(message)),
336                Err(error) => {
337                    warn!(
338                        %error,
339                        message_type = %type_name::<Message>(),
340                        message = %hex::encode(message.payload),
341                        "Failed to decode stream message"
342                    );
343
                    // Skip the undecodable message, but wake the task so the subscriber is polled
                    // again for the next message instead of stalling
                    cx.waker().wake_by_ref();
344                    Poll::Pending
345                }
346            },
347            Poll::Ready(None) => Poll::Ready(None),
348            Poll::Pending => Poll::Pending,
349        }
350    }
351}
352
353#[derive(Debug)]
354struct Inner {
355    client: Client,
356    request_retry_backoff_policy: ExponentialBackoff,
357    approximate_max_message_size: usize,
358    max_message_size: usize,
359}
360
361/// NATS client wrapper that can be used to interact with other Subspace-specific clients
362#[derive(Debug, Clone)]
363pub struct NatsClient {
364    inner: Arc<Inner>,
365}
366
367impl Deref for NatsClient {
368    type Target = Client;
369
370    #[inline]
371    fn deref(&self) -> &Self::Target {
372        &self.inner.client
373    }
374}
375
376impl NatsClient {
377    /// Create new instance by connecting to specified addresses
378    pub async fn new<A: ToServerAddrs>(
379        addrs: A,
380        request_retry_backoff_policy: ExponentialBackoff,
381    ) -> Result<Self, async_nats::Error> {
382        let servers = addrs.to_server_addrs()?.collect::<Vec<_>>();
383        Self::from_client(
384            async_nats::connect_with_options(
385                &servers,
386                ConnectOptions::default().request_timeout(Some(REQUEST_TIMEOUT)),
387            )
388            .await?,
389            request_retry_backoff_policy,
390        )
391    }
392
393    /// Create new client from existing NATS instance
394    pub fn from_client(
395        client: Client,
396        request_retry_backoff_policy: ExponentialBackoff,
397    ) -> Result<Self, async_nats::Error> {
398        let max_payload = client.server_info().max_payload;
399        if max_payload < EXPECTED_MESSAGE_SIZE {
400            return Err(format!(
401                "Max payload {max_payload} is smaller than expected {EXPECTED_MESSAGE_SIZE}, \
402                increase it by specifying max_payload = 2MB or higher number in NATS configuration"
403            )
404            .into());
405        }
406
407        let inner = Inner {
408            client,
409            request_retry_backoff_policy,
410            // Allow up to 90%, the rest will be wrapper data structures, etc.
411            approximate_max_message_size: max_payload * 9 / 10,
412            // Hard limit on a single message size as reported by the NATS server
413            max_message_size: max_payload,
414        };
415
416        Ok(Self {
417            inner: Arc::new(inner),
418        })
419    }
420
421    /// Approximate max message size (a few more bytes will not hurt), the actual limit is expected
422    /// to be a bit higher
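    ///
    /// A hedged chunking sketch (the payload here is just arbitrary bytes for illustration):
    ///
    /// ```ignore
    /// # fn example(nats_client: &NatsClient, payload: &[u8]) {
    /// // Split a large blob into pieces that each fit into a single NATS message
    /// let chunk_size = nats_client.approximate_max_message_size();
    /// for _chunk in payload.chunks(chunk_size) {
    ///     // ...encode and send each chunk, for example as stream responses
    /// }
    /// # }
    /// ```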
423    pub fn approximate_max_message_size(&self) -> usize {
424        self.inner.approximate_max_message_size
425    }
426
427    /// Make request and wait for response
428    pub async fn request<Request>(
429        &self,
430        request: &Request,
431        instance: Option<&str>,
432    ) -> Result<Request::Response, RequestError>
433    where
434        Request: GenericRequest,
435    {
436        let subject = subject_with_instance(Request::SUBJECT, instance);
437        let mut maybe_retry_backoff = None;
438        let message = loop {
439            match self
440                .inner
441                .client
442                .request(subject.clone(), request.encode().into())
443                .await
444            {
445                Ok(message) => {
446                    break message;
447                }
448                Err(error) => {
449                    match error.kind() {
450                        RequestErrorKind::TimedOut | RequestErrorKind::NoResponders => {
451                            // Continue with retries
452                        }
453                        RequestErrorKind::Other => {
454                            return Err(error);
455                        }
456                    }
457
458                    let retry_backoff = maybe_retry_backoff.get_or_insert_with(|| {
459                        let mut retry_backoff = self.inner.request_retry_backoff_policy.clone();
460                        retry_backoff.reset();
461                        retry_backoff
462                    });
463
464                    if let Some(delay) = retry_backoff.next_backoff() {
465                        debug!(
466                            %subject,
467                            %error,
468                            request_type = %type_name::<Request>(),
469                            ?delay,
470                            "Failed to make request, retrying after some delay"
471                        );
472
473                        tokio::time::sleep(delay).await;
474                        continue;
475                    } else {
476                        return Err(error);
477                    }
478                }
479            }
480        };
481
482        let response =
483            Request::Response::decode(&mut message.payload.as_ref()).map_err(|error| {
484                warn!(
485                    %subject,
486                    %error,
487                    response_type = %type_name::<Request::Response>(),
488                    response = %hex::encode(message.payload),
489                    "Response decoding failed"
490                );
491
492                RequestErrorKind::Other
493            })?;
494
495        Ok(response)
496    }
497
498    /// Responds to requests from the given subject using the provided processing function.
499    ///
500    /// This will create a subscription on the subject for the given instance (if provided) and
501    /// queue group. Incoming messages will be deserialized as the request type `Request` and passed
502    /// to the `process` function to produce a response of type `Request::Response`. The response
503    /// will then be sent back on the reply subject from the original request.
504    ///
505    /// Requests are processed concurrently, each as a separate future polled within this method.
506    ///
507    /// # Arguments
508    ///
509    /// * `instance` - Optional instance name to use in place of the `*` in the subject
510    /// * `queue_group` - Optional queue group name for the subscription
511    /// * `process` - The function to call with the decoded request to produce a response
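    ///
    /// A usage sketch, assuming the hypothetical `PingRequest` from the module-level example:
    ///
    /// ```ignore
    /// # async fn example(nats_client: &NatsClient) -> anyhow::Result<()> {
    /// // Runs until the subscription ends, so it is typically spawned or selected on
    /// nats_client
    ///     .request_responder(
    ///         Some("instance-1"),
    ///         Some("ping".to_string()),
    ///         |_request: PingRequest| async move { Some(42u64) },
    ///     )
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```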
512    pub async fn request_responder<Request, F, OP>(
513        &self,
514        instance: Option<&str>,
515        queue_group: Option<String>,
516        process: OP,
517    ) -> anyhow::Result<()>
518    where
519        Request: GenericRequest,
520        F: Future<Output = Option<Request::Response>> + Send,
521        OP: Fn(Request) -> F + Send + Sync,
522    {
523        // Futures for requests that are currently being processed concurrently
524        let mut processing = FuturesUnordered::new();
525
526        let subscription = self
527            .common_subscribe(Request::SUBJECT, instance, queue_group)
528            .await
529            .map_err(|error| {
530                anyhow!(
531                    "Failed to subscribe to {} requests for {instance:?}: {error}",
532                    type_name::<Request>(),
533                )
534            })?;
535
536        debug!(
537            request_type = %type_name::<Request>(),
538            ?subscription,
539            "Requests subscription"
540        );
541        let mut subscription = subscription.fuse();
542
543        loop {
544            select! {
545                message = subscription.select_next_some() => {
546                    // Queue a future for concurrent processing of this request
547                    processing.push(
548                        self
549                            .process_request(
550                                message,
551                                &process,
552                            )
553                            .in_current_span(),
554                    );
555                },
556                _ = processing.next() => {
557                    // Nothing to do here
558                },
559                complete => {
560                    break;
561                }
562            }
563        }
564
565        Ok(())
566    }
567
568    async fn process_request<Request, F, OP>(&self, message: Message, process: OP)
569    where
570        Request: GenericRequest,
571        F: Future<Output = Option<Request::Response>> + Send,
572        OP: Fn(Request) -> F + Send + Sync,
573    {
574        let Some(reply_subject) = message.reply else {
575            return;
576        };
577
578        let message_payload_size = message.payload.len();
579        let request = match Request::decode(&mut message.payload.as_ref()) {
580            Ok(request) => {
581                // Free allocation early
582                drop(message.payload);
583                request
584            }
585            Err(error) => {
586                warn!(
587                    request_type = %type_name::<Request>(),
588                    %error,
589                    message = %hex::encode(message.payload),
590                    "Failed to decode request"
591                );
592                return;
593            }
594        };
595
596        // Avoid printing large messages in logs
597        if message_payload_size > 1024 {
598            trace!(
599                request_type = %type_name::<Request>(),
600                %reply_subject,
601                "Processing request"
602            );
603        } else {
604            trace!(
605                request_type = %type_name::<Request>(),
606                ?request,
607                %reply_subject,
608                "Processing request"
609            );
610        }
611
612        if let Some(response) = process(request).await
613            && let Err(error) = self.publish(reply_subject, response.encode().into()).await
614        {
615            warn!(
616                request_type = %type_name::<Request>(),
617                %error,
618                "Failed to send response"
619            );
620        }
621    }
622
623    /// Make request that expects stream response
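    ///
    /// A usage sketch, assuming a hypothetical `ListItemsRequest` stream request type:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// # async fn example(nats_client: &NatsClient) -> anyhow::Result<()> {
    /// let mut responses = nats_client
    ///     .stream_request(&ListItemsRequest, Some("instance-1"))
    ///     .await?;
    /// // Responses arrive as a flattened stream; acknowledgements are sent internally
    /// while let Some(item) = responses.next().await {
    ///     println!("{item:?}");
    /// }
    /// # Ok(())
    /// # }
    /// ```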
624    pub async fn stream_request<Request>(
625        &self,
626        request: &Request,
627        instance: Option<&str>,
628    ) -> Result<StreamResponseSubscriber<Request::Response>, StreamRequestError>
629    where
630        Request: GenericStreamRequest,
631    {
632        let stream_request_subject = subject_with_instance(Request::SUBJECT, instance);
633        let stream_response_subject = format!("stream-response.{}", Ulid::new());
634
635        let subscriber = self
636            .inner
637            .client
638            .subscribe(stream_response_subject.clone())
639            .await?;
640
641        debug!(
642            request_type = %type_name::<Request>(),
643            %stream_request_subject,
644            %stream_response_subject,
645            ?subscriber,
646            "Stream request subscription"
647        );
648
649        self.inner
650            .client
651            .publish_with_reply(
652                stream_request_subject,
653                stream_response_subject.clone(),
654                request.encode().into(),
655            )
656            .await?;
657
658        Ok(StreamResponseSubscriber::new(
659            subscriber,
660            stream_response_subject,
661            self.clone(),
662        ))
663    }
664
665    /// Responds to stream requests from the given subject using the provided processing function.
666    ///
667    /// This will create a subscription on the subject for the given instance (if provided) and
668    /// queue group. Incoming messages will be deserialized as the request type `Request` and passed
669    /// to the `process` function to produce a stream response of type `Request::Response`. The
670    /// stream response will then be sent back on the reply subject from the original request.
671    ///
672    /// Requests are processed concurrently, each as a separate future polled within this method.
673    ///
674    /// # Arguments
675    ///
676    /// * `instance` - Optional instance name to use in place of the `*` in the subject
677    /// * `queue_group` - Optional queue group name for the subscription
678    /// * `process` - The function to call with the decoded request to produce a response
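    ///
    /// A usage sketch, assuming a hypothetical `ListItemsRequest` stream request type with
    /// `Response = u64`:
    ///
    /// ```ignore
    /// # async fn example(nats_client: &NatsClient) -> anyhow::Result<()> {
    /// nats_client
    ///     .stream_request_responder(
    ///         Some("instance-1"),
    ///         Some("list-items".to_string()),
    ///         |_request: ListItemsRequest| async move {
    ///             // The returned stream is batched and chunked into NATS messages automatically
    ///             Some(futures::stream::iter([1u64, 2, 3]))
    ///         },
    ///     )
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```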
679    pub async fn stream_request_responder<Request, F, S, OP>(
680        &self,
681        instance: Option<&str>,
682        queue_group: Option<String>,
683        process: OP,
684    ) -> anyhow::Result<()>
685    where
686        Request: GenericStreamRequest,
687        F: Future<Output = Option<S>> + Send,
688        S: Stream<Item = Request::Response> + Unpin,
689        OP: Fn(Request) -> F + Send + Sync,
690    {
691        // Futures for requests that are currently being processed concurrently
692        let mut processing = FuturesUnordered::new();
693
694        let subscription = self
695            .common_subscribe(Request::SUBJECT, instance, queue_group)
696            .await
697            .map_err(|error| {
698                anyhow!(
699                    "Failed to subscribe to {} stream requests for {instance:?}: {error}",
700                    type_name::<Request>(),
701                )
702            })?;
703
704        debug!(
705            request_type = %type_name::<Request>(),
706            ?subscription,
707            "Stream requests subscription"
708        );
709        let mut subscription = subscription.fuse();
710
711        loop {
712            select! {
713                message = subscription.select_next_some() => {
714                    // Queue a future for concurrent processing of this request
715                    processing.push(
716                        self
717                        .process_stream_request(
718                            message,
719                            &process,
720                        )
721                        .in_current_span(),
722                    );
723                },
724                _ = processing.next() => {
725                    // Nothing to do here
726                },
727                complete => {
728                    break;
729                }
730            }
731        }
732
733        Ok(())
734    }
735
736    async fn process_stream_request<Request, F, S, OP>(&self, message: Message, process: OP)
737    where
738        Request: GenericStreamRequest,
739        F: Future<Output = Option<S>> + Send,
740        S: Stream<Item = Request::Response> + Unpin,
741        OP: Fn(Request) -> F + Send + Sync,
742    {
743        let Some(reply_subject) = message.reply else {
744            return;
745        };
746
747        let message_payload_size = message.payload.len();
748        let request = match Request::decode(&mut message.payload.as_ref()) {
749            Ok(request) => {
750                // Free allocation early
751                drop(message.payload);
752                request
753            }
754            Err(error) => {
755                warn!(
756                    request_type = %type_name::<Request>(),
757                    %error,
758                    message = %hex::encode(message.payload),
759                    "Failed to decode request"
760                );
761                return;
762            }
763        };
764
765        // Avoid printing large messages in logs
766        if message_payload_size > 1024 {
767            trace!(
768                request_type = %type_name::<Request>(),
769                %reply_subject,
770                "Processing request"
771            );
772        } else {
773            trace!(
774                request_type = %type_name::<Request>(),
775                ?request,
776                %reply_subject,
777                "Processing request"
778            );
779        }
780
781        if let Some(stream) = process(request).await {
782            self.stream_response::<Request, _>(reply_subject, stream)
783                .await;
784        }
785    }
786
787    /// Helper method to send responses to requests initiated with [`Self::stream_request`]
788    async fn stream_response<Request, S>(&self, response_subject: Subject, response_stream: S)
789    where
790        Request: GenericStreamRequest,
791        S: Stream<Item = Request::Response> + Unpin,
792    {
793        type Response<Request> =
794            GenericStreamResponses<<Request as GenericStreamRequest>::Response>;
795
796        let mut response_stream = response_stream.fuse();
797
798        // Pull the first element to measure response size
799        let first_element = match response_stream.next().await {
800            Some(first_element) => first_element,
801            None => {
802                if let Err(error) = self
803                    .publish(
804                        response_subject.clone(),
805                        Response::<Request>::Last {
806                            index: 0,
807                            responses: VecDeque::new(),
808                        }
809                        .encode()
810                        .into(),
811                    )
812                    .await
813                {
814                    warn!(
815                        %response_subject,
816                        %error,
817                        request_type = %type_name::<Request>(),
818                        response_type = %type_name::<Request::Response>(),
819                        "Failed to send stream response"
820                    );
821                }
822
823                return;
824            }
825        };
826        let max_message_size = self.inner.max_message_size;
827        let approximate_max_message_size = self.approximate_max_message_size();
828        let max_responses_per_message = approximate_max_message_size / first_element.encoded_size();
829
830        let ack_subject = format!("stream-response-ack.{}", Ulid::new());
831        let mut ack_subscription = match self.subscribe(ack_subject.clone()).await {
832            Ok(ack_subscription) => ack_subscription,
833            Err(error) => {
834                warn!(
835                    %response_subject,
836                    %error,
837                    request_type = %type_name::<Request>(),
838                    response_type = %type_name::<Request::Response>(),
839                    "Failed to subscribe to ack subject"
840                );
841                return;
842            }
843        };
844        debug!(
845            %response_subject,
846            request_type = %type_name::<Request>(),
847            response_type = %type_name::<Request::Response>(),
848            ?ack_subscription,
849            "Ack subscription subscription"
850        );
851        let mut index = 0;
852        // Initialize buffer that will be reused for responses
853        let mut buffer = VecDeque::with_capacity(max_responses_per_message);
854        buffer.push_back(first_element);
855        let mut overflow_buffer = VecDeque::new();
856
857        loop {
858            // Try to fill the buffer
859            if buffer.is_empty() {
860                if let Some(element) = response_stream.next().await {
861                    buffer.push_back(element);
862                }
863            }
864            while buffer.encoded_size() < approximate_max_message_size
865                && let Some(element) = response_stream.next().now_or_never().flatten()
866            {
867                buffer.push_back(element);
868            }
869
870            loop {
871                let is_done = response_stream.is_done() && overflow_buffer.is_empty();
872                let num_messages = buffer.len();
873                let response = if is_done {
874                    Response::<Request>::Last {
875                        index,
876                        responses: buffer,
877                    }
878                } else {
879                    Response::<Request>::Continue {
880                        index,
881                        responses: buffer,
882                        ack_subject: ack_subject.clone(),
883                    }
884                };
885                let encoded_response = response.encode();
886                let encoded_response_len = encoded_response.len();
887                // When encoded response is too large, remove one of the responses from it and try
888                // again
889                if encoded_response_len > max_message_size {
890                    buffer = response.into();
891                    if let Some(element) = buffer.pop_back() {
892                        if buffer.is_empty() {
893                            error!(
894                                ?element,
895                                encoded_response_len,
896                                max_message_size,
897                                "Element was too large to fit into NATS message, this is an \
898                                implementation bug"
899                            );
900                        }
901                        overflow_buffer.push_front(element);
902                        continue;
903                    } else {
904                        error!(
905                            %response_subject,
906                            request_type = %type_name::<Request>(),
907                            response_type = %type_name::<Request::Response>(),
908                            "Empty response overflowed the message size limit, this should never happen"
909                        );
910                        return;
911                    }
912                }
913
914                debug!(
915                    %response_subject,
916                    num_messages,
917                    %index,
918                    %is_done,
919                    "Publishing stream response messages",
920                );
921
922                if let Err(error) = self
923                    .publish(response_subject.clone(), encoded_response.into())
924                    .await
925                {
926                    warn!(
927                        %response_subject,
928                        %error,
929                        request_type = %type_name::<Request>(),
930                        response_type = %type_name::<Request::Response>(),
931                        "Failed to send stream response"
932                    );
933                    return;
934                }
935
936                if is_done {
937                    return;
938                } else {
939                    buffer = response.into();
940                    buffer.clear();
941                    // Fill buffer with any overflowed responses that may have been stored
942                    buffer.extend(overflow_buffer.drain(..));
943                }
944
945                if index >= 1 {
946                    // Acknowledgements are received with delay
947                    let expected_index = index - 1;
948
949                    trace!(
950                        %response_subject,
951                        %expected_index,
952                        "Waiting for acknowledgement"
953                    );
954                    match tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, ack_subscription.next())
955                        .await
956                    {
957                        Ok(Some(message)) => {
958                            if let Some(received_index) = message
959                                .payload
960                                .split_at_checked(mem::size_of::<u32>())
961                                .map(|(bytes, _)| {
962                                    u32::from_le_bytes(
963                                        bytes.try_into().expect("Correctly chunked slice; qed"),
964                                    )
965                                })
966                            {
967                                debug!(
968                                    %response_subject,
969                                    %received_index,
970                                    "Received acknowledgement"
971                                );
972                                if received_index != expected_index {
973                                    warn!(
974                                        %response_subject,
975                                        %received_index,
976                                        %expected_index,
977                                        request_type = %type_name::<Request>(),
978                                        response_type = %type_name::<Request::Response>(),
979                                        message = %hex::encode(message.payload),
980                                        "Unexpected acknowledgement index"
981                                    );
982                                    return;
983                                }
984                            } else {
985                                warn!(
986                                    %response_subject,
987                                    request_type = %type_name::<Request>(),
988                                    response_type = %type_name::<Request::Response>(),
989                                    message = %hex::encode(message.payload),
990                                    "Unexpected acknowledgement message"
991                                );
992                                return;
993                            }
994                        }
995                        Ok(None) => {
996                            warn!(
997                                %response_subject,
998                                request_type = %type_name::<Request>(),
999                                response_type = %type_name::<Request::Response>(),
1000                                "Acknowledgement stream ended unexpectedly"
1001                            );
1002                            return;
1003                        }
1004                        Err(_error) => {
1005                            warn!(
1006                                %response_subject,
1007                                %expected_index,
1008                                request_type = %type_name::<Request>(),
1009                                response_type = %type_name::<Request::Response>(),
1010                                "Acknowledgement wait timed out"
1011                            );
1012                            return;
1013                        }
1014                    }
1015                }
1016
1017                index += 1;
1018
1019                // Break out of the inner loop unless there are overflowed responses left to send
1020                if buffer.is_empty() {
1021                    break;
1022                }
1023            }
1024        }
1025    }
1026
1027    /// Make notification without waiting for response
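    ///
    /// A usage sketch (the `ExampleNotification` type is made up for illustration):
    ///
    /// ```ignore
    /// # async fn example(nats_client: &NatsClient) -> anyhow::Result<()> {
    /// // Fire-and-forget: no response is expected and delivery is not retried
    /// nats_client
    ///     .notification(&ExampleNotification, Some("instance-1"))
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```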
1028    pub async fn notification<Notification>(
1029        &self,
1030        notification: &Notification,
1031        instance: Option<&str>,
1032    ) -> Result<(), PublishError>
1033    where
1034        Notification: GenericNotification,
1035    {
1036        self.inner
1037            .client
1038            .publish(
1039                subject_with_instance(Notification::SUBJECT, instance),
1040                notification.encode().into(),
1041            )
1042            .await
1043    }
1044
1045    /// Send a broadcast message
1046    pub async fn broadcast<Broadcast>(
1047        &self,
1048        message: &Broadcast,
1049        instance: &str,
1050    ) -> Result<(), PublishError>
1051    where
1052        Broadcast: GenericBroadcast,
1053    {
1054        self.inner
1055            .client
1056            .publish_with_headers(
1057                Broadcast::SUBJECT.replace('*', instance),
1058                {
1059                    let mut headers = HeaderMap::new();
1060                    if let Some(message_id) = message.deterministic_message_id() {
1061                        headers.insert("Nats-Msg-Id", message_id);
1062                    }
1063                    headers
1064                },
1065                message.encode().into(),
1066            )
1067            .await
1068    }
1069
1070    /// Simple subscription that will produce decoded notifications, while skipping messages that
1071    /// fail to decode
1072    pub async fn subscribe_to_notifications<Notification>(
1073        &self,
1074        instance: Option<&str>,
1075        queue_group: Option<String>,
1076    ) -> Result<SubscriberWrapper<Notification>, SubscribeError>
1077    where
1078        Notification: GenericNotification,
1079    {
1080        self.simple_subscribe(Notification::SUBJECT, instance, queue_group)
1081            .await
1082    }
1083
1084    /// Simple subscription that will produce decoded broadcasts, while skipping messages that
1085    /// fail to decode
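    ///
    /// A usage sketch, assuming the hypothetical `ExampleSlotBroadcast` from the example above:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// # async fn example(nats_client: &NatsClient) -> anyhow::Result<()> {
    /// let mut broadcasts = nats_client
    ///     .subscribe_to_broadcasts::<ExampleSlotBroadcast>(Some("instance-1"), None)
    ///     .await?;
    /// while let Some(broadcast) = broadcasts.next().await {
    ///     println!("Received broadcast: {broadcast:?}");
    /// }
    /// # Ok(())
    /// # }
    /// ```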
1086    pub async fn subscribe_to_broadcasts<Broadcast>(
1087        &self,
1088        instance: Option<&str>,
1089        queue_group: Option<String>,
1090    ) -> Result<SubscriberWrapper<Broadcast>, SubscribeError>
1091    where
1092        Broadcast: GenericBroadcast,
1093    {
1094        self.simple_subscribe(Broadcast::SUBJECT, instance, queue_group)
1095            .await
1096    }
1097
1098    /// Simple subscription that will produce decoded messages, while skipping messages that fail to
1099    /// decode
1100    async fn simple_subscribe<Message>(
1101        &self,
1102        subject: &'static str,
1103        instance: Option<&str>,
1104        queue_group: Option<String>,
1105    ) -> Result<SubscriberWrapper<Message>, SubscribeError>
1106    where
1107        Message: Decode,
1108    {
1109        let subscriber = self
1110            .common_subscribe(subject, instance, queue_group)
1111            .await?;
1112        debug!(
1113            %subject,
1114            message_type = %type_name::<Message>(),
1115            ?subscriber,
1116            "Simple subscription"
1117        );
1118
1119        Ok(SubscriberWrapper {
1120            subscriber,
1121            _phantom: PhantomData,
1122        })
1123    }
1124
1125    /// Subscribe to a subject with optional instance substitution, optionally joining a queue
1126    /// group
1127    async fn common_subscribe(
1128        &self,
1129        subject: &'static str,
1130        instance: Option<&str>,
1131        queue_group: Option<String>,
1132    ) -> Result<Subscriber, SubscribeError> {
1133        let subscriber = if let Some(queue_group) = queue_group {
1134            self.inner
1135                .client
1136                .queue_subscribe(subject_with_instance(subject, instance), queue_group)
1137                .await?
1138        } else {
1139            self.inner
1140                .client
1141                .subscribe(subject_with_instance(subject, instance))
1142                .await?
1143        };
1144
1145        Ok(subscriber)
1146    }
1147}
1148
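/// Substitute the application instance into the subject's `*` wildcard, if an instance is given.
///
/// For example (subject name made up for illustration), `"example.*.ping"` with instance
/// `"instance-1"` becomes `"example.instance-1.ping"`, while passing `None` keeps the wildcard
/// subject as-is.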
1149fn subject_with_instance(subject: &'static str, instance: Option<&str>) -> Subject {
1150    if let Some(instance) = instance {
1151        Subject::from(subject.replace('*', instance))
1152    } else {
1153        Subject::from_static(subject)
1154    }
1155}