pallet_messenger/
messages.rs

1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4use crate::pallet::{
5    ChainAllowlist, InboxResponseMessageWeightTags, OutboxMessageCount, OutboxMessageWeightTags,
6    UpdatedChannels,
7};
8use crate::{
9    BalanceOf, ChannelId, ChannelState, Channels, CloseChannelBy, Config, Error, Event,
10    InboxResponses, Nonce, Outbox, OutboxMessageResult, Pallet,
11};
12#[cfg(not(feature = "std"))]
13use alloc::boxed::Box;
14#[cfg(not(feature = "std"))]
15use alloc::collections::BTreeMap;
16use frame_support::ensure;
17use sp_messenger::endpoint::{EndpointHandler, EndpointRequest, EndpointResponse};
18use sp_messenger::messages::{
19    BlockMessageWithStorageKey, BlockMessagesWithStorageKey, ChainId, ChannelOpenParams,
20    ConvertedPayload, Message, MessageId, MessageWeightTag, Payload, ProtocolMessageRequest,
21    ProtocolMessageResponse, RequestResponse, VersionedPayload,
22};
23use sp_runtime::traits::Get;
24use sp_runtime::{ArithmeticError, DispatchError, DispatchResult};
25#[cfg(feature = "std")]
26use std::collections::BTreeMap;
27
28impl<T: Config> Pallet<T> {
29    /// Takes a new message destined for dst_chain and adds the message to the outbox.
30    pub(crate) fn new_outbox_message(
31        src_chain_id: ChainId,
32        dst_chain_id: ChainId,
33        channel_id: ChannelId,
34        payload: VersionedPayload<BalanceOf<T>>,
35    ) -> Result<Nonce, DispatchError> {
36        // ensure message is not meant to self.
37        ensure!(
38            src_chain_id != dst_chain_id,
39            Error::<T>::InvalidMessageDestination
40        );
41
42        Channels::<T>::try_mutate(
43            dst_chain_id,
44            channel_id,
45            |maybe_channel| -> Result<Nonce, DispatchError> {
46                let channel = maybe_channel.as_mut().ok_or(Error::<T>::MissingChannel)?;
47                // check if the outbox is full
48                let count = OutboxMessageCount::<T>::get((dst_chain_id, channel_id));
49                ensure!(
50                    count < channel.max_outgoing_messages,
51                    Error::<T>::OutboxFull
52                );
53
54                let weight_tag = MessageWeightTag::outbox(&payload);
55
56                let next_outbox_nonce = channel.next_outbox_nonce;
57                // add message to outbox
58                let msg = Message {
59                    src_chain_id,
60                    dst_chain_id,
61                    channel_id,
62                    nonce: next_outbox_nonce,
63                    payload,
64                    last_delivered_message_response_nonce: channel
65                        .latest_response_received_message_nonce,
66                };
67                Outbox::<T>::insert((dst_chain_id, channel_id, next_outbox_nonce), msg);
68                OutboxMessageCount::<T>::try_mutate(
69                    (dst_chain_id, channel_id),
70                    |count| -> Result<(), DispatchError> {
71                        *count = count
72                            .checked_add(1u32)
73                            .ok_or(Error::<T>::MessageCountOverflow)?;
74                        Ok(())
75                    },
76                )?;
77
78                // update channel state
79                channel.next_outbox_nonce = next_outbox_nonce
80                    .checked_add(Nonce::one())
81                    .ok_or(DispatchError::Arithmetic(ArithmeticError::Overflow))?;
82
83                OutboxMessageWeightTags::<T>::insert(
84                    (dst_chain_id, (channel_id, next_outbox_nonce)),
85                    weight_tag,
86                );
87
88                // emit event to notify relayer
89                Self::deposit_event(Event::OutboxMessage {
90                    chain_id: dst_chain_id,
91                    channel_id,
92                    nonce: next_outbox_nonce,
93                });
94                Ok(next_outbox_nonce)
95            },
96        )
97    }
98
99    /// Process the incoming messages from given chain_id and channel_id.
100    pub(crate) fn process_inbox_messages(
101        msg: Message<BalanceOf<T>>,
102        msg_weight_tag: MessageWeightTag,
103    ) -> DispatchResult {
104        let (dst_chain_id, channel_id, nonce) = (msg.src_chain_id, msg.channel_id, msg.nonce);
105        let channel =
106            Channels::<T>::get(dst_chain_id, channel_id).ok_or(Error::<T>::MissingChannel)?;
107
108        let maybe_collected_fee = msg.payload.maybe_collected_fee();
109        let ConvertedPayload { payload, is_v1 } = msg.payload.into_payload_v0();
110        let response = match payload {
111            // process incoming protocol message.
112            Payload::Protocol(RequestResponse::Request(req)) => Payload::Protocol(
113                RequestResponse::Response(Self::process_incoming_protocol_message_req(
114                    dst_chain_id,
115                    channel_id,
116                    req,
117                    &msg_weight_tag,
118                )),
119            ),
120
121            // process incoming endpoint message.
122            Payload::Endpoint(RequestResponse::Request(req)) => {
123                // Firstly, store fees for inbox message execution regardless what the execution result is,
124                // since the fee is already charged from the sender of the src chain and processing of the
125                // XDM in this end is finished.
126                if let Some(collected_fee) = maybe_collected_fee {
127                    // since v1 collects fee on behalf of dst_chain, this chain,
128                    // so we do not recalculate the fee but instead use the collected fee as is
129                    Self::store_inbox_fee(
130                        dst_chain_id,
131                        (channel_id, nonce),
132                        collected_fee.dst_chain_fee,
133                    )?;
134                } else {
135                    // for v0, use the weight to fee conversion to calculate the fee
136                    // and store the fee
137                    Self::store_fees_for_inbox_message(
138                        (dst_chain_id, (channel_id, nonce)),
139                        &channel.fee,
140                        &req.src_endpoint,
141                    );
142                }
143
144                let response =
145                    if let Some(endpoint_handler) = T::get_endpoint_handler(&req.dst_endpoint) {
146                        Self::process_incoming_endpoint_message_req(
147                            dst_chain_id,
148                            req,
149                            channel_id,
150                            nonce,
151                            &msg_weight_tag,
152                            endpoint_handler,
153                        )
154                    } else {
155                        Err(Error::<T>::NoMessageHandler.into())
156                    };
157
158                Payload::Endpoint(RequestResponse::Response(response))
159            }
160
161            // return error for all the remaining branches
162            Payload::Protocol(_) => Payload::Protocol(RequestResponse::Response(Err(
163                Error::<T>::InvalidMessagePayload.into(),
164            ))),
165            Payload::Endpoint(_) => Payload::Endpoint(RequestResponse::Response(Err(
166                Error::<T>::InvalidMessagePayload.into(),
167            ))),
168        };
169
170        let resp_payload = if is_v1 {
171            VersionedPayload::V1(response.into())
172        } else {
173            VersionedPayload::V0(response)
174        };
175
176        let weight_tag = MessageWeightTag::inbox_response(msg_weight_tag, &resp_payload);
177
178        InboxResponses::<T>::insert(
179            (dst_chain_id, channel_id, nonce),
180            Message {
181                src_chain_id: T::SelfChainId::get(),
182                dst_chain_id,
183                channel_id,
184                nonce,
185                payload: resp_payload,
186                // this nonce is not considered in response context.
187                last_delivered_message_response_nonce: None,
188            },
189        );
190
191        InboxResponseMessageWeightTags::<T>::insert(
192            (dst_chain_id, (channel_id, nonce)),
193            weight_tag,
194        );
195
196        UpdatedChannels::<T>::mutate(|updated_channels| {
197            updated_channels.insert((dst_chain_id, channel_id))
198        });
199
200        // reward relayers for relaying message responses to src_chain.
201        // clean any delivered inbox responses
202        Self::reward_operators_for_inbox_execution(
203            dst_chain_id,
204            channel_id,
205            msg.last_delivered_message_response_nonce,
206        )?;
207
208        Self::deposit_event(Event::InboxMessageResponse {
209            chain_id: dst_chain_id,
210            channel_id,
211            nonce,
212        });
213
214        Ok(())
215    }
216
217    fn process_incoming_endpoint_message_req(
218        dst_chain_id: ChainId,
219        req: EndpointRequest,
220        channel_id: ChannelId,
221        nonce: Nonce,
222        msg_weight_tag: &MessageWeightTag,
223        endpoint_handler: Box<dyn sp_messenger::endpoint::EndpointHandler<MessageId>>,
224    ) -> EndpointResponse {
225        if msg_weight_tag != &MessageWeightTag::EndpointRequest(req.dst_endpoint.clone()) {
226            return Err(Error::<T>::WeightTagNotMatch.into());
227        }
228
229        let channel =
230            Channels::<T>::get(dst_chain_id, channel_id).ok_or(Error::<T>::MissingChannel)?;
231        if channel.state != ChannelState::Open {
232            return Err(Error::<T>::InvalidChannelState.into());
233        }
234
235        let pre_check_response = if !ChainAllowlist::<T>::get().contains(&dst_chain_id) {
236            Err(Error::<T>::ChainNotAllowed.into())
237        } else {
238            Ok(())
239        };
240
241        endpoint_handler.message(dst_chain_id, (channel_id, nonce), req, pre_check_response)
242    }
243
244    fn process_incoming_protocol_message_req(
245        chain_id: ChainId,
246        channel_id: ChannelId,
247        req: ProtocolMessageRequest<ChannelOpenParams<BalanceOf<T>>>,
248        weight_tag: &MessageWeightTag,
249    ) -> Result<(), DispatchError> {
250        let is_chain_allowed = ChainAllowlist::<T>::get().contains(&chain_id);
251        match req {
252            ProtocolMessageRequest::ChannelOpen(_) => {
253                if !is_chain_allowed {
254                    return Err(Error::<T>::ChainNotAllowed.into());
255                }
256
257                if weight_tag != &MessageWeightTag::ProtocolChannelOpen {
258                    return Err(Error::<T>::WeightTagNotMatch.into());
259                }
260                Self::do_open_channel(chain_id, channel_id)
261            }
262            ProtocolMessageRequest::ChannelClose => {
263                if weight_tag != &MessageWeightTag::ProtocolChannelClose {
264                    return Err(Error::<T>::WeightTagNotMatch.into());
265                }
266                // closing of this channel is coming from the other chain
267                // so safe to close it as Sudo here
268                Self::do_close_channel(chain_id, channel_id, CloseChannelBy::Sudo)
269            }
270        }
271    }
272
273    fn process_incoming_protocol_message_response(
274        chain_id: ChainId,
275        channel_id: ChannelId,
276        req: ProtocolMessageRequest<ChannelOpenParams<BalanceOf<T>>>,
277        resp: ProtocolMessageResponse,
278        weight_tag: &MessageWeightTag,
279    ) -> DispatchResult {
280        match (req, resp) {
281            // channel open request is accepted by dst_chain.
282            // open channel on our end.
283            (ProtocolMessageRequest::ChannelOpen(_), Ok(_)) => {
284                if weight_tag != &MessageWeightTag::ProtocolChannelOpen {
285                    return Err(Error::<T>::WeightTagNotMatch.into());
286                }
287                Self::do_open_channel(chain_id, channel_id)
288            }
289
290            // for rest of the branches we dont care about the outcome and return Ok
291            // for channel close request, we do not care about the response as channel is already closed.
292            // for channel open request and request is rejected, channel is left in init state and no new messages are accepted.
293            _ => Ok(()),
294        }
295    }
296
297    fn process_incoming_endpoint_message_response(
298        dst_chain_id: ChainId,
299        channel_id: ChannelId,
300        nonce: Nonce,
301        resp_msg_weight_tag: &MessageWeightTag,
302        req: EndpointRequest,
303        resp: EndpointResponse,
304        endpoint_handler: Box<dyn EndpointHandler<MessageId>>,
305    ) -> DispatchResult {
306        if resp_msg_weight_tag != &MessageWeightTag::EndpointResponse(req.dst_endpoint.clone()) {
307            return Err(Error::<T>::WeightTagNotMatch.into());
308        }
309
310        endpoint_handler.message_response(dst_chain_id, (channel_id, nonce), req, resp)
311    }
312
313    pub(crate) fn process_outbox_message_responses(
314        resp_msg: Message<BalanceOf<T>>,
315        resp_msg_weight_tag: MessageWeightTag,
316    ) -> DispatchResult {
317        let (dst_chain_id, channel_id, nonce) =
318            (resp_msg.src_chain_id, resp_msg.channel_id, resp_msg.nonce);
319
320        // fetch original request
321        let req_msg = Outbox::<T>::take((dst_chain_id, channel_id, nonce))
322            .ok_or(Error::<T>::MissingMessage)?;
323
324        OutboxMessageCount::<T>::try_mutate(
325            (dst_chain_id, channel_id),
326            |count| -> Result<(), DispatchError> {
327                *count = count
328                    .checked_sub(1u32)
329                    .ok_or(Error::<T>::MessageCountUnderflow)?;
330                Ok(())
331            },
332        )?;
333
334        // clear outbox message weight tag
335        OutboxMessageWeightTags::<T>::remove((dst_chain_id, (channel_id, nonce)));
336
337        let ConvertedPayload {
338            payload: req,
339            is_v1: is_v1_req,
340        } = req_msg.payload.into_payload_v0();
341        let ConvertedPayload {
342            payload: resp,
343            is_v1: is_v1_resp,
344        } = resp_msg.payload.into_payload_v0();
345
346        ensure!(is_v1_req == is_v1_resp, Error::<T>::MessageVersionMismatch);
347
348        let resp = match (req, resp) {
349            // process incoming protocol outbox message response.
350            (
351                Payload::Protocol(RequestResponse::Request(req)),
352                Payload::Protocol(RequestResponse::Response(resp)),
353            ) => Self::process_incoming_protocol_message_response(
354                dst_chain_id,
355                channel_id,
356                req,
357                resp,
358                &resp_msg_weight_tag,
359            ),
360
361            // process incoming endpoint outbox message response.
362            (
363                Payload::Endpoint(RequestResponse::Request(req)),
364                Payload::Endpoint(RequestResponse::Response(resp)),
365            ) => {
366                // Firstly, distribute the fees for outbox message execution regardless what the result is,
367                // since the fee is already charged from the sender and the processing of the XDM is finished.
368                Self::reward_operators_for_outbox_execution(dst_chain_id, (channel_id, nonce))?;
369
370                if let Some(endpoint_handler) = T::get_endpoint_handler(&req.dst_endpoint) {
371                    Self::process_incoming_endpoint_message_response(
372                        dst_chain_id,
373                        channel_id,
374                        nonce,
375                        &resp_msg_weight_tag,
376                        req,
377                        resp,
378                        endpoint_handler,
379                    )
380                } else {
381                    Err(Error::<T>::NoMessageHandler.into())
382                }
383            }
384
385            (_, _) => Err(Error::<T>::InvalidMessagePayload.into()),
386        };
387
388        UpdatedChannels::<T>::mutate(|updated_channels| {
389            updated_channels.insert((dst_chain_id, channel_id))
390        });
391
392        // deposit event notifying the message status.
393        match resp {
394            Ok(_) => Self::deposit_event(Event::OutboxMessageResult {
395                chain_id: dst_chain_id,
396                channel_id,
397                nonce,
398                result: OutboxMessageResult::Ok,
399            }),
400            Err(err) => Self::deposit_event(Event::OutboxMessageResult {
401                chain_id: dst_chain_id,
402                channel_id,
403                nonce,
404                result: OutboxMessageResult::Err(err),
405            }),
406        }
407
408        Ok(())
409    }
410
411    pub fn get_block_messages() -> BlockMessagesWithStorageKey {
412        let inbox_responses_weight_tags: BTreeMap<(ChainId, MessageId), MessageWeightTag> =
413            InboxResponseMessageWeightTags::<T>::iter().collect();
414        let outbox_weight_tags: BTreeMap<(ChainId, MessageId), MessageWeightTag> =
415            OutboxMessageWeightTags::<T>::iter().collect();
416
417        if outbox_weight_tags.is_empty() && inbox_responses_weight_tags.is_empty() {
418            return Default::default();
419        }
420
421        let mut messages_with_storage_key = BlockMessagesWithStorageKey::default();
422
423        // create storage keys for inbox responses
424        inbox_responses_weight_tags.into_iter().for_each(
425            |((chain_id, (channel_id, nonce)), weight_tag)| {
426                let storage_key =
427                    InboxResponses::<T>::hashed_key_for((chain_id, channel_id, nonce));
428                messages_with_storage_key
429                    .inbox_responses
430                    .push(BlockMessageWithStorageKey {
431                        src_chain_id: T::SelfChainId::get(),
432                        dst_chain_id: chain_id,
433                        channel_id,
434                        nonce,
435                        storage_key,
436                        weight_tag,
437                    });
438            },
439        );
440
441        // create storage keys for outbox
442        outbox_weight_tags
443            .into_iter()
444            .for_each(|((chain_id, (channel_id, nonce)), weight_tag)| {
445                let storage_key = Outbox::<T>::hashed_key_for((chain_id, channel_id, nonce));
446                messages_with_storage_key
447                    .outbox
448                    .push(BlockMessageWithStorageKey {
449                        src_chain_id: T::SelfChainId::get(),
450                        dst_chain_id: chain_id,
451                        channel_id,
452                        nonce,
453                        storage_key,
454                        weight_tag,
455                    })
456            });
457
458        messages_with_storage_key
459    }
460}