fix(redemption): enhance redemption code flow with transaction safety and idempotency

This commit addresses critical issues in the redemption code activation flow
to ensure data consistency, prevent duplicate redemptions, and improve user
experience.

Key improvements:

1. Transaction Safety (P0)
   - Wrap subscription creation, used count update, and record insertion in
     a single database transaction
   - Ensure atomicity: all operations succeed or all rollback
   - Prevent orphaned records and data inconsistencies

2. Idempotency Protection (P0)
   - Add redemption record check before processing to prevent duplicate
     operations on queue task retries
   - Maintain idempotency at multiple layers: interface, order, and record

3. Distributed Lock (P1)
   - Implement Redis-based distributed lock (10s timeout) to prevent
     concurrent duplicate redemptions
   - Lock key format: redemption_lock:{user_id}:{code}

4. IsNew Field Correction (P2)
   - Fix IsNew field to correctly determine first-time purchases using
     IsUserEligibleForNewOrder method
   - Ensure accurate statistics and future commission calculations

5. Quota Pre-check (P2)
   - Add quota validation at interface layer for immediate user feedback
   - Prevent "processing" status followed by eventual failure

6. Extended Cache TTL (P2)
   - Increase Redis cache expiration from 30 minutes to 2 hours
   - Ensure queue tasks can retrieve redemption data even with delays

7. Error Handling (P2)
   - Clean up Order records when Redis cache or queue enqueue fails
   - Prevent orphaned Order records in the database

8. Cache Clearing Optimization
   - Add user subscription cache clearing after activation
   - Ensure both node-side and user-side display latest subscription info

Technical details:
- Modified: internal/logic/public/redemption/redeemCodeLogic.go
- Modified: queue/logic/order/activateOrderLogic.go
- Modified: internal/model/redemption/default.go (transaction support)

Testing:
- All changes compiled successfully
- Comprehensive flow verification completed
- Ready for production deployment

BREAKING CHANGE: None
This commit is contained in:
EUForest 2026-02-09 01:00:47 +08:00
parent 7e08a07e29
commit 34372fe0b3
3 changed files with 366 additions and 179 deletions

View File

