// Package orderLogic provides order processing logic for handling various types of orders // including subscription purchases, renewals, traffic resets, and balance recharges. package orderLogic import ( "context" "encoding/json" "errors" "fmt" "time" "github.com/perfect-panel/server/internal/logic/admin/group" "github.com/perfect-panel/server/internal/model/log" "github.com/perfect-panel/server/pkg/constant" "github.com/perfect-panel/server/pkg/logger" "github.com/google/uuid" "github.com/hibiken/asynq" "github.com/perfect-panel/server/internal/model/order" internaltypes "github.com/perfect-panel/server/internal/types" "github.com/perfect-panel/server/internal/model/redemption" "github.com/perfect-panel/server/internal/model/subscribe" "github.com/perfect-panel/server/internal/model/user" "github.com/perfect-panel/server/internal/svc" "github.com/perfect-panel/server/internal/types" "github.com/perfect-panel/server/pkg/tool" "github.com/perfect-panel/server/pkg/uuidx" queueTypes "github.com/perfect-panel/server/queue/types" "gorm.io/gorm" ) // Order type constants define the different types of orders that can be processed const ( OrderTypeSubscribe = 1 // New subscription purchase OrderTypeRenewal = 2 // Subscription renewal OrderTypeResetTraffic = 3 // Traffic quota reset OrderTypeRecharge = 4 // Balance recharge OrderTypeRedemption = 5 // Redemption code activation ) // Order status constants define the lifecycle states of an order const ( OrderStatusPending = 1 // Order created but not paid OrderStatusPaid = 2 // Order paid and ready for processing OrderStatusClose = 3 // Order closed/cancelled OrderStatusFailed = 4 // Order processing failed OrderStatusFinished = 5 // Order successfully completed ) // Predefined error variables for common error conditions var ( ErrInvalidOrderStatus = fmt.Errorf("invalid order status") ErrInvalidOrderType = fmt.Errorf("invalid order type") ) // ActivateOrderLogic handles the activation and processing of paid orders type 
ActivateOrderLogic struct { svc *svc.ServiceContext // Service context containing dependencies } // NewActivateOrderLogic creates a new instance of ActivateOrderLogic func NewActivateOrderLogic(svc *svc.ServiceContext) *ActivateOrderLogic { return &ActivateOrderLogic{ svc: svc, } } // ProcessTask is the main entry point for processing order activation tasks. // It handles the complete workflow of activating a paid order including validation, // processing based on order type, and finalization. func (l *ActivateOrderLogic) ProcessTask(ctx context.Context, task *asynq.Task) error { logger.WithContext(ctx).Info("[ActivateOrderLogic] 开始处理订单激活任务", logger.Field("payload", string(task.Payload()))) payload, err := l.parsePayload(ctx, task.Payload()) if err != nil { logger.WithContext(ctx).Error("[ActivateOrderLogic] 解析 payload 失败,跳过任务", logger.Field("error", err.Error())) return nil // payload 解析失败不重试,因为重试也会失败 } logger.WithContext(ctx).Info("[ActivateOrderLogic] 正在验证订单", logger.Field("order_no", payload.OrderNo)) orderInfo, err := l.validateAndGetOrder(ctx, payload.OrderNo) if err != nil { // 如果订单不存在或状态不对,不重试 if errors.Is(err, ErrInvalidOrderStatus) { logger.WithContext(ctx).Info("[ActivateOrderLogic] 订单状态不是已支付,跳过", logger.Field("order_no", payload.OrderNo)) return nil } // 数据库查询失败,应该重试 logger.WithContext(ctx).Error("[ActivateOrderLogic] 查询订单失败,将重试", logger.Field("order_no", payload.OrderNo), logger.Field("error", err.Error())) return err } // Idempotency: if order is already finished, skip processing if orderInfo == nil { return nil } logger.WithContext(ctx).Info("[ActivateOrderLogic] 订单验证通过,开始处理", logger.Field("order_no", orderInfo.OrderNo), logger.Field("order_type", orderInfo.Type), logger.Field("user_id", orderInfo.UserId)) if err = l.processOrderByType(ctx, orderInfo, payload.IAPExpireAt); err != nil { logger.WithContext(ctx).Error("[ActivateOrderLogic] 处理订单失败,将重试", logger.Field("order_no", orderInfo.OrderNo), logger.Field("order_type", orderInfo.Type), 
logger.Field("error", err.Error())) return err // 返回 err 允许 asynq 重试 } l.finalizeCouponAndOrder(ctx, orderInfo) logger.WithContext(ctx).Info("[ActivateOrderLogic] 订单激活成功", logger.Field("order_no", orderInfo.OrderNo), logger.Field("order_type", orderInfo.Type), logger.Field("user_id", orderInfo.UserId)) return nil } // parsePayload unMarshals the task payload into a structured format func (l *ActivateOrderLogic) parsePayload(ctx context.Context, payload []byte) (*queueTypes.ForthwithActivateOrderPayload, error) { var p queueTypes.ForthwithActivateOrderPayload if err := json.Unmarshal(payload, &p); err != nil { logger.WithContext(ctx).Error("[ActivateOrderLogic] Unmarshal payload failed", logger.Field("error", err.Error()), logger.Field("payload", string(payload)), ) return nil, err } return &p, nil } // validateAndGetOrder retrieves an order by order number and validates its status // Returns error if order is not found or not in paid status func (l *ActivateOrderLogic) validateAndGetOrder(ctx context.Context, orderNo string) (*order.Order, error) { orderInfo, err := l.svc.OrderModel.FindOneByOrderNo(ctx, orderNo) if err != nil { logger.WithContext(ctx).Error("Find order failed", logger.Field("error", err.Error()), logger.Field("order_no", orderNo), ) return nil, err } // Idempotency check: if order is already finished, return success if orderInfo.Status == OrderStatusFinished { logger.WithContext(ctx).Info("Order already finished, skip processing", logger.Field("order_no", orderInfo.OrderNo), ) return nil, nil } if orderInfo.Status != OrderStatusPaid { logger.WithContext(ctx).Error("Order status error", logger.Field("order_no", orderInfo.OrderNo), logger.Field("status", orderInfo.Status), ) return nil, ErrInvalidOrderStatus } return orderInfo, nil } // processOrderByType routes order processing based on the order type func (l *ActivateOrderLogic) processOrderByType(ctx context.Context, orderInfo *order.Order, iapExpireAt int64) error { switch orderInfo.Type { case 
OrderTypeSubscribe: return l.NewPurchase(ctx, orderInfo) case OrderTypeRenewal: return l.Renewal(ctx, orderInfo, iapExpireAt) case OrderTypeResetTraffic: return l.ResetTraffic(ctx, orderInfo) case OrderTypeRecharge: return l.Recharge(ctx, orderInfo) case OrderTypeRedemption: return l.RedemptionActivate(ctx, orderInfo) default: logger.WithContext(ctx).Error("Order type is invalid", logger.Field("type", orderInfo.Type)) return ErrInvalidOrderType } } // finalizeCouponAndOrder handles post-processing tasks including coupon updates // and order status finalization func (l *ActivateOrderLogic) finalizeCouponAndOrder(ctx context.Context, orderInfo *order.Order) { // Update coupon if exists (non-critical, logged but not blocking) if orderInfo.Coupon != "" { if err := l.svc.CouponModel.UpdateCount(ctx, orderInfo.Coupon); err != nil { logger.WithContext(ctx).Error("Update coupon status failed", logger.Field("error", err.Error()), logger.Field("coupon", orderInfo.Coupon), ) } } // Update order status using state-guarded UpdateOrderStatus to prevent double finalization if err := l.svc.OrderModel.UpdateOrderStatus(ctx, orderInfo.OrderNo, OrderStatusFinished); err != nil { logger.WithContext(ctx).Error("Update order status failed", logger.Field("error", err.Error()), logger.Field("order_no", orderInfo.OrderNo), ) } orderInfo.Status = OrderStatusFinished } // NewPurchase handles new subscription purchase including user creation, // subscription setup, commission processing, cache updates, and notifications func (l *ActivateOrderLogic) NewPurchase(ctx context.Context, orderInfo *order.Order) error { userInfo, err := l.getUserOrCreate(ctx, orderInfo) if err != nil { return err } sub, err := l.getSubscribeInfo(ctx, orderInfo.SubscribeId) if err != nil { return err } // check new user only restriction at activation to prevent concurrent bypass if orderInfo.Type == OrderTypeSubscribe && sub.Discount != "" { var dis []internaltypes.SubscribeDiscount if jsonErr := 
json.Unmarshal([]byte(sub.Discount), &dis); jsonErr == nil { newUserOnly := isNewUserOnlyForQuantity(dis, orderInfo.Quantity) if newUserOnly { if time.Since(userInfo.CreatedAt) > 24*time.Hour { return fmt.Errorf("new user only: user %d is not a new user", userInfo.Id) } var historyCount int64 if e := l.svc.DB.Model(&order.Order{}). Where("user_id = ? AND subscribe_id = ? AND type = 1 AND status = ? AND order_no != ?", orderInfo.UserId, orderInfo.SubscribeId, OrderStatusFinished, orderInfo.OrderNo). Count(&historyCount).Error; e != nil { return fmt.Errorf("new user only: check history error: %w", e) } if historyCount >= 1 { return fmt.Errorf("new user only: user %d already activated subscribe %d", userInfo.Id, orderInfo.SubscribeId) } } } } var userSub *user.Subscribe // 单订阅模式下,优先兜底为“续费语义”:延长已购订阅,避免并发下重复创建 user_subscribe if l.svc.Config.Subscribe.SingleModel { anchorSub, anchorErr := l.svc.UserModel.FindSingleModeAnchorSubscribe(ctx, orderInfo.UserId) switch { case anchorErr == nil && anchorSub != nil: if anchorSub.SubscribeId == orderInfo.SubscribeId { if orderInfo.ParentId == 0 && anchorSub.OrderId > 0 && anchorSub.OrderId != orderInfo.Id { if patchErr := l.patchOrderParentID(ctx, orderInfo.Id, anchorSub.OrderId); patchErr != nil { logger.WithContext(ctx).Error("Patch order parent_id failed", logger.Field("error", patchErr.Error()), logger.Field("order_no", orderInfo.OrderNo), ) } else { orderInfo.ParentId = anchorSub.OrderId } } if renewErr := l.updateSubscriptionForRenewal(ctx, anchorSub, sub, orderInfo); renewErr != nil { logger.WithContext(ctx).Error("Single mode renewal fallback failed", logger.Field("error", renewErr.Error()), logger.Field("anchor_user_subscribe_id", anchorSub.Id), logger.Field("order_no", orderInfo.OrderNo), ) } else { userSub = anchorSub logger.WithContext(ctx).Infow("Single mode purchase routed to renewal in activation", logger.Field("mode", "single"), logger.Field("route", "purchase_to_renewal"), logger.Field("anchor_user_subscribe_id", 
anchorSub.Id), logger.Field("order_no", orderInfo.OrderNo), ) } } else { logger.WithContext(ctx).Errorw("Single mode anchor subscribe mismatch in activation", logger.Field("order_no", orderInfo.OrderNo), logger.Field("order_subscribe_id", orderInfo.SubscribeId), logger.Field("anchor_subscribe_id", anchorSub.SubscribeId), ) } case errors.Is(anchorErr, gorm.ErrRecordNotFound): case anchorErr != nil: logger.WithContext(ctx).Error("Find single mode anchor subscribe failed", logger.Field("error", anchorErr.Error()), logger.Field("order_no", orderInfo.OrderNo), ) } // 如果没有合并已购订阅,再尝试合并赠送订阅(order_id=0) if userSub == nil { giftSub, giftErr := l.findGiftSubscription(ctx, orderInfo.UserId, orderInfo.SubscribeId) if giftErr == nil && giftSub != nil { // 在赠送订阅上延长时间,保持 token 不变 userSub, err = l.extendGiftSubscription(ctx, giftSub, orderInfo, sub) if err != nil { logger.WithContext(ctx).Error("Extend gift subscription failed", logger.Field("error", err.Error()), logger.Field("gift_subscribe_id", giftSub.Id), ) // 合并失败时回退到创建新订阅 userSub = nil } } } } // 如果没有合并赠送订阅,则正常创建新订阅 if userSub == nil { userSub, err = l.createUserSubscription(ctx, orderInfo, sub) if err != nil { return err } } // Trigger user group recalculation (runs in background) l.triggerUserGroupRecalculation(ctx, userInfo.Id) // Handle commission in separate goroutine to avoid blocking go l.handleCommission(context.Background(), userInfo, orderInfo) // Clear cache l.clearServerCache(ctx, sub) logger.WithContext(ctx).Info("Insert user subscribe success") return nil } // getUserOrCreate retrieves an existing user or creates a new guest user based on order details func (l *ActivateOrderLogic) getUserOrCreate(ctx context.Context, orderInfo *order.Order) (*user.User, error) { if orderInfo.UserId != 0 { return l.getExistingUser(ctx, orderInfo.UserId) } return l.createGuestUser(ctx, orderInfo) } // getExistingUser retrieves user information by user ID func (l *ActivateOrderLogic) getExistingUser(ctx context.Context, userId 
int64) (*user.User, error) { userInfo, err := l.svc.UserModel.FindOne(ctx, userId) if err != nil { logger.WithContext(ctx).Error("Find user failed", logger.Field("error", err.Error()), logger.Field("user_id", userId), ) return nil, err } return userInfo, nil } // createGuestUser creates a new user account for guest orders using temporary order information // stored in Redis cache func (l *ActivateOrderLogic) createGuestUser(ctx context.Context, orderInfo *order.Order) (*user.User, error) { tempOrder, err := l.getTempOrderInfo(ctx, orderInfo.OrderNo) if err != nil { return nil, err } userInfo := &user.User{ Password: tool.EncodePassWord(tempOrder.Password), Algo: "default", AuthMethods: []user.AuthMethods{ { AuthType: tempOrder.AuthType, AuthIdentifier: tempOrder.Identifier, }, }, } err = l.svc.UserModel.Transaction(ctx, func(tx *gorm.DB) error { if err := tx.Save(userInfo).Error; err != nil { return err } userInfo.ReferCode = uuidx.UserInviteCode(userInfo.Id) if err := tx.Model(&user.User{}).Where("id = ?", userInfo.Id).Update("refer_code", userInfo.ReferCode).Error; err != nil { return err } orderInfo.UserId = userInfo.Id return tx.Model(&order.Order{}).Where("order_no = ?", orderInfo.OrderNo).Update("user_id", userInfo.Id).Error }) if err != nil { logger.WithContext(ctx).Error("Create user failed", logger.Field("error", err.Error())) return nil, err } // Handle referrer relationship l.handleReferrer(ctx, userInfo, tempOrder.InviteCode) logger.WithContext(ctx).Info("Create guest user success", logger.Field("user_id", userInfo.Id), logger.Field("identifier", tempOrder.Identifier), logger.Field("auth_type", tempOrder.AuthType), ) return userInfo, nil } // getTempOrderInfo retrieves temporary order information from Redis cache func (l *ActivateOrderLogic) getTempOrderInfo(ctx context.Context, orderNo string) (*constant.TemporaryOrderInfo, error) { cacheKey := fmt.Sprintf(constant.TempOrderCacheKey, orderNo) data, err := l.svc.Redis.Get(ctx, cacheKey).Result() if err 
!= nil { logger.WithContext(ctx).Error("Get temp order cache failed", logger.Field("error", err.Error()), logger.Field("cache_key", cacheKey), ) return nil, err } var tempOrder constant.TemporaryOrderInfo if err = tempOrder.Unmarshal([]byte(data)); err != nil { logger.WithContext(ctx).Error("Unmarshal temp order cache failed", logger.Field("error", err.Error()), logger.Field("cache_key", cacheKey), logger.Field("data", data), ) return nil, err } return &tempOrder, nil } // handleReferrer establishes referrer relationship if an invite code is provided func (l *ActivateOrderLogic) handleReferrer(ctx context.Context, userInfo *user.User, inviteCode string) { if inviteCode == "" { return } referer, err := l.svc.UserModel.FindOneByReferCode(ctx, inviteCode) if err != nil { logger.WithContext(ctx).Error("Find referer failed", logger.Field("error", err.Error()), logger.Field("refer_code", inviteCode), ) return } userInfo.RefererId = referer.Id if err = l.svc.UserModel.Update(ctx, userInfo); err != nil { logger.WithContext(ctx).Error("Update user referer failed", logger.Field("error", err.Error()), logger.Field("user_id", userInfo.Id), ) } } // getSubscribeInfo retrieves subscription plan details by subscription ID func (l *ActivateOrderLogic) getSubscribeInfo(ctx context.Context, subscribeId int64) (*subscribe.Subscribe, error) { sub, err := l.svc.SubscribeModel.FindOne(ctx, subscribeId) if err != nil { logger.WithContext(ctx).Error("Find subscribe failed", logger.Field("error", err.Error()), logger.Field("subscribe_id", subscribeId), ) return nil, err } return sub, nil } // createUserSubscription creates a new user subscription record based on order and subscription plan details func (l *ActivateOrderLogic) createUserSubscription(ctx context.Context, orderInfo *order.Order, sub *subscribe.Subscribe) (*user.Subscribe, error) { now := time.Now() // Determine subscription owner: use SubscriptionUserId if set, otherwise fall back to UserId subscriptionUserId := 
orderInfo.UserId if orderInfo.SubscriptionUserId > 0 { subscriptionUserId = orderInfo.SubscriptionUserId } userSub := &user.Subscribe{ UserId: subscriptionUserId, OrderId: orderInfo.Id, SubscribeId: orderInfo.SubscribeId, StartTime: now, ExpireTime: tool.AddTime(sub.UnitTime, orderInfo.Quantity, now), Traffic: sub.Traffic, Download: 0, Upload: 0, Token: uuidx.SubscribeToken(orderInfo.OrderNo), UUID: uuid.New().String(), Status: 1, } // Check quota limit before creating subscription (final safeguard) if sub.Quota > 0 { var count int64 if err := l.svc.DB.Model(&user.Subscribe{}).Where("user_id = ? AND subscribe_id = ?", orderInfo.UserId, orderInfo.SubscribeId).Count(&count).Error; err != nil { logger.WithContext(ctx).Error("Count user subscribe failed", logger.Field("error", err.Error())) return nil, err } if count >= sub.Quota { logger.WithContext(ctx).Infow("Subscribe quota limit exceeded", logger.Field("user_id", orderInfo.UserId), logger.Field("subscribe_id", orderInfo.SubscribeId), logger.Field("quota", sub.Quota), logger.Field("current_count", count), ) return nil, fmt.Errorf("subscribe quota limit exceeded") } } if err := l.svc.UserModel.InsertSubscribe(ctx, userSub); err != nil { logger.WithContext(ctx).Error("Insert user subscribe failed", logger.Field("error", err.Error())) return nil, err } return userSub, nil } func (l *ActivateOrderLogic) patchOrderParentID(ctx context.Context, orderID int64, parentID int64) error { return l.svc.DB.WithContext(ctx).Model(&order.Order{}).Where("id = ? AND (parent_id = 0 OR parent_id IS NULL)", orderID).Update("parent_id", parentID).Error } // findGiftSubscription 查找用户指定套餐的赠送订阅(order_id=0),包括已过期的 // 返回找到的赠送订阅记录,如果没有则返回 nil func (l *ActivateOrderLogic) findGiftSubscription(ctx context.Context, userId int64, subscribeId int64) (*user.Subscribe, error) { // 直接查询数据库,查找 order_id=0(赠送)且同套餐的订阅,不限制过期状态 var giftSub user.Subscribe err := l.svc.DB.Model(&user.Subscribe{}). Where("user_id = ? 
AND order_id = 0 AND subscribe_id = ?", userId, subscribeId). Order("created_at DESC"). First(&giftSub).Error if err != nil { return nil, err } return &giftSub, nil } // extendGiftSubscription 在现有赠送订阅上延长到期时间,保持 token 不变 // 将购买的天数叠加到赠送订阅的到期时间上,并更新 order_id 为新订单 ID func (l *ActivateOrderLogic) extendGiftSubscription(ctx context.Context, giftSub *user.Subscribe, orderInfo *order.Order, sub *subscribe.Subscribe) (*user.Subscribe, error) { now := time.Now() // 计算基准时间:取赠送订阅到期时间和当前时间的较大值 baseTime := giftSub.ExpireTime if baseTime.Before(now) { baseTime = now } // 在基准时间上增加购买的天数 newExpireTime := tool.AddTime(sub.UnitTime, orderInfo.Quantity, baseTime) // 更新赠送订阅的信息 giftSub.OrderId = orderInfo.Id giftSub.ExpireTime = newExpireTime giftSub.Status = 1 if err := l.svc.UserModel.UpdateSubscribe(ctx, giftSub); err != nil { logger.WithContext(ctx).Error("Update gift subscription failed", logger.Field("error", err.Error()), logger.Field("subscribe_id", giftSub.Id), ) return nil, err } logger.WithContext(ctx).Info("Extended gift subscription successfully", logger.Field("subscribe_id", giftSub.Id), logger.Field("old_expire_time", baseTime), logger.Field("new_expire_time", newExpireTime), logger.Field("order_id", orderInfo.Id), ) return giftSub, nil } // handleCommission processes referral commission for the referrer if applicable. // This runs asynchronously to avoid blocking the main order processing flow. 
func (l *ActivateOrderLogic) handleCommission(ctx context.Context, userInfo *user.User, orderInfo *order.Order) { if !l.shouldProcessCommission(userInfo, orderInfo.IsNew) { // 普通用户路径(佣金比例=0):只有首单才双方赠N天 if orderInfo.IsNew { l.grantGiftDaysToBothParties(ctx, userInfo, orderInfo.OrderNo) } return } // 渠道路径(佣金比例>0):被邀请人首单赠N天 if orderInfo.IsNew { _ = l.grantGiftDays(ctx, userInfo, int(l.svc.Config.Invite.GiftDays), orderInfo.OrderNo, "邀请赠送") } referer, err := l.svc.UserModel.FindOne(ctx, userInfo.RefererId) if err != nil { logger.WithContext(ctx).Error("Find referer failed", logger.Field("error", err.Error()), logger.Field("referer_id", userInfo.RefererId), ) return } var referralPercentage uint8 if referer.ReferralPercentage != 0 { referralPercentage = referer.ReferralPercentage } else { referralPercentage = uint8(l.svc.Config.Invite.ReferralPercentage) } // Order commission calculation: (Order Amount - Order Fee) * Referral Percentage amount := l.calculateCommission(orderInfo.Amount-orderInfo.FeeAmount, referralPercentage) // Use transaction for commission updates err = l.svc.DB.Transaction(func(tx *gorm.DB) error { // Idempotency: check if commission log already exists for this order var existingLogCount int64 if e := tx.Model(&log.SystemLog{}). Where("type = ? AND object_id = ? AND content LIKE ?", log.TypeCommission.Uint8(), referer.Id, fmt.Sprintf("%%\"%s\"%%", orderInfo.OrderNo)). Count(&existingLogCount).Error; e != nil { return e } if existingLogCount > 0 { logger.WithContext(ctx).Info("Commission already processed, skip", logger.Field("order_no", orderInfo.OrderNo), logger.Field("referer_id", referer.Id), ) return nil } // Atomic increment to prevent lost updates under concurrency if e := tx.Model(&user.User{}).Where("id = ?", referer.Id). 
UpdateColumn("commission", gorm.Expr("commission + ?", amount)).Error; e != nil { return e } referer.Commission += amount var commissionType uint16 switch orderInfo.Type { case OrderTypeSubscribe: commissionType = log.CommissionTypePurchase case OrderTypeRenewal: commissionType = log.CommissionTypeRenewal default: commissionType = log.CommissionTypePurchase } commissionLog := &log.Commission{ Type: commissionType, Amount: amount, OrderNo: orderInfo.OrderNo, Timestamp: orderInfo.CreatedAt.UnixMilli(), } content, _ := commissionLog.Marshal() return tx.Model(&log.SystemLog{}).Create(&log.SystemLog{ Type: log.TypeCommission.Uint8(), Date: time.Now().Format("2006-01-02"), ObjectID: referer.Id, Content: string(content), }).Error }) if err != nil { logger.WithContext(ctx).Error("Update referer commission failed", logger.Field("error", err.Error())) return } // Update cache if err = l.svc.UserModel.UpdateUserCache(ctx, referer); err != nil { logger.WithContext(ctx).Error("Update referer cache failed", logger.Field("error", err.Error()), logger.Field("user_id", referer.Id), ) } } func (l *ActivateOrderLogic) grantGiftDaysToBothParties(ctx context.Context, referee *user.User, orderNo string) { giftDays := l.svc.Config.Invite.GiftDays if giftDays <= 0 || referee == nil || referee.Id == 0 || referee.RefererId == 0 { return } _ = l.grantGiftDays(ctx, referee, int(giftDays), orderNo, "邀请赠送") if referee.RefererId == 0 { return } referer, err := l.svc.UserModel.FindOne(ctx, referee.RefererId) if err != nil || referer == nil { return } _ = l.grantGiftDays(ctx, referer, int(giftDays), orderNo, "邀请赠送") } func (l *ActivateOrderLogic) grantGiftDays(ctx context.Context, u *user.User, days int, orderNo string, remark string) error { if u == nil || days <= 0 { return nil } // Idempotency: check if gift log already exists for this order + user var existingLogCount int64 if e := l.svc.DB.Model(&log.SystemLog{}). Where("type = ? AND object_id = ? 
AND content LIKE ?", log.TypeGift.Uint8(), u.Id, fmt.Sprintf("%%\"%s\"%%", orderNo)). Count(&existingLogCount).Error; e != nil { return e } if existingLogCount > 0 { logger.WithContext(ctx).Info("Gift days already granted, skip", logger.Field("order_no", orderNo), logger.Field("user_id", u.Id), ) return nil } activeSubscribe, err := l.svc.UserModel.FindActiveSubscribe(ctx, u.Id) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil } return err } activeSubscribe.ExpireTime = activeSubscribe.ExpireTime.Add(time.Duration(days) * 24 * time.Hour) err = l.svc.UserModel.UpdateSubscribe(ctx, activeSubscribe) if err != nil { return err } // Insert system log giftLog := &log.Gift{ Type: log.GiftTypeIncrease, OrderNo: orderNo, SubscribeId: activeSubscribe.Id, Amount: int64(days), Balance: u.Balance, Remark: remark, Timestamp: time.Now().UnixMilli(), } content, _ := giftLog.Marshal() return l.svc.LogModel.Insert(ctx, &log.SystemLog{ Type: log.TypeGift.Uint8(), Date: time.Now().Format("2006-01-02"), ObjectID: u.Id, Content: string(content), }) } // shouldProcessCommission determines if commission should be processed based on // referrer existence, commission settings, and order type func (l *ActivateOrderLogic) shouldProcessCommission(userInfo *user.User, isFirstPurchase bool) bool { if userInfo == nil || userInfo.RefererId == 0 { return false } referer, err := l.svc.UserModel.FindOne(context.Background(), userInfo.RefererId) if err != nil { logger.Errorw("Find referer failed", logger.Field("error", err.Error()), logger.Field("referer_id", userInfo.RefererId)) return false } if referer == nil { return false } // use referer's custom settings if set if referer.ReferralPercentage > 0 { if referer.OnlyFirstPurchase != nil && *referer.OnlyFirstPurchase && !isFirstPurchase { return false } return true } // use global settings if l.svc.Config.Invite.ReferralPercentage == 0 { return false } if l.svc.Config.Invite.OnlyFirstPurchase && !isFirstPurchase { return false } 
return true } // calculateCommission computes the commission amount based on order price and referral percentage func (l *ActivateOrderLogic) calculateCommission(price int64, percentage uint8) int64 { return int64(float64(price) * (float64(percentage) / 100)) } // clearServerCache clears user list cache for all servers associated with the subscription func (l *ActivateOrderLogic) clearServerCache(ctx context.Context, sub *subscribe.Subscribe) { if err := l.svc.SubscribeModel.ClearCache(ctx, sub.Id); err != nil { logger.WithContext(ctx).Error("[Order Queue] Clear subscribe cache failed", logger.Field("error", err.Error())) } } // triggerUserGroupRecalculation triggers user group recalculation after subscription changes // This runs asynchronously in background to avoid blocking the main order processing flow func (l *ActivateOrderLogic) triggerUserGroupRecalculation(ctx context.Context, userId int64) { go func() { // Use a new context with timeout for group recalculation ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // Check if group management is enabled var groupEnabled string err := l.svc.DB.Table("system"). Where("`category` = ? AND `key` = ?", "group", "enabled"). Select("value"). Scan(&groupEnabled).Error if err != nil || groupEnabled != "true" && groupEnabled != "1" { logger.Debugf("[Group Trigger] Group management not enabled, skipping recalculation") return } // Get the configured grouping mode var groupMode string err = l.svc.DB.Table("system"). Where("`category` = ? AND `key` = ?", "group", "mode"). Select("value"). 
Scan(&groupMode).Error if err != nil { logger.Errorw("[Group Trigger] Failed to get group mode", logger.Field("error", err.Error())) return } // Validate group mode if groupMode != "average" && groupMode != "subscribe" && groupMode != "traffic" { logger.Debugf("[Group Trigger] Invalid group mode (current: %s), skipping", groupMode) return } // Trigger group recalculation with the configured mode logic := group.NewRecalculateGroupLogic(ctx, l.svc) req := &types.RecalculateGroupRequest{ Mode: groupMode, } if err := logic.RecalculateGroup(req); err != nil { logger.Errorw("[Group Trigger] Failed to recalculate user group", logger.Field("user_id", userId), logger.Field("error", err.Error()), ) return } logger.Infow("[Group Trigger] Successfully recalculated user group", logger.Field("user_id", userId), logger.Field("mode", groupMode), ) }() } // Renewal handles subscription renewal including subscription extension, // traffic reset (if configured), commission processing, and notifications func (l *ActivateOrderLogic) Renewal(ctx context.Context, orderInfo *order.Order, iapExpireAt int64) error { userInfo, err := l.getExistingUser(ctx, orderInfo.UserId) if err != nil { return err } userSub, err := l.getUserSubscription(ctx, orderInfo.SubscribeToken) if err != nil { return err } sub, err := l.getSubscribeInfo(ctx, orderInfo.SubscribeId) if err != nil { return err } if iapExpireAt > 0 { // Apple IAP 续费:按“累计加时”语义处理,避免连续购买时仅覆盖到期时间。 if err = l.updateSubscriptionWithIAPExpire(ctx, userSub, sub, orderInfo, iapExpireAt); err != nil { return err } } else { if err = l.updateSubscriptionForRenewal(ctx, userSub, sub, orderInfo); err != nil { return err } } // Clear user subscription cache err = l.svc.UserModel.ClearSubscribeCache(ctx, userSub) if err != nil { logger.WithContext(ctx).Error("Clear user subscribe cache failed", logger.Field("error", err.Error()), logger.Field("subscribe_id", userSub.Id), logger.Field("user_id", userInfo.Id), ) } // Clear cache l.clearServerCache(ctx, 
sub)

	// Handle commission
	go l.handleCommission(context.Background(), userInfo, orderInfo)

	return nil
}

