pub struct NatsClient { /* private fields */ }
Expand description
NATS client wrapper that can be used to interact with other Subspace-specific clients
Implementations§
Source§impl NatsClient
impl NatsClient
Sourcepub async fn new<A: ToServerAddrs>(
addrs: A,
request_retry_backoff_policy: ExponentialBackoff,
) -> Result<Self, Error>
pub async fn new<A: ToServerAddrs>( addrs: A, request_retry_backoff_policy: ExponentialBackoff, ) -> Result<Self, Error>
Create new instance by connecting to specified addresses
Sourcepub fn from_client(
client: Client,
request_retry_backoff_policy: ExponentialBackoff,
) -> Result<Self, Error>
pub fn from_client( client: Client, request_retry_backoff_policy: ExponentialBackoff, ) -> Result<Self, Error>
Create new client from existing NATS instance
Sourcepub fn approximate_max_message_size(&self) -> usize
pub fn approximate_max_message_size(&self) -> usize
Approximate max message size (a few more bytes will not hurt), the actual limit is expected to be a bit higher
Sourcepub async fn request<Request>(
&self,
request: &Request,
instance: Option<&str>,
) -> Result<Request::Response, RequestError>where
Request: GenericRequest,
pub async fn request<Request>(
&self,
request: &Request,
instance: Option<&str>,
) -> Result<Request::Response, RequestError>where
Request: GenericRequest,
Make request and wait for response
Sourcepub async fn request_responder<Request, F, OP>(
&self,
instance: Option<&str>,
queue_group: Option<String>,
process: OP,
) -> Result<()>
pub async fn request_responder<Request, F, OP>( &self, instance: Option<&str>, queue_group: Option<String>, process: OP, ) -> Result<()>
Responds to requests from the given subject using the provided processing function.
This will create a subscription on the subject for the given instance (if provided) and
queue group. Incoming messages will be deserialized as the request type Request
and passed
to the process
function to produce a response of type Request::Response
. The response
will then be sent back on the reply subject from the original request.
Each request is processed in a newly created async tokio task.
§Arguments
instance
- Optional instance name to use in place of the*
in the subjectgroup
- The queue group name for the subscriptionprocess
- The function to call with the decoded request to produce a response
Sourcepub async fn stream_request<Request>(
&self,
request: &Request,
instance: Option<&str>,
) -> Result<StreamResponseSubscriber<Request::Response>, StreamRequestError>where
Request: GenericStreamRequest,
pub async fn stream_request<Request>(
&self,
request: &Request,
instance: Option<&str>,
) -> Result<StreamResponseSubscriber<Request::Response>, StreamRequestError>where
Request: GenericStreamRequest,
Make request that expects stream response
Sourcepub async fn stream_request_responder<Request, F, S, OP>(
&self,
instance: Option<&str>,
queue_group: Option<String>,
process: OP,
) -> Result<()>
pub async fn stream_request_responder<Request, F, S, OP>( &self, instance: Option<&str>, queue_group: Option<String>, process: OP, ) -> Result<()>
Responds to stream requests from the given subject using the provided processing function.
This will create a subscription on the subject for the given instance (if provided) and
queue group. Incoming messages will be deserialized as the request type Request
and passed
to the process
function to produce a stream response of type Request::Response
. The
stream response will then be sent back on the reply subject from the original request.
Each request is processed in a newly created async tokio task.
§Arguments
instance
- Optional instance name to use in place of the*
in the subjectgroup
- The queue group name for the subscriptionprocess
- The function to call with the decoded request to produce a response
Sourcepub async fn notification<Notification>(
&self,
notification: &Notification,
instance: Option<&str>,
) -> Result<(), PublishError>where
Notification: GenericNotification,
pub async fn notification<Notification>(
&self,
notification: &Notification,
instance: Option<&str>,
) -> Result<(), PublishError>where
Notification: GenericNotification,
Make notification without waiting for response
Sourcepub async fn broadcast<Broadcast>(
&self,
message: &Broadcast,
instance: &str,
) -> Result<(), PublishError>where
Broadcast: GenericBroadcast,
pub async fn broadcast<Broadcast>(
&self,
message: &Broadcast,
instance: &str,
) -> Result<(), PublishError>where
Broadcast: GenericBroadcast,
Send a broadcast message
Sourcepub async fn subscribe_to_notifications<Notification>(
&self,
instance: Option<&str>,
queue_group: Option<String>,
) -> Result<SubscriberWrapper<Notification>, SubscribeError>where
Notification: GenericNotification,
pub async fn subscribe_to_notifications<Notification>(
&self,
instance: Option<&str>,
queue_group: Option<String>,
) -> Result<SubscriberWrapper<Notification>, SubscribeError>where
Notification: GenericNotification,
Simple subscription that will produce decoded notifications, while skipping messages that fail to decode
Sourcepub async fn subscribe_to_broadcasts<Broadcast>(
&self,
instance: Option<&str>,
queue_group: Option<String>,
) -> Result<SubscriberWrapper<Broadcast>, SubscribeError>where
Broadcast: GenericBroadcast,
pub async fn subscribe_to_broadcasts<Broadcast>(
&self,
instance: Option<&str>,
queue_group: Option<String>,
) -> Result<SubscriberWrapper<Broadcast>, SubscribeError>where
Broadcast: GenericBroadcast,
Simple subscription that will produce decoded broadcasts, while skipping messages that fail to decode
Methods from Deref<Target = Client>§
pub fn server_info(&self) -> ServerInfo
pub fn server_info(&self) -> ServerInfo
Returns last received info from the server.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
println!("info: {:?}", client.server_info());
pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool
pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool
Returns true if the server version is compatible with the version components.
This has to be used with caution, as it is not guaranteed that the server that client is connected to is the same version that the one that is a JetStream meta/stream/consumer leader, especially across leafnodes.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
assert!(client.is_server_compatible(2, 8, 4));
pub async fn publish<S>(
&self,
subject: S,
payload: Bytes,
) -> Result<(), Error<PublishErrorKind>>where
S: ToSubject,
pub async fn publish<S>(
&self,
subject: S,
payload: Bytes,
) -> Result<(), Error<PublishErrorKind>>where
S: ToSubject,
Publish a [Message] to a given subject.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
client.publish("events.data", "payload".into()).await?;
pub async fn publish_with_headers<S>(
&self,
subject: S,
headers: HeaderMap,
payload: Bytes,
) -> Result<(), Error<PublishErrorKind>>where
S: ToSubject,
pub async fn publish_with_headers<S>(
&self,
subject: S,
headers: HeaderMap,
payload: Bytes,
) -> Result<(), Error<PublishErrorKind>>where
S: ToSubject,
Publish a [Message] with headers to a given subject.
§Examples
use std::str::FromStr;
let client = async_nats::connect("demo.nats.io").await?;
let mut headers = async_nats::HeaderMap::new();
headers.insert(
"X-Header",
async_nats::HeaderValue::from_str("Value").unwrap(),
);
client
.publish_with_headers("events.data", headers, "payload".into())
.await?;
pub async fn publish_with_reply<S, R>(
&self,
subject: S,
reply: R,
payload: Bytes,
) -> Result<(), Error<PublishErrorKind>>where
S: ToSubject,
R: ToSubject,
pub async fn publish_with_reply<S, R>(
&self,
subject: S,
reply: R,
payload: Bytes,
) -> Result<(), Error<PublishErrorKind>>where
S: ToSubject,
R: ToSubject,
Publish a [Message] to a given subject, with specified response subject to which the subscriber can respond. This method does not await for the response.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
client
.publish_with_reply("events.data", "reply_subject", "payload".into())
.await?;
pub async fn publish_with_reply_and_headers<S, R>(
&self,
subject: S,
reply: R,
headers: HeaderMap,
payload: Bytes,
) -> Result<(), Error<PublishErrorKind>>where
S: ToSubject,
R: ToSubject,
pub async fn publish_with_reply_and_headers<S, R>(
&self,
subject: S,
reply: R,
headers: HeaderMap,
payload: Bytes,
) -> Result<(), Error<PublishErrorKind>>where
S: ToSubject,
R: ToSubject,
Publish a [Message] to a given subject with headers and specified response subject to which the subscriber can respond. This method does not await for the response.
§Examples
use std::str::FromStr;
let client = async_nats::connect("demo.nats.io").await?;
let mut headers = async_nats::HeaderMap::new();
client
.publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
.await?;
pub async fn request<S>(
&self,
subject: S,
payload: Bytes,
) -> Result<Message, Error<RequestErrorKind>>where
S: ToSubject,
pub async fn request<S>(
&self,
subject: S,
payload: Bytes,
) -> Result<Message, Error<RequestErrorKind>>where
S: ToSubject,
Sends the request with headers.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let response = client.request("service", "data".into()).await?;
pub async fn request_with_headers<S>(
&self,
subject: S,
headers: HeaderMap,
payload: Bytes,
) -> Result<Message, Error<RequestErrorKind>>where
S: ToSubject,
pub async fn request_with_headers<S>(
&self,
subject: S,
headers: HeaderMap,
payload: Bytes,
) -> Result<Message, Error<RequestErrorKind>>where
S: ToSubject,
Sends the request with headers.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let mut headers = async_nats::HeaderMap::new();
headers.insert("Key", "Value");
let response = client
.request_with_headers("service", headers, "data".into())
.await?;
pub async fn send_request<S>(
&self,
subject: S,
request: Request,
) -> Result<Message, Error<RequestErrorKind>>where
S: ToSubject,
pub async fn send_request<S>(
&self,
subject: S,
request: Request,
) -> Result<Message, Error<RequestErrorKind>>where
S: ToSubject,
Sends the request created by the [Request].
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let request = async_nats::Request::new().payload("data".into());
let response = client.send_request("service", request).await?;
pub fn new_inbox(&self) -> String
pub fn new_inbox(&self) -> String
Create a new globally unique inbox which can be used for replies.
§Examples
let reply = nc.new_inbox();
let rsub = nc.subscribe(reply).await?;
pub async fn subscribe<S>(
&self,
subject: S,
) -> Result<Subscriber, SubscribeError>where
S: ToSubject,
pub async fn subscribe<S>(
&self,
subject: S,
) -> Result<Subscriber, SubscribeError>where
S: ToSubject,
Subscribes to a subject to receive [messages][Message].
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io").await?;
let mut subscription = client.subscribe("events.>").await?;
while let Some(message) = subscription.next().await {
println!("received message: {:?}", message);
}
pub async fn queue_subscribe<S>(
&self,
subject: S,
queue_group: String,
) -> Result<Subscriber, SubscribeError>where
S: ToSubject,
pub async fn queue_subscribe<S>(
&self,
subject: S,
queue_group: String,
) -> Result<Subscriber, SubscribeError>where
S: ToSubject,
Subscribes to a subject with a queue group to receive [messages][Message].
§Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io").await?;
let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
while let Some(message) = subscription.next().await {
println!("received message: {:?}", message);
}
pub async fn flush(&self) -> Result<(), Error<FlushErrorKind>>
pub async fn flush(&self) -> Result<(), Error<FlushErrorKind>>
Flushes the internal buffer ensuring that all messages are sent.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
client.flush().await?;
pub fn connection_state(&self) -> State
pub fn connection_state(&self) -> State
Returns the current state of the connection.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
println!("connection state: {}", client.connection_state());
pub async fn force_reconnect(&self) -> Result<(), ReconnectError>
pub async fn force_reconnect(&self) -> Result<(), ReconnectError>
Forces the client to reconnect.
Keep in mind that client will reconnect automatically if the connection is lost and this
method does not have to be used in normal circumstances.
However, if you want to force the client to reconnect, for example to re-trigger
the auth-callback
, or manually rebalance connections, this method can be useful.
This method does not wait for connection to be re-established.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
client.force_reconnect().await?;
pub fn statistics(&self) -> Arc<Statistics>
pub fn statistics(&self) -> Arc<Statistics>
Returns struct representing statistics of the whole lifecycle of the client. This includes number of bytes sent/received, number of messages sent/received, and number of times the connection was established. As this returns Arc with [AtomicU64] fields, it can be safely reused and shared across threads.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let statistics = client.statistics();
println!("client statistics: {:#?}", statistics);
Trait Implementations§
Source§impl Clone for NatsClient
impl Clone for NatsClient
Source§fn clone(&self) -> NatsClient
fn clone(&self) -> NatsClient
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source
. Read moreSource§impl Debug for NatsClient
impl Debug for NatsClient
Auto Trait Implementations§
impl Freeze for NatsClient
impl !RefUnwindSafe for NatsClient
impl Send for NatsClient
impl Sync for NatsClient
impl Unpin for NatsClient
impl !UnwindSafe for NatsClient
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
§impl<T> Conv for T
impl<T> Conv for T
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read more§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.