import 'dart:async';
import 'dart:convert';
import 'dart:ffi';
import 'dart:io';
import 'dart:isolate';

import 'package:ffi/ffi.dart';
import 'package:fpdart/fpdart.dart';
import 'package:kaer_with_panels/core/model/directories.dart';
import 'package:kaer_with_panels/gen/singbox_generated_bindings.dart';
import 'package:kaer_with_panels/singbox/model/singbox_config_option.dart';
import 'package:kaer_with_panels/singbox/model/singbox_outbound.dart';
import 'package:kaer_with_panels/singbox/model/singbox_stats.dart';
import 'package:kaer_with_panels/singbox/model/singbox_status.dart';
import 'package:kaer_with_panels/singbox/model/warp_account.dart';
import 'package:kaer_with_panels/singbox/service/singbox_service.dart';
import 'package:kaer_with_panels/utils/isolate_worker.dart';
import 'package:kaer_with_panels/utils/utils.dart';
import 'package:loggy/loggy.dart';
import 'package:path/path.dart' as p;
import 'package:rxdart/rxdart.dart';
import 'package:watcher/watcher.dart';

final _logger = Loggy('FFISingboxService');

/// [SingboxService] implementation that talks to the `libcore` native library
/// through `dart:ffi`.
///
/// Request/response style calls (setup, start, stop, ...) are dispatched
/// through [IsolateWorker]; the streaming command clients (stats, groups) are
/// started directly on the calling isolate via [_box].
///
/// NOTE(review): the generic type arguments in this file were missing from the
/// reviewed text and have been restored from usage. `WarpResponse` in
/// particular is inferred from `warpFromJson` — confirm against
/// `SingboxService` and `warp_account.dart`.
class FFISingboxService with InfraLogger implements SingboxService {
  /// Library handle used by the calling isolate for the command clients.
  static final SingboxNativeLibrary _box = _gen();

  /// Command-client identifiers passed to `startCommandClient` /
  /// `stopCommandClient`, named after the receive ports they feed.
  static const int _statsCommand = 1;
  static const int _groupsCommand = 5;
  static const int _activeGroupsCommand = 13;

  /// Upper bound on the number of log lines kept in [_logBuffer].
  static const int _maxLogLines = 300;

  late final ValueStream<SingboxStatus> _status;
  late final ReceivePort _statusReceiver;

  /// Cached stats stream; reset to `null` when the broadcast stream is
  /// cancelled.
  Stream<SingboxStats>? _serviceStatsStream;

  /// Cached groups stream; reset to `null` when the broadcast stream is
  /// cancelled.
  Stream<List<SingboxOutboundGroup>>? _outboundsStream;

  /// Lines read so far from the watched log file (at most [_maxLogLines]).
  final _logBuffer = <String>[];

  /// Byte offset in the log file up to which content has been consumed.
  int _logFilePosition = 0;

  /// Opens the native library for the calling isolate.
  static SingboxNativeLibrary _gen() {
    final fullPath = _nativeLibraryPath();
    _logger.debug('singbox native libs path: "$fullPath"');
    return SingboxNativeLibrary(DynamicLibrary.open(fullPath));
  }

  /// Initializes the Dart API in the native library and creates the status
  /// receive port and stream.
  ///
  /// Must complete before [setup] or [watchStatus] is used, because both read
  /// `late` fields assigned here.
  @override
  Future<void> init() async {
    loggy.debug("initializing");
    _box.setupOnce(NativeApi.initializeApiDLData);
    _statusReceiver = ReceivePort('service status receiver');
    final source = _statusReceiver
        .asBroadcastStream()
        .map((event) => jsonDecode(event as String))
        .map(SingboxStatus.fromEvent);
    _status = ValueConnectableStream.seeded(
      source,
      const SingboxStopped(),
    ).autoConnect();
  }

  /// Calls the native `setup` with the given directories and the status port.
  ///
  /// Returns `right(unit)` on success, or `left` with the native error message
  /// or the stringified exception.
  @override
  TaskEither<String, Unit> setup(
    Directories directories,
    bool debug,
  ) {
    // Everything the worker closure needs is copied into plain locals up
    // front; presumably so that only simple values are captured by the closure
    // handed to the worker.
    final port = _statusReceiver.sendPort.nativePort;
    final baseDir = directories.baseDir.path;
    final workingDir = directories.workingDir.path;
    final tempDir = directories.tempDir.path;
    final debugFlag = debug ? 1 : 0;
    return _runNativeCall(
      () => _ffiSetup(baseDir, workingDir, tempDir, port, debugFlag),
      label: 'setup()',
    );
  }

