domain_client_message_relayer/
aux_schema.rs1use crate::CHANNEL_PROCESSED_STATE_CACHE_LIMIT;
4use parity_scale_codec::{Decode, Encode};
5use sc_client_api::backend::AuxStore;
6use sp_blockchain::{Error as ClientError, HeaderBackend, Result as ClientResult};
7use sp_messenger::messages::{ChainId, ChannelId, Nonce};
8use sp_runtime::traits::{Block as BlockT, NumberFor};
9use sp_runtime::{SaturatedConversion, Saturating};
10use std::sync::Arc;
11
12const CHANNEL_PROCESSED_STATE: &[u8] = b"channel_processed_state";
13const OUTBOX_MESSAGES_PREFIX: &[u8] = b"outbox_messages";
14const INBOX_RESPONSE_MESSAGES_PREFIX: &[u8] = b"inbox_responses_messages";
15
16fn channel_processed_state_key(
17 prefix: &[u8],
18 src_chain_id: ChainId,
19 dst_chain_id: ChainId,
20 channel_id: ChannelId,
21) -> Vec<u8> {
22 (
23 CHANNEL_PROCESSED_STATE,
24 prefix,
25 src_chain_id,
26 dst_chain_id,
27 channel_id,
28 )
29 .encode()
30}
31
32fn load_decode<Backend: AuxStore, T: Decode>(
33 backend: &Backend,
34 key: &[u8],
35) -> ClientResult<Option<T>> {
36 match backend.get_aux(key)? {
37 None => Ok(None),
38 Some(t) => T::decode(&mut &t[..])
39 .map_err(|e| {
40 ClientError::Backend(format!("Relayer DB is corrupted. Decode error: {e}"))
41 })
42 .map(Some),
43 }
44}
45
46#[derive(Debug, Encode, Decode, Clone)]
48pub struct ChannelProcessedState<Block: BlockT> {
49 pub block_number: NumberFor<Block>,
51 pub block_hash: Block::Hash,
53 pub channel_id: ChannelId,
55 pub nonce: Option<Nonce>,
57}
58
59fn get_channel_outbox_processed_state<Backend, Block: BlockT>(
61 backend: &Backend,
62 src_chain_id: ChainId,
63 dst_chain_id: ChainId,
64 channel_id: ChannelId,
65) -> ClientResult<Option<ChannelProcessedState<Block>>>
66where
67 Backend: AuxStore,
68{
69 load_decode::<_, ChannelProcessedState<Block>>(
70 backend,
71 channel_processed_state_key(
72 OUTBOX_MESSAGES_PREFIX,
73 src_chain_id,
74 dst_chain_id,
75 channel_id,
76 )
77 .as_slice(),
78 )
79}
80
81fn get_channel_inbox_message_response_processed_state<Backend, Block: BlockT>(
83 backend: &Backend,
84 src_chain_id: ChainId,
85 dst_chain_id: ChainId,
86 channel_id: ChannelId,
87) -> ClientResult<Option<ChannelProcessedState<Block>>>
88where
89 Backend: AuxStore,
90{
91 load_decode::<_, ChannelProcessedState<Block>>(
92 backend,
93 channel_processed_state_key(
94 INBOX_RESPONSE_MESSAGES_PREFIX,
95 src_chain_id,
96 dst_chain_id,
97 channel_id,
98 )
99 .as_slice(),
100 )
101}
102
103pub(crate) fn set_channel_outbox_processed_state<Backend, Block: BlockT>(
105 backend: &Backend,
106 src_chain_id: ChainId,
107 dst_chain_id: ChainId,
108 channel_state: ChannelProcessedState<Block>,
109) -> ClientResult<()>
110where
111 Backend: AuxStore,
112{
113 backend.insert_aux(
114 &[(
115 channel_processed_state_key(
116 OUTBOX_MESSAGES_PREFIX,
117 src_chain_id,
118 dst_chain_id,
119 channel_state.channel_id,
120 )
121 .as_slice(),
122 channel_state.encode().as_slice(),
123 )],
124 vec![],
125 )
126}
127
128pub(crate) fn set_channel_inbox_response_processed_state<Backend, Block: BlockT>(
130 backend: &Backend,
131 src_chain_id: ChainId,
132 dst_chain_id: ChainId,
133 channel_state: ChannelProcessedState<Block>,
134) -> ClientResult<()>
135where
136 Backend: AuxStore,
137{
138 backend.insert_aux(
139 &[(
140 channel_processed_state_key(
141 INBOX_RESPONSE_MESSAGES_PREFIX,
142 src_chain_id,
143 dst_chain_id,
144 channel_state.channel_id,
145 )
146 .as_slice(),
147 channel_state.encode().as_slice(),
148 )],
149 vec![],
150 )
151}
152
153#[derive(Debug, Clone)]
155pub(crate) struct LastProcessedNonces {
156 pub outbox_nonce: Option<Nonce>,
157 pub inbox_response_nonce: Option<Nonce>,
158}
159
160pub(crate) fn get_last_processed_nonces<Backend, Client, Block>(
162 backend: &Backend,
163 client: &Arc<Client>,
164 latest_hash: Block::Hash,
165 src_chain_id: ChainId,
166 dst_chain_id: ChainId,
167 channel_id: ChannelId,
168) -> ClientResult<LastProcessedNonces>
169where
170 Backend: AuxStore,
171 Block: BlockT,
172 Client: HeaderBackend<Block>,
173{
174 let last_processed_outbox_nonce = get_channel_outbox_processed_state::<_, Block>(
175 backend,
176 src_chain_id,
177 dst_chain_id,
178 channel_id,
179 )?
180 .and_then(|state| {
181 is_last_processed_nonce_valid(
182 client,
183 latest_hash,
184 state.block_number,
185 state.block_hash,
186 state.nonce,
187 )
188 .ok()
189 .flatten()
190 });
191
192 let last_processed_inbox_response_nonce = get_channel_inbox_message_response_processed_state::<
193 _,
194 Block,
195 >(
196 backend, src_chain_id, dst_chain_id, channel_id
197 )?
198 .and_then(|state| {
199 is_last_processed_nonce_valid(
200 client,
201 latest_hash,
202 state.block_number,
203 state.block_hash,
204 state.nonce,
205 )
206 .ok()
207 .flatten()
208 });
209
210 Ok(LastProcessedNonces {
211 outbox_nonce: last_processed_outbox_nonce,
212 inbox_response_nonce: last_processed_inbox_response_nonce,
213 })
214}
215
216fn is_last_processed_nonce_valid<Client, Block>(
217 client: &Arc<Client>,
218 latest_hash: Block::Hash,
219 processed_block_number: NumberFor<Block>,
220 processed_block_hash: Block::Hash,
221 last_process_nonce: Option<Nonce>,
222) -> ClientResult<Option<Nonce>>
223where
224 Block: BlockT,
225 Client: HeaderBackend<Block>,
226{
227 if last_process_nonce.is_none() {
229 return Ok(None);
230 }
231
232 let Some(block_hash) = client.hash(processed_block_number)? else {
234 return Ok(None);
235 };
236
237 if block_hash != processed_block_hash {
239 return Ok(None);
240 }
241
242 let Some(latest_number) = client.number(latest_hash)? else {
243 return Ok(None);
244 };
245
246 if latest_number.saturating_sub(processed_block_number)
248 > CHANNEL_PROCESSED_STATE_CACHE_LIMIT.saturated_into()
249 {
250 Ok(None)
251 } else {
252 Ok(last_process_nonce)
253 }
254}