diff --git a/apis/auth/auth.api b/apis/auth/auth.api index 50c82d1..cb7225d 100644 --- a/apis/auth/auth.api +++ b/apis/auth/auth.api @@ -65,6 +65,7 @@ type ( TelephoneAreaCode string `json:"telephone_area_code" validate:"required"` Password string `json:"password"` IP string `header:"X-Original-Forwarded-For"` + UserAgent string `header:"User-Agent"` CfToken string `json:"cf_token,optional"` } // Check user is exist request @@ -84,6 +85,7 @@ type ( Invite string `json:"invite,optional"` Code string `json:"code,optional"` IP string `header:"X-Original-Forwarded-For"` + UserAgent string `header:"User-Agent"` CfToken string `json:"cf_token,optional"` } // User login response @@ -93,6 +95,7 @@ type ( Password string `json:"password" validate:"required"` Code string `json:"code,optional"` IP string `header:"X-Original-Forwarded-For"` + UserAgent string `header:"User-Agent"` CfToken string `json:"cf_token,optional"` } AppleLoginCallbackRequest { diff --git a/internal/logic/auth/oauth/oAuthLoginGetTokenLogic.go b/internal/logic/auth/oauth/oAuthLoginGetTokenLogic.go index 8f0e2ed..4107065 100644 --- a/internal/logic/auth/oauth/oAuthLoginGetTokenLogic.go +++ b/internal/logic/auth/oauth/oAuthLoginGetTokenLogic.go @@ -497,7 +497,6 @@ func (l *OAuthLoginGetTokenLogic) recordLoginStatus(loginStatus bool, userInfo * } content, _ := loginLog.Marshal() if err := l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{ - Id: 0, Type: log.TypeLogin.Uint8(), Date: time.Now().Format("2006-01-02"), ObjectID: userInfo.Id, diff --git a/internal/logic/auth/telephoneResetPasswordLogic.go b/internal/logic/auth/telephoneResetPasswordLogic.go index 972b9ae..21f0062 100644 --- a/internal/logic/auth/telephoneResetPasswordLogic.go +++ b/internal/logic/auth/telephoneResetPasswordLogic.go @@ -6,6 +6,7 @@ import ( "time" "github.com/perfect-panel/server/internal/config" + "github.com/perfect-panel/server/internal/model/log" "github.com/perfect-panel/server/internal/svc" "github.com/perfect-panel/server/internal/types" "github.com/perfect-panel/server/pkg/constant" @@ -100,6 +101,29 @@ func (l *TelephoneResetPasswordLogic) TelephoneResetPassword(req *types.Telephon if err = l.svcCtx.Redis.Set(l.ctx, sessionIdCacheKey, userInfo.Id, time.Duration(l.svcCtx.Config.JwtAuth.AccessExpire)*time.Second).Err(); err != nil { return nil, errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "set session id error: %v", err.Error()) } + defer func() { + if token != "" && userInfo.Id != 0 { + loginLog := log.Login{ + LoginIP: req.IP, + UserAgent: req.UserAgent, + Success: token != "", + } + content, _ := loginLog.Marshal() + if err = l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{ + Id: 0, + Type: log.TypeLogin.Uint8(), + Date: time.Now().Format("2006-01-02"), + ObjectID: userInfo.Id, + Content: string(content), + }); err != nil { + l.Errorw("failed to insert login log", + logger.Field("user_id", userInfo.Id), + logger.Field("ip", req.IP), + logger.Field("error", err.Error()), + ) + } + } + }() return &types.LoginResponse{ Token: token, }, nil diff --git a/internal/logic/auth/telephoneUserRegisterLogic.go b/internal/logic/auth/telephoneUserRegisterLogic.go index 0ff2c6d..d0e22f4 100644 --- a/internal/logic/auth/telephoneUserRegisterLogic.go +++ b/internal/logic/auth/telephoneUserRegisterLogic.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/perfect-panel/server/internal/model/log" "github.com/perfect-panel/server/pkg/constant" "github.com/perfect-panel/server/internal/config" @@ -154,6 +155,51 @@ func (l *TelephoneUserRegisterLogic) TelephoneUserRegister(req *types.TelephoneR if err = l.svcCtx.Redis.Set(l.ctx, sessionIdCacheKey, userInfo.Id, time.Duration(l.svcCtx.Config.JwtAuth.AccessExpire)*time.Second).Err(); err != nil { return nil, errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "set session id error: %v", err.Error()) } + + defer func() { + if token != "" && userInfo.Id != 0 { + loginLog := log.Login{ + LoginIP: req.IP, + UserAgent: req.UserAgent, + Success: token != "", + } + content, _ := loginLog.Marshal() + if err = l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{ + Id: 0, + Type: log.TypeLogin.Uint8(), + Date: time.Now().Format("2006-01-02"), + ObjectID: userInfo.Id, + Content: string(content), + }); err != nil { + l.Errorw("failed to insert login log", + logger.Field("user_id", userInfo.Id), + logger.Field("ip", req.IP), + logger.Field("error", err.Error()), + ) + } + + // Register log + registerLog := log.Register{ + AuthMethod: "mobile", + Identifier: phoneNumber, + RegisterIP: req.IP, + UserAgent: req.UserAgent, + RegisterTime: time.Now().UnixMilli(), + } + content, _ = registerLog.Marshal() + if err = l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{ + Type: log.TypeRegister.Uint8(), + ObjectID: userInfo.Id, + Date: time.Now().Format("2006-01-02"), + Content: string(content), + }); err != nil { + l.Errorw("failed to insert login log", + logger.Field("user_id", userInfo.Id), + logger.Field("ip", req.IP), + logger.Field("error", err.Error())) + } + } + }() return &types.LoginResponse{ Token: token, }, nil diff --git a/internal/logic/auth/userLoginLogic.go b/internal/logic/auth/userLoginLogic.go index 5e7abed..81be1ae 100644 --- a/internal/logic/auth/userLoginLogic.go +++ b/internal/logic/auth/userLoginLogic.go @@ -48,7 +48,7 @@ func (l *UserLoginLogic) UserLogin(req *types.UserLoginRequest) (resp *types.Log Success: loginStatus, } content, _ := loginLog.Marshal() - if err := l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{ + if err = l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{ Type: log.TypeLogin.Uint8(), Date: time.Now().Format("2006-01-02"), ObjectID: userInfo.Id, diff --git a/internal/logic/auth/userRegisterLogic.go b/internal/logic/auth/userRegisterLogic.go index ebe23e0..a3d7fc2 100644 --- a/internal/logic/auth/userRegisterLogic.go +++ b/internal/logic/auth/userRegisterLogic.go @@ -165,6 +165,27 @@ func (l *UserRegisterLogic) UserRegister(req *types.UserRegisterRequest) (resp * logger.Field("error", err.Error()), ) } + + // Register log + registerLog := log.Register{ + AuthMethod: "email", + Identifier: req.Email, + RegisterIP: req.IP, + UserAgent: req.UserAgent, + RegisterTime: time.Now().UnixMilli(), + } + content, _ = registerLog.Marshal() + if err = l.svcCtx.LogModel.Insert(l.ctx, &log.SystemLog{ + Type: log.TypeRegister.Uint8(), + ObjectID: userInfo.Id, + Date: time.Now().Format("2006-01-02"), + Content: string(content), + }); err != nil { + l.Errorw("failed to insert login log", + logger.Field("user_id", userInfo.Id), + logger.Field("ip", req.IP), + logger.Field("error", err.Error())) + } } }() return &types.LoginResponse{ diff --git a/internal/model/log/log.go b/internal/model/log/log.go index 28574c2..463332e 100644 --- a/internal/model/log/log.go +++ b/internal/model/log/log.go @@ -8,17 +8,19 @@ import ( type Type uint8 const ( - TypeEmailMessage Type = iota + 1 // Message log - TypeMobileMessage // Mobile message log - TypeSubscribe // Subscription log - TypeSubscribeTraffic // Subscription traffic log - TypeServerTraffic // Server traffic log - TypeLogin // Login log - TypeRegister // Registration log - TypeBalance // Balance log - TypeCommission // Commission log - TypeResetSubscribe // Reset subscription log - TypeGift // Gift log + TypeEmailMessage Type = iota + 1 // Message log + TypeMobileMessage // Mobile message log + TypeSubscribe // Subscription log + TypeSubscribeTraffic // Subscription traffic log + TypeServerTraffic // Server traffic log + TypeLogin // Login log + TypeRegister // Registration log + TypeBalance // Balance log + TypeCommission // Commission log + TypeResetSubscribe // Reset subscription log + TypeGift // Gift log + TypeUserTrafficRank // Top 10 User traffic rank log + TypeServerTrafficRank // Top 10 Server traffic rank log ) // Uint8 converts Type to uint8. @@ -296,3 +298,97 @@ func (g *Gift) Unmarshal(data []byte) error { aux := (*Alias)(g) return json.Unmarshal(data, aux) } + +// UserTraffic represents a user traffic log entry. +type UserTraffic struct { + SubscribeId int64 `json:"subscribe_id"` // Subscribe ID + UserId int64 `json:"user_id"` // User ID + Upload int64 `json:"upload"` // Upload traffic in bytes + Download int64 `json:"download"` // Download traffic in bytes + Total int64 `json:"total"` // Total traffic in bytes (Upload + Download) +} + +// Marshal implements the json.Marshaler interface for UserTraffic. +func (u *UserTraffic) Marshal() ([]byte, error) { + type Alias UserTraffic + return json.Marshal(&struct { + *Alias + }{ + Alias: (*Alias)(u), + }) +} + +// Unmarshal implements the json.Unmarshaler interface for UserTraffic. +func (u *UserTraffic) Unmarshal(data []byte) error { + type Alias UserTraffic + aux := (*Alias)(u) + return json.Unmarshal(data, aux) +} + +// UserTrafficRank represents a user traffic rank entry. +type UserTrafficRank struct { + Rank map[uint8]UserTraffic `json:"rank"` // Key is rank ,type is UserTraffic +} + +// Marshal implements the json.Marshaler interface for UserTrafficRank. +func (u *UserTrafficRank) Marshal() ([]byte, error) { + type Alias UserTrafficRank + return json.Marshal(&struct { + *Alias + }{ + Alias: (*Alias)(u), + }) +} + +// Unmarshal implements the json.Unmarshaler interface for UserTrafficRank. +func (u *UserTrafficRank) Unmarshal(data []byte) error { + type Alias UserTrafficRank + aux := (*Alias)(u) + return json.Unmarshal(data, aux) +} + +// ServerTraffic represents a server traffic log entry. +type ServerTraffic struct { + ServerId int64 `json:"server_id"` // Server ID + Upload int64 `json:"upload"` // Upload traffic in bytes + Download int64 `json:"download"` // Download traffic in bytes +} + +// Marshal implements the json.Marshaler interface for ServerTraffic. +func (s *ServerTraffic) Marshal() ([]byte, error) { + type Alias ServerTraffic + return json.Marshal(&struct { + *Alias + }{ + Alias: (*Alias)(s), + }) +} + +// Unmarshal implements the json.Unmarshaler interface for ServerTraffic. +func (s *ServerTraffic) Unmarshal(data []byte) error { + type Alias ServerTraffic + aux := (*Alias)(s) + return json.Unmarshal(data, aux) +} + +// ServerTrafficRank represents a server traffic rank entry. +type ServerTrafficRank struct { + Rank map[uint8]ServerTraffic `json:"rank"` // Key is rank ,type is ServerTraffic +} + +// Marshal implements the json.Marshaler interface for ServerTrafficRank. +func (s *ServerTrafficRank) Marshal() ([]byte, error) { + type Alias ServerTrafficRank + return json.Marshal(&struct { + *Alias + }{ + Alias: (*Alias)(s), + }) +} + +// Unmarshal implements the json.Unmarshaler interface for ServerTrafficRank. +func (s *ServerTrafficRank) Unmarshal(data []byte) error { + type Alias ServerTrafficRank + aux := (*Alias)(s) + return json.Unmarshal(data, aux) +} diff --git a/internal/types/types.go b/internal/types/types.go index f6221fe..f173018 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -1731,6 +1731,7 @@ type TelephoneLoginRequest struct { TelephoneAreaCode string `json:"telephone_area_code" validate:"required"` Password string `json:"password"` IP string `header:"X-Original-Forwarded-For"` + UserAgent string `header:"User-Agent"` CfToken string `json:"cf_token,optional"` } @@ -1741,6 +1742,7 @@ type TelephoneRegisterRequest struct { Invite string `json:"invite,optional"` Code string `json:"code,optional"` IP string `header:"X-Original-Forwarded-For"` + UserAgent string `header:"User-Agent"` CfToken string `json:"cf_token,optional"` } @@ -1750,6 +1752,7 @@ type TelephoneResetPasswordRequest struct { Password string `json:"password" validate:"required"` Code string `json:"code,optional"` IP string `header:"X-Original-Forwarded-For"` + UserAgent string `header:"User-Agent"` CfToken string `json:"cf_token,optional"` } diff --git a/queue/logic/traffic/trafficStatLogic.go b/queue/logic/traffic/trafficStatLogic.go new file mode 100644 index 0000000..7b962c8 --- /dev/null +++ b/queue/logic/traffic/trafficStatLogic.go @@ -0,0 +1,151 @@ +package traffic + +import ( + "context" + "time" + + "github.com/hibiken/asynq" + "github.com/perfect-panel/server/internal/model/log" + "github.com/perfect-panel/server/internal/model/traffic" + "github.com/perfect-panel/server/internal/svc" + "github.com/perfect-panel/server/pkg/logger" +) + +type StatLogic struct { + svc *svc.ServiceContext +} + +func NewStatLogic(svc *svc.ServiceContext) *StatLogic { + return &StatLogic{ + svc: svc, + } +} + +func (l *StatLogic) ProcessTask(ctx context.Context, _ *asynq.Task) error { + now := time.Now() + tx := l.svc.DB.Begin() + var err error + defer func(err error) { + if err != nil { + logger.Errorf("[Traffic Stat Queue] Process task failed: %v", err.Error()) + tx.Rollback() + } else { + logger.Infof("[Traffic Stat Queue] Process task completed successfully, consuming: %s", time.Since(now).String()) + // 提交事务 + if err = tx.Commit().Error; err != nil { + logger.Errorf("[Traffic Stat Queue] Commit transaction failed: %v", err.Error()) + } + } + }(err) + + // 获取全部有效订阅 + var userTraffic []log.UserTraffic + // 获取统计时间范围 + start := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local) + end := start.Add(24 * time.Hour).Add(-time.Nanosecond) + + // 查询用户流量统计, 按用户和订阅分组 + err = tx.WithContext(ctx).Model(&traffic.TrafficLog{}). + Select("user_id, subscribe_id, SUM(download + upload) AS total, SUM(download) AS download, SUM(upload) AS upload"). + Where("timestamp BETWEEN ? AND ?", start, end). + Group("user_id, subscribe_id"). + Order("total DESC"). + Scan(&userTraffic).Error + if err != nil { + logger.Errorf("[Traffic Stat Queue] Query user traffic failed: %v", err.Error()) + return err + } + + date := start.Format(time.DateOnly) + + userTop10 := log.UserTrafficRank{ + Rank: make(map[uint8]log.UserTraffic), + } + + // 更新用户流量统计 + for i, trafficData := range userTraffic { + if i < 10 { + userTop10.Rank[uint8(i+1)] = trafficData + } + // 更新用户流量统计日志 + content, _ := trafficData.Marshal() + err = tx.WithContext(ctx).Model(&log.SystemLog{}).Create(&log.SystemLog{ + Type: log.TypeSubscribeTraffic.Uint8(), + Date: date, + ObjectID: trafficData.SubscribeId, + Content: string(content), + }).Error + if err != nil { + logger.Errorf("[Traffic Stat Queue] Create user traffic log failed: %v", err.Error()) + return err + } + } + + userTop10Content, _ := userTop10.Marshal() + + // 更新用户排行榜 + err = tx.WithContext(ctx).Model(&log.SystemLog{}).Create(&log.SystemLog{ + Type: log.TypeUserTrafficRank.Uint8(), + Date: date, + ObjectID: 0, // 0表示全局用户排行榜 + Content: string(userTop10Content), + }).Error + if err != nil { + logger.Errorf("[Traffic Stat Queue] Create user traffic rank log failed: %v", err.Error()) + return err + } + + // 统计服务器流量 + var serverTraffic []log.ServerTraffic + err = tx.WithContext(ctx).Model(&traffic.TrafficLog{}). + Select("server_id, SUM(download + upload) AS total, SUM(download) AS download, SUM(upload) AS upload"). + Where("timestamp BETWEEN ? AND ?", start, end). + Group("server_id"). + Order("total DESC"). + Scan(&serverTraffic).Error + if err != nil { + logger.Errorf("[Traffic Stat Queue] Query server traffic failed: %v", err.Error()) + return err + } + + serverTop10 := log.ServerTrafficRank{ + Rank: make(map[uint8]log.ServerTraffic), + } + for i, trafficData := range serverTraffic { + if i < 10 { + serverTop10.Rank[uint8(i+1)] = trafficData + } + // 更新服务器流量统计日志 + content, _ := trafficData.Marshal() + err = tx.WithContext(ctx).Model(&log.SystemLog{}).Create(&log.SystemLog{ + Type: log.TypeServerTraffic.Uint8(), + Date: date, + ObjectID: trafficData.ServerId, + Content: string(content), + }).Error + if err != nil { + logger.Errorf("[Traffic Stat Queue] Create server traffic log failed: %v", err.Error()) + return err + } + } + serverTop10Content, _ := serverTop10.Marshal() + // 更新服务器排行榜 + err = tx.WithContext(ctx).Model(&log.SystemLog{}).Create(&log.SystemLog{ + Type: log.TypeServerTrafficRank.Uint8(), + Date: date, + ObjectID: 0, // 0表示全局服务器排行榜 + Content: string(serverTop10Content), + }).Error + if err != nil { + logger.Errorf("[Traffic Stat Queue] Create server traffic rank log failed: %v", err.Error()) + return err + } + + // 删除过期的流量日志 + err = tx.WithContext(ctx).Model(&traffic.TrafficLog{}).Where("created_at <= ?", end).Delete(&traffic.TrafficLog{}).Error + if err != nil { + logger.Errorf("[Traffic Stat Queue] Delete server traffic log failed: %v", err.Error()) + } + + return nil +} diff --git a/queue/types/scheduler.go b/queue/types/scheduler.go index 26a32f0..51ef48c 100644 --- a/queue/types/scheduler.go +++ b/queue/types/scheduler.go @@ -4,4 +4,5 @@ const ( SchedulerCheckSubscription = "scheduler:check:subscription" SchedulerTotalServerData = "scheduler:total:server" SchedulerResetTraffic = "scheduler:reset:traffic" + SchedulerTrafficStat = "scheduler:traffic:stat" ) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 813f41e..7c691ef 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -40,6 +40,12 @@ func (m *Service) Start() { logger.Errorf("register reset traffic task failed: %s", err.Error()) } + // schedule traffic stat task: every day at 00:00 + trafficStatTask := asynq.NewTask(types.SchedulerTrafficStat, nil) + if _, err := m.server.Register("0 0 * * *", trafficStatTask, asynq.MaxRetry(3)); err != nil { + logger.Errorf("register traffic stat task failed: %s", err.Error()) + } + if err := m.server.Run(); err != nil { logger.Errorf("run scheduler failed: %s", err.Error()) }