  /// Asks the native library to parse the config at [path].
  @override
  TaskEither<String, Unit> validateConfigByPath(
    String path,
    String tempPath,
    bool debug,
  ) {
    final debugFlag = debug ? 1 : 0;
    return _runNativeCall(() => _ffiValidateConfig(path, tempPath, debugFlag));
  }

  /// Sends [options], JSON-encoded, to the native library.
  @override
  TaskEither<String, Unit> changeOptions(SingboxConfigOption options) {
    final json = jsonEncode(options.toJson());
    return _runNativeCall(
      () => _ffiChangeOptions(json),
      label: 'changeOptions',
    );
  }

  /// Generates the full config for the file at [path].
  ///
  /// Returns the generated config on the right, or the error text on the left.
  @override
  TaskEither<String, String> generateFullConfigByPath(
    String path,
  ) {
    return TaskEither(() async {
      try {
        final result = await IsolateWorker().execute(
          () => _ffiGenerateFullConfig(path),
          allowSyncFallback: false,
        );
        final ok = result.isNotEmpty && result[0] == true;
        final payload = result.length > 1 ? result[1] as String : '';
        if (!ok) {
          return left(payload);
        }
        return right(payload);
      } catch (e) {
        return left(e.toString());
      }
    });
  }

  /// Starts the service with the config at [configPath].
  ///
  /// [name] is not used by this implementation.
  @override
  TaskEither<String, Unit> start(
    String configPath,
    String name,
    bool disableMemoryLimit,
  ) {
    loggy.debug("starting, memory limit: [${!disableMemoryLimit}]");
    return _runNativeCall(
      () => _ffiStart(configPath, disableMemoryLimit),
      label: 'start()',
    );
  }

  /// Stops the service.
  @override
  TaskEither<String, Unit> stop() {
    return _runNativeCall(_ffiStop, label: 'stop()');
  }

  /// Restarts the service with the config at [configPath].
  ///
  /// [name] is not used by this implementation.
  @override
  TaskEither<String, Unit> restart(
    String configPath,
    String name,
    bool disableMemoryLimit,
  ) {
    loggy.debug("restarting, memory limit: [${!disableMemoryLimit}]");
    return _runNativeCall(() => _ffiRestart(configPath, disableMemoryLimit));
  }

  /// Not available in this implementation; always throws
  /// [UnimplementedError].
  @override
  TaskEither<String, Unit> resetTunnel() {
    throw UnimplementedError(
      "reset tunnel function unavailable on platform",
    );
  }

  /// The status stream created in [init], seeded with `SingboxStopped`.
  @override
  Stream<SingboxStatus> watchStatus() => _status;

  /// Starts the stats command client and returns its decoded event stream.
  ///
  /// The stream is cached and shared until it is cancelled. Throws the native
  /// error message if the command client cannot be started.
  @override
  Stream<SingboxStats> watchStats() {
    final cached = _serviceStatsStream;
    if (cached != null) return cached;

    final receiver = ReceivePort('stats');
    final statsStream = receiver.asBroadcastStream(
      onCancel: (_) {
        _logger.debug("stopping stats command client");
        final err = _box
            .stopCommandClient(_statsCommand)
            .cast<Utf8>()
            .toDartString();
        if (err.isNotEmpty) {
          _logger.error("error stopping stats client");
        }
        receiver.close();
        _serviceStatsStream = null;
      },
    ).map(
      (event) {
        if (event is String) {
          if (event.startsWith('error:')) {
            loggy.error("[service stats client] error received: $event");
            throw event.replaceFirst('error:', "");
          }
          return SingboxStats.fromJson(
            jsonDecode(event) as Map<String, dynamic>,
          );
        }
        loggy.error("[service status client] unexpected type, msg: $event");
        throw "invalid type";
      },
    );

    try {
      final err = _box
          .startCommandClient(_statsCommand, receiver.sendPort.nativePort)
          .cast<Utf8>()
          .toDartString();
      if (err.isNotEmpty) {
        loggy.error("error starting status command: $err");
        throw err;
      }
    } catch (e) {
      // Without this the port would stay open when the client fails to start.
      receiver.close();
      rethrow;
    }
    return _serviceStatsStream = statsStream;
  }

