import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:kaer_with_panels/app/common/app_run_data.dart';
import 'package:kaer_with_panels/app/utils/kr_log_util.dart';

/// WebSocket service.
///
/// Owns the WebSocket connection to the server: connecting, heartbeat,
/// automatic reconnection and dispatching of incoming messages.
///
/// Call [kr_init] once with the connection parameters, optionally register
/// callbacks with [setOnMessageCallback] / [setOnConnectionStateCallback],
/// then call [connect].
class KrSocketService {
  // Singleton instance.
  static final KrSocketService _instance = KrSocketService._internal();

  // Connection resources.
  WebSocket? _socket;
  StreamSubscription? _socketSubscription;
  Timer? _heartbeatTimer;
  Timer? _heartbeatTimeoutTimer;
  int _heartbeatTimeoutCount = 0;
  Timer? _reconnectTimer;
  // NOTE(review): never started anywhere in this class; only cancelled in
  // [_cleanup]. Kept in case other code in this library relies on it.
  Timer? _vpnStateChangeTimer;

  // Connection parameters supplied through [kr_init].
  String? _baseUrl;
  String? _userId;
  String? _deviceNumber;
  String? _token;

  // Invoked with every decoded JSON object received from the server.
  Function(Map)? _onMessageCallback;

  // Invoked when the connection state changes.
  Function(bool)? _onConnectionStateCallback;

  // Last value handed to [_onConnectionStateCallback]; used so that a drop is
  // reported only if the listener was previously told "connected".
  bool _lastNotifiedConnected = false;

  int _reconnectAttempts = 0;

  // Connection state.
  bool _isConnecting = false;
  bool _isConnected = false;

  // Bumped by every [_cleanup]. An in-flight [connect] compares the value it
  // captured with the current one to detect that it has been superseded
  // (for example by [disconnect]) while it was awaiting.
  int _connectGeneration = 0;

  // Connection stability tracking.
  // NOTE(review): these are only reset in [_cleanup]; nothing in this class
  // starts the timer or reads the flag.
  bool _isConnectionStable = false;
  Timer? _connectionStabilityTimer;
  static const Duration _connectionStabilityTimeout = Duration(seconds: 10);

  // Heartbeat settings.
  static const int _maxHeartbeatTimeout = 3; // Max consecutive timeouts.
  static const Duration _heartbeatTimeout =
      Duration(seconds: 10); // How long to wait for a heartbeat reply.
  static const Duration _heartbeatInterval = Duration(seconds: 20);

  // Fixed delay between reconnect attempts.
  static const Duration _reconnectDelay = Duration(seconds: 5);

  // Private constructor.
  KrSocketService._internal();

  /// Returns the shared singleton instance.
  factory KrSocketService() => _instance;

  /// The shared singleton instance.
  static KrSocketService get instance => _instance;

  /// Stores the parameters used by [connect].
  ///
  /// [baseUrl] may use an `http(s)` or `ws(s)` scheme; [token] is sent as the
  /// `Authorization` header of the handshake request.
  void kr_init({
    required String baseUrl,
    required String userId,
    required String deviceNumber,
    required String token,
  }) {
    _baseUrl = baseUrl;
    _userId = userId;
    _deviceNumber = deviceNumber;
    _token = token;
  }

  /// Registers the callback that receives each decoded JSON object message.
  void setOnMessageCallback(Function(Map) callback) {
    _onMessageCallback = callback;
  }

  /// Registers the callback that is told when the connection goes up or down.
  void setOnConnectionStateCallback(Function(bool) callback) {
    _onConnectionStateCallback = callback;
  }

