pallet_messenger/
messages.rs

1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4use crate::pallet::{
5    ChainAllowlist, InboxResponseMessageWeightTags, OutboxMessageCount, OutboxMessageWeightTags,
6    UpdatedChannels,
7};
8use crate::{
9    BalanceOf, ChannelId, ChannelState, Channels, CloseChannelBy, Config, Error, Event,
10    InboxResponses, Nonce, Outbox, OutboxMessageResult, Pallet,
11};
12#[cfg(not(feature = "std"))]
13use alloc::boxed::Box;
14#[cfg(not(feature = "std"))]
15use alloc::collections::BTreeMap;
16use frame_support::ensure;
17use sp_messenger::endpoint::{EndpointHandler, EndpointRequest, EndpointResponse};
18use sp_messenger::messages::{
19    BlockMessagesQuery, ChainId, ChannelOpenParamsV1, Message, MessageId, MessageKey,
20    MessageNonceWithStorageKey, MessageWeightTag, MessagesWithStorageKey, PayloadV1,
21    ProtocolMessageRequest, ProtocolMessageResponse, RequestResponse, VersionedPayload,
22};
23
24use sp_messenger::MAX_FUTURE_ALLOWED_NONCES;
25use sp_runtime::traits::{Get, One};
26use sp_runtime::{ArithmeticError, DispatchError, DispatchResult};
27#[cfg(feature = "std")]
28use std::collections::BTreeMap;
29
30impl<T: Config> Pallet<T> {
31    /// Takes a new message destined for dst_chain and adds the message to the outbox.
32    pub(crate) fn new_outbox_message(
33        src_chain_id: ChainId,
34        dst_chain_id: ChainId,
35        channel_id: ChannelId,
36        payload: VersionedPayload<BalanceOf<T>>,
37    ) -> Result<Nonce, DispatchError> {
38        // ensure message is not meant to self.
39        ensure!(
40            src_chain_id != dst_chain_id,
41            Error::<T>::InvalidMessageDestination
42        );
43
44        Channels::<T>::try_mutate(
45            dst_chain_id,
46            channel_id,
47            |maybe_channel| -> Result<Nonce, DispatchError> {
48                let channel = maybe_channel.as_mut().ok_or(Error::<T>::MissingChannel)?;
49                // check if the outbox is full
50                let count = OutboxMessageCount::<T>::get((dst_chain_id, channel_id));
51                ensure!(
52                    count < channel.max_outgoing_messages,
53                    Error::<T>::OutboxFull
54                );
55
56                let weight_tag = MessageWeightTag::outbox(&payload);
57
58                let next_outbox_nonce = channel.next_outbox_nonce;
59                // add message to outbox
60                let msg = Message {
61                    src_chain_id,
62                    dst_chain_id,
63                    channel_id,
64                    nonce: next_outbox_nonce,
65                    payload,
66                    last_delivered_message_response_nonce: channel
67                        .latest_response_received_message_nonce,
68                };
69                Outbox::<T>::insert((dst_chain_id, channel_id, next_outbox_nonce), msg);
70                OutboxMessageCount::<T>::try_mutate(
71                    (dst_chain_id, channel_id),
72                    |count| -> Result<(), DispatchError> {
73                        *count = count
74                            .checked_add(1u32)
75                            .ok_or(Error::<T>::MessageCountOverflow)?;
76                        Ok(())
77                    },
78                )?;
79
80                // update channel state
81                channel.next_outbox_nonce = next_outbox_nonce
82                    .checked_add(Nonce::one())
83                    .ok_or(DispatchError::Arithmetic(ArithmeticError::Overflow))?;
84
85                OutboxMessageWeightTags::<T>::insert(
86                    (dst_chain_id, (channel_id, next_outbox_nonce)),
87                    weight_tag,
88                );
89
90                // emit event to notify relayer
91                Self::deposit_event(Event::OutboxMessage {
92                    chain_id: dst_chain_id,
93                    channel_id,
94                    nonce: next_outbox_nonce,
95                });
96                Ok(next_outbox_nonce)
97            },
98        )
99    }
100
101    /// Process the incoming messages from given chain_id and channel_id.
102    pub(crate) fn process_inbox_messages(
103        msg: Message<BalanceOf<T>>,
104        msg_weight_tag: MessageWeightTag,
105    ) -> DispatchResult {
106        let (dst_chain_id, channel_id, nonce) = (msg.src_chain_id, msg.channel_id, msg.nonce);
107
108        let resp_payload = VersionedPayload::V1(match msg.payload {
109            // process incoming protocol message.
110            VersionedPayload::V1(PayloadV1::Protocol(RequestResponse::Request(req))) => {
111                PayloadV1::Protocol(RequestResponse::Response(
112                    Self::process_incoming_protocol_message_req(
113                        dst_chain_id,
114                        channel_id,
115                        req,
116                        &msg_weight_tag,
117                    ),
118                ))
119            }
120
121            // process incoming endpoint message.
122            VersionedPayload::V1(PayloadV1::Endpoint(RequestResponse::Request(req))) => {
123                // Firstly, store fees for inbox message execution regardless what the execution result is,
124                // since the fee is already charged from the sender of the src chain and processing of the
125                // XDM in this end is finished.
126
127                // since v1 collects fee on behalf of dst_chain, this chain,
128                // so we do not recalculate the fee but instead use the collected fee as is
129                Self::store_inbox_fee(
130                    dst_chain_id,
131                    (channel_id, nonce),
132                    req.collected_fee.dst_chain_fee,
133                )?;
134
135                let response = if let Some(endpoint_handler) =
136                    T::get_endpoint_handler(&req.req.dst_endpoint)
137                {
138                    Self::process_incoming_endpoint_message_req(
139                        dst_chain_id,
140                        req.req,
141                        channel_id,
142                        nonce,
143                        &msg_weight_tag,
144                        endpoint_handler,
145                    )
146                } else {
147                    Err(Error::<T>::NoMessageHandler.into())
148                };
149
150                PayloadV1::Endpoint(RequestResponse::Response(response))
151            }
152
153            // return error for all the remaining branches
154            VersionedPayload::V1(PayloadV1::Protocol(_)) => PayloadV1::Protocol(
155                RequestResponse::Response(Err(Error::<T>::InvalidMessagePayload.into())),
156            ),
157            VersionedPayload::V1(PayloadV1::Endpoint(_)) => PayloadV1::Endpoint(
158                RequestResponse::Response(Err(Error::<T>::InvalidMessagePayload.into())),
159            ),
160        });
161
162        let weight_tag = MessageWeightTag::inbox_response(msg_weight_tag, &resp_payload);
163
164        InboxResponses::<T>::insert(
165            (dst_chain_id, channel_id, nonce),
166            Message {
167                src_chain_id: T::SelfChainId::get(),
168                dst_chain_id,
169                channel_id,
170                nonce,
171                payload: resp_payload,
172                // this nonce is not considered in response context.
173                last_delivered_message_response_nonce: None,
174            },
175        );
176
177        InboxResponseMessageWeightTags::<T>::insert(
178            (dst_chain_id, (channel_id, nonce)),
179            weight_tag,
180        );
181
182        UpdatedChannels::<T>::mutate(|updated_channels| {
183            updated_channels.insert((dst_chain_id, channel_id))
184        });
185
186        // reward relayers for relaying message responses to src_chain.
187        // clean any delivered inbox responses
188        Self::reward_operators_for_inbox_execution(
189            dst_chain_id,
190            channel_id,
191            msg.last_delivered_message_response_nonce,
192        )?;
193
194        Self::deposit_event(Event::InboxMessageResponse {
195            chain_id: dst_chain_id,
196            channel_id,
197            nonce,
198        });
199
200        Ok(())
201    }
202
203    fn process_incoming_endpoint_message_req(
204        dst_chain_id: ChainId,
205        req: EndpointRequest,
206        channel_id: ChannelId,
207        nonce: Nonce,
208        msg_weight_tag: &MessageWeightTag,
209        endpoint_handler: Box<dyn sp_messenger::endpoint::EndpointHandler<MessageId>>,
210    ) -> EndpointResponse {
211        let dst_endpoint = req.dst_endpoint.clone();
212        let pre_check_handler = || {
213            ensure!(
214                msg_weight_tag == &MessageWeightTag::EndpointRequest(dst_endpoint),
215                Error::<T>::WeightTagNotMatch
216            );
217
218            let channel =
219                Channels::<T>::get(dst_chain_id, channel_id).ok_or(Error::<T>::MissingChannel)?;
220            ensure!(
221                channel.state == ChannelState::Open,
222                Error::<T>::InvalidChannelState
223            );
224
225            ensure!(
226                ChainAllowlist::<T>::get().contains(&dst_chain_id),
227                Error::<T>::ChainNotAllowed
228            );
229
230            Ok(())
231        };
232
233        endpoint_handler.message(dst_chain_id, (channel_id, nonce), req, pre_check_handler())
234    }
235
236    fn process_incoming_protocol_message_req(
237        chain_id: ChainId,
238        channel_id: ChannelId,
239        req: ProtocolMessageRequest<ChannelOpenParamsV1>,
240        weight_tag: &MessageWeightTag,
241    ) -> Result<(), DispatchError> {
242        let is_chain_allowed = ChainAllowlist::<T>::get().contains(&chain_id);
243        match req {
244            ProtocolMessageRequest::ChannelOpen(_) => {
245                if !is_chain_allowed {
246                    return Err(Error::<T>::ChainNotAllowed.into());
247                }
248
249                if weight_tag != &MessageWeightTag::ProtocolChannelOpen {
250                    return Err(Error::<T>::WeightTagNotMatch.into());
251                }
252                Self::do_open_channel(chain_id, channel_id)
253            }
254            ProtocolMessageRequest::ChannelClose => {
255                if weight_tag != &MessageWeightTag::ProtocolChannelClose {
256                    return Err(Error::<T>::WeightTagNotMatch.into());
257                }
258                // closing of this channel is coming from the other chain
259                // so safe to close it as Sudo here
260                Self::do_close_channel(chain_id, channel_id, CloseChannelBy::Sudo)
261            }
262        }
263    }
264
265    fn process_incoming_protocol_message_response(
266        chain_id: ChainId,
267        channel_id: ChannelId,
268        req: ProtocolMessageRequest<ChannelOpenParamsV1>,
269        resp: ProtocolMessageResponse,
270        weight_tag: &MessageWeightTag,
271    ) -> DispatchResult {
272        match (req, resp) {
273            // channel open request is accepted by dst_chain.
274            // open channel on our end.
275            (ProtocolMessageRequest::ChannelOpen(_), Ok(_)) => {
276                if weight_tag != &MessageWeightTag::ProtocolChannelOpen {
277                    return Err(Error::<T>::WeightTagNotMatch.into());
278                }
279                Self::do_open_channel(chain_id, channel_id)
280            }
281
282            // for rest of the branches we dont care about the outcome and return Ok
283            // for channel close request, we do not care about the response as channel is already closed.
284            // for channel open request and request is rejected, channel is left in init state and no new messages are accepted.
285            _ => Ok(()),
286        }
287    }
288
289    fn process_incoming_endpoint_message_response(
290        dst_chain_id: ChainId,
291        channel_id: ChannelId,
292        nonce: Nonce,
293        resp_msg_weight_tag: &MessageWeightTag,
294        req: EndpointRequest,
295        resp: EndpointResponse,
296        endpoint_handler: Box<dyn EndpointHandler<MessageId>>,
297    ) -> DispatchResult {
298        if resp_msg_weight_tag != &MessageWeightTag::EndpointResponse(req.dst_endpoint.clone()) {
299            return Err(Error::<T>::WeightTagNotMatch.into());
300        }
301
302        endpoint_handler.message_response(dst_chain_id, (channel_id, nonce), req, resp)
303    }
304
305    pub(crate) fn process_outbox_message_responses(
306        resp_msg: Message<BalanceOf<T>>,
307        resp_msg_weight_tag: MessageWeightTag,
308    ) -> DispatchResult {
309        let (dst_chain_id, channel_id, nonce) =
310            (resp_msg.src_chain_id, resp_msg.channel_id, resp_msg.nonce);
311
312        // fetch original request
313        let req_msg = Outbox::<T>::take((dst_chain_id, channel_id, nonce))
314            .ok_or(Error::<T>::MissingMessage)?;
315
316        OutboxMessageCount::<T>::try_mutate(
317            (dst_chain_id, channel_id),
318            |count| -> Result<(), DispatchError> {
319                *count = count
320                    .checked_sub(1u32)
321                    .ok_or(Error::<T>::MessageCountUnderflow)?;
322                Ok(())
323            },
324        )?;
325
326        // clear outbox message weight tag
327        OutboxMessageWeightTags::<T>::remove((dst_chain_id, (channel_id, nonce)));
328
329        let resp = match (req_msg.payload, resp_msg.payload) {
330            // process incoming protocol outbox message response.
331            (
332                VersionedPayload::V1(PayloadV1::Protocol(RequestResponse::Request(req))),
333                VersionedPayload::V1(PayloadV1::Protocol(RequestResponse::Response(resp))),
334            ) => Self::process_incoming_protocol_message_response(
335                dst_chain_id,
336                channel_id,
337                req,
338                resp,
339                &resp_msg_weight_tag,
340            ),
341
342            // process incoming endpoint outbox message response.
343            (
344                VersionedPayload::V1(PayloadV1::Endpoint(RequestResponse::Request(req))),
345                VersionedPayload::V1(PayloadV1::Endpoint(RequestResponse::Response(resp))),
346            ) => {
347                // Firstly, distribute the fees for outbox message execution regardless what the result is,
348                // since the fee is already charged from the sender and the processing of the XDM is finished.
349                Self::reward_operators_for_outbox_execution(dst_chain_id, (channel_id, nonce))?;
350
351                if let Some(endpoint_handler) = T::get_endpoint_handler(&req.req.dst_endpoint) {
352                    Self::process_incoming_endpoint_message_response(
353                        dst_chain_id,
354                        channel_id,
355                        nonce,
356                        &resp_msg_weight_tag,
357                        req.req,
358                        resp,
359                        endpoint_handler,
360                    )
361                } else {
362                    Err(Error::<T>::NoMessageHandler.into())
363                }
364            }
365
366            (_, _) => Err(Error::<T>::InvalidMessagePayload.into()),
367        };
368
369        UpdatedChannels::<T>::mutate(|updated_channels| {
370            updated_channels.insert((dst_chain_id, channel_id))
371        });
372
373        // deposit event notifying the message status.
374        match resp {
375            Ok(_) => Self::deposit_event(Event::OutboxMessageResult {
376                chain_id: dst_chain_id,
377                channel_id,
378                nonce,
379                result: OutboxMessageResult::Ok,
380            }),
381            Err(err) => Self::deposit_event(Event::OutboxMessageResult {
382                chain_id: dst_chain_id,
383                channel_id,
384                nonce,
385                result: OutboxMessageResult::Err(err),
386            }),
387        }
388
389        Ok(())
390    }
391
392    pub fn get_block_messages(query: BlockMessagesQuery) -> MessagesWithStorageKey {
393        let BlockMessagesQuery {
394            chain_id,
395            channel_id,
396            outbox_from,
397            inbox_responses_from,
398        } = query;
399
400        let inbox_responses_weight_tags = Self::get_weight_tags(
401            (chain_id, channel_id, inbox_responses_from),
402            InboxResponseMessageWeightTags::<T>::get,
403        );
404
405        let outbox_weight_tags = Self::get_weight_tags(
406            (chain_id, channel_id, outbox_from),
407            OutboxMessageWeightTags::<T>::get,
408        );
409
410        if outbox_weight_tags.is_empty() && inbox_responses_weight_tags.is_empty() {
411            return Default::default();
412        }
413
414        let mut messages_with_storage_key = MessagesWithStorageKey::default();
415
416        // create storage keys for inbox responses
417        inbox_responses_weight_tags
418            .into_iter()
419            .for_each(|(nonce, weight_tag)| {
420                let storage_key =
421                    InboxResponses::<T>::hashed_key_for((chain_id, channel_id, nonce));
422                messages_with_storage_key
423                    .inbox_responses
424                    .push(MessageNonceWithStorageKey {
425                        nonce,
426                        storage_key,
427                        weight_tag,
428                    });
429            });
430
431        // create storage keys for outbox
432        outbox_weight_tags
433            .into_iter()
434            .for_each(|(nonce, weight_tag)| {
435                let storage_key = Outbox::<T>::hashed_key_for((chain_id, channel_id, nonce));
436                messages_with_storage_key
437                    .outbox
438                    .push(MessageNonceWithStorageKey {
439                        nonce,
440                        storage_key,
441                        weight_tag,
442                    })
443            });
444
445        messages_with_storage_key
446    }
447
448    fn get_weight_tags<WTG>(
449        from: MessageKey,
450        weight_tag_getter: WTG,
451    ) -> BTreeMap<Nonce, MessageWeightTag>
452    where
453        WTG: Fn((ChainId, MessageId)) -> Option<MessageWeightTag>,
454    {
455        let (chain_id, channel_id, mut nonce) = from;
456        let mut weight_tags = BTreeMap::new();
457        while weight_tags.len() as u32 <= MAX_FUTURE_ALLOWED_NONCES {
458            match weight_tag_getter((chain_id, (channel_id, nonce))) {
459                // if the nonce is already processed, short circuit and return
460                None => return weight_tags,
461                Some(weight_tag) => {
462                    weight_tags.insert(nonce, weight_tag);
463                }
464            };
465
466            nonce = match nonce.checked_add(One::one()) {
467                None => return weight_tags,
468                Some(nonce) => nonce,
469            }
470        }
471
472        weight_tags
473    }
474}