  /// Starts the groups command client and returns its decoded event stream.
  ///
  /// The stream is cached and shared until it is cancelled. Throws the native
  /// error message if the command client cannot be started.
  @override
  Stream<List<SingboxOutboundGroup>> watchGroups() {
    final logger = newLoggy("watchGroups");
    final cached = _outboundsStream;
    if (cached != null) return cached;

    final receiver = ReceivePort('groups');
    final outboundsStream = receiver.asBroadcastStream(
      onCancel: (_) {
        logger.debug("stopping");
        receiver.close();
        _outboundsStream = null;
        final err = _box
            .stopCommandClient(_groupsCommand)
            .cast<Utf8>()
            .toDartString();
        if (err.isNotEmpty) {
          _logger.error("error stopping group client");
        }
      },
    ).map(
      (event) {
        if (event is String) {
          if (event.startsWith('error:')) {
            logger.error("error received: $event");
            throw event.replaceFirst('error:', "");
          }
          return _decodeOutboundGroups(event);
        }
        logger.error("unexpected type, msg: $event");
        throw "invalid type";
      },
    );

    try {
      final err = _box
          .startCommandClient(_groupsCommand, receiver.sendPort.nativePort)
          .cast<Utf8>()
          .toDartString();
      if (err.isNotEmpty) {
        logger.error("error starting group command: $err");
        throw err;
      }
    } catch (e) {
      receiver.close();
      rethrow;
    }
    return _outboundsStream = outboundsStream;
  }

  /// Starts the active-groups command client and returns its decoded event
  /// stream.
  ///
  /// Unlike [watchGroups], the stream is not cached: every call opens a new
  /// receive port and starts the client again.
  @override
  Stream<List<SingboxOutboundGroup>> watchActiveGroups() {
    final logger = newLoggy("[ActiveGroupsClient]");
    final receiver = ReceivePort('active groups');
    final outboundsStream = receiver.asBroadcastStream(
      onCancel: (_) {
        logger.debug("stopping");
        receiver.close();
        final err = _box
            .stopCommandClient(_activeGroupsCommand)
            .cast<Utf8>()
            .toDartString();
        if (err.isNotEmpty) {
          logger.error("failed stopping: $err");
        }
      },
    ).map(
      (event) {
        if (event is String) {
          if (event.startsWith('error:')) {
            logger.error(event);
            throw event.replaceFirst('error:', "");
          }
          return _decodeOutboundGroups(event);
        }
        logger.error("unexpected type, msg: $event");
        throw "invalid type";
      },
    );

    try {
      final err = _box
          .startCommandClient(
            _activeGroupsCommand,
            receiver.sendPort.nativePort,
          )
          .cast<Utf8>()
          .toDartString();
      if (err.isNotEmpty) {
        logger.error("error starting: $err");
        throw err;
      }
    } catch (e) {
      receiver.close();
      rethrow;
    }
    return outboundsStream;
  }

  /// Selects [outboundTag] within the group [groupTag].
  @override
  TaskEither<String, Unit> selectOutbound(String groupTag, String outboundTag) {
    return _runNativeCall(() => _ffiSelectOutbound(groupTag, outboundTag));
  }

  /// Triggers a URL test for the group [groupTag].
  @override
  TaskEither<String, Unit> urlTest(String groupTag) {
    return _runNativeCall(() => _ffiUrlTest(groupTag));
  }

  /// Emits the buffered log lines for the file at [path], first immediately
  /// and then after every watcher event.
  ///
  /// New content is only read on `MODIFY` events; other events re-emit the
  /// current buffer. After the first emission the same list instance
  /// ([_logBuffer]) is emitted every time.
  @override
  Stream<List<String>> watchLogs(String path) async* {
    yield await _readLogFile(File(path));
    yield* Watcher(
      path,
      pollingDelay: const Duration(seconds: 1),
    ).events.asyncMap((event) async {
      if (event.type == ChangeType.MODIFY) {
        await _readLogFile(File(path));
      }
      return _logBuffer;
    });
  }

  /// Empties the in-memory log buffer.
  ///
  /// The file read position is left untouched, so already-consumed content is
  /// not read again.
  @override
  TaskEither<String, Unit> clearLogs() {
    return TaskEither(() async {
      _logBuffer.clear();
      return right(unit);
    });
  }

