diff --git a/apis/admin/server.api b/apis/admin/server.api index 39ddc46..f4ab1ac 100644 --- a/apis/admin/server.api +++ b/apis/admin/server.api @@ -20,24 +20,25 @@ type ( ExpiredAt int64 `json:"expired_at"` } ServerStatus { - Online []ServerOnlineUser `json:"online"` - Cpu float64 `json:"cpu"` - Mem float64 `json:"mem"` - Disk float64 `json:"disk"` + Cpu float64 `json:"cpu"` + Mem float64 `json:"mem"` + Disk float64 `json:"disk"` + Protocol string `json:"protocol"` + Online []ServerOnlineUser `json:"online"` } Server { - Id int64 `json:"id"` - Name string `json:"name"` - Country string `json:"country"` - City string `json:"city"` - Ratio float32 `json:"ratio"` - Address string `json:"address"` - Sort int `json:"sort"` - Protocols []Protocol `json:"protocols"` - LastReportedAt int64 `json:"last_reported_at"` - Status ServerStatus `json:"status"` - CreatedAt int64 `json:"created_at"` - UpdatedAt int64 `json:"updated_at"` + Id int64 `json:"id"` + Name string `json:"name"` + Country string `json:"country"` + City string `json:"city"` + Ratio float32 `json:"ratio"` + Address string `json:"address"` + Sort int `json:"sort"` + Protocols []Protocol `json:"protocols"` + LastReportedAt int64 `json:"last_reported_at"` + Status []ServerStatus `json:"status"` + CreatedAt int64 `json:"created_at"` + UpdatedAt int64 `json:"updated_at"` } Protocol { Type string `json:"type"` diff --git a/go.mod b/go.mod index c0145a7..04daadd 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/go-sql-driver/mysql v1.8.1 github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 github.com/gofrs/uuid/v5 v5.3.0 - github.com/golang-jwt/jwt/v5 v5.2.1 + github.com/golang-jwt/jwt/v5 v5.2.2 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/hibiken/asynq v0.24.1 @@ -28,7 +28,7 @@ require ( github.com/klauspost/compress v1.17.7 github.com/nyaruka/phonenumbers v1.5.0 github.com/pkg/errors v0.9.1 - github.com/redis/go-redis/v9 v9.6.1 + github.com/redis/go-redis/v9 v9.7.2 github.com/smartwalle/alipay/v3 v3.2.23 github.com/spf13/cast v1.7.0 // indirect github.com/spf13/cobra v1.8.1 diff --git a/go.sum b/go.sum index cc90adf..9e9f2c5 100644 --- a/go.sum +++ b/go.sum @@ -155,8 +155,8 @@ github.com/gofrs/uuid/v5 v5.3.0/go.mod h1:CDOjlDMVAtN56jqyRUZh58JT31Tiw7/oQyEXZV github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= -github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= -github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= +github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-migrate/migrate/v4 v4.18.2 h1:2VSCMz7x7mjyTXx3m2zPokOY82LTRgxK1yQYKo6wWQ8= github.com/golang-migrate/migrate/v4 v4.18.2/go.mod h1:2CM6tJvn2kqPXwnXO/d3rAQYiyoIm180VsO8PRX6Rpk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -294,8 +294,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= -github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= -github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= +github.com/redis/go-redis/v9 v9.7.2 h1:PSGhv13dJyrTCw1+55H0pIKM3WFov7HuUrKUmInGL0o= +github.com/redis/go-redis/v9 v9.7.2/go.mod h1:yp5+a5FnEEP0/zTYuw6u6/2nn3zivwhv274qYgWQhDM= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= diff --git a/initialize/init.go b/initialize/init.go index bad262f..02ce905 100644 --- a/initialize/init.go +++ b/initialize/init.go @@ -14,7 +14,6 @@ func StartInitSystemConfig(svc *svc.ServiceContext) { Subscribe(svc) Register(svc) Mobile(svc) - TrafficDataToRedis(svc) if !svc.Config.Debug { Telegram(svc) } diff --git a/initialize/statistics.go b/initialize/statistics.go deleted file mode 100644 index 144b31e..0000000 --- a/initialize/statistics.go +++ /dev/null @@ -1,57 +0,0 @@ -package initialize - -import ( - "context" - "time" - - "github.com/perfect-panel/server/internal/model/cache" - "github.com/perfect-panel/server/internal/svc" - "github.com/perfect-panel/server/pkg/logger" -) - -func TrafficDataToRedis(svcCtx *svc.ServiceContext) { - ctx := context.Background() - // 统计昨天的节点流量数据排行榜前10 - nodeData, err := svcCtx.TrafficLogModel.TopServersTrafficByDay(ctx, time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day()-1, 0, 0, 0, 0, time.Local), 10) - if err != nil { - logger.Errorw("统计昨天的流量数据失败", logger.Field("error", err.Error())) - } - var nodeCacheData []cache.NodeTodayTrafficRank - for _, node := range nodeData { - serverInfo, err := svcCtx.NodeModel.FindOneServer(ctx, node.ServerId) - if err != nil { - logger.Errorw("查询节点信息失败", logger.Field("error", err.Error())) - continue - } - nodeCacheData = append(nodeCacheData, cache.NodeTodayTrafficRank{ - ID: node.ServerId, - Name: serverInfo.Name, - Upload: node.Upload, - Download: node.Download, - Total: node.Upload + node.Download, - }) - } - // 写入缓存 - if err = svcCtx.NodeCache.UpdateYesterdayNodeTotalTrafficRank(ctx, nodeCacheData); err != nil { - logger.Errorw("写入昨天的流量数据到缓存失败", logger.Field("error", err.Error())) - } - // 统计昨天的用户流量数据排行榜前10 - userData, err := svcCtx.TrafficLogModel.TopUsersTrafficByDay(ctx, time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day()-1, 0, 0, 0, 0, time.Local), 10) - if err != nil { - logger.Errorw("统计昨天的流量数据失败", logger.Field("error", err.Error())) - } - var userCacheData []cache.UserTodayTrafficRank - for _, user := range userData { - userCacheData = append(userCacheData, cache.UserTodayTrafficRank{ - SID: user.SubscribeId, - Upload: user.Upload, - Download: user.Download, - Total: user.Upload + user.Download, - }) - } - // 写入缓存 - if err = svcCtx.NodeCache.UpdateYesterdayUserTotalTrafficRank(ctx, userCacheData); err != nil { - logger.Errorw("写入昨天的流量数据到缓存失败", logger.Field("error", err.Error())) - } - logger.Infow("初始化昨天的流量数据到缓存成功") -} diff --git a/internal/logic/admin/server/filterServerListLogic.go b/internal/logic/admin/server/filterServerListLogic.go index 5a74231..1bfedaf 100644 --- a/internal/logic/admin/server/filterServerListLogic.go +++ b/internal/logic/admin/server/filterServerListLogic.go @@ -55,7 +55,7 @@ func (l *FilterServerListLogic) FilterServerList(req *types.FilterServerListRequ tool.DeepCopy(&protocols, dst) server.Protocols = protocols // handler status - server.Status = l.handlerServerStatus(datum.Id) + server.Status = l.handlerServerStatus(datum.Id, protocols) list = append(list, server) } @@ -65,58 +65,57 @@ func (l *FilterServerListLogic) FilterServerList(req *types.FilterServerListRequ }, nil } -func (l *FilterServerListLogic) handlerServerStatus(id int64) types.ServerStatus { - var result types.ServerStatus - nodeStatus, err := l.svcCtx.NodeCache.GetNodeStatus(l.ctx, id) - if err != nil { - if !errors.Is(err, redis.Nil) { - l.Errorw("[handlerServerStatus] GetNodeStatus Error: ", logger.Field("error", err.Error()), logger.Field("node_id", id)) - } - return result - } - result = types.ServerStatus{ - Mem: nodeStatus.Mem, - Cpu: nodeStatus.Cpu, - Disk: nodeStatus.Disk, - Online: make([]types.ServerOnlineUser, 0), - } - - // parse online users - onlineUser, err := l.svcCtx.NodeCache.GetNodeOnlineUser(l.ctx, id) - if err != nil { - l.Errorw("[handlerServerStatus] GetNodeOnlineUser Error: ", logger.Field("error", err.Error()), logger.Field("node_id", id)) - return result - } - - var onlineList []types.ServerOnlineUser - var onlineMap = make(map[int64]types.ServerOnlineUser) - // group by user_id - for subId, info := range onlineUser { - data, err := l.svcCtx.UserModel.FindOneUserSubscribe(l.ctx, subId) +func (l *FilterServerListLogic) handlerServerStatus(id int64, protocols []types.Protocol) []types.ServerStatus { + var result []types.ServerStatus + for _, protocol := range protocols { + nodeStatus, err := l.svcCtx.NodeModel.StatusCache(l.ctx, id, protocol.Type) if err != nil { - l.Errorw("[handlerServerStatus] FindOneSubscribe Error: ", logger.Field("error", err.Error())) - continue - } - if online, exist := onlineMap[data.UserId]; !exist { - onlineMap[data.UserId] = types.ServerOnlineUser{ - IP: info, - UserId: data.UserId, - Subscribe: data.Subscribe.Name, - SubscribeId: data.SubscribeId, - Traffic: data.Traffic, - ExpiredAt: data.ExpireTime.UnixMilli(), + if !errors.Is(err, redis.Nil) { + l.Errorw("[handlerServerStatus] GetNodeStatus Error: ", logger.Field("error", err.Error()), logger.Field("node_id", id)) } - } else { - online.IP = append(online.IP, info...) - onlineMap[data.UserId] = online + return result } + status := types.ServerStatus{ + Mem: nodeStatus.Mem, + Cpu: nodeStatus.Cpu, + Disk: nodeStatus.Disk, + Protocol: protocol.Type, + Online: make([]types.ServerOnlineUser, 0), + } + // parse online users + onlineUser, err := l.svcCtx.NodeModel.OnlineUserSubscribe(l.ctx, id, protocol.Type) + if err != nil { + l.Errorw("[handlerServerStatus] GetNodeOnlineUser Error: ", logger.Field("error", err.Error()), logger.Field("node_id", id)) + return result + } + var onlineList []types.ServerOnlineUser + var onlineMap = make(map[int64]types.ServerOnlineUser) + // group by user_id + for subId, info := range onlineUser { + data, err := l.svcCtx.UserModel.FindOneUserSubscribe(l.ctx, subId) + if err != nil { + l.Errorw("[handlerServerStatus] FindOneSubscribe Error: ", logger.Field("error", err.Error())) + continue + } + if online, exist := onlineMap[data.UserId]; !exist { + onlineMap[data.UserId] = types.ServerOnlineUser{ + IP: info, + UserId: data.UserId, + Subscribe: data.Subscribe.Name, + SubscribeId: data.SubscribeId, + Traffic: data.Traffic, + ExpiredAt: data.ExpireTime.UnixMilli(), + } + } else { + online.IP = append(online.IP, info...) + onlineMap[data.UserId] = online + } + } + for _, online := range onlineMap { + onlineList = append(onlineList, online) + } + status.Online = onlineList + result = append(result, status) } - - for _, online := range onlineMap { - onlineList = append(onlineList, online) - } - - result.Online = onlineList - return result } diff --git a/internal/logic/server/pushOnlineUsersLogic.go b/internal/logic/server/pushOnlineUsersLogic.go index 9833e65..325aaee 100644 --- a/internal/logic/server/pushOnlineUsersLogic.go +++ b/internal/logic/server/pushOnlineUsersLogic.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" - "github.com/perfect-panel/server/internal/model/cache" + "github.com/perfect-panel/server/internal/model/node" "github.com/perfect-panel/server/internal/svc" "github.com/perfect-panel/server/internal/types" "github.com/perfect-panel/server/pkg/logger" @@ -46,20 +46,24 @@ func (l *PushOnlineUsersLogic) PushOnlineUsers(req *types.OnlineUsersRequest) er return fmt.Errorf("server not found: %w", err) } - userOnlineIp := make([]cache.NodeOnlineUser, 0) + var onlineUsers node.OnlineUserSubscribe for _, user := range req.Users { - userOnlineIp = append(userOnlineIp, cache.NodeOnlineUser{ - SID: user.SID, - IP: user.IP, - }) + if online, ok := onlineUsers[user.SID]; ok { + // If user already exists, update IP if different + online = append(online, user.IP) + onlineUsers[user.SID] = online + } else { + // New user, add to map + onlineUsers[user.SID] = []string{user.IP} + } } - err = l.svcCtx.NodeCache.AddOnlineUserIP(l.ctx, userOnlineIp) + err = l.svcCtx.NodeModel.UpdateOnlineUserSubscribe(l.ctx, req.ServerId, req.Protocol, onlineUsers) if err != nil { l.Errorw("[PushOnlineUsers] cache operation error", logger.Field("error", err)) return err } - err = l.svcCtx.NodeCache.UpdateNodeOnlineUser(l.ctx, req.ServerId, userOnlineIp) + err = l.svcCtx.NodeModel.UpdateOnlineUserSubscribeGlobal(l.ctx, onlineUsers) if err != nil { l.Errorw("[PushOnlineUsers] cache operation error", logger.Field("error", err)) diff --git a/internal/logic/server/serverPushStatusLogic.go b/internal/logic/server/serverPushStatusLogic.go index 5d7b4e8..1cecb5a 100644 --- a/internal/logic/server/serverPushStatusLogic.go +++ b/internal/logic/server/serverPushStatusLogic.go @@ -4,7 +4,7 @@ import ( "context" "errors" - "github.com/perfect-panel/server/internal/model/cache" + "github.com/perfect-panel/server/internal/model/node" "github.com/perfect-panel/server/internal/svc" "github.com/perfect-panel/server/internal/types" "github.com/perfect-panel/server/pkg/logger" @@ -32,7 +32,7 @@ func (l *ServerPushStatusLogic) ServerPushStatus(req *types.ServerPushStatusRequ l.Errorw("[PushOnlineUsers] FindOne error", logger.Field("error", err)) return errors.New("server not found") } - err = l.svcCtx.NodeCache.UpdateNodeStatus(l.ctx, req.ServerId, cache.NodeStatus{ + err = l.svcCtx.NodeModel.UpdateStatusCache(l.ctx, req.ServerId, req.Protocol, &node.Status{ Cpu: req.Cpu, Mem: req.Mem, Disk: req.Disk, diff --git a/internal/logic/server/serverPushUserTrafficLogic.go b/internal/logic/server/serverPushUserTrafficLogic.go index 0ce5b32..4f73691 100644 --- a/internal/logic/server/serverPushUserTrafficLogic.go +++ b/internal/logic/server/serverPushUserTrafficLogic.go @@ -5,7 +5,6 @@ import ( "encoding/json" "github.com/hibiken/asynq" - "github.com/perfect-panel/server/internal/model/cache" "github.com/perfect-panel/server/internal/svc" "github.com/perfect-panel/server/internal/types" "github.com/perfect-panel/server/pkg/logger" @@ -40,23 +39,9 @@ func (l *ServerPushUserTrafficLogic) ServerPushUserTraffic(req *types.ServerPush // Create traffic task var request task.TrafficStatistics - var userTraffic []cache.UserTraffic request.ServerId = serverInfo.Id tool.DeepCopy(&request.Logs, req.Traffic) - tool.DeepCopy(&userTraffic, req.Traffic) - // update today traffic rank - err = l.svcCtx.NodeCache.AddNodeTodayTraffic(l.ctx, serverInfo.Id, userTraffic) - if err != nil { - l.Errorw("[ServerPushUserTraffic] AddNodeTodayTraffic error", logger.Field("error", err)) - return errors.New("add node today traffic error") - } - for _, user := range req.Traffic { - if err = l.svcCtx.NodeCache.AddUserTodayTraffic(l.ctx, user.SID, user.Upload, user.Download); err != nil { - l.Errorw("[ServerPushUserTraffic] AddUserTodayTraffic error", logger.Field("error", err)) - continue - } - } // Push traffic task val, _ := json.Marshal(request) t := asynq.NewTask(task.ForthwithTrafficStatistics, val, asynq.MaxRetry(3)) diff --git a/internal/model/cache/constant.go b/internal/model/cache/constant.go deleted file mode 100644 index ece753b..0000000 --- a/internal/model/cache/constant.go +++ /dev/null @@ -1,44 +0,0 @@ -package cache - -const ( - // UserTodayUploadTrafficCacheKey 用户当日上传流量 - UserTodayUploadTrafficCacheKey = "node:user_today_upload_traffic" - // UserTodayDownloadTrafficCacheKey 用户当日下载流量 - UserTodayDownloadTrafficCacheKey = "node:user_today_download_traffic" - // UserTodayTotalTrafficCacheKey 用户当日总流量 - UserTodayTotalTrafficCacheKey = "node:user_today_total_traffic" - // NodeTodayUploadTrafficCacheKey 节点当日上传流量 - NodeTodayUploadTrafficCacheKey = "node:node_today_upload_traffic" - // NodeTodayDownloadTrafficCacheKey 节点当日下载流量 - NodeTodayDownloadTrafficCacheKey = "node:node_today_download_traffic" - // NodeTodayTotalTrafficCacheKey 节点当日总流量 - NodeTodayTotalTrafficCacheKey = "node:node_today_total_traffic" - // UserTodayUploadTrafficRankKey 用户当日上传流量排行榜 - UserTodayUploadTrafficRankKey = "node:user_today_upload_traffic_rank" - // UserTodayDownloadTrafficRankKey 用户当日下载流量排行榜 - UserTodayDownloadTrafficRankKey = "node:user_today_download_traffic_rank" - // UserTodayTotalTrafficRankKey 用户当日总流量排行榜 - UserTodayTotalTrafficRankKey = "node:user_today_total_traffic_rank" - // NodeTodayUploadTrafficRankKey 节点当日上传流量排行榜 - NodeTodayUploadTrafficRankKey = "node:node_today_upload_traffic_rank" - // NodeTodayDownloadTrafficRankKey 节点当日下载流量排行榜 - NodeTodayDownloadTrafficRankKey = "node:node_today_download_traffic_rank" - // NodeTodayTotalTrafficRankKey 节点当日总流量排行榜 - NodeTodayTotalTrafficRankKey = "node:node_today_total_traffic_rank" - // NodeOnlineUserCacheKey 节点在线用户 - NodeOnlineUserCacheKey = "node:node_online_user:%d" - // UserOnlineIpCacheKey 用户在线IP - UserOnlineIpCacheKey = "node:user_online_ip:%d" - // AllNodeOnlineUserCacheKey 所有节点在线用户 - AllNodeOnlineUserCacheKey = "node:all_node_online_user" - // NodeStatusCacheKey 节点状态 - NodeStatusCacheKey = "node:status:%d" - // AllNodeDownloadTrafficCacheKey 所有节点下载流量 - AllNodeDownloadTrafficCacheKey = "node:all_node_download_traffic" - // AllNodeUploadTrafficCacheKey 所有节点上传流量 - AllNodeUploadTrafficCacheKey = "node:all_node_upload_traffic" - // YesterdayTotalTrafficRank 昨日节点总流量排行榜 - YesterdayNodeTotalTrafficRank = "node:yesterday_total_traffic_rank" - // YesterdayUserTotalTrafficRank 昨日用户总流量排行榜 - YesterdayUserTotalTrafficRank = "node:yesterday_user_total_traffic_rank" -) diff --git a/internal/model/cache/node.go b/internal/model/cache/node.go deleted file mode 100644 index 9b65b15..0000000 --- a/internal/model/cache/node.go +++ /dev/null @@ -1,584 +0,0 @@ -package cache - -import ( - "context" - "encoding/json" - "fmt" - "strconv" - "sync" - "time" - - "github.com/perfect-panel/server/pkg/logger" - "github.com/redis/go-redis/v9" -) - -type NodeCacheClient struct { - *redis.Client - resetMutex sync.Mutex -} - -func NewNodeCacheClient(rds *redis.Client) *NodeCacheClient { - return &NodeCacheClient{ - Client: rds, - } -} - -// AddOnlineUserIP adds user's online IP -func (c *NodeCacheClient) AddOnlineUserIP(ctx context.Context, users []NodeOnlineUser) error { - if len(users) == 0 { - // No users to add - return nil - } - - // Use Pipeline to optimize Redis operations - pipe := c.Pipeline() - - // Add user online IPs and clean up expired IPs for each user - for _, user := range users { - if user.SID <= 0 || user.IP == "" { - logger.Errorf("invalid user data: uid=%d, ip=%s", user.SID, user.IP) - continue - } - - key := fmt.Sprintf(UserOnlineIpCacheKey, user.SID) - now := time.Now() - expireTime := now.Add(5 * time.Minute) - - // Clean up expired user online IPs - pipe.ZRemRangeByScore(ctx, key, "0", fmt.Sprintf("%d", now.Unix())) - pipe.ZRemRangeByScore(ctx, AllNodeOnlineUserCacheKey, "0", fmt.Sprintf("%d", now.Unix())) - - // Add or update user online IP - // XX: Only update elements that already exist - // NX: Only add new elements - _ = pipe.ZAdd(ctx, key, redis.Z{ - Score: float64(expireTime.Unix()), - Member: user.IP, - }).Err() - _ = pipe.ZAdd(ctx, AllNodeOnlineUserCacheKey, redis.Z{ - Score: float64(expireTime.Unix()), - Member: user.IP, - }).Err() - - // Set key expiration to 5 minutes (slightly longer than IP expiration) - pipe.Expire(ctx, key, 5*time.Minute) - pipe.Expire(ctx, AllNodeOnlineUserCacheKey, 5*time.Minute) - } - - // Execute all commands - _, err := pipe.Exec(ctx) - if err != nil { - return fmt.Errorf("failed to add node user online ip: %w", err) - } - return nil -} - -// GetUserOnlineIp gets user's online IPs -func (c *NodeCacheClient) GetUserOnlineIp(ctx context.Context, uid int64) ([]string, error) { - if uid <= 0 { - return nil, fmt.Errorf("invalid parameters: uid=%d", uid) - } - - // Get user's online IPs - ips, err := c.ZRevRangeByScore(ctx, fmt.Sprintf(UserOnlineIpCacheKey, uid), &redis.ZRangeBy{ - Min: "0", - Max: fmt.Sprintf("%d", time.Now().Add(5*time.Minute).Unix()), - Offset: 0, - Count: 100, - }).Result() - if err != nil { - return nil, fmt.Errorf("failed to get user online ip: %w", err) - } - return ips, nil -} - -// UpdateNodeOnlineUser updates node's online users and IPs -func (c *NodeCacheClient) UpdateNodeOnlineUser(ctx context.Context, nodeId int64, users []NodeOnlineUser) error { - if nodeId <= 0 || len(users) == 0 { - return fmt.Errorf("invalid parameters: nodeId=%d, users=%v", nodeId, users) - } - // Organize data - data := make(map[int64][]string) - for _, user := range users { - data[user.SID] = append(data[user.SID], user.IP) - } - - value, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("failed to marshal data: %w", err) - } - - c.Set(ctx, fmt.Sprintf(NodeOnlineUserCacheKey, nodeId), value, time.Minute*5) - return nil -} - -// GetNodeOnlineUser gets node's online users and IPs -func (c *NodeCacheClient) GetNodeOnlineUser(ctx context.Context, nodeId int64) (map[int64][]string, error) { - if nodeId <= 0 { - return nil, fmt.Errorf("invalid parameters: nodeId=%d", nodeId) - } - value, err := c.Get(ctx, fmt.Sprintf(NodeOnlineUserCacheKey, nodeId)).Result() - if err != nil { - return nil, fmt.Errorf("failed to get node online user: %w", err) - } - var data map[int64][]string - if err := json.Unmarshal([]byte(value), &data); err != nil { - return nil, fmt.Errorf("failed to unmarshal data: %w", err) - } - return data, nil -} - -// AddUserTodayTraffic Add user's today traffic -func (c *NodeCacheClient) AddUserTodayTraffic(ctx context.Context, uid int64, upload, download int64) error { - if uid <= 0 || upload <= 0 { - return fmt.Errorf("invalid parameters: uid=%d, upload=%d", uid, upload) - } - pipe := c.Pipeline() - // User's today upload traffic - pipe.HIncrBy(ctx, UserTodayUploadTrafficCacheKey, fmt.Sprintf("%d", uid), upload) - // User's today download traffic - pipe.HIncrBy(ctx, UserTodayDownloadTrafficCacheKey, fmt.Sprintf("%d", uid), download) - // User's today total traffic - pipe.HIncrBy(ctx, UserTodayTotalTrafficCacheKey, fmt.Sprintf("%d", uid), upload+download) - // User's today traffic ranking - pipe.ZIncrBy(ctx, UserTodayUploadTrafficRankKey, float64(upload), fmt.Sprintf("%d", uid)) - pipe.ZIncrBy(ctx, UserTodayDownloadTrafficRankKey, float64(download), fmt.Sprintf("%d", uid)) - pipe.ZIncrBy(ctx, UserTodayTotalTrafficRankKey, float64(upload+download), fmt.Sprintf("%d", uid)) - - // All node upload traffic - pipe.IncrBy(ctx, AllNodeUploadTrafficCacheKey, upload) - // All node download traffic - pipe.IncrBy(ctx, AllNodeDownloadTrafficCacheKey, download) - // Execute commands - _, err := pipe.Exec(ctx) - if err != nil { - return fmt.Errorf("failed to add user today upload traffic: %w", err) - } - return nil -} - -// AddNodeTodayTraffic Add node's today traffic -func (c *NodeCacheClient) AddNodeTodayTraffic(ctx context.Context, nodeId int64, userTraffic []UserTraffic) error { - if nodeId <= 0 || len(userTraffic) == 0 { - return fmt.Errorf("invalid parameters: nodeId=%d, userTraffic=%v", nodeId, userTraffic) - } - pipe := c.Pipeline() - upload, download, total := c.calculateTraffic(userTraffic) - pipe.HIncrBy(ctx, NodeTodayUploadTrafficCacheKey, fmt.Sprintf("%d", nodeId), upload) - pipe.HIncrBy(ctx, NodeTodayDownloadTrafficCacheKey, fmt.Sprintf("%d", nodeId), download) - pipe.HIncrBy(ctx, NodeTodayTotalTrafficCacheKey, fmt.Sprintf("%d", nodeId), total) - pipe.ZIncrBy(ctx, NodeTodayUploadTrafficRankKey, float64(upload), fmt.Sprintf("%d", nodeId)) - pipe.ZIncrBy(ctx, NodeTodayDownloadTrafficRankKey, float64(download), fmt.Sprintf("%d", nodeId)) - pipe.ZIncrBy(ctx, NodeTodayTotalTrafficRankKey, float64(total), fmt.Sprintf("%d", nodeId)) - // Execute commands - _, err := pipe.Exec(ctx) - if err != nil { - return fmt.Errorf("failed to add node today upload traffic: %w", err) - } - return nil -} - -// Get user's traffic data -func (c *NodeCacheClient) getUserTrafficData(ctx context.Context, uid int64) (upload, download int64, err error) { - upload, err = c.HGet(ctx, UserTodayUploadTrafficCacheKey, fmt.Sprintf("%d", uid)).Int64() - if err != nil { - return 0, 0, fmt.Errorf("failed to get user today upload traffic: %w", err) - } - download, err = c.HGet(ctx, UserTodayDownloadTrafficCacheKey, fmt.Sprintf("%d", uid)).Int64() - if err != nil { - return 0, 0, fmt.Errorf("failed to get user today download traffic: %w", err) - } - return upload, download, nil -} - -// Get node's traffic data -func (c *NodeCacheClient) getNodeTrafficData(ctx context.Context, nodeId int64) (upload, download int64, err error) { - upload, err = c.HGet(ctx, NodeTodayUploadTrafficCacheKey, fmt.Sprintf("%d", nodeId)).Int64() - if err != nil { - return 0, 0, fmt.Errorf("failed to get node today upload traffic: %w", err) - } - download, err = c.HGet(ctx, NodeTodayDownloadTrafficCacheKey, fmt.Sprintf("%d", nodeId)).Int64() - if err != nil { - return 0, 0, fmt.Errorf("failed to get node today download traffic: %w", err) - } - return upload, download, nil -} - -// Parse ID -func (c *NodeCacheClient) parseID(member interface{}, idType string) (int64, error) { - id, err := strconv.ParseInt(member.(string), 10, 64) - if err != nil { - return 0, fmt.Errorf("failed to parse %s id %v: %w", idType, member, err) - } - return id, nil -} - -// GetUserTodayTotalTrafficRank Get user's today total traffic ranking top N -func (c *NodeCacheClient) GetUserTodayTotalTrafficRank(ctx context.Context, n int64) ([]UserTodayTrafficRank, error) { - if n <= 0 { - return nil, fmt.Errorf("invalid parameters: n=%d", n) - } - data, err := c.ZRevRangeWithScores(ctx, UserTodayTotalTrafficRankKey, 0, n-1).Result() - if err != nil { - return nil, fmt.Errorf("failed to get user today total traffic rank: %w", err) - } - users := make([]UserTodayTrafficRank, 0, len(data)) - for _, user := range data { - uid, err := c.parseID(user.Member, "user") - if err != nil { - logger.Errorf("%v", err) - continue - } - upload, download, err := c.getUserTrafficData(ctx, uid) - if err != nil { - logger.Errorf("%v", err) - continue - } - users = append(users, UserTodayTrafficRank{ - SID: uid, - Upload: upload, - Download: download, - Total: int64(user.Score), - }) - } - return users, nil -} - -// GetNodeTodayTotalTrafficRank Get node's today total traffic ranking top N -func (c *NodeCacheClient) GetNodeTodayTotalTrafficRank(ctx context.Context, n int64) ([]NodeTodayTrafficRank, error) { - if n <= 0 { - return nil, fmt.Errorf("invalid parameters: n=%d", n) - } - data, err := c.ZRevRangeWithScores(ctx, NodeTodayTotalTrafficRankKey, 0, n-1).Result() - if err != nil { - return nil, fmt.Errorf("failed to get node today total traffic rank: %w", err) - } - nodes := make([]NodeTodayTrafficRank, 0, len(data)) - for _, node := range data { - nodeId, err := c.parseID(node.Member, "node") - if err != nil { - logger.Errorf("%v", err) - continue - } - upload, download, err := c.getNodeTrafficData(ctx, nodeId) - if err != nil { - logger.Errorf("%v", err) - continue - } - nodes = append(nodes, NodeTodayTrafficRank{ - ID: nodeId, - Upload: upload, - Download: download, - Total: int64(node.Score), - }) - } - return nodes, nil -} - -// GetUserTodayUploadTrafficRank Get user's today upload traffic ranking top N -func (c *NodeCacheClient) GetUserTodayUploadTrafficRank(ctx context.Context, n int64) ([]UserTodayTrafficRank, error) { - if n <= 0 { - return nil, fmt.Errorf("invalid parameters: n=%d", n) - } - data, err := c.ZRevRangeWithScores(ctx, UserTodayUploadTrafficRankKey, 0, n-1).Result() - if err != nil { - return nil, fmt.Errorf("failed to get user today upload traffic rank: %w", err) - } - users := make([]UserTodayTrafficRank, 0, len(data)) - for _, user := range data { - uid, err := c.parseID(user.Member, "user") - if err != nil { - logger.Errorf("%v", err) - continue - } - upload, download, err := c.getUserTrafficData(ctx, uid) - if err != nil { - logger.Errorf("%v", err) - continue - } - users = append(users, UserTodayTrafficRank{ - SID: uid, - Upload: upload, - Download: download, - Total: int64(user.Score), - }) - } - return users, nil -} - -// GetUserTodayDownloadTrafficRank Get user's today download traffic ranking top N -func (c *NodeCacheClient) GetUserTodayDownloadTrafficRank(ctx context.Context, n int64) ([]UserTodayTrafficRank, error) { - if n <= 0 { - return nil, fmt.Errorf("invalid parameters: n=%d", n) - } - data, err := c.ZRevRangeWithScores(ctx, UserTodayDownloadTrafficRankKey, 0, n-1).Result() - if err != nil { - return nil, fmt.Errorf("failed to get user today download traffic rank: %w", err) - } - users := make([]UserTodayTrafficRank, 0, len(data)) - for _, user := range data { - uid, err := c.parseID(user.Member, "user") - if err != nil { - logger.Errorf("%v", err) - continue - } - upload, download, err := c.getUserTrafficData(ctx, uid) - if err != nil { - logger.Errorf("%v", err) - continue - } - users = append(users, UserTodayTrafficRank{ - SID: uid, - Upload: upload, - Download: download, - Total: int64(user.Score), - }) - } - return users, nil -} - -// GetNodeTodayUploadTrafficRank Get node's today upload traffic ranking top N -func (c *NodeCacheClient) GetNodeTodayUploadTrafficRank(ctx context.Context, n int64) ([]NodeTodayTrafficRank, error) { - if n <= 0 { - return nil, fmt.Errorf("invalid parameters: n=%d", n) - } - data, err := c.ZRevRangeWithScores(ctx, NodeTodayUploadTrafficRankKey, 0, n-1).Result() - if err != nil { - return nil, fmt.Errorf("failed to get node today upload traffic rank: %w", err) - } - nodes := make([]NodeTodayTrafficRank, 0, len(data)) - for _, node := range data { - nodeId, err := c.parseID(node.Member, "node") - if err != nil { - logger.Errorf("%v", err) - continue - } - upload, download, err := c.getNodeTrafficData(ctx, nodeId) - if err != nil { - logger.Errorf("%v", err) - continue - } - nodes = append(nodes, NodeTodayTrafficRank{ - ID: nodeId, - Upload: upload, - Download: download, - Total: int64(node.Score), - }) - } - return nodes, nil -} - -// GetNodeTodayDownloadTrafficRank Get node's today download traffic ranking top N -func (c *NodeCacheClient) GetNodeTodayDownloadTrafficRank(ctx context.Context, n int64) ([]NodeTodayTrafficRank, error) { - if n <= 0 { - return nil, fmt.Errorf("invalid parameters: n=%d", n) - } - data, err := c.ZRevRangeWithScores(ctx, NodeTodayDownloadTrafficRankKey, 0, n-1).Result() - if err != nil { - return nil, fmt.Errorf("failed to get node today download traffic rank: %w", err) - } - nodes := make([]NodeTodayTrafficRank, 0, len(data)) - for _, node := range data { - nodeId, err := c.parseID(node.Member, "node") - if err != nil { - logger.Errorf("%v", err) - continue - } - upload, download, err := c.getNodeTrafficData(ctx, nodeId) - if err != nil { - logger.Errorf("%v", err) - continue - } - nodes = append(nodes, NodeTodayTrafficRank{ - ID: nodeId, - Upload: upload, - Download: download, - Total: int64(node.Score), - }) - } - return nodes, nil -} - -// ResetTodayTrafficData Reset today's traffic data -func (c *NodeCacheClient) ResetTodayTrafficData(ctx context.Context) error { - c.resetMutex.Lock() - defer c.resetMutex.Unlock() - pipe := c.Pipeline() - pipe.Del(ctx, UserTodayUploadTrafficCacheKey) - pipe.Del(ctx, UserTodayDownloadTrafficCacheKey) - pipe.Del(ctx, UserTodayTotalTrafficCacheKey) - pipe.Del(ctx, NodeTodayUploadTrafficCacheKey) - pipe.Del(ctx, NodeTodayDownloadTrafficCacheKey) - pipe.Del(ctx, NodeTodayTotalTrafficCacheKey) - pipe.Del(ctx, UserTodayUploadTrafficRankKey) - pipe.Del(ctx, UserTodayDownloadTrafficRankKey) - pipe.Del(ctx, UserTodayTotalTrafficRankKey) - pipe.Del(ctx, NodeTodayUploadTrafficRankKey) - pipe.Del(ctx, NodeTodayDownloadTrafficRankKey) - pipe.Del(ctx, NodeTodayTotalTrafficRankKey) - pipe.Del(ctx, AllNodeDownloadTrafficCacheKey) - pipe.Del(ctx, AllNodeUploadTrafficCacheKey) - _, err := pipe.Exec(ctx) - if err != nil { - return fmt.Errorf("failed to reset today traffic data: %w", err) - } - return nil -} - -// Calculate traffic -func (c *NodeCacheClient) calculateTraffic(data []UserTraffic) (upload, download, total int64) { - for _, userTraffic := range data { - upload += userTraffic.Upload - download += userTraffic.Download - total += userTraffic.Upload + userTraffic.Download - } - return upload, download, total -} - -// GetAllNodeOnlineUser Get all node online user -func (c *NodeCacheClient) GetAllNodeOnlineUser(ctx context.Context) ([]string, error) { - users, err := c.ZRevRange(ctx, AllNodeOnlineUserCacheKey, 0, -1).Result() - if err != nil { - return nil, fmt.Errorf("failed to get all node online user: %w", err) - } - return users, nil -} - -// UpdateNodeStatus Update node status -func (c *NodeCacheClient) UpdateNodeStatus(ctx context.Context, nodeId int64, status NodeStatus) error { - // 参数验证 - if nodeId <= 0 { - return fmt.Errorf("invalid node id: %d", nodeId) - } - - // 验证状态数据 - if status.UpdatedAt <= 0 { - return fmt.Errorf("invalid status data: updated_at=%d", status.UpdatedAt) - } - - // 序列化状态数据 - value, err := json.Marshal(status) - if err != nil { - return fmt.Errorf("failed to marshal node status: %w", err) - } - - // 使用 Pipeline 优化性能 - pipe := c.Pipeline() - - // 设置状态数据 - pipe.Set(ctx, fmt.Sprintf(NodeStatusCacheKey, nodeId), value, time.Minute*5) - - // 执行命令 - _, err = pipe.Exec(ctx) - if err != nil { - return fmt.Errorf("failed to update node status: %w", err) - } - - return nil -} - -// GetNodeStatus Get node status -func (c *NodeCacheClient) GetNodeStatus(ctx context.Context, nodeId int64) (NodeStatus, error) { - status, err := c.Get(ctx, fmt.Sprintf(NodeStatusCacheKey, nodeId)).Result() - if err != nil { - return NodeStatus{}, fmt.Errorf("failed to get node status: %w", err) - } - var nodeStatus NodeStatus - if err := json.Unmarshal([]byte(status), &nodeStatus); err != nil { - return NodeStatus{}, fmt.Errorf("failed to unmarshal node status: %w", err) - } - return nodeStatus, nil -} - -// GetOnlineNodeStatusCount Get Online Node Status Count -func (c *NodeCacheClient) GetOnlineNodeStatusCount(ctx context.Context) (int64, error) { - // 获取所有节点Key - keys, err := c.Keys(ctx, "node:status:*").Result() - if err != nil { - return 0, fmt.Errorf("failed to get all node status keys: %w", err) - } - var count int64 - for _, key := range keys { - status, err := c.Get(ctx, key).Result() - if err != nil { - logger.Errorf("failed to get node status: %v", err.Error()) - continue - } - if status != "" { - count++ - } - } - return count, nil -} - -// GetAllNodeUploadTraffic Get all node upload traffic -func (c *NodeCacheClient) GetAllNodeUploadTraffic(ctx context.Context) (int64, error) { - upload, err := c.Get(ctx, AllNodeUploadTrafficCacheKey).Int64() - if err != nil { - return 0, fmt.Errorf("failed to get all node upload traffic: %w", err) - } - return upload, nil -} - -// GetAllNodeDownloadTraffic Get all node download traffic -func (c *NodeCacheClient) GetAllNodeDownloadTraffic(ctx context.Context) (int64, error) { - download, err := c.Get(ctx, AllNodeDownloadTrafficCacheKey).Int64() - if err != nil { - return 0, fmt.Errorf("failed to get all node download traffic: %w", err) - } - return download, nil -} - -// UpdateYesterdayNodeTotalTrafficRank Update yesterday node total traffic rank -func (c *NodeCacheClient) UpdateYesterdayNodeTotalTrafficRank(ctx context.Context, nodes []NodeTodayTrafficRank) error { - expireAt := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local).Add(time.Hour * 24) - t := time.Until(expireAt) - pipe := c.Pipeline() - value, _ := json.Marshal(nodes) - pipe.Set(ctx, YesterdayNodeTotalTrafficRank, value, t) - _, err := pipe.Exec(ctx) - if err != nil { - return fmt.Errorf("failed to update yesterday node total traffic rank: %w", err) - } - return nil -} - -// UpdateYesterdayUserTotalTrafficRank Update yesterday user total traffic rank -func (c *NodeCacheClient) UpdateYesterdayUserTotalTrafficRank(ctx context.Context, users []UserTodayTrafficRank) error { - expireAt := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local).Add(time.Hour * 24) - t := time.Until(expireAt) - pipe := c.Pipeline() - value, _ := json.Marshal(users) - pipe.Set(ctx, YesterdayUserTotalTrafficRank, value, t) - _, err := pipe.Exec(ctx) - if err != nil { - return fmt.Errorf("failed to update yesterday user total traffic rank: %w", err) - } - return nil -} - -// GetYesterdayNodeTotalTrafficRank Get yesterday node total traffic rank -func (c *NodeCacheClient) GetYesterdayNodeTotalTrafficRank(ctx context.Context) ([]NodeTodayTrafficRank, error) { - value, err := c.Get(ctx, YesterdayNodeTotalTrafficRank).Result() - if err != nil { - return nil, fmt.Errorf("failed to get yesterday node total traffic rank: %w", err) - } - var nodes []NodeTodayTrafficRank - if err := json.Unmarshal([]byte(value), &nodes); err != nil { - return nil, fmt.Errorf("failed to unmarshal yesterday node total traffic rank: %w", err) - } - return nodes, nil -} - -// GetYesterdayUserTotalTrafficRank Get yesterday user total traffic rank -func (c *NodeCacheClient) GetYesterdayUserTotalTrafficRank(ctx context.Context) ([]UserTodayTrafficRank, error) { - value, err := c.Get(ctx, YesterdayUserTotalTrafficRank).Result() - if err != nil { - return nil, fmt.Errorf("failed to get yesterday user total traffic rank: %w", err) - } - var users []UserTodayTrafficRank - if err := json.Unmarshal([]byte(value), &users); err != nil { - return nil, fmt.Errorf("failed to unmarshal yesterday user total traffic rank: %w", err) - } - return users, nil -} diff --git a/internal/model/cache/node_test.go b/internal/model/cache/node_test.go deleted file mode 100644 index b7660ab..0000000 --- a/internal/model/cache/node_test.go +++ /dev/null @@ -1,575 +0,0 @@ -package cache - -import ( - "context" - "encoding/json" - "fmt" - "testing" - "time" - - "github.com/alicebob/miniredis/v2" - "github.com/redis/go-redis/v9" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// Create a test Redis client -func newTestRedisClient(t *testing.T) *redis.Client { - mr, err := miniredis.Run() - require.NoError(t, err) - - client := redis.NewClient(&redis.Options{ - Addr: mr.Addr(), - }) - require.NoError(t, client.Ping(context.Background()).Err()) - return client -} - -// Clean up test data -func cleanupTestData(t *testing.T, client *redis.Client) { - ctx := context.Background() - keys := []string{ - UserTodayUploadTrafficCacheKey, - UserTodayDownloadTrafficCacheKey, - UserTodayTotalTrafficCacheKey, - NodeTodayUploadTrafficCacheKey, - NodeTodayDownloadTrafficCacheKey, - NodeTodayTotalTrafficCacheKey, - UserTodayUploadTrafficRankKey, - UserTodayDownloadTrafficRankKey, - UserTodayTotalTrafficRankKey, - NodeTodayUploadTrafficRankKey, - NodeTodayDownloadTrafficRankKey, - NodeTodayTotalTrafficRankKey, - } - - // Clean up all cache keys - for _, key := range keys { - require.NoError(t, client.Del(ctx, key).Err()) - } - - // Clean up user online IP cache - for uid := int64(1); uid <= 3; uid++ { - require.NoError(t, client.Del(ctx, fmt.Sprintf(UserOnlineIpCacheKey, uid)).Err()) - } - - // Clean up node online user cache - for nodeId := int64(1); nodeId <= 3; nodeId++ { - require.NoError(t, client.Del(ctx, fmt.Sprintf(NodeOnlineUserCacheKey, nodeId)).Err()) - } -} - -func TestNodeCacheClient_AddUserTodayTraffic(t *testing.T) { - client := newTestRedisClient(t) - defer cleanupTestData(t, client) - - cache := NewNodeCacheClient(client) - ctx := context.Background() - - tests := []struct { - name string - uid int64 - upload int64 - download int64 - wantErr bool - }{ - { - name: "Add traffic normally", - uid: 1, - upload: 100, - download: 200, - wantErr: false, - }, - { - name: "Invalid SID", - uid: 0, - upload: 100, - download: 200, - wantErr: true, - }, - { - name: "Invalid upload traffic", - uid: 1, - upload: 0, - download: 200, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := cache.AddUserTodayTraffic(ctx, tt.uid, tt.upload, tt.download) - if tt.wantErr { - assert.Error(t, err) - return - } - assert.NoError(t, err) - - // Verify data is added correctly - upload, err := client.HGet(ctx, UserTodayUploadTrafficCacheKey, "1").Int64() - assert.NoError(t, err) - assert.Equal(t, tt.upload, upload) - - download, err := client.HGet(ctx, UserTodayDownloadTrafficCacheKey, "1").Int64() - assert.NoError(t, err) - assert.Equal(t, tt.download, download) - }) - } -} - -func TestNodeCacheClient_AddNodeTodayTraffic(t *testing.T) { - client := newTestRedisClient(t) - defer cleanupTestData(t, client) - - cache := NewNodeCacheClient(client) - ctx := context.Background() - - tests := []struct { - name string - nodeId int64 - userTraffic []UserTraffic - wantErr bool - }{ - { - name: "Add node traffic normally", - nodeId: 1, - userTraffic: []UserTraffic{ - {UID: 1, Upload: 100, Download: 200}, - {UID: 2, Upload: 300, Download: 400}, - }, - wantErr: false, - }, - { - name: "Invalid node ID", - nodeId: 0, - userTraffic: []UserTraffic{ - {UID: 1, Upload: 100, Download: 200}, - }, - wantErr: true, - }, - { - name: "Empty user traffic data", - nodeId: 1, - userTraffic: []UserTraffic{}, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := cache.AddNodeTodayTraffic(ctx, tt.nodeId, tt.userTraffic) - if tt.wantErr { - assert.Error(t, err) - return - } - assert.NoError(t, err) - - // Verify data is added correctly - upload, err := client.HGet(ctx, NodeTodayUploadTrafficCacheKey, "1").Int64() - assert.NoError(t, err) - assert.Equal(t, int64(400), upload) // 100 + 300 - - download, err := client.HGet(ctx, NodeTodayDownloadTrafficCacheKey, "1").Int64() - assert.NoError(t, err) - assert.Equal(t, int64(600), download) // 200 + 400 - }) - } -} - -func TestNodeCacheClient_GetUserTodayTrafficRank(t *testing.T) { - client := newTestRedisClient(t) - defer cleanupTestData(t, client) - - cache := NewNodeCacheClient(client) - ctx := context.Background() - - // Prepare test data - testData := []struct { - uid int64 - upload int64 - download int64 - }{ - {1, 100, 200}, - {2, 300, 400}, - {3, 500, 600}, - } - - for _, data := range testData { - err := cache.AddUserTodayTraffic(ctx, data.uid, data.upload, data.download) - require.NoError(t, err) - } - - tests := []struct { - name string - n int64 - wantErr bool - }{ - { - name: "Get top 2 ranks", - n: 2, - wantErr: false, - }, - { - name: "Get all ranks", - n: 3, - wantErr: false, - }, - { - name: "Invalid N value", - n: 0, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ranks, err := cache.GetUserTodayTotalTrafficRank(ctx, tt.n) - if tt.wantErr { - assert.Error(t, err) - return - } - assert.NoError(t, err) - assert.Len(t, ranks, int(tt.n)) - - // Verify sorting is correct - for i := 1; i < len(ranks); i++ { - assert.GreaterOrEqual(t, ranks[i-1].Total, ranks[i].Total) - } - }) - } -} - -func TestNodeCacheClient_ResetTodayTrafficData(t *testing.T) { - client := newTestRedisClient(t) - defer cleanupTestData(t, client) - - cache := NewNodeCacheClient(client) - ctx := context.Background() - - // Prepare test data - err := cache.AddUserTodayTraffic(ctx, 1, 100, 200) - require.NoError(t, err) - err = cache.AddNodeTodayTraffic(ctx, 1, []UserTraffic{{UID: 1, Upload: 100, Download: 200}}) - require.NoError(t, err) - - // Test reset functionality - err = cache.ResetTodayTrafficData(ctx) - assert.NoError(t, err) - - // Verify data is cleared - keys := []string{ - UserTodayUploadTrafficCacheKey, - UserTodayDownloadTrafficCacheKey, - UserTodayTotalTrafficCacheKey, - NodeTodayUploadTrafficCacheKey, - NodeTodayDownloadTrafficCacheKey, - NodeTodayTotalTrafficCacheKey, - } - - for _, key := range keys { - exists, err := client.Exists(ctx, key).Result() - assert.NoError(t, err) - assert.Equal(t, int64(0), exists) - } -} - -func TestNodeCacheClient_GetNodeTodayTrafficRank(t *testing.T) { - client := newTestRedisClient(t) - defer cleanupTestData(t, client) - - cache := NewNodeCacheClient(client) - ctx := context.Background() - - // Prepare test data - testData := []struct { - nodeId int64 - traffic []UserTraffic - }{ - {1, []UserTraffic{{UID: 1, Upload: 100, Download: 200}}}, - {2, []UserTraffic{{UID: 2, Upload: 300, Download: 400}}}, - {3, []UserTraffic{{UID: 3, Upload: 500, Download: 600}}}, - } - - for _, data := range testData { - err := cache.AddNodeTodayTraffic(ctx, data.nodeId, data.traffic) - require.NoError(t, err) - } - - tests := []struct { - name string - n int64 - wantErr bool - }{ - { - name: "Get top 2 ranks", - n: 2, - wantErr: false, - }, - { - name: "Get all ranks", - n: 3, - wantErr: false, - }, - { - name: "Invalid N value", - n: 0, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ranks, err := cache.GetNodeTodayTotalTrafficRank(ctx, tt.n) - if tt.wantErr { - assert.Error(t, err) - return - } - assert.NoError(t, err) - assert.Len(t, ranks, int(tt.n)) - - // Verify sorting is correct - for i := 1; i < len(ranks); i++ { - assert.GreaterOrEqual(t, ranks[i-1].Total, ranks[i].Total) - } - }) - } -} - -func TestNodeCacheClient_AddNodeOnlineUser(t *testing.T) { - client := newTestRedisClient(t) - defer cleanupTestData(t, client) - - cache := NewNodeCacheClient(client) - ctx := context.Background() - - tests := []struct { - name string - nodeId int64 - users []NodeOnlineUser - wantErr bool - }{ - { - name: "Add online users normally", - nodeId: 1, - users: []NodeOnlineUser{ - {SID: 1, IP: "192.168.1.1"}, - {SID: 2, IP: "192.168.1.2"}, - }, - wantErr: false, - }, - { - name: "Invalid node ID", - nodeId: 0, - users: []NodeOnlineUser{ - {SID: 1, IP: "192.168.1.1"}, - }, - wantErr: false, - }, - { - name: "Empty user list", - nodeId: 1, - users: []NodeOnlineUser{}, - wantErr: false, - }, - { - name: "Add duplicate user IP", - nodeId: 1, - users: []NodeOnlineUser{ - {SID: 1, IP: "192.168.1.1"}, - {SID: 1, IP: "192.168.1.1"}, - }, - wantErr: false, - }, - { - name: "Multiple IPs for same user", - nodeId: 1, - users: []NodeOnlineUser{ - {SID: 1, IP: "192.168.1.1"}, - {SID: 1, IP: "192.168.1.2"}, - }, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := cache.AddOnlineUserIP(ctx, tt.users) - if tt.wantErr { - assert.Error(t, err) - return - } - assert.NoError(t, err) - - // Verify data is added correctly - for _, user := range tt.users { - // Get user online IPs - ips, err := cache.GetUserOnlineIp(ctx, user.SID) - assert.NoError(t, err) - assert.Contains(t, ips, user.IP) - - // Verify score is within valid range (current time to 5 minutes later) - score, err := client.ZScore(ctx, fmt.Sprintf(UserOnlineIpCacheKey, user.SID), user.IP).Result() - assert.NoError(t, err) - now := time.Now().Unix() - assert.GreaterOrEqual(t, score, float64(now)) - assert.LessOrEqual(t, score, float64(now+300)) // 5 minutes = 300 seconds - - // Verify key exists - exists, err := client.Exists(ctx, fmt.Sprintf(UserOnlineIpCacheKey, user.SID)).Result() - assert.NoError(t, err) - assert.Equal(t, int64(1), exists) - } - }) - } -} - -func TestNodeCacheClient_GetUserOnlineIp(t *testing.T) { - client := newTestRedisClient(t) - defer cleanupTestData(t, client) - - cache := NewNodeCacheClient(client) - ctx := context.Background() - - // Prepare test data - testData := []struct { - nodeId int64 - users []NodeOnlineUser - }{ - { - nodeId: 1, - users: []NodeOnlineUser{ - {SID: 1, IP: "192.168.1.1"}, - {SID: 1, IP: "192.168.1.2"}, - {SID: 2, IP: "192.168.1.3"}, - }, - }, - } - - // Add test data - for _, data := range testData { - err := cache.AddOnlineUserIP(ctx, data.users) - require.NoError(t, err) - } - - tests := []struct { - name string - uid int64 - wantErr bool - wantIPs []string - }{ - { - name: "Get existing user IPs", - uid: 1, - wantErr: false, - wantIPs: []string{"192.168.1.1", "192.168.1.2"}, - }, - { - name: "Get another user's IPs", - uid: 2, - wantErr: false, - wantIPs: []string{"192.168.1.3"}, - }, - { - name: "Get non-existent user IPs", - uid: 3, - wantErr: false, - wantIPs: []string{}, - }, - { - name: "Invalid user ID", - uid: 0, - wantErr: true, - }, - { - name: "Expired IPs should not be returned", - uid: 1, - wantErr: false, - wantIPs: []string{"192.168.1.1", "192.168.1.2"}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ips, err := cache.GetUserOnlineIp(ctx, tt.uid) - if tt.wantErr { - assert.Error(t, err) - return - } - assert.NoError(t, err) - assert.ElementsMatch(t, tt.wantIPs, ips) - - // Verify all returned IPs are valid - for _, ip := range ips { - score, err := client.ZScore(ctx, fmt.Sprintf(UserOnlineIpCacheKey, tt.uid), ip).Result() - assert.NoError(t, err) - now := time.Now().Unix() - assert.GreaterOrEqual(t, score, float64(now)) - } - }) - } -} - -func TestNodeCacheClient_UpdateNodeOnlineUser(t *testing.T) { - client := newTestRedisClient(t) - defer cleanupTestData(t, client) - - cache := NewNodeCacheClient(client) - ctx := context.Background() - - tests := []struct { - name string - nodeId int64 - users []NodeOnlineUser - wantErr bool - }{ - { - name: "Update online users normally", - nodeId: 1, - users: []NodeOnlineUser{ - {SID: 1, IP: "192.168.1.1"}, - {SID: 2, IP: "192.168.1.2"}, - }, - wantErr: false, - }, - { - name: "Invalid node ID", - nodeId: 0, - users: []NodeOnlineUser{ - {SID: 1, IP: "192.168.1.1"}, - }, - wantErr: true, - }, - { - name: "Empty user list", - nodeId: 1, - users: []NodeOnlineUser{}, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := cache.UpdateNodeOnlineUser(ctx, tt.nodeId, tt.users) - if tt.wantErr { - assert.Error(t, err) - return - } - assert.NoError(t, err) - - // Verify data is updated correctly - data, err := client.Get(ctx, fmt.Sprintf(NodeOnlineUserCacheKey, tt.nodeId)).Result() - assert.NoError(t, err) - - var result map[int64][]string - err = json.Unmarshal([]byte(data), &result) - assert.NoError(t, err) - - // Verify data content - for _, user := range tt.users { - ips, exists := result[user.SID] - assert.True(t, exists) - assert.Contains(t, ips, user.IP) - } - }) - } -} diff --git a/internal/model/cache/types.go b/internal/model/cache/types.go deleted file mode 100644 index 89b6144..0000000 --- a/internal/model/cache/types.go +++ /dev/null @@ -1,34 +0,0 @@ -package cache - -type NodeOnlineUser struct { - SID int64 - IP string -} - -type NodeTodayTrafficRank struct { - ID int64 - Name string - Upload int64 - Download int64 - Total int64 -} - -type UserTodayTrafficRank struct { - SID int64 - Upload int64 - Download int64 - Total int64 -} - -type UserTraffic struct { - UID int64 - Upload int64 - Download int64 -} - -type NodeStatus struct { - Cpu float64 - Mem float64 - Disk float64 - UpdatedAt int64 -} diff --git a/internal/model/node/cache.go b/internal/model/node/cache.go new file mode 100644 index 0000000..a217578 --- /dev/null +++ b/internal/model/node/cache.go @@ -0,0 +1,146 @@ +package node + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/redis/go-redis/v9" +) + +type ( + customCacheLogicModel interface { + StatusCache(ctx context.Context, serverId int64, protocol string) (Status, error) + UpdateStatusCache(ctx context.Context, serverId int64, protocol string, status *Status) error + OnlineUserSubscribe(ctx context.Context, serverId int64, protocol string) (OnlineUserSubscribe, error) + UpdateOnlineUserSubscribe(ctx context.Context, serverId int64, protocol string, subscribe OnlineUserSubscribe) error + OnlineUserSubscribeGlobal(ctx context.Context) (int64, error) + UpdateOnlineUserSubscribeGlobal(ctx context.Context, subscribe OnlineUserSubscribe) error + } + + Status struct { + Cpu float64 `json:"cpu"` + Mem float64 `json:"mem"` + Disk float64 `json:"disk"` + UpdatedAt int64 `json:"updated_at"` + } + + OnlineUserSubscribe map[int64][]string +) + +// Marshal to json string +func (s *Status) Marshal() string { + type Alias Status + data, _ := json.Marshal(&struct { + *Alias + }{ + Alias: (*Alias)(s), + }) + return string(data) +} + +// Unmarshal from json string +func (s *Status) Unmarshal(data string) error { + type Alias Status + aux := &struct { + *Alias + }{ + Alias: (*Alias)(s), + } + return json.Unmarshal([]byte(data), &aux) +} + +const ( + Expiry = 300 * time.Second // Cache expiry time in seconds + StatusCacheKey = "node:status:%d:%s" // Node status cache key format (Server ID and protocol) Example: node:status:1:shadowsocks + OnlineUserCacheKeyWithSubscribe = "node:online:subscribe:%d:%s" // Online user subscribe cache key format (Server ID and protocol) Example: node:online:subscribe:1:shadowsocks + OnlineUserSubscribeCacheKeyWithGlobal = "node:online:subscribe:global" // Online user global subscribe cache key +) + +// UpdateStatusCache Update server status to cache +func (m *customServerModel) UpdateStatusCache(ctx context.Context, serverId int64, protocol string, status *Status) error { + key := fmt.Sprintf(StatusCacheKey, serverId, protocol) + return m.Cache.Set(ctx, key, status.Marshal(), Expiry).Err() + +} + +// StatusCache Get server status from cache +func (m *customServerModel) StatusCache(ctx context.Context, serverId int64, protocol string) (Status, error) { + var status Status + key := fmt.Sprintf(StatusCacheKey, serverId, protocol) + + result, err := m.Cache.Get(ctx, key).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + return status, nil + } + return status, err + } + if result == "" { + return status, nil + } + err = status.Unmarshal(result) + return status, err +} + +// OnlineUserSubscribe Get online user subscribe +func (m *customServerModel) OnlineUserSubscribe(ctx context.Context, serverId int64, protocol string) (OnlineUserSubscribe, error) { + key := fmt.Sprintf(OnlineUserCacheKeyWithSubscribe, serverId, protocol) + result, err := m.Cache.Get(ctx, key).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + return OnlineUserSubscribe{}, nil + } + return nil, err + } + if result == "" { + return OnlineUserSubscribe{}, nil + } + var subscribe OnlineUserSubscribe + err = json.Unmarshal([]byte(result), &subscribe) + return subscribe, err +} + +// UpdateOnlineUserSubscribe Update online user subscribe +func (m *customServerModel) UpdateOnlineUserSubscribe(ctx context.Context, serverId int64, protocol string, subscribe OnlineUserSubscribe) error { + key := fmt.Sprintf(OnlineUserCacheKeyWithSubscribe, serverId, protocol) + data, err := json.Marshal(subscribe) + if err != nil { + return err + } + return m.Cache.Set(ctx, key, data, Expiry).Err() +} + +// OnlineUserSubscribeGlobal Get global online user subscribe count +func (m *customServerModel) OnlineUserSubscribeGlobal(ctx context.Context) (int64, error) { + now := time.Now().Unix() + // Clear expired data + if err := m.Cache.ZRemRangeByScore(ctx, OnlineUserSubscribeCacheKeyWithGlobal, "-inf", fmt.Sprintf("%d", now)).Err(); err != nil { + return 0, err + } + return m.Cache.ZCard(ctx, OnlineUserSubscribeCacheKeyWithGlobal).Result() +} + +// UpdateOnlineUserSubscribeGlobal Update global online user subscribe count +func (m *customServerModel) UpdateOnlineUserSubscribeGlobal(ctx context.Context, subscribe OnlineUserSubscribe) error { + now := time.Now() + expireTime := now.Add(5 * time.Minute).Unix() // set expire time 5 minutes later + + pipe := m.Cache.Pipeline() + + // Clear expired data + pipe.ZRemRangeByScore(ctx, OnlineUserSubscribeCacheKeyWithGlobal, "-inf", fmt.Sprintf("%d", now.Unix())) + // Add or update each subscribe with new expire time + for sub := range subscribe { + // Use ZAdd to add or update the member with new score (expire time) + pipe.ZAdd(ctx, OnlineUserSubscribeCacheKeyWithGlobal, redis.Z{ + Score: float64(expireTime), + Member: sub, + }) + } + + _, err := pipe.Exec(ctx) + return err +} diff --git a/internal/model/node/default.go b/internal/model/node/default.go index 62fd9ff..743a3b0 100644 --- a/internal/model/node/default.go +++ b/internal/model/node/default.go @@ -14,6 +14,7 @@ type ( Model interface { serverModel NodeModel + customCacheLogicModel customServerLogicModel } serverModel interface { diff --git a/internal/svc/serviceContext.go b/internal/svc/serviceContext.go index f2c224c..8c0e6ee 100644 --- a/internal/svc/serviceContext.go +++ b/internal/svc/serviceContext.go @@ -7,10 +7,8 @@ import ( "github.com/perfect-panel/server/internal/model/node" "github.com/perfect-panel/server/pkg/device" - "github.com/perfect-panel/server/internal/model/ads" - "github.com/perfect-panel/server/internal/model/cache" - "github.com/perfect-panel/server/internal/config" + "github.com/perfect-panel/server/internal/model/ads" "github.com/perfect-panel/server/internal/model/announcement" "github.com/perfect-panel/server/internal/model/auth" "github.com/perfect-panel/server/internal/model/coupon" @@ -35,11 +33,11 @@ import ( ) type ServiceContext struct { - DB *gorm.DB - Redis *redis.Client - Config config.Config - Queue *asynq.Client - NodeCache *cache.NodeCacheClient + DB *gorm.DB + Redis *redis.Client + Config config.Config + Queue *asynq.Client + //NodeCache *cache.NodeCacheClient AuthModel auth.Model AdsModel ads.Model LogModel log.Model @@ -86,11 +84,11 @@ func NewServiceContext(c config.Config) *ServiceContext { } authLimiter := limit.NewPeriodLimit(86400, 15, rds, config.SendCountLimitKeyPrefix, limit.Align()) srv := &ServiceContext{ - DB: db, - Redis: rds, - Config: c, - Queue: NewAsynqClient(c), - NodeCache: cache.NewNodeCacheClient(rds), + DB: db, + Redis: rds, + Config: c, + Queue: NewAsynqClient(c), + //NodeCache: cache.NewNodeCacheClient(rds), AuthLimiter: authLimiter, AdsModel: ads.NewModel(db, rds), LogModel: log.NewModel(db), diff --git a/internal/types/types.go b/internal/types/types.go index 24d5893..ece910e 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -1662,18 +1662,18 @@ type SendSmsCodeRequest struct { } type Server struct { - Id int64 `json:"id"` - Name string `json:"name"` - Country string `json:"country"` - City string `json:"city"` - Ratio float32 `json:"ratio"` - Address string `json:"address"` - Sort int `json:"sort"` - Protocols []Protocol `json:"protocols"` - LastReportedAt int64 `json:"last_reported_at"` - Status ServerStatus `json:"status"` - CreatedAt int64 `json:"created_at"` - UpdatedAt int64 `json:"updated_at"` + Id int64 `json:"id"` + Name string `json:"name"` + Country string `json:"country"` + City string `json:"city"` + Ratio float32 `json:"ratio"` + Address string `json:"address"` + Sort int `json:"sort"` + Protocols []Protocol `json:"protocols"` + LastReportedAt int64 `json:"last_reported_at"` + Status []ServerStatus `json:"status"` + CreatedAt int64 `json:"created_at"` + UpdatedAt int64 `json:"updated_at"` } type ServerBasic struct { @@ -1731,10 +1731,11 @@ type ServerRuleGroup struct { } type ServerStatus struct { - Online []ServerOnlineUser `json:"online"` - Cpu float64 `json:"cpu"` - Mem float64 `json:"mem"` - Disk float64 `json:"disk"` + Cpu float64 `json:"cpu"` + Mem float64 `json:"mem"` + Disk float64 `json:"disk"` + Protocol string `json:"protocol"` + Online []ServerOnlineUser `json:"online"` } type ServerTotalDataResponse struct { diff --git a/queue/logic/traffic/resetTrafficLogic.go b/queue/logic/traffic/resetTrafficLogic.go index 83192f1..11c5c7d 100644 --- a/queue/logic/traffic/resetTrafficLogic.go +++ b/queue/logic/traffic/resetTrafficLogic.go @@ -119,13 +119,6 @@ func (l *ResetTrafficLogic) ProcessTask(ctx context.Context, _ *asynq.Task) erro } }() - // Reset today's traffic data - err = l.svc.NodeCache.ResetTodayTrafficData(ctx) - if err != nil { - logger.Errorw("[ResetTodayTraffic] Failed to reset today traffic data", - logger.Field("error", err.Error())) - } - // Load last reset time from cache var cache resetTrafficCache cacheData, err := l.svc.Redis.Get(ctx, cacheKey).Result()