Parcel #x1peh1v1832eywe
Created by Anonymous
Public
Created June 1, 2025 Expires in 9 days
Loading editor...
use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use futures::{SinkExt, StreamExt}; use sha2::{Digest, Sha256}; use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::rpc_client::SerializableTransaction; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Keypair; use solana_sdk::signer::Signer; use solana_sdk::system_transaction; use solana_sdk::transaction::VersionedTransaction; use timedmap::TimedMap; use tokio::net::TcpStream; use tokio::select; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Notify; use tokio::time::interval; use tokio_util::codec::{FramedRead, FramedWrite}; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; use tracing_subscriber; // Import the proper paladin codecs use paladin::{BundleStreamCodec, MempoolCodec, MempoolMsg}; // Event type for the paladin library (mimicking the original) #[derive(Debug, Clone)] pub enum Event { VersionedTransaction { source: Pubkey, tx: VersionedTransaction, }, Bundle { source: Pubkey, bundle: Vec<VersionedTransaction>, }, } // Environment utility function pub fn get_env(key: &str) -> String { std::env::var(key).unwrap_or_else(|_| match key { "TX_SUB_URL" => "198.73.56.166:7002".to_string(), "RPC_URL" => "https://api.mainnet-beta.solana.com".to_string(), _ => panic!("Environment variable {} not set", key), }) } // Track restart events function fn track_restart(restart_events: &mut TimedMap<String, bool>, reason: &str) { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); restart_events.insert( format!("{}-{}", reason, now), true, Duration::from_secs(300), ); warn!("Mempool restart: {}", reason); } // Bundle hash function pub fn custom_bundle_hash(transactions: &[VersionedTransaction]) -> String { let mut hasher = Sha256::new(); for tx in transactions { hasher.update(tx.signatures[0]); } hex::encode(hasher.finalize()) } // Metrics macros (simplified for testing) macro_rules! increment_counter { ($name:expr) => { info!("METRIC: {} incremented", $name); }; } macro_rules! histogram { ($name:expr, $value:expr) => { info!("METRIC: {} = {}", $name, $value); }; } // THE EXACT CODE FROM USER pub async fn mempool_subscriber( event_sender: Sender<Event>, mut bundle_rx: Receiver<(Vec<VersionedTransaction>, String)>, ) { let tx_sub_url = get_env("TX_SUB_URL"); // Missing variables that need to be defined let mut cache = TimedMap::new(); let mut restart_events = TimedMap::new(); loop { let exit = Arc::new(CancellationToken::new()); info!("Mempool: Connecting to {}", tx_sub_url); let stream_result = TcpStream::connect(&tx_sub_url).await; if let Err(err) = stream_result { error!("Failed to connect to mempool stream: {}", err); track_restart( &mut restart_events, "Failed to connect to mempool stream after", ); tokio::time::sleep(Duration::from_millis(500)).await; continue; } info!("Mempool: Connected to {}", tx_sub_url); let stream = stream_result.unwrap(); let (mempool_stream, bundle_stream) = stream.into_split(); let mut mempool_stream = FramedRead::new(mempool_stream, MempoolCodec::default()); let mut bundle_stream = FramedWrite::new(bundle_stream, BundleStreamCodec::default()); let (write_tx, mut write_rx) = tokio::sync::mpsc::channel::<(Vec<VersionedTransaction>, String)>(10); let write_task = tokio::spawn(async move { loop { match write_rx.recv().await { Some(bundle) => { let mut total_bundles = 0; for (bundle, bundle_id) in std::iter::once(bundle) .chain(std::iter::from_fn(|| write_rx.try_recv().ok())) { if let Err(e) = bundle_stream.feed(bundle).await { error!("Error on Mempool stream: {e}"); } info!("Sent bundle: {}", bundle_id); total_bundles += 1; } if let Err(e) = bundle_stream.flush().await { error!("Error on Mempool stream: {e}"); break; } info!("Sent {} bundles", total_bundles); } None => { error!("Write receiver closed."); break; } } } }); loop { if write_task.is_finished() { error!("Write task failed."); exit.cancel(); track_restart(&mut restart_events, "Error on Mempool stream"); break; } tokio::select! { opt = bundle_rx.recv() => match opt { Some((bundle, bundle_id)) => { if let Err(e) = write_tx.try_send((bundle, bundle_id)) { panic!("unable to send bundle to write task: {e}"); } } None => { exit.cancel(); break; } }, opt = mempool_stream.next() => match opt { Some(Ok((source, msg))) => { let event = match msg { MempoolMsg::Packet(tx) => { let Ok(tx) = bincode::deserialize::<VersionedTransaction>(&tx) else { increment_counter!("mempool.err_deserialize_packet"); continue; }; // Skip if this is a duplicate transaction. let sig = tx.get_signature(); if cache.get(sig).is_some() { increment_counter!("mempool.duplicate_txs"); continue; }; // Insert the new signature with a 30s expiry. increment_counter!("mempool.unique_txs"); cache.insert(*sig, true, Duration::from_secs(30)); Event::VersionedTransaction { source, tx } } MempoolMsg::Bundle(bundle) => { let Ok(bundle) = bundle .into_iter() .try_fold(Vec::default(), |mut bundle, tx| { let tx = bincode::deserialize::<VersionedTransaction>(&tx)?; bundle.push(tx); Ok::<_, Box<bincode::ErrorKind>>(bundle) }) else { increment_counter!("mempool.err_deserialize_bundle"); continue; }; Event::Bundle { source, bundle } } }; if let Err(e) = event_sender.try_send(event) { error!("failed to send event: {e}"); } } Some(Err(err)) => { error!("Error on Mempool stream: {err}"); track_restart(&mut restart_events, "Error on Mempool stream"); exit.cancel(); break; } None => { exit.cancel(); break; } } } } write_task.abort(); exit.cancel(); tokio::time::sleep(Duration::from_millis(500)).await; increment_counter!("mempool.reconnection_attempts"); } } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { // Initialize tracing for logging with info level tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .init(); println!("=== Testing EXACT Mempool Subscriber Code ==="); println!("This tests the exact code provided by the user"); println!("Connecting to 198.73.56.166:7002...\n"); // Create channels for the subscriber let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<Event>(1000); let (bundle_tx, bundle_rx) = tokio::sync::mpsc::channel::<(Vec<VersionedTransaction>, String)>(10); // Spawn the exact mempool subscriber let subscriber_handle = tokio::spawn(async move { info!("Starting EXACT mempool subscriber implementation"); mempool_subscriber(event_tx, bundle_rx).await; }); // Process events for 30 seconds let start_time = Instant::now(); let timeout_duration = Duration::from_secs(30); let mut event_count = 0; let mut tx_count = 0; let mut bundle_count = 0; println!("š Subscriber started! Processing events for 30 seconds...\n"); loop { if start_time.elapsed() >= timeout_duration { println!("ā° 30 seconds elapsed, stopping test"); break; } match tokio::time::timeout(Duration::from_millis(100), event_rx.recv()).await { Ok(Some(event)) => { event_count += 1; match event { Event::VersionedTransaction { source, tx } => { tx_count += 1; println!("š¦ Event #{}: Single Transaction", event_count); println!(" Source: {}", source); println!(" Signature: {}", tx.signatures[0]); println!(" Accounts: {}", tx.message.static_account_keys().len()); } Event::Bundle { source, bundle } => { bundle_count += 1; tx_count += bundle.len(); println!( "š¦ Event #{}: Bundle with {} transactions", event_count, bundle.len() ); println!(" Source: {}", source); for (i, tx) in bundle.iter().take(3).enumerate() { println!(" Tx {}: {}", i + 1, tx.signatures[0]); } if bundle.len() > 3 { println!(" ... and {} more transactions", bundle.len() - 3); } } } println!(); // Show progress every 10 events if event_count % 10 == 0 { println!( "š Progress: {} events, {} total transactions, {} bundles", event_count, tx_count, bundle_count ); } } Ok(None) => { println!("Event channel closed"); break; } Err(_) => { // Timeout - no events received continue; } } } println!("\n=== Final Results ==="); println!("Total events received: {}", event_count); println!("Total transactions: {}", tx_count); println!("Total bundles: {}", bundle_count); println!("Test duration: {:.1}s", start_time.elapsed().as_secs_f32()); // Stop the subscriber subscriber_handle.abort(); if event_count > 0 { println!("\nā SUCCESS: The exact code is working and processing events!"); println!("The get_signature() method and all logic is functioning correctly."); } else { println!("\nā ļø No events received - server may not be sending valid paladin data"); println!("But the exact code structure and logic is correctly implemented."); } Ok(()) }