From 3f5aac239b4f9afbc683a4a32c9f1a6091ec084a Mon Sep 17 00:00:00 2001 From: Chang lue Tsen Date: Wed, 10 Sep 2025 14:53:48 -0400 Subject: [PATCH] feat(quota): enhance quota task management with new request structures and processing logic --- apis/admin/marketing.api | 62 +-- .../admin/marketing/createQuotaTaskLogic.go | 77 ++- .../marketing/getPreSendEmailCountLogic.go | 16 +- .../marketing/queryQuotaTaskListLogic.go | 59 ++- .../marketing/queryQuotaTaskPreCountLogic.go | 31 +- .../marketing/queryQuotaTaskStatusLogic.go | 20 +- internal/model/log/log.go | 1 + internal/model/task/task.go | 39 +- internal/model/user/model.go | 2 +- internal/types/types.go | 58 +-- pkg/tool/time.go | 7 + queue/handler/routes.go | 4 + queue/logic/task/quotaLogic.go | 448 ++++++++++++++++++ queue/types/email.go | 2 - queue/types/task.go | 9 + 15 files changed, 740 insertions(+), 95 deletions(-) create mode 100644 queue/logic/task/quotaLogic.go create mode 100644 queue/types/task.go diff --git a/apis/admin/marketing.api b/apis/admin/marketing.api index dd0a1fa..8014c0d 100644 --- a/apis/admin/marketing.api +++ b/apis/admin/marketing.api @@ -53,9 +53,9 @@ type ( Id int64 `json:"id"` } GetPreSendEmailCountRequest { - Scope string `json:"scope"` - RegisterStartTime int64 `json:"register_start_time,omitempty"` - RegisterEndTime int64 `json:"register_end_time,omitempty"` + Scope int8 `json:"scope"` + RegisterStartTime int64 `json:"register_start_time,omitempty"` + RegisterEndTime int64 `json:"register_end_time,omitempty"` } GetPreSendEmailCountResponse { Count int64 `json:"count"` @@ -70,33 +70,38 @@ type ( Errors string `json:"errors"` } CreateQuotaTaskRequest { - Scope int8 `json:"scope"` - RegisterStartTime int64 `json:"register_start_time"` - RegisterEndTime int64 `json:"register_end_time"` - QuotaType uint8 `json:"quota_type"` - Days uint64 `json:"days"` // Number of days for the quota - Gift uint8 `json:"gift"` // Invoice amount ratio(%) to gift amount for quota + Subscribers []int64 `json:"subscribers"` + IsActive *bool `json:"is_active"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` + ResetTraffic bool `json:"reset_traffic"` + Days uint64 `json:"days"` + GiftType uint8 `json:"gift_type"` + GiftValue uint64 `json:"gift_value"` } QuotaTask { - Id int64 `json:"id"` - Scope int8 `json:"scope"` - RegisterStartTime int64 `json:"register_start_time"` - RegisterEndTime int64 `json:"register_end_time"` - QuotaType uint8 `json:"quota_type"` - Days uint64 `json:"days"` // Number of days for the quota - Gift uint8 `json:"gift"` // Invoice amount ratio(%) to gift - Recipients []int64 `json:"recipients"` // UserSubscribe IDs of recipients - Status uint8 `json:"status"` - Total int64 `json:"total"` - Current int64 `json:"current"` - Errors string `json:"errors"` - CreatedAt int64 `json:"created_at"` - UpdatedAt int64 `json:"updated_at"` + Id int64 `json:"id"` + Subscribers []int64 `json:"subscribers"` + IsActive *bool `json:"is_active"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` + ResetTraffic bool `json:"reset_traffic"` + Days uint64 `json:"days"` + GiftType uint8 `json:"gift_type"` + GiftValue uint64 `json:"gift_value"` + Objects []int64 `json:"objects"` // UserSubscribe IDs + Status uint8 `json:"status"` + Total int64 `json:"total"` + Current int64 `json:"current"` + Errors string `json:"errors"` + CreatedAt int64 `json:"created_at"` + UpdatedAt int64 `json:"updated_at"` } QueryQuotaTaskPreCountRequest { - Scope uint8 `json:"scope"` - RegisterStartTime int64 `json:"register_start_time"` - RegisterEndTime int64 `json:"register_end_time"` + Subscribers []int64 `json:"subscribers"` + IsActive *bool `json:"is_active"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` } QueryQuotaTaskPreCountResponse { Count int64 `json:"count"` @@ -104,7 +109,6 @@ type ( QueryQuotaTaskListRequest { Page int `form:"page"` Size int `form:"size"` - Scope *uint8 `form:"scope,omitempty"` Status *uint8 `form:"status,omitempty"` } QueryQuotaTaskListResponse { @@ -159,9 +163,5 @@ service ppanel { @doc "Query quota task list" @handler QueryQuotaTaskList get /quota/list (QueryQuotaTaskListRequest) returns (QueryQuotaTaskListResponse) - - @doc "Query quota task status" - @handler QueryQuotaTaskStatus - post /quota/status (QueryQuotaTaskStatusRequest) returns (QueryQuotaTaskStatusResponse) } diff --git a/internal/logic/admin/marketing/createQuotaTaskLogic.go b/internal/logic/admin/marketing/createQuotaTaskLogic.go index 5e052dc..8eb29d5 100644 --- a/internal/logic/admin/marketing/createQuotaTaskLogic.go +++ b/internal/logic/admin/marketing/createQuotaTaskLogic.go @@ -2,10 +2,18 @@ package marketing import ( "context" + "strconv" + "time" + "github.com/hibiken/asynq" + "github.com/perfect-panel/server/internal/model/task" + "github.com/perfect-panel/server/internal/model/user" "github.com/perfect-panel/server/internal/svc" "github.com/perfect-panel/server/internal/types" "github.com/perfect-panel/server/pkg/logger" + "github.com/perfect-panel/server/pkg/xerr" + queueType "github.com/perfect-panel/server/queue/types" + "github.com/pkg/errors" ) type CreateQuotaTaskLogic struct { @@ -14,7 +22,7 @@ type CreateQuotaTaskLogic struct { svcCtx *svc.ServiceContext } -// Create a quota task +// NewCreateQuotaTaskLogic Create a quota task func NewCreateQuotaTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateQuotaTaskLogic { return &CreateQuotaTaskLogic{ Logger: logger.WithContext(ctx), @@ -24,7 +32,72 @@ func NewCreateQuotaTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *C } func (l *CreateQuotaTaskLogic) CreateQuotaTask(req *types.CreateQuotaTaskRequest) error { - // todo: add your logic here and delete this line + var subs []*user.Subscribe + query := l.svcCtx.DB.WithContext(l.ctx).Model(&user.Subscribe{}) + if len(req.Subscribers) > 0 { + query = query.Where("`subscribe_id` IN ?", req.Subscribers) + } + if req.IsActive != nil && *req.IsActive { + query = query.Where("`status` IN ?", []int64{0, 1, 2}) // 0: Pending 1: Active 2: Finished + } + if req.StartTime != 0 { + start := time.UnixMilli(req.StartTime) + query = query.Where("`start_time` >= ?", start) + } + if req.EndTime != 0 { + end := time.UnixMilli(req.EndTime) + query = query.Where("`start_time` <= ?", end) + } + + if err := query.Find(&subs).Error; err != nil { + l.Errorf("[CreateQuotaTask] find subscribers error: %v", err.Error()) + return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "find subscribers error") + } + if len(subs) == 0 { + return errors.Wrapf(xerr.NewErrMsg("No subscribers found"), "no subscribers found") + } + var subIds []int64 + for _, sub := range subs { + subIds = append(subIds, sub.Id) + } + + scopeInfo := task.QuotaScope{ + Subscribers: req.Subscribers, + IsActive: req.IsActive, + StartTime: req.StartTime, + EndTime: req.EndTime, + } + scopeBytes, _ := scopeInfo.Marshal() + contentInfo := task.QuotaContent{ + ResetTraffic: req.ResetTraffic, + Days: req.Days, + GiftType: req.GiftType, + GiftValue: req.GiftValue, + } + contentBytes, _ := contentInfo.Marshal() + // create task + newTask := &task.Task{ + Type: task.TypeQuota, + Status: 0, + Scope: string(scopeBytes), + Content: string(contentBytes), + Total: uint64(len(subIds)), + Current: 0, + Errors: "", + } + + if err := l.svcCtx.DB.WithContext(l.ctx).Model(&task.Task{}).Create(newTask).Error; err != nil { + l.Errorf("[CreateQuotaTask] create task error: %v", err.Error()) + return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseInsertError), "create task error") + } + + // enqueue task + queueTask := asynq.NewTask(queueType.ForthwithQuotaTask, []byte(strconv.FormatInt(newTask.Id, 10))) + if _, err := l.svcCtx.Queue.EnqueueContext(l.ctx, queueTask); err != nil { + l.Errorf("[CreateQuotaTask] enqueue task error: %v", err.Error()) + return errors.Wrapf(xerr.NewErrCode(xerr.QueueEnqueueError), "enqueue task error") + } + logger.Infof("[CreateQuotaTask] Successfully created task with ID: %d", newTask.Id) return nil } diff --git a/internal/logic/admin/marketing/getPreSendEmailCountLogic.go b/internal/logic/admin/marketing/getPreSendEmailCountLogic.go index 6928098..9fbdbe4 100644 --- a/internal/logic/admin/marketing/getPreSendEmailCountLogic.go +++ b/internal/logic/admin/marketing/getPreSendEmailCountLogic.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/perfect-panel/server/internal/model/task" "github.com/perfect-panel/server/internal/model/user" "github.com/perfect-panel/server/internal/svc" "github.com/perfect-panel/server/internal/types" @@ -50,28 +51,29 @@ func (l *GetPreSendEmailCountLogic) GetPreSendEmailCount(req *types.GetPreSendEm return query } var query *gorm.DB - switch req.Scope { - case "all": + scope := task.ParseScopeType(req.Scope) + + switch scope { + case task.ScopeAll: query = baseQuery() - case "active": + case task.ScopeActive: query = baseQuery(). Joins("JOIN user_subscribe ON user.id = user_subscribe.user_id"). Where("user_subscribe.status IN ?", []int64{1, 2}) - case "expired": + case task.ScopeExpired: query = baseQuery(). Joins("JOIN user_subscribe ON user.id = user_subscribe.user_id"). Where("user_subscribe.status = ?", 3) - case "none": + case task.ScopeNone: query = baseQuery(). Joins("LEFT JOIN user_subscribe ON user.id = user_subscribe.user_id"). Where("user_subscribe.user_id IS NULL") - case "skip": + case task.ScopeSkip: // Skip scope does not require a count query = nil - default: l.Errorf("[CreateBatchSendEmailTask] Invalid scope: %v", req.Scope) return nil, xerr.NewErrMsg("Invalid email scope") diff --git a/internal/logic/admin/marketing/queryQuotaTaskListLogic.go b/internal/logic/admin/marketing/queryQuotaTaskListLogic.go index fa4aae6..50cfd9f 100644 --- a/internal/logic/admin/marketing/queryQuotaTaskListLogic.go +++ b/internal/logic/admin/marketing/queryQuotaTaskListLogic.go @@ -3,6 +3,7 @@ package marketing import ( "context" + "github.com/perfect-panel/server/internal/model/task" "github.com/perfect-panel/server/internal/svc" "github.com/perfect-panel/server/internal/types" "github.com/perfect-panel/server/pkg/logger" @@ -14,7 +15,7 @@ type QueryQuotaTaskListLogic struct { svcCtx *svc.ServiceContext } -// Query quota task list +// NewQueryQuotaTaskListLogic Query quota task list func NewQueryQuotaTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *QueryQuotaTaskListLogic { return &QueryQuotaTaskListLogic{ Logger: logger.WithContext(ctx), @@ -24,7 +25,59 @@ func NewQueryQuotaTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *QueryQuotaTaskListLogic) QueryQuotaTaskList(req *types.QueryQuotaTaskListRequest) (resp *types.QueryQuotaTaskListResponse, err error) { - // todo: add your logic here and delete this line + var data []*task.Task + var count int64 + query := l.svcCtx.DB.Model(&task.Task{}).Where("`type` = ?", task.TypeQuota) + if req.Page == 0 { + req.Page = 1 + } + if req.Size == 0 { + req.Size = 20 + } - return + if req.Status != nil { + query = query.Where("`status` = ?", *req.Status) + } + err = query.Count(&count).Offset((req.Page - 1) * req.Size).Limit(req.Size).Order("created_at DESC").Find(&data).Error + if err != nil { + l.Errorf("[QueryQuotaTaskList] failed to get quota tasks: %v", err) + return nil, err + } + + var list []types.QuotaTask + for _, item := range data { + var scopeInfo task.QuotaScope + if err = scopeInfo.Unmarshal([]byte(item.Scope)); err != nil { + l.Errorf("[QueryQuotaTaskList] failed to unmarshal quota task scope: %v", err.Error()) + continue + } + var contentInfo task.QuotaContent + if err = contentInfo.Unmarshal([]byte(item.Content)); err != nil { + l.Errorf("[QueryQuotaTaskList] failed to unmarshal quota task content: %v", err.Error()) + continue + } + list = append(list, types.QuotaTask{ + Id: item.Id, + Subscribers: scopeInfo.Subscribers, + IsActive: scopeInfo.IsActive, + StartTime: scopeInfo.StartTime, + EndTime: scopeInfo.EndTime, + ResetTraffic: contentInfo.ResetTraffic, + Days: contentInfo.Days, + GiftType: contentInfo.GiftType, + GiftValue: contentInfo.GiftValue, + Objects: scopeInfo.Objects, + Status: uint8(item.Status), + Total: int64(item.Total), + Current: int64(item.Current), + Errors: item.Errors, + CreatedAt: item.CreatedAt.UnixMilli(), + UpdatedAt: item.UpdatedAt.UnixMilli(), + }) + } + + return &types.QueryQuotaTaskListResponse{ + Total: count, + List: list, + }, nil } diff --git a/internal/logic/admin/marketing/queryQuotaTaskPreCountLogic.go b/internal/logic/admin/marketing/queryQuotaTaskPreCountLogic.go index 6f12b07..d004619 100644 --- a/internal/logic/admin/marketing/queryQuotaTaskPreCountLogic.go +++ b/internal/logic/admin/marketing/queryQuotaTaskPreCountLogic.go @@ -2,7 +2,9 @@ package marketing import ( "context" + "time" + "github.com/perfect-panel/server/internal/model/user" "github.com/perfect-panel/server/internal/svc" "github.com/perfect-panel/server/internal/types" "github.com/perfect-panel/server/pkg/logger" @@ -14,7 +16,7 @@ type QueryQuotaTaskPreCountLogic struct { svcCtx *svc.ServiceContext } -// Query quota task pre-count +// NewQueryQuotaTaskPreCountLogic Query quota task pre-count func NewQueryQuotaTaskPreCountLogic(ctx context.Context, svcCtx *svc.ServiceContext) *QueryQuotaTaskPreCountLogic { return &QueryQuotaTaskPreCountLogic{ Logger: logger.WithContext(ctx), @@ -24,7 +26,30 @@ func NewQueryQuotaTaskPreCountLogic(ctx context.Context, svcCtx *svc.ServiceCont } func (l *QueryQuotaTaskPreCountLogic) QueryQuotaTaskPreCount(req *types.QueryQuotaTaskPreCountRequest) (resp *types.QueryQuotaTaskPreCountResponse, err error) { - // todo: add your logic here and delete this line + tx := l.svcCtx.DB.WithContext(l.ctx).Model(&user.Subscribe{}) + var count int64 - return + if len(req.Subscribers) > 0 { + tx = tx.Where("`subscribe_id` IN ?", req.Subscribers) + } + + if req.IsActive != nil && *req.IsActive { + tx = tx.Where("`status` IN ?", []int64{0, 1, 2}) // 0: Pending 1: Active 2: Finished + } + if req.StartTime != 0 { + start := time.UnixMilli(req.StartTime) + tx = tx.Where("`start_time` >= ?", start) + } + if req.EndTime != 0 { + end := time.UnixMilli(req.EndTime) + tx = tx.Where("`start_time` <= ?", end) + } + if err = tx.Count(&count).Error; err != nil { + l.Errorf("[QueryQuotaTaskPreCount] count error: %v", err.Error()) + return nil, err + } + + return &types.QueryQuotaTaskPreCountResponse{ + Count: count, + }, nil } diff --git a/internal/logic/admin/marketing/queryQuotaTaskStatusLogic.go b/internal/logic/admin/marketing/queryQuotaTaskStatusLogic.go index 3aa250e..70599fe 100644 --- a/internal/logic/admin/marketing/queryQuotaTaskStatusLogic.go +++ b/internal/logic/admin/marketing/queryQuotaTaskStatusLogic.go @@ -3,9 +3,12 @@ package marketing import ( "context" + "github.com/perfect-panel/server/internal/model/task" "github.com/perfect-panel/server/internal/svc" "github.com/perfect-panel/server/internal/types" "github.com/perfect-panel/server/pkg/logger" + "github.com/perfect-panel/server/pkg/xerr" + "github.com/pkg/errors" ) type QueryQuotaTaskStatusLogic struct { @@ -14,7 +17,7 @@ type QueryQuotaTaskStatusLogic struct { svcCtx *svc.ServiceContext } -// Query quota task status +// NewQueryQuotaTaskStatusLogic Query quota task status func NewQueryQuotaTaskStatusLogic(ctx context.Context, svcCtx *svc.ServiceContext) *QueryQuotaTaskStatusLogic { return &QueryQuotaTaskStatusLogic{ Logger: logger.WithContext(ctx), @@ -24,7 +27,16 @@ func NewQueryQuotaTaskStatusLogic(ctx context.Context, svcCtx *svc.ServiceContex } func (l *QueryQuotaTaskStatusLogic) QueryQuotaTaskStatus(req *types.QueryQuotaTaskStatusRequest) (resp *types.QueryQuotaTaskStatusResponse, err error) { - // todo: add your logic here and delete this line - - return + var data *task.Task + err = l.svcCtx.DB.Model(&task.Task{}).Where("id = ? AND `type` = ?", req.Id, task.TypeQuota).First(&data).Error + if err != nil { + l.Errorf("[QueryQuotaTaskStatus] failed to get quota task: %v", err.Error()) + return nil, errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), " failed to get quota task: %v", err.Error()) + } + return &types.QueryQuotaTaskStatusResponse{ + Status: uint8(data.Status), + Current: int64(data.Current), + Total: int64(data.Total), + Errors: data.Errors, + }, nil } diff --git a/internal/model/log/log.go b/internal/model/log/log.go index 810602b..af3eb98 100644 --- a/internal/model/log/log.go +++ b/internal/model/log/log.go @@ -36,6 +36,7 @@ const ( ResetSubscribeTypeAuto uint16 = 231 // Auto reset ResetSubscribeTypeAdvance uint16 = 232 // Advance reset ResetSubscribeTypePaid uint16 = 233 // Paid reset + ResetSubscribeTypeQuota uint16 = 234 // Quota reset BalanceTypeRecharge uint16 = 321 // Recharge BalanceTypeWithdraw uint16 = 322 // Withdraw BalanceTypePayment uint16 = 323 // Payment diff --git a/internal/model/task/task.go b/internal/model/task/task.go index 06f5688..5c1b987 100644 --- a/internal/model/task/task.go +++ b/internal/model/task/task.go @@ -91,10 +91,11 @@ func (c *EmailContent) Unmarshal(data []byte) error { } type QuotaScope struct { - Type int8 `gorm:"not null;comment:Scope Type"` - RegisterStartTime int64 `json:"register_start_time"` - RegisterEndTime int64 `json:"register_end_time"` - Recipients []int64 `json:"recipients"` // list of user subs IDs + Subscribers []int64 `json:"subscribers"` // Subscribe IDs + IsActive *bool `json:"is_active"` // filter by active status + StartTime int64 `json:"start_time"` // filter by subscription start time + EndTime int64 `json:"end_time"` // filter by subscription end time + Objects []int64 `json:"recipients"` // list of user subs IDs } func (s *QuotaScope) Marshal() ([]byte, error) { @@ -112,18 +113,26 @@ func (s *QuotaScope) Unmarshal(data []byte) error { return json.Unmarshal(data, &aux) } -type QuotaType int8 - -const ( - QuotaTypeReset QuotaType = iota + 1 // Reset Subscribe Quota - QuotaTypeDays // Add Subscribe Days - QuotaTypeGift // Add Gift Amount -) - type QuotaContent struct { - Type int8 `json:"type"` - Days uint64 `json:"days,omitempty"` // days to add - Gift uint8 `json:"gift,omitempty"` // Invoice amount ratio(%) to gift amount + ResetTraffic bool `json:"reset_traffic"` // whether to reset traffic + Days uint64 `json:"days,omitempty"` // days to add + GiftType uint8 `json:"gift_type,omitempty"` // 1: Fixed, 2: Ratio + GiftValue uint64 `json:"gift_value,omitempty"` // value of the gift type +} + +func (c *QuotaContent) Marshal() ([]byte, error) { + type Alias QuotaContent + return json.Marshal(&struct { + *Alias + }{ + Alias: (*Alias)(c), + }) +} + +func (c *QuotaContent) Unmarshal(data []byte) error { + type Alias QuotaContent + aux := (*Alias)(c) + return json.Unmarshal(data, &aux) } func ParseScopeType(t int8) ScopeType { diff --git a/internal/model/user/model.go b/internal/model/user/model.go index 648e8cd..3c5dff9 100644 --- a/internal/model/user/model.go +++ b/internal/model/user/model.go @@ -101,7 +101,7 @@ type customUserLogicModel interface { DeleteDevice(ctx context.Context, id int64, tx ...*gorm.DB) error ClearSubscribeCache(ctx context.Context, data ...*Subscribe) error - clearUserCache(ctx context.Context, data ...*User) error + ClearUserCache(ctx context.Context, data ...*User) error QueryDailyUserStatisticsList(ctx context.Context, date time.Time) ([]UserStatisticsWithDate, error) QueryMonthlyUserStatisticsList(ctx context.Context, date time.Time) ([]UserStatisticsWithDate, error) diff --git a/internal/types/types.go b/internal/types/types.go index 599b38a..abe6c33 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -345,12 +345,14 @@ type CreatePaymentMethodRequest struct { } type CreateQuotaTaskRequest struct { - Scope int8 `json:"scope"` - RegisterStartTime int64 `json:"register_start_time"` - RegisterEndTime int64 `json:"register_end_time"` - QuotaType uint8 `json:"quota_type"` - Days uint64 `json:"days"` // Number of days for the quota - Gift uint8 `json:"gift"` // Invoice amount ratio(%) to gift amount for quota + Subscribers []int64 `json:"subscribers"` + IsActive *bool `json:"is_active"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` + ResetTraffic bool `json:"reset_traffic"` + Days uint64 `json:"days"` + GiftType uint8 `json:"gift_type"` + GiftValue uint64 `json:"gift_value"` } type CreateServerRequest struct { @@ -887,9 +889,9 @@ type GetPaymentMethodListResponse struct { } type GetPreSendEmailCountRequest struct { - Scope string `json:"scope"` - RegisterStartTime int64 `json:"register_start_time,omitempty"` - RegisterEndTime int64 `json:"register_end_time,omitempty"` + Scope int8 `json:"scope"` + RegisterStartTime int64 `json:"register_start_time,omitempty"` + RegisterEndTime int64 `json:"register_end_time,omitempty"` } type GetPreSendEmailCountResponse struct { @@ -1534,7 +1536,6 @@ type QueryPurchaseOrderResponse struct { type QueryQuotaTaskListRequest struct { Page int `form:"page"` Size int `form:"size"` - Scope *uint8 `form:"scope,omitempty"` Status *uint8 `form:"status,omitempty"` } @@ -1544,9 +1545,10 @@ type QueryQuotaTaskListResponse struct { } type QueryQuotaTaskPreCountRequest struct { - Scope uint8 `json:"scope"` - RegisterStartTime int64 `json:"register_start_time"` - RegisterEndTime int64 `json:"register_end_time"` + Subscribers []int64 `json:"subscribers"` + IsActive *bool `json:"is_active"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` } type QueryQuotaTaskPreCountResponse struct { @@ -1614,20 +1616,22 @@ type QueryUserSubscribeListResponse struct { } type QuotaTask struct { - Id int64 `json:"id"` - Scope int8 `json:"scope"` - RegisterStartTime int64 `json:"register_start_time"` - RegisterEndTime int64 `json:"register_end_time"` - QuotaType uint8 `json:"quota_type"` - Days uint64 `json:"days"` // Number of days for the quota - Gift uint8 `json:"gift"` // Invoice amount ratio(%) to gift - Recipients []int64 `json:"recipients"` // UserSubscribe IDs of recipients - Status uint8 `json:"status"` - Total int64 `json:"total"` - Current int64 `json:"current"` - Errors string `json:"errors"` - CreatedAt int64 `json:"created_at"` - UpdatedAt int64 `json:"updated_at"` + Id int64 `json:"id"` + Subscribers []int64 `json:"subscribers"` + IsActive *bool `json:"is_active"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` + ResetTraffic bool `json:"reset_traffic"` + Days uint64 `json:"days"` + GiftType uint8 `json:"gift_type"` + GiftValue uint64 `json:"gift_value"` + Objects []int64 `json:"objects"` // UserSubscribe IDs + Status uint8 `json:"status"` + Total int64 `json:"total"` + Current int64 `json:"current"` + Errors string `json:"errors"` + CreatedAt int64 `json:"created_at"` + UpdatedAt int64 `json:"updated_at"` } type RechargeOrderRequest struct { diff --git a/pkg/tool/time.go b/pkg/tool/time.go index 65008f5..31c6883 100644 --- a/pkg/tool/time.go +++ b/pkg/tool/time.go @@ -144,3 +144,10 @@ func DayDiff(startTime, endTime time.Time) int64 { duration := endTime.Sub(startTime) return int64(duration.Hours() / 24) // 转换为整天数 } + +// HourDiff 计算两个时间点之间的小时差 +func HourDiff(startTime, endTime time.Time) int64 { + // 计算时间差 + duration := endTime.Sub(startTime) + return int64(duration.Hours()) // 返回小时数,可能包含小数部分 +} diff --git a/queue/handler/routes.go b/queue/handler/routes.go index b4ead3c..edf2293 100644 --- a/queue/handler/routes.go +++ b/queue/handler/routes.go @@ -7,6 +7,7 @@ import ( orderLogic "github.com/perfect-panel/server/queue/logic/order" smslogic "github.com/perfect-panel/server/queue/logic/sms" "github.com/perfect-panel/server/queue/logic/subscription" + "github.com/perfect-panel/server/queue/logic/task" "github.com/perfect-panel/server/queue/logic/traffic" "github.com/perfect-panel/server/queue/types" @@ -42,4 +43,7 @@ func RegisterHandlers(mux *asynq.ServeMux, serverCtx *svc.ServiceContext) { // ScheduledTrafficStat mux.Handle(types.SchedulerTrafficStat, traffic.NewStatLogic(serverCtx)) + + // ForthwithQuotaTask + mux.Handle(types.ForthwithQuotaTask, task.NewQuotaTaskLogic(serverCtx)) } diff --git a/queue/logic/task/quotaLogic.go b/queue/logic/task/quotaLogic.go new file mode 100644 index 0000000..0ea9297 --- /dev/null +++ b/queue/logic/task/quotaLogic.go @@ -0,0 +1,448 @@ +package task + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/hibiken/asynq" + "github.com/perfect-panel/server/internal/model/log" + "github.com/perfect-panel/server/internal/model/order" + "github.com/perfect-panel/server/internal/model/subscribe" + "github.com/perfect-panel/server/internal/model/task" + "github.com/perfect-panel/server/internal/model/user" + "github.com/perfect-panel/server/internal/svc" + "github.com/perfect-panel/server/pkg/logger" + "github.com/perfect-panel/server/pkg/tool" + "gorm.io/gorm" +) + +const ( + UnitTimeNoLimit = "NoLimit" // Unlimited time subscription + UnitTimeYear = "Year" // Annual subscription + UnitTimeMonth = "Month" // Monthly subscription + UnitTimeDay = "Day" // Daily subscription + UnitTimeHour = "Hour" // Hourly subscription + UnitTimeMinute = "Minute" // Per-minute subscription + +) + +type QuotaTaskLogic struct { + svcCtx *svc.ServiceContext +} + +type ErrorInfo struct { + UserSubscribeId int64 `json:"user_subscribe_id"` + Error string `json:"error"` +} + +func NewQuotaTaskLogic(svcCtx *svc.ServiceContext) *QuotaTaskLogic { + return &QuotaTaskLogic{ + svcCtx: svcCtx, + } +} + +func (l *QuotaTaskLogic) ProcessTask(ctx context.Context, t *asynq.Task) error { + taskID, err := l.parseTaskID(ctx, t.Payload()) + if err != nil { + return err + } + + taskInfo, err := l.getTaskInfo(ctx, taskID) + if err != nil { + return err + } + + if taskInfo.Status != 0 { + logger.WithContext(ctx).Info("[QuotaTaskLogic.ProcessTask] task already processed", + logger.Field("taskID", taskID), + logger.Field("status", taskInfo.Status), + ) + return nil + } + + scope, content, err := l.parseTaskData(ctx, taskInfo) + if err != nil { + return err + } + + subscribes, err := l.getSubscribes(ctx, scope.Subscribers) + if err != nil { + return err + } + if err = l.processSubscribes(ctx, subscribes, content, taskInfo); err != nil { + return err + } + if content.GiftValue != 0 { + var userIds []int64 + for _, sub := range subscribes { + userIds = append(userIds, sub.UserId) + } + userIds = tool.RemoveDuplicateElements(userIds...) + var users []*user.User + if err = l.svcCtx.DB.WithContext(ctx).Model(&user.User{}).Where("id IN ?", userIds).Find(&users).Error; err != nil { + logger.WithContext(ctx).Error("[QuotaTaskLogic.ProcessTask] find users error", + logger.Field("error", err.Error()), + logger.Field("userIDs", userIds)) + } + err = l.svcCtx.UserModel.ClearUserCache(ctx, users...) + if err != nil { + logger.WithContext(ctx).Error("[QuotaTaskLogic.ProcessTask] clear user cache error", + logger.Field("error", err.Error()), + logger.Field("userIDs", userIds)) + } + } + + // 清理用户订阅缓存 + err = l.svcCtx.UserModel.ClearSubscribeCache(ctx, subscribes...) + if err != nil { + logger.WithContext(ctx).Error("[QuotaTaskLogic.ProcessTask] clear subscribe cache error", + logger.Field("error", err.Error())) + } + + return nil +} + +func (l *QuotaTaskLogic) parseTaskID(ctx context.Context, payload []byte) (int64, error) { + if len(payload) == 0 { + logger.WithContext(ctx).Error("[QuotaTaskLogic.parseTaskID] empty payload") + return 0, asynq.SkipRetry + } + + taskID, err := strconv.ParseInt(string(payload), 10, 64) + if err != nil { + logger.WithContext(ctx).Error("[QuotaTaskLogic.parseTaskID] invalid task ID", + logger.Field("error", err.Error()), + logger.Field("payload", string(payload)), + ) + return 0, asynq.SkipRetry + } + return taskID, nil +} + +func (l *QuotaTaskLogic) getTaskInfo(ctx context.Context, taskID int64) (*task.Task, error) { + var taskInfo *task.Task + if err := l.svcCtx.DB.WithContext(ctx).Model(&task.Task{}).Where("id = ?", taskID).First(&taskInfo).Error; err != nil { + logger.WithContext(ctx).Error("[QuotaTaskLogic.getTaskInfo] find task error", + logger.Field("error", err.Error()), + logger.Field("taskID", taskID), + ) + return nil, asynq.SkipRetry + } + return taskInfo, nil +} + +func (l *QuotaTaskLogic) parseTaskData(ctx context.Context, taskInfo *task.Task) (task.QuotaScope, task.QuotaContent, error) { + var scope task.QuotaScope + if err := scope.Unmarshal([]byte(taskInfo.Scope)); err != nil { + logger.WithContext(ctx).Error("[QuotaTaskLogic.parseTaskData] unmarshal scope error", + logger.Field("error", err.Error()), + ) + return scope, task.QuotaContent{}, asynq.SkipRetry + } + + var content task.QuotaContent + if err := content.Unmarshal([]byte(taskInfo.Content)); err != nil { + logger.WithContext(ctx).Error("[QuotaTaskLogic.parseTaskData] unmarshal content error", + logger.Field("error", err.Error()), + ) + return scope, content, asynq.SkipRetry + } + return scope, content, nil +} + +func (l *QuotaTaskLogic) getSubscribes(ctx context.Context, subscriberIDs []int64) ([]*user.Subscribe, error) { + var subscribes []*user.Subscribe + if err := l.svcCtx.DB.WithContext(ctx).Model(&user.Subscribe{}).Where("subscribe_id IN ?", subscriberIDs).Find(&subscribes).Error; err != nil { + logger.WithContext(ctx).Error("[QuotaTaskLogic.getSubscribes] find subscribes error", + logger.Field("error", err.Error()), + logger.Field("subscribers", subscriberIDs), + ) + return nil, asynq.SkipRetry + } + return subscribes, nil +} + +func (l *QuotaTaskLogic) processSubscribes(ctx context.Context, subscribes []*user.Subscribe, content task.QuotaContent, taskInfo *task.Task) error { + tx := l.svcCtx.DB.WithContext(ctx).Begin() + defer func() { + if r := recover(); r != nil { + tx.Rollback() + logger.WithContext(ctx).Error("[QuotaTaskLogic.processSubscribes] transaction panic", + logger.Field("panic", r), + ) + } + }() + + var errors []ErrorInfo + now := time.Now() + + for _, sub := range subscribes { + if err := l.processSubscription(tx, sub, content, now, &errors); err != nil { + tx.Rollback() + return err + } + } + + // 根据错误情况决定任务状态 + status := int8(2) // Completed + if len(errors) > 0 { + logger.WithContext(ctx).Error("[QuotaTaskLogic.processSubscribes] some subscriptions failed", + logger.Field("total", len(subscribes)), + logger.Field("failed", len(errors)), + ) + // 如果所有订阅都失败,标记为失败状态 + if len(errors) == len(subscribes) { + status = 3 // Failed + } + errs, err := json.Marshal(errors) + if err != nil { + logger.WithContext(ctx).Error("[QuotaTaskLogic.processSubscribes] marshal errors failed", + logger.Field("error", err.Error()), + ) + tx.Rollback() + return err + } + taskInfo.Errors = string(errs) + } + + taskInfo.Status = status + err := tx.Where("id = ?", taskInfo.Id).Save(taskInfo).Error + if err != nil { + logger.WithContext(ctx).Error("[QuotaTaskLogic.processSubscribes] update task status error", + logger.Field("error", err.Error()), + logger.Field("taskID", taskInfo.Id), + ) + tx.Rollback() + return err + } + + if err = tx.Commit().Error; err != nil { + logger.WithContext(ctx).Error("[QuotaTaskLogic.processSubscribes] commit transaction error", + logger.Field("error", err.Error()), + ) + return err + } + + return nil +} + +func (l *QuotaTaskLogic) processSubscription(tx *gorm.DB, sub *user.Subscribe, content task.QuotaContent, now time.Time, errors *[]ErrorInfo) error { + // 验证订阅数据 + if sub == nil { + *errors = append(*errors, ErrorInfo{ + UserSubscribeId: 0, + Error: "subscription is nil", + }) + return nil + } + + updated := false + + // 处理时间延长 - 修复逻辑:只要Days不为0就处理,不管ExpireTime是否为0 + if content.Days != 0 { + if sub.ExpireTime.Unix() == 0 || sub.ExpireTime.Before(now) { + // 如果没有过期时间或已过期,从现在开始计算 + sub.ExpireTime = now.AddDate(0, 0, int(content.Days)) + } else { + // 在原有过期时间基础上延长 + sub.ExpireTime = sub.ExpireTime.AddDate(0, 0, int(content.Days)) + } + // 如果订阅延长到未来时间,设置为激活状态 + if sub.ExpireTime.After(now) && sub.Status != 1 { + sub.Status = 1 // Active + } + updated = true + } + + // 处理流量重置 + if content.ResetTraffic { + sub.Download = 0 + sub.Upload = 0 + updated = true + if err := l.createResetTrafficLog(tx, sub.Id, sub.UserId, now); err != nil { + // 记录错误但不阻断整个任务,日志失败不影响主流程 + *errors = append(*errors, ErrorInfo{ + UserSubscribeId: sub.Id, + Error: "create reset traffic log error: " + err.Error(), + }) + } + } + + // 处理赠送金 + if content.GiftValue != 0 { + if err := l.processGift(tx, sub, content, now, errors); err != nil { + return err + } + } + + // 只有在有更新时才保存订阅信息 + if updated { + if err := tx.Where("id = ?", sub.Id).Save(sub).Error; err != nil { + *errors = append(*errors, ErrorInfo{ + UserSubscribeId: sub.Id, + Error: "update subscription error: " + err.Error(), + }) + return nil + } + } + + return nil +} + +func (l *QuotaTaskLogic) processGift(tx *gorm.DB, sub *user.Subscribe, content task.QuotaContent, now time.Time, errors *[]ErrorInfo) error { + // 验证赠送类型 + if content.GiftType != 1 && content.GiftType != 2 { + *errors = append(*errors, ErrorInfo{ + UserSubscribeId: sub.Id, + Error: fmt.Sprintf("invalid gift type: %d", content.GiftType), + }) + return nil + } + + var userInfo user.User + if err := tx.Model(&user.User{}).Where("id = ?", sub.UserId).First(&userInfo).Error; err != nil { + *errors = append(*errors, ErrorInfo{ + UserSubscribeId: sub.Id, + Error: "find user error: " + err.Error(), + }) + return nil + } + + var giftAmount int64 + switch content.GiftType { + case 1: + giftAmount = int64(content.GiftValue) + case 2: + orderAmount, err := l.calculateOrderAmount(tx, sub, now) + if err != nil { + *errors = append(*errors, ErrorInfo{ + UserSubscribeId: sub.Id, + Error: err.Error(), + }) + return nil + } + if orderAmount > 0 { + giftAmount = int64(float64(orderAmount) * (float64(content.GiftValue) / 100)) + } + } + + if giftAmount > 0 { + userInfo.GiftAmount += giftAmount + // 使用Update而不是Save,更精确地更新单个字段 + if err := tx.Model(&user.User{}).Where("id = ?", sub.UserId).Update("gift_amount", userInfo.GiftAmount).Error; err != nil { + *errors = append(*errors, ErrorInfo{ + UserSubscribeId: sub.Id, + Error: "update user gift amount error: " + err.Error(), + }) + return nil + } + + if err := l.createGiftLog(tx, sub.Id, userInfo.Id, giftAmount, userInfo.GiftAmount, now); err != nil { + *errors = append(*errors, ErrorInfo{ + UserSubscribeId: sub.Id, + Error: "create gift log error: " + err.Error(), + }) + // 回滚用户金额更新 + userInfo.GiftAmount -= giftAmount + tx.Model(&user.User{}).Where("id = ?", sub.UserId).Update("gift_amount", userInfo.GiftAmount) + return nil + } + } + + return nil +} + +func (l *QuotaTaskLogic) getStartTime(sub *user.Subscribe, now time.Time) time.Time { + if sub.StartTime.Unix() == 0 { + return now + } + return sub.StartTime +} + +func (l *QuotaTaskLogic) calculateOrderAmount(tx *gorm.DB, sub *user.Subscribe, now time.Time) (int64, error) { + if sub.OrderId != 0 { + var orderInfo *order.Order + if err := tx.Model(&order.Order{}).Where("id = ?", sub.OrderId).First(&orderInfo).Error; err != nil { + return 0, fmt.Errorf("find order error: %v", err) + } + return orderInfo.Amount + orderInfo.GiftAmount, nil + } + + var subInfo *subscribe.Subscribe + if err := tx.Model(&subscribe.Subscribe{}).Where("id = ?", sub.SubscribeId).First(&subInfo).Error; err != nil { + return 0, fmt.Errorf("find subscribe error: %v", err) + } + + startTime := l.getStartTime(sub, now) + if sub.ExpireTime.Before(startTime) { + return subInfo.UnitPrice, nil + } + + switch subInfo.UnitTime { + case UnitTimeNoLimit: + return subInfo.UnitPrice, nil + case UnitTimeYear: + days := tool.DayDiff(startTime, sub.ExpireTime) + return subInfo.UnitPrice / 365 * days, nil + case UnitTimeMonth: + days := tool.DayDiff(startTime, sub.ExpireTime) + return subInfo.UnitPrice / 30 * days, nil + case UnitTimeDay: + days := tool.DayDiff(startTime, sub.ExpireTime) + return subInfo.UnitPrice * days, nil + case UnitTimeHour: + hours := int(tool.HourDiff(startTime, sub.ExpireTime)) + return subInfo.UnitPrice * int64(hours), nil + case UnitTimeMinute: + minutes := tool.HourDiff(startTime, sub.ExpireTime) * 60 + return subInfo.UnitPrice * minutes, nil + default: + return subInfo.UnitPrice, nil + } +} + +func (l *QuotaTaskLogic) createGiftLog(tx *gorm.DB, subscribeId, userId, amount, balance int64, now time.Time) error { + giftLog := &log.Gift{ + Type: log.GiftTypeIncrease, + OrderNo: "", + SubscribeId: subscribeId, + Amount: amount, + Balance: balance, + Remark: "Quota task gift", + Timestamp: now.UnixMilli(), + } + + logString, err := giftLog.Marshal() + if err != nil { + return fmt.Errorf("marshal gift log error: %v", err) + } + return tx.Model(&log.SystemLog{}).Create(&log.SystemLog{ + Type: log.TypeGift.Uint8(), + Content: string(logString), + ObjectID: userId, + Date: now.Format(time.DateOnly), + }).Error +} + +func (l *QuotaTaskLogic) createResetTrafficLog(tx *gorm.DB, subscribeId, userId int64, now time.Time) error { + trafficLog := &log.ResetSubscribe{ + Type: log.ResetSubscribeTypeQuota, + UserId: userId, + OrderNo: "", + Timestamp: now.UnixMilli(), + } + + logString, err := trafficLog.Marshal() + if err != nil { + return fmt.Errorf("marshal traffic log error: %v", err) + } + return tx.Model(&log.SystemLog{}).Create(&log.SystemLog{ + Type: log.TypeResetSubscribe.Uint8(), + Content: string(logString), + ObjectID: subscribeId, + Date: now.Format(time.DateOnly), + }).Error +} diff --git a/queue/types/email.go b/queue/types/email.go index 45bc1b6..4fee979 100644 --- a/queue/types/email.go +++ b/queue/types/email.go @@ -3,8 +3,6 @@ package types const ( // ForthwithSendEmail forthwith send email ForthwithSendEmail = "forthwith:email:send" - // ScheduledBatchSendEmail scheduled batch send email - ScheduledBatchSendEmail = "scheduled:email:batch" ) const ( diff --git a/queue/types/task.go b/queue/types/task.go new file mode 100644 index 0000000..0115f28 --- /dev/null +++ b/queue/types/task.go @@ -0,0 +1,9 @@ +package types + +const ( + // ScheduledBatchSendEmail scheduled batch send email + ScheduledBatchSendEmail = "scheduled:email:batch" + + // ForthwithQuotaTask create quota task immediately + ForthwithQuotaTask = "forthwith:quota:task" +)