omnAPP/lib/app/services/kr_socket_service.dart
2025-09-23 16:23:15 +08:00

312 lines
8.7 KiB
Dart
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:kaer_with_panels/app/common/app_run_data.dart';
import 'package:kaer_with_panels/app/utils/kr_log_util.dart';
/// WebSocket 服务类
/// 用于处理与服务器的 WebSocket 连接、心跳和消息处理
class KrSocketService {
// 单例实例
static final KrSocketService _instance = KrSocketService._internal();
// 私有变量
WebSocket? _socket;
StreamSubscription? _socketSubscription; // 添加订阅管理
Timer? _heartbeatTimer;
Timer? _heartbeatTimeoutTimer;
int _heartbeatTimeoutCount = 0;
Timer? _reconnectTimer;
Timer? _vpnStateChangeTimer;
String? _baseUrl;
String? _userId;
String? _deviceNumber;
String? _token;
// 消息处理回调
Function(Map<String, dynamic>)? _onMessageCallback;
// 连接状态回调
Function(bool)? _onConnectionStateCallback;
int _reconnectAttempts = 0;
// 连接状态
bool _isConnecting = false;
bool _isConnected = false;
// 连接状态检查
bool _isConnectionStable = false;
Timer? _connectionStabilityTimer;
static const Duration _connectionStabilityTimeout = Duration(seconds: 10);
// 心跳相关
static const int _maxHeartbeatTimeout = 3; // 最大心跳超时次数
static const Duration _heartbeatTimeout = Duration(seconds: 10); // 心跳响应超时时间
// 私有构造函数
KrSocketService._internal();
// 工厂构造函数
factory KrSocketService() => _instance;
// 获取实例
static KrSocketService get instance => _instance;
/// 初始化 WebSocket 服务
void kr_init({
required String baseUrl,
required String userId,
required String deviceNumber,
required String token,
}) {
_baseUrl = baseUrl;
_userId = userId;
_deviceNumber = deviceNumber;
_token = token;
}
/// 设置消息处理回调
void setOnMessageCallback(Function(Map<String, dynamic>) callback) {
_onMessageCallback = callback;
}
/// 设置连接状态回调
void setOnConnectionStateCallback(Function(bool) callback) {
_onConnectionStateCallback = callback;
}
/// 连接到 WebSocket 服务器
Future<void> connect() async {
if (_isConnecting || _isConnected) {
KRLogUtil.kr_i('WebSocket 正在连接或已连接,跳过重复连接', tag: 'WebSocket');
return;
}
_isConnecting = true;
KRLogUtil.kr_i('开始连接 WebSocket...', tag: 'WebSocket');
try {
// 确保 URL 使用 ws:// 或 wss:// 协议
final uri = Uri.parse(_baseUrl!.startsWith('http')
? _baseUrl!.replaceFirst('http', 'ws')
: _baseUrl!);
// 构建 WebSocket URL确保格式正确
final wsUrl = Uri(
scheme: uri.scheme,
host: uri.host,
port: uri.port,
path: '/v1/app/ws/$_userId/$_deviceNumber',
).toString();
KRLogUtil.kr_i('连接地址: $wsUrl', tag: 'WebSocket');
// 清理旧的连接
_cleanup();
// 创建 WebSocket 连接
_socket = await WebSocket.connect(
wsUrl,
headers: {
'Authorization': _token!,
'Upgrade': 'websocket',
'Connection': 'Upgrade',
'Sec-WebSocket-Version': '13',
'Sec-WebSocket-Key': 'dGhlIHNhbXBsZSBub25jZQ==',
},
);
// 设置消息监听并保存订阅
_socketSubscription = _socket!.listen(
(message) {
KRLogUtil.kr_i('收到消息: $message', tag: 'WebSocket');
_handleMessage(message);
},
onError: (error) {
KRLogUtil.kr_e('WebSocket 错误: $error', tag: 'WebSocket');
_handleConnectionError();
},
onDone: () {
KRLogUtil.kr_i('WebSocket 连接关闭', tag: 'WebSocket');
_handleConnectionError();
},
);
KRLogUtil.kr_i('WebSocket 连接成功', tag: 'WebSocket');
_isConnected = true;
_isConnecting = false;
_reconnectAttempts = 0;
// 等待一小段时间后再发送心跳
await Future.delayed(const Duration(seconds: 1));
// 开始心跳
_startHeartbeat();
_onConnectionStateCallback?.call(true);
} catch (e, stackTrace) {
KRLogUtil.kr_e('WebSocket 连接失败: $e', tag: 'WebSocket');
KRLogUtil.kr_e('错误堆栈: $stackTrace', tag: 'WebSocket');
_isConnecting = false;
_handleConnectionError();
}
}
/// 处理连接错误
void _handleConnectionError() {
_cleanup();
// 检查是否已登录
if (!KRAppRunData.getInstance().kr_isLogin.value) {
KRLogUtil.kr_i('用户已退出登录,停止重连', tag: 'WebSocket');
return;
}
_reconnectAttempts++;
// 使用固定 5 秒的重连间隔
const backoffDelay = Duration(seconds: 5);
KRLogUtil.kr_i('尝试重连 (第 $_reconnectAttempts 次, 间隔: ${backoffDelay.inSeconds}秒)...', tag: 'WebSocket');
_reconnectTimer?.cancel();
_reconnectTimer = Timer(backoffDelay, () {
connect();
});
}
/// 开始心跳
void _startHeartbeat() {
_heartbeatTimer?.cancel();
_heartbeatTimeoutTimer?.cancel();
_heartbeatTimeoutCount = 0;
// 确保连接成功后再发送心跳
if (_isConnected) {
KRLogUtil.kr_i('发送初始心跳...', tag: 'WebSocket');
sendMessage('ping');
_heartbeatTimer = Timer.periodic(const Duration(seconds: 20), (timer) {
if (_isConnected) {
KRLogUtil.kr_i('发送心跳...', tag: 'WebSocket');
sendMessage('ping');
// 启动心跳响应超时检测
_heartbeatTimeoutTimer?.cancel();
_heartbeatTimeoutTimer = Timer(_heartbeatTimeout, () {
_heartbeatTimeoutCount++;
KRLogUtil.kr_w('心跳响应超时 (第 $_heartbeatTimeoutCount 次)', tag: 'WebSocket');
if (_heartbeatTimeoutCount >= _maxHeartbeatTimeout) {
KRLogUtil.kr_e('心跳响应连续超时 $_maxHeartbeatTimeout 次,主动断开重连', tag: 'WebSocket');
_handleConnectionError();
}
});
} else {
timer.cancel();
_heartbeatTimeoutTimer?.cancel();
}
});
}
}
/// 处理接收到的消息
void _handleMessage(dynamic message) {
try {
if (message is String) {
if (message == 'ping') {
KRLogUtil.kr_i('收到心跳响应', tag: 'WebSocket');
// 重置心跳超时计数
_heartbeatTimeoutCount = 0;
_heartbeatTimeoutTimer?.cancel();
return;
}
final Map<String, dynamic> data = json.decode(message);
KRLogUtil.kr_i('处理消息: ${json.encode(data)}', tag: 'WebSocket');
_onMessageCallback?.call(data);
}
} catch (e) {
KRLogUtil.kr_e('消息处理错误: $e', tag: 'WebSocket');
}
}
/// 发送消息
void sendMessage(String message) {
try {
if (!_isConnected) {
KRLogUtil.kr_w('WebSocket 未连接,无法发送消息', tag: 'WebSocket');
return;
}
if (_socket == null) {
KRLogUtil.kr_w('WebSocket 实例为空,无法发送消息', tag: 'WebSocket');
return;
}
_socket!.add(message);
KRLogUtil.kr_i('发送消息: $message', tag: 'WebSocket');
} catch (e) {
KRLogUtil.kr_e('发送消息失败: $e', tag: 'WebSocket');
_handleConnectionError();
}
}
/// 发送 JSON 消息
void sendJsonMessage(Map<String, dynamic> message) {
try {
if (!_isConnected) {
KRLogUtil.kr_w('WebSocket 未连接,无法发送消息', tag: 'WebSocket');
return;
}
final jsonString = json.encode(message);
sendMessage(jsonString);
} catch (e) {
KRLogUtil.kr_e('发送 JSON 消息失败: $e', tag: 'WebSocket');
_handleConnectionError();
}
}
/// 清理资源
void _cleanup() {
_heartbeatTimer?.cancel();
_heartbeatTimeoutTimer?.cancel();
_reconnectTimer?.cancel();
_vpnStateChangeTimer?.cancel();
_connectionStabilityTimer?.cancel();
// 取消订阅
_socketSubscription?.cancel();
_socketSubscription = null;
_socket?.close();
_socket = null;
_heartbeatTimer = null;
_heartbeatTimeoutTimer = null;
_reconnectTimer = null;
_vpnStateChangeTimer = null;
_connectionStabilityTimer = null;
_isConnected = false;
_isConnecting = false;
_isConnectionStable = false;
_heartbeatTimeoutCount = 0;
}
/// 关闭连接
Future<void> disconnect() async {
KRLogUtil.kr_i('关闭 WebSocket 连接', tag: 'WebSocket');
_cleanup();
_onConnectionStateCallback?.call(false);
}
/// 检查连接状态
bool get isConnected => _isConnected;
}