Improving Indexer WebSocket Connection Stability

1. Discovering the Problem

While developing a Solana indexer, I ran into frequent WebSocket disconnects on testnet.

Symptoms

The logs showed the following errors:

  • WebSocket connection failed or dropped
  • Protocol(ResetWithoutClosingHandshake)

As a result, program logs could not be captured reliably, which hurt the timeliness of data processing.

2. Root Cause Analysis

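Reviewing the code showed that the root cause was blocking inside async code: slow operations stalled the WebSocket main loop, so heartbeats could not be answered in time.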

Details

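  1. Source of the blocking:
    To handle duplicate transactions, I had put the slow logic (JSON parsing, event construction, sending to the database) directly into the WebSocket receive loop.

  2. Consequence of the blocking:
    Inside tokio::select!, the loop does not read the next frame until the current branch's handler has finished. While the handler was parsing or awaiting the downstream send, incoming frames, including Pings, sat unread, so heartbeat responses were delayed.

  3. Protocol violation:
    WebSocket expects Ping frames to be answered promptly with a Pong. The blocking delayed the Pong, so the server treated the connection as dead and forcibly dropped it.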
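The delay described above can be seen in a deliberately simplified model. The sketch below is plain std Rust rather than tokio, and `Frame` and `pong_latency_ms` are hypothetical names: one loop handles frames strictly in arrival order, so a Ping that arrives behind slow data frames is only answered after all of them have been handled.

```rust
/// A frame as seen by the receive loop in this toy model (hypothetical type).
enum Frame {
    Data,
    Ping,
}

/// Returns the time (ms since the loop started) at which the first Ping is answered,
/// assuming frames are handled one at a time in arrival order and each Data frame
/// costs `data_cost_ms` of handler time. Sending the Pong is treated as instantaneous.
/// Returns None if no Ping is present.
fn pong_latency_ms(frames: &[Frame], data_cost_ms: u64) -> Option<u64> {
    let mut elapsed: u64 = 0;
    for frame in frames {
        match frame {
            // The handler must finish before the loop reads the next frame.
            Frame::Data => elapsed += data_cost_ms,
            // The Pong only goes out once the loop finally reaches the Ping.
            Frame::Ping => return Some(elapsed),
        }
    }
    None
}
```

With three data frames ahead of the Ping and 200 ms of handling per frame (illustrative numbers, not measurements), the Pong leaves at 600 ms; if the loop only enqueues and each frame costs 1 ms in this model, the same Ping is answered at 3 ms. That gap is what moving the work off the receive loop buys.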

Architectural problem

The original design coupled I/O and business logic in the same task:

[Original architecture]
WebSocket receive loop (a single task)
├── Receive message
├── Parse JSON (slow)
├── Build event
├── Send downstream (await)
└── Respond to heartbeat (blocked)

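Conclusion: business logic blocking the loop violated the WebSocket protocol's heartbeat requirement, and that was the root cause of the unstable connection.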

3. Solution

I restructured the code around a producer-consumer model that separates I/O from business logic.

Core idea

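  • Producer (the WebSocket main loop): only receives messages and enqueues them into a channel without blocking.
  • Consumer (a background worker): performs the slow operations asynchronously, so the network layer is never blocked.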
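A minimal sketch of this split, using `std::sync::mpsc` and a thread in place of tokio's unbounded channel and `tokio::spawn` so it runs without dependencies. `run_pipeline` is a hypothetical name, and the `format!` call stands in for the real event parsing and database forwarding. It also shows the shutdown behaviour the indexer relies on: once the sender is dropped, the consumer finishes the backlog and then exits.

```rust
use std::sync::mpsc;
use std::thread;

/// Same shape as `LogTask` in the indexer: (log line, transaction signature).
type LogTask = (String, String);

/// Producer/consumer sketch: the producer only enqueues, the consumer does the slow work.
/// Returns how many tasks the consumer processed.
fn run_pipeline(logs: Vec<LogTask>) -> usize {
    let (tx, rx) = mpsc::channel::<LogTask>();

    // Consumer: iterating the receiver yields queued items until every sender is dropped.
    let worker = thread::spawn(move || {
        let mut processed: usize = 0;
        for (log, sig) in rx {
            // Placeholder for parsing the log and forwarding the event downstream.
            let _event = format!("{sig}: {log}");
            processed += 1;
        }
        processed
    });

    // Producer: `send` on an unbounded channel never waits for the consumer.
    for task in logs {
        tx.send(task).expect("worker should still be running");
    }

    // Closing the channel lets the worker drain what is left and then exit.
    drop(tx);
    worker.join().expect("worker panicked")
}
```

The tokio version in the indexer behaves the same way at shutdown: `recv()` keeps returning buffered items after the sender is dropped and only returns `None` once the queue is empty.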

Key improvements

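  1. Decouple I/O from business logic: tokio::sync::mpsc::UnboundedSender serves as the message queue.
  2. Keep heartbeats responsive: the main loop stays lightweight, so Ping/Pong frames are handled promptly.
  3. Add health monitoring: the loop sends a Ping every 30 s, treats the connection as dead after 90 s with no incoming messages, and then reconnects.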
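The stale-connection check in item 3 comes down to a timestamp comparison. This sketch uses a hypothetical `HealthMonitor` type that takes the current time as a parameter instead of calling `Instant::now()`, which keeps it testable without sleeping; the 90 s threshold mirrors `message_timeout` in `listen_for_logs`.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper mirroring the stale-connection check in `listen_for_logs`:
/// every incoming message (Text, Ping or Pong) refreshes the timestamp.
struct HealthMonitor {
    last_message: Instant,
    timeout: Duration,
}

impl HealthMonitor {
    fn new(now: Instant, timeout: Duration) -> Self {
        Self { last_message: now, timeout }
    }

    /// Call whenever a message arrives.
    fn record_message(&mut self, now: Instant) {
        self.last_message = now;
    }

    /// True once nothing has arrived for longer than `timeout`.
    fn is_stale(&self, now: Instant) -> bool {
        now.saturating_duration_since(self.last_message) > self.timeout
    }
}
```

Because the loop also sends its own Ping every 30 s, a healthy server's Pong refreshes the timestamp well inside the 90 s window; only a connection that has gone silent crosses it.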

Core implementation

Below is the core of the reworked IndexerFetcher module, which implements the full producer-consumer model:

use futures::{SinkExt, StreamExt};
use serde_json::Value;
use solana_client::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use sqlx::{MySql, Pool};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tokio::time::{interval, timeout, Instant};
use tokio_tungstenite::tungstenite::Message;

use crate::event_parser::try_parse_event;
use crate::events::ProgramEvent;

/// A log entry paired with its transaction signature, ready for processing
type LogTask = (String, String);

pub struct IndexerFetcher {
    rpc_client: RpcClient,
    program_id: Pubkey,
    db_pool: Arc<Pool<MySql>>,
    event_sender: Sender<ProgramEvent>,
}

impl Clone for IndexerFetcher {
    fn clone(&self) -> Self {
        Self {
            rpc_client: RpcClient::new(self.rpc_client.url()),
            program_id: self.program_id,
            db_pool: self.db_pool.clone(),
            event_sender: self.event_sender.clone(),
        }
    }
}

impl IndexerFetcher {
    pub fn new(
        rpc_url: &str,
        program_id: Pubkey,
        db_pool: Arc<Pool<MySql>>,
        event_sender: Sender<ProgramEvent>,
    ) -> Self {
        let rpc_client = RpcClient::new(rpc_url.to_string());
        Self {
            rpc_client,
            program_id,
            db_pool,
            event_sender,
        }
    }

    /// Starts WebSocket connection with automatic reconnection on failures.
    /// 
    /// This is the main entry point for the indexer. It handles:
    /// - Initial connection establishment
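    /// - Automatic reconnection: a fixed 2s delay for the first five consecutive
    ///   failures, then a linear backoff (+5s per failure, capped at 60s)
    /// - Connection health monitoring
    ///
    /// The reconnect loop never exits, so under normal operation this function does not return.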
    pub async fn start_listening(&self) -> Result<(), String> {
        let ws_url = self.rpc_url_to_ws_url();
        let mut reconnect_delay = Duration::from_secs(2);
        let max_reconnect_delay = Duration::from_secs(60);
        let mut consecutive_failures = 0u32;

        loop {
            println!(
                "Attempting WebSocket connection: {} (delay: {:?}, failures: {})",
                ws_url, reconnect_delay, consecutive_failures
            );
            
            match self.listen_for_logs(&ws_url).await {
                Ok(_) => {
                    // Normal disconnection (rare) - reset backoff and retry immediately
                    println!("WebSocket connection closed normally, reconnecting...");
                    consecutive_failures = 0;
                    reconnect_delay = Duration::from_secs(2);
                }
                Err(e) => {
                    consecutive_failures += 1;
                    eprintln!(
                        "WebSocket error: {}. Failures: {}. Reconnecting in {:?}",
                        e, consecutive_failures, reconnect_delay
                    );

                    // Wait before reconnecting
                    tokio::time::sleep(reconnect_delay).await;

                    // Implement gradual backoff strategy:
                    // - First 5 failures: retry quickly (2s)
                    // - After 5 failures: gradually increase delay
                    if consecutive_failures < 5 {
                        reconnect_delay = Duration::from_secs(2);
                    } else {
                        reconnect_delay = std::cmp::min(
                            reconnect_delay + Duration::from_secs(5),
                            max_reconnect_delay
                        );
                    }
                }
            }
        }
    }

    /// Processes incoming WebSocket messages and extracts log data.
    /// 
    /// This function never awaits inside the receive loop. It parses the notification
    /// envelope and hands each log line to the background worker through an unbounded
    /// channel, whose `send` returns immediately.
    /// 
    /// Handles different message types:
    /// - Subscription confirmations (initial handshake)
    /// - Log notifications containing transaction data
    async fn process_websocket_message(
        &self,
        text: &str,
        processing_tx: &UnboundedSender<LogTask>,
    ) -> Result<(), String> {
        let json: Value =
            serde_json::from_str(text).map_err(|e| format!("Failed to parse JSON: {}", e))?;

        // Handle subscription confirmation response
        if json.get("result").is_some() && json.get("id").is_some() {
            println!("Subscription confirmed (id: {})", json["id"]);
            return Ok(());
        }

        // Extract transaction signature from the notification
        let transaction_signature = json
            .get("params")
            .and_then(|p| p.get("result"))
            .and_then(|r| r.get("value"))
            .and_then(|v| v.get("signature"))
            .and_then(|s| s.as_str())
            .unwrap_or("")
            .to_string();

        // Process log entries if present
        if let Some(log_data) = json
            .get("params")
            .and_then(|p| p.get("result"))
            .and_then(|r| r.get("value"))
            .and_then(|v| v.get("logs"))
            .and_then(|l| l.as_array())
        {
            println!("Received {} log entries from tx: {}", 
                log_data.len(), 
                &transaction_signature[..8.min(transaction_signature.len())]
            );
            
            for log_str_val in log_data {
                if let Some(log_str) = log_str_val.as_str() {
                    // CRITICAL: Non-blocking enqueue to background worker
                    // This ensures the receive loop can continue processing control frames
                    if let Err(_) = processing_tx.send((log_str.to_string(), transaction_signature.clone())) {
                        // Channel closed, worker likely died - will be recreated on reconnect
                        eprintln!("Failed to enqueue log: worker channel closed");
                    }
                }
            }
        }

        Ok(())
    }

    /// Manages a single WebSocket connection lifecycle.
    /// 
    /// This function handles:
    /// - Initial connection with timeout
    /// - Subscription to program logs
    /// - Message processing loop with health checks
    /// - Background worker management
    /// - Graceful disconnection handling
    /// 
    /// Returns Ok(()) on normal disconnection, Err() on connection failures.
    async fn listen_for_logs(&self, ws_url: &str) -> Result<(), String> {
        // Connect to WebSocket with timeout protection
        println!("Connecting to: {}", ws_url);
        let connect_future = tokio_tungstenite::connect_async(ws_url);
        let timeout_duration = Duration::from_secs(30);

        let (mut ws_stream, _) = match timeout(timeout_duration, connect_future).await {
            Ok(Ok(result)) => {
                println!("Connected successfully");
                result
            }
            Ok(Err(e)) => {
                return Err(format!("Connection failed: {}", e));
            }
            Err(_) => {
                return Err(format!("Connection timeout after {:?}", timeout_duration));
            }
        };

        // Subscribe to program logs with finalized commitment
        let subscribe_message = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "logsSubscribe",
            "params": [
                { "mentions": [self.program_id.to_string()] },
                { "commitment": "finalized" }
            ]
        });

        ws_stream
            .send(Message::Text(subscribe_message.to_string()))
            .await
            .map_err(|e| format!("Failed to send subscription: {}", e))?;
        
        println!("Subscribed to program logs");

        // Create background processing channel and worker
        // Using unbounded channel for simplicity, but consider bounded with backpressure in production
        let (processing_tx, mut processing_rx) = tokio::sync::mpsc::unbounded_channel::<LogTask>();
        
        // Clone self for the worker task
        let worker_self = self.clone();
        
        // Spawn background worker to process logs
        // This worker runs independently and can await I/O operations without blocking the receive loop
        let worker_handle = tokio::spawn(async move {
            let mut processed_count = 0u64;
            while let Some((log_str, tx_sig)) = processing_rx.recv().await {
                worker_self.parse_and_send(&log_str, &tx_sig).await;
                processed_count += 1;
            }
            println!("Background worker exiting. Processed {} logs", processed_count);
        });

        // Initialize connection health monitoring
        let mut ping_interval = interval(Duration::from_secs(30)); // Send ping every 30s
        let mut last_message_time = Instant::now();
        let message_timeout = Duration::from_secs(90); // Detect stale connection after 90s

        // Main message processing loop with concurrent health checks
        let loop_result = loop {
            tokio::select! {
                // Handle incoming WebSocket messages
                msg = ws_stream.next() => {
                    match msg {
                        Some(Ok(Message::Text(text))) => {
                            last_message_time = Instant::now();
                            
                            if let Err(e) = self.process_websocket_message(&text, &processing_tx).await {
                                eprintln!("Error processing message: {:?}", e);
                                // Continue processing other messages despite errors
                            }
                        }
                        Some(Ok(Message::Ping(payload))) => {
                            last_message_time = Instant::now();
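                            // tungstenite already queues a Pong automatically when it reads a Ping;
                            // sending one here as well makes sure it is flushed immediately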
                            if let Err(e) = ws_stream.send(Message::Pong(payload)).await {
                                eprintln!("Failed to send pong: {:?}", e);
                                break Err(format!("Pong send failed: {}", e));
                            }
                        }
                        Some(Ok(Message::Pong(_))) => {
                            last_message_time = Instant::now();
                            // Pong received, connection is alive
                        }
                        Some(Ok(Message::Close(frame))) => {
                            println!("Server closed connection: {:?}", frame);
                            break Ok(());
                        }
                        Some(Ok(other)) => {
                            println!("Received unexpected message type: {:?}", other);
                        }
                        Some(Err(e)) => {
                            eprintln!("Stream error: {:?}", e);
                            break Err(format!("Stream error: {}", e));
                        }
                        None => {
                            println!("Stream ended");
                            break Ok(());
                        }
                    }
                }
                
                // Send periodic ping to keep connection alive
                _ = ping_interval.tick() => {
                    if let Err(e) = ws_stream.send(Message::Ping(vec![])).await {
                        eprintln!("Failed to send ping: {:?}", e);
                        break Err(format!("Ping send failed: {}", e));
                    }
                }
                
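                // Detect stale connections (no messages received for too long).
                // The sleep is recreated on every iteration, so this branch only fires
                // after 5s in which no other branch completed.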
                _ = tokio::time::sleep(Duration::from_secs(5)) => {
                    if last_message_time.elapsed() > message_timeout {
                        eprintln!("No messages for {:?}, connection appears dead", 
                            message_timeout);
                        break Err("Connection timeout".to_string());
                    }
                }
            }
        };

        // Connection closed - clean up
        // Drop the sender to signal the worker to finish
        drop(processing_tx);
        
        // Wait for worker to finish processing remaining items (with timeout)
        match timeout(Duration::from_secs(10), worker_handle).await {
            Ok(Ok(())) => {
                println!("Background worker shut down cleanly");
            }
            Ok(Err(e)) => {
                eprintln!("Background worker panicked: {:?}", e);
            }
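            Err(_) => {
                // Dropping the JoinHandle detaches the task rather than aborting it,
                // so the worker keeps draining the remaining items in the background.
                eprintln!("Background worker shutdown timeout; worker detached with items possibly still pending");
            }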
        }

        loop_result
    }

    /// Parses a log string and forwards the resulting event downstream.
    ///
    /// This function:
    /// 1. Attempts to parse the log string into a known event type
    /// 2. Attaches the transaction signature to the parsed event
    /// 3. Sends the event to the downstream database worker via `event_sender`
    /// 
    /// If parsing fails (unrecognized log format), the log is silently ignored.
    async fn parse_and_send(&self, log_str: &str, transaction_signature: &str) {
        if let Some(mut parsed_event) = try_parse_event(log_str) {
            // Attach transaction signature to the parsed event
            match &mut parsed_event {
                ProgramEvent::DonationCompleted {
                    transaction_signature: ts,
                    ..
                } => *ts = transaction_signature.to_string(),
                ProgramEvent::RewardsProcessed {
                    transaction_signature: ts,
                    ..
                } => *ts = transaction_signature.to_string(),
                ProgramEvent::DonationNFTMinted {
                    transaction_signature: ts,
                    ..
                } => *ts = transaction_signature.to_string(),
                ProgramEvent::FortuneDrawn {
                    transaction_signature: ts,
                    ..
                } => *ts = transaction_signature.to_string(),
                ProgramEvent::WishCreated {
                    transaction_signature: ts,
                    ..
                } => *ts = transaction_signature.to_string(),
                ProgramEvent::WishTowerUpdated {
                    transaction_signature: ts,
                    ..
                } => *ts = transaction_signature.to_string(),
                ProgramEvent::IncenseBurned {
                    transaction_signature: ts,
                    ..
                } => *ts = transaction_signature.to_string(),
                ProgramEvent::AmuletDropped {
                    transaction_signature: ts,
                    ..
                } => *ts = transaction_signature.to_string(),
                ProgramEvent::AmuletMinted {
                    transaction_signature: ts,
                    ..
                } => *ts = transaction_signature.to_string(),
                ProgramEvent::ShopConfigUpdated {
                    transaction_signature: ts,
                    ..
                } => *ts = transaction_signature.to_string(),
                ProgramEvent::FortuneNFTMinted {
                    transaction_signature: ts,
                    ..
                } => *ts = transaction_signature.to_string(),
            }

            // Send to downstream worker thread for database processing
            if let Err(e) = self.event_sender.send(parsed_event).await {
                eprintln!("CRITICAL: Failed to send event to downstream worker: {:?}. Worker may have crashed!", e);
            }
        }
    }

    /// Converts HTTP(S) RPC URL to WebSocket URL.
    /// 
    /// Handles different URL schemes:
    /// - https:// -> wss://
    /// - http:// -> ws://
    /// - Special case: local test validator (port 8899 -> 8900)
    fn rpc_url_to_ws_url(&self) -> String {
        let url = self.rpc_client.url();
        
        // Special handling for a local test validator: RPC on 8899, WebSocket on 8900
        if url.starts_with("http://127.0.0.1:8899") || url.starts_with("http://localhost:8899") {
            return "ws://127.0.0.1:8900".to_string();
        }
        
        // Standard HTTPS -> WSS conversion
        if url.starts_with("https://") {
            return url.replacen("https://", "wss://", 1);
        }
        
        // Standard HTTP -> WS conversion
        if url.starts_with("http://") {
            return url.replacen("http://", "ws://", 1);
        }
        
        // Fallback for URLs without a scheme (both http prefixes were handled above)
        format!("wss://{}", url)
    }
}
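To see what the reconnect policy in `start_listening` actually does, the delay update can be pulled into a pure function. `next_reconnect_delay` and `delay_schedule` are hypothetical helpers that reproduce the constants in that method (2 s base, +5 s step, 60 s cap) and its order of operations (sleep first, then update the delay).

```rust
use std::time::Duration;

/// Mirrors the delay update in `start_listening` (hypothetical helper).
/// `failures` is the count *after* the failure that just happened.
fn next_reconnect_delay(failures: u32, current: Duration) -> Duration {
    let base = Duration::from_secs(2);
    let step = Duration::from_secs(5);
    let max = Duration::from_secs(60);
    if failures < 5 {
        base
    } else {
        std::cmp::min(current + step, max)
    }
}

/// Seconds actually slept before each reconnect, for `n` consecutive failures.
fn delay_schedule(n: u32) -> Vec<u64> {
    let mut delay = Duration::from_secs(2);
    let mut slept = Vec::new();
    for failures in 1..=n {
        // The original code sleeps with the current delay, then updates it.
        slept.push(delay.as_secs());
        delay = next_reconnect_delay(failures, delay);
    }
    slept
}
```

For eight consecutive failures the sleeps are 2, 2, 2, 2, 2, 7, 12 and 17 seconds, and from the 17th failure on the delay stays at the 60 s cap, so the growth is linear rather than exponential. Note also that `consecutive_failures` is only reset when `listen_for_logs` returns `Ok(())`: a connection that subscribes successfully and later fails with an error still counts toward the backoff, so over a long run the indexer can end up waiting longer than necessary. Resetting the counter once a subscription is confirmed is one possible adjustment.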

4. Results

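After switching to the new architecture, the WebSocket connection stopped dropping frequently and the errors disappeared from the logs. On testnet, connection stability improved noticeably and data capture became more reliable.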

5. Summary

The producer-consumer model resolved the blocking-in-async problem:

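  • Core improvement: I/O is decoupled from business logic, so heartbeats are answered on time.
  • Technical details: a channel provides non-blocking message passing, and a background worker handles the slow operations.
  • Lesson: in async network programming, keeping I/O strictly separate from business logic is essential.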
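The code uses an unbounded channel and notes that a bounded one with backpressure is worth considering in production. One way to bound the queue without ever making the receive loop wait is a non-blocking `try_send` that sheds load when the queue is full. The sketch below shows that policy with std's `sync_channel` (tokio's bounded `Sender` offers an analogous `try_send`); `enqueue_all` and `EnqueueStats` are hypothetical, and the receiver is deliberately never drained so the queue fills up.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Outcome counters for a hypothetical bounded-queue producer.
#[derive(Debug, PartialEq)]
struct EnqueueStats {
    accepted: usize,
    dropped: usize,
}

/// Enqueues without ever blocking the producer: when the bounded queue is full,
/// the item is counted as dropped instead of waiting for the consumer.
fn enqueue_all(capacity: usize, items: &[&str]) -> EnqueueStats {
    // `_rx` keeps the receiver alive but is never read, so the queue fills up.
    let (tx, _rx) = sync_channel::<String>(capacity);
    let mut stats = EnqueueStats { accepted: 0, dropped: 0 };
    for item in items {
        match tx.try_send(item.to_string()) {
            Ok(()) => stats.accepted += 1,
            // Queue full: shed the item rather than stall the caller.
            Err(TrySendError::Full(_)) => stats.dropped += 1,
            // Consumer gone: nothing more can be delivered.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    stats
}
```

With capacity 2 and five items, two are accepted and three are dropped. Dropping keeps heartbeats responsive at the cost of losing events, so whether it is acceptable depends on whether missed events can be recovered some other way; awaiting a full bounded queue inside the receive loop would reintroduce exactly the stall described in section 2.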