perf(server): cache speed limit calculations
All checks were successful
Build docker and publish / build (20.15.1) (push) Successful in 5m37s
All checks were successful
Build docker and publish / build (20.15.1) (push) Successful in 5m37s
This commit is contained in:
parent
3ae85f68ea
commit
bcefb274ab
@ -194,7 +194,7 @@ func (l *GetServerUserListLogic) GetServerUserList(req *types.GetServerUserListR
|
|||||||
val, _ := json.Marshal(resp)
|
val, _ := json.Marshal(resp)
|
||||||
etag := tool.GenerateETag(val)
|
etag := tool.GenerateETag(val)
|
||||||
l.ctx.Header("ETag", etag)
|
l.ctx.Header("ETag", etag)
|
||||||
err = l.svcCtx.Redis.Set(l.ctx, cacheKey, string(val), -1).Err()
|
err = l.svcCtx.Redis.Set(l.ctx, cacheKey, string(val), l.serverUserListCacheTTL()).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Errorw("[ServerUserListCacheKey] redis set error", logger.Field("error", err.Error()))
|
l.Errorw("[ServerUserListCacheKey] redis set error", logger.Field("error", err.Error()))
|
||||||
}
|
}
|
||||||
@ -205,6 +205,18 @@ func (l *GetServerUserListLogic) GetServerUserList(req *types.GetServerUserListR
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *GetServerUserListLogic) serverUserListCacheTTL() time.Duration {
|
||||||
|
pullInterval := l.svcCtx.Config.Node.NodePullInterval
|
||||||
|
if pullInterval <= 0 {
|
||||||
|
pullInterval = 60
|
||||||
|
}
|
||||||
|
ttl := time.Duration(pullInterval*2) * time.Second
|
||||||
|
if ttl < time.Minute {
|
||||||
|
return time.Minute
|
||||||
|
}
|
||||||
|
return ttl
|
||||||
|
}
|
||||||
|
|
||||||
func (l *GetServerUserListLogic) shouldIncludeServerUser(userSub *user.Subscribe, serverNodeGroupIds []int64) bool {
|
func (l *GetServerUserListLogic) shouldIncludeServerUser(userSub *user.Subscribe, serverNodeGroupIds []int64) bool {
|
||||||
if userSub == nil {
|
if userSub == nil {
|
||||||
return false
|
return false
|
||||||
@ -295,6 +307,15 @@ func (l *GetServerUserListLogic) canUseExpiredNodeGroup(userSub *user.Subscribe,
|
|||||||
|
|
||||||
// calculateEffectiveSpeedLimit 计算用户的实际限速值(考虑按量限速规则)
|
// calculateEffectiveSpeedLimit 计算用户的实际限速值(考虑按量限速规则)
|
||||||
func (l *GetServerUserListLogic) calculateEffectiveSpeedLimit(sub *subscribe.Subscribe, userSub *user.Subscribe) int64 {
|
func (l *GetServerUserListLogic) calculateEffectiveSpeedLimit(sub *subscribe.Subscribe, userSub *user.Subscribe) int64 {
|
||||||
result := speedlimit.Calculate(l.ctx.Request.Context(), l.svcCtx.DB, userSub.UserId, userSub.Id, sub.SpeedLimit, sub.TrafficLimit)
|
result := speedlimit.CalculateWithCache(
|
||||||
|
l.ctx.Request.Context(),
|
||||||
|
l.svcCtx.Redis,
|
||||||
|
l.svcCtx.DB,
|
||||||
|
userSub.UserId,
|
||||||
|
userSub.Id,
|
||||||
|
sub.SpeedLimit,
|
||||||
|
sub.TrafficLimit,
|
||||||
|
30*time.Second,
|
||||||
|
)
|
||||||
return result.EffectiveSpeed
|
return result.EffectiveSpeed
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,10 +2,13 @@ package speedlimit
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -19,13 +22,44 @@ type TrafficLimitRule struct {
|
|||||||
|
|
||||||
// ThrottleResult contains the computed speed limit status for a user subscription.
|
// ThrottleResult contains the computed speed limit status for a user subscription.
|
||||||
type ThrottleResult struct {
|
type ThrottleResult struct {
|
||||||
BaseSpeed int64 `json:"base_speed"` // Plan base speed limit (Mbps, 0=unlimited)
|
BaseSpeed int64 `json:"base_speed"` // Plan base speed limit (Mbps, 0=unlimited)
|
||||||
EffectiveSpeed int64 `json:"effective_speed"` // Current effective speed limit (Mbps)
|
EffectiveSpeed int64 `json:"effective_speed"` // Current effective speed limit (Mbps)
|
||||||
IsThrottled bool `json:"is_throttled"` // Whether the user is currently throttled
|
IsThrottled bool `json:"is_throttled"` // Whether the user is currently throttled
|
||||||
ThrottleRule string `json:"throttle_rule"` // Description of the matched rule (empty if not throttled)
|
ThrottleRule string `json:"throttle_rule"` // Description of the matched rule (empty if not throttled)
|
||||||
UsedTrafficGB float64 `json:"used_traffic_gb"` // Traffic used in the matched rule's window (GB)
|
UsedTrafficGB float64 `json:"used_traffic_gb"` // Traffic used in the matched rule's window (GB)
|
||||||
ThrottleStart int64 `json:"throttle_start"` // Window start Unix timestamp (seconds), 0 if not throttled
|
ThrottleStart int64 `json:"throttle_start"` // Window start Unix timestamp (seconds), 0 if not throttled
|
||||||
ThrottleEnd int64 `json:"throttle_end"` // Window end Unix timestamp (seconds), 0 if not throttled
|
ThrottleEnd int64 `json:"throttle_end"` // Window end Unix timestamp (seconds), 0 if not throttled
|
||||||
|
}
|
||||||
|
|
||||||
|
// CalculateWithCache computes the effective speed limit with a short Redis cache.
|
||||||
|
// It is intended for hot read paths such as node user-list pulls where many nodes
|
||||||
|
// can ask for the same subscription limits in a short period.
|
||||||
|
func CalculateWithCache(ctx context.Context, cache *redis.Client, db *gorm.DB, userId, subscribeId, baseSpeedLimit int64, trafficLimitJSON string, ttl time.Duration) *ThrottleResult {
|
||||||
|
if cache == nil || ttl <= 0 || trafficLimitJSON == "" {
|
||||||
|
return Calculate(ctx, db, userId, subscribeId, baseSpeedLimit, trafficLimitJSON)
|
||||||
|
}
|
||||||
|
|
||||||
|
key := cacheKey(userId, subscribeId, baseSpeedLimit, trafficLimitJSON)
|
||||||
|
if cached, err := cache.Get(ctx, key).Result(); err == nil && cached != "" {
|
||||||
|
var result ThrottleResult
|
||||||
|
if err := json.Unmarshal([]byte(cached), &result); err == nil {
|
||||||
|
return &result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result := Calculate(ctx, db, userId, subscribeId, baseSpeedLimit, trafficLimitJSON)
|
||||||
|
if payload, err := json.Marshal(result); err == nil {
|
||||||
|
_ = cache.Set(ctx, key, string(payload), ttl).Err()
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClearCache removes a cached speed-limit calculation for a user subscription.
|
||||||
|
func ClearCache(ctx context.Context, cache *redis.Client, userId, subscribeId, baseSpeedLimit int64, trafficLimitJSON string) error {
|
||||||
|
if cache == nil || trafficLimitJSON == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return cache.Del(ctx, cacheKey(userId, subscribeId, baseSpeedLimit, trafficLimitJSON)).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate computes the effective speed limit for a user subscription,
|
// Calculate computes the effective speed limit for a user subscription,
|
||||||
@ -107,3 +141,8 @@ func Calculate(ctx context.Context, db *gorm.DB, userId, subscribeId, baseSpeedL
|
|||||||
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func cacheKey(userId, subscribeId, baseSpeedLimit int64, trafficLimitJSON string) string {
|
||||||
|
sum := sha256.Sum256([]byte(trafficLimitJSON))
|
||||||
|
return fmt.Sprintf("speedlimit:%d:%d:%d:%s", userId, subscribeId, baseSpeedLimit, hex.EncodeToString(sum[:8]))
|
||||||
|
}
|
||||||
|
|||||||
@ -131,9 +131,15 @@ func (l *TrafficStatisticsLogic) ProcessTask(ctx context.Context, task *asynq.Ta
|
|||||||
|
|
||||||
// 写完流量后检查是否触发按量限速,若触发则清除节点缓存使限速立即生效
|
// 写完流量后检查是否触发按量限速,若触发则清除节点缓存使限速立即生效
|
||||||
if planSub, planErr := l.svc.SubscribeModel.FindOne(ctx, sub.SubscribeId); planErr == nil &&
|
if planSub, planErr := l.svc.SubscribeModel.FindOne(ctx, sub.SubscribeId); planErr == nil &&
|
||||||
(planSub.SpeedLimit > 0 || planSub.TrafficLimit != "") {
|
planSub.TrafficLimit != "" {
|
||||||
throttle := speedlimit.Calculate(ctx, l.svc.DB, sub.UserId, sub.Id, planSub.SpeedLimit, planSub.TrafficLimit)
|
throttle := speedlimit.Calculate(ctx, l.svc.DB, sub.UserId, sub.Id, planSub.SpeedLimit, planSub.TrafficLimit)
|
||||||
if throttle.IsThrottled {
|
if throttle.IsThrottled {
|
||||||
|
if delErr := speedlimit.ClearCache(ctx, l.svc.Redis, sub.UserId, sub.Id, planSub.SpeedLimit, planSub.TrafficLimit); delErr != nil {
|
||||||
|
logger.WithContext(ctx).Error("[TrafficStatistics] Clear speed limit cache failed",
|
||||||
|
logger.Field("subscribeId", sub.Id),
|
||||||
|
logger.Field("error", delErr.Error()),
|
||||||
|
)
|
||||||
|
}
|
||||||
cacheKey := fmt.Sprintf("%s%d", node.ServerUserListCacheKey, payload.ServerId)
|
cacheKey := fmt.Sprintf("%s%d", node.ServerUserListCacheKey, payload.ServerId)
|
||||||
if delErr := l.svc.Redis.Del(ctx, cacheKey).Err(); delErr != nil {
|
if delErr := l.svc.Redis.Del(ctx, cacheKey).Err(); delErr != nil {
|
||||||
logger.WithContext(ctx).Error("[TrafficStatistics] Clear server user cache failed",
|
logger.WithContext(ctx).Error("[TrafficStatistics] Clear server user cache failed",
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user