hi-server/queue/logic/order/activateOrderLogic.go
shanshanzhong e0b2be2058
All checks were successful
Build docker and publish / build (20.15.1) (push) Successful in 7m36s
x
2026-03-30 21:05:21 -07:00

1331 lines
44 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package orderLogic provides order processing logic for handling various types of orders
// including subscription purchases, renewals, traffic resets, and balance recharges.
package orderLogic
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/perfect-panel/server/internal/logic/admin/group"
"github.com/perfect-panel/server/internal/model/log"
"github.com/perfect-panel/server/pkg/constant"
"github.com/perfect-panel/server/pkg/logger"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/perfect-panel/server/internal/model/order"
"github.com/perfect-panel/server/internal/model/redemption"
"github.com/perfect-panel/server/internal/model/subscribe"
"github.com/perfect-panel/server/internal/model/user"
"github.com/perfect-panel/server/internal/svc"
"github.com/perfect-panel/server/internal/types"
internaltypes "github.com/perfect-panel/server/internal/types"
"github.com/perfect-panel/server/pkg/tool"
"github.com/perfect-panel/server/pkg/uuidx"
queueTypes "github.com/perfect-panel/server/queue/types"
"gorm.io/gorm"
)
// Order type constants enumerate the kinds of orders this package can
// activate. processOrderByType switches on these values.
const (
	// OrderTypeSubscribe is a new subscription purchase.
	OrderTypeSubscribe = 1
	// OrderTypeRenewal is a renewal of an existing subscription.
	OrderTypeRenewal = 2
	// OrderTypeResetTraffic is a paid traffic quota reset.
	OrderTypeResetTraffic = 3
	// OrderTypeRecharge is a balance recharge.
	OrderTypeRecharge = 4
	// OrderTypeRedemption is a redemption code activation.
	OrderTypeRedemption = 5
)
// Order status constants describe the lifecycle states of an order.
// Activation only proceeds for OrderStatusPaid and ends in OrderStatusFinished.
const (
	// OrderStatusPending means the order was created but not paid.
	OrderStatusPending = 1
	// OrderStatusPaid means the order was paid and is ready for processing.
	OrderStatusPaid = 2
	// OrderStatusClose means the order was closed or cancelled.
	OrderStatusClose = 3
	// OrderStatusFailed means processing the order failed.
	OrderStatusFailed = 4
	// OrderStatusFinished means the order completed successfully.
	OrderStatusFinished = 5
)
// Sentinel errors returned by the activation flow. Compare with errors.Is.
var (
	// ErrInvalidOrderStatus is returned when an order is neither paid nor finished.
	ErrInvalidOrderStatus = errors.New("invalid order status")
	// ErrInvalidOrderType is returned when an order's type has no handler.
	ErrInvalidOrderType = errors.New("invalid order type")
)
// ActivateOrderLogic handles the activation and processing of paid orders.
// Its methods reach the models, database handle, Redis client and
// configuration they need through the svc field.
type ActivateOrderLogic struct {
	svc *svc.ServiceContext // Service context containing dependencies
}
// NewActivateOrderLogic returns an ActivateOrderLogic bound to the given
// service context.
func NewActivateOrderLogic(svc *svc.ServiceContext) *ActivateOrderLogic {
	logic := new(ActivateOrderLogic)
	logic.svc = svc
	return logic
}
// ProcessTask is the asynq entry point for activating a paid order.
//
// Steps: decode the payload, load and validate the order, dispatch on order
// type, then finalize the coupon and order status.
//
// Returning nil tells asynq the task is done; returning an error lets asynq
// retry. nil is returned for an undecodable payload, for an order whose status
// is not "paid", and for an order that is already finished. Every other
// lookup error and every processing error is returned.
func (l *ActivateOrderLogic) ProcessTask(ctx context.Context, task *asynq.Task) error {
	rawPayload := task.Payload()
	logger.WithContext(ctx).Info("[ActivateOrderLogic] 开始处理订单激活任务",
		logger.Field("payload", string(rawPayload)))

	payload, err := l.parsePayload(ctx, rawPayload)
	if err != nil {
		// A payload that cannot be decoded will never decode on retry, so the
		// task is dropped instead of being retried.
		logger.WithContext(ctx).Error("[ActivateOrderLogic] 解析 payload 失败,跳过任务",
			logger.Field("error", err.Error()))
		return nil
	}

	logger.WithContext(ctx).Info("[ActivateOrderLogic] 正在验证订单",
		logger.Field("order_no", payload.OrderNo))

	orderInfo, err := l.validateAndGetOrder(ctx, payload.OrderNo)
	switch {
	case errors.Is(err, ErrInvalidOrderStatus):
		// Wrong status is not retryable.
		logger.WithContext(ctx).Info("[ActivateOrderLogic] 订单状态不是已支付,跳过",
			logger.Field("order_no", payload.OrderNo))
		return nil
	case err != nil:
		// Any other lookup failure is handed back to asynq for a retry.
		logger.WithContext(ctx).Error("[ActivateOrderLogic] 查询订单失败,将重试",
			logger.Field("order_no", payload.OrderNo),
			logger.Field("error", err.Error()))
		return err
	case orderInfo == nil:
		// Idempotency: validateAndGetOrder returns (nil, nil) for an order
		// that is already finished.
		return nil
	}

	logger.WithContext(ctx).Info("[ActivateOrderLogic] 订单验证通过,开始处理",
		logger.Field("order_no", orderInfo.OrderNo),
		logger.Field("order_type", orderInfo.Type),
		logger.Field("user_id", orderInfo.UserId))

	processErr := l.processOrderByType(ctx, orderInfo, payload.IAPExpireAt)
	if processErr != nil {
		logger.WithContext(ctx).Error("[ActivateOrderLogic] 处理订单失败,将重试",
			logger.Field("order_no", orderInfo.OrderNo),
			logger.Field("order_type", orderInfo.Type),
			logger.Field("error", processErr.Error()))
		// Returning the error allows asynq to retry the task.
		return processErr
	}

	l.finalizeCouponAndOrder(ctx, orderInfo)

	logger.WithContext(ctx).Info("[ActivateOrderLogic] 订单激活成功",
		logger.Field("order_no", orderInfo.OrderNo),
		logger.Field("order_type", orderInfo.Type),
		logger.Field("user_id", orderInfo.UserId))
	return nil
}
// parsePayload decodes the JSON task payload into a
// ForthwithActivateOrderPayload. On failure it logs the error together with
// the raw payload and returns the json.Unmarshal error.
func (l *ActivateOrderLogic) parsePayload(ctx context.Context, payload []byte) (*queueTypes.ForthwithActivateOrderPayload, error) {
	decoded := new(queueTypes.ForthwithActivateOrderPayload)
	err := json.Unmarshal(payload, decoded)
	if err == nil {
		return decoded, nil
	}
	logger.WithContext(ctx).Error("[ActivateOrderLogic] Unmarshal payload failed",
		logger.Field("error", err.Error()),
		logger.Field("payload", string(payload)),
	)
	return nil, err
}
// validateAndGetOrder loads the order identified by orderNo and checks its status.
//
// Results:
//   - (order, nil) when the order is in OrderStatusPaid;
//   - (nil, nil) when the order is already OrderStatusFinished (idempotent skip);
//   - (nil, ErrInvalidOrderStatus) for any other status;
//   - (nil, err) when the lookup itself fails.
func (l *ActivateOrderLogic) validateAndGetOrder(ctx context.Context, orderNo string) (*order.Order, error) {
	orderInfo, err := l.svc.OrderModel.FindOneByOrderNo(ctx, orderNo)
	if err != nil {
		logger.WithContext(ctx).Error("Find order failed",
			logger.Field("error", err.Error()),
			logger.Field("order_no", orderNo),
		)
		return nil, err
	}

	switch orderInfo.Status {
	case OrderStatusFinished:
		// Already processed by an earlier run; report success without an order.
		logger.WithContext(ctx).Info("Order already finished, skip processing",
			logger.Field("order_no", orderInfo.OrderNo),
		)
		return nil, nil
	case OrderStatusPaid:
		return orderInfo, nil
	default:
		logger.WithContext(ctx).Error("Order status error",
			logger.Field("order_no", orderInfo.OrderNo),
			logger.Field("status", orderInfo.Status),
		)
		return nil, ErrInvalidOrderStatus
	}
}
// processOrderByType dispatches the order to the handler for its type.
// iapExpireAt is forwarded only to Renewal. An unknown type is logged and
// reported as ErrInvalidOrderType.
func (l *ActivateOrderLogic) processOrderByType(ctx context.Context, orderInfo *order.Order, iapExpireAt int64) error {
	switch kind := orderInfo.Type; kind {
	case OrderTypeSubscribe:
		return l.NewPurchase(ctx, orderInfo)
	case OrderTypeRenewal:
		return l.Renewal(ctx, orderInfo, iapExpireAt)
	case OrderTypeResetTraffic:
		return l.ResetTraffic(ctx, orderInfo)
	case OrderTypeRecharge:
		return l.Recharge(ctx, orderInfo)
	case OrderTypeRedemption:
		return l.RedemptionActivate(ctx, orderInfo)
	}

	// No handler matched.
	logger.WithContext(ctx).Error("Order type is invalid", logger.Field("type", orderInfo.Type))
	return ErrInvalidOrderType
}
// finalizeCouponAndOrder runs the post-processing steps of a successful
// activation: it bumps the coupon usage count (when the order used a coupon)
// and moves the order to OrderStatusFinished.
//
// Both steps are best-effort: failures are logged and not returned.
// NOTE(review): orderInfo.Status is set to finished in memory even when the
// database update fails — confirm that no caller relies on it reflecting the
// persisted state.
func (l *ActivateOrderLogic) finalizeCouponAndOrder(ctx context.Context, orderInfo *order.Order) {
	if coupon := orderInfo.Coupon; coupon != "" {
		// Coupon accounting is non-critical and must not block finalization.
		couponErr := l.svc.CouponModel.UpdateCount(ctx, coupon)
		if couponErr != nil {
			logger.WithContext(ctx).Error("Update coupon status failed",
				logger.Field("error", couponErr.Error()),
				logger.Field("coupon", orderInfo.Coupon),
			)
		}
	}

	statusErr := l.svc.OrderModel.UpdateOrderStatus(ctx, orderInfo.OrderNo, OrderStatusFinished)
	if statusErr != nil {
		logger.WithContext(ctx).Error("Update order status failed",
			logger.Field("error", statusErr.Error()),
			logger.Field("order_no", orderInfo.OrderNo),
		)
	}

	orderInfo.Status = OrderStatusFinished
}
// NewPurchase activates a new subscription purchase.
//
// It resolves (or creates) the buying user, loads the plan, re-checks
// new-user-only eligibility, and then obtains a user subscription in one of
// three ways, tried in this order:
//  1. single-subscription mode only: extend the user's existing "anchor"
//     subscription when it is for the same plan (renewal semantics);
//  2. single-subscription mode only: extend an existing gift subscription
//     (order_id = 0) of the same plan;
//  3. otherwise create a brand-new user subscription.
//
// Failures in steps 1 and 2 are logged and fall through to the next step;
// only user/plan lookup, the eligibility check and step 3 return an error.
// Afterwards it triggers group recalculation, starts commission handling in a
// goroutine, and clears the plan cache.
func (l *ActivateOrderLogic) NewPurchase(ctx context.Context, orderInfo *order.Order) error {
	userInfo, err := l.getUserOrCreate(ctx, orderInfo)
	if err != nil {
		return err
	}
	sub, err := l.getSubscribeInfo(ctx, orderInfo.SubscribeId)
	if err != nil {
		return err
	}
	if err = validateNewUserOnlyEligibilityAtActivation(ctx, l.svc.DB, orderInfo, sub); err != nil {
		return err
	}
	var userSub *user.Subscribe
	// In single-subscription mode, prefer renewal semantics as a safety net:
	// extend the already purchased subscription so that concurrent activations
	// do not create duplicate user_subscribe rows.
	if l.svc.Config.Subscribe.SingleModel {
		anchorSub, anchorErr := l.svc.UserModel.FindSingleModeAnchorSubscribe(ctx, orderInfo.UserId)
		switch {
		case anchorErr == nil && anchorSub != nil:
			if anchorSub.SubscribeId == orderInfo.SubscribeId {
				// Link this order to the anchor's order when it has no parent yet.
				if orderInfo.ParentId == 0 && anchorSub.OrderId > 0 && anchorSub.OrderId != orderInfo.Id {
					if patchErr := l.patchOrderParentID(ctx, orderInfo.Id, anchorSub.OrderId); patchErr != nil {
						logger.WithContext(ctx).Error("Patch order parent_id failed",
							logger.Field("error", patchErr.Error()),
							logger.Field("order_no", orderInfo.OrderNo),
						)
					} else {
						orderInfo.ParentId = anchorSub.OrderId
					}
				}
				// A failed renewal is only logged; userSub stays nil so the
				// fallbacks below still run.
				if renewErr := l.updateSubscriptionForRenewal(ctx, anchorSub, sub, orderInfo); renewErr != nil {
					logger.WithContext(ctx).Error("Single mode renewal fallback failed",
						logger.Field("error", renewErr.Error()),
						logger.Field("anchor_user_subscribe_id", anchorSub.Id),
						logger.Field("order_no", orderInfo.OrderNo),
					)
				} else {
					userSub = anchorSub
					logger.WithContext(ctx).Infow("Single mode purchase routed to renewal in activation",
						logger.Field("mode", "single"),
						logger.Field("route", "purchase_to_renewal"),
						logger.Field("anchor_user_subscribe_id", anchorSub.Id),
						logger.Field("order_no", orderInfo.OrderNo),
					)
				}
			} else {
				// The anchor is for a different plan: log it and fall through.
				logger.WithContext(ctx).Errorw("Single mode anchor subscribe mismatch in activation",
					logger.Field("order_no", orderInfo.OrderNo),
					logger.Field("order_subscribe_id", orderInfo.SubscribeId),
					logger.Field("anchor_subscribe_id", anchorSub.SubscribeId),
				)
			}
		case errors.Is(anchorErr, gorm.ErrRecordNotFound):
			// No anchor subscription exists; nothing to merge into.
		case anchorErr != nil:
			logger.WithContext(ctx).Error("Find single mode anchor subscribe failed",
				logger.Field("error", anchorErr.Error()),
				logger.Field("order_no", orderInfo.OrderNo),
			)
		}
		// If no purchased subscription was merged, try merging into a gift
		// subscription (order_id = 0) instead.
		if userSub == nil {
			giftSub, giftErr := l.findGiftSubscription(ctx, orderInfo.UserId, orderInfo.SubscribeId)
			if giftErr == nil && giftSub != nil {
				// Extend the gift subscription in place so its token stays unchanged.
				userSub, err = l.extendGiftSubscription(ctx, giftSub, orderInfo, sub)
				if err != nil {
					logger.WithContext(ctx).Error("Extend gift subscription failed",
						logger.Field("error", err.Error()),
						logger.Field("gift_subscribe_id", giftSub.Id),
					)
					// Merging failed: fall back to creating a new subscription.
					userSub = nil
				}
			}
		}
	}
	// Nothing was merged, so create a new subscription the normal way.
	if userSub == nil {
		userSub, err = l.createUserSubscription(ctx, orderInfo, sub)
		if err != nil {
			return err
		}
	}
	// Trigger user group recalculation (runs in background)
	l.triggerUserGroupRecalculation(ctx, userInfo.Id)
	// Handle commission in separate goroutine to avoid blocking
	go l.handleCommission(context.Background(), userInfo, orderInfo)
	// Clear cache
	l.clearServerCache(ctx, sub)
	logger.WithContext(ctx).Info("Insert user subscribe success")
	return nil
}
// getUserOrCreate returns the user the order belongs to. Orders without a
// user ID (UserId == 0) are guest orders, for which a new user is created.
func (l *ActivateOrderLogic) getUserOrCreate(ctx context.Context, orderInfo *order.Order) (*user.User, error) {
	if orderInfo.UserId == 0 {
		return l.createGuestUser(ctx, orderInfo)
	}
	return l.getExistingUser(ctx, orderInfo.UserId)
}
// getExistingUser looks up a user by ID. Lookup failures are logged and returned.
func (l *ActivateOrderLogic) getExistingUser(ctx context.Context, userId int64) (*user.User, error) {
	found, err := l.svc.UserModel.FindOne(ctx, userId)
	if err == nil {
		return found, nil
	}
	logger.WithContext(ctx).Error("Find user failed",
		logger.Field("error", err.Error()),
		logger.Field("user_id", userId),
	)
	return nil, err
}
// createGuestUser creates a user account for a guest order from the temporary
// order information cached in Redis.
//
// In one transaction it saves the user, stores the generated refer code, and
// writes the new user ID onto the order row; orderInfo.UserId is updated in
// memory as part of that closure. After the transaction it links the referrer
// when the cached order carried an invite code.
func (l *ActivateOrderLogic) createGuestUser(ctx context.Context, orderInfo *order.Order) (*user.User, error) {
	tempOrder, err := l.getTempOrderInfo(ctx, orderInfo.OrderNo)
	if err != nil {
		return nil, err
	}

	authMethod := user.AuthMethods{
		AuthType:       tempOrder.AuthType,
		AuthIdentifier: tempOrder.Identifier,
	}
	guest := &user.User{
		Password:    tool.EncodePassWord(tempOrder.Password),
		Algo:        "default",
		AuthMethods: []user.AuthMethods{authMethod},
	}

	txErr := l.svc.UserModel.Transaction(ctx, func(tx *gorm.DB) error {
		if saveErr := tx.Save(guest).Error; saveErr != nil {
			return saveErr
		}
		// The refer code is derived from the ID assigned by the insert above.
		guest.ReferCode = uuidx.UserInviteCode(guest.Id)
		referErr := tx.Model(&user.User{}).
			Where("id = ?", guest.Id).
			Update("refer_code", guest.ReferCode).Error
		if referErr != nil {
			return referErr
		}
		orderInfo.UserId = guest.Id
		return tx.Model(&order.Order{}).
			Where("order_no = ?", orderInfo.OrderNo).
			Update("user_id", guest.Id).Error
	})
	if txErr != nil {
		logger.WithContext(ctx).Error("Create user failed", logger.Field("error", txErr.Error()))
		return nil, txErr
	}

	// Link the referrer, if an invite code was supplied.
	l.handleReferrer(ctx, guest, tempOrder.InviteCode)

	logger.WithContext(ctx).Info("Create guest user success",
		logger.Field("user_id", guest.Id),
		logger.Field("identifier", tempOrder.Identifier),
		logger.Field("auth_type", tempOrder.AuthType),
	)
	return guest, nil
}
// getTempOrderInfo loads the temporary order information cached in Redis for
// orderNo and decodes it.
//
// It returns the Redis error when the key cannot be read and the decode error
// when the cached value is malformed. The cached value carries the guest's
// password (createGuestUser reads tempOrder.Password from it), so the raw
// value is never written to the log; only its length is recorded.
func (l *ActivateOrderLogic) getTempOrderInfo(ctx context.Context, orderNo string) (*constant.TemporaryOrderInfo, error) {
	cacheKey := fmt.Sprintf(constant.TempOrderCacheKey, orderNo)
	data, err := l.svc.Redis.Get(ctx, cacheKey).Result()
	if err != nil {
		logger.WithContext(ctx).Error("Get temp order cache failed",
			logger.Field("error", err.Error()),
			logger.Field("cache_key", cacheKey),
		)
		return nil, err
	}

	var tempOrder constant.TemporaryOrderInfo
	if err = tempOrder.Unmarshal([]byte(data)); err != nil {
		// Log the size instead of the content to avoid leaking credentials.
		logger.WithContext(ctx).Error("Unmarshal temp order cache failed",
			logger.Field("error", err.Error()),
			logger.Field("cache_key", cacheKey),
			logger.Field("data_len", len(data)),
		)
		return nil, err
	}
	return &tempOrder, nil
}
// handleReferrer links userInfo to the user who owns inviteCode. It does
// nothing for an empty code. Lookup and update failures are logged only.
func (l *ActivateOrderLogic) handleReferrer(ctx context.Context, userInfo *user.User, inviteCode string) {
	if inviteCode == "" {
		return
	}

	referer, findErr := l.svc.UserModel.FindOneByReferCode(ctx, inviteCode)
	if findErr != nil {
		logger.WithContext(ctx).Error("Find referer failed",
			logger.Field("error", findErr.Error()),
			logger.Field("refer_code", inviteCode),
		)
		return
	}

	userInfo.RefererId = referer.Id
	updateErr := l.svc.UserModel.Update(ctx, userInfo)
	if updateErr == nil {
		return
	}
	logger.WithContext(ctx).Error("Update user referer failed",
		logger.Field("error", updateErr.Error()),
		logger.Field("user_id", userInfo.Id),
	)
}
// getSubscribeInfo loads a subscription plan by ID. Lookup failures are
// logged and returned.
func (l *ActivateOrderLogic) getSubscribeInfo(ctx context.Context, subscribeId int64) (*subscribe.Subscribe, error) {
	plan, err := l.svc.SubscribeModel.FindOne(ctx, subscribeId)
	if err == nil {
		return plan, nil
	}
	logger.WithContext(ctx).Error("Find subscribe failed",
		logger.Field("error", err.Error()),
		logger.Field("subscribe_id", subscribeId),
	)
	return nil, err
}
// createUserSubscription inserts a new user subscription for the order.
//
// The subscription is owned by orderInfo.SubscriptionUserId when that is set,
// otherwise by orderInfo.UserId. When the plan has a quota (sub.Quota > 0),
// the owner's existing subscriptions to this plan are counted first and the
// call fails with "subscribe quota limit exceeded" once the quota is reached.
// The count uses the owner, because that is the user_id the new row is stored
// under.
//
// The new subscription starts now, expires after orderInfo.Quantity units of
// sub.UnitTime, and gets a token derived from the order number plus a fresh UUID.
func (l *ActivateOrderLogic) createUserSubscription(ctx context.Context, orderInfo *order.Order, sub *subscribe.Subscribe) (*user.Subscribe, error) {
	// Determine subscription owner: use SubscriptionUserId if set, otherwise fall back to UserId
	subscriptionUserId := orderInfo.UserId
	if orderInfo.SubscriptionUserId > 0 {
		subscriptionUserId = orderInfo.SubscriptionUserId
	}

	// Check quota limit before creating subscription (final safeguard)
	if sub.Quota > 0 {
		var count int64
		err := l.svc.DB.WithContext(ctx).Model(&user.Subscribe{}).
			Where("user_id = ? AND subscribe_id = ?", subscriptionUserId, orderInfo.SubscribeId).
			Count(&count).Error
		if err != nil {
			logger.WithContext(ctx).Error("Count user subscribe failed", logger.Field("error", err.Error()))
			return nil, err
		}
		if count >= sub.Quota {
			logger.WithContext(ctx).Infow("Subscribe quota limit exceeded",
				logger.Field("user_id", orderInfo.UserId),
				logger.Field("subscription_user_id", subscriptionUserId),
				logger.Field("subscribe_id", orderInfo.SubscribeId),
				logger.Field("quota", sub.Quota),
				logger.Field("current_count", count),
			)
			return nil, fmt.Errorf("subscribe quota limit exceeded")
		}
	}

	now := time.Now()
	userSub := &user.Subscribe{
		UserId:      subscriptionUserId,
		OrderId:     orderInfo.Id,
		SubscribeId: orderInfo.SubscribeId,
		StartTime:   now,
		ExpireTime:  tool.AddTime(sub.UnitTime, orderInfo.Quantity, now),
		Traffic:     sub.Traffic,
		Download:    0,
		Upload:      0,
		Token:       uuidx.SubscribeToken(orderInfo.OrderNo),
		UUID:        uuid.New().String(),
		Status:      1,
	}
	if err := l.svc.UserModel.InsertSubscribe(ctx, userSub); err != nil {
		logger.WithContext(ctx).Error("Insert user subscribe failed", logger.Field("error", err.Error()))
		return nil, err
	}
	return userSub, nil
}
// patchOrderParentID sets parent_id on the order with the given ID, but only
// while the stored parent_id is still 0 or NULL, so an existing parent link is
// never overwritten.
func (l *ActivateOrderLogic) patchOrderParentID(ctx context.Context, orderID int64, parentID int64) error {
	unparented := l.svc.DB.WithContext(ctx).
		Model(&order.Order{}).
		Where("id = ? AND (parent_id = 0 OR parent_id IS NULL)", orderID)
	result := unparented.Update("parent_id", parentID)
	return result.Error
}
// findGiftSubscription returns the user's most recently created gift
// subscription (order_id = 0) for the given plan. Expired subscriptions are
// included because the query does not filter on status or expiry.
// When no row matches, the error from the query's First call is returned
// together with a nil subscription.
func (l *ActivateOrderLogic) findGiftSubscription(ctx context.Context, userId int64, subscribeId int64) (*user.Subscribe, error) {
	giftSub := new(user.Subscribe)
	newestGift := l.svc.DB.Model(&user.Subscribe{}).
		Where("user_id = ? AND order_id = 0 AND subscribe_id = ?", userId, subscribeId).
		Order("created_at DESC")
	if err := newestGift.First(giftSub).Error; err != nil {
		return nil, err
	}
	return giftSub, nil
}
// extendGiftSubscription extends an existing gift subscription in place so
// that its token stays unchanged.
//
// The purchased duration (orderInfo.Quantity units of sub.UnitTime) is added
// to the later of the gift's current expiry and now. The subscription is then
// attached to the order (OrderId), re-activated (Status = 1) and saved. On a
// save failure the error is logged and returned with a nil subscription.
func (l *ActivateOrderLogic) extendGiftSubscription(ctx context.Context, giftSub *user.Subscribe, orderInfo *order.Order, sub *subscribe.Subscribe) (*user.Subscribe, error) {
	now := time.Now()
	// Keep the real previous expiry for the log line; baseTime may be clamped to now.
	oldExpireTime := giftSub.ExpireTime
	// Base time: the later of the gift's expiry and the current time.
	baseTime := oldExpireTime
	if baseTime.Before(now) {
		baseTime = now
	}
	// Add the purchased duration on top of the base time.
	newExpireTime := tool.AddTime(sub.UnitTime, orderInfo.Quantity, baseTime)

	// Update the gift subscription.
	giftSub.OrderId = orderInfo.Id
	giftSub.ExpireTime = newExpireTime
	giftSub.Status = 1
	if err := l.svc.UserModel.UpdateSubscribe(ctx, giftSub); err != nil {
		logger.WithContext(ctx).Error("Update gift subscription failed",
			logger.Field("error", err.Error()),
			logger.Field("subscribe_id", giftSub.Id),
		)
		return nil, err
	}

	logger.WithContext(ctx).Info("Extended gift subscription successfully",
		logger.Field("subscribe_id", giftSub.Id),
		logger.Field("old_expire_time", oldExpireTime),
		logger.Field("new_expire_time", newExpireTime),
		logger.Field("order_id", orderInfo.Id),
	)
	return giftSub, nil
}
// handleCommission credits referral commission to the buyer's referrer.
// Callers in this file run it in its own goroutine.
//
// When shouldProcessCommission says no commission applies, the only remaining
// action is granting gift days to both parties, and only for a first order
// (orderInfo.IsNew). Otherwise the commission is
// (Amount - FeeAmount) * percentage, using the referrer's own percentage when
// it is non-zero and the global invite percentage otherwise.
//
// The balance increment and the commission log are written in one transaction.
// A commission log for the same referrer whose content contains the quoted
// order number counts as "already processed" and skips the update. All
// failures are logged; nothing is returned.
func (l *ActivateOrderLogic) handleCommission(ctx context.Context, userInfo *user.User, orderInfo *order.Order) {
	if !l.shouldProcessCommission(userInfo, orderInfo.IsNew) {
		// Regular-user path (no commission): only a first order grants gift
		// days to both parties.
		if orderInfo.IsNew {
			l.grantGiftDaysToBothParties(ctx, userInfo, orderInfo.OrderNo)
		}
		return
	}

	referer, err := l.svc.UserModel.FindOne(ctx, userInfo.RefererId)
	if err != nil {
		logger.WithContext(ctx).Error("Find referer failed",
			logger.Field("error", err.Error()),
			logger.Field("referer_id", userInfo.RefererId),
		)
		return
	}

	// The referrer's own percentage wins; fall back to the global setting.
	referralPercentage := referer.ReferralPercentage
	if referralPercentage == 0 {
		referralPercentage = uint8(l.svc.Config.Invite.ReferralPercentage)
	}

	// Order commission calculation (Order Amount - Order Fee) * Referral Percentage
	amount := l.calculateCommission(orderInfo.Amount-orderInfo.FeeAmount, referralPercentage)

	// Renewal orders are logged as renewal commission, everything else as purchase.
	var commissionType uint16 = log.CommissionTypePurchase
	if orderInfo.Type == OrderTypeRenewal {
		commissionType = log.CommissionTypeRenewal
	}

	err = l.svc.DB.Transaction(func(tx *gorm.DB) error {
		// Idempotency: check if commission log already exists for this order
		var alreadyLogged int64
		countErr := tx.Model(&log.SystemLog{}).
			Where("type = ? AND object_id = ? AND content LIKE ?",
				log.TypeCommission.Uint8(), referer.Id, fmt.Sprintf("%%\"%s\"%%", orderInfo.OrderNo)).
			Count(&alreadyLogged).Error
		if countErr != nil {
			return countErr
		}
		if alreadyLogged > 0 {
			logger.WithContext(ctx).Info("Commission already processed, skip",
				logger.Field("order_no", orderInfo.OrderNo),
				logger.Field("referer_id", referer.Id),
			)
			return nil
		}

		// Atomic increment to prevent lost updates under concurrency
		incrementErr := tx.Model(&user.User{}).Where("id = ?", referer.Id).
			UpdateColumn("commission", gorm.Expr("commission + ?", amount)).Error
		if incrementErr != nil {
			return incrementErr
		}
		// Mirror the increment in memory for the cache refresh below.
		referer.Commission += amount

		entry := &log.Commission{
			Type:      commissionType,
			Amount:    amount,
			OrderNo:   orderInfo.OrderNo,
			Timestamp: orderInfo.CreatedAt.UnixMilli(),
		}
		content, _ := entry.Marshal()
		record := &log.SystemLog{
			Type:     log.TypeCommission.Uint8(),
			Date:     time.Now().Format("2006-01-02"),
			ObjectID: referer.Id,
			Content:  string(content),
		}
		return tx.Model(&log.SystemLog{}).Create(record).Error
	})
	if err != nil {
		logger.WithContext(ctx).Error("Update referer commission failed", logger.Field("error", err.Error()))
		return
	}

	// Update cache
	if cacheErr := l.svc.UserModel.UpdateUserCache(ctx, referer); cacheErr != nil {
		logger.WithContext(ctx).Error("Update referer cache failed",
			logger.Field("error", cacheErr.Error()),
			logger.Field("user_id", referer.Id),
		)
	}
}
// grantGiftDaysToBothParties grants the configured number of invite gift days
// (Config.Invite.GiftDays) to the referee and to the referee's referrer.
//
// It does nothing when gift days are not configured or the referee has no
// referrer. Each grant is best-effort: a failure is logged and does not stop
// the other party from receiving the grant.
func (l *ActivateOrderLogic) grantGiftDaysToBothParties(ctx context.Context, referee *user.User, orderNo string) {
	giftDays := l.svc.Config.Invite.GiftDays
	if giftDays <= 0 || referee == nil || referee.Id == 0 || referee.RefererId == 0 {
		return
	}
	days := int(giftDays)

	if err := l.grantGiftDays(ctx, referee, days, orderNo, "邀请赠送"); err != nil {
		logger.WithContext(ctx).Error("Grant gift days to referee failed",
			logger.Field("error", err.Error()),
			logger.Field("user_id", referee.Id),
			logger.Field("order_no", orderNo),
		)
	}

	referer, err := l.svc.UserModel.FindOne(ctx, referee.RefererId)
	if err != nil {
		logger.WithContext(ctx).Error("Find referer for gift days failed",
			logger.Field("error", err.Error()),
			logger.Field("referer_id", referee.RefererId),
			logger.Field("order_no", orderNo),
		)
		return
	}
	if referer == nil {
		return
	}

	if err = l.grantGiftDays(ctx, referer, days, orderNo, "邀请赠送"); err != nil {
		logger.WithContext(ctx).Error("Grant gift days to referer failed",
			logger.Field("error", err.Error()),
			logger.Field("user_id", referer.Id),
			logger.Field("order_no", orderNo),
		)
	}
}
// grantGiftDays extends the user's active subscription by the given number of
// days and records a gift entry in the system log.
//
// It returns nil without doing anything when u is nil, days is not positive,
// a gift log for this user whose content contains the quoted order number
// already exists, or the user has no active subscription. Any other failure
// is returned.
func (l *ActivateOrderLogic) grantGiftDays(ctx context.Context, u *user.User, days int, orderNo string, remark string) error {
	if days <= 0 || u == nil {
		return nil
	}

	// Idempotency: check if gift log already exists for this order + user
	var granted int64
	orderPattern := fmt.Sprintf("%%\"%s\"%%", orderNo)
	countErr := l.svc.DB.Model(&log.SystemLog{}).
		Where("type = ? AND object_id = ? AND content LIKE ?",
			log.TypeGift.Uint8(), u.Id, orderPattern).
		Count(&granted).Error
	if countErr != nil {
		return countErr
	}
	if granted > 0 {
		logger.WithContext(ctx).Info("Gift days already granted, skip",
			logger.Field("order_no", orderNo),
			logger.Field("user_id", u.Id),
		)
		return nil
	}

	activeSubscribe, err := l.svc.UserModel.FindActiveSubscribe(ctx, u.Id)
	switch {
	case errors.Is(err, gorm.ErrRecordNotFound):
		// Nothing to extend.
		return nil
	case err != nil:
		return err
	}

	extension := time.Duration(days) * 24 * time.Hour
	activeSubscribe.ExpireTime = activeSubscribe.ExpireTime.Add(extension)
	if updateErr := l.svc.UserModel.UpdateSubscribe(ctx, activeSubscribe); updateErr != nil {
		return updateErr
	}

	// Insert system log
	giftLog := &log.Gift{
		Type:        log.GiftTypeIncrease,
		OrderNo:     orderNo,
		SubscribeId: activeSubscribe.Id,
		Amount:      int64(days),
		Balance:     u.Balance,
		Remark:      remark,
		Timestamp:   time.Now().UnixMilli(),
	}
	content, _ := giftLog.Marshal()
	record := &log.SystemLog{
		Type:     log.TypeGift.Uint8(),
		Date:     time.Now().Format("2006-01-02"),
		ObjectID: u.Id,
		Content:  string(content),
	}
	return l.svc.LogModel.Insert(ctx, record)
}
// shouldProcessCommission reports whether a referral commission applies to
// the buyer's order.
//
// It is false when the buyer has no referrer or the referrer cannot be loaded.
// A referrer with a non-zero ReferralPercentage uses their own
// OnlyFirstPurchase flag; otherwise the global invite settings decide, and a
// global percentage of 0 disables commission. In both cases an
// "only first purchase" restriction rejects every non-first purchase.
func (l *ActivateOrderLogic) shouldProcessCommission(userInfo *user.User, isFirstPurchase bool) bool {
	if userInfo == nil || userInfo.RefererId == 0 {
		return false
	}

	referer, err := l.svc.UserModel.FindOne(context.Background(), userInfo.RefererId)
	if err != nil {
		logger.Errorw("Find referer failed",
			logger.Field("error", err.Error()),
			logger.Field("referer_id", userInfo.RefererId))
		return false
	}
	if referer == nil {
		return false
	}

	// use referer's custom settings if set
	if referer.ReferralPercentage > 0 {
		firstOnly := referer.OnlyFirstPurchase != nil && *referer.OnlyFirstPurchase
		return !firstOnly || isFirstPurchase
	}

	// use global settings
	if l.svc.Config.Invite.ReferralPercentage == 0 {
		return false
	}
	return !l.svc.Config.Invite.OnlyFirstPurchase || isFirstPurchase
}
// calculateCommission returns percentage percent of price, truncated toward
// zero.
//
// The computation is done in integers. The previous float64 form lost
// precision before truncating (for example 100 at 29% came out as 28 because
// 0.29*100 evaluates to 28.999999999999996). Splitting price into hundreds
// and remainder keeps the result exact and avoids overflowing price*percentage
// for very large prices.
func (l *ActivateOrderLogic) calculateCommission(price int64, percentage uint8) int64 {
	pct := int64(percentage)
	return price/100*pct + price%100*pct/100
}
// clearServerCache clears the cache kept by SubscribeModel for the given
// plan. A failure is logged and otherwise ignored.
func (l *ActivateOrderLogic) clearServerCache(ctx context.Context, sub *subscribe.Subscribe) {
	err := l.svc.SubscribeModel.ClearCache(ctx, sub.Id)
	if err == nil {
		return
	}
	logger.WithContext(ctx).Error("[Order Queue] Clear subscribe cache failed", logger.Field("error", err.Error()))
}
// triggerUserGroupRecalculation starts a background goroutine that
// recalculates user groups after a subscription change.
//
// The goroutine uses its own context with a 30 second timeout instead of the
// caller's ctx, so it does not depend on the caller's context staying alive.
// That timeout context is passed to the settings queries as well as to the
// recalculation. The recalculation runs only when the "group/enabled" system
// setting is "true" or "1" and the "group/mode" setting is one of "average",
// "subscribe" or "traffic". userId is used for logging only.
func (l *ActivateOrderLogic) triggerUserGroupRecalculation(ctx context.Context, userId int64) {
	go func() {
		// Use a new context with timeout for group recalculation
		bgCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()

		// Check if group management is enabled
		var groupEnabled string
		err := l.svc.DB.WithContext(bgCtx).Table("system").
			Where("`category` = ? AND `key` = ?", "group", "enabled").
			Select("value").
			Scan(&groupEnabled).Error
		if err != nil {
			// A query failure is not the same as "disabled"; report it as an error.
			logger.Errorw("[Group Trigger] Failed to get group enabled flag", logger.Field("error", err.Error()))
			return
		}
		if groupEnabled != "true" && groupEnabled != "1" {
			logger.Debugf("[Group Trigger] Group management not enabled, skipping recalculation")
			return
		}

		// Get the configured grouping mode
		var groupMode string
		err = l.svc.DB.WithContext(bgCtx).Table("system").
			Where("`category` = ? AND `key` = ?", "group", "mode").
			Select("value").
			Scan(&groupMode).Error
		if err != nil {
			logger.Errorw("[Group Trigger] Failed to get group mode", logger.Field("error", err.Error()))
			return
		}

		// Validate group mode
		switch groupMode {
		case "average", "subscribe", "traffic":
		default:
			logger.Debugf("[Group Trigger] Invalid group mode (current: %s), skipping", groupMode)
			return
		}

		// Trigger group recalculation with the configured mode
		logic := group.NewRecalculateGroupLogic(bgCtx, l.svc)
		req := &types.RecalculateGroupRequest{
			Mode: groupMode,
		}
		if err := logic.RecalculateGroup(req); err != nil {
			logger.Errorw("[Group Trigger] Failed to recalculate user group",
				logger.Field("user_id", userId),
				logger.Field("error", err.Error()),
			)
			return
		}

		logger.Infow("[Group Trigger] Successfully recalculated user group",
			logger.Field("user_id", userId),
			logger.Field("mode", groupMode),
		)
	}()
}
// Renewal activates a renewal order.
//
// It loads the user, the user subscription identified by the order's
// subscribe token, and the plan, then extends the subscription. A positive
// iapExpireAt selects the Apple IAP path (updateSubscriptionWithIAPExpire);
// otherwise the regular renewal path is used. Afterwards it clears the user
// subscription cache and the plan cache and starts commission handling in a
// goroutine. Cache-clearing failures are logged only.
func (l *ActivateOrderLogic) Renewal(ctx context.Context, orderInfo *order.Order, iapExpireAt int64) error {
	userInfo, err := l.getExistingUser(ctx, orderInfo.UserId)
	if err != nil {
		return err
	}
	userSub, err := l.getUserSubscription(ctx, orderInfo.SubscribeToken)
	if err != nil {
		return err
	}
	sub, err := l.getSubscribeInfo(ctx, orderInfo.SubscribeId)
	if err != nil {
		return err
	}

	if iapExpireAt > 0 {
		// Apple IAP renewal: handled with "cumulative time" semantics so that
		// consecutive purchases add up instead of overwriting the expiry.
		err = l.updateSubscriptionWithIAPExpire(ctx, userSub, sub, orderInfo, iapExpireAt)
	} else {
		err = l.updateSubscriptionForRenewal(ctx, userSub, sub, orderInfo)
	}
	if err != nil {
		return err
	}

	// Clear user subscription cache
	if cacheErr := l.svc.UserModel.ClearSubscribeCache(ctx, userSub); cacheErr != nil {
		logger.WithContext(ctx).Error("Clear user subscribe cache failed",
			logger.Field("error", cacheErr.Error()),
			logger.Field("subscribe_id", userSub.Id),
			logger.Field("user_id", userInfo.Id),
		)
	}

	// Clear cache
	l.clearServerCache(ctx, sub)

	// Handle commission
	go l.handleCommission(context.Background(), userInfo, orderInfo)
	return nil
}
// getUserSubscription loads a user subscription by its token. Lookup failures
// are logged and returned.
func (l *ActivateOrderLogic) getUserSubscription(ctx context.Context, token string) (*user.Subscribe, error) {
	found, err := l.svc.UserModel.FindOneSubscribeByToken(ctx, token)
	if err == nil {
		return found, nil
	}
	logger.WithContext(ctx).Error("Find user subscribe failed", logger.Field("error", err.Error()))
	return nil, err
}
// updateSubscriptionWithIAPExpire applies an Apple IAP renewal with
// "cumulative time" semantics.
//
// The new expiry is the purchased duration added to the later of the current
// expiry and now; if the expiry reported by Apple (iapExpireAt, Unix seconds)
// is later still, that value is used instead. Traffic counters are reset when
// the plan has RenewalReset enabled or when today's day-of-month equals the
// new expiry's day-of-month. FinishedAt is cleared and the subscription is
// re-activated.
//
// NOTE(review): unlike updateSubscriptionForRenewal, this path does not reset
// ExpiredDownload/ExpiredUpload — confirm whether that is intentional.
func (l *ActivateOrderLogic) updateSubscriptionWithIAPExpire(ctx context.Context, userSub *user.Subscribe, sub *subscribe.Subscribe, orderInfo *order.Order, iapExpireAt int64) error {
	now := time.Now()

	// Never extend from a point in the past.
	base := userSub.ExpireTime
	if now.After(base) {
		base = now
	}

	newExpire := tool.AddTime(sub.UnitTime, orderInfo.Quantity, base)
	if iapExpireAt > 0 {
		if appleExpire := time.Unix(iapExpireAt, 0); appleExpire.After(newExpire) {
			newExpire = appleExpire
		}
	}

	planResets := sub.RenewalReset != nil && *sub.RenewalReset
	if planResets || now.Day() == newExpire.Day() {
		userSub.Download, userSub.Upload = 0, 0
	}

	userSub.FinishedAt = nil
	userSub.ExpireTime = newExpire
	userSub.Status = 1

	err := l.svc.UserModel.UpdateSubscribe(ctx, userSub)
	if err != nil {
		logger.WithContext(ctx).Error("Update user subscribe (IAP) failed", logger.Field("error", err.Error()))
		return err
	}
	return nil
}
// updateSubscriptionForRenewal extends a user subscription for a renewal order.
//
// An already expired subscription is first moved up to now, then the
// purchased duration is added. Traffic counters are reset when the plan has
// RenewalReset enabled, when today's day-of-month equals the (adjusted)
// expiry's day-of-month, or when the subscription had a FinishedAt in the
// past and today's day-of-month is greater than the expiry's. FinishedAt and
// the expired-traffic counters are always cleared and the subscription is
// re-activated.
func (l *ActivateOrderLogic) updateSubscriptionForRenewal(ctx context.Context, userSub *user.Subscribe, sub *subscribe.Subscribe, orderInfo *order.Order) error {
	now := time.Now()
	if userSub.ExpireTime.Before(now) {
		userSub.ExpireTime = now
	}

	today := time.Now().Day()
	resetDay := userSub.ExpireTime.Day()

	// Reset traffic if enabled
	resetTraffic := (sub.RenewalReset != nil && *sub.RenewalReset) || today == resetDay
	if finishedAt := userSub.FinishedAt; finishedAt != nil {
		// reset user traffic if finished at is before now
		if finishedAt.Before(now) && today > resetDay {
			resetTraffic = true
		}
		userSub.FinishedAt = nil
	}
	if resetTraffic {
		userSub.Download, userSub.Upload = 0, 0
	}

	userSub.ExpireTime = tool.AddTime(sub.UnitTime, orderInfo.Quantity, userSub.ExpireTime)
	userSub.Status = 1
	// A renewal also clears the expired-traffic counters.
	userSub.ExpiredDownload, userSub.ExpiredUpload = 0, 0

	err := l.svc.UserModel.UpdateSubscribe(ctx, userSub)
	if err != nil {
		logger.WithContext(ctx).Error("Update user subscribe failed", logger.Field("error", err.Error()))
		return err
	}
	return nil
}
// ResetTraffic activates a paid traffic-reset order.
//
// It zeroes the traffic and expired-traffic counters of the subscription
// identified by the order's subscribe token, re-activates it, clears the user
// subscription cache and the plan cache, and records a "paid reset" entry in
// the system log. User, subscription and plan lookups and the subscription
// update return their error; cache and log failures are logged only.
func (l *ActivateOrderLogic) ResetTraffic(ctx context.Context, orderInfo *order.Order) error {
	userInfo, err := l.getExistingUser(ctx, orderInfo.UserId)
	if err != nil {
		return err
	}
	userSub, err := l.getUserSubscription(ctx, orderInfo.SubscribeToken)
	if err != nil {
		return err
	}

	// Reset traffic
	userSub.Download, userSub.Upload = 0, 0
	userSub.ExpiredDownload, userSub.ExpiredUpload = 0, 0
	userSub.Status = 1
	if updateErr := l.svc.UserModel.UpdateSubscribe(ctx, userSub); updateErr != nil {
		logger.WithContext(ctx).Error("Update user subscribe failed", logger.Field("error", updateErr.Error()))
		return updateErr
	}

	sub, err := l.getSubscribeInfo(ctx, userSub.SubscribeId)
	if err != nil {
		return err
	}

	// Clear user subscription cache
	if cacheErr := l.svc.UserModel.ClearSubscribeCache(ctx, userSub); cacheErr != nil {
		logger.WithContext(ctx).Error("Clear user subscribe cache failed",
			logger.Field("error", cacheErr.Error()),
			logger.Field("subscribe_id", userSub.Id),
			logger.Field("user_id", userInfo.Id),
		)
	}

	// Clear cache
	l.clearServerCache(ctx, sub)

	// insert reset traffic log
	resetLog := &log.ResetSubscribe{
		Type:      log.ResetSubscribeTypePaid,
		UserId:    userInfo.Id,
		OrderNo:   orderInfo.OrderNo,
		Timestamp: time.Now().UnixMilli(),
	}
	content, _ := resetLog.Marshal()
	record := &log.SystemLog{
		Type:     log.TypeResetSubscribe.Uint8(),
		Date:     time.Now().Format(time.DateOnly),
		ObjectID: userSub.Id,
		Content:  string(content),
	}
	if logErr := l.svc.LogModel.Insert(ctx, record); logErr != nil {
		logger.WithContext(ctx).Error("[Order Queue]Insert reset subscribe log failed", logger.Field("error", logErr.Error()))
	}
	return nil
}
// Recharge handles balance recharge orders: it credits orderInfo.Price to the
// user's balance and writes the matching balance log inside one database
// transaction, then updates the user cache.
//
// The transaction is idempotent per order: if a balance log whose content
// contains the quoted order number already exists for the user, nothing is
// credited. Any error inside the transaction rolls back both the balance
// change and the log.
func (l *ActivateOrderLogic) Recharge(ctx context.Context, orderInfo *order.Order) error {
	userInfo, err := l.getExistingUser(ctx, orderInfo.UserId)
	if err != nil {
		return err
	}

	// Update balance in transaction
	err = l.svc.DB.Transaction(func(tx *gorm.DB) error {
		// Idempotency: check if balance log already exists for this order
		var existingLogCount int64
		if e := tx.Model(&log.SystemLog{}).
			Where("type = ? AND object_id = ? AND content LIKE ?",
				log.TypeBalance.Uint8(), userInfo.Id, fmt.Sprintf("%%\"%s\"%%", orderInfo.OrderNo)).
			Count(&existingLogCount).Error; e != nil {
			return e
		}
		if existingLogCount > 0 {
			logger.WithContext(ctx).Info("Recharge already processed, skip",
				logger.Field("order_no", orderInfo.OrderNo),
				logger.Field("user_id", userInfo.Id),
			)
			return nil
		}

		// Atomic increment to prevent lost updates
		if e := tx.Model(&user.User{}).Where("id = ?", userInfo.Id).
			UpdateColumn("balance", gorm.Expr("balance + ?", orderInfo.Price)).Error; e != nil {
			return e
		}

		// NOTE(review): the logged balance is derived from the user row loaded
		// before the transaction; it may lag behind concurrent balance changes.
		userInfo.Balance += orderInfo.Price

		now := time.Now()
		balanceLog := &log.Balance{
			Amount:    orderInfo.Price,
			Type:      log.BalanceTypeRecharge,
			OrderNo:   orderInfo.OrderNo,
			Balance:   userInfo.Balance,
			Timestamp: now.UnixMilli(),
		}
		content, e := balanceLog.Marshal()
		if e != nil {
			// A log row without the order number would defeat the idempotency
			// probe above and allow a double credit on retry, so roll back.
			return e
		}
		return tx.Model(&log.SystemLog{}).Create(&log.SystemLog{
			Type:     log.TypeBalance.Uint8(),
			Date:     now.Format(time.DateOnly),
			ObjectID: userInfo.Id,
			Content:  string(content),
		}).Error
	})
	if err != nil {
		logger.WithContext(ctx).Error("[Recharge] Database transaction failed", logger.Field("error", err.Error()))
		return err
	}

	// Update the cached user entry so it does not keep the old balance.
	if err = l.svc.UserModel.UpdateUserCache(ctx, userInfo); err != nil {
		logger.WithContext(ctx).Error("[Recharge] Update user cache failed", logger.Field("error", err.Error()))
		return err
	}
	return nil
}
// RedemptionActivate handles redemption code activation: it extends the user's
// existing subscription for the plan (or creates a new one), increments the
// code's used count, inserts a redemption record, and clears the related caches.
//
// The redemption parameters (code id, unit time, quantity) are read from the
// Redis key "redemption_order:<order_no>", which is deleted after a successful
// activation. If a redemption record for the same user and code already exists,
// the call returns nil without changing anything. Lookup failures and any
// error inside the transaction are returned to the caller.
func (l *ActivateOrderLogic) RedemptionActivate(ctx context.Context, orderInfo *order.Order) error {
	// 1. Load the user.
	userInfo, err := l.getExistingUser(ctx, orderInfo.UserId)
	if err != nil {
		return err
	}

	// 2. Load the subscription plan.
	sub, err := l.getSubscribeInfo(ctx, orderInfo.SubscribeId)
	if err != nil {
		return err
	}

	// 3. Load the redemption payload from Redis.
	cacheKey := fmt.Sprintf("redemption_order:%s", orderInfo.OrderNo)
	data, err := l.svc.Redis.Get(ctx, cacheKey).Result()
	if err != nil {
		logger.WithContext(ctx).Error("Get redemption cache failed",
			logger.Field("error", err.Error()),
			logger.Field("cache_key", cacheKey),
		)
		return err
	}
	var redemptionData struct {
		RedemptionCodeId int64  `json:"redemption_code_id"`
		UnitTime         string `json:"unit_time"`
		Quantity         int64  `json:"quantity"`
	}
	if err = json.Unmarshal([]byte(data), &redemptionData); err != nil {
		logger.WithContext(ctx).Error("Unmarshal redemption cache failed", logger.Field("error", err.Error()))
		return err
	}

	// 4. Idempotency check: has this user already redeemed this code?
	// A failed lookup must not be treated as "not redeemed yet", otherwise the
	// code could be applied twice; only a not-found result is tolerated.
	existingRecords, err := l.svc.RedemptionRecordModel.FindByUserId(ctx, userInfo.Id)
	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		logger.WithContext(ctx).Error("Find redemption records failed",
			logger.Field("error", err.Error()),
			logger.Field("user_id", userInfo.Id),
		)
		return err
	}
	if err == nil {
		for _, record := range existingRecords {
			if record.RedemptionCodeId == redemptionData.RedemptionCodeId {
				logger.WithContext(ctx).Info("Redemption already processed, skip",
					logger.Field("order_no", orderInfo.OrderNo),
					logger.Field("user_id", userInfo.Id),
					logger.Field("redemption_code_id", redemptionData.RedemptionCodeId),
				)
				// Already processed: report success (idempotency guard).
				return nil
			}
		}
	}

	// 5. Find the user's existing subscription for this plan.
	// A failed lookup is returned instead of silently falling through to the
	// "create new subscription" path.
	var existingSubscribe *user.Subscribe
	userSubscribes, err := l.svc.UserModel.QueryUserSubscribe(ctx, userInfo.Id, 0, 1)
	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		logger.WithContext(ctx).Error("Query user subscribe failed",
			logger.Field("error", err.Error()),
			logger.Field("user_id", userInfo.Id),
		)
		return err
	}
	if err == nil {
		for _, us := range userSubscribes {
			if us.SubscribeId == orderInfo.SubscribeId {
				// NOTE(review): only a subset of fields is copied here (no Token,
				// UUID or StartTime, which the new-subscription literal below does
				// set). Confirm UpdateSubscribe and ClearSubscribeCache do not
				// depend on, or overwrite, the omitted fields.
				existingSubscribe = &user.Subscribe{
					Id:          us.Id,
					UserId:      us.UserId,
					SubscribeId: us.SubscribeId,
					ExpireTime:  us.ExpireTime,
					Status:      us.Status,
					Traffic:     us.Traffic,
					Download:    us.Download,
					Upload:      us.Upload,
					NodeGroupId: us.NodeGroupId,
				}
				break
			}
		}
	}

	now := time.Now()

	// 6. Run the core changes inside one transaction.
	err = l.svc.DB.Transaction(func(tx *gorm.DB) error {
		// 6.1 Extend the existing subscription or create a new one.
		if existingSubscribe != nil {
			// Extend from the current expiry if still active, otherwise from now.
			newExpireTime := now
			if existingSubscribe.ExpireTime.After(now) {
				newExpireTime = existingSubscribe.ExpireTime
			}
			newExpireTime = tool.AddTime(redemptionData.UnitTime, redemptionData.Quantity, newExpireTime)

			existingSubscribe.OrderId = orderInfo.Id // keep the order id for traceability
			existingSubscribe.ExpireTime = newExpireTime
			existingSubscribe.Status = 1
			// Reset traffic when the plan has a traffic limit (sub.Traffic is
			// multiplied by 1024^3 before being stored).
			if sub.Traffic > 0 {
				existingSubscribe.Traffic = sub.Traffic * 1024 * 1024 * 1024
				existingSubscribe.Download = 0
				existingSubscribe.Upload = 0
			}
			if err := l.svc.UserModel.UpdateSubscribe(ctx, existingSubscribe, tx); err != nil {
				logger.WithContext(ctx).Error("Update subscribe failed", logger.Field("error", err.Error()))
				return err
			}
			logger.WithContext(ctx).Info("Extended existing subscription",
				logger.Field("subscribe_id", existingSubscribe.Id),
				logger.Field("new_expire_time", newExpireTime),
			)
		} else {
			// Enforce the plan's per-user quota before creating a subscription.
			if sub.Quota > 0 {
				var count int64
				if err := tx.Model(&user.Subscribe{}).
					Where("user_id = ? AND subscribe_id = ?", userInfo.Id, orderInfo.SubscribeId).
					Count(&count).Error; err != nil {
					logger.WithContext(ctx).Error("Count user subscribe failed", logger.Field("error", err.Error()))
					return err
				}
				if count >= sub.Quota {
					logger.WithContext(ctx).Infow("Subscribe quota limit exceeded",
						logger.Field("user_id", userInfo.Id),
						logger.Field("subscribe_id", orderInfo.SubscribeId),
						logger.Field("quota", sub.Quota),
						logger.Field("current_count", count),
					)
					return fmt.Errorf("subscribe quota limit exceeded")
				}
			}

			// Create the new subscription.
			expireTime := tool.AddTime(redemptionData.UnitTime, redemptionData.Quantity, now)
			traffic := int64(0)
			if sub.Traffic > 0 {
				traffic = sub.Traffic * 1024 * 1024 * 1024
			}
			newSubscribe := &user.Subscribe{
				UserId:      userInfo.Id,
				OrderId:     orderInfo.Id,
				SubscribeId: orderInfo.SubscribeId,
				StartTime:   now,
				ExpireTime:  expireTime,
				FinishedAt:  nil,
				Traffic:     traffic,
				Download:    0,
				Upload:      0,
				Token:       uuidx.SubscribeToken(orderInfo.OrderNo),
				UUID:        uuid.New().String(),
				Status:      1,
				NodeGroupId: sub.NodeGroupId, // Inherit node_group_id from subscription plan
			}
			if err := l.svc.UserModel.InsertSubscribe(ctx, newSubscribe, tx); err != nil {
				logger.WithContext(ctx).Error("Insert subscribe failed", logger.Field("error", err.Error()))
				return err
			}
			logger.WithContext(ctx).Info("Created new subscription",
				logger.Field("subscribe_id", newSubscribe.Id),
				logger.Field("expire_time", expireTime),
			)
		}

		// 6.2 Increment the redemption code's used count.
		if err := l.svc.RedemptionCodeModel.IncrementUsedCount(ctx, redemptionData.RedemptionCodeId, tx); err != nil {
			logger.WithContext(ctx).Error("Increment used count failed", logger.Field("error", err.Error()))
			return err
		}

		// 6.3 Insert the redemption record.
		redemptionRecord := &redemption.RedemptionRecord{
			RedemptionCodeId: redemptionData.RedemptionCodeId,
			UserId:           userInfo.Id,
			SubscribeId:      orderInfo.SubscribeId,
			UnitTime:         redemptionData.UnitTime,
			Quantity:         redemptionData.Quantity,
			RedeemedAt:       now,
			CreatedAt:        now,
		}
		if err := l.svc.RedemptionRecordModel.Insert(ctx, redemptionRecord, tx); err != nil {
			logger.WithContext(ctx).Error("Insert redemption record failed", logger.Field("error", err.Error()))
			return err
		}
		return nil
	})
	if err != nil {
		logger.WithContext(ctx).Error("Redemption transaction failed", logger.Field("error", err.Error()))
		return err
	}

	// Trigger user group recalculation (runs in background)
	l.triggerUserGroupRecalculation(ctx, userInfo.Id)

	// 7. Clear the server cache so nodes pick up the latest subscription.
	l.clearServerCache(ctx, sub)

	// 7.1 Clear the user subscription cache so the client shows fresh data.
	if existingSubscribe != nil {
		if err = l.svc.UserModel.ClearSubscribeCache(ctx, existingSubscribe); err != nil {
			logger.WithContext(ctx).Error("Clear user subscribe cache failed",
				logger.Field("error", err.Error()),
				logger.Field("subscribe_id", existingSubscribe.Id),
				logger.Field("user_id", userInfo.Id),
			)
		}
	}

	// 8. Delete the temporary Redis payload. A failure is only logged: the
	// activation itself has already been committed.
	if err = l.svc.Redis.Del(ctx, cacheKey).Err(); err != nil {
		logger.WithContext(ctx).Error("Delete redemption cache failed",
			logger.Field("error", err.Error()),
			logger.Field("cache_key", cacheKey),
		)
	}

	// 9. Send notifications (optional).
	// An existing notification template could be reused, or a dedicated
	// redemption template added:
	// l.sendNotifications(ctx, orderInfo, userInfo, sub, existingSubscribe, telegram.RedemptionNotify)

	logger.WithContext(ctx).Info("Redemption activation success",
		logger.Field("order_no", orderInfo.OrderNo),
		logger.Field("user_id", userInfo.Id),
		logger.Field("subscribe_id", orderInfo.SubscribeId),
	)
	return nil
}
// isNewUserOnlyForQuantity reports the NewUserOnly flag of the first discount
// tier whose Quantity equals inputQuantity. It returns false when no tier
// matches.
func isNewUserOnlyForQuantity(discounts []internaltypes.SubscribeDiscount, inputQuantity int64) bool {
	for i := range discounts {
		if tier := &discounts[i]; tier.Quantity == inputQuantity {
			return tier.NewUserOnly
		}
	}
	return false
}