feat(traffic): add traffic statistics logging and scheduling

This commit is contained in:
Chang lue Tsen 2025-08-21 14:11:03 -04:00
parent 6b1b365734
commit d33f4cd1ce
11 changed files with 363 additions and 13 deletions

View File

@ -65,6 +65,7 @@ type (
TelephoneAreaCode string `json:"telephone_area_code" validate:"required"`
Password string `json:"password"`
IP string `header:"X-Original-Forwarded-For"`
UserAgent string `header:"User-Agent"`
CfToken string `json:"cf_token,optional"`
}
// Check user is exist request
@ -84,6 +85,7 @@ type (
Invite string `json:"invite,optional"`
Code string `json:"code,optional"`
IP string `header:"X-Original-Forwarded-For"`
UserAgent string `header:"User-Agent"`
CfToken string `json:"cf_token,optional"`
}
// User login response
@ -93,6 +95,7 @@ type (
Password string `json:"password" validate:"required"`
Code string `json:"code,optional"`
IP string `header:"X-Original-Forwarded-For"`
UserAgent string `header:"User-Agent"`
CfToken string `json:"cf_token,optional"`
}
AppleLoginCallbackRequest {

View File

@ -497,7 +497,6 @@ func (l *OAuthLoginGetTokenLogic) recordLoginStatus(loginStatus bool, userInfo *
}
content, _ := loginLog.Marshal()
if err := l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{
Id: 0,
Type: log.TypeLogin.Uint8(),
Date: time.Now().Format("2006-01-02"),
ObjectID: userInfo.Id,

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/perfect-panel/server/internal/config"
"github.com/perfect-panel/server/internal/model/log"
"github.com/perfect-panel/server/internal/svc"
"github.com/perfect-panel/server/internal/types"
"github.com/perfect-panel/server/pkg/constant"
@ -100,6 +101,29 @@ func (l *TelephoneResetPasswordLogic) TelephoneResetPassword(req *types.Telephon
if err = l.svcCtx.Redis.Set(l.ctx, sessionIdCacheKey, userInfo.Id, time.Duration(l.svcCtx.Config.JwtAuth.AccessExpire)*time.Second).Err(); err != nil {
return nil, errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "set session id error: %v", err.Error())
}
defer func() {
if token != "" && userInfo.Id != 0 {
loginLog := log.Login{
LoginIP: req.IP,
UserAgent: req.UserAgent,
Success: token != "",
}
content, _ := loginLog.Marshal()
if err = l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{
Id: 0,
Type: log.TypeLogin.Uint8(),
Date: time.Now().Format("2006-01-02"),
ObjectID: userInfo.Id,
Content: string(content),
}); err != nil {
l.Errorw("failed to insert login log",
logger.Field("user_id", userInfo.Id),
logger.Field("ip", req.IP),
logger.Field("error", err.Error()),
)
}
}
}()
return &types.LoginResponse{
Token: token,
}, nil

View File

@ -6,6 +6,7 @@ import (
"fmt"
"time"
"github.com/perfect-panel/server/internal/model/log"
"github.com/perfect-panel/server/pkg/constant"
"github.com/perfect-panel/server/internal/config"
@ -154,6 +155,51 @@ func (l *TelephoneUserRegisterLogic) TelephoneUserRegister(req *types.TelephoneR
if err = l.svcCtx.Redis.Set(l.ctx, sessionIdCacheKey, userInfo.Id, time.Duration(l.svcCtx.Config.JwtAuth.AccessExpire)*time.Second).Err(); err != nil {
return nil, errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "set session id error: %v", err.Error())
}
defer func() {
if token != "" && userInfo.Id != 0 {
loginLog := log.Login{
LoginIP: req.IP,
UserAgent: req.UserAgent,
Success: token != "",
}
content, _ := loginLog.Marshal()
if err = l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{
Id: 0,
Type: log.TypeLogin.Uint8(),
Date: time.Now().Format("2006-01-02"),
ObjectID: userInfo.Id,
Content: string(content),
}); err != nil {
l.Errorw("failed to insert login log",
logger.Field("user_id", userInfo.Id),
logger.Field("ip", req.IP),
logger.Field("error", err.Error()),
)
}
// Register log
registerLog := log.Register{
AuthMethod: "mobile",
Identifier: phoneNumber,
RegisterIP: req.IP,
UserAgent: req.UserAgent,
RegisterTime: time.Now().UnixMilli(),
}
content, _ = registerLog.Marshal()
if err = l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{
Type: log.TypeRegister.Uint8(),
ObjectID: userInfo.Id,
Date: time.Now().Format("2006-01-02"),
Content: string(content),
}); err != nil {
l.Errorw("failed to insert login log",
logger.Field("user_id", userInfo.Id),
logger.Field("ip", req.IP),
logger.Field("error", err.Error()))
}
}
}()
return &types.LoginResponse{
Token: token,
}, nil