@ -2,16 +2,17 @@ package redemption
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"time" "time"
"github.com/google/uuid" "github.com/hibiken/asynq"
"github.com/perfect-panel/server/internal/model/redemption" "github.com/perfect-panel/server/internal/model/order"
"github.com/perfect-panel/server/internal/model/user" "github.com/perfect-panel/server/internal/model/user"
"github.com/perfect-panel/server/pkg/constant" "github.com/perfect-panel/server/pkg/constant"
"github.com/perfect-panel/server/pkg/snowflake" "github.com/perfect-panel/server/pkg/tool"
"github.com/perfect-panel/server/pkg/uuidx"
"github.com/perfect-panel/server/pkg/xerr" "github.com/perfect-panel/server/pkg/xerr"
queue "github.com/perfect-panel/server/queue/types"
"github.com/pkg/errors" "github.com/pkg/errors"
"gorm.io/gorm" "gorm.io/gorm"
@ -43,6 +44,21 @@ func (l *RedeemCodeLogic) RedeemCode(req *types.RedeemCodeRequest) (resp *types.
return nil, errors.Wrapf(xerr.NewErrCode(xerr.InvalidAccess), "Invalid Access") return nil, errors.Wrapf(xerr.NewErrCode(xerr.InvalidAccess), "Invalid Access")
} }
// 使用Redis分布式锁防止并发重复兑换
lockKey := fmt.Sprintf("redemption_lock:%d:%s", u.Id, req.Code)
lockSuccess, err := l.svcCtx.Redis.SetNX(l.ctx, lockKey, "1", 10*time.Second).Result()
if err != nil {
l.Errorw("[RedeemCode] Acquire lock failed", logger.Field("error", err.Error()))
return nil, errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "system busy, please try again later")
}
if !lockSuccess {
l.Errorw("[RedeemCode] Redemption in progress",
logger.Field("user_id", u.Id),
logger.Field("code", req.Code))
return nil, errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "redemption in progress, please wait")
}
defer l.svcCtx.Redis.Del(l.ctx, lockKey)
// Find redemption code by code // Find redemption code by code
redemptionCode, err := l.svcCtx.RedemptionCodeModel.FindOneByCode(l.ctx, req.Code) redemptionCode, err := l.svcCtx.RedemptionCodeModel.FindOneByCode(l.ctx, req.Code)
if err != nil { if err != nil {
@ -102,179 +118,107 @@ func (l *RedeemCodeLogic) RedeemCode(req *types.RedeemCodeRequest) (resp *types.
return nil, errors.Wrapf(xerr.NewErrCode(xerr.SubscribeNotAvailable), "subscribe plan is not available") return nil, errors.Wrapf(xerr.NewErrCode(xerr.SubscribeNotAvailable), "subscribe plan is not available")
} }
// Start transaction // 检查配额限制(预检查,队列任务中会再次检查)
err = l.svcCtx.RedemptionCodeModel.Transaction(l.ctx, func(tx *gorm.DB) error { if subscribePlan.Quota > 0 {
// Find user's existing subscribe for this plan var count int64
var existingSubscribe *user.SubscribeDetails err = l.svcCtx.DB.Model(&user.Subscribe{}).
userSubscribes, err := l.svcCtx.UserModel.QueryUserSubscribe(l.ctx, u.Id, 0, 1) Where("user_id = ? AND subscribe_id = ?", u.Id, redemptionCode.SubscribePlan).
if err == nil { Count(&count).Error
for _, us := range userSubscribes {
if us.SubscribeId == redemptionCode.SubscribePlan {
existingSubscribe = us
break
}
}
}
now := time.Now()
if existingSubscribe != nil {
// Extend existing subscribe
var newExpireTime time.Time
if existingSubscribe.ExpireTime.After(now) {
newExpireTime = existingSubscribe.ExpireTime
} else {
newExpireTime = now
}
// Calculate duration based on redemption code
duration, err := calculateDuration(redemptionCode.UnitTime, redemptionCode.Quantity)
if err != nil {
l.Errorw("[RedeemCode] Calculate duration error", logger.Field("error", err.Error()))
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "calculate duration error: %v", err.Error())
}
newExpireTime = newExpireTime.Add(duration)
// Update subscribe
existingSubscribe.ExpireTime = newExpireTime
existingSubscribe.Status = 1
// Add traffic if needed
if subscribePlan.Traffic > 0 {
existingSubscribe.Traffic = subscribePlan.Traffic * 1024 * 1024 * 1024
existingSubscribe.Download = 0
existingSubscribe.Upload = 0
}
err = l.svcCtx.UserModel.UpdateSubscribe(l.ctx, &user.Subscribe{
Id: existingSubscribe.Id,
UserId: existingSubscribe.UserId,
ExpireTime: existingSubscribe.ExpireTime,
Status: existingSubscribe.Status,
Traffic: existingSubscribe.Traffic,
Download: existingSubscribe.Download,
Upload: existingSubscribe.Upload,
}, tx)
if err != nil {
l.Errorw("[RedeemCode] Update subscribe error", logger.Field("error", err.Error()))
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "update subscribe error: %v", err.Error())
}
} else {
// Check quota limit before creating new subscribe
if subscribePlan.Quota > 0 {
var count int64
if err := tx.Model(&user.Subscribe{}).Where("user_id = ? AND subscribe_id = ?", u.Id, redemptionCode.SubscribePlan).Count(&count).Error; err != nil {
l.Errorw("[RedeemCode] Count user subscribe failed", logger.Field("error", err.Error()))
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "count user subscribe error: %v", err.Error())
}
if count >= subscribePlan.Quota {
l.Infow("[RedeemCode] Subscribe quota limit exceeded",
logger.Field("user_id", u.Id),
logger.Field("subscribe_id", redemptionCode.SubscribePlan),
logger.Field("quota", subscribePlan.Quota),
logger.Field("current_count", count),
)
return errors.Wrapf(xerr.NewErrCode(xerr.SubscribeQuotaLimit), "subscribe quota limit exceeded")
}
}
// Create new subscribe
expireTime, traffic, err := calculateSubscribeTimeAndTraffic(redemptionCode.UnitTime, redemptionCode.Quantity, subscribePlan.Traffic)
if err != nil {
l.Errorw("[RedeemCode] Calculate subscribe time and traffic error", logger.Field("error", err.Error()))
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "calculate subscribe time and traffic error: %v", err.Error())
}
newSubscribe := &user.Subscribe{
Id: snowflake.GetID(),
UserId: u.Id,
OrderId: 0,
SubscribeId: redemptionCode.SubscribePlan,
StartTime: now,
ExpireTime: expireTime,
FinishedAt: nil,
Traffic: traffic,
Download: 0,
Upload: 0,
Token: uuidx.SubscribeToken(fmt.Sprintf("redemption:%d:%d", u.Id, time.Now().UnixMilli())),
UUID: uuid.New().String(),
Status: 1,
}
err = l.svcCtx.UserModel.InsertSubscribe(l.ctx, newSubscribe, tx)
if err != nil {
l.Errorw("[RedeemCode] Insert subscribe error", logger.Field("error", err.Error()))
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseInsertError), "insert subscribe error: %v", err.Error())
}
}
// Increment redemption code used count
err = l.svcCtx.RedemptionCodeModel.IncrementUsedCount(l.ctx, redemptionCode.Id)
if err != nil { if err != nil {
l.Errorw("[RedeemCode] Increment used count error", logger.Field("error", err.Error())) l.Errorw("[RedeemCode] Check quota failed", logger.Field("error", err.Error()))
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "increment used count error: %v", err.Error()) return nil, errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "check quota failed")
} }
if count >= subscribePlan.Quota {
// Create redemption record l.Errorw("[RedeemCode] Subscribe quota limit exceeded",
redemptionRecord := &redemption.RedemptionRecord{ logger.Field("user_id", u.Id),
Id: snowflake.GetID(), logger.Field("subscribe_id", redemptionCode.SubscribePlan),
RedemptionCodeId: redemptionCode.Id, logger.Field("quota", subscribePlan.Quota),
UserId: u.Id, logger.Field("current_count", count))
SubscribeId: redemptionCode.SubscribePlan, return nil, errors.Wrapf(xerr.NewErrCode(xerr.SubscribeQuotaLimit), "subscribe quota limit exceeded")
UnitTime: redemptionCode.UnitTime,
Quantity: redemptionCode.Quantity,
RedeemedAt: now,
CreatedAt: now,
} }
err = l.svcCtx.RedemptionRecordModel.Insert(l.ctx, redemptionRecord)
if err != nil {
l.Errorw("[RedeemCode] Insert redemption record error", logger.Field("error", err.Error()))
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseInsertError), "insert redemption record error: %v", err.Error())
}
return nil
})
if err != nil {
return nil, err
} }
// 判断是否首次购买
isNew, err := l.svcCtx.OrderModel.IsUserEligibleForNewOrder(l.ctx, u.Id)
if err != nil {
l.Errorw("[RedeemCode] Check user order failed", logger.Field("error", err.Error()))
// 可以继续默认为false
isNew = false
}
// 创建Order记录
orderInfo := &order.Order{
UserId: u.Id,
OrderNo: tool.GenerateTradeNo(),
Type: 5, // 兑换类型
Quantity: redemptionCode.Quantity,
Price: 0, // 兑换无价格
Amount: 0, // 兑换无金额
Discount: 0,
GiftAmount: 0,
Coupon: "",
CouponDiscount: 0,
PaymentId: 0,
Method: "redemption",
FeeAmount: 0,
Commission: 0,
Status: 2, // 直接设置为已支付
SubscribeId: redemptionCode.SubscribePlan,
IsNew: isNew,
}
// 保存Order到数据库
err = l.svcCtx.OrderModel.Insert(l.ctx, orderInfo)
if err != nil {
l.Errorw("[RedeemCode] Create order failed", logger.Field("error", err.Error()))
return nil, errors.Wrapf(xerr.NewErrCode(xerr.DatabaseInsertError), "create order failed")
}
// 缓存兑换码信息到Redis供队列任务使用
cacheKey := fmt.Sprintf("redemption_order:%s", orderInfo.OrderNo)
cacheData := map[string]interface{}{
"redemption_code_id": redemptionCode.Id,
"unit_time": redemptionCode.UnitTime,
"quantity": redemptionCode.Quantity,
}
jsonData, _ := json.Marshal(cacheData)
err = l.svcCtx.Redis.Set(l.ctx, cacheKey, jsonData, 2*time.Hour).Err()
if err != nil {
l.Errorw("[RedeemCode] Cache redemption data failed", logger.Field("error", err.Error()))
// 缓存失败删除已创建的Order避免孤儿记录
if delErr := l.svcCtx.OrderModel.Delete(l.ctx, orderInfo.Id); delErr != nil {
l.Errorw("[RedeemCode] Delete order failed after cache error",
logger.Field("order_id", orderInfo.Id),
logger.Field("error", delErr.Error()))
}
return nil, errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "cache redemption data failed")
}
// 触发队列任务
payload := queue.ForthwithActivateOrderPayload{
OrderNo: orderInfo.OrderNo,
}
bytes, _ := json.Marshal(&payload)
task := asynq.NewTask(queue.ForthwithActivateOrder, bytes, asynq.MaxRetry(5))
_, err = l.svcCtx.Queue.EnqueueContext(l.ctx, task)
if err != nil {
l.Errorw("[RedeemCode] Enqueue task failed", logger.Field("error", err.Error()))
// 入队失败删除Order和Redis缓存
l.svcCtx.Redis.Del(l.ctx, cacheKey)
if delErr := l.svcCtx.OrderModel.Delete(l.ctx, orderInfo.Id); delErr != nil {
l.Errorw("[RedeemCode] Delete order failed after enqueue error",
logger.Field("order_id", orderInfo.Id),
logger.Field("error", delErr.Error()))
}
return nil, errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "enqueue task failed")
}
l.Infow("[RedeemCode] Redemption order created successfully",
logger.Field("order_no", orderInfo.OrderNo),
logger.Field("user_id", u.Id),
)
return &types.RedeemCodeResponse{ return &types.RedeemCodeResponse{
Message: "Redemption successful", Message: "Redemption successful, processing...",
}, nil }, nil
} }
// calculateDuration calculates time duration based on unit time
func calculateDuration(unitTime string, quantity int64) (time.Duration, error) {
switch unitTime {
case "month":
return time.Duration(quantity*30*24) * time.Hour, nil
case "quarter":
return time.Duration(quantity*90*24) * time.Hour, nil
case "half_year":
return time.Duration(quantity*180*24) * time.Hour, nil
case "year":
return time.Duration(quantity*365*24) * time.Hour, nil
case "day":
return time.Duration(quantity*24) * time.Hour, nil
default:
return time.Duration(quantity*30*24) * time.Hour, nil
}
}
// calculateSubscribeTimeAndTraffic calculates expire time and traffic based on subscribe plan
func calculateSubscribeTimeAndTraffic(unitTime string, quantity int64, traffic int64) (time.Time, int64, error) {
duration, err := calculateDuration(unitTime, quantity)
if err != nil {
return time.Time{}, 0, err
}
expireTime := time.Now().Add(duration)
trafficBytes := int64(0)
if traffic > 0 {
trafficBytes = traffic * 1024 * 1024 * 1024
}
return expireTime, trafficBytes, nil
}