// getUserSubscription looks up the user subscription identified by token.
// A lookup failure is logged and the error is returned unchanged.
func (l *ActivateOrderLogic) getUserSubscription(ctx context.Context, token string) (*user.Subscribe, error) {
	userSub, err := l.svc.UserModel.FindOneSubscribeByToken(ctx, token)
	if err != nil {
		logger.WithContext(ctx).Error("Find user subscribe failed", logger.Field("error", err.Error()))
		return nil, err
	}
	return userSub, nil
}

// updateSubscriptionWithIAPExpire applies an Apple IAP renewal to userSub using
// cumulative-extension semantics.
//
// The new expiry is the later of:
//   - the current expiry (or now, if already expired) extended by
//     orderInfo.Quantity units of sub.UnitTime, and
//   - iapExpireAt interpreted as Unix seconds, when it is positive.
//
// Traffic counters are zeroed when the plan enables reset-on-renewal or when
// today's day-of-month equals the day-of-month of the new expiry. The
// subscription is then marked active (Status = 1) and persisted.
func (l *ActivateOrderLogic) updateSubscriptionWithIAPExpire(ctx context.Context, userSub *user.Subscribe, sub *subscribe.Subscribe, orderInfo *order.Order, iapExpireAt int64) error {
	now := time.Now()

	// Extend from the current expiry, or from now when it has already passed.
	baseTime := userSub.ExpireTime
	if baseTime.Before(now) {
		baseTime = now
	}
	newExpire := tool.AddTime(sub.UnitTime, orderInfo.Quantity, baseTime)

	// A later expiry reported by Apple takes precedence over the computed one.
	if iapExpireAt > 0 {
		if appleExpire := time.Unix(iapExpireAt, 0); appleExpire.After(newExpire) {
			newExpire = appleExpire
		}
	}

	if (sub.RenewalReset != nil && *sub.RenewalReset) || now.Day() == newExpire.Day() {
		userSub.Download = 0
		userSub.Upload = 0
	}

	// NOTE(review): unlike updateSubscriptionForRenewal, this path leaves
	// ExpiredDownload/ExpiredUpload untouched — confirm that is intended.
	userSub.FinishedAt = nil
	userSub.ExpireTime = newExpire
	userSub.Status = 1

	if err := l.svc.UserModel.UpdateSubscribe(ctx, userSub); err != nil {
		logger.WithContext(ctx).Error("Update user subscribe (IAP) failed", logger.Field("error", err.Error()))
		return err
	}
	return nil
}

