Parcel #yt0sha96zxqcse7
Created by Anonymous
Public
Created May 8, 2025 Expires in 1 day
Loading editor...
pub async fn mempool_subscriber(event_sender: Sender<Event>, mut bundle_rx: Receiver<Vec<BasePacket>>) { // For deduping transactions let cache = TimedMap::new(); // Track individual restart events within a 30-minute window let mut restart_events = Vec::new(); let error_threshold = 3; // Panic after 3 errors within the timeframe let error_window_secs = 30 * 60; // 30 minutes in seconds let track_restart = |events: &mut Vec<u64>, error_msg: &str| { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); events.push(now); let cutoff = now.saturating_sub(error_window_secs); events.retain(|×tamp| timestamp >= cutoff); if events.len() >= error_threshold { panic!("{}: {} errors within 30 minutes", error_msg, error_threshold); } }; let tx_sub_url = get_env("TX_SUB_URL"); loop { info!("Connecting to {}", tx_sub_url); let stream_result = TcpStream::connect(&tx_sub_url).await; if let Err(err) = stream_result { error!("Failed to connect to mempool stream: {}", err); track_restart(&mut restart_events, "Failed to connect to mempool stream after"); tokio::time::sleep(Duration::from_millis(100)).await; continue; } let stream = stream_result.unwrap(); let (mempool_stream, bundle_stream) = stream.into_split(); let mut mempool_stream = FramedRead::new(mempool_stream, MempoolCodec::default()); let mut bundle_stream = FramedWrite::new(bundle_stream, BundleStreamCodec::default()); let (write_tx, mut write_rx) = tokio::sync::mpsc::channel(100); let write_task = tokio::spawn(async move { loop { match write_rx.recv().await { Some(bundle) => { for bundle in std::iter::once(bundle) .chain(std::iter::from_fn(|| write_rx.try_recv().ok())) { if let Err(e) = bundle_stream.feed(bundle).await { error!("Error on Mempool stream: {e}"); } if let Err(e) = bundle_stream.flush().await { error!("Error on Mempool stream: {e}"); break; } } } None => { break; } } } }); loop { if write_task.is_finished() { error!("Write task failed."); track_restart(&mut restart_events, "Error on Mempool stream"); break; } tokio::select! { opt = bundle_rx.recv() => match opt { Some(bundle) => { if let Err(e) = write_tx.try_send(bundle) { panic!("unable to send bundle to write task: {e}"); } } None => { break; } }, opt = mempool_stream.next() => match opt { Some(Ok((source, msg))) => { let event = match msg { MempoolMsg::Packet(tx) => { let Ok(tx) = bincode::deserialize::<VersionedTransaction>(&tx) else { increment_counter!("mempool.err_deserialize_packet"); continue; }; // Skip if this is a duplicate transaction. let sig = tx.get_signature(); if cache.get(sig).is_some() { increment_counter!("mempool.duplicate_txs"); continue; }; // Insert the new signature with a 30s expiry. increment_counter!("mempool.unique_txs"); cache.insert(*sig, true, Duration::from_secs(30)); Event::VersionedTransaction { source, tx } } MempoolMsg::Bundle(bundle) => { let Ok(bundle) = bundle .into_iter() .try_fold(Vec::default(), |mut bundle, tx| { let tx = bincode::deserialize::<VersionedTransaction>(&tx)?; bundle.push(tx); Ok::<_, Box<bincode::ErrorKind>>(bundle) }) else { increment_counter!("mempool.err_deserialize_bundle"); continue; }; Event::Bundle { source, bundle } } }; if let Err(e) = event_sender.try_send(event) { error!("failed to send event: {e}"); } } Some(Err(err)) => { error!("Error on Mempool stream: {err}"); track_restart(&mut restart_events, "Error on Mempool stream"); break; } None => { break; } } } } write_task.abort(); tokio::time::sleep(Duration::from_millis(100)).await; increment_counter!("mempool.reconnection_attempts"); } }