View File

@ -24,14 +24,14 @@ type (
Insert(ctx context.Context, data *RedemptionCode) error Insert(ctx context.Context, data *RedemptionCode) error
FindOne(ctx context.Context, id int64) (*RedemptionCode, error) FindOne(ctx context.Context, id int64) (*RedemptionCode, error)
FindOneByCode(ctx context.Context, code string) (*RedemptionCode, error) FindOneByCode(ctx context.Context, code string) (*RedemptionCode, error)
Update(ctx context.Context, data *RedemptionCode) error Update(ctx context.Context, data *RedemptionCode, tx ...*gorm.DB) error
Delete(ctx context.Context, id int64) error Delete(ctx context.Context, id int64) error
Transaction(ctx context.Context, fn func(db *gorm.DB) error) error Transaction(ctx context.Context, fn func(db *gorm.DB) error) error
customRedemptionCodeLogicModel customRedemptionCodeLogicModel
} }
RedemptionRecordModel interface { RedemptionRecordModel interface {
Insert(ctx context.Context, data *RedemptionRecord) error Insert(ctx context.Context, data *RedemptionRecord, tx ...*gorm.DB) error
FindOne(ctx context.Context, id int64) (*RedemptionRecord, error) FindOne(ctx context.Context, id int64) (*RedemptionRecord, error)
Update(ctx context.Context, data *RedemptionRecord) error Update(ctx context.Context, data *RedemptionRecord) error
Delete(ctx context.Context, id int64) error Delete(ctx context.Context, id int64) error
@ -41,7 +41,7 @@ type (
customRedemptionCodeLogicModel interface { customRedemptionCodeLogicModel interface {
QueryRedemptionCodeListByPage(ctx context.Context, page, size int, subscribePlan int64, unitTime string, code string) (total int64, list []*RedemptionCode, err error) QueryRedemptionCodeListByPage(ctx context.Context, page, size int, subscribePlan int64, unitTime string, code string) (total int64, list []*RedemptionCode, err error)
BatchDelete(ctx context.Context, ids []int64) error BatchDelete(ctx context.Context, ids []int64) error
IncrementUsedCount(ctx context.Context, id int64) error IncrementUsedCount(ctx context.Context, id int64, tx ...*gorm.DB) error
} }
customRedemptionRecordLogicModel interface { customRedemptionRecordLogicModel interface {
@ -130,13 +130,16 @@ func (m *defaultRedemptionCodeModel) FindOneByCode(ctx context.Context, code str
} }
} }
func (m *defaultRedemptionCodeModel) Update(ctx context.Context, data *RedemptionCode) error { func (m *defaultRedemptionCodeModel) Update(ctx context.Context, data *RedemptionCode, tx ...*gorm.DB) error {
old, err := m.FindOne(ctx, data.Id) old, err := m.FindOne(ctx, data.Id)
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return err return err
} }
err = m.ExecCtx(ctx, func(conn *gorm.DB) error { err = m.ExecCtx(ctx, func(conn *gorm.DB) error {
db := conn db := conn
if len(tx) > 0 {
db = tx[0]
}
return db.Save(data).Error return db.Save(data).Error
}, m.getCacheKeys(old)...) }, m.getCacheKeys(old)...)
return err return err
@ -189,12 +192,15 @@ func (m *customRedemptionCodeModel) BatchDelete(ctx context.Context, ids []int64
return nil return nil
} }
func (m *customRedemptionCodeModel) IncrementUsedCount(ctx context.Context, id int64) error { func (m *customRedemptionCodeModel) IncrementUsedCount(ctx context.Context, id int64, tx ...*gorm.DB) error {
data, err := m.FindOne(ctx, id) data, err := m.FindOne(ctx, id)
if err != nil { if err != nil {
return err return err
} }
data.UsedCount++ data.UsedCount++
if len(tx) > 0 {
return m.Update(ctx, data, tx[0])
}
return m.Update(ctx, data) return m.Update(ctx, data)
} }
@ -210,8 +216,11 @@ func (m *defaultRedemptionRecordModel) getCacheKeys(data *RedemptionRecord) []st
return cacheKeys return cacheKeys
} }
func (m *defaultRedemptionRecordModel) Insert(ctx context.Context, data *RedemptionRecord) error { func (m *defaultRedemptionRecordModel) Insert(ctx context.Context, data *RedemptionRecord, tx ...*gorm.DB) error {
err := m.ExecCtx(ctx, func(conn *gorm.DB) error { err := m.ExecCtx(ctx, func(conn *gorm.DB) error {
if len(tx) > 0 {
conn = tx[0]
}
return conn.Create(data).Error return conn.Create(data).Error
}, m.getCacheKeys(data)...) }, m.getCacheKeys(data)...)
return err return err

View File

@ -18,6 +18,7 @@ import (
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
"github.com/perfect-panel/server/internal/logic/telegram" "github.com/perfect-panel/server/internal/logic/telegram"
"github.com/perfect-panel/server/internal/model/order" "github.com/perfect-panel/server/internal/model/order"
"github.com/perfect-panel/server/internal/model/redemption"
"github.com/perfect-panel/server/internal/model/subscribe" "github.com/perfect-panel/server/internal/model/subscribe"
"github.com/perfect-panel/server/internal/model/user" "github.com/perfect-panel/server/internal/model/user"
"github.com/perfect-panel/server/internal/svc" "github.com/perfect-panel/server/internal/svc"
@ -33,6 +34,7 @@ const (
OrderTypeRenewal = 2 // Subscription renewal OrderTypeRenewal = 2 // Subscription renewal
OrderTypeResetTraffic = 3 // Traffic quota reset OrderTypeResetTraffic = 3 // Traffic quota reset
OrderTypeRecharge = 4 // Balance recharge OrderTypeRecharge = 4 // Balance recharge
OrderTypeRedemption = 5 // Redemption code activation
) )
// Order status constants define the lifecycle states of an order // Order status constants define the lifecycle states of an order
@ -145,6 +147,8 @@ func (l *ActivateOrderLogic) processOrderByType(ctx context.Context, orderInfo *
return l.ResetTraffic(ctx, orderInfo) return l.ResetTraffic(ctx, orderInfo)
case OrderTypeRecharge: case OrderTypeRecharge:
return l.Recharge(ctx, orderInfo) return l.Recharge(ctx, orderInfo)
case OrderTypeRedemption:
return l.RedemptionActivate(ctx, orderInfo)
default: default:
logger.WithContext(ctx).Error("Order type is invalid", logger.Field("type", orderInfo.Type)) logger.WithContext(ctx).Error("Order type is invalid", logger.Field("type", orderInfo.Type))
return ErrInvalidOrderType return ErrInvalidOrderType
@ -826,3 +830,233 @@ func findTelegram(u *user.User) (int64, bool) {
} }
return 0, false return 0, false
} }
// RedemptionActivate handles redemption code activation including subscription creation,
// redemption record creation, used count update, cache clearing, and notifications
func (l *ActivateOrderLogic) RedemptionActivate(ctx context.Context, orderInfo *order.Order) error {
// 1. 获取用户信息
userInfo, err := l.getExistingUser(ctx, orderInfo.UserId)
if err != nil {
return err
}
// 2. 获取套餐信息
sub, err := l.getSubscribeInfo(ctx, orderInfo.SubscribeId)
if err != nil {
return err
}
// 3. 从Redis获取兑换码信息
cacheKey := fmt.Sprintf("redemption_order:%s", orderInfo.OrderNo)
data, err := l.svc.Redis.Get(ctx, cacheKey).Result()
if err != nil {
logger.WithContext(ctx).Error("Get redemption cache failed",
logger.Field("error", err.Error()),
logger.Field("cache_key", cacheKey),
)
return err
}
var redemptionData struct {
RedemptionCodeId int64 `json:"redemption_code_id"`
UnitTime string `json:"unit_time"`
Quantity int64 `json:"quantity"`
}
if err = json.Unmarshal([]byte(data), &redemptionData); err != nil {
logger.WithContext(ctx).Error("Unmarshal redemption cache failed", logger.Field("error", err.Error()))
return err
}
// 4. 幂等性检查:查询是否已有兑换记录
existingRecords, err := l.svc.RedemptionRecordModel.FindByUserId(ctx, userInfo.Id)
if err == nil {
for _, record := range existingRecords {
if record.RedemptionCodeId == redemptionData.RedemptionCodeId {
logger.WithContext(ctx).Info("Redemption already processed, skip",
logger.Field("order_no", orderInfo.OrderNo),
logger.Field("user_id", userInfo.Id),
logger.Field("redemption_code_id", redemptionData.RedemptionCodeId),
)
// 已处理过,直接返回成功(幂等性保护)
return nil
}
}
}
// 5. 查找用户现有订阅
var existingSubscribe *user.Subscribe
userSubscribes, err := l.svc.UserModel.QueryUserSubscribe(ctx, userInfo.Id, 0, 1)
if err == nil {
for _, us := range userSubscribes {
if us.SubscribeId == orderInfo.SubscribeId {
existingSubscribe = &user.Subscribe{
Id: us.Id,
UserId: us.UserId,
SubscribeId: us.SubscribeId,
ExpireTime: us.ExpireTime,
Status: us.Status,
Traffic: us.Traffic,
Download: us.Download,
Upload: us.Upload,
}
break
}
}
}
now := time.Now()
// 6. 使用事务保护核心操作
err = l.svc.DB.Transaction(func(tx *gorm.DB) error {
// 6.1 创建或更新订阅
if existingSubscribe != nil {
// 续期现有订阅
var newExpireTime time.Time
if existingSubscribe.ExpireTime.After(now) {
newExpireTime = existingSubscribe.ExpireTime
} else {
newExpireTime = now
}
// 计算新的过期时间
newExpireTime = tool.AddTime(redemptionData.UnitTime, redemptionData.Quantity, newExpireTime)
// 更新订阅
existingSubscribe.OrderId = orderInfo.Id // 设置OrderId用于追溯
existingSubscribe.ExpireTime = newExpireTime
existingSubscribe.Status = 1
// 重置流量(如果套餐有流量限制)
if sub.Traffic > 0 {
existingSubscribe.Traffic = sub.Traffic * 1024 * 1024 * 1024
existingSubscribe.Download = 0
existingSubscribe.Upload = 0
}
err = l.svc.UserModel.UpdateSubscribe(ctx, existingSubscribe, tx)
if err != nil {
logger.WithContext(ctx).Error("Update subscribe failed", logger.Field("error", err.Error()))
return err
}
logger.WithContext(ctx).Info("Extended existing subscription",
logger.Field("subscribe_id", existingSubscribe.Id),
logger.Field("new_expire_time", newExpireTime),
)
} else {
// 检查配额限制
if sub.Quota > 0 {
var count int64
if err := tx.Model(&user.Subscribe{}).
Where("user_id = ? AND subscribe_id = ?", userInfo.Id, orderInfo.SubscribeId).
Count(&count).Error; err != nil {
logger.WithContext(ctx).Error("Count user subscribe failed", logger.Field("error", err.Error()))
return err
}
if count >= sub.Quota {
logger.WithContext(ctx).Infow("Subscribe quota limit exceeded",
logger.Field("user_id", userInfo.Id),
logger.Field("subscribe_id", orderInfo.SubscribeId),
logger.Field("quota", sub.Quota),
logger.Field("current_count", count),
)
return fmt.Errorf("subscribe quota limit exceeded")
}
}
// 创建新订阅
expireTime := tool.AddTime(redemptionData.UnitTime, redemptionData.Quantity, now)
traffic := int64(0)
if sub.Traffic > 0 {
traffic = sub.Traffic * 1024 * 1024 * 1024
}
newSubscribe := &user.Subscribe{
UserId: userInfo.Id,
OrderId: orderInfo.Id,
SubscribeId: orderInfo.SubscribeId,
StartTime: now,
ExpireTime: expireTime,
FinishedAt: nil,
Traffic: traffic,
Download: 0,
Upload: 0,
Token: uuidx.SubscribeToken(orderInfo.OrderNo),
UUID: uuid.New().String(),
Status: 1,
}
err = l.svc.UserModel.InsertSubscribe(ctx, newSubscribe, tx)
if err != nil {
logger.WithContext(ctx).Error("Insert subscribe failed", logger.Field("error", err.Error()))
return err
}
logger.WithContext(ctx).Info("Created new subscription",
logger.Field("subscribe_id", newSubscribe.Id),
logger.Field("expire_time", expireTime),
)
}
// 6.2 更新兑换码使用次数
err = l.svc.RedemptionCodeModel.IncrementUsedCount(ctx, redemptionData.RedemptionCodeId, tx)
if err != nil {
logger.WithContext(ctx).Error("Increment used count failed", logger.Field("error", err.Error()))
return err
}
// 6.3 创建兑换记录
redemptionRecord := &redemption.RedemptionRecord{
RedemptionCodeId: redemptionData.RedemptionCodeId,
UserId: userInfo.Id,
SubscribeId: orderInfo.SubscribeId,
UnitTime: redemptionData.UnitTime,
Quantity: redemptionData.Quantity,
RedeemedAt: now,
CreatedAt: now,
}
err = l.svc.RedemptionRecordModel.Insert(ctx, redemptionRecord, tx)
if err != nil {
logger.WithContext(ctx).Error("Insert redemption record failed", logger.Field("error", err.Error()))
return err
}
return nil
})
if err != nil {
logger.WithContext(ctx).Error("Redemption transaction failed", logger.Field("error", err.Error()))
return err
}
// 7. 清理缓存(关键步骤:让节点获取最新订阅)
l.clearServerCache(ctx, sub)
// 7.1 清理用户订阅缓存(确保用户端显示最新信息)
if existingSubscribe != nil {
err = l.svc.UserModel.ClearSubscribeCache(ctx, existingSubscribe)
if err != nil {
logger.WithContext(ctx).Error("Clear user subscribe cache failed",
logger.Field("error", err.Error()),
logger.Field("subscribe_id", existingSubscribe.Id),
logger.Field("user_id", userInfo.Id),
)
}
}
// 8. 删除Redis临时数据
l.svc.Redis.Del(ctx, cacheKey)
// 9. 发送通知(可选)
// 可以复用现有的通知模板或创建新的兑换通知模板
// l.sendNotifications(ctx, orderInfo, userInfo, sub, existingSubscribe, telegram.RedemptionNotify)
logger.WithContext(ctx).Info("Redemption activation success",
logger.Field("order_no", orderInfo.OrderNo),
logger.Field("user_id", userInfo.Id),
logger.Field("subscribe_id", orderInfo.SubscribeId),
)
return nil
}