// updateSubscriptionForRenewal extends userSub by orderInfo.Quantity units of
// sub.UnitTime, starting from the current expiry (or from now if it has
// already passed), resets traffic counters when applicable, marks the
// subscription active and persists it.
func (l *ActivateOrderLogic) updateSubscriptionForRenewal(ctx context.Context, userSub *user.Subscribe, sub *subscribe.Subscribe, orderInfo *order.Order) error {
	now := time.Now()
	if userSub.ExpireTime.Before(now) {
		userSub.ExpireTime = now
	}

	// Use the same clock snapshot for both values so they cannot disagree when
	// the call straddles midnight.
	today := now.Day()
	resetDay := userSub.ExpireTime.Day()

	// Reset traffic if the plan enables it or if today is the reset day.
	if (sub.RenewalReset != nil && *sub.RenewalReset) || today == resetDay {
		userSub.Download = 0
		userSub.Upload = 0
	}

	if userSub.FinishedAt != nil {
		// The subscription finished in the past and the reset day of this
		// month has already gone by: start the renewed period with clean
		// counters.
		if userSub.FinishedAt.Before(now) && today > resetDay {
			userSub.Download = 0
			userSub.Upload = 0
		}
		userSub.FinishedAt = nil
	}

	userSub.ExpireTime = tool.AddTime(sub.UnitTime, orderInfo.Quantity, userSub.ExpireTime)
	userSub.Status = 1
	// Clear the expired-traffic counters on renewal.
	userSub.ExpiredDownload = 0
	userSub.ExpiredUpload = 0

	if err := l.svc.UserModel.UpdateSubscribe(ctx, userSub); err != nil {
		logger.WithContext(ctx).Error("Update user subscribe failed", logger.Field("error", err.Error()))
		return err
	}
	return nil
}

