domain_client_message_relayer/
aux_schema.rs

1//! Schema for processed channel data
2
3use crate::CHANNEL_PROCESSED_STATE_CACHE_LIMIT;
4use parity_scale_codec::{Decode, Encode};
5use sc_client_api::backend::AuxStore;
6use sp_blockchain::{Error as ClientError, HeaderBackend, Result as ClientResult};
7use sp_messenger::messages::{ChainId, ChannelId, Nonce};
8use sp_runtime::traits::{Block as BlockT, NumberFor};
9use sp_runtime::{SaturatedConversion, Saturating};
10use std::sync::Arc;
11
12const CHANNEL_PROCESSED_STATE: &[u8] = b"channel_processed_state";
13const OUTBOX_MESSAGES_PREFIX: &[u8] = b"outbox_messages";
14const INBOX_RESPONSE_MESSAGES_PREFIX: &[u8] = b"inbox_responses_messages";
15
16fn channel_processed_state_key(
17    prefix: &[u8],
18    src_chain_id: ChainId,
19    dst_chain_id: ChainId,
20    channel_id: ChannelId,
21) -> Vec<u8> {
22    (
23        CHANNEL_PROCESSED_STATE,
24        prefix,
25        src_chain_id,
26        dst_chain_id,
27        channel_id,
28    )
29        .encode()
30}
31
32fn load_decode<Backend: AuxStore, T: Decode>(
33    backend: &Backend,
34    key: &[u8],
35) -> ClientResult<Option<T>> {
36    match backend.get_aux(key)? {
37        None => Ok(None),
38        Some(t) => T::decode(&mut &t[..])
39            .map_err(|e| {
40                ClientError::Backend(format!("Relayer DB is corrupted. Decode error: {e}"))
41            })
42            .map(Some),
43    }
44}
45
46/// Channel processed state for given dst_chain and channel ID.
47#[derive(Debug, Encode, Decode, Clone)]
48pub struct ChannelProcessedState<Block: BlockT> {
49    // Block number of chain at which the channel state is updated
50    pub block_number: NumberFor<Block>,
51    /// Block hash of the chain at which the channel state is updated.
52    pub block_hash: Block::Hash,
53    /// Channel identifier.
54    pub channel_id: ChannelId,
55    /// Last processed channel nonce.
56    pub nonce: Option<Nonce>,
57}
58
59/// Load the channel outbox processed state
60fn get_channel_outbox_processed_state<Backend, Block: BlockT>(
61    backend: &Backend,
62    src_chain_id: ChainId,
63    dst_chain_id: ChainId,
64    channel_id: ChannelId,
65) -> ClientResult<Option<ChannelProcessedState<Block>>>
66where
67    Backend: AuxStore,
68{
69    load_decode::<_, ChannelProcessedState<Block>>(
70        backend,
71        channel_processed_state_key(
72            OUTBOX_MESSAGES_PREFIX,
73            src_chain_id,
74            dst_chain_id,
75            channel_id,
76        )
77        .as_slice(),
78    )
79}
80
81/// Load the channel inbox response processed state
82fn get_channel_inbox_message_response_processed_state<Backend, Block: BlockT>(
83    backend: &Backend,
84    src_chain_id: ChainId,
85    dst_chain_id: ChainId,
86    channel_id: ChannelId,
87) -> ClientResult<Option<ChannelProcessedState<Block>>>
88where
89    Backend: AuxStore,
90{
91    load_decode::<_, ChannelProcessedState<Block>>(
92        backend,
93        channel_processed_state_key(
94            INBOX_RESPONSE_MESSAGES_PREFIX,
95            src_chain_id,
96            dst_chain_id,
97            channel_id,
98        )
99        .as_slice(),
100    )
101}
102
103/// Set the channel outbox processed state
104pub(crate) fn set_channel_outbox_processed_state<Backend, Block: BlockT>(
105    backend: &Backend,
106    src_chain_id: ChainId,
107    dst_chain_id: ChainId,
108    channel_state: ChannelProcessedState<Block>,
109) -> ClientResult<()>
110where
111    Backend: AuxStore,
112{
113    backend.insert_aux(
114        &[(
115            channel_processed_state_key(
116                OUTBOX_MESSAGES_PREFIX,
117                src_chain_id,
118                dst_chain_id,
119                channel_state.channel_id,
120            )
121            .as_slice(),
122            channel_state.encode().as_slice(),
123        )],
124        vec![],
125    )
126}
127
128/// Set the channel inbox processed state
129pub(crate) fn set_channel_inbox_response_processed_state<Backend, Block: BlockT>(
130    backend: &Backend,
131    src_chain_id: ChainId,
132    dst_chain_id: ChainId,
133    channel_state: ChannelProcessedState<Block>,
134) -> ClientResult<()>
135where
136    Backend: AuxStore,
137{
138    backend.insert_aux(
139        &[(
140            channel_processed_state_key(
141                INBOX_RESPONSE_MESSAGES_PREFIX,
142                src_chain_id,
143                dst_chain_id,
144                channel_state.channel_id,
145            )
146            .as_slice(),
147            channel_state.encode().as_slice(),
148        )],
149        vec![],
150    )
151}
152
153/// Last processed nonce data.
154#[derive(Debug, Clone)]
155pub(crate) struct LastProcessedNonces {
156    pub outbox_nonce: Option<Nonce>,
157    pub inbox_response_nonce: Option<Nonce>,
158}
159
160/// Returns non expired last processed nonces.
161pub(crate) fn get_last_processed_nonces<Backend, Client, Block>(
162    backend: &Backend,
163    client: &Arc<Client>,
164    latest_hash: Block::Hash,
165    src_chain_id: ChainId,
166    dst_chain_id: ChainId,
167    channel_id: ChannelId,
168) -> ClientResult<LastProcessedNonces>
169where
170    Backend: AuxStore,
171    Block: BlockT,
172    Client: HeaderBackend<Block>,
173{
174    let last_processed_outbox_nonce = get_channel_outbox_processed_state::<_, Block>(
175        backend,
176        src_chain_id,
177        dst_chain_id,
178        channel_id,
179    )?
180    .and_then(|state| {
181        is_last_processed_nonce_valid(
182            client,
183            latest_hash,
184            state.block_number,
185            state.block_hash,
186            state.nonce,
187        )
188        .ok()
189        .flatten()
190    });
191
192    let last_processed_inbox_response_nonce = get_channel_inbox_message_response_processed_state::<
193        _,
194        Block,
195    >(
196        backend, src_chain_id, dst_chain_id, channel_id
197    )?
198    .and_then(|state| {
199        is_last_processed_nonce_valid(
200            client,
201            latest_hash,
202            state.block_number,
203            state.block_hash,
204            state.nonce,
205        )
206        .ok()
207        .flatten()
208    });
209
210    Ok(LastProcessedNonces {
211        outbox_nonce: last_processed_outbox_nonce,
212        inbox_response_nonce: last_processed_inbox_response_nonce,
213    })
214}
215
216fn is_last_processed_nonce_valid<Client, Block>(
217    client: &Arc<Client>,
218    latest_hash: Block::Hash,
219    processed_block_number: NumberFor<Block>,
220    processed_block_hash: Block::Hash,
221    last_process_nonce: Option<Nonce>,
222) -> ClientResult<Option<Nonce>>
223where
224    Block: BlockT,
225    Client: HeaderBackend<Block>,
226{
227    // short circuit if there is no last processed nonce
228    if last_process_nonce.is_none() {
229        return Ok(None);
230    }
231
232    // there is no block at this number, could be due to re-org
233    let Some(block_hash) = client.hash(processed_block_number)? else {
234        return Ok(None);
235    };
236
237    // hash mismatch could be due to re-org
238    if block_hash != processed_block_hash {
239        return Ok(None);
240    }
241
242    let Some(latest_number) = client.number(latest_hash)? else {
243        return Ok(None);
244    };
245
246    // if the cache limit is reached, return none
247    if latest_number.saturating_sub(processed_block_number)
248        > CHANNEL_PROCESSED_STATE_CACHE_LIMIT.saturated_into()
249    {
250        Ok(None)
251    } else {
252        Ok(last_process_nonce)
253    }
254}