View File

@ -48,7 +48,7 @@ func (l *UserLoginLogic) UserLogin(req *types.UserLoginRequest) (resp *types.Log
Success: loginStatus,
}
content, _ := loginLog.Marshal()
if err := l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{
if err = l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{
Type: log.TypeLogin.Uint8(),
Date: time.Now().Format("2006-01-02"),
ObjectID: userInfo.Id,

View File

@ -165,6 +165,27 @@ func (l *UserRegisterLogic) UserRegister(req *types.UserRegisterRequest) (resp *
logger.Field("error", err.Error()),
)
}
// Register log
registerLog := log.Register{
AuthMethod: "email",
Identifier: req.Email,
RegisterIP: req.IP,
UserAgent: req.UserAgent,
RegisterTime: time.Now().UnixMilli(),
}
content, _ = registerLog.Marshal()
if err = l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{
Type: log.TypeRegister.Uint8(),
ObjectID: userInfo.Id,
Date: time.Now().Format("2006-01-02"),
Content: string(content),
}); err != nil {
l.Errorw("failed to insert login log",
logger.Field("user_id", userInfo.Id),
logger.Field("ip", req.IP),
logger.Field("error", err.Error()))
}
}
}()
return &types.LoginResponse{

View File

@ -8,17 +8,19 @@ import (
type Type uint8
const (
TypeEmailMessage Type = iota + 1 // Message log
TypeMobileMessage // Mobile message log
TypeSubscribe // Subscription log
TypeSubscribeTraffic // Subscription traffic log
TypeServerTraffic // Server traffic log
TypeLogin // Login log
TypeRegister // Registration log
TypeBalance // Balance log
TypeCommission // Commission log
TypeResetSubscribe // Reset subscription log
TypeGift // Gift log
TypeEmailMessage Type = iota + 1 // Message log
TypeMobileMessage // Mobile message log
TypeSubscribe // Subscription log
TypeSubscribeTraffic // Subscription traffic log
TypeServerTraffic // Server traffic log
TypeLogin // Login log
TypeRegister // Registration log
TypeBalance // Balance log
TypeCommission // Commission log
TypeResetSubscribe // Reset subscription log
TypeGift // Gift log
TypeUserTrafficRank // Top 10 User traffic rank log
TypeServerTrafficRank // Top 10 Server traffic rank log
)
// Uint8 converts Type to uint8.
@ -296,3 +298,97 @@ func (g *Gift) Unmarshal(data []byte) error {
aux := (*Alias)(g)
return json.Unmarshal(data, aux)
}
// UserTraffic represents a user traffic log entry.
type UserTraffic struct {
SubscribeId int64 `json:"subscribe_id"` // Subscribe ID
UserId int64 `json:"user_id"` // User ID
Upload int64 `json:"upload"` // Upload traffic in bytes
Download int64 `json:"download"` // Download traffic in bytes
Total int64 `json:"total"` // Total traffic in bytes (Upload + Download)
}
// Marshal implements the json.Marshaler interface for UserTraffic.
func (u *UserTraffic) Marshal() ([]byte, error) {
type Alias UserTraffic
return json.Marshal(&struct {
*Alias
}{
Alias: (*Alias)(u),
})
}
// Unmarshal implements the json.Unmarshaler interface for UserTraffic.
func (u *UserTraffic) Unmarshal(data []byte) error {
type Alias UserTraffic
aux := (*Alias)(u)
return json.Unmarshal(data, aux)
}
// UserTrafficRank represents a user traffic rank entry.
type UserTrafficRank struct {
Rank map[uint8]UserTraffic `json:"rank"` // Key is rank ,type is UserTraffic
}
// Marshal implements the json.Marshaler interface for UserTrafficRank.
func (u *UserTrafficRank) Marshal() ([]byte, error) {
type Alias UserTrafficRank
return json.Marshal(&struct {
*Alias
}{
Alias: (*Alias)(u),
})
}
// Unmarshal implements the json.Unmarshaler interface for UserTrafficRank.
func (u *UserTrafficRank) Unmarshal(data []byte) error {
type Alias UserTrafficRank
aux := (*Alias)(u)
return json.Unmarshal(data, aux)
}
// ServerTraffic represents a server traffic log entry.
type ServerTraffic struct {
ServerId int64 `json:"server_id"` // Server ID
Upload int64 `json:"upload"` // Upload traffic in bytes
Download int64 `json:"download"` // Download traffic in bytes
}
// Marshal implements the json.Marshaler interface for ServerTraffic.
func (s *ServerTraffic) Marshal() ([]byte, error) {
type Alias ServerTraffic
return json.Marshal(&struct {
*Alias
}{
Alias: (*Alias)(s),
})
}
// Unmarshal implements the json.Unmarshaler interface for ServerTraffic.
func (s *ServerTraffic) Unmarshal(data []byte) error {
type Alias ServerTraffic
aux := (*Alias)(s)
return json.Unmarshal(data, aux)
}
// ServerTrafficRank represents a server traffic rank entry.
type ServerTrafficRank struct {
Rank map[uint8]ServerTraffic `json:"rank"` // Key is rank ,type is ServerTraffic
}
// Marshal implements the json.Marshaler interface for ServerTrafficRank.
func (s *ServerTrafficRank) Marshal() ([]byte, error) {
type Alias ServerTrafficRank
return json.Marshal(&struct {
*Alias
}{
Alias: (*Alias)(s),
})
}
// Unmarshal implements the json.Unmarshaler interface for ServerTrafficRank.
func (s *ServerTrafficRank) Unmarshal(data []byte) error {
type Alias ServerTrafficRank
aux := (*Alias)(s)
return json.Unmarshal(data, aux)
}

View File

@ -1731,6 +1731,7 @@ type TelephoneLoginRequest struct {
TelephoneAreaCode string `json:"telephone_area_code" validate:"required"`
Password string `json:"password"`
IP string `header:"X-Original-Forwarded-For"`
UserAgent string `header:"User-Agent"`
CfToken string `json:"cf_token,optional"`
}
@ -1741,6 +1742,7 @@ type TelephoneRegisterRequest struct {
Invite string `json:"invite,optional"`
Code string `json:"code,optional"`
IP string `header:"X-Original-Forwarded-For"`
UserAgent string `header:"User-Agent"`
CfToken string `json:"cf_token,optional"`
}
@ -1750,6 +1752,7 @@ type TelephoneResetPasswordRequest struct {
Password string `json:"password" validate:"required"`
Code string `json:"code,optional"`
IP string `header:"X-Original-Forwarded-For"`
UserAgent string `header:"User-Agent"`
CfToken string `json:"cf_token,optional"`
}

View File

@ -0,0 +1,151 @@
package traffic
import (
"context"
"time"
"github.com/hibiken/asynq"
"github.com/perfect-panel/server/internal/model/log"
"github.com/perfect-panel/server/internal/model/traffic"
"github.com/perfect-panel/server/internal/svc"
"github.com/perfect-panel/server/pkg/logger"
)
type StatLogic struct {
svc *svc.ServiceContext
}
func NewStatLogic(svc *svc.ServiceContext) *StatLogic {
return &StatLogic{
svc: svc,
}
}
func (l *StatLogic) ProcessTask(ctx context.Context, _ *asynq.Task) error {
now := time.Now()
tx := l.svc.DB.Begin()
var err error
defer func(err error) {
if err != nil {
logger.Errorf("[Traffic Stat Queue] Process task failed: %v", err.Error())
tx.Rollback()
} else {
logger.Infof("[Traffic Stat Queue] Process task completed successfully, consuming: %s", time.Since(now).String())
// 提交事务
if err = tx.Commit().Error; err != nil {
logger.Errorf("[Traffic Stat Queue] Commit transaction failed: %v", err.Error())
}
}
}(err)
// 获取全部有效订阅
var userTraffic []log.UserTraffic
// 获取统计时间范围
start := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
end := start.Add(24 * time.Hour).Add(-time.Nanosecond)
// 查询用户流量统计, 按用户和订阅分组
err = tx.WithContext(ctx).Model(&traffic.TrafficLog{}).
Select("user_id, subscribe_id, SUM(download + upload) AS total, SUM(download) AS download, SUM(upload) AS upload").
Where("timestamp BETWEEN ? AND ?", start, end).
Group("user_id, subscribe_id").
Order("total DESC").
Scan(&userTraffic).Error
if err != nil {
logger.Errorf("[Traffic Stat Queue] Query user traffic failed: %v", err.Error())
return err
}
date := start.Format(time.DateOnly)
userTop10 := log.UserTrafficRank{
Rank: make(map[uint8]log.UserTraffic),
}
// 更新用户流量统计
for i, trafficData := range userTraffic {
if i < 10 {
userTop10.Rank[uint8(i+1)] = trafficData
}
// 更新用户流量统计日志
content, _ := trafficData.Marshal()
err = tx.WithContext(ctx).Model(&log.SystemLog{}).Create(&log.SystemLog{
Type: log.TypeSubscribeTraffic.Uint8(),
Date: date,
ObjectID: trafficData.SubscribeId,
Content: string(content),
}).Error
if err != nil {
logger.Errorf("[Traffic Stat Queue] Create user traffic log failed: %v", err.Error())
return err
}
}
userTop10Content, _ := userTop10.Marshal()
// 更新用户排行榜
err = tx.WithContext(ctx).Model(&log.SystemLog{}).Create(&log.SystemLog{
Type: log.TypeUserTrafficRank.Uint8(),
Date: date,
ObjectID: 0, // 0表示全局用户排行榜
Content: string(userTop10Content),
}).Error
if err != nil {
logger.Errorf("[Traffic Stat Queue] Create user traffic rank log failed: %v", err.Error())
return err
}
// 统计服务器流量
var serverTraffic []log.ServerTraffic
err = tx.WithContext(ctx).Model(&traffic.TrafficLog{}).
Select("server_id, SUM(download + upload) AS total, SUM(download) AS download, SUM(upload) AS upload").
Where("timestamp BETWEEN ? AND ?", start, end).
Group("server_id").
Order("total DESC").
Scan(&serverTraffic).Error
if err != nil {
logger.Errorf("[Traffic Stat Queue] Query server traffic failed: %v", err.Error())
return err
}
serverTop10 := log.ServerTrafficRank{
Rank: make(map[uint8]log.ServerTraffic),
}
for i, trafficData := range serverTraffic {
if i < 10 {
serverTop10.Rank[uint8(i+1)] = trafficData
}
// 更新服务器流量统计日志
content, _ := trafficData.Marshal()
err = tx.WithContext(ctx).Model(&log.SystemLog{}).Create(&log.SystemLog{
Type: log.TypeServerTraffic.Uint8(),
Date: date,
ObjectID: trafficData.ServerId,
Content: string(content),
}).Error
if err != nil {
logger.Errorf("[Traffic Stat Queue] Create server traffic log failed: %v", err.Error())
return err
}
}
serverTop10Content, _ := serverTop10.Marshal()
// 更新服务器排行榜
err = tx.WithContext(ctx).Model(&log.SystemLog{}).Create(&log.SystemLog{
Type: log.TypeServerTrafficRank.Uint8(),
Date: date,
ObjectID: 0, // 0表示全局服务器排行榜
Content: string(serverTop10Content),
}).Error
if err != nil {
logger.Errorf("[Traffic Stat Queue] Create server traffic rank log failed: %v", err.Error())
return err
}
// 删除过期的流量日志
err = tx.WithContext(ctx).Model(&traffic.TrafficLog{}).Where("created_at <= ?", end).Delete(&traffic.TrafficLog{}).Error
if err != nil {
logger.Errorf("[Traffic Stat Queue] Delete server traffic log failed: %v", err.Error())
}
return nil
}

View File

@ -4,4 +4,5 @@ const (
SchedulerCheckSubscription = "scheduler:check:subscription"
SchedulerTotalServerData = "scheduler:total:server"
SchedulerResetTraffic = "scheduler:reset:traffic"
SchedulerTrafficStat = "scheduler:traffic:stat"
)

View File

@ -40,6 +40,12 @@ func (m *Service) Start() {
logger.Errorf("register reset traffic task failed: %s", err.Error())
}
// schedule traffic stat task: every day at 00:00
trafficStatTask := asynq.NewTask(types.SchedulerTrafficStat, nil)
if _, err := m.server.Register("0 0 * * *", trafficStatTask, asynq.MaxRetry(3)); err != nil {
logger.Errorf("register traffic stat task failed: %s", err.Error())
}
if err := m.server.Run(); err != nil {
logger.Errorf("run scheduler failed: %s", err.Error())
}