hi-client/lib/singbox/service/ffi_singbox_service.dart

679 lines
19 KiB
Dart
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import 'dart:async';
import 'dart:convert';
import 'dart:ffi';
import 'dart:io';
import 'dart:isolate';
import 'package:flutter/foundation.dart';
import 'package:kaer_with_panels/utils/isolate_worker.dart';
import 'package:ffi/ffi.dart';
import 'package:fpdart/fpdart.dart';
import 'package:kaer_with_panels/core/model/directories.dart';
import 'package:kaer_with_panels/gen/singbox_generated_bindings.dart';
import 'package:kaer_with_panels/singbox/model/singbox_config_option.dart';
import 'package:kaer_with_panels/singbox/model/singbox_outbound.dart';
import 'package:kaer_with_panels/singbox/model/singbox_stats.dart';
import 'package:kaer_with_panels/singbox/model/singbox_status.dart';
import 'package:kaer_with_panels/singbox/model/warp_account.dart';
import 'package:kaer_with_panels/singbox/service/singbox_service.dart';
import 'package:kaer_with_panels/utils/utils.dart';
import 'package:loggy/loggy.dart';
import 'package:path/path.dart' as p;
import 'package:rxdart/rxdart.dart';
import 'package:watcher/watcher.dart';
/// Library-level logger named `FFISingboxService`.
///
/// Used where the instance logger is not referenced in this file: the static
/// library loader and the closures passed to `TaskEither` and stream callbacks.
final _logger = Loggy('FFISingboxService');
class FFISingboxService with InfraLogger implements SingboxService {
/// Native bindings used directly by this class; created lazily by [_gen].
static final SingboxNativeLibrary _box = _gen();

/// Status stream exposed by [watchStatus]; assigned in [init].
late final ValueStream<SingboxStatus> _status;

/// Port whose messages feed [_status]; assigned in [init].
late final ReceivePort _statusReceiver;

/// Stream cached by [watchStats]; reset to null when that stream is cancelled.
Stream<SingboxStats>? _serviceStatsStream;

/// Stream cached by [watchGroups]; reset to null when that stream is cancelled.
Stream<List<SingboxOutboundGroup>>? _outboundsStream;

/// Opens the native library found at [_getLibraryPath] and wraps it in the
/// generated bindings.
static SingboxNativeLibrary _gen() {
  final libraryPath = _getLibraryPath();
  _logger.debug('singbox native libs path: "$libraryPath"');
  return SingboxNativeLibrary(DynamicLibrary.open(libraryPath));
}
/// Resolves the path handed to `DynamicLibrary.open` for the native core.
///
/// Resolution order:
/// 1. When the `FLUTTER_TEST` environment variable is present: `libcore/<name>`.
/// 2. `libcore/bin/<name>` when that file exists.
/// 3. Otherwise the bare library file name.
static String _getLibraryPath() {
  final libName = Platform.isWindows
      ? "libcore.dll"
      : Platform.isMacOS
          ? "libcore.dylib"
          : "libcore.so";

  // Test environment.
  if (Platform.environment.containsKey('FLUTTER_TEST')) {
    return p.join("libcore", libName);
  }

  // Development layout: the path is relative, so the existence check below is
  // made against the current working directory.
  final devPath = p.join("libcore", "bin", libName);
  if (kDebugMode) {
    print('🔍 [FFI] 检查开发环境路径: $devPath');
    print('🔍 [FFI] 当前工作目录: ${Directory.current.path}');
    print('🔍 [FFI] 文件是否存在: ${File(devPath).existsSync()}');
  }

  if (!File(devPath).existsSync()) {
    // Production: fall back to the bare name (the original note says this is
    // the path inside the bundle).
    if (kDebugMode) {
      print('⚠️ [FFI] 开发环境路径不存在,使用生产环境路径: $libName');
    }
    return libName;
  }

  if (kDebugMode) {
    print('✅ [FFI] 使用开发环境路径: $devPath');
  }
  return devPath;
}
/// Creates the status [ReceivePort] and builds the status stream seeded with
/// [SingboxStopped].
///
/// The native `setupOnce` call is intentionally not made here.
@override
Future<void> init() async {
  if (kDebugMode) {
    print('🚀 [FFI] init() 开始');
  }
  loggy.debug("initializing");

  // setupOnce is invoked from _ffiLoadLibrary, which the original note says
  // runs in the worker isolate; per that note, calling it on the main isolate
  // would block, so it is skipped here.
  if (kDebugMode) {
    print('⏭️ [FFI] 跳过主 isolate 中的 setupOnce将在 worker isolate 中执行)');
    print('📡 [FFI] 创建 ReceivePort');
  }
  _statusReceiver = ReceivePort('service status receiver');

  if (kDebugMode) {
    print('🔄 [FFI] 设置状态流');
  }
  // Each port message is a JSON string that is decoded into a status event.
  final statusEvents = _statusReceiver
      .asBroadcastStream()
      .map((message) => jsonDecode(message as String))
      .map(SingboxStatus.fromEvent);
  _status = ValueConnectableStream.seeded(
    statusEvents,
    const SingboxStopped(),
  ).autoConnect();

  if (kDebugMode) {
    print('✅ [FFI] init() 完成');
  }
}
/// Runs the native `setup` call through [IsolateWorker].
///
/// Completes with `right(unit)` when the call reports no error text, and with
/// `left` carrying the reported text (or the stringified exception) otherwise.
@override
TaskEither<String, Unit> setup(
  Directories directories,
  bool debug,
) {
  // Everything the worker closure needs is copied into plain locals first, so
  // the closure below references only these values.
  final statusPort = _statusReceiver.sendPort.nativePort;
  final basePath = directories.baseDir.path;
  final workingPath = directories.workingDir.path;
  final tempPath = directories.tempDir.path;
  final debugValue = debug ? 1 : 0;

  return TaskEither(() async {
    try {
      final startedAt = DateTime.now();
      _logger.debug('[黑屏调试] setup() 开始调用 libcore.dll - $startedAt');
      final failure = await IsolateWorker().execute(
        () => _ffiSetup(basePath, workingPath, tempPath, statusPort, debugValue),
        allowSyncFallback: false,
      );
      final elapsedMs = DateTime.now().difference(startedAt).inMilliseconds;
      _logger.debug('[黑屏调试] setup() 完成(耗时: ${elapsedMs}ms');
      if (failure == null || failure.isEmpty) {
        return right(unit);
      }
      _logger.error('[黑屏调试] setup() 错误: $failure');
      return left(failure);
    } catch (error) {
      _logger.error('[黑屏调试] setup() 异常: $error');
      return left(error.toString());
    }
  });
}
/// Validates the configuration at [path] by running the native parser through
/// [IsolateWorker].
///
/// Completes with `right(unit)` when no error text is reported, otherwise
/// with `left` carrying the text (or the stringified exception).
@override
TaskEither<String, Unit> validateConfigByPath(
  String path,
  String tempPath,
  bool debug,
) {
  final debugValue = debug ? 1 : 0;
  return TaskEither(() async {
    try {
      final failure = await IsolateWorker().execute(
        () => _ffiValidateConfig(path, tempPath, debugValue),
        allowSyncFallback: false,
      );
      if (failure == null || failure.isEmpty) {
        return right(unit);
      }
      return left(failure);
    } catch (error) {
      return left(error.toString());
    }
  });
}
/// Sends [options], encoded as JSON, to the native core through
/// [IsolateWorker].
///
/// Completes with `right(unit)` when no error text is reported, otherwise
/// with `left` carrying the text (or the stringified exception).
@override
TaskEither<String, Unit> changeOptions(SingboxConfigOption options) {
  // Encoded before the task is built, so the worker closure only carries a string.
  final encodedOptions = jsonEncode(options.toJson());
  return TaskEither(() async {
    try {
      final startedAt = DateTime.now();
      _logger.debug('[黑屏调试] changeOptions 开始调用 libcore.dll - $startedAt');
      final failure = await IsolateWorker().execute(
        () => _ffiChangeOptions(encodedOptions),
        allowSyncFallback: false,
      );
      final elapsedMs = DateTime.now().difference(startedAt).inMilliseconds;
      _logger.debug('[黑屏调试] changeOptions 完成(耗时: ${elapsedMs}ms');
      if (failure == null || failure.isEmpty) {
        return right(unit);
      }
      _logger.error('[黑屏调试] changeOptions 错误: $failure');
      return left(failure);
    } catch (error) {
      _logger.error('[黑屏调试] changeOptions 异常: $error');
      return left(error.toString());
    }
  });
}
/// Asks the native core to generate the full configuration for the file at
/// [path].
///
/// The worker returns a two-element list: a success flag followed by the
/// payload. The payload is returned on the right when the flag is `true`, and
/// on the left otherwise.
@override
TaskEither<String, String> generateFullConfigByPath(
  String path,
) {
  return TaskEither(() async {
    try {
      final response = await IsolateWorker().execute(
        () => _ffiGenerateFullConfig(path),
        allowSyncFallback: false,
      );
      final succeeded = response.isNotEmpty && response[0] == true;
      final payload = response.length > 1 ? response[1] as String : '';
      if (succeeded) {
        return right(payload);
      }
      return left(payload);
    } catch (error) {
      return left(error.toString());
    }
  });
}
/// Starts the native core with the configuration at [configPath].
///
/// [name] is not used by this implementation. Completes with `right(unit)`
/// when no error text is reported, otherwise with `left` carrying the text
/// (or the stringified exception).
@override
TaskEither<String, Unit> start(
  String configPath,
  String name,
  bool disableMemoryLimit,
) {
  loggy.debug("starting, memory limit: [${!disableMemoryLimit}]");
  return TaskEither(() async {
    try {
      final startedAt = DateTime.now();
      _logger.debug('[黑屏调试] start() 开始调用 libcore.dll - $startedAt');
      final failure = await IsolateWorker().execute(
        () => _ffiStart(configPath, disableMemoryLimit),
        allowSyncFallback: false,
      );
      final elapsedMs = DateTime.now().difference(startedAt).inMilliseconds;
      _logger.debug('[黑屏调试] start() 完成(耗时: ${elapsedMs}ms');
      if (failure == null || failure.isEmpty) {
        return right(unit);
      }
      _logger.error('[黑屏调试] start() 错误: $failure');
      return left(failure);
    } catch (error) {
      _logger.error('[黑屏调试] start() 异常: $error');
      return left(error.toString());
    }
  });
}
/// Stops the native core through [IsolateWorker].
///
/// Completes with `right(unit)` when no error text is reported, otherwise
/// with `left` carrying the text (or the stringified exception).
@override
TaskEither<String, Unit> stop() {
  return TaskEither(() async {
    try {
      final startedAt = DateTime.now();
      _logger.debug('[黑屏调试] stop() 开始调用 libcore.dll - $startedAt');
      final failure = await IsolateWorker().execute(
        _ffiStop,
        allowSyncFallback: false,
      );
      final elapsedMs = DateTime.now().difference(startedAt).inMilliseconds;
      _logger.debug('[黑屏调试] stop() 完成(耗时: ${elapsedMs}ms');
      if (failure == null || failure.isEmpty) {
        return right(unit);
      }
      _logger.error('[黑屏调试] stop() 错误: $failure');
      return left(failure);
    } catch (error) {
      _logger.error('[黑屏调试] stop() 异常: $error');
      return left(error.toString());
    }
  });
}
/// Restarts the native core with the configuration at [configPath].
///
/// [name] is not used by this implementation. Completes with `right(unit)`
/// when no error text is reported, otherwise with `left` carrying the text
/// (or the stringified exception).
@override
TaskEither<String, Unit> restart(
  String configPath,
  String name,
  bool disableMemoryLimit,
) {
  loggy.debug("restarting, memory limit: [${!disableMemoryLimit}]");
  return TaskEither(() async {
    try {
      final failure = await IsolateWorker().execute(
        () => _ffiRestart(configPath, disableMemoryLimit),
        allowSyncFallback: false,
      );
      if (failure == null || failure.isEmpty) {
        return right(unit);
      }
      return left(failure);
    } catch (error) {
      return left(error.toString());
    }
  });
}
/// Not available in this implementation.
///
/// Throws [UnimplementedError] synchronously when called; no [TaskEither] is
/// ever returned.
@override
TaskEither<String, Unit> resetTunnel() =>
    throw UnimplementedError("reset tunnel function unavailable on platform");
/// Returns the status stream built in [init].
@override
Stream<SingboxStatus> watchStatus() {
  return _status;
}
/// Streams [SingboxStats] pushed by native command client `1`.
///
/// The stream is cached: while it is active, repeated calls return the same
/// instance. When the broadcast stream is cancelled the native client is
/// stopped, the port is closed and the cache is cleared.
///
/// Messages starting with `error:` are surfaced as stream errors (the rest of
/// the message, thrown as a `String`); non-string messages are surfaced as
/// the error `"invalid type"`.
///
/// Throws the native error text (a `String`) when the command client cannot
/// be started.
@override
Stream<SingboxStats> watchStats() {
  final cached = _serviceStatsStream;
  if (cached != null) return cached;

  final receiver = ReceivePort('stats');
  final statsStream = receiver.asBroadcastStream(
    onCancel: (_) {
      _logger.debug("stopping stats command client");
      try {
        final err = _box.stopCommandClient(1).cast<Utf8>().toDartString();
        if (err.isNotEmpty) {
          _logger.error("error stopping stats client: $err");
        }
      } finally {
        // Always release the port and the cache entry, even if the native
        // stop call throws; otherwise the port stays open and later calls
        // keep returning a dead stream.
        receiver.close();
        _serviceStatsStream = null;
      }
    },
  ).map(
    (event) {
      if (event case String _) {
        if (event.startsWith('error:')) {
          loggy.error("[service stats client] error received: $event");
          throw event.replaceFirst('error:', "");
        }
        return SingboxStats.fromJson(
          jsonDecode(event) as Map<String, dynamic>,
        );
      }
      loggy.error("[service status client] unexpected type, msg: $event");
      throw "invalid type";
    },
  );

  try {
    final err = _box.startCommandClient(1, receiver.sendPort.nativePort).cast<Utf8>().toDartString();
    if (err.isNotEmpty) {
      loggy.error("error starting status command: $err");
      throw err;
    }
  } catch (_) {
    // Nothing was handed to the caller yet, so the port must be closed here
    // to avoid leaking it (same handling as watchGroups / watchActiveGroups).
    receiver.close();
    rethrow;
  }
  return _serviceStatsStream = statsStream;
}
/// Streams outbound groups pushed by native command client `5`.
///
/// The stream is cached: while it is active, repeated calls return the same
/// instance. Cancelling the broadcast stream closes the port, clears the cache
/// and stops the native client.
///
/// Messages starting with `error:` become stream errors (the remaining text,
/// thrown as a `String`); non-string messages become the error
/// `"invalid type"`. Throws the native error text when the client cannot be
/// started.
@override
Stream<List<SingboxOutboundGroup>> watchGroups() {
  final logger = newLoggy("watchGroups");
  final cached = _outboundsStream;
  if (cached != null) return cached;

  final receiver = ReceivePort('groups');

  // Converts one port message into the decoded list of groups.
  List<SingboxOutboundGroup> decodeGroups(dynamic message) {
    if (message is! String) {
      logger.error("unexpected type, msg: $message");
      throw "invalid type";
    }
    if (message.startsWith('error:')) {
      logger.error("error received: $message");
      throw message.replaceFirst('error:', "");
    }
    final rawGroups = jsonDecode(message) as List;
    return [
      for (final raw in rawGroups)
        SingboxOutboundGroup.fromJson(raw as Map<String, dynamic>),
    ];
  }

  void stopClient(dynamic _) {
    logger.debug("stopping");
    receiver.close();
    _outboundsStream = null;
    final stopError = _box.stopCommandClient(5).cast<Utf8>().toDartString();
    if (stopError.isNotEmpty) {
      _logger.error("error stopping group client");
    }
  }

  final groupsStream =
      receiver.asBroadcastStream(onCancel: stopClient).map(decodeGroups);

  try {
    final startError =
        _box.startCommandClient(5, receiver.sendPort.nativePort).cast<Utf8>().toDartString();
    if (startError.isNotEmpty) {
      logger.error("error starting group command: $startError");
      throw startError;
    }
  } catch (_) {
    // The stream was never handed out, so release the port before failing.
    receiver.close();
    rethrow;
  }
  return _outboundsStream = groupsStream;
}
/// Streams outbound groups pushed by native command client `13`.
///
/// Unlike [watchGroups], the stream is not cached: every call opens a new
/// port and starts the client again. Cancelling the broadcast stream closes
/// the port and stops the native client.
///
/// Messages starting with `error:` become stream errors (the remaining text,
/// thrown as a `String`); non-string messages become the error
/// `"invalid type"`. Throws the native error text when the client cannot be
/// started.
@override
Stream<List<SingboxOutboundGroup>> watchActiveGroups() {
  final logger = newLoggy("[ActiveGroupsClient]");
  final receiver = ReceivePort('active groups');

  // Converts one port message into the decoded list of groups.
  List<SingboxOutboundGroup> decodeGroups(dynamic message) {
    if (message is! String) {
      logger.error("unexpected type, msg: $message");
      throw "invalid type";
    }
    if (message.startsWith('error:')) {
      logger.error(message);
      throw message.replaceFirst('error:', "");
    }
    final rawGroups = jsonDecode(message) as List;
    return [
      for (final raw in rawGroups)
        SingboxOutboundGroup.fromJson(raw as Map<String, dynamic>),
    ];
  }

  void stopClient(dynamic _) {
    logger.debug("stopping");
    receiver.close();
    final stopError = _box.stopCommandClient(13).cast<Utf8>().toDartString();
    if (stopError.isNotEmpty) {
      logger.error("failed stopping: $stopError");
    }
  }

  final groupsStream =
      receiver.asBroadcastStream(onCancel: stopClient).map(decodeGroups);

  try {
    final startError =
        _box.startCommandClient(13, receiver.sendPort.nativePort).cast<Utf8>().toDartString();
    if (startError.isNotEmpty) {
      logger.error("error starting: $startError");
      throw startError;
    }
  } catch (_) {
    // The stream was never handed out, so release the port before failing.
    receiver.close();
    rethrow;
  }
  return groupsStream;
}
/// Selects [outboundTag] within the group [groupTag] through [IsolateWorker].
///
/// Completes with `right(unit)` when no error text is reported, otherwise
/// with `left` carrying the text (or the stringified exception).
@override
TaskEither<String, Unit> selectOutbound(String groupTag, String outboundTag) {
  return TaskEither(() async {
    try {
      final failure = await IsolateWorker().execute(
        () => _ffiSelectOutbound(groupTag, outboundTag),
        allowSyncFallback: false,
      );
      if (failure == null || failure.isEmpty) {
        return right(unit);
      }
      return left(failure);
    } catch (error) {
      return left(error.toString());
    }
  });
}
/// Triggers the native URL test for the group [groupTag] through
/// [IsolateWorker].
///
/// Completes with `right(unit)` when no error text is reported, otherwise
/// with `left` carrying the text (or the stringified exception).
@override
TaskEither<String, Unit> urlTest(String groupTag) {
  return TaskEither(() async {
    try {
      final failure = await IsolateWorker().execute(
        () => _ffiUrlTest(groupTag),
        allowSyncFallback: false,
      );
      if (failure == null || failure.isEmpty) {
        return right(unit);
      }
      return left(failure);
    } catch (error) {
      return left(error.toString());
    }
  });
}
/// Most recent log lines; [_readLogFile] keeps it at 300 entries or fewer.
final _logBuffer = <String>[];

/// Byte offset in the log file from which the next read starts.
int _logFilePosition = 0;

/// Emits the current log lines once, then again after every watcher event
/// for [path].
///
/// The file is only re-read on `MODIFY` events; other events re-emit the
/// buffer unchanged. The same [_logBuffer] instance is emitted each time.
@override
Stream<List<String>> watchLogs(String path) async* {
  final logFile = File(path);
  yield await _readLogFile(logFile);

  final changes = Watcher(path, pollingDelay: const Duration(seconds: 1)).events;
  yield* changes.asyncMap((change) async {
    if (change.type == ChangeType.MODIFY) {
      await _readLogFile(logFile);
    }
    return _logBuffer;
  });
}
/// Empties the in-memory log buffer; the log file and read offset are left
/// untouched. Always completes with `right(unit)`.
@override
TaskEither<String, Unit> clearLogs() => TaskEither(() async {
      _logBuffer.clear();
      return right(unit);
    });
/// Reads the bytes appended to [file] since the previous call and appends the
/// resulting lines to [_logBuffer], keeping only the newest 300 lines.
///
/// Returns a fresh empty list when nothing has been read yet and the file is
/// empty; otherwise returns [_logBuffer] itself.
///
/// The file length is sampled once and used both as the end of the read range
/// and as the next offset, so data appended while the read is in flight is
/// picked up by the next call instead of being skipped. If the file is shorter
/// than the stored offset (truncated or replaced), reading restarts from the
/// beginning.
Future<List<String>> _readLogFile(File file) async {
  const maxLines = 300;

  final length = await file.length();
  if (_logFilePosition == 0 && length == 0) return [];

  if (length < _logFilePosition) {
    // The file shrank: the old offset no longer points at valid data.
    _logFilePosition = 0;
  }
  if (length == _logFilePosition) return _logBuffer;

  final content = await file
      .openRead(_logFilePosition, length)
      .transform(utf8.decoder)
      .join();
  _logFilePosition = length;

  _logBuffer.addAll(const LineSplitter().convert(content));
  if (_logBuffer.length > maxLines) {
    _logBuffer.removeRange(0, _logBuffer.length - maxLines);
  }
  return _logBuffer;
}
/// Requests a WARP configuration from the native core through
/// [IsolateWorker].
///
/// The worker returns a two-element list: a success flag followed by the
/// payload. On success the payload is JSON-decoded and converted with
/// `warpFromJson`; otherwise the payload (or the stringified exception) is
/// returned on the left.
@override
TaskEither<String, WarpResponse> generateWarpConfig({
  required String licenseKey,
  required String previousAccountId,
  required String previousAccessToken,
}) {
  loggy.debug("generating warp config");
  return TaskEither(() async {
    try {
      final response = await IsolateWorker().execute(
        () => _ffiGenerateWarpConfig(licenseKey, previousAccountId, previousAccessToken),
        allowSyncFallback: false,
      );
      final succeeded = response.isNotEmpty && response[0] == true;
      final payload = response.length > 1 ? response[1] as String : '';
      if (succeeded) {
        return right(warpFromJson(jsonDecode(payload)));
      }
      return left(payload);
    } catch (error) {
      return left(error.toString());
    }
  });
}
}
/// Opens the native library, wraps it in the generated bindings and calls
/// `setupOnce` with the Dart API-DL initialisation data.
///
/// Called at the start of every `_ffi*` helper in this file.
SingboxNativeLibrary _ffiLoadLibrary() {
  final libraryPath = FFISingboxService._getLibraryPath();
  return SingboxNativeLibrary(DynamicLibrary.open(libraryPath))
    ..setupOnce(NativeApi.initializeApiDLData);
}
/// Calls the native `setup` function.
///
/// Returns `null` when the native call reports an empty string, otherwise the
/// reported error text.
///
/// The argument strings are allocated in an [Arena] and released as soon as
/// the call returns; previously every `toNativeUtf8()` allocation was leaked.
// NOTE(review): assumes the native side copies its string arguments and does
// not keep the pointers after the call returns — confirm against libcore.
String? _ffiSetup(
  String baseDir,
  String workingDir,
  String tempDir,
  int statusPort,
  int debugFlag,
) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final err = box
        .setup(
          baseDir.toNativeUtf8(allocator: arena).cast(),
          workingDir.toNativeUtf8(allocator: arena).cast(),
          tempDir.toNativeUtf8(allocator: arena).cast(),
          statusPort,
          debugFlag,
        )
        .cast<Utf8>()
        .toDartString();
    return err.isEmpty ? null : err;
  });
}
/// Calls the native `parse` function for the configuration at [path].
///
/// Returns `null` when the native call reports an empty string, otherwise the
/// reported error text.
///
/// The argument strings are allocated in an [Arena] and released as soon as
/// the call returns; previously every `toNativeUtf8()` allocation was leaked.
// NOTE(review): assumes the native side copies its string arguments and does
// not keep the pointers after the call returns — confirm against libcore.
String? _ffiValidateConfig(
  String path,
  String tempPath,
  int debugFlag,
) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final err = box
        .parse(
          path.toNativeUtf8(allocator: arena).cast(),
          tempPath.toNativeUtf8(allocator: arena).cast(),
          debugFlag,
        )
        .cast<Utf8>()
        .toDartString();
    return err.isEmpty ? null : err;
  });
}
/// Calls the native `changeHiddifyOptions` function with [optionsJson].
///
/// Returns `null` when the native call reports an empty string, otherwise the
/// reported error text.
///
/// The argument string is allocated in an [Arena] and released as soon as the
/// call returns; previously the `toNativeUtf8()` allocation was leaked.
// NOTE(review): assumes the native side copies its string argument and does
// not keep the pointer after the call returns — confirm against libcore.
String? _ffiChangeOptions(String optionsJson) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final err = box
        .changeHiddifyOptions(optionsJson.toNativeUtf8(allocator: arena).cast())
        .cast<Utf8>()
        .toDartString();
    return err.isEmpty ? null : err;
  });
}
/// Calls the native `generateConfig` function for the file at [path].
///
/// Returns `[true, config]` on success, or `[false, message]` when the native
/// response starts with `error` (that prefix is stripped from the message).
///
/// The argument string is allocated in an [Arena] and released as soon as the
/// call returns; previously the `toNativeUtf8()` allocation was leaked.
// NOTE(review): assumes the native side copies its string argument and does
// not keep the pointer after the call returns — confirm against libcore.
// NOTE(review): the prefix checked here is "error" without a colon, while
// _ffiGenerateWarpConfig checks "error:" — kept as-is; confirm which form
// libcore actually emits.
List<Object?> _ffiGenerateFullConfig(String path) {
  final box = _ffiLoadLibrary();
  final response = using((arena) {
    return box
        .generateConfig(path.toNativeUtf8(allocator: arena).cast())
        .cast<Utf8>()
        .toDartString();
  });
  if (response.startsWith("error")) {
    return [false, response.replaceFirst("error", "")];
  }
  return [true, response];
}
/// Calls the native `start` function with the configuration at [configPath].
///
/// [disableMemoryLimit] is passed to the native side as `1` or `0`. Returns
/// `null` when the native call reports an empty string, otherwise the
/// reported error text.
///
/// The argument string is allocated in an [Arena] and released as soon as the
/// call returns; previously the `toNativeUtf8()` allocation was leaked.
// NOTE(review): assumes the native side copies its string argument and does
// not keep the pointer after the call returns — confirm against libcore.
String? _ffiStart(String configPath, bool disableMemoryLimit) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final err = box
        .start(
          configPath.toNativeUtf8(allocator: arena).cast(),
          disableMemoryLimit ? 1 : 0,
        )
        .cast<Utf8>()
        .toDartString();
    return err.isEmpty ? null : err;
  });
}
/// Calls the native `stop` function.
///
/// Returns `null` when the native call reports an empty string, otherwise the
/// reported error text.
String? _ffiStop() {
  final message = _ffiLoadLibrary().stop().cast<Utf8>().toDartString();
  if (message.isEmpty) {
    return null;
  }
  return message;
}
/// Calls the native `restart` function with the configuration at
/// [configPath].
///
/// [disableMemoryLimit] is passed to the native side as `1` or `0`. Returns
/// `null` when the native call reports an empty string, otherwise the
/// reported error text.
///
/// The argument string is allocated in an [Arena] and released as soon as the
/// call returns; previously the `toNativeUtf8()` allocation was leaked.
// NOTE(review): assumes the native side copies its string argument and does
// not keep the pointer after the call returns — confirm against libcore.
String? _ffiRestart(String configPath, bool disableMemoryLimit) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final err = box
        .restart(
          configPath.toNativeUtf8(allocator: arena).cast(),
          disableMemoryLimit ? 1 : 0,
        )
        .cast<Utf8>()
        .toDartString();
    return err.isEmpty ? null : err;
  });
}
/// Calls the native `selectOutbound` function for [outboundTag] in
/// [groupTag].
///
/// Returns `null` when the native call reports an empty string, otherwise the
/// reported error text.
///
/// The argument strings are allocated in an [Arena] and released as soon as
/// the call returns; previously every `toNativeUtf8()` allocation was leaked.
// NOTE(review): assumes the native side copies its string arguments and does
// not keep the pointers after the call returns — confirm against libcore.
String? _ffiSelectOutbound(String groupTag, String outboundTag) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final err = box
        .selectOutbound(
          groupTag.toNativeUtf8(allocator: arena).cast(),
          outboundTag.toNativeUtf8(allocator: arena).cast(),
        )
        .cast<Utf8>()
        .toDartString();
    return err.isEmpty ? null : err;
  });
}
/// Calls the native `urlTest` function for the group [groupTag].
///
/// Returns `null` when the native call reports an empty string, otherwise the
/// reported error text.
///
/// The argument string is allocated in an [Arena] and released as soon as the
/// call returns; previously the `toNativeUtf8()` allocation was leaked.
// NOTE(review): assumes the native side copies its string argument and does
// not keep the pointer after the call returns — confirm against libcore.
String? _ffiUrlTest(String groupTag) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final err = box
        .urlTest(groupTag.toNativeUtf8(allocator: arena).cast())
        .cast<Utf8>()
        .toDartString();
    return err.isEmpty ? null : err;
  });
}
/// Calls the native `generateWarpConfig` function.
///
/// Returns `[true, response]` on success, or `[false, message]` when the
/// native response starts with `error:` (that prefix is stripped from the
/// message).
///
/// The argument strings are allocated in an [Arena] and released as soon as
/// the call returns; previously every `toNativeUtf8()` allocation was leaked.
// NOTE(review): assumes the native side copies its string arguments and does
// not keep the pointers after the call returns — confirm against libcore.
List<Object?> _ffiGenerateWarpConfig(
  String licenseKey,
  String previousAccountId,
  String previousAccessToken,
) {
  final box = _ffiLoadLibrary();
  final response = using((arena) {
    return box
        .generateWarpConfig(
          licenseKey.toNativeUtf8(allocator: arena).cast(),
          previousAccountId.toNativeUtf8(allocator: arena).cast(),
          previousAccessToken.toNativeUtf8(allocator: arena).cast(),
        )
        .cast<Utf8>()
        .toDartString();
  });
  if (response.startsWith("error:")) {
    return [false, response.replaceFirst("error:", "")];
  }
  return [true, response];
}