Skip to content

Commit 6d1e368

Browse files
committed
[chronik] Add a subscription to txid
Summary: So we can monitor the status change for a single txid. Typically: subscribe, broadcast, check it enters the mempool, check it finalizes. Test Plan: ./test/functional/test_runner.py chronik_ws Reviewers: #bitcoin_abc, PiRK Reviewed By: #bitcoin_abc, PiRK Differential Revision: https://reviews.bitcoinabc.org/D18230
1 parent 6264d6e commit 6d1e368

File tree

8 files changed

+303
-5
lines changed

8 files changed

+303
-5
lines changed

chronik/chronik-db/src/groups/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
mod lokad_id;
88
mod script;
99
mod token_id;
10+
mod txid;
1011

1112
pub use self::lokad_id::*;
1213
pub use self::script::*;
1314
pub use self::token_id::*;
15+
pub use self::txid::*;

chronik/chronik-db/src/groups/txid.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) 2025 The Bitcoin developers
2+
// Distributed under the MIT software license, see the accompanying
3+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4+
5+
use bitcoinsuite_core::{hash::Hashed, tx::TxId};
6+
7+
use crate::{
8+
group::{Group, GroupQuery, MemberItem, UtxoDataOutput},
9+
io::{GroupHistoryConf, GroupUtxoConf},
10+
};
11+
12+
/// Group txs by txid.
13+
#[derive(Clone, Debug)]
14+
pub struct TxIdGroup;
15+
16+
impl Group for TxIdGroup {
17+
type Aux = ();
18+
type Iter<'a> = Vec<MemberItem<TxId>>;
19+
type Member<'a> = TxId;
20+
type MemberSer = [u8; 32];
21+
type UtxoData = UtxoDataOutput;
22+
23+
fn input_members<'a>(
24+
&self,
25+
_query: GroupQuery<'a>,
26+
_aux: &(),
27+
) -> Self::Iter<'a> {
28+
// Don't use; actual parsing happens in output_members
29+
vec![]
30+
}
31+
32+
fn output_members<'a>(
33+
&self,
34+
query: GroupQuery<'a>,
35+
_aux: &(),
36+
) -> Self::Iter<'a> {
37+
vec![MemberItem {
38+
idx: 0,
39+
member: query.tx.txid(),
40+
}]
41+
}
42+
43+
fn ser_member(&self, member: &Self::Member<'_>) -> Self::MemberSer {
44+
member.hash().to_be_bytes()
45+
}
46+
47+
fn is_hash_member_supported(&self) -> bool {
48+
false
49+
}
50+
51+
fn ser_hash_member(&self, _member: &Self::Member<'_>) -> [u8; 32] {
52+
unimplemented!("There is no use case for hashing TxIdGroup")
53+
}
54+
55+
fn tx_history_conf() -> GroupHistoryConf {
56+
panic!("TxIdGroup should not be used to group history")
57+
}
58+
59+
fn utxo_conf() -> GroupUtxoConf {
60+
panic!("TxIdGroup should not be used to group UTXOs")
61+
}
62+
}

chronik/chronik-http/src/ws.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{collections::HashMap, time::Duration};
88

