1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4use crate::pallet::{
5 ChainAllowlist, InboxResponseMessageWeightTags, OutboxMessageCount, OutboxMessageWeightTags,
6 UpdatedChannels,
7};
8use crate::{
9 BalanceOf, ChannelId, ChannelState, Channels, CloseChannelBy, Config, Error, Event,
10 InboxResponses, Nonce, Outbox, OutboxMessageResult, Pallet,
11};
12#[cfg(not(feature = "std"))]
13use alloc::boxed::Box;
14#[cfg(not(feature = "std"))]
15use alloc::collections::BTreeMap;
16use frame_support::ensure;
17use sp_messenger::endpoint::{EndpointHandler, EndpointRequest, EndpointResponse};
18use sp_messenger::messages::{
19 BlockMessageWithStorageKey, BlockMessagesWithStorageKey, ChainId, ChannelOpenParams,
20 ConvertedPayload, Message, MessageId, MessageWeightTag, Payload, ProtocolMessageRequest,
21 ProtocolMessageResponse, RequestResponse, VersionedPayload,
22};
23use sp_runtime::traits::Get;
24use sp_runtime::{ArithmeticError, DispatchError, DispatchResult};
25#[cfg(feature = "std")]
26use std::collections::BTreeMap;
27
28impl<T: Config> Pallet<T> {
29 pub(crate) fn new_outbox_message(
31 src_chain_id: ChainId,
32 dst_chain_id: ChainId,
33 channel_id: ChannelId,
34 payload: VersionedPayload<BalanceOf<T>>,
35 ) -> Result<Nonce, DispatchError> {
36 ensure!(
38 src_chain_id != dst_chain_id,
39 Error::<T>::InvalidMessageDestination
40 );
41
42 Channels::<T>::try_mutate(
43 dst_chain_id,
44 channel_id,
45 |maybe_channel| -> Result<Nonce, DispatchError> {
46 let channel = maybe_channel.as_mut().ok_or(Error::<T>::MissingChannel)?;
47 let count = OutboxMessageCount::<T>::get((dst_chain_id, channel_id));
49 ensure!(
50 count < channel.max_outgoing_messages,
51 Error::<T>::OutboxFull
52 );
53
54 let weight_tag = MessageWeightTag::outbox(&payload);
55
56 let next_outbox_nonce = channel.next_outbox_nonce;
57 let msg = Message {
59 src_chain_id,
60 dst_chain_id,
61 channel_id,
62 nonce: next_outbox_nonce,
63 payload,
64 last_delivered_message_response_nonce: channel
65 .latest_response_received_message_nonce,
66 };
67 Outbox::<T>::insert((dst_chain_id, channel_id, next_outbox_nonce), msg);
68 OutboxMessageCount::<T>::try_mutate(
69 (dst_chain_id, channel_id),
70 |count| -> Result<(), DispatchError> {
71 *count = count
72 .checked_add(1u32)
73 .ok_or(Error::<T>::MessageCountOverflow)?;
74 Ok(())
75 },
76 )?;
77
78 channel.next_outbox_nonce = next_outbox_nonce
80 .checked_add(Nonce::one())
81 .ok_or(DispatchError::Arithmetic(ArithmeticError::Overflow))?;
82
83 OutboxMessageWeightTags::<T>::insert(
84 (dst_chain_id, (channel_id, next_outbox_nonce)),
85 weight_tag,
86 );
87
88 Self::deposit_event(Event::OutboxMessage {
90 chain_id: dst_chain_id,
91 channel_id,
92 nonce: next_outbox_nonce,
93 });
94 Ok(next_outbox_nonce)
95 },
96 )
97 }
98
99 pub(crate) fn process_inbox_messages(
101 msg: Message<BalanceOf<T>>,
102 msg_weight_tag: MessageWeightTag,
103 ) -> DispatchResult {
104 let (dst_chain_id, channel_id, nonce) = (msg.src_chain_id, msg.channel_id, msg.nonce);
105 let channel =
106 Channels::<T>::get(dst_chain_id, channel_id).ok_or(Error::<T>::MissingChannel)?;
107
108 let maybe_collected_fee = msg.payload.maybe_collected_fee();
109 let ConvertedPayload { payload, is_v1 } = msg.payload.into_payload_v0();
110 let response = match payload {
111 Payload::Protocol(RequestResponse::Request(req)) => Payload::Protocol(
113 RequestResponse::Response(Self::process_incoming_protocol_message_req(
114 dst_chain_id,
115 channel_id,
116 req,
117 &msg_weight_tag,
118 )),
119 ),
120
121 Payload::Endpoint(RequestResponse::Request(req)) => {
123 if let Some(collected_fee) = maybe_collected_fee {
127 Self::store_inbox_fee(
130 dst_chain_id,
131 (channel_id, nonce),
132 collected_fee.dst_chain_fee,
133 )?;
134 } else {
135 Self::store_fees_for_inbox_message(
138 (dst_chain_id, (channel_id, nonce)),
139 &channel.fee,
140 &req.src_endpoint,
141 );
142 }
143
144 let response =
145 if let Some(endpoint_handler) = T::get_endpoint_handler(&req.dst_endpoint) {
146 Self::process_incoming_endpoint_message_req(
147 dst_chain_id,
148 req,
149 channel_id,
150 nonce,
151 &msg_weight_tag,
152 endpoint_handler,
153 )
154 } else {
155 Err(Error::<T>::NoMessageHandler.into())
156 };
157
158 Payload::Endpoint(RequestResponse::Response(response))
159 }
160
161 Payload::Protocol(_) => Payload::Protocol(RequestResponse::Response(Err(
163 Error::<T>::InvalidMessagePayload.into(),
164 ))),
165 Payload::Endpoint(_) => Payload::Endpoint(RequestResponse::Response(Err(
166 Error::<T>::InvalidMessagePayload.into(),
167 ))),
168 };
169
170 let resp_payload = if is_v1 {
171 VersionedPayload::V1(response.into())
172 } else {
173 VersionedPayload::V0(response)
174 };
175
176 let weight_tag = MessageWeightTag::inbox_response(msg_weight_tag, &resp_payload);
177
178 InboxResponses::<T>::insert(
179 (dst_chain_id, channel_id, nonce),
180 Message {
181 src_chain_id: T::SelfChainId::get(),
182 dst_chain_id,
183 channel_id,
184 nonce,
185 payload: resp_payload,
186 last_delivered_message_response_nonce: None,
188 },
189 );
190
191 InboxResponseMessageWeightTags::<T>::insert(
192 (dst_chain_id, (channel_id, nonce)),
193 weight_tag,
194 );
195
196 UpdatedChannels::<T>::mutate(|updated_channels| {
197 updated_channels.insert((dst_chain_id, channel_id))
198 });
199
200 Self::reward_operators_for_inbox_execution(
203 dst_chain_id,
204 channel_id,
205 msg.last_delivered_message_response_nonce,
206 )?;
207
208 Self::deposit_event(Event::InboxMessageResponse {
209 chain_id: dst_chain_id,
210 channel_id,
211 nonce,
212 });
213
214 Ok(())
215 }
216
217 fn process_incoming_endpoint_message_req(
218 dst_chain_id: ChainId,
219 req: EndpointRequest,
220 channel_id: ChannelId,
221 nonce: Nonce,
222 msg_weight_tag: &MessageWeightTag,
223 endpoint_handler: Box<dyn sp_messenger::endpoint::EndpointHandler<MessageId>>,
224 ) -> EndpointResponse {
225 if msg_weight_tag != &MessageWeightTag::EndpointRequest(req.dst_endpoint.clone()) {
226 return Err(Error::<T>::WeightTagNotMatch.into());
227 }
228
229 let channel =
230 Channels::<T>::get(dst_chain_id, channel_id).ok_or(Error::<T>::MissingChannel)?;
231 if channel.state != ChannelState::Open {
232 return Err(Error::<T>::InvalidChannelState.into());
233 }
234
235 let pre_check_response = if !ChainAllowlist::<T>::get().contains(&dst_chain_id) {
236 Err(Error::<T>::ChainNotAllowed.into())
237 } else {
238 Ok(())
239 };
240
241 endpoint_handler.message(dst_chain_id, (channel_id, nonce), req, pre_check_response)
242 }
243
244 fn process_incoming_protocol_message_req(
245 chain_id: ChainId,
246 channel_id: ChannelId,
247 req: ProtocolMessageRequest<ChannelOpenParams<BalanceOf<T>>>,
248 weight_tag: &MessageWeightTag,
249 ) -> Result<(), DispatchError> {
250 let is_chain_allowed = ChainAllowlist::<T>::get().contains(&chain_id);
251 match req {
252 ProtocolMessageRequest::ChannelOpen(_) => {
253 if !is_chain_allowed {
254 return Err(Error::<T>::ChainNotAllowed.into());
255 }
256
257 if weight_tag != &MessageWeightTag::ProtocolChannelOpen {
258 return Err(Error::<T>::WeightTagNotMatch.into());
259 }
260 Self::do_open_channel(chain_id, channel_id)
261 }
262 ProtocolMessageRequest::ChannelClose => {
263 if weight_tag != &MessageWeightTag::ProtocolChannelClose {
264 return Err(Error::<T>::WeightTagNotMatch.into());
265 }
266 Self::do_close_channel(chain_id, channel_id, CloseChannelBy::Sudo)
269 }
270 }
271 }
272
273 fn process_incoming_protocol_message_response(
274 chain_id: ChainId,
275 channel_id: ChannelId,
276 req: ProtocolMessageRequest<ChannelOpenParams<BalanceOf<T>>>,
277 resp: ProtocolMessageResponse,
278 weight_tag: &MessageWeightTag,
279 ) -> DispatchResult {
280 match (req, resp) {
281 (ProtocolMessageRequest::ChannelOpen(_), Ok(_)) => {
284 if weight_tag != &MessageWeightTag::ProtocolChannelOpen {
285 return Err(Error::<T>::WeightTagNotMatch.into());
286 }
287 Self::do_open_channel(chain_id, channel_id)
288 }
289
290 _ => Ok(()),
294 }
295 }
296
297 fn process_incoming_endpoint_message_response(
298 dst_chain_id: ChainId,
299 channel_id: ChannelId,
300 nonce: Nonce,
301 resp_msg_weight_tag: &MessageWeightTag,
302 req: EndpointRequest,
303 resp: EndpointResponse,
304 endpoint_handler: Box<dyn EndpointHandler<MessageId>>,
305 ) -> DispatchResult {
306 if resp_msg_weight_tag != &MessageWeightTag::EndpointResponse(req.dst_endpoint.clone()) {
307 return Err(Error::<T>::WeightTagNotMatch.into());
308 }
309
310 endpoint_handler.message_response(dst_chain_id, (channel_id, nonce), req, resp)
311 }
312
313 pub(crate) fn process_outbox_message_responses(
314 resp_msg: Message<BalanceOf<T>>,
315 resp_msg_weight_tag: MessageWeightTag,
316 ) -> DispatchResult {
317 let (dst_chain_id, channel_id, nonce) =
318 (resp_msg.src_chain_id, resp_msg.channel_id, resp_msg.nonce);
319
320 let req_msg = Outbox::<T>::take((dst_chain_id, channel_id, nonce))
322 .ok_or(Error::<T>::MissingMessage)?;
323
324 OutboxMessageCount::<T>::try_mutate(
325 (dst_chain_id, channel_id),
326 |count| -> Result<(), DispatchError> {
327 *count = count
328 .checked_sub(1u32)
329 .ok_or(Error::<T>::MessageCountUnderflow)?;
330 Ok(())
331 },
332 )?;
333
334 OutboxMessageWeightTags::<T>::remove((dst_chain_id, (channel_id, nonce)));
336
337 let ConvertedPayload {
338 payload: req,
339 is_v1: is_v1_req,
340 } = req_msg.payload.into_payload_v0();
341 let ConvertedPayload {
342 payload: resp,
343 is_v1: is_v1_resp,
344 } = resp_msg.payload.into_payload_v0();
345
346 ensure!(is_v1_req == is_v1_resp, Error::<T>::MessageVersionMismatch);
347
348 let resp = match (req, resp) {
349 (
351 Payload::Protocol(RequestResponse::Request(req)),
352 Payload::Protocol(RequestResponse::Response(resp)),
353 ) => Self::process_incoming_protocol_message_response(
354 dst_chain_id,
355 channel_id,
356 req,
357 resp,
358 &resp_msg_weight_tag,
359 ),
360
361 (
363 Payload::Endpoint(RequestResponse::Request(req)),
364 Payload::Endpoint(RequestResponse::Response(resp)),
365 ) => {
366 Self::reward_operators_for_outbox_execution(dst_chain_id, (channel_id, nonce))?;
369
370 if let Some(endpoint_handler) = T::get_endpoint_handler(&req.dst_endpoint) {
371 Self::process_incoming_endpoint_message_response(
372 dst_chain_id,
373 channel_id,
374 nonce,
375 &resp_msg_weight_tag,
376 req,
377 resp,
378 endpoint_handler,
379 )
380 } else {
381 Err(Error::<T>::NoMessageHandler.into())
382 }
383 }
384
385 (_, _) => Err(Error::<T>::InvalidMessagePayload.into()),
386 };
387
388 UpdatedChannels::<T>::mutate(|updated_channels| {
389 updated_channels.insert((dst_chain_id, channel_id))
390 });
391
392 match resp {
394 Ok(_) => Self::deposit_event(Event::OutboxMessageResult {
395 chain_id: dst_chain_id,
396 channel_id,
397 nonce,
398 result: OutboxMessageResult::Ok,
399 }),
400 Err(err) => Self::deposit_event(Event::OutboxMessageResult {
401 chain_id: dst_chain_id,
402 channel_id,
403 nonce,
404 result: OutboxMessageResult::Err(err),
405 }),
406 }
407
408 Ok(())
409 }
410
411 pub fn get_block_messages() -> BlockMessagesWithStorageKey {
412 let inbox_responses_weight_tags: BTreeMap<(ChainId, MessageId), MessageWeightTag> =
413 InboxResponseMessageWeightTags::<T>::iter().collect();
414 let outbox_weight_tags: BTreeMap<(ChainId, MessageId), MessageWeightTag> =
415 OutboxMessageWeightTags::<T>::iter().collect();
416
417 if outbox_weight_tags.is_empty() && inbox_responses_weight_tags.is_empty() {
418 return Default::default();
419 }
420
421 let mut messages_with_storage_key = BlockMessagesWithStorageKey::default();
422
423 inbox_responses_weight_tags.into_iter().for_each(
425 |((chain_id, (channel_id, nonce)), weight_tag)| {
426 let storage_key =
427 InboxResponses::<T>::hashed_key_for((chain_id, channel_id, nonce));
428 messages_with_storage_key
429 .inbox_responses
430 .push(BlockMessageWithStorageKey {
431 src_chain_id: T::SelfChainId::get(),
432 dst_chain_id: chain_id,
433 channel_id,
434 nonce,
435 storage_key,
436 weight_tag,
437 });
438 },
439 );
440
441 outbox_weight_tags
443 .into_iter()
444 .for_each(|((chain_id, (channel_id, nonce)), weight_tag)| {
445 let storage_key = Outbox::<T>::hashed_key_for((chain_id, channel_id, nonce));
446 messages_with_storage_key
447 .outbox
448 .push(BlockMessageWithStorageKey {
449 src_chain_id: T::SelfChainId::get(),
450 dst_chain_id: chain_id,
451 channel_id,
452 nonce,
453 storage_key,
454 weight_tag,
455 })
456 });
457
458 messages_with_storage_key
459 }
460}