1use crate::utils::AsyncJoinOnDrop;
17use anyhow::anyhow;
18use async_nats::{
19 Client, ConnectOptions, HeaderMap, HeaderValue, Message, PublishError, RequestError,
20 RequestErrorKind, Subject, SubscribeError, Subscriber, ToServerAddrs,
21};
22use backoff::backoff::Backoff;
23use backoff::ExponentialBackoff;
24use futures::channel::mpsc;
25use futures::stream::FuturesUnordered;
26use futures::{select, FutureExt, Stream, StreamExt};
27use parity_scale_codec::{Decode, Encode};
28use std::any::type_name;
29use std::collections::VecDeque;
30use std::future::Future;
31use std::marker::PhantomData;
32use std::ops::Deref;
33use std::pin::Pin;
34use std::sync::Arc;
35use std::task::{Context, Poll};
36use std::time::Duration;
37use std::{fmt, mem};
38use thiserror::Error;
39use tracing::{debug, error, trace, warn, Instrument};
40use ulid::Ulid;
41
42const EXPECTED_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
43const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(1);
44const REQUEST_TIMEOUT: Duration = Duration::from_mins(5);
47
48pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
54 const SUBJECT: &'static str;
56 type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static;
58}
59
60pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
67 const SUBJECT: &'static str;
69 type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static;
75}
76
77#[derive(Debug, Encode, Decode)]
81enum GenericStreamResponses<Response> {
82 Continue {
84 index: u32,
86 responses: VecDeque<Response>,
88 ack_subject: String,
91 },
92 Last {
94 index: u32,
96 responses: VecDeque<Response>,
98 },
99}
100
101impl<Response> From<GenericStreamResponses<Response>> for VecDeque<Response> {
102 #[inline]
103 fn from(value: GenericStreamResponses<Response>) -> Self {
104 match value {
105 GenericStreamResponses::Continue { responses, .. } => responses,
106 GenericStreamResponses::Last { responses, .. } => responses,
107 }
108 }
109}
110
111impl<Response> GenericStreamResponses<Response> {
112 fn next(&mut self) -> Option<Response> {
113 match self {
114 GenericStreamResponses::Continue { responses, .. } => responses.pop_front(),
115 GenericStreamResponses::Last { responses, .. } => responses.pop_front(),
116 }
117 }
118
119 fn index(&self) -> u32 {
120 match self {
121 GenericStreamResponses::Continue { index, .. } => *index,
122 GenericStreamResponses::Last { index, .. } => *index,
123 }
124 }
125
126 fn ack_subject(&self) -> Option<&str> {
127 if let GenericStreamResponses::Continue { ack_subject, .. } = self {
128 Some(ack_subject)
129 } else {
130 None
131 }
132 }
133
134 fn is_last(&self) -> bool {
135 matches!(self, Self::Last { .. })
136 }
137}
138
139#[derive(Debug, Error)]
141pub enum StreamRequestError {
142 #[error("Subscribe error: {0}")]
144 Subscribe(#[from] SubscribeError),
145 #[error("Publish error: {0}")]
147 Publish(#[from] PublishError),
148}
149
150#[derive(Debug)]
153#[pin_project::pin_project]
154pub struct StreamResponseSubscriber<Response> {
155 #[pin]
156 subscriber: Subscriber,
157 response_subject: String,
158 buffered_responses: Option<GenericStreamResponses<Response>>,
159 next_index: u32,
160 acknowledgement_sender: mpsc::UnboundedSender<(String, u32)>,
161 _background_task: AsyncJoinOnDrop<()>,
162 _phantom: PhantomData<Response>,
163}
164
165impl<Response> Stream for StreamResponseSubscriber<Response>
166where
167 Response: Decode,
168{
169 type Item = Response;
170
171 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172 if let Some(buffered_responses) = self.buffered_responses.as_mut() {
173 if let Some(response) = buffered_responses.next() {
174 return Poll::Ready(Some(response));
175 } else if buffered_responses.is_last() {
176 return Poll::Ready(None);
177 }
178
179 self.buffered_responses.take();
180 self.next_index += 1;
181 }
182
183 let mut projected = self.project();
184 match projected.subscriber.poll_next_unpin(cx) {
185 Poll::Ready(Some(message)) => {
186 match GenericStreamResponses::<Response>::decode(&mut message.payload.as_ref()) {
187 Ok(mut responses) => {
188 if responses.index() != *projected.next_index {
189 warn!(
190 actual_index = %responses.index(),
191 expected_index = %*projected.next_index,
192 message_type = %type_name::<Response>(),
193 response_subject = %projected.response_subject,
194 "Received unexpected response stream index, aborting stream"
195 );
196
197 return Poll::Ready(None);
198 }
199
200 if let Some(ack_subject) = responses.ack_subject() {
201 let index = responses.index();
202 let ack_subject = ack_subject.to_string();
203
204 if let Err(error) = projected
205 .acknowledgement_sender
206 .unbounded_send((ack_subject.clone(), index))
207 {
208 warn!(
209 %error,
210 %index,
211 message_type = %type_name::<Response>(),
212 response_subject = %projected.response_subject,
213 %ack_subject,
214 "Failed to send acknowledgement for stream response"
215 );
216 }
217 }
218
219 if let Some(response) = responses.next() {
220 *projected.buffered_responses = Some(responses);
221 Poll::Ready(Some(response))
222 } else {
223 Poll::Ready(None)
224 }
225 }
226 Err(error) => {
227 warn!(
228 %error,
229 response_type = %type_name::<Response>(),
230 response_subject = %projected.response_subject,
231 message = %hex::encode(message.payload),
232 "Failed to decode stream response"
233 );
234
235 Poll::Ready(None)
236 }
237 }
238 }
239 Poll::Ready(None) => Poll::Ready(None),
240 Poll::Pending => Poll::Pending,
241 }
242 }
243}
244
245impl<Response> StreamResponseSubscriber<Response> {
246 fn new(subscriber: Subscriber, response_subject: String, nats_client: NatsClient) -> Self {
247 let (acknowledgement_sender, mut acknowledgement_receiver) =
248 mpsc::unbounded::<(String, u32)>();
249
250 let ack_publisher_fut = {
251 let response_subject = response_subject.clone();
252
253 async move {
254 while let Some((subject, index)) = acknowledgement_receiver.next().await {
255 trace!(
256 %subject,
257 %index,
258 %response_subject,
259 %index,
260 "Sending stream response acknowledgement"
261 );
262 if let Err(error) = nats_client
263 .publish(subject.clone(), index.to_le_bytes().to_vec().into())
264 .await
265 {
266 warn!(
267 %error,
268 %subject,
269 %index,
270 %response_subject,
271 %index,
272 "Failed to send stream response acknowledgement"
273 );
274 return;
275 }
276 }
277 }
278 };
279 let background_task =
280 AsyncJoinOnDrop::new(tokio::spawn(ack_publisher_fut.in_current_span()), true);
281
282 Self {
283 response_subject,
284 subscriber,
285 buffered_responses: None,
286 next_index: 0,
287 acknowledgement_sender,
288 _background_task: background_task,
289 _phantom: PhantomData,
290 }
291 }
292}
293
294pub trait GenericNotification: Encode + Decode + fmt::Debug + Send + Sync + 'static {
296 const SUBJECT: &'static str;
299}
300
301pub trait GenericBroadcast: Encode + Decode + fmt::Debug + Send + Sync + 'static {
307 const SUBJECT: &'static str;
309
310 fn deterministic_message_id(&self) -> Option<HeaderValue> {
313 None
314 }
315}
316
317#[derive(Debug)]
319#[pin_project::pin_project]
320pub struct SubscriberWrapper<Message> {
321 #[pin]
322 subscriber: Subscriber,
323 _phantom: PhantomData<Message>,
324}
325
326impl<Message> Stream for SubscriberWrapper<Message>
327where
328 Message: Decode,
329{
330 type Item = Message;
331
332 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
333 match self.project().subscriber.poll_next_unpin(cx) {
334 Poll::Ready(Some(message)) => match Message::decode(&mut message.payload.as_ref()) {
335 Ok(message) => Poll::Ready(Some(message)),
336 Err(error) => {
337 warn!(
338 %error,
339 message_type = %type_name::<Message>(),
340 message = %hex::encode(message.payload),
341 "Failed to decode stream message"
342 );
343
344 Poll::Pending
345 }
346 },
347 Poll::Ready(None) => Poll::Ready(None),
348 Poll::Pending => Poll::Pending,
349 }
350 }
351}
352
353#[derive(Debug)]
354struct Inner {
355 client: Client,
356 request_retry_backoff_policy: ExponentialBackoff,
357 approximate_max_message_size: usize,
358 max_message_size: usize,
359}
360
361#[derive(Debug, Clone)]
363pub struct NatsClient {
364 inner: Arc<Inner>,
365}
366
367impl Deref for NatsClient {
368 type Target = Client;
369
370 #[inline]
371 fn deref(&self) -> &Self::Target {
372 &self.inner.client
373 }
374}
375
376impl NatsClient {
377 pub async fn new<A: ToServerAddrs>(
379 addrs: A,
380 request_retry_backoff_policy: ExponentialBackoff,
381 ) -> Result<Self, async_nats::Error> {
382 let servers = addrs.to_server_addrs()?.collect::<Vec<_>>();
383 Self::from_client(
384 async_nats::connect_with_options(
385 &servers,
386 ConnectOptions::default().request_timeout(Some(REQUEST_TIMEOUT)),
387 )
388 .await?,
389 request_retry_backoff_policy,
390 )
391 }
392
393 pub fn from_client(
395 client: Client,
396 request_retry_backoff_policy: ExponentialBackoff,
397 ) -> Result<Self, async_nats::Error> {
398 let max_payload = client.server_info().max_payload;
399 if max_payload < EXPECTED_MESSAGE_SIZE {
400 return Err(format!(
401 "Max payload {max_payload} is smaller than expected {EXPECTED_MESSAGE_SIZE}, \
402 increase it by specifying max_payload = 2MB or higher number in NATS configuration"
403 )
404 .into());
405 }
406
407 let inner = Inner {
408 client,
409 request_retry_backoff_policy,
410 approximate_max_message_size: max_payload * 9 / 10,
412 max_message_size: max_payload,
414 };
415
416 Ok(Self {
417 inner: Arc::new(inner),
418 })
419 }
420
421 pub fn approximate_max_message_size(&self) -> usize {
424 self.inner.approximate_max_message_size
425 }
426
427 pub async fn request<Request>(
429 &self,
430 request: &Request,
431 instance: Option<&str>,
432 ) -> Result<Request::Response, RequestError>
433 where
434 Request: GenericRequest,
435 {
436 let subject = subject_with_instance(Request::SUBJECT, instance);
437 let mut maybe_retry_backoff = None;
438 let message = loop {
439 match self
440 .inner
441 .client
442 .request(subject.clone(), request.encode().into())
443 .await
444 {
445 Ok(message) => {
446 break message;
447 }
448 Err(error) => {
449 match error.kind() {
450 RequestErrorKind::TimedOut | RequestErrorKind::NoResponders => {
451 }
453 RequestErrorKind::Other => {
454 return Err(error);
455 }
456 }
457
458 let retry_backoff = maybe_retry_backoff.get_or_insert_with(|| {
459 let mut retry_backoff = self.inner.request_retry_backoff_policy.clone();
460 retry_backoff.reset();
461 retry_backoff
462 });
463
464 if let Some(delay) = retry_backoff.next_backoff() {
465 debug!(
466 %subject,
467 %error,
468 request_type = %type_name::<Request>(),
469 ?delay,
470 "Failed to make request, retrying after some delay"
471 );
472
473 tokio::time::sleep(delay).await;
474 continue;
475 } else {
476 return Err(error);
477 }
478 }
479 }
480 };
481
482 let response =
483 Request::Response::decode(&mut message.payload.as_ref()).map_err(|error| {
484 warn!(
485 %subject,
486 %error,
487 response_type = %type_name::<Request::Response>(),
488 response = %hex::encode(message.payload),
489 "Response decoding failed"
490 );
491
492 RequestErrorKind::Other
493 })?;
494
495 Ok(response)
496 }
497
498 pub async fn request_responder<Request, F, OP>(
513 &self,
514 instance: Option<&str>,
515 queue_group: Option<String>,
516 process: OP,
517 ) -> anyhow::Result<()>
518 where
519 Request: GenericRequest,
520 F: Future<Output = Option<Request::Response>> + Send,
521 OP: Fn(Request) -> F + Send + Sync,
522 {
523 let mut processing = FuturesUnordered::new();
525
526 let subscription = self
527 .common_subscribe(Request::SUBJECT, instance, queue_group)
528 .await
529 .map_err(|error| {
530 anyhow!(
531 "Failed to subscribe to {} requests for {instance:?}: {error}",
532 type_name::<Request>(),
533 )
534 })?;
535
536 debug!(
537 request_type = %type_name::<Request>(),
538 ?subscription,
539 "Requests subscription"
540 );
541 let mut subscription = subscription.fuse();
542
543 loop {
544 select! {
545 message = subscription.select_next_some() => {
546 processing.push(
548 self
549 .process_request(
550 message,
551 &process,
552 )
553 .in_current_span(),
554 );
555 },
556 _ = processing.next() => {
557 },
559 complete => {
560 break;
561 }
562 }
563 }
564
565 Ok(())
566 }
567
568 async fn process_request<Request, F, OP>(&self, message: Message, process: OP)
569 where
570 Request: GenericRequest,
571 F: Future<Output = Option<Request::Response>> + Send,
572 OP: Fn(Request) -> F + Send + Sync,
573 {
574 let Some(reply_subject) = message.reply else {
575 return;
576 };
577
578 let message_payload_size = message.payload.len();
579 let request = match Request::decode(&mut message.payload.as_ref()) {
580 Ok(request) => {
581 drop(message.payload);
583 request
584 }
585 Err(error) => {
586 warn!(
587 request_type = %type_name::<Request>(),
588 %error,
589 message = %hex::encode(message.payload),
590 "Failed to decode request"
591 );
592 return;
593 }
594 };
595
596 if message_payload_size > 1024 {
598 trace!(
599 request_type = %type_name::<Request>(),
600 %reply_subject,
601 "Processing request"
602 );
603 } else {
604 trace!(
605 request_type = %type_name::<Request>(),
606 ?request,
607 %reply_subject,
608 "Processing request"
609 );
610 }
611
612 if let Some(response) = process(request).await
613 && let Err(error) = self.publish(reply_subject, response.encode().into()).await
614 {
615 warn!(
616 request_type = %type_name::<Request>(),
617 %error,
618 "Failed to send response"
619 );
620 }
621 }
622
623 pub async fn stream_request<Request>(
625 &self,
626 request: &Request,
627 instance: Option<&str>,
628 ) -> Result<StreamResponseSubscriber<Request::Response>, StreamRequestError>
629 where
630 Request: GenericStreamRequest,
631 {
632 let stream_request_subject = subject_with_instance(Request::SUBJECT, instance);
633 let stream_response_subject = format!("stream-response.{}", Ulid::new());
634
635 let subscriber = self
636 .inner
637 .client
638 .subscribe(stream_response_subject.clone())
639 .await?;
640
641 debug!(
642 request_type = %type_name::<Request>(),
643 %stream_request_subject,
644 %stream_response_subject,
645 ?subscriber,
646 "Stream request subscription"
647 );
648
649 self.inner
650 .client
651 .publish_with_reply(
652 stream_request_subject,
653 stream_response_subject.clone(),
654 request.encode().into(),
655 )
656 .await?;
657
658 Ok(StreamResponseSubscriber::new(
659 subscriber,
660 stream_response_subject,
661 self.clone(),
662 ))
663 }
664
665 pub async fn stream_request_responder<Request, F, S, OP>(
680 &self,
681 instance: Option<&str>,
682 queue_group: Option<String>,
683 process: OP,
684 ) -> anyhow::Result<()>
685 where
686 Request: GenericStreamRequest,
687 F: Future<Output = Option<S>> + Send,
688 S: Stream<Item = Request::Response> + Unpin,
689 OP: Fn(Request) -> F + Send + Sync,
690 {
691 let mut processing = FuturesUnordered::new();
693
694 let subscription = self
695 .common_subscribe(Request::SUBJECT, instance, queue_group)
696 .await
697 .map_err(|error| {
698 anyhow!(
699 "Failed to subscribe to {} stream requests for {instance:?}: {error}",
700 type_name::<Request>(),
701 )
702 })?;
703
704 debug!(
705 request_type = %type_name::<Request>(),
706 ?subscription,
707 "Stream requests subscription"
708 );
709 let mut subscription = subscription.fuse();
710
711 loop {
712 select! {
713 message = subscription.select_next_some() => {
714 processing.push(
716 self
717 .process_stream_request(
718 message,
719 &process,
720 )
721 .in_current_span(),
722 );
723 },
724 _ = processing.next() => {
725 },
727 complete => {
728 break;
729 }
730 }
731 }
732
733 Ok(())
734 }
735
736 async fn process_stream_request<Request, F, S, OP>(&self, message: Message, process: OP)
737 where
738 Request: GenericStreamRequest,
739 F: Future<Output = Option<S>> + Send,
740 S: Stream<Item = Request::Response> + Unpin,
741 OP: Fn(Request) -> F + Send + Sync,
742 {
743 let Some(reply_subject) = message.reply else {
744 return;
745 };
746
747 let message_payload_size = message.payload.len();
748 let request = match Request::decode(&mut message.payload.as_ref()) {
749 Ok(request) => {
750 drop(message.payload);
752 request
753 }
754 Err(error) => {
755 warn!(
756 request_type = %type_name::<Request>(),
757 %error,
758 message = %hex::encode(message.payload),
759 "Failed to decode request"
760 );
761 return;
762 }
763 };
764
765 if message_payload_size > 1024 {
767 trace!(
768 request_type = %type_name::<Request>(),
769 %reply_subject,
770 "Processing request"
771 );
772 } else {
773 trace!(
774 request_type = %type_name::<Request>(),
775 ?request,
776 %reply_subject,
777 "Processing request"
778 );
779 }
780
781 if let Some(stream) = process(request).await {
782 self.stream_response::<Request, _>(reply_subject, stream)
783 .await;
784 }
785 }
786
787 async fn stream_response<Request, S>(&self, response_subject: Subject, response_stream: S)
789 where
790 Request: GenericStreamRequest,
791 S: Stream<Item = Request::Response> + Unpin,
792 {
793 type Response<Request> =
794 GenericStreamResponses<<Request as GenericStreamRequest>::Response>;
795
796 let mut response_stream = response_stream.fuse();
797
798 let first_element = match response_stream.next().await {
800 Some(first_element) => first_element,
801 None => {
802 if let Err(error) = self
803 .publish(
804 response_subject.clone(),
805 Response::<Request>::Last {
806 index: 0,
807 responses: VecDeque::new(),
808 }
809 .encode()
810 .into(),
811 )
812 .await
813 {
814 warn!(
815 %response_subject,
816 %error,
817 request_type = %type_name::<Request>(),
818 response_type = %type_name::<Request::Response>(),
819 "Failed to send stream response"
820 );
821 }
822
823 return;
824 }
825 };
826 let max_message_size = self.inner.max_message_size;
827 let approximate_max_message_size = self.approximate_max_message_size();
828 let max_responses_per_message = approximate_max_message_size / first_element.encoded_size();
829
830 let ack_subject = format!("stream-response-ack.{}", Ulid::new());
831 let mut ack_subscription = match self.subscribe(ack_subject.clone()).await {
832 Ok(ack_subscription) => ack_subscription,
833 Err(error) => {
834 warn!(
835 %response_subject,
836 %error,
837 request_type = %type_name::<Request>(),
838 response_type = %type_name::<Request::Response>(),
839 "Failed to subscribe to ack subject"
840 );
841 return;
842 }
843 };
844 debug!(
845 %response_subject,
846 request_type = %type_name::<Request>(),
847 response_type = %type_name::<Request::Response>(),
848 ?ack_subscription,
849 "Ack subscription subscription"
850 );
851 let mut index = 0;
852 let mut buffer = VecDeque::with_capacity(max_responses_per_message);
854 buffer.push_back(first_element);
855 let mut overflow_buffer = VecDeque::new();
856
857 loop {
858 if buffer.is_empty() {
860 if let Some(element) = response_stream.next().await {
861 buffer.push_back(element);
862 }
863 }
864 while buffer.encoded_size() < approximate_max_message_size
865 && let Some(element) = response_stream.next().now_or_never().flatten()
866 {
867 buffer.push_back(element);
868 }
869
870 loop {
871 let is_done = response_stream.is_done() && overflow_buffer.is_empty();
872 let num_messages = buffer.len();
873 let response = if is_done {
874 Response::<Request>::Last {
875 index,
876 responses: buffer,
877 }
878 } else {
879 Response::<Request>::Continue {
880 index,
881 responses: buffer,
882 ack_subject: ack_subject.clone(),
883 }
884 };
885 let encoded_response = response.encode();
886 let encoded_response_len = encoded_response.len();
887 if encoded_response_len > max_message_size {
890 buffer = response.into();
891 if let Some(element) = buffer.pop_back() {
892 if buffer.is_empty() {
893 error!(
894 ?element,
895 encoded_response_len,
896 max_message_size,
897 "Element was too large to fit into NATS message, this is an \
898 implementation bug"
899 );
900 }
901 overflow_buffer.push_front(element);
902 continue;
903 } else {
904 error!(
905 %response_subject,
906 request_type = %type_name::<Request>(),
907 response_type = %type_name::<Request::Response>(),
908 "Empty response overflown message size, this should never happen"
909 );
910 return;
911 }
912 }
913
914 debug!(
915 %response_subject,
916 num_messages,
917 %index,
918 %is_done,
919 "Publishing stream response messages",
920 );
921
922 if let Err(error) = self
923 .publish(response_subject.clone(), encoded_response.into())
924 .await
925 {
926 warn!(
927 %response_subject,
928 %error,
929 request_type = %type_name::<Request>(),
930 response_type = %type_name::<Request::Response>(),
931 "Failed to send stream response"
932 );
933 return;
934 }
935
936 if is_done {
937 return;
938 } else {
939 buffer = response.into();
940 buffer.clear();
941 buffer.extend(overflow_buffer.drain(..));
943 }
944
945 if index >= 1 {
946 let expected_index = index - 1;
948
949 trace!(
950 %response_subject,
951 %expected_index,
952 "Waiting for acknowledgement"
953 );
954 match tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, ack_subscription.next())
955 .await
956 {
957 Ok(Some(message)) => {
958 if let Some(received_index) = message
959 .payload
960 .split_at_checked(mem::size_of::<u32>())
961 .map(|(bytes, _)| {
962 u32::from_le_bytes(
963 bytes.try_into().expect("Correctly chunked slice; qed"),
964 )
965 })
966 {
967 debug!(
968 %response_subject,
969 %received_index,
970 "Received acknowledgement"
971 );
972 if received_index != expected_index {
973 warn!(
974 %response_subject,
975 %received_index,
976 %expected_index,
977 request_type = %type_name::<Request>(),
978 response_type = %type_name::<Request::Response>(),
979 message = %hex::encode(message.payload),
980 "Unexpected acknowledgement index"
981 );
982 return;
983 }
984 } else {
985 warn!(
986 %response_subject,
987 request_type = %type_name::<Request>(),
988 response_type = %type_name::<Request::Response>(),
989 message = %hex::encode(message.payload),
990 "Unexpected acknowledgement message"
991 );
992 return;
993 }
994 }
995 Ok(None) => {
996 warn!(
997 %response_subject,
998 request_type = %type_name::<Request>(),
999 response_type = %type_name::<Request::Response>(),
1000 "Acknowledgement stream ended unexpectedly"
1001 );
1002 return;
1003 }
1004 Err(_error) => {
1005 warn!(
1006 %response_subject,
1007 %expected_index,
1008 request_type = %type_name::<Request>(),
1009 response_type = %type_name::<Request::Response>(),
1010 "Acknowledgement wait timed out"
1011 );
1012 return;
1013 }
1014 }
1015 }
1016
1017 index += 1;
1018
1019 if buffer.is_empty() {
1021 break;
1022 }
1023 }
1024 }
1025 }
1026
1027 pub async fn notification<Notification>(
1029 &self,
1030 notification: &Notification,
1031 instance: Option<&str>,
1032 ) -> Result<(), PublishError>
1033 where
1034 Notification: GenericNotification,
1035 {
1036 self.inner
1037 .client
1038 .publish(
1039 subject_with_instance(Notification::SUBJECT, instance),
1040 notification.encode().into(),
1041 )
1042 .await
1043 }
1044
1045 pub async fn broadcast<Broadcast>(
1047 &self,
1048 message: &Broadcast,
1049 instance: &str,
1050 ) -> Result<(), PublishError>
1051 where
1052 Broadcast: GenericBroadcast,
1053 {
1054 self.inner
1055 .client
1056 .publish_with_headers(
1057 Broadcast::SUBJECT.replace('*', instance),
1058 {
1059 let mut headers = HeaderMap::new();
1060 if let Some(message_id) = message.deterministic_message_id() {
1061 headers.insert("Nats-Msg-Id", message_id);
1062 }
1063 headers
1064 },
1065 message.encode().into(),
1066 )
1067 .await
1068 }
1069
1070 pub async fn subscribe_to_notifications<Notification>(
1073 &self,
1074 instance: Option<&str>,
1075 queue_group: Option<String>,
1076 ) -> Result<SubscriberWrapper<Notification>, SubscribeError>
1077 where
1078 Notification: GenericNotification,
1079 {
1080 self.simple_subscribe(Notification::SUBJECT, instance, queue_group)
1081 .await
1082 }
1083
1084 pub async fn subscribe_to_broadcasts<Broadcast>(
1087 &self,
1088 instance: Option<&str>,
1089 queue_group: Option<String>,
1090 ) -> Result<SubscriberWrapper<Broadcast>, SubscribeError>
1091 where
1092 Broadcast: GenericBroadcast,
1093 {
1094 self.simple_subscribe(Broadcast::SUBJECT, instance, queue_group)
1095 .await
1096 }
1097
1098 async fn simple_subscribe<Message>(
1101 &self,
1102 subject: &'static str,
1103 instance: Option<&str>,
1104 queue_group: Option<String>,
1105 ) -> Result<SubscriberWrapper<Message>, SubscribeError>
1106 where
1107 Message: Decode,
1108 {
1109 let subscriber = self
1110 .common_subscribe(subject, instance, queue_group)
1111 .await?;
1112 debug!(
1113 %subject,
1114 message_type = %type_name::<Message>(),
1115 ?subscriber,
1116 "Simple subscription"
1117 );
1118
1119 Ok(SubscriberWrapper {
1120 subscriber,
1121 _phantom: PhantomData,
1122 })
1123 }
1124
1125 async fn common_subscribe(
1128 &self,
1129 subject: &'static str,
1130 instance: Option<&str>,
1131 queue_group: Option<String>,
1132 ) -> Result<Subscriber, SubscribeError> {
1133 let subscriber = if let Some(queue_group) = queue_group {
1134 self.inner
1135 .client
1136 .queue_subscribe(subject_with_instance(subject, instance), queue_group)
1137 .await?
1138 } else {
1139 self.inner
1140 .client
1141 .subscribe(subject_with_instance(subject, instance))
1142 .await?
1143 };
1144
1145 Ok(subscriber)
1146 }
1147}
1148
1149fn subject_with_instance(subject: &'static str, instance: Option<&str>) -> Subject {
1150 if let Some(instance) = instance {
1151 Subject::from(subject.replace('*', instance))
1152 } else {
1153 Subject::from_static(subject)
1154 }
1155}