节点用户列表缓存(server:user:{id})永不过期,用户流量超限触发按量限速后
缓存中仍是旧的速度值,节点不会感知限速状态。
修复:每次写入 traffic_log 后,检查该订阅是否触发按量限速规则,
若 IsThrottled=true 则立即删除对应节点的用户列表缓存,
节点下次拉取时重新计算并应用降速后的 speed_limit。
Co-Authored-By: claude-flow <ruv@ruv.net>
149 lines
4.6 KiB
Go
149 lines
4.6 KiB
Go
package traffic
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/perfect-panel/server/internal/model/node"
|
|
"github.com/perfect-panel/server/pkg/logger"
|
|
"github.com/perfect-panel/server/pkg/speedlimit"
|
|
|
|
"github.com/hibiken/asynq"
|
|
"github.com/perfect-panel/server/internal/model/traffic"
|
|
"github.com/perfect-panel/server/internal/svc"
|
|
"github.com/perfect-panel/server/queue/types"
|
|
)
|
|
|
|
//goland:noinspection GoNameStartsWithPackageName
|
|
type TrafficStatisticsLogic struct {
|
|
svc *svc.ServiceContext
|
|
}
|
|
|
|
func NewTrafficStatisticsLogic(svc *svc.ServiceContext) *TrafficStatisticsLogic {
|
|
return &TrafficStatisticsLogic{
|
|
svc: svc,
|
|
}
|
|
}
|
|
|
|
func (l *TrafficStatisticsLogic) ProcessTask(ctx context.Context, task *asynq.Task) error {
|
|
var payload types.TrafficStatistics
|
|
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
|
|
logger.WithContext(ctx).Error("[TrafficStatistics] Unmarshal payload failed",
|
|
logger.Field("error", err.Error()),
|
|
logger.Field("payload", string(task.Payload())),
|
|
)
|
|
return nil
|
|
}
|
|
if len(payload.Logs) == 0 {
|
|
logger.WithContext(ctx).Error("[TrafficStatistics] Payload is empty")
|
|
return nil
|
|
}
|
|
// query server info
|
|
serverInfo, err := l.svc.NodeModel.FindOneServer(ctx, payload.ServerId)
|
|
if err != nil {
|
|
logger.WithContext(ctx).Error("[TrafficStatistics] Find server info failed",
|
|
logger.Field("serverId", payload.ServerId),
|
|
logger.Field("error", err.Error()),
|
|
)
|
|
return nil
|
|
}
|
|
// query protocol ratio
|
|
// default ratio is 1.0
|
|
|
|
protocols, err := serverInfo.UnmarshalProtocols()
|
|
if err != nil {
|
|
logger.Errorf("[TrafficStatistics] Unmarshal protocols failed: %s", err.Error())
|
|
return nil
|
|
}
|
|
var protocol *node.Protocol
|
|
|
|
var ratio float32 = 1.0
|
|
|
|
for _, p := range protocols {
|
|
if strings.ToLower(p.Type) == strings.ToLower(payload.Protocol) {
|
|
protocol = &p
|
|
break
|
|
}
|
|
}
|
|
|
|
if protocol == nil {
|
|
logger.WithContext(ctx).Error("[TrafficStatistics] Protocol not found: %s", payload.Protocol)
|
|
return nil
|
|
}
|
|
|
|
// use protocol ratio if it's greater than 0
|
|
if protocol.Ratio > 0 {
|
|
ratio = float32(protocol.Ratio)
|
|
}
|
|
|
|
now := time.Now()
|
|
realTimeMultiplier := l.svc.NodeMultiplierManager.GetMultiplier(now)
|
|
logger.Debugf("[TrafficStatisticsLogic] Current time traffic multiplier: %.2f", realTimeMultiplier)
|
|
for _, log := range payload.Logs {
|
|
// query user Subscribe Info
|
|
sub, err := l.svc.UserModel.FindOneSubscribe(ctx, log.SID)
|
|
if err != nil {
|
|
logger.WithContext(ctx).Error("[TrafficStatistics] Find user Subscribe Info failed",
|
|
logger.Field("uid", log.SID),
|
|
logger.Field("error", err.Error()),
|
|
)
|
|
continue
|
|
}
|
|
|
|
if log.Download+log.Upload <= l.svc.Config.Node.TrafficReportThreshold {
|
|
// no traffic, skip
|
|
continue
|
|
}
|
|
// update user subscribe with log
|
|
d := int64(float32(log.Download) * ratio * realTimeMultiplier)
|
|
u := int64(float32(log.Upload) * ratio * realTimeMultiplier)
|
|
isExpired := now.After(sub.ExpireTime)
|
|
if err := l.svc.UserModel.UpdateUserSubscribeWithTraffic(ctx, sub.Id, d, u, isExpired); err != nil {
|
|
logger.WithContext(ctx).Error("[TrafficStatistics] Update user subscribe with log failed",
|
|
logger.Field("sid", log.SID),
|
|
logger.Field("download", float32(log.Download)*ratio),
|
|
logger.Field("upload", float32(log.Upload)*ratio),
|
|
logger.Field("is_expired", isExpired),
|
|
logger.Field("error", err.Error()),
|
|
)
|
|
continue
|
|
}
|
|
|
|
// create log log
|
|
if err = l.svc.TrafficLogModel.Insert(ctx, &traffic.TrafficLog{
|
|
ServerId: payload.ServerId,
|
|
SubscribeId: log.SID,
|
|
UserId: sub.UserId,
|
|
Upload: u,
|
|
Download: d,
|
|
Timestamp: now,
|
|
}); err != nil {
|
|
logger.WithContext(ctx).Error("[TrafficStatistics] Create log log failed",
|
|
logger.Field("uid", log.SID),
|
|
logger.Field("download", float32(log.Download)*ratio),
|
|
logger.Field("upload", float32(log.Upload)*ratio),
|
|
logger.Field("error", err.Error()),
|
|
)
|
|
}
|
|
|
|
// 写完流量后检查是否触发按量限速,若触发则清除节点缓存使限速立即生效
|
|
if planSub, planErr := l.svc.SubscribeModel.FindOne(ctx, sub.SubscribeId); planErr == nil &&
|
|
(planSub.SpeedLimit > 0 || planSub.TrafficLimit != "") {
|
|
throttle := speedlimit.Calculate(ctx, l.svc.DB, sub.UserId, sub.Id, planSub.SpeedLimit, planSub.TrafficLimit)
|
|
if throttle.IsThrottled {
|
|
cacheKey := fmt.Sprintf("%s%d", node.ServerUserListCacheKey, payload.ServerId)
|
|
if delErr := l.svc.Redis.Del(ctx, cacheKey).Err(); delErr != nil {
|
|
logger.WithContext(ctx).Error("[TrafficStatistics] Clear server user cache failed",
|
|
logger.Field("serverId", payload.ServerId),
|
|
logger.Field("error", delErr.Error()),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|