  /// Appends the not-yet-consumed part of [file] to [_logBuffer], keeping only
  /// the newest [_maxLogLines] lines, and returns the buffer.
  ///
  /// Returns a fresh empty list when nothing has been read yet and the file is
  /// empty.
  Future<List<String>> _readLogFile(File file) async {
    final length = await file.length();
    if (_logFilePosition == 0 && length == 0) return [];

    // The file shrank (truncated or replaced): the stored offset no longer
    // points at unread data, so start over from the beginning.
    if (length < _logFilePosition) {
      _logFilePosition = 0;
    }

    // Advance by the bytes actually consumed rather than by a length sampled
    // after the read; otherwise data appended in between would be skipped.
    var bytesRead = 0;
    final content = await file.openRead(_logFilePosition).map((chunk) {
      bytesRead += chunk.length;
      return chunk;
    }).transform(utf8.decoder).join();
    _logFilePosition += bytesRead;

    _logBuffer.addAll(const LineSplitter().convert(content));
    final overflow = _logBuffer.length - _maxLogLines;
    if (overflow > 0) {
      _logBuffer.removeRange(0, overflow);
    }
    return _logBuffer;
  }

  /// Generates a warp config through the native library.
  ///
  /// Returns the decoded response on the right, or the error text on the left.
  @override
  TaskEither<String, WarpResponse> generateWarpConfig({
    required String licenseKey,
    required String previousAccountId,
    required String previousAccessToken,
  }) {
    loggy.debug("generating warp config");
    return TaskEither(() async {
      try {
        final result = await IsolateWorker().execute(
          () => _ffiGenerateWarpConfig(
            licenseKey,
            previousAccountId,
            previousAccessToken,
          ),
          allowSyncFallback: false,
        );
        final ok = result.isNotEmpty && result[0] == true;
        final payload = result.length > 1 ? result[1] as String : '';
        if (!ok) {
          return left(payload);
        }
        return right(warpFromJson(jsonDecode(payload)));
      } catch (e) {
        return left(e.toString());
      }
    });
  }
}

/// Runs [call] through [IsolateWorker] and maps its result to a [TaskEither].
///
/// [call] returns `null` or an empty string on success and an error message
/// otherwise. Exceptions are converted to `left(e.toString())`. When [label]
/// is given, start, duration, error and exception are logged under that name.
///
/// Kept top-level (not an instance method) so that it adds no reference to the
/// service object around [call].
TaskEither<String, Unit> _runNativeCall(
  String? Function() call, {
  String? label,
}) {
  return TaskEither(() async {
    try {
      final startTime = DateTime.now();
      if (label != null) {
        _logger.debug('[黑屏调试] $label 开始调用 libcore.dll - $startTime');
      }
      final err = await IsolateWorker().execute(
        call,
        allowSyncFallback: false,
      );
      if (label != null) {
        final durationMs = DateTime.now().difference(startTime).inMilliseconds;
        _logger.debug('[黑屏调试] $label 完成(耗时: ${durationMs}ms)');
      }
      if (err != null && err.isNotEmpty) {
        if (label != null) {
          _logger.error('[黑屏调试] $label 错误: $err');
        }
        return left(err);
      }
      return right(unit);
    } catch (e) {
      if (label != null) {
        _logger.error('[黑屏调试] $label 异常: $e');
      }
      return left(e.toString());
    }
  });
}

/// Decodes a JSON array of outbound groups.
List<SingboxOutboundGroup> _decodeOutboundGroups(String json) => [
      for (final entry in jsonDecode(json) as List)
        SingboxOutboundGroup.fromJson(entry as Map<String, dynamic>),
    ];

/// Path handed to [DynamicLibrary.open]: the platform-specific file name,
/// placed under `libcore/` when the `FLUTTER_TEST` environment variable is
/// set.
String _nativeLibraryPath() {
  final directory =
      Platform.environment.containsKey('FLUTTER_TEST') ? "libcore" : "";
  final String fileName;
  if (Platform.isWindows) {
    fileName = "libcore.dll";
  } else if (Platform.isMacOS) {
    fileName = "libcore.dylib";
  } else {
    fileName = "libcore.so";
  }
  return p.join(directory, fileName);
}

// ---------------------------------------------------------------------------
// Top-level FFI entry points executed through IsolateWorker.
//
// Each one opens its own library handle via _ffiLoadLibrary. String arguments
// are allocated in an arena and released as soon as the native call returns.
// NOTE(review): this assumes libcore copies its string arguments instead of
// retaining the pointers — confirm against the native implementation.
// ---------------------------------------------------------------------------

/// Opens the native library and initializes the Dart API for the current
/// isolate.
SingboxNativeLibrary _ffiLoadLibrary() {
  final box = SingboxNativeLibrary(DynamicLibrary.open(_nativeLibraryPath()));
  box.setupOnce(NativeApi.initializeApiDLData);
  return box;
}

