Merge branch 'master' of https://github.com/OmnTeam/ppanel-server
This commit is contained in:
commit
2d4d926924
17
.github/workflows/deploy-linux.yml
vendored
17
.github/workflows/deploy-linux.yml
vendored
@ -62,5 +62,18 @@ jobs:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
VERSION=${GITHUB_REF#refs/tags/}
|
||||
gh release create $VERSION --title "PPanel Server $VERSION" || true
|
||||
gh release upload $VERSION ppanel-server-${VERSION}-linux-amd64.tar.gz checksum.txt
|
||||
|
||||
# Check if release exists
|
||||
if gh release view $VERSION >/dev/null 2>&1; then
|
||||
echo "Release $VERSION already exists, deleting old assets..."
|
||||
# Delete existing assets if they exist
|
||||
gh release delete-asset $VERSION ppanel-server-${VERSION}-linux-amd64.tar.gz --yes 2>/dev/null || true
|
||||
gh release delete-asset $VERSION checksum.txt --yes 2>/dev/null || true
|
||||
else
|
||||
echo "Creating new release $VERSION..."
|
||||
gh release create $VERSION --title "PPanel Server $VERSION" --notes "Release $VERSION"
|
||||
fi
|
||||
|
||||
# Upload assets (will overwrite if --clobber is supported, otherwise will fail gracefully)
|
||||
echo "Uploading assets..."
|
||||
gh release upload $VERSION ppanel-server-${VERSION}-linux-amd64.tar.gz checksum.txt --clobber
|
||||
|
||||
@ -1,2 +1,2 @@
|
||||
ALTER TABLE traffic_log ADD INDEX IF NOT EXISTS idx_timestamp (timestamp);
|
||||
CREATE INDEX idx_timestamp ON traffic_log (timestamp);
|
||||
|
||||
|
||||
@ -2,16 +2,17 @@ package redemption
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/perfect-panel/server/internal/model/redemption"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/perfect-panel/server/internal/model/order"
|
||||
"github.com/perfect-panel/server/internal/model/user"
|
||||
"github.com/perfect-panel/server/pkg/constant"
|
||||
"github.com/perfect-panel/server/pkg/snowflake"
|
||||
"github.com/perfect-panel/server/pkg/uuidx"
|
||||
"github.com/perfect-panel/server/pkg/tool"
|
||||
"github.com/perfect-panel/server/pkg/xerr"
|
||||
queue "github.com/perfect-panel/server/queue/types"
|
||||
"github.com/pkg/errors"
|
||||
"gorm.io/gorm"
|
||||
|
||||
@ -43,6 +44,21 @@ func (l *RedeemCodeLogic) RedeemCode(req *types.RedeemCodeRequest) (resp *types.
|
||||
return nil, errors.Wrapf(xerr.NewErrCode(xerr.InvalidAccess), "Invalid Access")
|
||||
}
|
||||
|
||||
// 使用Redis分布式锁防止并发重复兑换
|
||||
lockKey := fmt.Sprintf("redemption_lock:%d:%s", u.Id, req.Code)
|
||||
lockSuccess, err := l.svcCtx.Redis.SetNX(l.ctx, lockKey, "1", 10*time.Second).Result()
|
||||
if err != nil {
|
||||
l.Errorw("[RedeemCode] Acquire lock failed", logger.Field("error", err.Error()))
|
||||
return nil, errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "system busy, please try again later")
|
||||
}
|
||||
if !lockSuccess {
|
||||
l.Errorw("[RedeemCode] Redemption in progress",
|
||||
logger.Field("user_id", u.Id),
|
||||
logger.Field("code", req.Code))
|
||||
return nil, errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "redemption in progress, please wait")
|
||||
}
|
||||
defer l.svcCtx.Redis.Del(l.ctx, lockKey)
|
||||
|
||||
// Find redemption code by code
|
||||
redemptionCode, err := l.svcCtx.RedemptionCodeModel.FindOneByCode(l.ctx, req.Code)
|
||||
if err != nil {
|
||||
@ -102,179 +118,107 @@ func (l *RedeemCodeLogic) RedeemCode(req *types.RedeemCodeRequest) (resp *types.
|
||||
return nil, errors.Wrapf(xerr.NewErrCode(xerr.SubscribeNotAvailable), "subscribe plan is not available")
|
||||
}
|
||||
|
||||
// Start transaction
|
||||
err = l.svcCtx.RedemptionCodeModel.Transaction(l.ctx, func(tx *gorm.DB) error {
|
||||
// Find user's existing subscribe for this plan
|
||||
var existingSubscribe *user.SubscribeDetails
|
||||
userSubscribes, err := l.svcCtx.UserModel.QueryUserSubscribe(l.ctx, u.Id, 0, 1)
|
||||
if err == nil {
|
||||
for _, us := range userSubscribes {
|
||||
if us.SubscribeId == redemptionCode.SubscribePlan {
|
||||
existingSubscribe = us
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
if existingSubscribe != nil {
|
||||
// Extend existing subscribe
|
||||
var newExpireTime time.Time
|
||||
if existingSubscribe.ExpireTime.After(now) {
|
||||
newExpireTime = existingSubscribe.ExpireTime
|
||||
} else {
|
||||
newExpireTime = now
|
||||
}
|
||||
|
||||
// Calculate duration based on redemption code
|
||||
duration, err := calculateDuration(redemptionCode.UnitTime, redemptionCode.Quantity)
|
||||
if err != nil {
|
||||
l.Errorw("[RedeemCode] Calculate duration error", logger.Field("error", err.Error()))
|
||||
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "calculate duration error: %v", err.Error())
|
||||
}
|
||||
newExpireTime = newExpireTime.Add(duration)
|
||||
|
||||
// Update subscribe
|
||||
existingSubscribe.ExpireTime = newExpireTime
|
||||
existingSubscribe.Status = 1
|
||||
|
||||
// Add traffic if needed
|
||||
if subscribePlan.Traffic > 0 {
|
||||
existingSubscribe.Traffic = subscribePlan.Traffic * 1024 * 1024 * 1024
|
||||
existingSubscribe.Download = 0
|
||||
existingSubscribe.Upload = 0
|
||||
}
|
||||
|
||||
err = l.svcCtx.UserModel.UpdateSubscribe(l.ctx, &user.Subscribe{
|
||||
Id: existingSubscribe.Id,
|
||||
UserId: existingSubscribe.UserId,
|
||||
ExpireTime: existingSubscribe.ExpireTime,
|
||||
Status: existingSubscribe.Status,
|
||||
Traffic: existingSubscribe.Traffic,
|
||||
Download: existingSubscribe.Download,
|
||||
Upload: existingSubscribe.Upload,
|
||||
}, tx)
|
||||
if err != nil {
|
||||
l.Errorw("[RedeemCode] Update subscribe error", logger.Field("error", err.Error()))
|
||||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "update subscribe error: %v", err.Error())
|
||||
}
|
||||
} else {
|
||||
// Check quota limit before creating new subscribe
|
||||
if subscribePlan.Quota > 0 {
|
||||
var count int64
|
||||
if err := tx.Model(&user.Subscribe{}).Where("user_id = ? AND subscribe_id = ?", u.Id, redemptionCode.SubscribePlan).Count(&count).Error; err != nil {
|
||||
l.Errorw("[RedeemCode] Count user subscribe failed", logger.Field("error", err.Error()))
|
||||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "count user subscribe error: %v", err.Error())
|
||||
}
|
||||
if count >= subscribePlan.Quota {
|
||||
l.Infow("[RedeemCode] Subscribe quota limit exceeded",
|
||||
logger.Field("user_id", u.Id),
|
||||
logger.Field("subscribe_id", redemptionCode.SubscribePlan),
|
||||
logger.Field("quota", subscribePlan.Quota),
|
||||
logger.Field("current_count", count),
|
||||
)
|
||||
return errors.Wrapf(xerr.NewErrCode(xerr.SubscribeQuotaLimit), "subscribe quota limit exceeded")
|
||||
}
|
||||
}
|
||||
|
||||
// Create new subscribe
|
||||
expireTime, traffic, err := calculateSubscribeTimeAndTraffic(redemptionCode.UnitTime, redemptionCode.Quantity, subscribePlan.Traffic)
|
||||
if err != nil {
|
||||
l.Errorw("[RedeemCode] Calculate subscribe time and traffic error", logger.Field("error", err.Error()))
|
||||
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "calculate subscribe time and traffic error: %v", err.Error())
|
||||
}
|
||||
|
||||
newSubscribe := &user.Subscribe{
|
||||
Id: snowflake.GetID(),
|
||||
UserId: u.Id,
|
||||
OrderId: 0,
|
||||
SubscribeId: redemptionCode.SubscribePlan,
|
||||
StartTime: now,
|
||||
ExpireTime: expireTime,
|
||||
FinishedAt: nil,
|
||||
Traffic: traffic,
|
||||
Download: 0,
|
||||
Upload: 0,
|
||||
Token: uuidx.SubscribeToken(fmt.Sprintf("redemption:%d:%d", u.Id, time.Now().UnixMilli())),
|
||||
UUID: uuid.New().String(),
|
||||
Status: 1,
|
||||
}
|
||||
|
||||
err = l.svcCtx.UserModel.InsertSubscribe(l.ctx, newSubscribe, tx)
|
||||
if err != nil {
|
||||
l.Errorw("[RedeemCode] Insert subscribe error", logger.Field("error", err.Error()))
|
||||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseInsertError), "insert subscribe error: %v", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Increment redemption code used count
|
||||
err = l.svcCtx.RedemptionCodeModel.IncrementUsedCount(l.ctx, redemptionCode.Id)
|
||||
// 检查配额限制(预检查,队列任务中会再次检查)
|
||||
if subscribePlan.Quota > 0 {
|
||||
var count int64
|
||||
err = l.svcCtx.DB.Model(&user.Subscribe{}).
|
||||
Where("user_id = ? AND subscribe_id = ?", u.Id, redemptionCode.SubscribePlan).
|
||||
Count(&count).Error
|
||||
if err != nil {
|
||||
l.Errorw("[RedeemCode] Increment used count error", logger.Field("error", err.Error()))
|
||||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "increment used count error: %v", err.Error())
|
||||
l.Errorw("[RedeemCode] Check quota failed", logger.Field("error", err.Error()))
|
||||
return nil, errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "check quota failed")
|
||||
}
|
||||
|
||||
// Create redemption record
|
||||
redemptionRecord := &redemption.RedemptionRecord{
|
||||
Id: snowflake.GetID(),
|
||||
RedemptionCodeId: redemptionCode.Id,
|
||||
UserId: u.Id,
|
||||
SubscribeId: redemptionCode.SubscribePlan,
|
||||
UnitTime: redemptionCode.UnitTime,
|
||||
Quantity: redemptionCode.Quantity,
|
||||
RedeemedAt: now,
|
||||
CreatedAt: now,
|
||||
if count >= subscribePlan.Quota {
|
||||
l.Errorw("[RedeemCode] Subscribe quota limit exceeded",
|
||||
logger.Field("user_id", u.Id),
|
||||
logger.Field("subscribe_id", redemptionCode.SubscribePlan),
|
||||
logger.Field("quota", subscribePlan.Quota),
|
||||
logger.Field("current_count", count))
|
||||
return nil, errors.Wrapf(xerr.NewErrCode(xerr.SubscribeQuotaLimit), "subscribe quota limit exceeded")
|
||||
}
|
||||
|
||||
err = l.svcCtx.RedemptionRecordModel.Insert(l.ctx, redemptionRecord)
|
||||
if err != nil {
|
||||
l.Errorw("[RedeemCode] Insert redemption record error", logger.Field("error", err.Error()))
|
||||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseInsertError), "insert redemption record error: %v", err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 判断是否首次购买
|
||||
isNew, err := l.svcCtx.OrderModel.IsUserEligibleForNewOrder(l.ctx, u.Id)
|
||||
if err != nil {
|
||||
l.Errorw("[RedeemCode] Check user order failed", logger.Field("error", err.Error()))
|
||||
// 可以继续,默认为false
|
||||
isNew = false
|
||||
}
|
||||
|
||||
// 创建Order记录
|
||||
orderInfo := &order.Order{
|
||||
UserId: u.Id,
|
||||
OrderNo: tool.GenerateTradeNo(),
|
||||
Type: 5, // 兑换类型
|
||||
Quantity: redemptionCode.Quantity,
|
||||
Price: 0, // 兑换无价格
|
||||
Amount: 0, // 兑换无金额
|
||||
Discount: 0,
|
||||
GiftAmount: 0,
|
||||
Coupon: "",
|
||||
CouponDiscount: 0,
|
||||
PaymentId: 0,
|
||||
Method: "redemption",
|
||||
FeeAmount: 0,
|
||||
Commission: 0,
|
||||
Status: 2, // 直接设置为已支付
|
||||
SubscribeId: redemptionCode.SubscribePlan,
|
||||
IsNew: isNew,
|
||||
}
|
||||
|
||||
// 保存Order到数据库
|
||||
err = l.svcCtx.OrderModel.Insert(l.ctx, orderInfo)
|
||||
if err != nil {
|
||||
l.Errorw("[RedeemCode] Create order failed", logger.Field("error", err.Error()))
|
||||
return nil, errors.Wrapf(xerr.NewErrCode(xerr.DatabaseInsertError), "create order failed")
|
||||
}
|
||||
|
||||
// 缓存兑换码信息到Redis(供队列任务使用)
|
||||
cacheKey := fmt.Sprintf("redemption_order:%s", orderInfo.OrderNo)
|
||||
cacheData := map[string]interface{}{
|
||||
"redemption_code_id": redemptionCode.Id,
|
||||
"unit_time": redemptionCode.UnitTime,
|
||||
"quantity": redemptionCode.Quantity,
|
||||
}
|
||||
jsonData, _ := json.Marshal(cacheData)
|
||||
err = l.svcCtx.Redis.Set(l.ctx, cacheKey, jsonData, 2*time.Hour).Err()
|
||||
if err != nil {
|
||||
l.Errorw("[RedeemCode] Cache redemption data failed", logger.Field("error", err.Error()))
|
||||
// 缓存失败,删除已创建的Order避免孤儿记录
|
||||
if delErr := l.svcCtx.OrderModel.Delete(l.ctx, orderInfo.Id); delErr != nil {
|
||||
l.Errorw("[RedeemCode] Delete order failed after cache error",
|
||||
logger.Field("order_id", orderInfo.Id),
|
||||
logger.Field("error", delErr.Error()))
|
||||
}
|
||||
return nil, errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "cache redemption data failed")
|
||||
}
|
||||
|
||||
// 触发队列任务
|
||||
payload := queue.ForthwithActivateOrderPayload{
|
||||
OrderNo: orderInfo.OrderNo,
|
||||
}
|
||||
bytes, _ := json.Marshal(&payload)
|
||||
task := asynq.NewTask(queue.ForthwithActivateOrder, bytes, asynq.MaxRetry(5))
|
||||
_, err = l.svcCtx.Queue.EnqueueContext(l.ctx, task)
|
||||
if err != nil {
|
||||
l.Errorw("[RedeemCode] Enqueue task failed", logger.Field("error", err.Error()))
|
||||
// 入队失败,删除Order和Redis缓存
|
||||
l.svcCtx.Redis.Del(l.ctx, cacheKey)
|
||||
if delErr := l.svcCtx.OrderModel.Delete(l.ctx, orderInfo.Id); delErr != nil {
|
||||
l.Errorw("[RedeemCode] Delete order failed after enqueue error",
|
||||
logger.Field("order_id", orderInfo.Id),
|
||||
logger.Field("error", delErr.Error()))
|
||||
}
|
||||
return nil, errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "enqueue task failed")
|
||||
}
|
||||
|
||||
l.Infow("[RedeemCode] Redemption order created successfully",
|
||||
logger.Field("order_no", orderInfo.OrderNo),
|
||||
logger.Field("user_id", u.Id),
|
||||
)
|
||||
|
||||
return &types.RedeemCodeResponse{
|
||||
Message: "Redemption successful",
|
||||
Message: "Redemption successful, processing...",
|
||||
}, nil
|
||||
}
|
||||
|
||||
// calculateDuration calculates time duration based on unit time
|
||||
func calculateDuration(unitTime string, quantity int64) (time.Duration, error) {
|
||||
switch unitTime {
|
||||
case "month":
|
||||
return time.Duration(quantity*30*24) * time.Hour, nil
|
||||
case "quarter":
|
||||
return time.Duration(quantity*90*24) * time.Hour, nil
|
||||
case "half_year":
|
||||
return time.Duration(quantity*180*24) * time.Hour, nil
|
||||
case "year":
|
||||
return time.Duration(quantity*365*24) * time.Hour, nil
|
||||
case "day":
|
||||
return time.Duration(quantity*24) * time.Hour, nil
|
||||
default:
|
||||
return time.Duration(quantity*30*24) * time.Hour, nil
|
||||
}
|
||||
}
|
||||
|
||||
// calculateSubscribeTimeAndTraffic calculates expire time and traffic based on subscribe plan
|
||||
func calculateSubscribeTimeAndTraffic(unitTime string, quantity int64, traffic int64) (time.Time, int64, error) {
|
||||
duration, err := calculateDuration(unitTime, quantity)
|
||||
if err != nil {
|
||||
return time.Time{}, 0, err
|
||||
}
|
||||
|
||||
expireTime := time.Now().Add(duration)
|
||||
trafficBytes := int64(0)
|
||||
if traffic > 0 {
|
||||
trafficBytes = traffic * 1024 * 1024 * 1024
|
||||
}
|
||||
|
||||
return expireTime, trafficBytes, nil
|
||||
}
|
||||
}
|
||||
@ -24,14 +24,14 @@ type (
|
||||
Insert(ctx context.Context, data *RedemptionCode) error
|
||||
FindOne(ctx context.Context, id int64) (*RedemptionCode, error)
|
||||
FindOneByCode(ctx context.Context, code string) (*RedemptionCode, error)
|
||||
Update(ctx context.Context, data *RedemptionCode) error
|
||||
Update(ctx context.Context, data *RedemptionCode, tx ...*gorm.DB) error
|
||||
Delete(ctx context.Context, id int64) error
|
||||
Transaction(ctx context.Context, fn func(db *gorm.DB) error) error
|
||||
customRedemptionCodeLogicModel
|
||||
}
|
||||
|
||||
RedemptionRecordModel interface {
|
||||
Insert(ctx context.Context, data *RedemptionRecord) error
|
||||
Insert(ctx context.Context, data *RedemptionRecord, tx ...*gorm.DB) error
|
||||
FindOne(ctx context.Context, id int64) (*RedemptionRecord, error)
|
||||
Update(ctx context.Context, data *RedemptionRecord) error
|
||||
Delete(ctx context.Context, id int64) error
|
||||
@ -41,7 +41,7 @@ type (
|
||||
customRedemptionCodeLogicModel interface {
|
||||
QueryRedemptionCodeListByPage(ctx context.Context, page, size int, subscribePlan int64, unitTime string, code string) (total int64, list []*RedemptionCode, err error)
|
||||
BatchDelete(ctx context.Context, ids []int64) error
|
||||
IncrementUsedCount(ctx context.Context, id int64) error
|
||||
IncrementUsedCount(ctx context.Context, id int64, tx ...*gorm.DB) error
|
||||
}
|
||||
|
||||
customRedemptionRecordLogicModel interface {
|
||||
@ -130,13 +130,16 @@ func (m *defaultRedemptionCodeModel) FindOneByCode(ctx context.Context, code str
|
||||
}
|
||||
}
|
||||
|
||||
func (m *defaultRedemptionCodeModel) Update(ctx context.Context, data *RedemptionCode) error {
|
||||
func (m *defaultRedemptionCodeModel) Update(ctx context.Context, data *RedemptionCode, tx ...*gorm.DB) error {
|
||||
old, err := m.FindOne(ctx, data.Id)
|
||||
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return err
|
||||
}
|
||||
err = m.ExecCtx(ctx, func(conn *gorm.DB) error {
|
||||
db := conn
|
||||
if len(tx) > 0 {
|
||||
db = tx[0]
|
||||
}
|
||||
return db.Save(data).Error
|
||||
}, m.getCacheKeys(old)...)
|
||||
return err
|
||||
@ -189,12 +192,15 @@ func (m *customRedemptionCodeModel) BatchDelete(ctx context.Context, ids []int64
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *customRedemptionCodeModel) IncrementUsedCount(ctx context.Context, id int64) error {
|
||||
func (m *customRedemptionCodeModel) IncrementUsedCount(ctx context.Context, id int64, tx ...*gorm.DB) error {
|
||||
data, err := m.FindOne(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data.UsedCount++
|
||||
if len(tx) > 0 {
|
||||
return m.Update(ctx, data, tx[0])
|
||||
}
|
||||
return m.Update(ctx, data)
|
||||
}
|
||||
|
||||
@ -210,8 +216,11 @@ func (m *defaultRedemptionRecordModel) getCacheKeys(data *RedemptionRecord) []st
|
||||
return cacheKeys
|
||||
}
|
||||
|
||||
func (m *defaultRedemptionRecordModel) Insert(ctx context.Context, data *RedemptionRecord) error {
|
||||
func (m *defaultRedemptionRecordModel) Insert(ctx context.Context, data *RedemptionRecord, tx ...*gorm.DB) error {
|
||||
err := m.ExecCtx(ctx, func(conn *gorm.DB) error {
|
||||
if len(tx) > 0 {
|
||||
conn = tx[0]
|
||||
}
|
||||
return conn.Create(data).Error
|
||||
}, m.getCacheKeys(data)...)
|
||||
return err
|
||||
|
||||
@ -155,7 +155,9 @@ func (c *Client) QueryOrderStatus(orderNo string) (bool, error) {
|
||||
|
||||
// ParseNotify
|
||||
func (c *Client) ParseNotify(payload []byte, signature string) (*NotifyResult, error) {
|
||||
event, err := webhook.ConstructEvent(payload, signature, c.Config.WebhookSecret)
|
||||
event, err := webhook.ConstructEventWithOptions(payload, signature, c.Config.WebhookSecret, webhook.ConstructEventOptions{
|
||||
IgnoreAPIVersionMismatch: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ import (
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/perfect-panel/server/internal/logic/telegram"
|
||||
"github.com/perfect-panel/server/internal/model/order"
|
||||
"github.com/perfect-panel/server/internal/model/redemption"
|
||||
"github.com/perfect-panel/server/internal/model/subscribe"
|
||||
"github.com/perfect-panel/server/internal/model/user"
|
||||
"github.com/perfect-panel/server/internal/svc"
|
||||
@ -33,6 +34,7 @@ const (
|
||||
OrderTypeRenewal = 2 // Subscription renewal
|
||||
OrderTypeResetTraffic = 3 // Traffic quota reset
|
||||
OrderTypeRecharge = 4 // Balance recharge
|
||||
OrderTypeRedemption = 5 // Redemption code activation
|
||||
)
|
||||
|
||||
// Order status constants define the lifecycle states of an order
|
||||
@ -145,6 +147,8 @@ func (l *ActivateOrderLogic) processOrderByType(ctx context.Context, orderInfo *
|
||||
return l.ResetTraffic(ctx, orderInfo)
|
||||
case OrderTypeRecharge:
|
||||
return l.Recharge(ctx, orderInfo)
|
||||
case OrderTypeRedemption:
|
||||
return l.RedemptionActivate(ctx, orderInfo)
|
||||
default:
|
||||
logger.WithContext(ctx).Error("Order type is invalid", logger.Field("type", orderInfo.Type))
|
||||
return ErrInvalidOrderType
|
||||
@ -826,3 +830,233 @@ func findTelegram(u *user.User) (int64, bool) {
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// RedemptionActivate handles redemption code activation including subscription creation,
|
||||
// redemption record creation, used count update, cache clearing, and notifications
|
||||
func (l *ActivateOrderLogic) RedemptionActivate(ctx context.Context, orderInfo *order.Order) error {
|
||||
// 1. 获取用户信息
|
||||
userInfo, err := l.getExistingUser(ctx, orderInfo.UserId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 2. 获取套餐信息
|
||||
sub, err := l.getSubscribeInfo(ctx, orderInfo.SubscribeId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 3. 从Redis获取兑换码信息
|
||||
cacheKey := fmt.Sprintf("redemption_order:%s", orderInfo.OrderNo)
|
||||
data, err := l.svc.Redis.Get(ctx, cacheKey).Result()
|
||||
if err != nil {
|
||||
logger.WithContext(ctx).Error("Get redemption cache failed",
|
||||
logger.Field("error", err.Error()),
|
||||
logger.Field("cache_key", cacheKey),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
var redemptionData struct {
|
||||
RedemptionCodeId int64 `json:"redemption_code_id"`
|
||||
UnitTime string `json:"unit_time"`
|
||||
Quantity int64 `json:"quantity"`
|
||||
}
|
||||
if err = json.Unmarshal([]byte(data), &redemptionData); err != nil {
|
||||
logger.WithContext(ctx).Error("Unmarshal redemption cache failed", logger.Field("error", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
// 4. 幂等性检查:查询是否已有兑换记录
|
||||
existingRecords, err := l.svc.RedemptionRecordModel.FindByUserId(ctx, userInfo.Id)
|
||||
if err == nil {
|
||||
for _, record := range existingRecords {
|
||||
if record.RedemptionCodeId == redemptionData.RedemptionCodeId {
|
||||
logger.WithContext(ctx).Info("Redemption already processed, skip",
|
||||
logger.Field("order_no", orderInfo.OrderNo),
|
||||
logger.Field("user_id", userInfo.Id),
|
||||
logger.Field("redemption_code_id", redemptionData.RedemptionCodeId),
|
||||
)
|
||||
// 已处理过,直接返回成功(幂等性保护)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 5. 查找用户现有订阅
|
||||
var existingSubscribe *user.Subscribe
|
||||
userSubscribes, err := l.svc.UserModel.QueryUserSubscribe(ctx, userInfo.Id, 0, 1)
|
||||
if err == nil {
|
||||
for _, us := range userSubscribes {
|
||||
if us.SubscribeId == orderInfo.SubscribeId {
|
||||
existingSubscribe = &user.Subscribe{
|
||||
Id: us.Id,
|
||||
UserId: us.UserId,
|
||||
SubscribeId: us.SubscribeId,
|
||||
ExpireTime: us.ExpireTime,
|
||||
Status: us.Status,
|
||||
Traffic: us.Traffic,
|
||||
Download: us.Download,
|
||||
Upload: us.Upload,
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
// 6. 使用事务保护核心操作
|
||||
err = l.svc.DB.Transaction(func(tx *gorm.DB) error {
|
||||
// 6.1 创建或更新订阅
|
||||
if existingSubscribe != nil {
|
||||
// 续期现有订阅
|
||||
var newExpireTime time.Time
|
||||
if existingSubscribe.ExpireTime.After(now) {
|
||||
newExpireTime = existingSubscribe.ExpireTime
|
||||
} else {
|
||||
newExpireTime = now
|
||||
}
|
||||
|
||||
// 计算新的过期时间
|
||||
newExpireTime = tool.AddTime(redemptionData.UnitTime, redemptionData.Quantity, newExpireTime)
|
||||
|
||||
// 更新订阅
|
||||
existingSubscribe.OrderId = orderInfo.Id // 设置OrderId用于追溯
|
||||
existingSubscribe.ExpireTime = newExpireTime
|
||||
existingSubscribe.Status = 1
|
||||
|
||||
// 重置流量(如果套餐有流量限制)
|
||||
if sub.Traffic > 0 {
|
||||
existingSubscribe.Traffic = sub.Traffic * 1024 * 1024 * 1024
|
||||
existingSubscribe.Download = 0
|
||||
existingSubscribe.Upload = 0
|
||||
}
|
||||
|
||||
err = l.svc.UserModel.UpdateSubscribe(ctx, existingSubscribe, tx)
|
||||
if err != nil {
|
||||
logger.WithContext(ctx).Error("Update subscribe failed", logger.Field("error", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
logger.WithContext(ctx).Info("Extended existing subscription",
|
||||
logger.Field("subscribe_id", existingSubscribe.Id),
|
||||
logger.Field("new_expire_time", newExpireTime),
|
||||
)
|
||||
} else {
|
||||
// 检查配额限制
|
||||
if sub.Quota > 0 {
|
||||
var count int64
|
||||
if err := tx.Model(&user.Subscribe{}).
|
||||
Where("user_id = ? AND subscribe_id = ?", userInfo.Id, orderInfo.SubscribeId).
|
||||
Count(&count).Error; err != nil {
|
||||
logger.WithContext(ctx).Error("Count user subscribe failed", logger.Field("error", err.Error()))
|
||||
return err
|
||||
}
|
||||
if count >= sub.Quota {
|
||||
logger.WithContext(ctx).Infow("Subscribe quota limit exceeded",
|
||||
logger.Field("user_id", userInfo.Id),
|
||||
logger.Field("subscribe_id", orderInfo.SubscribeId),
|
||||
logger.Field("quota", sub.Quota),
|
||||
logger.Field("current_count", count),
|
||||
)
|
||||
return fmt.Errorf("subscribe quota limit exceeded")
|
||||
}
|
||||
}
|
||||
|
||||
// 创建新订阅
|
||||
expireTime := tool.AddTime(redemptionData.UnitTime, redemptionData.Quantity, now)
|
||||
traffic := int64(0)
|
||||
if sub.Traffic > 0 {
|
||||
traffic = sub.Traffic * 1024 * 1024 * 1024
|
||||
}
|
||||
|
||||
newSubscribe := &user.Subscribe{
|
||||
UserId: userInfo.Id,
|
||||
OrderId: orderInfo.Id,
|
||||
SubscribeId: orderInfo.SubscribeId,
|
||||
StartTime: now,
|
||||
ExpireTime: expireTime,
|
||||
FinishedAt: nil,
|
||||
Traffic: traffic,
|
||||
Download: 0,
|
||||
Upload: 0,
|
||||
Token: uuidx.SubscribeToken(orderInfo.OrderNo),
|
||||
UUID: uuid.New().String(),
|
||||
Status: 1,
|
||||
}
|
||||
|
||||
err = l.svc.UserModel.InsertSubscribe(ctx, newSubscribe, tx)
|
||||
if err != nil {
|
||||
logger.WithContext(ctx).Error("Insert subscribe failed", logger.Field("error", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
logger.WithContext(ctx).Info("Created new subscription",
|
||||
logger.Field("subscribe_id", newSubscribe.Id),
|
||||
logger.Field("expire_time", expireTime),
|
||||
)
|
||||
}
|
||||
|
||||
// 6.2 更新兑换码使用次数
|
||||
err = l.svc.RedemptionCodeModel.IncrementUsedCount(ctx, redemptionData.RedemptionCodeId, tx)
|
||||
if err != nil {
|
||||
logger.WithContext(ctx).Error("Increment used count failed", logger.Field("error", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
// 6.3 创建兑换记录
|
||||
redemptionRecord := &redemption.RedemptionRecord{
|
||||
RedemptionCodeId: redemptionData.RedemptionCodeId,
|
||||
UserId: userInfo.Id,
|
||||
SubscribeId: orderInfo.SubscribeId,
|
||||
UnitTime: redemptionData.UnitTime,
|
||||
Quantity: redemptionData.Quantity,
|
||||
RedeemedAt: now,
|
||||
CreatedAt: now,
|
||||
}
|
||||
|
||||
err = l.svc.RedemptionRecordModel.Insert(ctx, redemptionRecord, tx)
|
||||
if err != nil {
|
||||
logger.WithContext(ctx).Error("Insert redemption record failed", logger.Field("error", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logger.WithContext(ctx).Error("Redemption transaction failed", logger.Field("error", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
// 7. 清理缓存(关键步骤:让节点获取最新订阅)
|
||||
l.clearServerCache(ctx, sub)
|
||||
|
||||
// 7.1 清理用户订阅缓存(确保用户端显示最新信息)
|
||||
if existingSubscribe != nil {
|
||||
err = l.svc.UserModel.ClearSubscribeCache(ctx, existingSubscribe)
|
||||
if err != nil {
|
||||
logger.WithContext(ctx).Error("Clear user subscribe cache failed",
|
||||
logger.Field("error", err.Error()),
|
||||
logger.Field("subscribe_id", existingSubscribe.Id),
|
||||
logger.Field("user_id", userInfo.Id),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// 8. 删除Redis临时数据
|
||||
l.svc.Redis.Del(ctx, cacheKey)
|
||||
|
||||
// 9. 发送通知(可选)
|
||||
// 可以复用现有的通知模板或创建新的兑换通知模板
|
||||
// l.sendNotifications(ctx, orderInfo, userInfo, sub, existingSubscribe, telegram.RedemptionNotify)
|
||||
|
||||
logger.WithContext(ctx).Info("Redemption activation success",
|
||||
logger.Field("order_no", orderInfo.OrderNo),
|
||||
logger.Field("user_id", userInfo.Id),
|
||||
logger.Field("subscribe_id", orderInfo.SubscribeId),
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user