  /// Connects to the WebSocket server.
  ///
  /// Does nothing when a connection is already established or in progress,
  /// or when [kr_init] has not been called yet. On failure a reconnect is
  /// scheduled (see [_handleConnectionError]). If the attempt is superseded
  /// while awaiting (for example by [disconnect]), the freshly opened socket
  /// is closed and no state is changed.
  Future<void> connect() async {
    if (_isConnecting || _isConnected) {
      KRLogUtil.kr_i('WebSocket 正在连接或已连接,跳过重复连接', tag: 'WebSocket');
      return;
    }

    final baseUrl = _baseUrl;
    final userId = _userId;
    final deviceNumber = _deviceNumber;
    final token = _token;
    if (baseUrl == null ||
        userId == null ||
        deviceNumber == null ||
        token == null) {
      // Retrying cannot succeed without parameters, so do not schedule one.
      KRLogUtil.kr_e('WebSocket 尚未初始化,无法连接', tag: 'WebSocket');
      return;
    }

    // Release any previous connection first. _cleanup() resets
    // _isConnecting, so the guard flag must be raised only afterwards.
    _cleanup();
    _isConnecting = true;
    final attempt = _connectGeneration;
    KRLogUtil.kr_i('开始连接 WebSocket...', tag: 'WebSocket');

    try {
      // Make sure the URL uses the ws:// or wss:// scheme.
      final uri = Uri.parse(
        baseUrl.startsWith('http')
            ? baseUrl.replaceFirst('http', 'ws')
            : baseUrl,
      );

      // Build the endpoint URL from scheme/host/port only.
      final wsUrl = Uri(
        scheme: uri.scheme,
        host: uri.host,
        port: uri.port,
        path: '/v1/app/ws/$userId/$deviceNumber',
      ).toString();

      KRLogUtil.kr_i('连接地址: $wsUrl', tag: 'WebSocket');

      // Upgrade/Connection/Sec-WebSocket-* headers are generated by dart:io
      // itself; only the application-level header is supplied here.
      final socket = await WebSocket.connect(
        wsUrl,
        headers: {'Authorization': token},
      );

      if (attempt != _connectGeneration) {
        // disconnect() (or another teardown) ran while we were connecting.
        KRLogUtil.kr_i('连接过程中已被取消,关闭新建的 WebSocket 连接', tag: 'WebSocket');
        socket.close();
        return;
      }

      _socket = socket;

      // Listen for messages and keep the subscription so it can be cancelled.
      _socketSubscription = socket.listen(
        (message) {
          KRLogUtil.kr_i('收到消息: $message', tag: 'WebSocket');
          _handleMessage(message);
        },
        onError: (error) {
          KRLogUtil.kr_e('WebSocket 错误: $error', tag: 'WebSocket');
          _handleConnectionError();
        },
        onDone: () {
          KRLogUtil.kr_i('WebSocket 连接关闭', tag: 'WebSocket');
          _handleConnectionError();
        },
      );

      KRLogUtil.kr_i('WebSocket 连接成功', tag: 'WebSocket');
      _isConnected = true;
      _isConnecting = false;
      _reconnectAttempts = 0;

      // Wait briefly before sending the first heartbeat.
      await Future.delayed(const Duration(seconds: 1));

      if (attempt != _connectGeneration || !_isConnected) {
        // The connection was dropped or closed during the delay; whoever
        // tore it down is responsible for what happens next.
        return;
      }

      _startHeartbeat();
      _notifyConnectionState(true);
    } catch (e, stackTrace) {
      KRLogUtil.kr_e('WebSocket 连接失败: $e', tag: 'WebSocket');
      KRLogUtil.kr_e('错误堆栈: $stackTrace', tag: 'WebSocket');
      if (attempt != _connectGeneration) {
        // Superseded attempt: do not touch state owned by a newer one.
        return;
      }
      _isConnecting = false;
      _handleConnectionError();
    }
  }

  /// Tears the connection down and schedules a reconnect.
  ///
  /// Listeners are told the connection is down if they were previously told
  /// it was up. No reconnect is scheduled when the user is logged out.
  void _handleConnectionError() {
    _cleanup();
    final generation = _connectGeneration;

    if (_lastNotifiedConnected) {
      _notifyConnectionState(false);
    }

    if (generation != _connectGeneration) {
      // The listener called connect()/disconnect() itself; leave it alone.
      return;
    }

    // Only keep reconnecting while the user is logged in.
    if (!KRAppRunData.getInstance().kr_isLogin.value) {
      KRLogUtil.kr_i('用户已退出登录,停止重连', tag: 'WebSocket');
      return;
    }

    _reconnectAttempts++;
    KRLogUtil.kr_i(
      '尝试重连 (第 $_reconnectAttempts 次, 间隔: ${_reconnectDelay.inSeconds}秒)...',
      tag: 'WebSocket',
    );

    _reconnectTimer?.cancel();
    _reconnectTimer = Timer(_reconnectDelay, connect);
  }

  /// Invokes the connection-state callback without letting a throwing
  /// listener disturb connection management.
  void _notifyConnectionState(bool connected) {
    _lastNotifiedConnected = connected;
    try {
      _onConnectionStateCallback?.call(connected);
    } catch (e) {
      KRLogUtil.kr_e('连接状态回调执行失败: $e', tag: 'WebSocket');
    }
  }

