Parcel #yt0sha96zxqcse7

Created by Anonymous
Public

Created May 8, 2025 Expires in 1 day

Loading editor...

pub async fn mempool_subscriber(event_sender: Sender<Event>, mut bundle_rx: Receiver<Vec<BasePacket>>) {
    // For deduping transactions
    let cache = TimedMap::new();

    // Track individual restart events within a 30-minute window
    let mut restart_events = Vec::new();
    let error_threshold = 3; // Panic after 3 errors within the timeframe
    let error_window_secs = 30 * 60; // 30 minutes in seconds

    let track_restart = |events: &mut Vec<u64>, error_msg: &str| {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        events.push(now);
        let cutoff = now.saturating_sub(error_window_secs);
        events.retain(|&timestamp| timestamp >= cutoff);

        if events.len() >= error_threshold {
            panic!("{}: {} errors within 30 minutes", error_msg, error_threshold);
        }
    };

    let tx_sub_url = get_env("TX_SUB_URL");

    loop {
        info!("Connecting to {}", tx_sub_url);

        let stream_result = TcpStream::connect(&tx_sub_url).await;

        if let Err(err) = stream_result {
            error!("Failed to connect to mempool stream: {}", err);
            track_restart(&mut restart_events, "Failed to connect to mempool stream after");
            tokio::time::sleep(Duration::from_millis(100)).await;
            continue;
        }

        let stream = stream_result.unwrap();
        let (mempool_stream, bundle_stream) = stream.into_split();
        let mut mempool_stream = FramedRead::new(mempool_stream, MempoolCodec::default());
        let mut bundle_stream = FramedWrite::new(bundle_stream, BundleStreamCodec::default());

        let (write_tx, mut write_rx) = tokio::sync::mpsc::channel(100);

        let write_task = tokio::spawn(async move {
            loop {
                match write_rx.recv().await {
                    Some(bundle) => {
                        for bundle in std::iter::once(bundle)
                            .chain(std::iter::from_fn(|| write_rx.try_recv().ok())) {
                            if let Err(e) = bundle_stream.feed(bundle).await {
                                error!("Error on Mempool stream: {e}");
                            }
                            if let Err(e) = bundle_stream.flush().await {
                                error!("Error on Mempool stream: {e}");
                                break;
                            }
                        }
                    }
                    None => {
                        break;
                    }
                }
            }
        });

        loop {
            if write_task.is_finished() {
                error!("Write task failed.");
                track_restart(&mut restart_events, "Error on Mempool stream");
                break;
            }
            tokio::select! {
                opt = bundle_rx.recv() => match opt {
                    Some(bundle) => {
                        if let Err(e) = write_tx.try_send(bundle) {
                            panic!("unable to send bundle to write task: {e}");
                        }
                    }
                    None => {
                        break;
                    }
                },
                opt = mempool_stream.next() => match opt {
                    Some(Ok((source, msg))) => {
                        let event = match msg {
                            MempoolMsg::Packet(tx) => {
                                let Ok(tx) = bincode::deserialize::<VersionedTransaction>(&tx) else {
                                    increment_counter!("mempool.err_deserialize_packet");
                                    continue;
                                };
            
                                // Skip if this is a duplicate transaction.
                                let sig = tx.get_signature();
                                if cache.get(sig).is_some() {
                                    increment_counter!("mempool.duplicate_txs");
                                    continue;
                                };
            
                                // Insert the new signature with a 30s expiry.
                                increment_counter!("mempool.unique_txs");
                                cache.insert(*sig, true, Duration::from_secs(30));
            
                                Event::VersionedTransaction { source, tx }
                            }
                            MempoolMsg::Bundle(bundle) => {
                                let Ok(bundle) =
                                    bundle
                                        .into_iter()
                                        .try_fold(Vec::default(), |mut bundle, tx| {
                                            let tx = bincode::deserialize::<VersionedTransaction>(&tx)?;
                                            bundle.push(tx);
            
                                            Ok::<_, Box<bincode::ErrorKind>>(bundle)
                                        })
                                else {
                                    increment_counter!("mempool.err_deserialize_bundle");
                                    continue;
                                };
            
                                Event::Bundle { source, bundle }
                            }
                        };
            
                        if let Err(e) = event_sender.try_send(event) {
                            error!("failed to send event: {e}");
                        }
                        
                        
                    }
                    Some(Err(err)) => {
                        error!("Error on Mempool stream: {err}");
                        track_restart(&mut restart_events, "Error on Mempool stream");
                        break;
                    }
                    None => {
                       break;
                    }
                }
            }

                        
        }

        write_task.abort();

        tokio::time::sleep(Duration::from_millis(100)).await;
        increment_counter!("mempool.reconnection_attempts");
    }
}