1use crate::RELAYER_PREFIX;
4use parity_scale_codec::{Decode, Encode};
5use sc_client_api::backend::AuxStore;
6use sp_blockchain::{Error as ClientError, Info, Result as ClientResult};
7use sp_core::bytes::to_hex;
8use sp_core::H256;
9use sp_messenger::messages::{ChainId, ChannelId, ChannelState, Nonce};
10use sp_messenger::{ChannelNonce, XdmId};
11use sp_runtime::traits::{Block as BlockT, NumberFor};
12use subspace_runtime_primitives::BlockNumber;
13
14const CHANNEL_DETAIL: &[u8] = b"channel_detail";
15const LOG_TARGET: &str = "gossip_aux_schema";
16
17fn channel_detail_key(
18 src_chain_id: ChainId,
19 self_chain_id: ChainId,
20 channel_id: ChannelId,
21) -> Vec<u8> {
22 (CHANNEL_DETAIL, src_chain_id, self_chain_id, channel_id).encode()
23}
24
25fn load_decode<Backend: AuxStore, T: Decode>(
26 backend: &Backend,
27 key: &[u8],
28) -> ClientResult<Option<T>> {
29 match backend.get_aux(key)? {
30 None => Ok(None),
31 Some(t) => T::decode(&mut &t[..])
32 .map_err(|e| {
33 ClientError::Backend(format!("Relayer DB is corrupted. Decode error: {e}"))
34 })
35 .map(Some),
36 }
37}
38
39#[derive(Debug, Encode, Decode, Clone)]
41pub struct ChannelDetail {
42 pub block_number: BlockNumber,
44 pub block_hash: H256,
46 pub state_root: H256,
48 pub channel_id: ChannelId,
50 pub state: ChannelState,
52 pub next_inbox_nonce: Nonce,
54 pub next_outbox_nonce: Nonce,
56 pub latest_response_received_message_nonce: Option<Nonce>,
58}
59
60pub fn get_channel_state<Backend>(
62 backend: &Backend,
63 src_chain_id: ChainId,
64 self_chain_id: ChainId,
65 channel_id: ChannelId,
66) -> ClientResult<Option<ChannelDetail>>
67where
68 Backend: AuxStore,
69{
70 load_decode(
71 backend,
72 channel_detail_key(src_chain_id, self_chain_id, channel_id).as_slice(),
73 )
74}
75
76pub fn set_channel_state<Backend>(
78 backend: &Backend,
79 src_chain_id: ChainId,
80 self_chain_id: ChainId,
81 channel_detail: ChannelDetail,
82) -> ClientResult<()>
83where
84 Backend: AuxStore,
85{
86 backend.insert_aux(
87 &[(
88 channel_detail_key(src_chain_id, self_chain_id, channel_detail.channel_id).as_slice(),
89 channel_detail.encode().as_slice(),
90 )],
91 vec![],
92 )?;
93
94 let channel_nonce = ChannelNonce {
95 relay_msg_nonce: Some(channel_detail.next_inbox_nonce),
96 relay_response_msg_nonce: channel_detail.latest_response_received_message_nonce,
97 };
98 let prefix = (RELAYER_PREFIX, src_chain_id, self_chain_id).encode();
99 cleanup_chain_channel_storages(
100 backend,
101 &prefix,
102 src_chain_id,
103 channel_detail.channel_id,
104 channel_nonce,
105 )
106}
107
108mod xdm_keys {
109 use parity_scale_codec::Encode;
110 use sp_domains::{ChainId, ChannelId};
111 use sp_messenger::messages::MessageKey;
112 use sp_messenger::XdmId;
113
114 const XDM: &[u8] = b"xdm";
115 const XDM_RELAY: &[u8] = b"relay_msg";
116 const XDM_RELAY_RESPONSE: &[u8] = b"relay_msg_response";
117 const XDM_LAST_CLEANUP_NONCE: &[u8] = b"xdm_last_cleanup_nonce";
118
119 pub(super) fn get_key_for_xdm_id(prefix: &[u8], xdm_id: XdmId) -> Vec<u8> {
120 match xdm_id {
121 XdmId::RelayMessage(id) => get_key_for_xdm_relay(prefix, id),
122 XdmId::RelayResponseMessage(id) => get_key_for_xdm_relay_response(prefix, id),
123 }
124 }
125
126 pub(super) fn get_key_for_last_cleanup_relay_nonce(
127 prefix: &[u8],
128 chain_id: ChainId,
129 channel_id: ChannelId,
130 ) -> Vec<u8> {
131 (
132 prefix,
133 XDM,
134 XDM_RELAY,
135 XDM_LAST_CLEANUP_NONCE,
136 chain_id,
137 channel_id,
138 )
139 .encode()
140 }
141
142 pub(super) fn get_key_for_last_cleanup_relay_response_nonce(
143 prefix: &[u8],
144 chain_id: ChainId,
145 channel_id: ChannelId,
146 ) -> Vec<u8> {
147 (
148 prefix,
149 XDM,
150 XDM_RELAY_RESPONSE,
151 XDM_LAST_CLEANUP_NONCE,
152 chain_id,
153 channel_id,
154 )
155 .encode()
156 }
157
158 pub(super) fn get_key_for_xdm_relay(prefix: &[u8], id: MessageKey) -> Vec<u8> {
159 (prefix, XDM, XDM_RELAY, id).encode()
160 }
161
162 pub(super) fn get_key_for_xdm_relay_response(prefix: &[u8], id: MessageKey) -> Vec<u8> {
163 (prefix, XDM, XDM_RELAY_RESPONSE, id).encode()
164 }
165}
166
167#[derive(Debug, Encode, Decode, Clone)]
168pub struct BlockId<Block: BlockT> {
169 pub number: NumberFor<Block>,
170 pub hash: Block::Hash,
171}
172
173impl<Block: BlockT> From<Info<Block>> for BlockId<Block> {
174 fn from(value: Info<Block>) -> Self {
175 BlockId {
176 number: value.best_number,
177 hash: value.best_hash,
178 }
179 }
180}
181
182pub fn set_xdm_message_processed_at<Backend, Block>(
184 backend: &Backend,
185 prefix: &[u8],
186 xdm_id: XdmId,
187 block_id: BlockId<Block>,
188) -> ClientResult<()>
189where
190 Backend: AuxStore,
191 Block: BlockT,
192{
193 let key = xdm_keys::get_key_for_xdm_id(prefix, xdm_id);
194 backend.insert_aux(&[(key.as_slice(), block_id.encode().as_slice())], vec![])
195}
196
197pub fn get_xdm_processed_block_number<Backend, Block>(
199 backend: &Backend,
200 prefix: &[u8],
201 xdm_id: XdmId,
202) -> ClientResult<Option<BlockId<Block>>>
203where
204 Backend: AuxStore,
205 Block: BlockT,
206{
207 load_decode(
208 backend,
209 xdm_keys::get_key_for_xdm_id(prefix, xdm_id).as_slice(),
210 )
211}
212
213pub fn cleanup_chain_channel_storages<Backend>(
215 backend: &Backend,
216 prefix: &[u8],
217 chain_id: ChainId,
218 channel_id: ChannelId,
219 channel_nonce: ChannelNonce,
220) -> ClientResult<()>
221where
222 Backend: AuxStore,
223{
224 let mut to_insert = vec![];
225 let mut to_delete = vec![];
226 if let Some(latest_relay_nonce) = channel_nonce.relay_msg_nonce {
227 let last_cleanup_relay_nonce_key =
228 xdm_keys::get_key_for_last_cleanup_relay_nonce(prefix, chain_id, channel_id);
229 let last_cleaned_up_nonce =
230 load_decode::<_, Nonce>(backend, last_cleanup_relay_nonce_key.as_slice())?;
231
232 let mut from_nonce = match last_cleaned_up_nonce {
233 None => Nonce::zero(),
234 Some(last_nonce) => last_nonce.saturating_add(Nonce::one()),
235 };
236
237 tracing::debug!(
238 target: LOG_TARGET,
239 "[{:?}]Cleaning Relay xdm keys for {:?} channel: {:?} from: {:?} to: {:?}",
240 to_hex(prefix, false),
241 chain_id,
242 channel_id,
243 from_nonce,
244 latest_relay_nonce
245 );
246
247 while from_nonce <= latest_relay_nonce {
248 to_delete.push(xdm_keys::get_key_for_xdm_relay(
249 prefix,
250 (chain_id, channel_id, from_nonce),
251 ));
252 from_nonce = from_nonce.saturating_add(Nonce::one());
253 }
254
255 to_insert.push((last_cleanup_relay_nonce_key, latest_relay_nonce.encode()));
256 }
257
258 if let Some(latest_relay_response_nonce) = channel_nonce.relay_response_msg_nonce {
259 let last_cleanup_relay_response_nonce_key =
260 xdm_keys::get_key_for_last_cleanup_relay_response_nonce(prefix, chain_id, channel_id);
261 let last_cleaned_up_nonce =
262 load_decode::<_, Nonce>(backend, last_cleanup_relay_response_nonce_key.as_slice())?;
263
264 let mut from_nonce = match last_cleaned_up_nonce {
265 None => Nonce::zero(),
266 Some(last_nonce) => last_nonce.saturating_add(Nonce::one()),
267 };
268
269 tracing::debug!(
270 target: LOG_TARGET,
271 "[{:?}]Cleaning Relay response xdm keys for {:?} channel: {:?} from: {:?} to: {:?}",
272 to_hex(prefix, false),
273 chain_id,
274 channel_id,
275 from_nonce,
276 latest_relay_response_nonce
277 );
278
279 while from_nonce <= latest_relay_response_nonce {
280 to_delete.push(xdm_keys::get_key_for_xdm_relay_response(
281 prefix,
282 (chain_id, channel_id, from_nonce),
283 ));
284 from_nonce = from_nonce.saturating_add(Nonce::one());
285 }
286
287 to_insert.push((
288 last_cleanup_relay_response_nonce_key,
289 latest_relay_response_nonce.encode(),
290 ));
291 }
292
293 backend.insert_aux(
294 &to_insert
295 .iter()
296 .map(|(k, v)| (k.as_slice(), v.as_slice()))
297 .collect::<Vec<_>>(),
298 &to_delete.iter().map(|k| k.as_slice()).collect::<Vec<_>>(),
299 )
300}