Compare commits
No commits in common. "internal" and "main" have entirely different histories.
@ -137,7 +137,7 @@ func (l *GetServerUserListLogic) GetServerUserList(req *types.GetServerUserListR
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if len(subs) == 0 {
|
||||
return &types.GetServerUserListResponse{
|
||||
Users: []types.ServerUser{
|
||||
@ -194,7 +194,7 @@ func (l *GetServerUserListLogic) GetServerUserList(req *types.GetServerUserListR
|
||||
val, _ := json.Marshal(resp)
|
||||
etag := tool.GenerateETag(val)
|
||||
l.ctx.Header("ETag", etag)
|
||||
err = l.svcCtx.Redis.Set(l.ctx, cacheKey, string(val), l.serverUserListCacheTTL()).Err()
|
||||
err = l.svcCtx.Redis.Set(l.ctx, cacheKey, string(val), -1).Err()
|
||||
if err != nil {
|
||||
l.Errorw("[ServerUserListCacheKey] redis set error", logger.Field("error", err.Error()))
|
||||
}
|
||||
@ -205,18 +205,6 @@ func (l *GetServerUserListLogic) GetServerUserList(req *types.GetServerUserListR
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (l *GetServerUserListLogic) serverUserListCacheTTL() time.Duration {
|
||||
pullInterval := l.svcCtx.Config.Node.NodePullInterval
|
||||
if pullInterval <= 0 {
|
||||
pullInterval = 60
|
||||
}
|
||||
ttl := time.Duration(pullInterval*2) * time.Second
|
||||
if ttl < time.Minute {
|
||||
return time.Minute
|
||||
}
|
||||
return ttl
|
||||
}
|
||||
|
||||
func (l *GetServerUserListLogic) shouldIncludeServerUser(userSub *user.Subscribe, serverNodeGroupIds []int64) bool {
|
||||
if userSub == nil {
|
||||
return false
|
||||
@ -307,15 +295,6 @@ func (l *GetServerUserListLogic) canUseExpiredNodeGroup(userSub *user.Subscribe,
|
||||
|
||||
// calculateEffectiveSpeedLimit 计算用户的实际限速值(考虑按量限速规则)
|
||||
func (l *GetServerUserListLogic) calculateEffectiveSpeedLimit(sub *subscribe.Subscribe, userSub *user.Subscribe) int64 {
|
||||
result := speedlimit.CalculateWithCache(
|
||||
l.ctx.Request.Context(),
|
||||
l.svcCtx.Redis,
|
||||
l.svcCtx.DB,
|
||||
userSub.UserId,
|
||||
userSub.Id,
|
||||
sub.SpeedLimit,
|
||||
sub.TrafficLimit,
|
||||
30*time.Second,
|
||||
)
|
||||
result := speedlimit.Calculate(l.ctx.Request.Context(), l.svcCtx.DB, userSub.UserId, userSub.Id, sub.SpeedLimit, sub.TrafficLimit)
|
||||
return result.EffectiveSpeed
|
||||
}
|
||||
|
||||
@ -2,13 +2,10 @@ package speedlimit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
@ -22,44 +19,13 @@ type TrafficLimitRule struct {
|
||||
|
||||
// ThrottleResult contains the computed speed limit status for a user subscription.
|
||||
type ThrottleResult struct {
|
||||
BaseSpeed int64 `json:"base_speed"` // Plan base speed limit (Mbps, 0=unlimited)
|
||||
EffectiveSpeed int64 `json:"effective_speed"` // Current effective speed limit (Mbps)
|
||||
IsThrottled bool `json:"is_throttled"` // Whether the user is currently throttled
|
||||
ThrottleRule string `json:"throttle_rule"` // Description of the matched rule (empty if not throttled)
|
||||
UsedTrafficGB float64 `json:"used_traffic_gb"` // Traffic used in the matched rule's window (GB)
|
||||
ThrottleStart int64 `json:"throttle_start"` // Window start Unix timestamp (seconds), 0 if not throttled
|
||||
ThrottleEnd int64 `json:"throttle_end"` // Window end Unix timestamp (seconds), 0 if not throttled
|
||||
}
|
||||
|
||||
// CalculateWithCache computes the effective speed limit with a short Redis cache.
|
||||
// It is intended for hot read paths such as node user-list pulls where many nodes
|
||||
// can ask for the same subscription limits in a short period.
|
||||
func CalculateWithCache(ctx context.Context, cache *redis.Client, db *gorm.DB, userId, subscribeId, baseSpeedLimit int64, trafficLimitJSON string, ttl time.Duration) *ThrottleResult {
|
||||
if cache == nil || ttl <= 0 || trafficLimitJSON == "" {
|
||||
return Calculate(ctx, db, userId, subscribeId, baseSpeedLimit, trafficLimitJSON)
|
||||
}
|
||||
|
||||
key := cacheKey(userId, subscribeId, baseSpeedLimit, trafficLimitJSON)
|
||||
if cached, err := cache.Get(ctx, key).Result(); err == nil && cached != "" {
|
||||
var result ThrottleResult
|
||||
if err := json.Unmarshal([]byte(cached), &result); err == nil {
|
||||
return &result
|
||||
}
|
||||
}
|
||||
|
||||
result := Calculate(ctx, db, userId, subscribeId, baseSpeedLimit, trafficLimitJSON)
|
||||
if payload, err := json.Marshal(result); err == nil {
|
||||
_ = cache.Set(ctx, key, string(payload), ttl).Err()
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// ClearCache removes a cached speed-limit calculation for a user subscription.
|
||||
func ClearCache(ctx context.Context, cache *redis.Client, userId, subscribeId, baseSpeedLimit int64, trafficLimitJSON string) error {
|
||||
if cache == nil || trafficLimitJSON == "" {
|
||||
return nil
|
||||
}
|
||||
return cache.Del(ctx, cacheKey(userId, subscribeId, baseSpeedLimit, trafficLimitJSON)).Err()
|
||||
BaseSpeed int64 `json:"base_speed"` // Plan base speed limit (Mbps, 0=unlimited)
|
||||
EffectiveSpeed int64 `json:"effective_speed"` // Current effective speed limit (Mbps)
|
||||
IsThrottled bool `json:"is_throttled"` // Whether the user is currently throttled
|
||||
ThrottleRule string `json:"throttle_rule"` // Description of the matched rule (empty if not throttled)
|
||||
UsedTrafficGB float64 `json:"used_traffic_gb"` // Traffic used in the matched rule's window (GB)
|
||||
ThrottleStart int64 `json:"throttle_start"` // Window start Unix timestamp (seconds), 0 if not throttled
|
||||
ThrottleEnd int64 `json:"throttle_end"` // Window end Unix timestamp (seconds), 0 if not throttled
|
||||
}
|
||||
|
||||
// Calculate computes the effective speed limit for a user subscription,
|
||||
@ -141,8 +107,3 @@ func Calculate(ctx context.Context, db *gorm.DB, userId, subscribeId, baseSpeedL
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func cacheKey(userId, subscribeId, baseSpeedLimit int64, trafficLimitJSON string) string {
|
||||
sum := sha256.Sum256([]byte(trafficLimitJSON))
|
||||
return fmt.Sprintf("speedlimit:%d:%d:%d:%s", userId, subscribeId, baseSpeedLimit, hex.EncodeToString(sum[:8]))
|
||||
}
|
||||
|
||||
@ -131,15 +131,9 @@ func (l *TrafficStatisticsLogic) ProcessTask(ctx context.Context, task *asynq.Ta
|
||||
|
||||
// 写完流量后检查是否触发按量限速,若触发则清除节点缓存使限速立即生效
|
||||
if planSub, planErr := l.svc.SubscribeModel.FindOne(ctx, sub.SubscribeId); planErr == nil &&
|
||||
planSub.TrafficLimit != "" {
|
||||
(planSub.SpeedLimit > 0 || planSub.TrafficLimit != "") {
|
||||
throttle := speedlimit.Calculate(ctx, l.svc.DB, sub.UserId, sub.Id, planSub.SpeedLimit, planSub.TrafficLimit)
|
||||
if throttle.IsThrottled {
|
||||
if delErr := speedlimit.ClearCache(ctx, l.svc.Redis, sub.UserId, sub.Id, planSub.SpeedLimit, planSub.TrafficLimit); delErr != nil {
|
||||
logger.WithContext(ctx).Error("[TrafficStatistics] Clear speed limit cache failed",
|
||||
logger.Field("subscribeId", sub.Id),
|
||||
logger.Field("error", delErr.Error()),
|
||||
)
|
||||
}
|
||||
cacheKey := fmt.Sprintf("%s%d", node.ServerUserListCacheKey, payload.ServerId)
|
||||
if delErr := l.svc.Redis.Del(ctx, cacheKey).Err(); delErr != nil {
|
||||
logger.WithContext(ctx).Error("[TrafficStatistics] Clear server user cache failed",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user