/// Maps an empty native message to `null` and anything else to itself.
String? _errorOrNull(String message) => message.isEmpty ? null : message;

/// Calls native `setup`; returns `null` on success or the error message.
String? _ffiSetup(
  String baseDir,
  String workingDir,
  String tempDir,
  int statusPort,
  int debugFlag,
) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final message = box
        .setup(
          baseDir.toNativeUtf8(allocator: arena).cast(),
          workingDir.toNativeUtf8(allocator: arena).cast(),
          tempDir.toNativeUtf8(allocator: arena).cast(),
          statusPort,
          debugFlag,
        )
        .cast<Utf8>()
        .toDartString();
    return _errorOrNull(message);
  });
}

/// Calls native `parse`; returns `null` on success or the error message.
String? _ffiValidateConfig(
  String path,
  String tempPath,
  int debugFlag,
) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final message = box
        .parse(
          path.toNativeUtf8(allocator: arena).cast(),
          tempPath.toNativeUtf8(allocator: arena).cast(),
          debugFlag,
        )
        .cast<Utf8>()
        .toDartString();
    return _errorOrNull(message);
  });
}

/// Calls native `changeHiddifyOptions`; returns `null` on success or the
/// error message.
String? _ffiChangeOptions(String optionsJson) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final message = box
        .changeHiddifyOptions(
          optionsJson.toNativeUtf8(allocator: arena).cast(),
        )
        .cast<Utf8>()
        .toDartString();
    return _errorOrNull(message);
  });
}

/// Calls native `generateConfig`.
///
/// Returns `[true, config]` on success, or `[false, message]` when the
/// response starts with `error` (that prefix is stripped from the message).
List<Object?> _ffiGenerateFullConfig(String path) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final response = box
        .generateConfig(
          path.toNativeUtf8(allocator: arena).cast(),
        )
        .cast<Utf8>()
        .toDartString();
    if (response.startsWith("error")) {
      return [false, response.replaceFirst("error", "")];
    }
    return [true, response];
  });
}

/// Calls native `start`; returns `null` on success or the error message.
String? _ffiStart(String configPath, bool disableMemoryLimit) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final message = box
        .start(
          configPath.toNativeUtf8(allocator: arena).cast(),
          disableMemoryLimit ? 1 : 0,
        )
        .cast<Utf8>()
        .toDartString();
    return _errorOrNull(message);
  });
}

/// Calls native `stop`; returns `null` on success or the error message.
String? _ffiStop() {
  final box = _ffiLoadLibrary();
  return _errorOrNull(box.stop().cast<Utf8>().toDartString());
}

/// Calls native `restart`; returns `null` on success or the error message.
String? _ffiRestart(String configPath, bool disableMemoryLimit) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final message = box
        .restart(
          configPath.toNativeUtf8(allocator: arena).cast(),
          disableMemoryLimit ? 1 : 0,
        )
        .cast<Utf8>()
        .toDartString();
    return _errorOrNull(message);
  });
}

/// Calls native `selectOutbound`; returns `null` on success or the error
/// message.
String? _ffiSelectOutbound(String groupTag, String outboundTag) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final message = box
        .selectOutbound(
          groupTag.toNativeUtf8(allocator: arena).cast(),
          outboundTag.toNativeUtf8(allocator: arena).cast(),
        )
        .cast<Utf8>()
        .toDartString();
    return _errorOrNull(message);
  });
}

/// Calls native `urlTest`; returns `null` on success or the error message.
String? _ffiUrlTest(String groupTag) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final message = box
        .urlTest(groupTag.toNativeUtf8(allocator: arena).cast())
        .cast<Utf8>()
        .toDartString();
    return _errorOrNull(message);
  });
}

/// Calls native `generateWarpConfig`.
///
/// Returns `[true, json]` on success, or `[false, message]` when the response
/// starts with `error:` (that prefix is stripped from the message).
List<Object?> _ffiGenerateWarpConfig(
  String licenseKey,
  String previousAccountId,
  String previousAccessToken,
) {
  final box = _ffiLoadLibrary();
  return using((arena) {
    final response = box
        .generateWarpConfig(
          licenseKey.toNativeUtf8(allocator: arena).cast(),
          previousAccountId.toNativeUtf8(allocator: arena).cast(),
          previousAccessToken.toNativeUtf8(allocator: arena).cast(),
        )
        .cast<Utf8>()
        .toDartString();
    if (response.startsWith("error:")) {
      return [false, response.replaceFirst("error:", "")];
    }
    return [true, response];
  });
}