feat(quota): enhance quota task management with new request structures and processing logic

This commit is contained in:
Chang lue Tsen 2025-09-10 14:53:48 -04:00
parent 83c1c14b01
commit 3f5aac239b
15 changed files with 740 additions and 95 deletions

View File

@ -53,7 +53,7 @@ type (
Id int64 `json:"id"`
}
GetPreSendEmailCountRequest {
Scope string `json:"scope"`
Scope int8 `json:"scope"`
RegisterStartTime int64 `json:"register_start_time,omitempty"`
RegisterEndTime int64 `json:"register_end_time,omitempty"`
}
@ -70,22 +70,26 @@ type (
Errors string `json:"errors"`
}
CreateQuotaTaskRequest {
Scope int8 `json:"scope"`
RegisterStartTime int64 `json:"register_start_time"`
RegisterEndTime int64 `json:"register_end_time"`
QuotaType uint8 `json:"quota_type"`
Days uint64 `json:"days"` // Number of days for the quota
Gift uint8 `json:"gift"` // Invoice amount ratio(%) to gift amount for quota
Subscribers []int64 `json:"subscribers"`
IsActive *bool `json:"is_active"`
StartTime int64 `json:"start_time"`
EndTime int64 `json:"end_time"`
ResetTraffic bool `json:"reset_traffic"`
Days uint64 `json:"days"`
GiftType uint8 `json:"gift_type"`
GiftValue uint64 `json:"gift_value"`
}
QuotaTask {
Id int64 `json:"id"`
Scope int8 `json:"scope"`
RegisterStartTime int64 `json:"register_start_time"`
RegisterEndTime int64 `json:"register_end_time"`
QuotaType uint8 `json:"quota_type"`
Days uint64 `json:"days"` // Number of days for the quota
Gift uint8 `json:"gift"` // Invoice amount ratio(%) to gift
Recipients []int64 `json:"recipients"` // UserSubscribe IDs of recipients
Subscribers []int64 `json:"subscribers"`
IsActive *bool `json:"is_active"`
StartTime int64 `json:"start_time"`
EndTime int64 `json:"end_time"`
ResetTraffic bool `json:"reset_traffic"`
Days uint64 `json:"days"`
GiftType uint8 `json:"gift_type"`
GiftValue uint64 `json:"gift_value"`
Objects []int64 `json:"objects"` // UserSubscribe IDs
Status uint8 `json:"status"`
Total int64 `json:"total"`
Current int64 `json:"current"`
@ -94,9 +98,10 @@ type (
UpdatedAt int64 `json:"updated_at"`
}
QueryQuotaTaskPreCountRequest {
Scope uint8 `json:"scope"`
RegisterStartTime int64 `json:"register_start_time"`
RegisterEndTime int64 `json:"register_end_time"`
Subscribers []int64 `json:"subscribers"`
IsActive *bool `json:"is_active"`
StartTime int64 `json:"start_time"`
EndTime int64 `json:"end_time"`
}
QueryQuotaTaskPreCountResponse {
Count int64 `json:"count"`
@ -104,7 +109,6 @@ type (
QueryQuotaTaskListRequest {
Page int `form:"page"`
Size int `form:"size"`
Scope *uint8 `form:"scope,omitempty"`
Status *uint8 `form:"status,omitempty"`
}
QueryQuotaTaskListResponse {
@ -159,9 +163,5 @@ service ppanel {
@doc "Query quota task list"
@handler QueryQuotaTaskList
get /quota/list (QueryQuotaTaskListRequest) returns (QueryQuotaTaskListResponse)
@doc "Query quota task status"
@handler QueryQuotaTaskStatus
post /quota/status (QueryQuotaTaskStatusRequest) returns (QueryQuotaTaskStatusResponse)
}

View File

@ -2,10 +2,18 @@ package marketing
import (
"context"
"strconv"
"time"
"github.com/hibiken/asynq"
"github.com/perfect-panel/server/internal/model/task"
"github.com/perfect-panel/server/internal/model/user"
"github.com/perfect-panel/server/internal/svc"
"github.com/perfect-panel/server/internal/types"
"github.com/perfect-panel/server/pkg/logger"
"github.com/perfect-panel/server/pkg/xerr"
queueType "github.com/perfect-panel/server/queue/types"
"github.com/pkg/errors"
)
type CreateQuotaTaskLogic struct {
@ -14,7 +22,7 @@ type CreateQuotaTaskLogic struct {
svcCtx *svc.ServiceContext
}
// Create a quota task
// NewCreateQuotaTaskLogic Create a quota task
func NewCreateQuotaTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateQuotaTaskLogic {
return &CreateQuotaTaskLogic{
Logger: logger.WithContext(ctx),
@ -24,7 +32,72 @@ func NewCreateQuotaTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *C
}
func (l *CreateQuotaTaskLogic) CreateQuotaTask(req *types.CreateQuotaTaskRequest) error {
// todo: add your logic here and delete this line
var subs []*user.Subscribe
query := l.svcCtx.DB.WithContext(l.ctx).Model(&user.Subscribe{})
if len(req.Subscribers) > 0 {
query = query.Where("`subscribe_id` IN ?", req.Subscribers)
}
if req.IsActive != nil && *req.IsActive {
query = query.Where("`status` IN ?", []int64{0, 1, 2}) // 0: Pending 1: Active 2: Finished
}
if req.StartTime != 0 {
start := time.UnixMilli(req.StartTime)
query = query.Where("`start_time` >= ?", start)
}
if req.EndTime != 0 {
end := time.UnixMilli(req.EndTime)
query = query.Where("`start_time` <= ?", end)
}
if err := query.Find(&subs).Error; err != nil {
l.Errorf("[CreateQuotaTask] find subscribers error: %v", err.Error())
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "find subscribers error")
}
if len(subs) == 0 {
return errors.Wrapf(xerr.NewErrMsg("No subscribers found"), "no subscribers found")
}
var subIds []int64
for _, sub := range subs {
subIds = append(subIds, sub.Id)
}
scopeInfo := task.QuotaScope{
Subscribers: req.Subscribers,
IsActive: req.IsActive,
StartTime: req.StartTime,
EndTime: req.EndTime,
}
scopeBytes, _ := scopeInfo.Marshal()
contentInfo := task.QuotaContent{
ResetTraffic: req.ResetTraffic,
Days: req.Days,
GiftType: req.GiftType,
GiftValue: req.GiftValue,
}
contentBytes, _ := contentInfo.Marshal()
// create task
newTask := &task.Task{
Type: task.TypeQuota,
Status: 0,
Scope: string(scopeBytes),
Content: string(contentBytes),
Total: uint64(len(subIds)),
Current: 0,
Errors: "",
}
if err := l.svcCtx.DB.WithContext(l.ctx).Model(&task.Task{}).Create(newTask).Error; err != nil {
l.Errorf("[CreateQuotaTask] create task error: %v", err.Error())
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseInsertError), "create task error")
}
// enqueue task
queueTask := asynq.NewTask(queueType.ForthwithQuotaTask, []byte(strconv.FormatInt(newTask.Id, 10)))
if _, err := l.svcCtx.Queue.EnqueueContext(l.ctx, queueTask); err != nil {
l.Errorf("[CreateQuotaTask] enqueue task error: %v", err.Error())
return errors.Wrapf(xerr.NewErrCode(xerr.QueueEnqueueError), "enqueue task error")
}
logger.Infof("[CreateQuotaTask] Successfully created task with ID: %d", newTask.Id)
return nil
}

View File

@ -4,6 +4,7 @@ import (
"context"
"time"
"github.com/perfect-panel/server/internal/model/task"
"github.com/perfect-panel/server/internal/model/user"
"github.com/perfect-panel/server/internal/svc"
"github.com/perfect-panel/server/internal/types"
@ -50,28 +51,29 @@ func (l *GetPreSendEmailCountLogic) GetPreSendEmailCount(req *types.GetPreSendEm
return query
}
var query *gorm.DB
switch req.Scope {
case "all":
scope := task.ParseScopeType(req.Scope)
switch scope {
case task.ScopeAll:
query = baseQuery()
case "active":
case task.ScopeActive:
query = baseQuery().
Joins("JOIN user_subscribe ON user.id = user_subscribe.user_id").
Where("user_subscribe.status IN ?", []int64{1, 2})
case "expired":
case task.ScopeExpired:
query = baseQuery().
Joins("JOIN user_subscribe ON user.id = user_subscribe.user_id").
Where("user_subscribe.status = ?", 3)
case "none":
case task.ScopeNone:
query = baseQuery().
Joins("LEFT JOIN user_subscribe ON user.id = user_subscribe.user_id").
Where("user_subscribe.user_id IS NULL")
case "skip":
case task.ScopeSkip:
// Skip scope does not require a count
query = nil
default:
l.Errorf("[CreateBatchSendEmailTask] Invalid scope: %v", req.Scope)
return nil, xerr.NewErrMsg("Invalid email scope")

View File

@ -3,6 +3,7 @@ package marketing
import (
"context"
"github.com/perfect-panel/server/internal/model/task"
"github.com/perfect-panel/server/internal/svc"
"github.com/perfect-panel/server/internal/types"
"github.com/perfect-panel/server/pkg/logger"
@ -14,7 +15,7 @@ type QueryQuotaTaskListLogic struct {
svcCtx *svc.ServiceContext
}
// Query quota task list
// NewQueryQuotaTaskListLogic Query quota task list
func NewQueryQuotaTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *QueryQuotaTaskListLogic {
return &QueryQuotaTaskListLogic{
Logger: logger.WithContext(ctx),
@ -24,7 +25,59 @@ func NewQueryQuotaTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext)
}
func (l *QueryQuotaTaskListLogic) QueryQuotaTaskList(req *types.QueryQuotaTaskListRequest) (resp *types.QueryQuotaTaskListResponse, err error) {
// todo: add your logic here and delete this line
return
var data []*task.Task
var count int64
query := l.svcCtx.DB.Model(&task.Task{}).Where("`type` = ?", task.TypeQuota)
if req.Page == 0 {
req.Page = 1
}
if req.Size == 0 {
req.Size = 20
}
if req.Status != nil {
query = query.Where("`status` = ?", *req.Status)
}
err = query.Count(&count).Offset((req.Page - 1) * req.Size).Limit(req.Size).Order("created_at DESC").Find(&data).Error
if err != nil {
l.Errorf("[QueryQuotaTaskList] failed to get quota tasks: %v", err)
return nil, err
}
var list []types.QuotaTask
for _, item := range data {
var scopeInfo task.QuotaScope
if err = scopeInfo.Unmarshal([]byte(item.Scope)); err != nil {
l.Errorf("[QueryQuotaTaskList] failed to unmarshal quota task scope: %v", err.Error())
continue
}
var contentInfo task.QuotaContent
if err = contentInfo.Unmarshal([]byte(item.Content)); err != nil {
l.Errorf("[QueryQuotaTaskList] failed to unmarshal quota task content: %v", err.Error())
continue
}
list = append(list, types.QuotaTask{
Id: item.Id,
Subscribers: scopeInfo.Subscribers,
IsActive: scopeInfo.IsActive,
StartTime: scopeInfo.StartTime,
EndTime: scopeInfo.EndTime,
ResetTraffic: contentInfo.ResetTraffic,
Days: contentInfo.Days,
GiftType: contentInfo.GiftType,
GiftValue: contentInfo.GiftValue,
Objects: scopeInfo.Objects,
Status: uint8(item.Status),
Total: int64(item.Total),
Current: int64(item.Current),
Errors: item.Errors,
CreatedAt: item.CreatedAt.UnixMilli(),
UpdatedAt: item.UpdatedAt.UnixMilli(),
})
}
return &types.QueryQuotaTaskListResponse{
Total: count,
List: list,
}, nil
}

View File

@ -2,7 +2,9 @@ package marketing
import (
"context"
"time"
"github.com/perfect-panel/server/internal/model/user"
"github.com/perfect-panel/server/internal/svc"
"github.com/perfect-panel/server/internal/types"
"github.com/perfect-panel/server/pkg/logger"
@ -14,7 +16,7 @@ type QueryQuotaTaskPreCountLogic struct {
svcCtx *svc.ServiceContext
}
// Query quota task pre-count
// NewQueryQuotaTaskPreCountLogic Query quota task pre-count
func NewQueryQuotaTaskPreCountLogic(ctx context.Context, svcCtx *svc.ServiceContext) *QueryQuotaTaskPreCountLogic {
return &QueryQuotaTaskPreCountLogic{
Logger: logger.WithContext(ctx),
@ -24,7 +26,30 @@ func NewQueryQuotaTaskPreCountLogic(ctx context.Context, svcCtx *svc.ServiceCont
}
func (l *QueryQuotaTaskPreCountLogic) QueryQuotaTaskPreCount(req *types.QueryQuotaTaskPreCountRequest) (resp *types.QueryQuotaTaskPreCountResponse, err error) {
// todo: add your logic here and delete this line
tx := l.svcCtx.DB.WithContext(l.ctx).Model(&user.Subscribe{})
var count int64
return
if len(req.Subscribers) > 0 {
tx = tx.Where("`subscribe_id` IN ?", req.Subscribers)
}
if req.IsActive != nil && *req.IsActive {
tx = tx.Where("`status` IN ?", []int64{0, 1, 2}) // 0: Pending 1: Active 2: Finished
}
if req.StartTime != 0 {
start := time.UnixMilli(req.StartTime)
tx = tx.Where("`start_time` >= ?", start)
}
if req.EndTime != 0 {
end := time.UnixMilli(req.EndTime)
tx = tx.Where("`start_time` <= ?", end)
}
if err = tx.Count(&count).Error; err != nil {
l.Errorf("[QueryQuotaTaskPreCount] count error: %v", err.Error())
return nil, err
}
return &types.QueryQuotaTaskPreCountResponse{
Count: count,
}, nil
}

View File

@ -3,9 +3,12 @@ package marketing
import (
"context"
"github.com/perfect-panel/server/internal/model/task"
"github.com/perfect-panel/server/internal/svc"
"github.com/perfect-panel/server/internal/types"
"github.com/perfect-panel/server/pkg/logger"
"github.com/perfect-panel/server/pkg/xerr"
"github.com/pkg/errors"
)
type QueryQuotaTaskStatusLogic struct {
@ -14,7 +17,7 @@ type QueryQuotaTaskStatusLogic struct {
svcCtx *svc.ServiceContext
}
// Query quota task status
// NewQueryQuotaTaskStatusLogic Query quota task status
func NewQueryQuotaTaskStatusLogic(ctx context.Context, svcCtx *svc.ServiceContext) *QueryQuotaTaskStatusLogic {
return &QueryQuotaTaskStatusLogic{
Logger: logger.WithContext(ctx),
@ -24,7 +27,16 @@ func NewQueryQuotaTaskStatusLogic(ctx context.Context, svcCtx *svc.ServiceContex
}
func (l *QueryQuotaTaskStatusLogic) QueryQuotaTaskStatus(req *types.QueryQuotaTaskStatusRequest) (resp *types.QueryQuotaTaskStatusResponse, err error) {
// todo: add your logic here and delete this line
return
var data *task.Task
err = l.svcCtx.DB.Model(&task.Task{}).Where("id = ? AND `type` = ?", req.Id, task.TypeQuota).First(&data).Error
if err != nil {
l.Errorf("[QueryQuotaTaskStatus] failed to get quota task: %v", err.Error())
return nil, errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), " failed to get quota task: %v", err.Error())
}
return &types.QueryQuotaTaskStatusResponse{
Status: uint8(data.Status),
Current: int64(data.Current),
Total: int64(data.Total),
Errors: data.Errors,
}, nil
}

View File

@ -36,6 +36,7 @@ const (
ResetSubscribeTypeAuto uint16 = 231 // Auto reset
ResetSubscribeTypeAdvance uint16 = 232 // Advance reset
ResetSubscribeTypePaid uint16 = 233 // Paid reset
ResetSubscribeTypeQuota uint16 = 234 // Quota reset
BalanceTypeRecharge uint16 = 321 // Recharge
BalanceTypeWithdraw uint16 = 322 // Withdraw
BalanceTypePayment uint16 = 323 // Payment

View File

@ -91,10 +91,11 @@ func (c *EmailContent) Unmarshal(data []byte) error {
}
type QuotaScope struct {
Type int8 `gorm:"not null;comment:Scope Type"`
RegisterStartTime int64 `json:"register_start_time"`
RegisterEndTime int64 `json:"register_end_time"`
Recipients []int64 `json:"recipients"` // list of user subs IDs
Subscribers []int64 `json:"subscribers"` // Subscribe IDs
IsActive *bool `json:"is_active"` // filter by active status
StartTime int64 `json:"start_time"` // filter by subscription start time
EndTime int64 `json:"end_time"` // filter by subscription end time
Objects []int64 `json:"recipients"` // list of user subs IDs
}
func (s *QuotaScope) Marshal() ([]byte, error) {
@ -112,18 +113,26 @@ func (s *QuotaScope) Unmarshal(data []byte) error {
return json.Unmarshal(data, &aux)
}
type QuotaType int8
const (
QuotaTypeReset QuotaType = iota + 1 // Reset Subscribe Quota
QuotaTypeDays // Add Subscribe Days
QuotaTypeGift // Add Gift Amount
)
type QuotaContent struct {
Type int8 `json:"type"`
ResetTraffic bool `json:"reset_traffic"` // whether to reset traffic
Days uint64 `json:"days,omitempty"` // days to add
Gift uint8 `json:"gift,omitempty"` // Invoice amount ratio(%) to gift amount
GiftType uint8 `json:"gift_type,omitempty"` // 1: Fixed, 2: Ratio
GiftValue uint64 `json:"gift_value,omitempty"` // value of the gift type
}
func (c *QuotaContent) Marshal() ([]byte, error) {
type Alias QuotaContent
return json.Marshal(&struct {
*Alias
}{
Alias: (*Alias)(c),
})
}
func (c *QuotaContent) Unmarshal(data []byte) error {
type Alias QuotaContent
aux := (*Alias)(c)
return json.Unmarshal(data, &aux)
}
func ParseScopeType(t int8) ScopeType {

View File

@ -101,7 +101,7 @@ type customUserLogicModel interface {
DeleteDevice(ctx context.Context, id int64, tx ...*gorm.DB) error
ClearSubscribeCache(ctx context.Context, data ...*Subscribe) error
clearUserCache(ctx context.Context, data ...*User) error
ClearUserCache(ctx context.Context, data ...*User) error
QueryDailyUserStatisticsList(ctx context.Context, date time.Time) ([]UserStatisticsWithDate, error)
QueryMonthlyUserStatisticsList(ctx context.Context, date time.Time) ([]UserStatisticsWithDate, error)

View File

@ -345,12 +345,14 @@ type CreatePaymentMethodRequest struct {
}
type CreateQuotaTaskRequest struct {
Scope int8 `json:"scope"`
RegisterStartTime int64 `json:"register_start_time"`
RegisterEndTime int64 `json:"register_end_time"`
QuotaType uint8 `json:"quota_type"`
Days uint64 `json:"days"` // Number of days for the quota
Gift uint8 `json:"gift"` // Invoice amount ratio(%) to gift amount for quota
Subscribers []int64 `json:"subscribers"`
IsActive *bool `json:"is_active"`
StartTime int64 `json:"start_time"`
EndTime int64 `json:"end_time"`
ResetTraffic bool `json:"reset_traffic"`
Days uint64 `json:"days"`
GiftType uint8 `json:"gift_type"`
GiftValue uint64 `json:"gift_value"`
}
type CreateServerRequest struct {
@ -887,7 +889,7 @@ type GetPaymentMethodListResponse struct {
}
type GetPreSendEmailCountRequest struct {
Scope string `json:"scope"`
Scope int8 `json:"scope"`
RegisterStartTime int64 `json:"register_start_time,omitempty"`
RegisterEndTime int64 `json:"register_end_time,omitempty"`
}
@ -1534,7 +1536,6 @@ type QueryPurchaseOrderResponse struct {
type QueryQuotaTaskListRequest struct {
Page int `form:"page"`
Size int `form:"size"`
Scope *uint8 `form:"scope,omitempty"`
Status *uint8 `form:"status,omitempty"`
}
@ -1544,9 +1545,10 @@ type QueryQuotaTaskListResponse struct {
}
type QueryQuotaTaskPreCountRequest struct {
Scope uint8 `json:"scope"`
RegisterStartTime int64 `json:"register_start_time"`
RegisterEndTime int64 `json:"register_end_time"`
Subscribers []int64 `json:"subscribers"`
IsActive *bool `json:"is_active"`
StartTime int64 `json:"start_time"`
EndTime int64 `json:"end_time"`
}
type QueryQuotaTaskPreCountResponse struct {
@ -1615,13 +1617,15 @@ type QueryUserSubscribeListResponse struct {
type QuotaTask struct {
Id int64 `json:"id"`
Scope int8 `json:"scope"`
RegisterStartTime int64 `json:"register_start_time"`
RegisterEndTime int64 `json:"register_end_time"`
QuotaType uint8 `json:"quota_type"`
Days uint64 `json:"days"` // Number of days for the quota
Gift uint8 `json:"gift"` // Invoice amount ratio(%) to gift
Recipients []int64 `json:"recipients"` // UserSubscribe IDs of recipients
Subscribers []int64 `json:"subscribers"`
IsActive *bool `json:"is_active"`
StartTime int64 `json:"start_time"`
EndTime int64 `json:"end_time"`
ResetTraffic bool `json:"reset_traffic"`
Days uint64 `json:"days"`
GiftType uint8 `json:"gift_type"`
GiftValue uint64 `json:"gift_value"`
Objects []int64 `json:"objects"` // UserSubscribe IDs
Status uint8 `json:"status"`
Total int64 `json:"total"`
Current int64 `json:"current"`

View File

@ -144,3 +144,10 @@ func DayDiff(startTime, endTime time.Time) int64 {
duration := endTime.Sub(startTime)
return int64(duration.Hours() / 24) // 转换为整天数
}
// HourDiff 计算两个时间点之间的小时差
func HourDiff(startTime, endTime time.Time) int64 {
// 计算时间差
duration := endTime.Sub(startTime)
return int64(duration.Hours()) // 返回小时数,可能包含小数部分
}

View File

@ -7,6 +7,7 @@ import (
orderLogic "github.com/perfect-panel/server/queue/logic/order"
smslogic "github.com/perfect-panel/server/queue/logic/sms"
"github.com/perfect-panel/server/queue/logic/subscription"
"github.com/perfect-panel/server/queue/logic/task"
"github.com/perfect-panel/server/queue/logic/traffic"
"github.com/perfect-panel/server/queue/types"
@ -42,4 +43,7 @@ func RegisterHandlers(mux *asynq.ServeMux, serverCtx *svc.ServiceContext) {
// ScheduledTrafficStat
mux.Handle(types.SchedulerTrafficStat, traffic.NewStatLogic(serverCtx))
// ForthwithQuotaTask
mux.Handle(types.ForthwithQuotaTask, task.NewQuotaTaskLogic(serverCtx))
}

View File

@ -0,0 +1,448 @@
package task
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/hibiken/asynq"
"github.com/perfect-panel/server/internal/model/log"
"github.com/perfect-panel/server/internal/model/order"
"github.com/perfect-panel/server/internal/model/subscribe"
"github.com/perfect-panel/server/internal/model/task"
"github.com/perfect-panel/server/internal/model/user"
"github.com/perfect-panel/server/internal/svc"
"github.com/perfect-panel/server/pkg/logger"
"github.com/perfect-panel/server/pkg/tool"
"gorm.io/gorm"
)
const (
UnitTimeNoLimit = "NoLimit" // Unlimited time subscription
UnitTimeYear = "Year" // Annual subscription
UnitTimeMonth = "Month" // Monthly subscription
UnitTimeDay = "Day" // Daily subscription
UnitTimeHour = "Hour" // Hourly subscription
UnitTimeMinute = "Minute" // Per-minute subscription
)
type QuotaTaskLogic struct {
svcCtx *svc.ServiceContext
}
type ErrorInfo struct {
UserSubscribeId int64 `json:"user_subscribe_id"`
Error string `json:"error"`
}
func NewQuotaTaskLogic(svcCtx *svc.ServiceContext) *QuotaTaskLogic {
return &QuotaTaskLogic{
svcCtx: svcCtx,
}
}
func (l *QuotaTaskLogic) ProcessTask(ctx context.Context, t *asynq.Task) error {
taskID, err := l.parseTaskID(ctx, t.Payload())
if err != nil {
return err
}
taskInfo, err := l.getTaskInfo(ctx, taskID)
if err != nil {
return err
}
if taskInfo.Status != 0 {
logger.WithContext(ctx).Info("[QuotaTaskLogic.ProcessTask] task already processed",
logger.Field("taskID", taskID),
logger.Field("status", taskInfo.Status),
)
return nil
}
scope, content, err := l.parseTaskData(ctx, taskInfo)
if err != nil {
return err
}
subscribes, err := l.getSubscribes(ctx, scope.Subscribers)
if err != nil {
return err
}
if err = l.processSubscribes(ctx, subscribes, content, taskInfo); err != nil {
return err
}
if content.GiftValue != 0 {
var userIds []int64
for _, sub := range subscribes {
userIds = append(userIds, sub.UserId)
}
userIds = tool.RemoveDuplicateElements(userIds...)
var users []*user.User
if err = l.svcCtx.DB.WithContext(ctx).Model(&user.User{}).Where("id IN ?", userIds).Find(&users).Error; err != nil {
logger.WithContext(ctx).Error("[QuotaTaskLogic.ProcessTask] find users error",
logger.Field("error", err.Error()),
logger.Field("userIDs", userIds))
}
err = l.svcCtx.UserModel.ClearUserCache(ctx, users...)
if err != nil {
logger.WithContext(ctx).Error("[QuotaTaskLogic.ProcessTask] clear user cache error",
logger.Field("error", err.Error()),
logger.Field("userIDs", userIds))
}
}
// 清理用户订阅缓存
err = l.svcCtx.UserModel.ClearSubscribeCache(ctx, subscribes...)
if err != nil {
logger.WithContext(ctx).Error("[QuotaTaskLogic.ProcessTask] clear subscribe cache error",
logger.Field("error", err.Error()))
}
return nil
}
func (l *QuotaTaskLogic) parseTaskID(ctx context.Context, payload []byte) (int64, error) {
if len(payload) == 0 {
logger.WithContext(ctx).Error("[QuotaTaskLogic.parseTaskID] empty payload")
return 0, asynq.SkipRetry
}
taskID, err := strconv.ParseInt(string(payload), 10, 64)
if err != nil {
logger.WithContext(ctx).Error("[QuotaTaskLogic.parseTaskID] invalid task ID",
logger.Field("error", err.Error()),
logger.Field("payload", string(payload)),
)
return 0, asynq.SkipRetry
}
return taskID, nil
}
func (l *QuotaTaskLogic) getTaskInfo(ctx context.Context, taskID int64) (*task.Task, error) {
var taskInfo *task.Task
if err := l.svcCtx.DB.WithContext(ctx).Model(&task.Task{}).Where("id = ?", taskID).First(&taskInfo).Error; err != nil {
logger.WithContext(ctx).Error("[QuotaTaskLogic.getTaskInfo] find task error",
logger.Field("error", err.Error()),
logger.Field("taskID", taskID),
)
return nil, asynq.SkipRetry
}
return taskInfo, nil
}
func (l *QuotaTaskLogic) parseTaskData(ctx context.Context, taskInfo *task.Task) (task.QuotaScope, task.QuotaContent, error) {
var scope task.QuotaScope
if err := scope.Unmarshal([]byte(taskInfo.Scope)); err != nil {
logger.WithContext(ctx).Error("[QuotaTaskLogic.parseTaskData] unmarshal scope error",
logger.Field("error", err.Error()),
)
return scope, task.QuotaContent{}, asynq.SkipRetry
}
var content task.QuotaContent
if err := content.Unmarshal([]byte(taskInfo.Content)); err != nil {
logger.WithContext(ctx).Error("[QuotaTaskLogic.parseTaskData] unmarshal content error",
logger.Field("error", err.Error()),
)
return scope, content, asynq.SkipRetry
}
return scope, content, nil
}
func (l *QuotaTaskLogic) getSubscribes(ctx context.Context, subscriberIDs []int64) ([]*user.Subscribe, error) {
var subscribes []*user.Subscribe
if err := l.svcCtx.DB.WithContext(ctx).Model(&user.Subscribe{}).Where("subscribe_id IN ?", subscriberIDs).Find(&subscribes).Error; err != nil {
logger.WithContext(ctx).Error("[QuotaTaskLogic.getSubscribes] find subscribes error",
logger.Field("error", err.Error()),
logger.Field("subscribers", subscriberIDs),
)
return nil, asynq.SkipRetry
}
return subscribes, nil
}
func (l *QuotaTaskLogic) processSubscribes(ctx context.Context, subscribes []*user.Subscribe, content task.QuotaContent, taskInfo *task.Task) error {
tx := l.svcCtx.DB.WithContext(ctx).Begin()
defer func() {
if r := recover(); r != nil {
tx.Rollback()
logger.WithContext(ctx).Error("[QuotaTaskLogic.processSubscribes] transaction panic",
logger.Field("panic", r),
)
}
}()
var errors []ErrorInfo
now := time.Now()
for _, sub := range subscribes {
if err := l.processSubscription(tx, sub, content, now, &errors); err != nil {
tx.Rollback()
return err
}
}
// 根据错误情况决定任务状态
status := int8(2) // Completed
if len(errors) > 0 {
logger.WithContext(ctx).Error("[QuotaTaskLogic.processSubscribes] some subscriptions failed",
logger.Field("total", len(subscribes)),
logger.Field("failed", len(errors)),
)
// 如果所有订阅都失败,标记为失败状态
if len(errors) == len(subscribes) {
status = 3 // Failed
}
errs, err := json.Marshal(errors)
if err != nil {
logger.WithContext(ctx).Error("[QuotaTaskLogic.processSubscribes] marshal errors failed",
logger.Field("error", err.Error()),
)
tx.Rollback()
return err
}
taskInfo.Errors = string(errs)
}
taskInfo.Status = status
err := tx.Where("id = ?", taskInfo.Id).Save(taskInfo).Error
if err != nil {
logger.WithContext(ctx).Error("[QuotaTaskLogic.processSubscribes] update task status error",
logger.Field("error", err.Error()),
logger.Field("taskID", taskInfo.Id),
)
tx.Rollback()
return err
}
if err = tx.Commit().Error; err != nil {
logger.WithContext(ctx).Error("[QuotaTaskLogic.processSubscribes] commit transaction error",
logger.Field("error", err.Error()),
)
return err
}
return nil
}
func (l *QuotaTaskLogic) processSubscription(tx *gorm.DB, sub *user.Subscribe, content task.QuotaContent, now time.Time, errors *[]ErrorInfo) error {
// 验证订阅数据
if sub == nil {
*errors = append(*errors, ErrorInfo{
UserSubscribeId: 0,
Error: "subscription is nil",
})
return nil
}
updated := false
// 处理时间延长 - 修复逻辑只要Days不为0就处理不管ExpireTime是否为0
if content.Days != 0 {
if sub.ExpireTime.Unix() == 0 || sub.ExpireTime.Before(now) {
// 如果没有过期时间或已过期,从现在开始计算
sub.ExpireTime = now.AddDate(0, 0, int(content.Days))
} else {
// 在原有过期时间基础上延长
sub.ExpireTime = sub.ExpireTime.AddDate(0, 0, int(content.Days))
}
// 如果订阅延长到未来时间,设置为激活状态
if sub.ExpireTime.After(now) && sub.Status != 1 {
sub.Status = 1 // Active
}
updated = true
}
// 处理流量重置
if content.ResetTraffic {
sub.Download = 0
sub.Upload = 0
updated = true
if err := l.createResetTrafficLog(tx, sub.Id, sub.UserId, now); err != nil {
// 记录错误但不阻断整个任务,日志失败不影响主流程
*errors = append(*errors, ErrorInfo{
UserSubscribeId: sub.Id,
Error: "create reset traffic log error: " + err.Error(),
})
}
}
// 处理赠送金
if content.GiftValue != 0 {
if err := l.processGift(tx, sub, content, now, errors); err != nil {
return err
}
}
// 只有在有更新时才保存订阅信息
if updated {
if err := tx.Where("id = ?", sub.Id).Save(sub).Error; err != nil {
*errors = append(*errors, ErrorInfo{
UserSubscribeId: sub.Id,
Error: "update subscription error: " + err.Error(),
})
return nil
}
}
return nil
}
func (l *QuotaTaskLogic) processGift(tx *gorm.DB, sub *user.Subscribe, content task.QuotaContent, now time.Time, errors *[]ErrorInfo) error {
// 验证赠送类型
if content.GiftType != 1 && content.GiftType != 2 {
*errors = append(*errors, ErrorInfo{
UserSubscribeId: sub.Id,
Error: fmt.Sprintf("invalid gift type: %d", content.GiftType),
})
return nil
}
var userInfo user.User
if err := tx.Model(&user.User{}).Where("id = ?", sub.UserId).First(&userInfo).Error; err != nil {
*errors = append(*errors, ErrorInfo{
UserSubscribeId: sub.Id,
Error: "find user error: " + err.Error(),
})
return nil
}
var giftAmount int64
switch content.GiftType {
case 1:
giftAmount = int64(content.GiftValue)
case 2:
orderAmount, err := l.calculateOrderAmount(tx, sub, now)
if err != nil {
*errors = append(*errors, ErrorInfo{
UserSubscribeId: sub.Id,
Error: err.Error(),
})
return nil
}
if orderAmount > 0 {
giftAmount = int64(float64(orderAmount) * (float64(content.GiftValue) / 100))
}
}
if giftAmount > 0 {
userInfo.GiftAmount += giftAmount
// 使用Update而不是Save更精确地更新单个字段
if err := tx.Model(&user.User{}).Where("id = ?", sub.UserId).Update("gift_amount", userInfo.GiftAmount).Error; err != nil {
*errors = append(*errors, ErrorInfo{
UserSubscribeId: sub.Id,
Error: "update user gift amount error: " + err.Error(),
})
return nil
}
if err := l.createGiftLog(tx, sub.Id, userInfo.Id, giftAmount, userInfo.GiftAmount, now); err != nil {
*errors = append(*errors, ErrorInfo{
UserSubscribeId: sub.Id,
Error: "create gift log error: " + err.Error(),
})
// 回滚用户金额更新
userInfo.GiftAmount -= giftAmount
tx.Model(&user.User{}).Where("id = ?", sub.UserId).Update("gift_amount", userInfo.GiftAmount)
return nil
}
}
return nil
}
func (l *QuotaTaskLogic) getStartTime(sub *user.Subscribe, now time.Time) time.Time {
if sub.StartTime.Unix() == 0 {
return now
}
return sub.StartTime
}
func (l *QuotaTaskLogic) calculateOrderAmount(tx *gorm.DB, sub *user.Subscribe, now time.Time) (int64, error) {
if sub.OrderId != 0 {
var orderInfo *order.Order
if err := tx.Model(&order.Order{}).Where("id = ?", sub.OrderId).First(&orderInfo).Error; err != nil {
return 0, fmt.Errorf("find order error: %v", err)
}
return orderInfo.Amount + orderInfo.GiftAmount, nil
}
var subInfo *subscribe.Subscribe
if err := tx.Model(&subscribe.Subscribe{}).Where("id = ?", sub.SubscribeId).First(&subInfo).Error; err != nil {
return 0, fmt.Errorf("find subscribe error: %v", err)
}
startTime := l.getStartTime(sub, now)
if sub.ExpireTime.Before(startTime) {
return subInfo.UnitPrice, nil
}
switch subInfo.UnitTime {
case UnitTimeNoLimit:
return subInfo.UnitPrice, nil
case UnitTimeYear:
days := tool.DayDiff(startTime, sub.ExpireTime)
return subInfo.UnitPrice / 365 * days, nil
case UnitTimeMonth:
days := tool.DayDiff(startTime, sub.ExpireTime)
return subInfo.UnitPrice / 30 * days, nil
case UnitTimeDay:
days := tool.DayDiff(startTime, sub.ExpireTime)
return subInfo.UnitPrice * days, nil
case UnitTimeHour:
hours := int(tool.HourDiff(startTime, sub.ExpireTime))
return subInfo.UnitPrice * int64(hours), nil
case UnitTimeMinute:
minutes := tool.HourDiff(startTime, sub.ExpireTime) * 60
return subInfo.UnitPrice * minutes, nil
default:
return subInfo.UnitPrice, nil
}
}
func (l *QuotaTaskLogic) createGiftLog(tx *gorm.DB, subscribeId, userId, amount, balance int64, now time.Time) error {
giftLog := &log.Gift{
Type: log.GiftTypeIncrease,
OrderNo: "",
SubscribeId: subscribeId,
Amount: amount,
Balance: balance,
Remark: "Quota task gift",
Timestamp: now.UnixMilli(),
}
logString, err := giftLog.Marshal()
if err != nil {
return fmt.Errorf("marshal gift log error: %v", err)
}
return tx.Model(&log.SystemLog{}).Create(&log.SystemLog{
Type: log.TypeGift.Uint8(),
Content: string(logString),
ObjectID: userId,
Date: now.Format(time.DateOnly),
}).Error
}
func (l *QuotaTaskLogic) createResetTrafficLog(tx *gorm.DB, subscribeId, userId int64, now time.Time) error {
trafficLog := &log.ResetSubscribe{
Type: log.ResetSubscribeTypeQuota,
UserId: userId,
OrderNo: "",
Timestamp: now.UnixMilli(),
}
logString, err := trafficLog.Marshal()
if err != nil {
return fmt.Errorf("marshal traffic log error: %v", err)
}
return tx.Model(&log.SystemLog{}).Create(&log.SystemLog{
Type: log.TypeResetSubscribe.Uint8(),
Content: string(logString),
ObjectID: subscribeId,
Date: now.Format(time.DateOnly),
}).Error
}

View File

@ -3,8 +3,6 @@ package types
const (
// ForthwithSendEmail forthwith send email
ForthwithSendEmail = "forthwith:email:send"
// ScheduledBatchSendEmail scheduled batch send email
ScheduledBatchSendEmail = "scheduled:email:batch"
)
const (

9
queue/types/task.go Normal file
View File

@ -0,0 +1,9 @@
package types
const (
// ScheduledBatchSendEmail scheduled batch send email
ScheduledBatchSendEmail = "scheduled:email:batch"
// ForthwithQuotaTask create quota task immediately
ForthwithQuotaTask = "forthwith:quota:task"
)