ario_server/queue/logic/traffic/trafficStatisticsLogic.go
shanshanzhong c582087c0f
Some checks failed
Build docker and publish / build (20.15.1) (push) Failing after 6m27s
refactor: 更新项目引用路径从perfect-panel/ppanel-server到perfect-panel/server
feat: 添加版本和构建时间变量
fix: 修正短信队列类型注释错误
style: 清理未使用的代码和测试文件
docs: 更新安装文档中的下载链接
chore: 迁移数据库脚本添加日志和订阅配置
2025-10-13 01:33:03 -07:00

129 lines
3.6 KiB
Go

package traffic
import (
"context"
"encoding/json"
"strings"
"time"
"github.com/perfect-panel/server/internal/model/node"
"github.com/perfect-panel/server/pkg/logger"
"github.com/hibiken/asynq"
"github.com/perfect-panel/server/internal/model/traffic"
"github.com/perfect-panel/server/internal/svc"
"github.com/perfect-panel/server/queue/types"
)
//goland:noinspection GoNameStartsWithPackageName
type TrafficStatisticsLogic struct {
svc *svc.ServiceContext
}
func NewTrafficStatisticsLogic(svc *svc.ServiceContext) *TrafficStatisticsLogic {
return &TrafficStatisticsLogic{
svc: svc,
}
}
func (l *TrafficStatisticsLogic) ProcessTask(ctx context.Context, task *asynq.Task) error {
var payload types.TrafficStatistics
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
logger.WithContext(ctx).Error("[TrafficStatistics] Unmarshal payload failed",
logger.Field("error", err.Error()),
logger.Field("payload", string(task.Payload())),
)
return nil
}
if len(payload.Logs) == 0 {
logger.WithContext(ctx).Error("[TrafficStatistics] Payload is empty")
return nil
}
// query server info
serverInfo, err := l.svc.NodeModel.FindOneServer(ctx, payload.ServerId)
if err != nil {
logger.WithContext(ctx).Error("[TrafficStatistics] Find server info failed",
logger.Field("serverId", payload.ServerId),
logger.Field("error", err.Error()),
)
return nil
}
// query protocol ratio
// default ratio is 1.0
protocols, err := serverInfo.UnmarshalProtocols()
if err != nil {
logger.Errorf("[TrafficStatistics] Unmarshal protocols failed: %s", err.Error())
return nil
}
var protocol *node.Protocol
var ratio float32 = 1.0
for _, p := range protocols {
if strings.ToLower(p.Type) == strings.ToLower(payload.Protocol) {
protocol = &p
break
}
}
if protocol == nil {
logger.WithContext(ctx).Error("[TrafficStatistics] Protocol not found: %s", payload.Protocol)
return nil
}
// use protocol ratio if it's greater than 0
if protocol.Ratio > 0 {
ratio = float32(protocol.Ratio)
}
now := time.Now()
realTimeMultiplier := l.svc.NodeMultiplierManager.GetMultiplier(now)
for _, log := range payload.Logs {
// query user Subscribe Info
sub, err := l.svc.UserModel.FindOneSubscribe(ctx, log.SID)
if err != nil {
logger.WithContext(ctx).Error("[TrafficStatistics] Find user Subscribe Info failed",
logger.Field("uid", log.SID),
logger.Field("error", err.Error()),
)
continue
}
if log.Download+log.Upload <= l.svc.Config.Node.TrafficReportThreshold {
// no traffic, skip
continue
}
// update user subscribe with log
d := int64(float32(log.Download) * ratio * realTimeMultiplier)
u := int64(float32(log.Upload) * ratio * realTimeMultiplier)
if err := l.svc.UserModel.UpdateUserSubscribeWithTraffic(ctx, sub.Id, d, u); err != nil {
logger.WithContext(ctx).Error("[TrafficStatistics] Update user subscribe with log failed",
logger.Field("sid", log.SID),
logger.Field("download", float32(log.Download)*ratio),
logger.Field("upload", float32(log.Upload)*ratio),
logger.Field("error", err.Error()),
)
continue
}
// create log log
if err = l.svc.TrafficLogModel.Insert(ctx, &traffic.TrafficLog{
ServerId: payload.ServerId,
SubscribeId: log.SID,
UserId: sub.UserId,
Upload: u,
Download: d,
Timestamp: now,
}); err != nil {
logger.WithContext(ctx).Error("[TrafficStatistics] Create log log failed",
logger.Field("uid", log.SID),
logger.Field("download", float32(log.Download)*ratio),
logger.Field("upload", float32(log.Upload)*ratio),
logger.Field("error", err.Error()),
)
}
}
return nil
}