1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4use crate::pallet::{
5 ChainAllowlist, InboxResponseMessageWeightTags, OutboxMessageCount, OutboxMessageWeightTags,
6 UpdatedChannels,
7};
8use crate::{
9 BalanceOf, ChannelId, ChannelState, Channels, CloseChannelBy, Config, Error, Event,
10 InboxResponses, Nonce, Outbox, OutboxMessageResult, Pallet,
11};
12#[cfg(not(feature = "std"))]
13use alloc::boxed::Box;
14#[cfg(not(feature = "std"))]
15use alloc::collections::BTreeMap;
16use frame_support::ensure;
17use sp_messenger::endpoint::{EndpointHandler, EndpointRequest, EndpointResponse};
18use sp_messenger::messages::{
19 BlockMessagesQuery, ChainId, ChannelOpenParamsV1, Message, MessageId, MessageKey,
20 MessageNonceWithStorageKey, MessageWeightTag, MessagesWithStorageKey, PayloadV1,
21 ProtocolMessageRequest, ProtocolMessageResponse, RequestResponse, VersionedPayload,
22};
23
24use sp_messenger::MAX_FUTURE_ALLOWED_NONCES;
25use sp_runtime::traits::{Get, One};
26use sp_runtime::{ArithmeticError, DispatchError, DispatchResult};
27#[cfg(feature = "std")]
28use std::collections::BTreeMap;
29
30impl<T: Config> Pallet<T> {
31 pub(crate) fn new_outbox_message(
33 src_chain_id: ChainId,
34 dst_chain_id: ChainId,
35 channel_id: ChannelId,
36 payload: VersionedPayload<BalanceOf<T>>,
37 ) -> Result<Nonce, DispatchError> {
38 ensure!(
40 src_chain_id != dst_chain_id,
41 Error::<T>::InvalidMessageDestination
42 );
43
44 Channels::<T>::try_mutate(
45 dst_chain_id,
46 channel_id,
47 |maybe_channel| -> Result<Nonce, DispatchError> {
48 let channel = maybe_channel.as_mut().ok_or(Error::<T>::MissingChannel)?;
49 let count = OutboxMessageCount::<T>::get((dst_chain_id, channel_id));
51 ensure!(
52 count < channel.max_outgoing_messages,
53 Error::<T>::OutboxFull
54 );
55
56 let weight_tag = MessageWeightTag::outbox(&payload);
57
58 let next_outbox_nonce = channel.next_outbox_nonce;
59 let msg = Message {
61 src_chain_id,
62 dst_chain_id,
63 channel_id,
64 nonce: next_outbox_nonce,
65 payload,
66 last_delivered_message_response_nonce: channel
67 .latest_response_received_message_nonce,
68 };
69 Outbox::<T>::insert((dst_chain_id, channel_id, next_outbox_nonce), msg);
70 OutboxMessageCount::<T>::try_mutate(
71 (dst_chain_id, channel_id),
72 |count| -> Result<(), DispatchError> {
73 *count = count
74 .checked_add(1u32)
75 .ok_or(Error::<T>::MessageCountOverflow)?;
76 Ok(())
77 },
78 )?;
79
80 channel.next_outbox_nonce = next_outbox_nonce
82 .checked_add(Nonce::one())
83 .ok_or(DispatchError::Arithmetic(ArithmeticError::Overflow))?;
84
85 OutboxMessageWeightTags::<T>::insert(
86 (dst_chain_id, (channel_id, next_outbox_nonce)),
87 weight_tag,
88 );
89
90 Self::deposit_event(Event::OutboxMessage {
92 chain_id: dst_chain_id,
93 channel_id,
94 nonce: next_outbox_nonce,
95 });
96 Ok(next_outbox_nonce)
97 },
98 )
99 }
100
101 pub(crate) fn process_inbox_messages(
103 msg: Message<BalanceOf<T>>,
104 msg_weight_tag: MessageWeightTag,
105 ) -> DispatchResult {
106 let (dst_chain_id, channel_id, nonce) = (msg.src_chain_id, msg.channel_id, msg.nonce);
107
108 let resp_payload = VersionedPayload::V1(match msg.payload {
109 VersionedPayload::V1(PayloadV1::Protocol(RequestResponse::Request(req))) => {
111 PayloadV1::Protocol(RequestResponse::Response(
112 Self::process_incoming_protocol_message_req(
113 dst_chain_id,
114 channel_id,
115 req,
116 &msg_weight_tag,
117 ),
118 ))
119 }
120
121 VersionedPayload::V1(PayloadV1::Endpoint(RequestResponse::Request(req))) => {
123 Self::store_inbox_fee(
130 dst_chain_id,
131 (channel_id, nonce),
132 req.collected_fee.dst_chain_fee,
133 )?;
134
135 let response = if let Some(endpoint_handler) =
136 T::get_endpoint_handler(&req.req.dst_endpoint)
137 {
138 Self::process_incoming_endpoint_message_req(
139 dst_chain_id,
140 req.req,
141 channel_id,
142 nonce,
143 &msg_weight_tag,
144 endpoint_handler,
145 )
146 } else {
147 Err(Error::<T>::NoMessageHandler.into())
148 };
149
150 PayloadV1::Endpoint(RequestResponse::Response(response))
151 }
152
153 VersionedPayload::V1(PayloadV1::Protocol(_)) => PayloadV1::Protocol(
155 RequestResponse::Response(Err(Error::<T>::InvalidMessagePayload.into())),
156 ),
157 VersionedPayload::V1(PayloadV1::Endpoint(_)) => PayloadV1::Endpoint(
158 RequestResponse::Response(Err(Error::<T>::InvalidMessagePayload.into())),
159 ),
160 });
161
162 let weight_tag = MessageWeightTag::inbox_response(msg_weight_tag, &resp_payload);
163
164 InboxResponses::<T>::insert(
165 (dst_chain_id, channel_id, nonce),
166 Message {
167 src_chain_id: T::SelfChainId::get(),
168 dst_chain_id,
169 channel_id,
170 nonce,
171 payload: resp_payload,
172 last_delivered_message_response_nonce: None,
174 },
175 );
176
177 InboxResponseMessageWeightTags::<T>::insert(
178 (dst_chain_id, (channel_id, nonce)),
179 weight_tag,
180 );
181
182 UpdatedChannels::<T>::mutate(|updated_channels| {
183 updated_channels.insert((dst_chain_id, channel_id))
184 });
185
186 Self::reward_operators_for_inbox_execution(
189 dst_chain_id,
190 channel_id,
191 msg.last_delivered_message_response_nonce,
192 )?;
193
194 Self::deposit_event(Event::InboxMessageResponse {
195 chain_id: dst_chain_id,
196 channel_id,
197 nonce,
198 });
199
200 Ok(())
201 }
202
203 fn process_incoming_endpoint_message_req(
204 dst_chain_id: ChainId,
205 req: EndpointRequest,
206 channel_id: ChannelId,
207 nonce: Nonce,
208 msg_weight_tag: &MessageWeightTag,
209 endpoint_handler: Box<dyn sp_messenger::endpoint::EndpointHandler<MessageId>>,
210 ) -> EndpointResponse {
211 let dst_endpoint = req.dst_endpoint.clone();
212 let pre_check_handler = || {
213 ensure!(
214 msg_weight_tag == &MessageWeightTag::EndpointRequest(dst_endpoint),
215 Error::<T>::WeightTagNotMatch
216 );
217
218 let channel =
219 Channels::<T>::get(dst_chain_id, channel_id).ok_or(Error::<T>::MissingChannel)?;
220 ensure!(
221 channel.state == ChannelState::Open,
222 Error::<T>::InvalidChannelState
223 );
224
225 ensure!(
226 ChainAllowlist::<T>::get().contains(&dst_chain_id),
227 Error::<T>::ChainNotAllowed
228 );
229
230 Ok(())
231 };
232
233 endpoint_handler.message(dst_chain_id, (channel_id, nonce), req, pre_check_handler())
234 }
235
236 fn process_incoming_protocol_message_req(
237 chain_id: ChainId,
238 channel_id: ChannelId,
239 req: ProtocolMessageRequest<ChannelOpenParamsV1>,
240 weight_tag: &MessageWeightTag,
241 ) -> Result<(), DispatchError> {
242 let is_chain_allowed = ChainAllowlist::<T>::get().contains(&chain_id);
243 match req {
244 ProtocolMessageRequest::ChannelOpen(_) => {
245 if !is_chain_allowed {
246 return Err(Error::<T>::ChainNotAllowed.into());
247 }
248
249 if weight_tag != &MessageWeightTag::ProtocolChannelOpen {
250 return Err(Error::<T>::WeightTagNotMatch.into());
251 }
252 Self::do_open_channel(chain_id, channel_id)
253 }
254 ProtocolMessageRequest::ChannelClose => {
255 if weight_tag != &MessageWeightTag::ProtocolChannelClose {
256 return Err(Error::<T>::WeightTagNotMatch.into());
257 }
258 Self::do_close_channel(chain_id, channel_id, CloseChannelBy::Sudo)
261 }
262 }
263 }
264
265 fn process_incoming_protocol_message_response(
266 chain_id: ChainId,
267 channel_id: ChannelId,
268 req: ProtocolMessageRequest<ChannelOpenParamsV1>,
269 resp: ProtocolMessageResponse,
270 weight_tag: &MessageWeightTag,
271 ) -> DispatchResult {
272 match (req, resp) {
273 (ProtocolMessageRequest::ChannelOpen(_), Ok(_)) => {
276 if weight_tag != &MessageWeightTag::ProtocolChannelOpen {
277 return Err(Error::<T>::WeightTagNotMatch.into());
278 }
279 Self::do_open_channel(chain_id, channel_id)
280 }
281
282 _ => Ok(()),
286 }
287 }
288
289 fn process_incoming_endpoint_message_response(
290 dst_chain_id: ChainId,
291 channel_id: ChannelId,
292 nonce: Nonce,
293 resp_msg_weight_tag: &MessageWeightTag,
294 req: EndpointRequest,
295 resp: EndpointResponse,
296 endpoint_handler: Box<dyn EndpointHandler<MessageId>>,
297 ) -> DispatchResult {
298 if resp_msg_weight_tag != &MessageWeightTag::EndpointResponse(req.dst_endpoint.clone()) {
299 return Err(Error::<T>::WeightTagNotMatch.into());
300 }
301
302 endpoint_handler.message_response(dst_chain_id, (channel_id, nonce), req, resp)
303 }
304
305 pub(crate) fn process_outbox_message_responses(
306 resp_msg: Message<BalanceOf<T>>,
307 resp_msg_weight_tag: MessageWeightTag,
308 ) -> DispatchResult {
309 let (dst_chain_id, channel_id, nonce) =
310 (resp_msg.src_chain_id, resp_msg.channel_id, resp_msg.nonce);
311
312 let req_msg = Outbox::<T>::take((dst_chain_id, channel_id, nonce))
314 .ok_or(Error::<T>::MissingMessage)?;
315
316 OutboxMessageCount::<T>::try_mutate(
317 (dst_chain_id, channel_id),
318 |count| -> Result<(), DispatchError> {
319 *count = count
320 .checked_sub(1u32)
321 .ok_or(Error::<T>::MessageCountUnderflow)?;
322 Ok(())
323 },
324 )?;
325
326 OutboxMessageWeightTags::<T>::remove((dst_chain_id, (channel_id, nonce)));
328
329 let resp = match (req_msg.payload, resp_msg.payload) {
330 (
332 VersionedPayload::V1(PayloadV1::Protocol(RequestResponse::Request(req))),
333 VersionedPayload::V1(PayloadV1::Protocol(RequestResponse::Response(resp))),
334 ) => Self::process_incoming_protocol_message_response(
335 dst_chain_id,
336 channel_id,
337 req,
338 resp,
339 &resp_msg_weight_tag,
340 ),
341
342 (
344 VersionedPayload::V1(PayloadV1::Endpoint(RequestResponse::Request(req))),
345 VersionedPayload::V1(PayloadV1::Endpoint(RequestResponse::Response(resp))),
346 ) => {
347 Self::reward_operators_for_outbox_execution(dst_chain_id, (channel_id, nonce))?;
350
351 if let Some(endpoint_handler) = T::get_endpoint_handler(&req.req.dst_endpoint) {
352 Self::process_incoming_endpoint_message_response(
353 dst_chain_id,
354 channel_id,
355 nonce,
356 &resp_msg_weight_tag,
357 req.req,
358 resp,
359 endpoint_handler,
360 )
361 } else {
362 Err(Error::<T>::NoMessageHandler.into())
363 }
364 }
365
366 (_, _) => Err(Error::<T>::InvalidMessagePayload.into()),
367 };
368
369 UpdatedChannels::<T>::mutate(|updated_channels| {
370 updated_channels.insert((dst_chain_id, channel_id))
371 });
372
373 match resp {
375 Ok(_) => Self::deposit_event(Event::OutboxMessageResult {
376 chain_id: dst_chain_id,
377 channel_id,
378 nonce,
379 result: OutboxMessageResult::Ok,
380 }),
381 Err(err) => Self::deposit_event(Event::OutboxMessageResult {
382 chain_id: dst_chain_id,
383 channel_id,
384 nonce,
385 result: OutboxMessageResult::Err(err),
386 }),
387 }
388
389 Ok(())
390 }
391
392 pub fn get_block_messages(query: BlockMessagesQuery) -> MessagesWithStorageKey {
393 let BlockMessagesQuery {
394 chain_id,
395 channel_id,
396 outbox_from,
397 inbox_responses_from,
398 } = query;
399
400 let inbox_responses_weight_tags = Self::get_weight_tags(
401 (chain_id, channel_id, inbox_responses_from),
402 InboxResponseMessageWeightTags::<T>::get,
403 );
404
405 let outbox_weight_tags = Self::get_weight_tags(
406 (chain_id, channel_id, outbox_from),
407 OutboxMessageWeightTags::<T>::get,
408 );
409
410 if outbox_weight_tags.is_empty() && inbox_responses_weight_tags.is_empty() {
411 return Default::default();
412 }
413
414 let mut messages_with_storage_key = MessagesWithStorageKey::default();
415
416 inbox_responses_weight_tags
418 .into_iter()
419 .for_each(|(nonce, weight_tag)| {
420 let storage_key =
421 InboxResponses::<T>::hashed_key_for((chain_id, channel_id, nonce));
422 messages_with_storage_key
423 .inbox_responses
424 .push(MessageNonceWithStorageKey {
425 nonce,
426 storage_key,
427 weight_tag,
428 });
429 });
430
431 outbox_weight_tags
433 .into_iter()
434 .for_each(|(nonce, weight_tag)| {
435 let storage_key = Outbox::<T>::hashed_key_for((chain_id, channel_id, nonce));
436 messages_with_storage_key
437 .outbox
438 .push(MessageNonceWithStorageKey {
439 nonce,
440 storage_key,
441 weight_tag,
442 })
443 });
444
445 messages_with_storage_key
446 }
447
448 fn get_weight_tags<WTG>(
449 from: MessageKey,
450 weight_tag_getter: WTG,
451 ) -> BTreeMap<Nonce, MessageWeightTag>
452 where
453 WTG: Fn((ChainId, MessageId)) -> Option<MessageWeightTag>,
454 {
455 let (chain_id, channel_id, mut nonce) = from;
456 let mut weight_tags = BTreeMap::new();
457 while weight_tags.len() as u32 <= MAX_FUTURE_ALLOWED_NONCES {
458 match weight_tag_getter((chain_id, (channel_id, nonce))) {
459 None => return weight_tags,
461 Some(weight_tag) => {
462 weight_tags.insert(nonce, weight_tag);
463 }
464 };
465
466 nonce = match nonce.checked_add(One::one()) {
467 None => return weight_tags,
468 Some(nonce) => nonce,
469 }
470 }
471
472 weight_tags
473 }
474}