From bcefb274ab808216ff428e886fa089bbd475aedd Mon Sep 17 00:00:00 2001 From: shanshanzhong Date: Wed, 29 Apr 2026 01:37:59 -0700 Subject: [PATCH] perf(server): cache speed limit calculations --- .../logic/server/getServerUserListLogic.go | 27 ++++++++-- pkg/speedlimit/calculator.go | 53 ++++++++++++++++--- queue/logic/traffic/trafficStatisticsLogic.go | 8 ++- 3 files changed, 77 insertions(+), 11 deletions(-) diff --git a/internal/logic/server/getServerUserListLogic.go b/internal/logic/server/getServerUserListLogic.go index 817ea70..b8cf825 100644 --- a/internal/logic/server/getServerUserListLogic.go +++ b/internal/logic/server/getServerUserListLogic.go @@ -137,7 +137,7 @@ func (l *GetServerUserListLogic) GetServerUserList(req *types.GetServerUserListR return nil, err } } - + if len(subs) == 0 { return &types.GetServerUserListResponse{ Users: []types.ServerUser{ @@ -194,7 +194,7 @@ func (l *GetServerUserListLogic) GetServerUserList(req *types.GetServerUserListR val, _ := json.Marshal(resp) etag := tool.GenerateETag(val) l.ctx.Header("ETag", etag) - err = l.svcCtx.Redis.Set(l.ctx, cacheKey, string(val), -1).Err() + err = l.svcCtx.Redis.Set(l.ctx, cacheKey, string(val), l.serverUserListCacheTTL()).Err() if err != nil { l.Errorw("[ServerUserListCacheKey] redis set error", logger.Field("error", err.Error())) } @@ -205,6 +205,18 @@ func (l *GetServerUserListLogic) GetServerUserList(req *types.GetServerUserListR return resp, nil } +func (l *GetServerUserListLogic) serverUserListCacheTTL() time.Duration { + pullInterval := l.svcCtx.Config.Node.NodePullInterval + if pullInterval <= 0 { + pullInterval = 60 + } + ttl := time.Duration(pullInterval*2) * time.Second + if ttl < time.Minute { + return time.Minute + } + return ttl +} + func (l *GetServerUserListLogic) shouldIncludeServerUser(userSub *user.Subscribe, serverNodeGroupIds []int64) bool { if userSub == nil { return false @@ -295,6 +307,15 @@ func (l *GetServerUserListLogic) canUseExpiredNodeGroup(userSub *user.Subscribe, // calculateEffectiveSpeedLimit 计算用户的实际限速值(考虑按量限速规则) func (l *GetServerUserListLogic) calculateEffectiveSpeedLimit(sub *subscribe.Subscribe, userSub *user.Subscribe) int64 { - result := speedlimit.Calculate(l.ctx.Request.Context(), l.svcCtx.DB, userSub.UserId, userSub.Id, sub.SpeedLimit, sub.TrafficLimit) + result := speedlimit.CalculateWithCache( + l.ctx.Request.Context(), + l.svcCtx.Redis, + l.svcCtx.DB, + userSub.UserId, + userSub.Id, + sub.SpeedLimit, + sub.TrafficLimit, + 30*time.Second, + ) return result.EffectiveSpeed } diff --git a/pkg/speedlimit/calculator.go b/pkg/speedlimit/calculator.go index 287d6c1..55900b5 100644 --- a/pkg/speedlimit/calculator.go +++ b/pkg/speedlimit/calculator.go @@ -2,10 +2,13 @@ package speedlimit import ( "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "fmt" "time" + "github.com/redis/go-redis/v9" "gorm.io/gorm" ) @@ -19,13 +22,44 @@ type TrafficLimitRule struct { // ThrottleResult contains the computed speed limit status for a user subscription. type ThrottleResult struct { - BaseSpeed int64 `json:"base_speed"` // Plan base speed limit (Mbps, 0=unlimited) - EffectiveSpeed int64 `json:"effective_speed"` // Current effective speed limit (Mbps) - IsThrottled bool `json:"is_throttled"` // Whether the user is currently throttled - ThrottleRule string `json:"throttle_rule"` // Description of the matched rule (empty if not throttled) - UsedTrafficGB float64 `json:"used_traffic_gb"` // Traffic used in the matched rule's window (GB) - ThrottleStart int64 `json:"throttle_start"` // Window start Unix timestamp (seconds), 0 if not throttled - ThrottleEnd int64 `json:"throttle_end"` // Window end Unix timestamp (seconds), 0 if not throttled + BaseSpeed int64 `json:"base_speed"` // Plan base speed limit (Mbps, 0=unlimited) + EffectiveSpeed int64 `json:"effective_speed"` // Current effective speed limit (Mbps) + IsThrottled bool `json:"is_throttled"` // Whether the user is currently throttled + ThrottleRule string `json:"throttle_rule"` // Description of the matched rule (empty if not throttled) + UsedTrafficGB float64 `json:"used_traffic_gb"` // Traffic used in the matched rule's window (GB) + ThrottleStart int64 `json:"throttle_start"` // Window start Unix timestamp (seconds), 0 if not throttled + ThrottleEnd int64 `json:"throttle_end"` // Window end Unix timestamp (seconds), 0 if not throttled +} + +// CalculateWithCache computes the effective speed limit with a short Redis cache. +// It is intended for hot read paths such as node user-list pulls where many nodes +// can ask for the same subscription limits in a short period. +func CalculateWithCache(ctx context.Context, cache *redis.Client, db *gorm.DB, userId, subscribeId, baseSpeedLimit int64, trafficLimitJSON string, ttl time.Duration) *ThrottleResult { + if cache == nil || ttl <= 0 || trafficLimitJSON == "" { + return Calculate(ctx, db, userId, subscribeId, baseSpeedLimit, trafficLimitJSON) + } + + key := cacheKey(userId, subscribeId, baseSpeedLimit, trafficLimitJSON) + if cached, err := cache.Get(ctx, key).Result(); err == nil && cached != "" { + var result ThrottleResult + if err := json.Unmarshal([]byte(cached), &result); err == nil { + return &result + } + } + + result := Calculate(ctx, db, userId, subscribeId, baseSpeedLimit, trafficLimitJSON) + if payload, err := json.Marshal(result); err == nil { + _ = cache.Set(ctx, key, string(payload), ttl).Err() + } + return result +} + +// ClearCache removes a cached speed-limit calculation for a user subscription. +func ClearCache(ctx context.Context, cache *redis.Client, userId, subscribeId, baseSpeedLimit int64, trafficLimitJSON string) error { + if cache == nil || trafficLimitJSON == "" { + return nil + } + return cache.Del(ctx, cacheKey(userId, subscribeId, baseSpeedLimit, trafficLimitJSON)).Err() } // Calculate computes the effective speed limit for a user subscription, @@ -107,3 +141,8 @@ func Calculate(ctx context.Context, db *gorm.DB, userId, subscribeId, baseSpeedL return result } + +func cacheKey(userId, subscribeId, baseSpeedLimit int64, trafficLimitJSON string) string { + sum := sha256.Sum256([]byte(trafficLimitJSON)) + return fmt.Sprintf("speedlimit:%d:%d:%d:%s", userId, subscribeId, baseSpeedLimit, hex.EncodeToString(sum[:8])) +} diff --git a/queue/logic/traffic/trafficStatisticsLogic.go b/queue/logic/traffic/trafficStatisticsLogic.go index 9c5e9b1..c7d9307 100644 --- a/queue/logic/traffic/trafficStatisticsLogic.go +++ b/queue/logic/traffic/trafficStatisticsLogic.go @@ -131,9 +131,15 @@ func (l *TrafficStatisticsLogic) ProcessTask(ctx context.Context, task *asynq.Ta // 写完流量后检查是否触发按量限速,若触发则清除节点缓存使限速立即生效 if planSub, planErr := l.svc.SubscribeModel.FindOne(ctx, sub.SubscribeId); planErr == nil && - (planSub.SpeedLimit > 0 || planSub.TrafficLimit != "") { + planSub.TrafficLimit != "" { throttle := speedlimit.Calculate(ctx, l.svc.DB, sub.UserId, sub.Id, planSub.SpeedLimit, planSub.TrafficLimit) if throttle.IsThrottled { + if delErr := speedlimit.ClearCache(ctx, l.svc.Redis, sub.UserId, sub.Id, planSub.SpeedLimit, planSub.TrafficLimit); delErr != nil { + logger.WithContext(ctx).Error("[TrafficStatistics] Clear speed limit cache failed", + logger.Field("subscribeId", sub.Id), + logger.Field("error", delErr.Error()), + ) + } cacheKey := fmt.Sprintf("%s%d", node.ServerUserListCacheKey, payload.ServerId) if delErr := l.svc.Redis.Del(ctx, cacheKey).Err(); delErr != nil { logger.WithContext(ctx).Error("[TrafficStatistics] Clear server user cache failed",