// ResetTraffic handles a paid traffic reset for an existing subscription.
//
// It zeroes all traffic counters of the subscription referenced by
// orderInfo.SubscribeToken, marks it active, clears the related caches and
// records a reset log entry. Cache clearing and log insertion are best-effort:
// their failures are logged but do not fail the order.
func (l *ActivateOrderLogic) ResetTraffic(ctx context.Context, orderInfo *order.Order) error {
	userInfo, err := l.getExistingUser(ctx, orderInfo.UserId)
	if err != nil {
		return err
	}

	userSub, err := l.getUserSubscription(ctx, orderInfo.SubscribeToken)
	if err != nil {
		return err
	}

	// Reset traffic
	userSub.Download = 0
	userSub.Upload = 0
	userSub.ExpiredDownload = 0
	userSub.ExpiredUpload = 0
	userSub.Status = 1

	if err := l.svc.UserModel.UpdateSubscribe(ctx, userSub); err != nil {
		logger.WithContext(ctx).Error("Update user subscribe failed", logger.Field("error", err.Error()))
		return err
	}

	sub, err := l.getSubscribeInfo(ctx, userSub.SubscribeId)
	if err != nil {
		return err
	}

	// Clear user subscription cache
	if err = l.svc.UserModel.ClearSubscribeCache(ctx, userSub); err != nil {
		logger.WithContext(ctx).Error("Clear user subscribe cache failed",
			logger.Field("error", err.Error()),
			logger.Field("subscribe_id", userSub.Id),
			logger.Field("user_id", userInfo.Id),
		)
	}

	// Clear cache
	l.clearServerCache(ctx, sub)

	// Insert reset traffic log
	resetLog := &log.ResetSubscribe{
		Type:      log.ResetSubscribeTypePaid,
		UserId:    userInfo.Id,
		OrderNo:   orderInfo.OrderNo,
		Timestamp: time.Now().UnixMilli(),
	}
	content, err := resetLog.Marshal()
	if err != nil {
		// The reset itself already succeeded; do not write an empty log entry.
		logger.WithContext(ctx).Error("[Order Queue]Marshal reset subscribe log failed", logger.Field("error", err.Error()))
		return nil
	}
	if err = l.svc.LogModel.Insert(ctx, &log.SystemLog{
		Type:     log.TypeResetSubscribe.Uint8(),
		Date:     time.Now().Format(time.DateOnly),
		ObjectID: userSub.Id,
		Content:  string(content),
	}); err != nil {
		logger.WithContext(ctx).Error("[Order Queue]Insert reset subscribe log failed", logger.Field("error", err.Error()))
	}
	return nil
}

