feat(traffic): add protocol handling and update traffic statistics calculation
This commit is contained in:
parent
5aa9bb61b7
commit
d74d636735
@ -41,6 +41,7 @@ func (l *ServerPushUserTrafficLogic) ServerPushUserTraffic(req *types.ServerPush
|
|||||||
// Create traffic task
|
// Create traffic task
|
||||||
var request task.TrafficStatistics
|
var request task.TrafficStatistics
|
||||||
request.ServerId = serverInfo.Id
|
request.ServerId = serverInfo.Id
|
||||||
|
request.Protocol = req.Protocol
|
||||||
tool.DeepCopy(&request.Logs, req.Traffic)
|
tool.DeepCopy(&request.Logs, req.Traffic)
|
||||||
|
|
||||||
// Push traffic task
|
// Push traffic task
|
||||||
|
|||||||
@ -3,8 +3,10 @@ package traffic
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/perfect-panel/server/internal/model/node"
|
||||||
"github.com/perfect-panel/server/pkg/logger"
|
"github.com/perfect-panel/server/pkg/logger"
|
||||||
|
|
||||||
"github.com/hibiken/asynq"
|
"github.com/hibiken/asynq"
|
||||||
@ -46,29 +48,38 @@ func (l *TrafficStatisticsLogic) ProcessTask(ctx context.Context, task *asynq.Ta
|
|||||||
)
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
var serverRatio float32 = 1.0
|
// query protocol ratio
|
||||||
if serverInfo.Ratio > 0 {
|
// default ratio is 1.0
|
||||||
serverRatio = serverInfo.Ratio
|
|
||||||
|
protocols, err := serverInfo.UnmarshalProtocols()
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("[TrafficStatistics] Unmarshal protocols failed: %s", err.Error())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var protocol *node.Protocol
|
||||||
|
|
||||||
|
var ratio float32 = 1.0
|
||||||
|
|
||||||
|
for _, p := range protocols {
|
||||||
|
if strings.ToLower(p.Type) == strings.ToLower(payload.Protocol) {
|
||||||
|
protocol = &p
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if protocol == nil {
|
||||||
|
logger.WithContext(ctx).Error("[TrafficStatistics] Protocol not found: %s", payload.Protocol)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// use protocol ratio if it's greater than 0
|
||||||
|
if protocol.Ratio > 0 {
|
||||||
|
ratio = float32(protocol.Ratio)
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
realTimeMultiplier := l.svc.NodeMultiplierManager.GetMultiplier(now)
|
realTimeMultiplier := l.svc.NodeMultiplierManager.GetMultiplier(now)
|
||||||
for _, log := range payload.Logs {
|
for _, log := range payload.Logs {
|
||||||
if log.Upload == 0 && log.Download == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// update user subscribe with log
|
|
||||||
d := int64(float32(log.Download) * serverRatio * realTimeMultiplier)
|
|
||||||
u := int64(float32(log.Upload) * serverRatio * realTimeMultiplier)
|
|
||||||
if err := l.svc.UserModel.UpdateUserSubscribeWithTraffic(ctx, log.SID, d, u); err != nil {
|
|
||||||
logger.WithContext(ctx).Error("[TrafficStatistics] Update user subscribe with log failed",
|
|
||||||
logger.Field("sid", log.SID),
|
|
||||||
logger.Field("download", float32(log.Download)*serverRatio),
|
|
||||||
logger.Field("upload", float32(log.Upload)*serverRatio),
|
|
||||||
logger.Field("error", err.Error()),
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// query user Subscribe Info
|
// query user Subscribe Info
|
||||||
sub, err := l.svc.UserModel.FindOneSubscribe(ctx, log.SID)
|
sub, err := l.svc.UserModel.FindOneSubscribe(ctx, log.SID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -79,8 +90,25 @@ func (l *TrafficStatisticsLogic) ProcessTask(ctx context.Context, task *asynq.Ta
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if log.Download+log.Upload <= l.svc.Config.Node.TrafficReportThreshold {
|
||||||
|
// no traffic, skip
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// update user subscribe with log
|
||||||
|
d := int64(float32(log.Download) * ratio * realTimeMultiplier)
|
||||||
|
u := int64(float32(log.Upload) * ratio * realTimeMultiplier)
|
||||||
|
if err := l.svc.UserModel.UpdateUserSubscribeWithTraffic(ctx, sub.Id, d, u); err != nil {
|
||||||
|
logger.WithContext(ctx).Error("[TrafficStatistics] Update user subscribe with log failed",
|
||||||
|
logger.Field("sid", log.SID),
|
||||||
|
logger.Field("download", float32(log.Download)*ratio),
|
||||||
|
logger.Field("upload", float32(log.Upload)*ratio),
|
||||||
|
logger.Field("error", err.Error()),
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// create log log
|
// create log log
|
||||||
if err := l.svc.TrafficLogModel.Insert(ctx, &traffic.TrafficLog{
|
if err = l.svc.TrafficLogModel.Insert(ctx, &traffic.TrafficLog{
|
||||||
ServerId: payload.ServerId,
|
ServerId: payload.ServerId,
|
||||||
SubscribeId: log.SID,
|
SubscribeId: log.SID,
|
||||||
UserId: sub.UserId,
|
UserId: sub.UserId,
|
||||||
@ -90,8 +118,8 @@ func (l *TrafficStatisticsLogic) ProcessTask(ctx context.Context, task *asynq.Ta
|
|||||||
}); err != nil {
|
}); err != nil {
|
||||||
logger.WithContext(ctx).Error("[TrafficStatistics] Create log log failed",
|
logger.WithContext(ctx).Error("[TrafficStatistics] Create log log failed",
|
||||||
logger.Field("uid", log.SID),
|
logger.Field("uid", log.SID),
|
||||||
logger.Field("download", float32(log.Download)*serverRatio),
|
logger.Field("download", float32(log.Download)*ratio),
|
||||||
logger.Field("upload", float32(log.Upload)*serverRatio),
|
logger.Field("upload", float32(log.Upload)*ratio),
|
||||||
logger.Field("error", err.Error()),
|
logger.Field("error", err.Error()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -10,6 +10,7 @@ type UserTraffic struct {
|
|||||||
|
|
||||||
type TrafficStatistics struct {
|
type TrafficStatistics struct {
|
||||||
ServerId int64 `json:"server_id"`
|
ServerId int64 `json:"server_id"`
|
||||||
|
Protocol string `json:"protocol"`
|
||||||
Logs []UserTraffic `json:"logs"`
|
Logs []UserTraffic `json:"logs"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user