Optimizing Indexer WebSocket Connection Stability

1. Problem Discovery
While developing the Solana Indexer, I ran into frequent WebSocket disconnections on testnet.
Symptoms
The logs showed the following error:
WebSocket connection failed or dropped: Protocol(ResetWithoutClosingHandshake)
This made it impossible to capture program logs reliably, hurting the real-time quality of data processing.
2. Root Cause Analysis
Code analysis pointed to a single root cause, async blocking: long-running operations were blocking the WebSocket main loop, so heartbeats could not be answered in time.
Problem Details
- Source of the blocking: to handle duplicate transactions, I had moved the expensive logic (JSON parsing, event construction, database sends) into the WebSocket receive loop.
- Consequence: inside the tokio::select! loop, those operations monopolized execution time and delayed heartbeat responses.
- Protocol violation: WebSocket requires Ping frames to be answered promptly. The delayed Pong responses made the server treat the connection as dead and force-close it.
Architecture Problem
The original design coupled I/O and business logic on the same thread:
[Original architecture]
WebSocket receive thread
├── receive message
├── parse JSON (slow)
├── construct event
├── send downstream (await)
└── respond to heartbeat (blocked)
Conclusion: business-logic blocking violated the WebSocket protocol's timing requirements and was the root cause of the unstable connection.
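To make the failure mode concrete, here is a minimal sketch of the blocking shape (not the project's actual code; handle_message is a hypothetical stand-in for the JSON parsing and database work):

```rust
use futures::{SinkExt, StreamExt};
use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};

// Anti-pattern: the expensive work is awaited on the receive path, so the
// loop cannot reach the Ping arm until it finishes, and Pong frames go out late.
async fn blocking_receive_loop(
    mut ws: WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>,
) {
    while let Some(Ok(msg)) = ws.next().await {
        match msg {
            Message::Text(text) => {
                // Hypothetical heavy handler: JSON parsing, event construction,
                // awaited database sends -- all inline on the receive path.
                handle_message(&text).await;
            }
            Message::Ping(payload) => {
                // Only reached after handle_message returns; if that takes too
                // long, the server drops the "dead" connection, which surfaces
                // as Protocol(ResetWithoutClosingHandshake).
                let _ = ws.send(Message::Pong(payload)).await;
            }
            _ => {}
        }
    }
}

// Stand-in for the slow path described above.
async fn handle_message(_text: &str) {
    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
```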
3. Solution
The architecture was refactored around a producer-consumer model, separating I/O operations from business logic.
Core Idea
- Producer (the WebSocket main loop): focuses on receiving messages and quickly enqueues them onto a channel; it never blocks.
- Consumer (a background worker): handles the expensive operations asynchronously, without stalling the network layer.
Key Improvements
- Decouple I/O from business logic: use tokio::sync::mpsc::UnboundedSender as the message queue.
- Keep heartbeats responsive: the main loop stays lightweight, so Ping/Pong frames are answered promptly.
- Add health monitoring: automatically detect stale connections and reconnect on timeout.
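Distilled to its essentials, the pattern looks like this (a standalone toy example, not the project code; the String payload stands in for a log task):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Producer side: `send` on an unbounded channel never awaits, so the
    // receive loop can enqueue work and return to the socket immediately.
    let (tx, mut rx) = mpsc::unbounded_channel::<String>();

    // Consumer side: a background worker owns the slow path and may await
    // freely (parsing, DB writes) without stalling the network loop.
    let worker = tokio::spawn(async move {
        while let Some(task) = rx.recv().await {
            println!("processing: {task}");
        }
    });

    // Hand work off without blocking.
    tx.send("log line".to_string()).ok();

    // Dropping the sender closes the channel so the worker drains and exits.
    drop(tx);
    worker.await.unwrap();
}
```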
Core Code Implementation
Below is the core of the optimized IndexerFetcher module, a complete implementation of the producer-consumer model:
use futures::{SinkExt, StreamExt};
use serde_json::Value;
use solana_client::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use sqlx::{MySql, Pool};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tokio::time::{interval, timeout, Instant};
use tokio_tungstenite::tungstenite::Message;
use crate::event_parser::try_parse_event;
use crate::events::ProgramEvent;
/// A log entry paired with its transaction signature, ready for processing
type LogTask = (String, String);
pub struct IndexerFetcher {
rpc_client: RpcClient,
program_id: Pubkey,
db_pool: Arc<Pool<MySql>>,
event_sender: Sender<ProgramEvent>,
}
impl Clone for IndexerFetcher {
fn clone(&self) -> Self {
Self {
rpc_client: RpcClient::new(self.rpc_client.url()),
program_id: self.program_id,
db_pool: self.db_pool.clone(),
event_sender: self.event_sender.clone(),
}
}
}
impl IndexerFetcher {
pub fn new(
rpc_url: &str,
program_id: Pubkey,
db_pool: Arc<Pool<MySql>>,
event_sender: Sender<ProgramEvent>,
) -> Self {
let rpc_client = RpcClient::new(rpc_url.to_string());
Self {
rpc_client,
program_id,
db_pool,
event_sender,
}
}
/// Starts WebSocket connection with automatic reconnection on failures.
///
/// This is the main entry point for the indexer. It handles:
/// - Initial connection establishment
/// - Automatic reconnection with exponential backoff
/// - Connection health monitoring
///
/// The function runs indefinitely and will only return if explicitly stopped.
pub async fn start_listening(&self) -> Result<(), String> {
let ws_url = self.rpc_url_to_ws_url();
let mut reconnect_delay = Duration::from_secs(2);
let max_reconnect_delay = Duration::from_secs(60);
let mut consecutive_failures = 0u32;
loop {
println!(
"Attempting WebSocket connection: {} (delay: {:?}, failures: {})",
ws_url, reconnect_delay, consecutive_failures
);
match self.listen_for_logs(&ws_url).await {
Ok(_) => {
// Normal disconnection (rare) - reset backoff and retry immediately
println!("WebSocket connection closed normally, reconnecting...");
consecutive_failures = 0;
reconnect_delay = Duration::from_secs(2);
}
Err(e) => {
consecutive_failures += 1;
eprintln!(
"WebSocket error: {}. Failures: {}. Reconnecting in {:?}",
e, consecutive_failures, reconnect_delay
);
// Wait before reconnecting
tokio::time::sleep(reconnect_delay).await;
// Implement gradual backoff strategy:
// - First 5 failures: retry quickly (2s)
// - After 5 failures: gradually increase delay
if consecutive_failures < 5 {
reconnect_delay = Duration::from_secs(2);
} else {
reconnect_delay = std::cmp::min(
reconnect_delay + Duration::from_secs(5),
max_reconnect_delay
);
}
}
}
}
}
/// Processes incoming WebSocket messages and extracts log data.
///
/// This function is NON-BLOCKING in the receive loop. It quickly enqueues
/// log tasks to a background worker without awaiting.
///
/// Handles different message types:
/// - Subscription confirmations (initial handshake)
/// - Log notifications containing transaction data
async fn process_websocket_message(
&self,
text: &str,
processing_tx: &UnboundedSender<LogTask>,
) -> Result<(), String> {
let json: Value =
serde_json::from_str(text).map_err(|e| format!("Failed to parse JSON: {}", e))?;
// Handle subscription confirmation response
if json.get("result").is_some() && json.get("id").is_some() {
println!("Subscription confirmed (id: {})", json["id"]);
return Ok(());
}
// Extract transaction signature from the notification
let transaction_signature = json
.get("params")
.and_then(|p| p.get("result"))
.and_then(|r| r.get("value"))
.and_then(|v| v.get("signature"))
.and_then(|s| s.as_str())
.unwrap_or("")
.to_string();
// Process log entries if present
if let Some(log_data) = json
.get("params")
.and_then(|p| p.get("result"))
.and_then(|r| r.get("value"))
.and_then(|v| v.get("logs"))
.and_then(|l| l.as_array())
{
println!("Received {} log entries from tx: {}",
log_data.len(),
&transaction_signature[..8.min(transaction_signature.len())]
);
for log_str_val in log_data {
if let Some(log_str) = log_str_val.as_str() {
// CRITICAL: Non-blocking enqueue to background worker
// This ensures the receive loop can continue processing control frames
if let Err(_) = processing_tx.send((log_str.to_string(), transaction_signature.clone())) {
// Channel closed, worker likely died - will be recreated on reconnect
eprintln!("Failed to enqueue log: worker channel closed");
}
}
}
}
Ok(())
}
/// Manages a single WebSocket connection lifecycle.
///
/// This function handles:
/// - Initial connection with timeout
/// - Subscription to program logs
/// - Message processing loop with health checks
/// - Background worker management
/// - Graceful disconnection handling
///
/// Returns Ok(()) on normal disconnection, Err() on connection failures.
async fn listen_for_logs(&self, ws_url: &str) -> Result<(), String> {
// Connect to WebSocket with timeout protection
println!("Connecting to: {}", ws_url);
let connect_future = tokio_tungstenite::connect_async(ws_url);
let timeout_duration = Duration::from_secs(30);
let (mut ws_stream, _) = match timeout(timeout_duration, connect_future).await {
Ok(Ok(result)) => {
println!("Connected successfully");
result
}
Ok(Err(e)) => {
return Err(format!("Connection failed: {}", e));
}
Err(_) => {
return Err(format!("Connection timeout after {:?}", timeout_duration));
}
};
// Subscribe to program logs with finalized commitment
let subscribe_message = serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"method": "logsSubscribe",
"params": [
{ "mentions": [self.program_id.to_string()] },
{ "commitment": "finalized" }
]
});
ws_stream
.send(Message::Text(subscribe_message.to_string()))
.await
.map_err(|e| format!("Failed to send subscription: {}", e))?;
println!("Subscribed to program logs");
// Create background processing channel and worker
// Using unbounded channel for simplicity, but consider bounded with backpressure in production
let (processing_tx, mut processing_rx) = tokio::sync::mpsc::unbounded_channel::<LogTask>();
// Clone self for the worker task
let worker_self = self.clone();
// Spawn background worker to process logs
// This worker runs independently and can await I/O operations without blocking the receive loop
let worker_handle = tokio::spawn(async move {
let mut processed_count = 0u64;
while let Some((log_str, tx_sig)) = processing_rx.recv().await {
worker_self.parse_and_send(&log_str, &tx_sig).await;
processed_count += 1;
}
println!("Background worker exiting. Processed {} logs", processed_count);
});
// Initialize connection health monitoring
let mut ping_interval = interval(Duration::from_secs(30)); // Send ping every 30s
let mut last_message_time = Instant::now();
let message_timeout = Duration::from_secs(90); // Detect stale connection after 90s
// Main message processing loop with concurrent health checks
let loop_result = loop {
tokio::select! {
// Handle incoming WebSocket messages
msg = ws_stream.next() => {
match msg {
Some(Ok(Message::Text(text))) => {
last_message_time = Instant::now();
if let Err(e) = self.process_websocket_message(&text, &processing_tx).await {
eprintln!("Error processing message: {:?}", e);
// Continue processing other messages despite errors
}
}
Some(Ok(Message::Ping(payload))) => {
last_message_time = Instant::now();
// Respond to server ping immediately
if let Err(e) = ws_stream.send(Message::Pong(payload)).await {
eprintln!("Failed to send pong: {:?}", e);
break Err(format!("Pong send failed: {}", e));
}
}
Some(Ok(Message::Pong(_))) => {
last_message_time = Instant::now();
// Pong received, connection is alive
}
Some(Ok(Message::Close(frame))) => {
println!("Server closed connection: {:?}", frame);
break Ok(());
}
Some(Ok(other)) => {
println!("Received unexpected message type: {:?}", other);
}
Some(Err(e)) => {
eprintln!("Stream error: {:?}", e);
break Err(format!("Stream error: {}", e));
}
None => {
println!("Stream ended");
break Ok(());
}
}
}
// Send periodic ping to keep connection alive
_ = ping_interval.tick() => {
if let Err(e) = ws_stream.send(Message::Ping(vec![])).await {
eprintln!("Failed to send ping: {:?}", e);
break Err(format!("Ping send failed: {}", e));
}
}
// Detect stale connections (no messages received for too long)
_ = tokio::time::sleep(Duration::from_secs(5)) => {
if last_message_time.elapsed() > message_timeout {
eprintln!("No messages for {:?}, connection appears dead",
message_timeout);
break Err("Connection timeout".to_string());
}
}
}
};
// Connection closed - clean up
// Drop the sender to signal the worker to finish
drop(processing_tx);
// Wait for worker to finish processing remaining items (with timeout)
match timeout(Duration::from_secs(10), worker_handle).await {
Ok(Ok(())) => {
println!("Background worker shut down cleanly");
}
Ok(Err(e)) => {
eprintln!("Background worker panicked: {:?}", e);
}
Err(_) => {
eprintln!("Background worker shutdown timeout - may have pending items");
}
}
loop_result
}
/// Parses a log string and sends the event to the processing channel.
///
/// This function:
/// 1. Attempts to parse the log string into a known event type
/// 2. Attaches the transaction signature to the parsed event
/// 3. Sends the event to the worker thread via the channel
///
/// If parsing fails (unrecognized log format), the log is silently ignored.
async fn parse_and_send(&self, log_str: &str, transaction_signature: &str) {
if let Some(mut parsed_event) = try_parse_event(log_str) {
// Attach transaction signature to the parsed event
match &mut parsed_event {
ProgramEvent::DonationCompleted {
transaction_signature: ts,
..
} => *ts = transaction_signature.to_string(),
ProgramEvent::RewardsProcessed {
transaction_signature: ts,
..
} => *ts = transaction_signature.to_string(),
ProgramEvent::DonationNFTMinted {
transaction_signature: ts,
..
} => *ts = transaction_signature.to_string(),
ProgramEvent::FortuneDrawn {
transaction_signature: ts,
..
} => *ts = transaction_signature.to_string(),
ProgramEvent::WishCreated {
transaction_signature: ts,
..
} => *ts = transaction_signature.to_string(),
ProgramEvent::WishTowerUpdated {
transaction_signature: ts,
..
} => *ts = transaction_signature.to_string(),
ProgramEvent::IncenseBurned {
transaction_signature: ts,
..
} => *ts = transaction_signature.to_string(),
ProgramEvent::AmuletDropped {
transaction_signature: ts,
..
} => *ts = transaction_signature.to_string(),
ProgramEvent::AmuletMinted {
transaction_signature: ts,
..
} => *ts = transaction_signature.to_string(),
ProgramEvent::ShopConfigUpdated {
transaction_signature: ts,
..
} => *ts = transaction_signature.to_string(),
ProgramEvent::FortuneNFTMinted {
transaction_signature: ts,
..
} => *ts = transaction_signature.to_string(),
}
// Send to downstream worker thread for database processing
if let Err(e) = self.event_sender.send(parsed_event).await {
eprintln!("CRITICAL: Failed to send event to downstream worker: {:?}. Worker may have crashed!", e);
}
}
}
/// Converts HTTP(S) RPC URL to WebSocket URL.
///
/// Handles different URL schemes:
/// - https:// -> wss://
/// - http:// -> ws://
/// - Special case: local testnet (8899 -> 8900)
fn rpc_url_to_ws_url(&self) -> String {
let url = self.rpc_client.url();
// Special handling for local testnet: RPC on 8899, WebSocket on 8900
if url.starts_with("http://127.0.0.1:8899") || url.starts_with("http://localhost:8899") {
return "ws://127.0.0.1:8900".to_string();
}
// Standard HTTPS -> WSS conversion
if url.starts_with("https://") {
return url.replacen("https://", "wss://", 1);
}
// Standard HTTP -> WS conversion
if url.starts_with("http://") {
return url.replacen("http://", "ws://", 1);
}
// Fallback for URLs without scheme
format!(
"wss://{}",
url.trim_start_matches("https://")
.trim_start_matches("http://")
)
}
}
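For completeness, here is a hypothetical wiring example; the database DSN, program id, RPC endpoint, and channel capacity are all placeholder values, and it assumes the IndexerFetcher module above is in scope:

```rust
use std::str::FromStr;
use std::sync::Arc;
use solana_sdk::pubkey::Pubkey;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder DSN -- point this at the real indexer database.
    let db_pool = Arc::new(
        sqlx::mysql::MySqlPoolOptions::new()
            .connect("mysql://user:pass@localhost/indexer")
            .await?,
    );
    // Placeholder program id.
    let program_id = Pubkey::from_str("11111111111111111111111111111111")?;
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1024);

    let fetcher = IndexerFetcher::new(
        "https://api.testnet.solana.com",
        program_id,
        db_pool,
        event_tx,
    );

    // The listener reconnects forever, so run it as its own task.
    tokio::spawn(async move {
        if let Err(e) = fetcher.start_listening().await {
            eprintln!("indexer stopped: {e}");
        }
    });

    // Downstream consumer: persist parsed ProgramEvents, update caches, etc.
    while let Some(event) = event_rx.recv().await {
        let _ = event; // handle the event here
    }
    Ok(())
}
```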
4. Results
With the new architecture in place, the WebSocket connection no longer drops frequently and the error disappeared from the logs. On testnet, connection stability improved markedly and data capture became far more reliable.
5. Summary
The producer-consumer model resolved the async-blocking problem:
- Core improvement: I/O and business logic are decoupled, keeping heartbeat responses timely.
- Key technique: a channel provides non-blocking message passing, with a background worker handling the expensive operations.
- Lesson learned: in async network programming, strictly separating I/O from business logic is essential.