// Recharge handles a balance recharge order.
//
// Inside a single database transaction it:
//  1. skips the order if a balance log mentioning orderInfo.OrderNo already
//     exists for the user (idempotency guard for retried tasks),
//  2. increments the user's balance with an atomic SQL expression, and
//  3. writes the balance log entry.
//
// After the transaction the user cache is refreshed. Any failure is returned.
func (l *ActivateOrderLogic) Recharge(ctx context.Context, orderInfo *order.Order) error {
	userInfo, err := l.getExistingUser(ctx, orderInfo.UserId)
	if err != nil {
		return err
	}

	// Update balance in transaction
	err = l.svc.DB.Transaction(func(tx *gorm.DB) error {
		// Idempotency: check if balance log already exists for this order
		var existingLogCount int64
		if e := tx.Model(&log.SystemLog{}).
			Where("type = ? AND object_id = ? AND content LIKE ?",
				log.TypeBalance.Uint8(), userInfo.Id, fmt.Sprintf("%%\"%s\"%%", orderInfo.OrderNo)).
			Count(&existingLogCount).Error; e != nil {
			return e
		}
		if existingLogCount > 0 {
			logger.WithContext(ctx).Info("Recharge already processed, skip",
				logger.Field("order_no", orderInfo.OrderNo),
				logger.Field("user_id", userInfo.Id),
			)
			return nil
		}

		// Atomic increment to prevent lost updates
		if e := tx.Model(&user.User{}).Where("id = ?", userInfo.Id).
			UpdateColumn("balance", gorm.Expr("balance + ?", orderInfo.Price)).Error; e != nil {
			return e
		}
		// Mirror the increment on the in-memory copy used for the log entry
		// and for the cache refresh below.
		userInfo.Balance += orderInfo.Price

		balanceLog := &log.Balance{
			Amount:    orderInfo.Price,
			Type:      log.BalanceTypeRecharge,
			OrderNo:   orderInfo.OrderNo,
			Balance:   userInfo.Balance,
			Timestamp: time.Now().UnixMilli(),
		}
		// The idempotency guard above searches this content for the order
		// number, so a log without a payload must never be committed.
		content, e := balanceLog.Marshal()
		if e != nil {
			return e
		}

		return tx.Model(&log.SystemLog{}).Create(&log.SystemLog{
			Type:     log.TypeBalance.Uint8(),
			Date:     time.Now().Format(time.DateOnly),
			ObjectID: userInfo.Id,
			Content:  string(content),
		}).Error
	})
	if err != nil {
		logger.WithContext(ctx).Error("[Recharge] Database transaction failed", logger.Field("error", err.Error()))
		return err
	}

	// Refresh the user cache
	if err = l.svc.UserModel.UpdateUserCache(ctx, userInfo); err != nil {
		logger.WithContext(ctx).Error("[Recharge] Update user cache failed", logger.Field("error", err.Error()))
		return err
	}

	return nil
}