  /// Sends an initial heartbeat and starts the periodic heartbeat.
  ///
  /// Every periodic heartbeat arms a reply timeout; after
  /// [_maxHeartbeatTimeout] consecutive timeouts the connection is torn down
  /// and a reconnect is scheduled.
  void _startHeartbeat() {
    _heartbeatTimer?.cancel();
    _heartbeatTimeoutTimer?.cancel();
    _heartbeatTimeoutCount = 0;

    // Only send heartbeats on an established connection.
    if (!_isConnected) {
      return;
    }

    KRLogUtil.kr_i('发送初始心跳...', tag: 'WebSocket');
    sendMessage('ping');

    _heartbeatTimer = Timer.periodic(_heartbeatInterval, (timer) {
      if (!_isConnected) {
        timer.cancel();
        _heartbeatTimeoutTimer?.cancel();
        return;
      }

      KRLogUtil.kr_i('发送心跳...', tag: 'WebSocket');
      sendMessage('ping');

      // Arm the reply timeout for this heartbeat.
      _heartbeatTimeoutTimer?.cancel();
      _heartbeatTimeoutTimer = Timer(_heartbeatTimeout, () {
        _heartbeatTimeoutCount++;
        KRLogUtil.kr_w(
          '心跳响应超时 (第 $_heartbeatTimeoutCount 次)',
          tag: 'WebSocket',
        );

        if (_heartbeatTimeoutCount >= _maxHeartbeatTimeout) {
          KRLogUtil.kr_e(
            '心跳响应连续超时 $_maxHeartbeatTimeout 次,主动断开重连',
            tag: 'WebSocket',
          );
          _handleConnectionError();
        }
      });
    });
  }

  /// Handles one incoming frame.
  ///
  /// A text frame equal to `ping` is treated as the heartbeat reply. Any
  /// other text frame is decoded as JSON and, when it is a JSON object,
  /// passed to the message callback. Non-text frames are ignored.
  void _handleMessage(dynamic message) {
    if (message is! String) {
      return;
    }

    if (message == 'ping') {
      KRLogUtil.kr_i('收到心跳响应', tag: 'WebSocket');
      // Reset the heartbeat timeout tracking.
      _heartbeatTimeoutCount = 0;
      _heartbeatTimeoutTimer?.cancel();
      return;
    }

    try {
      final decoded = json.decode(message);
      if (decoded is! Map) {
        KRLogUtil.kr_w('收到非 JSON 对象消息,已忽略: $message', tag: 'WebSocket');
        return;
      }
      KRLogUtil.kr_i('处理消息: ${json.encode(decoded)}', tag: 'WebSocket');
      _onMessageCallback?.call(decoded);
    } catch (e) {
      // Malformed JSON or a throwing listener must not kill the stream.
      KRLogUtil.kr_e('消息处理错误: $e', tag: 'WebSocket');
    }
  }

  /// Sends a text [message] over the socket.
  ///
  /// Logs a warning and does nothing when not connected. A failure while
  /// writing is treated as a connection error and triggers a reconnect.
  void sendMessage(String message) {
    if (!_isConnected) {
      KRLogUtil.kr_w('WebSocket 未连接,无法发送消息', tag: 'WebSocket');
      return;
    }

    final socket = _socket;
    if (socket == null) {
      KRLogUtil.kr_w('WebSocket 实例为空,无法发送消息', tag: 'WebSocket');
      return;
    }

    try {
      socket.add(message);
      KRLogUtil.kr_i('发送消息: $message', tag: 'WebSocket');
    } catch (e) {
      KRLogUtil.kr_e('发送消息失败: $e', tag: 'WebSocket');
      _handleConnectionError();
    }
  }

  /// Encodes [message] as JSON and sends it with [sendMessage].
  ///
  /// A payload that cannot be encoded is logged and dropped; it does not
  /// affect the connection.
  void sendJsonMessage(Map message) {
    if (!_isConnected) {
      KRLogUtil.kr_w('WebSocket 未连接,无法发送消息', tag: 'WebSocket');
      return;
    }

    final String jsonString;
    try {
      jsonString = json.encode(message);
    } catch (e) {
      // An encoding failure is a caller error, not a connection problem.
      KRLogUtil.kr_e('发送 JSON 消息失败: $e', tag: 'WebSocket');
      return;
    }

    sendMessage(jsonString);
  }

  /// Cancels all timers, closes the socket and resets the state flags.
  ///
  /// Also bumps [_connectGeneration], which invalidates any [connect] call
  /// that is still awaiting.
  void _cleanup() {
    _connectGeneration++;

    _heartbeatTimer?.cancel();
    _heartbeatTimeoutTimer?.cancel();
    _reconnectTimer?.cancel();
    _vpnStateChangeTimer?.cancel();
    _connectionStabilityTimer?.cancel();

    // Cancel the subscription before closing so no further events arrive.
    _socketSubscription?.cancel();
    _socketSubscription = null;

    _socket?.close();
    _socket = null;

    _heartbeatTimer = null;
    _heartbeatTimeoutTimer = null;
    _reconnectTimer = null;
    _vpnStateChangeTimer = null;
    _connectionStabilityTimer = null;

    _isConnected = false;
    _isConnecting = false;
    _isConnectionStable = false;
    _heartbeatTimeoutCount = 0;
  }

  /// Closes the connection and tells the listener it is down.
  ///
  /// Cancels any pending reconnect and any connection attempt in progress.
  Future<void> disconnect() async {
    KRLogUtil.kr_i('关闭 WebSocket 连接', tag: 'WebSocket');
    _cleanup();
    _notifyConnectionState(false);
  }

  /// Whether the socket is currently connected.
  bool get isConnected => _isConnected;
}