99
use abc_rust_error::Result;
1010
use axum::extract::ws::{self, WebSocket};
11-
use bitcoinsuite_core::script::ScriptVariant;
11+
use bitcoinsuite_core::{script::ScriptVariant, tx::TxId};
1212
use bitcoinsuite_slp::{lokad_id::LokadId, token_id::TokenId};
1313
use chronik_db::plugins::PluginMember;
1414
use chronik_indexer::{
@@ -61,20 +61,23 @@ struct WsSub {
6161

6262
enum WsSubType {
6363
Blocks,
64+
TxId(TxId),
6465
Script(ScriptVariant),
6566
TokenId(TokenId),
6667
LokadId(LokadId),
6768
PluginGroup(String, Vec<u8>),
6869
}
6970

7071
type SubRecvBlocks = Option<broadcast::Receiver<BlockMsg>>;
72+
type SubRecvTxIds = HashMap<TxId, broadcast::Receiver<TxMsg>>;
7173
type SubRecvScripts = HashMap<ScriptVariant, broadcast::Receiver<TxMsg>>;
7274
type SubRecvTokenId = HashMap<TokenId, broadcast::Receiver<TxMsg>>;
7375
type SubRecvLokadId = HashMap<LokadId, broadcast::Receiver<TxMsg>>;
7476
type SubRecvPluginGroups = HashMap<PluginGroup, broadcast::Receiver<TxMsg>>;
7577

7678
struct SubRecv {
7779
blocks: SubRecvBlocks,
80+
txids: SubRecvTxIds,
7881
scripts: SubRecvScripts,
7982
token_ids: SubRecvTokenId,
8083
lokad_ids: SubRecvLokadId,
@@ -87,6 +90,7 @@ impl SubRecv {
8790
tokio::select! {
8891
biased;
8992
action = Self::recv_blocks(&mut self.blocks) => action,
93+
action = Self::recv_txids(&mut self.txids) => action,
9094
action = Self::recv_scripts(&mut self.scripts) => action,
9195
action = Self::recv_token_ids(&mut self.token_ids) => action,
9296
action = Self::recv_lokad_ids(&mut self.lokad_ids) => action,
@@ -102,6 +106,18 @@ impl SubRecv {
102106
}
103107
}
104108

109+
async fn recv_txids(txids: &mut SubRecvTxIds) -> Result<WsAction> {
110+
if txids.is_empty() {
111+
futures::future::pending().await
112+
} else {
113+
let txids_receivers = select_all(
114+
txids.values_mut().map(|receiver| Box::pin(receiver.recv())),
115+
);
116+
let (tx_msg, _, _) = txids_receivers.await;
117+
sub_tx_msg_action(tx_msg)
118+
}
119+
}
120+
105121
#[allow(clippy::mutable_key_type)]
106122
async fn recv_scripts(scripts: &mut SubRecvScripts) -> Result<WsAction> {
107123
if scripts.is_empty() {
@@ -191,6 +207,17 @@ impl SubRecv {
191207
}
192208
}
193209
}
210+
WsSubType::TxId(txid) => {
211+
if sub.is_unsub {
212+
log_chronik!("WS unsubscribe from txid {txid}\n");
213+
std::mem::drop(self.txids.remove(&txid));
214+
subs.subs_txid_mut().unsubscribe_from_member(&txid)
215+
} else {
216+
log_chronik!("WS subscribe to txid {txid}\n");
217+
let recv = subs.subs_txid_mut().subscribe_to_member(&txid);
218+
self.txids.insert(txid, recv);
219+
}
220+
}
194221
WsSubType::Script(script_variant) => {
195222
let script = script_variant.to_script();
196223
if sub.is_unsub {
@@ -298,6 +325,9 @@ fn sub_client_msg_action(
298325
sub_type: match sub.sub_type {
299326
None => return Err(MissingSubType.into()),
300327
Some(SubType::Blocks(_)) => WsSubType::Blocks,
328+
Some(SubType::Txid(txid)) => {
329+
WsSubType::TxId(txid.txid.parse::<TxId>()?)
330+
}
301331
Some(SubType::Script(script)) => {
302332
WsSubType::Script(parse_script_variant(
303333
&script.script_type,
@@ -397,6 +427,7 @@ pub async fn handle_subscribe_socket(
397427
) {
398428
let mut recv = SubRecv {
399429
blocks: Default::default(),
430+
txids: Default::default(),
400431
scripts: Default::default(),
401432
token_ids: Default::default(),
402433
lokad_ids: Default::default(),

chronik/chronik-indexer/src/subs.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use bitcoinsuite_core::{
1111
tx::{OutPoint, Tx},
1212
};
1313
use chronik_db::{
14-
groups::{LokadIdGroup, ScriptGroup, TokenIdGroup, TokenIdGroupAux},
14+
groups::{
15+
LokadIdGroup, ScriptGroup, TokenIdGroup, TokenIdGroupAux, TxIdGroup,
16+
},
1517
io::BlockHeight,
1618
plugins::PluginsGroup,
1719
};
@@ -56,6 +58,7 @@ const BLOCK_CHANNEL_CAPACITY: usize = 16;
5658
#[derive(Debug)]
5759
pub struct Subs {
5860
subs_block: broadcast::Sender<BlockMsg>,
61+
subs_txid: SubsGroup<TxIdGroup>,
5962
subs_script: SubsGroup<ScriptGroup>,
6063
subs_token_id: SubsGroup<TokenIdGroup>,
6164
subs_lokad_id: SubsGroup<LokadIdGroup>,
@@ -67,6 +70,7 @@ impl Subs {
6770
pub fn new(script_group: ScriptGroup) -> Self {
6871
Subs {
6972
subs_block: broadcast::channel(BLOCK_CHANNEL_CAPACITY).0,
73+
subs_txid: SubsGroup::new(TxIdGroup),
7074
subs_script: SubsGroup::new(script_group),
7175
subs_token_id: SubsGroup::new(TokenIdGroup),
7276
subs_lokad_id: SubsGroup::new(LokadIdGroup),
@@ -79,6 +83,11 @@ impl Subs {
7983
self.subs_block.subscribe()
8084
}
8185

86+
/// Mutable reference to the txid subscribers.
87+
pub fn subs_txid_mut(&mut self) -> &mut SubsGroup<TxIdGroup> {
88+
&mut self.subs_txid
89+
}
90+
8291
/// Mutable reference to the script subscribers.
8392
pub fn subs_script_mut(&mut self) -> &mut SubsGroup<ScriptGroup> {
8493
&mut self.subs_script
@@ -107,6 +116,7 @@ impl Subs {
107116
token_id_aux: &TokenIdGroupAux,
108117
plugin_outputs: &BTreeMap<OutPoint, PluginOutput>,
109118
) {
119+
self.subs_txid.handle_tx_event(tx, &(), msg_type);
110120
self.subs_script.handle_tx_event(tx, &(), msg_type);
111121
self.subs_token_id
112122
.handle_tx_event(tx, token_id_aux, msg_type);
@@ -123,7 +133,8 @@ impl Subs {
123133
token_id_aux: &TokenIdGroupAux,
124134
plugin_outputs: &BTreeMap<OutPoint, PluginOutput>,
125135
) {
126-
if self.subs_script.is_empty()
136+
if self.subs_txid.is_empty()
137+
&& self.subs_script.is_empty()
127138
&& self.subs_token_id.is_empty()
128139
&& self.subs_lokad_id.is_empty()
129140
&& self.subs_plugins.is_empty()
@@ -132,6 +143,7 @@ impl Subs {
132143
return;
133144
}
134145
for tx in txs {
146+
self.subs_txid.handle_tx_event(tx, &(), msg_type);
135147
self.subs_script.handle_tx_event(tx, &(), msg_type);
136148
self.subs_token_id
137149
.handle_tx_event(tx, token_id_aux, msg_type);

chronik/chronik-proto/proto/chronik.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,8 @@ message WsSub {
463463
WsSubLokadId lokad_id = 5;
464464
// Subscription to a plugin group
465465
WsPlugin plugin = 6;
466+
// Subscription to a txid
467+
WsSubTxId txid = 7;
466468
}
467469
}
468470

@@ -507,6 +509,12 @@ message WsPlugin {
507509
bytes group = 2;
508510
}
509511

512+
// Subscription to a txid. They will be sent every time a tx status confirmation changes.
513+
message WsSubTxId {
514+
// Hex txid to subscribe to.
515+
string txid = 1;
516+
}
517+
510518
// Message coming from the WebSocket
511519
message WsMsg {
512520
// Kind of message

0 commit comments

Comments
 (0)