// RedemptionActivate handles activation of a redemption-code order.
//
// It reads the redemption parameters cached in Redis under
// "redemption_order:<order_no>", skips the order if the user already has a
// record for the same redemption code, and then, inside one transaction,
// extends the user's existing subscription to the plan (or creates a new one,
// subject to the plan quota), increments the code's used count and inserts the
// redemption record. Afterwards caches are cleared and the temporary Redis
// entry is removed.
//
// Lookup errors in the idempotency check and in the existing-subscription
// query are returned rather than ignored, so a transient database failure
// cannot lead to a duplicate redemption or a duplicate subscription.
func (l *ActivateOrderLogic) RedemptionActivate(ctx context.Context, orderInfo *order.Order) error {
	// 1. Load the user
	userInfo, err := l.getExistingUser(ctx, orderInfo.UserId)
	if err != nil {
		return err
	}

	// 2. Load the subscription plan
	sub, err := l.getSubscribeInfo(ctx, orderInfo.SubscribeId)
	if err != nil {
		return err
	}

	// 3. Load the redemption parameters from Redis
	cacheKey := fmt.Sprintf("redemption_order:%s", orderInfo.OrderNo)
	data, err := l.svc.Redis.Get(ctx, cacheKey).Result()
	if err != nil {
		logger.WithContext(ctx).Error("Get redemption cache failed",
			logger.Field("error", err.Error()),
			logger.Field("cache_key", cacheKey),
		)
		return err
	}

	var redemptionData struct {
		RedemptionCodeId int64  `json:"redemption_code_id"`
		UnitTime         string `json:"unit_time"`
		Quantity         int64  `json:"quantity"`
	}
	if err = json.Unmarshal([]byte(data), &redemptionData); err != nil {
		logger.WithContext(ctx).Error("Unmarshal redemption cache failed", logger.Field("error", err.Error()))
		return err
	}

	// 4. Idempotency check: has this user already redeemed this code?
	existingRecords, err := l.svc.RedemptionRecordModel.FindByUserId(ctx, userInfo.Id)
	if err != nil {
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			// Without this lookup a retry could redeem the code twice.
			logger.WithContext(ctx).Error("Find redemption records failed",
				logger.Field("error", err.Error()),
				logger.Field("user_id", userInfo.Id),
			)
			return err
		}
		existingRecords = nil
	}
	for _, record := range existingRecords {
		if record.RedemptionCodeId == redemptionData.RedemptionCodeId {
			logger.WithContext(ctx).Info("Redemption already processed, skip",
				logger.Field("order_no", orderInfo.OrderNo),
				logger.Field("user_id", userInfo.Id),
				logger.Field("redemption_code_id", redemptionData.RedemptionCodeId),
			)
			// Already processed: report success (idempotency protection).
			return nil
		}
	}

	// 5. Look for an existing subscription of the user to the same plan
	var existingSubscribe *user.Subscribe
	userSubscribes, err := l.svc.UserModel.QueryUserSubscribe(ctx, userInfo.Id, 0, 1)
	if err != nil {
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			// Proceeding would create a new subscription instead of extending
			// the existing one.
			logger.WithContext(ctx).Error("Query user subscribe failed",
				logger.Field("error", err.Error()),
				logger.Field("user_id", userInfo.Id),
			)
			return err
		}
		userSubscribes = nil
	}
	for _, us := range userSubscribes {
		if us.SubscribeId != orderInfo.SubscribeId {
			continue
		}
		// NOTE(review): only a subset of fields is copied here. If
		// UpdateSubscribe persists the whole struct, the fields that are not
		// copied would be overwritten with zero values — verify against the
		// model implementation.
		existingSubscribe = &user.Subscribe{
			Id:          us.Id,
			UserId:      us.UserId,
			SubscribeId: us.SubscribeId,
			ExpireTime:  us.ExpireTime,
			Status:      us.Status,
			Traffic:     us.Traffic,
			Download:    us.Download,
			Upload:      us.Upload,
			NodeGroupId: us.NodeGroupId,
		}
		break
	}

	now := time.Now()

	// 6. Protect the core operations with a transaction
	err = l.svc.DB.Transaction(func(tx *gorm.DB) error {
		// 6.1 Create or update the subscription
		if existingSubscribe != nil {
			// Extend the existing subscription from its expiry, or from now
			// if it has already expired.
			base := now
			if existingSubscribe.ExpireTime.After(now) {
				base = existingSubscribe.ExpireTime
			}
			newExpireTime := tool.AddTime(redemptionData.UnitTime, redemptionData.Quantity, base)

			existingSubscribe.OrderId = orderInfo.Id // keep OrderId for traceability
			existingSubscribe.ExpireTime = newExpireTime
			existingSubscribe.Status = 1

			// Reset traffic when the plan has a traffic limit
			if sub.Traffic > 0 {
				existingSubscribe.Traffic = sub.Traffic * 1024 * 1024 * 1024
				existingSubscribe.Download = 0
				existingSubscribe.Upload = 0
			}

			if err := l.svc.UserModel.UpdateSubscribe(ctx, existingSubscribe, tx); err != nil {
				logger.WithContext(ctx).Error("Update subscribe failed", logger.Field("error", err.Error()))
				return err
			}

			logger.WithContext(ctx).Info("Extended existing subscription",
				logger.Field("subscribe_id", existingSubscribe.Id),
				logger.Field("new_expire_time", newExpireTime),
			)
		} else {
			// Enforce the plan quota
			if sub.Quota > 0 {
				var count int64
				if err := tx.Model(&user.Subscribe{}).
					Where("user_id = ? AND subscribe_id = ?", userInfo.Id, orderInfo.SubscribeId).
					Count(&count).Error; err != nil {
					logger.WithContext(ctx).Error("Count user subscribe failed", logger.Field("error", err.Error()))
					return err
				}
				if count >= sub.Quota {
					logger.WithContext(ctx).Infow("Subscribe quota limit exceeded",
						logger.Field("user_id", userInfo.Id),
						logger.Field("subscribe_id", orderInfo.SubscribeId),
						logger.Field("quota", sub.Quota),
						logger.Field("current_count", count),
					)
					return fmt.Errorf("subscribe quota limit exceeded")
				}
			}

			// Create a new subscription
			expireTime := tool.AddTime(redemptionData.UnitTime, redemptionData.Quantity, now)
			traffic := int64(0)
			if sub.Traffic > 0 {
				traffic = sub.Traffic * 1024 * 1024 * 1024
			}

			newSubscribe := &user.Subscribe{
				UserId:      userInfo.Id,
				OrderId:     orderInfo.Id,
				SubscribeId: orderInfo.SubscribeId,
				StartTime:   now,
				ExpireTime:  expireTime,
				FinishedAt:  nil,
				Traffic:     traffic,
				Download:    0,
				Upload:      0,
				Token:       uuidx.SubscribeToken(orderInfo.OrderNo),
				UUID:        uuid.New().String(),
				Status:      1,
				NodeGroupId: sub.NodeGroupId, // Inherit node_group_id from subscription plan
			}

			if err := l.svc.UserModel.InsertSubscribe(ctx, newSubscribe, tx); err != nil {
				logger.WithContext(ctx).Error("Insert subscribe failed", logger.Field("error", err.Error()))
				return err
			}

			logger.WithContext(ctx).Info("Created new subscription",
				logger.Field("subscribe_id", newSubscribe.Id),
				logger.Field("expire_time", expireTime),
			)
		}

		// 6.2 Increment the redemption code's used count
		if err := l.svc.RedemptionCodeModel.IncrementUsedCount(ctx, redemptionData.RedemptionCodeId, tx); err != nil {
			logger.WithContext(ctx).Error("Increment used count failed", logger.Field("error", err.Error()))
			return err
		}

		// 6.3 Insert the redemption record
		redemptionRecord := &redemption.RedemptionRecord{
			RedemptionCodeId: redemptionData.RedemptionCodeId,
			UserId:           userInfo.Id,
			SubscribeId:      orderInfo.SubscribeId,
			UnitTime:         redemptionData.UnitTime,
			Quantity:         redemptionData.Quantity,
			RedeemedAt:       now,
			CreatedAt:        now,
		}
		if err := l.svc.RedemptionRecordModel.Insert(ctx, redemptionRecord, tx); err != nil {
			logger.WithContext(ctx).Error("Insert redemption record failed", logger.Field("error", err.Error()))
			return err
		}

		return nil
	})
	if err != nil {
		logger.WithContext(ctx).Error("Redemption transaction failed", logger.Field("error", err.Error()))
		return err
	}

	// Trigger user group recalculation
	l.triggerUserGroupRecalculation(ctx, userInfo.Id)

	// 7. Clear caches so that nodes pick up the latest subscription state
	l.clearServerCache(ctx, sub)

	// 7.1 Clear the user's subscription cache so the client shows fresh data.
	// NOTE(review): existingSubscribe is the partial copy built in step 5;
	// confirm ClearSubscribeCache does not rely on fields that were not copied.
	if existingSubscribe != nil {
		if err = l.svc.UserModel.ClearSubscribeCache(ctx, existingSubscribe); err != nil {
			logger.WithContext(ctx).Error("Clear user subscribe cache failed",
				logger.Field("error", err.Error()),
				logger.Field("subscribe_id", existingSubscribe.Id),
				logger.Field("user_id", userInfo.Id),
			)
		}
	}

	// 8. Remove the temporary Redis entry (best-effort: the order has already
	// been applied, so a failure here is only logged).
	if delErr := l.svc.Redis.Del(ctx, cacheKey).Err(); delErr != nil {
		logger.WithContext(ctx).Error("Delete redemption cache failed",
			logger.Field("error", delErr.Error()),
			logger.Field("cache_key", cacheKey),
		)
	}

	// 9. Send notifications (optional)
	// An existing notification template could be reused, or a dedicated
	// redemption template could be added:
	// l.sendNotifications(ctx, orderInfo, userInfo, sub, existingSubscribe, telegram.RedemptionNotify)

	logger.WithContext(ctx).Info("Redemption activation success",
		logger.Field("order_no", orderInfo.OrderNo),
		logger.Field("user_id", userInfo.Id),
		logger.Field("subscribe_id", orderInfo.SubscribeId),
	)

	return nil
}

// isNewUserOnlyForQuantity reports whether the first discount tier whose
// Quantity equals inputQuantity has NewUserOnly set. It returns false when no
// tier matches.
func isNewUserOnlyForQuantity(discounts []internaltypes.SubscribeDiscount, inputQuantity int64) bool {
	for _, d := range discounts {
		if inputQuantity == d.Quantity {
			return d.NewUserOnly
		}
	}
	return false
}