修复订阅
Some checks failed
Build docker and publish / build (20.15.1) (push) Failing after 8m2s

This commit is contained in:
shanshanzhong 2026-03-04 23:08:46 -08:00
parent 4349a7ea2f
commit 70c0483ca9
6 changed files with 230 additions and 30 deletions

View File

@ -21,7 +21,7 @@ env:
SSH_PASSWORD: ${{ github.ref_name == 'main' && vars.SSH_PASSWORD || vars.DEV_SSH_PASSWORD }} SSH_PASSWORD: ${{ github.ref_name == 'main' && vars.SSH_PASSWORD || vars.DEV_SSH_PASSWORD }}
# TG通知 # TG通知
TG_BOT_TOKEN: 8114337882:AAHkEx03HSu7RxN4IHBJJEnsK9aPPzNLIk0 TG_BOT_TOKEN: 8114337882:AAHkEx03HSu7RxN4IHBJJEnsK9aPPzNLIk0
TG_CHAT_ID: "-4940243803" TG_CHAT_ID: "-49402438031"
# Go构建变量 # Go构建变量
SERVICE: vpn SERVICE: vpn
SERVICE_STYLE: vpn SERVICE_STYLE: vpn

View File

@ -183,13 +183,30 @@ func (l *AttachTransactionLogic) Attach(req *types.AttachAppleTransactionRequest
if existTx != nil && existTx.Id > 0 { if existTx != nil && existTx.Id > 0 {
token := fmt.Sprintf("iap:%s", txPayload.OriginalTransactionId) token := fmt.Sprintf("iap:%s", txPayload.OriginalTransactionId)
existSub, err := l.svcCtx.UserModel.FindOneSubscribeByToken(l.ctx, token) existSub, err := l.svcCtx.UserModel.FindOneSubscribeByToken(l.ctx, token)
if err == nil && existSub != nil && existSub.Id > 0 { switch {
// Already processed, return success case err == nil && existSub != nil && existSub.Id > 0:
l.Infow("事务已处理,直接返回", logger.Field("originalTransactionId", txPayload.OriginalTransactionId), logger.Field("tier", tier), logger.Field("expiresAt", exp.Unix())) newExpire := existSub.ExpireTime
if exp.After(newExpire) {
existSub.ExpireTime = exp
newExpire = exp
}
if subscribeId > 0 && existSub.SubscribeId != subscribeId {
existSub.SubscribeId = subscribeId
}
existSub.Status = 1
existSub.FinishedAt = nil
if err := l.svcCtx.UserModel.UpdateSubscribe(l.ctx, existSub); err != nil {
l.Errorw("刷新 IAP 订阅失败", logger.Field("error", err.Error()), logger.Field("subscribeId", existSub.Id))
return nil, errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "update iap subscribe failed: %v", err.Error())
}
l.Infow("事务已处理,刷新订阅到期时间", logger.Field("originalTransactionId", txPayload.OriginalTransactionId), logger.Field("tier", tier), logger.Field("expiresAt", newExpire.Unix()))
return &types.AttachAppleTransactionResponse{ return &types.AttachAppleTransactionResponse{
ExpiresAt: exp.Unix(), ExpiresAt: newExpire.Unix(),
Tier: tier, Tier: tier,
}, nil }, nil
case err != nil && !errors.Is(err, gorm.ErrRecordNotFound):
l.Errorw("查询 IAP 订阅失败", logger.Field("error", err.Error()), logger.Field("token", token))
return nil, errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "find iap subscribe error: %v", err.Error())
} }
} }

View File

@ -0,0 +1,163 @@
package user
import (
"context"
"time"
modelUser "github.com/perfect-panel/server/internal/model/user"
"github.com/perfect-panel/server/internal/svc"
"github.com/perfect-panel/server/pkg/logger"
"github.com/perfect-panel/server/pkg/xerr"
"github.com/pkg/errors"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
type accountMergeResult struct {
OwnerUserID int64
DeviceUserID int64
MovedDevices []modelUser.Device
RemovedSubscribes []modelUser.Subscribe
}
type accountMergeHelper struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func newAccountMergeHelper(ctx context.Context, svcCtx *svc.ServiceContext) *accountMergeHelper {
return &accountMergeHelper{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (h *accountMergeHelper) mergeIntoOwner(ownerUserID, deviceUserID int64, source string) (*accountMergeResult, error) {
if ownerUserID == 0 || deviceUserID == 0 {
return nil, errors.Wrapf(xerr.NewErrCode(xerr.InvalidParams), "merge user id is empty")
}
if ownerUserID == deviceUserID {
return nil, errors.Wrapf(xerr.NewErrCode(xerr.InvalidParams), "cannot merge same user id")
}
result := &accountMergeResult{
OwnerUserID: ownerUserID,
DeviceUserID: deviceUserID,
}
err := h.svcCtx.DB.WithContext(h.ctx).Transaction(func(tx *gorm.DB) error {
var owner modelUser.User
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
Where("id = ?", ownerUserID).
First(&owner).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errors.Wrapf(xerr.NewErrCode(xerr.UserNotExist), "owner user %d not found", ownerUserID)
}
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "query owner user failed")
}
var device modelUser.User
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
Where("id = ?", deviceUserID).
First(&device).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errors.Wrapf(xerr.NewErrCode(xerr.UserNotExist), "device user %d not found", deviceUserID)
}
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "query device user failed")
}
exitHelper := newFamilyExitHelper(h.ctx, h.svcCtx)
if err := exitHelper.removeUserFromActiveFamily(tx, deviceUserID, false); err != nil {
return err
}
removedSubscribes, err := clearMemberSubscribes(tx, deviceUserID)
if err != nil {
return err
}
result.RemovedSubscribes = removedSubscribes
var devices []modelUser.Device
if err := tx.Where("user_id = ?", deviceUserID).Find(&devices).Error; err != nil {
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "query device list failed")
}
if len(devices) > 0 {
if err := tx.Model(&modelUser.Device{}).
Where("user_id = ?", deviceUserID).
Updates(map[string]interface{}{
"user_id": ownerUserID,
"updated_at": time.Now(),
}).Error; err != nil {
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "update device ownership failed")
}
}
result.MovedDevices = devices
if err := tx.Model(&modelUser.AuthMethods{}).
Where("user_id = ?", deviceUserID).
Update("user_id", ownerUserID).Error; err != nil {
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "migrate auth methods failed")
}
if err := tx.Model(&modelUser.User{}).
Where("id = ?", deviceUserID).
Update("enable", false).Error; err != nil {
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "disable device user failed")
}
if err := tx.Where("id = ?", deviceUserID).Delete(&modelUser.User{}).Error; err != nil {
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "soft delete device user failed")
}
return nil
})
if err != nil {
return nil, err
}
if err := h.clearCaches(result); err != nil {
return nil, err
}
logger.WithContext(h.ctx).Infow("device account merged into owner",
logger.Field("owner_user_id", ownerUserID),
logger.Field("device_user_id", deviceUserID),
logger.Field("moved_devices", len(result.MovedDevices)),
logger.Field("removed_subscribes", len(result.RemovedSubscribes)),
logger.Field("source", source),
)
return result, nil
}
func (h *accountMergeHelper) clearCaches(result *accountMergeResult) error {
if result == nil {
return nil
}
if err := h.svcCtx.UserModel.ClearUserCache(h.ctx,
&modelUser.User{Id: result.OwnerUserID},
&modelUser.User{Id: result.DeviceUserID},
); err != nil {
return err
}
if len(result.MovedDevices) > 0 {
deviceModels := make([]*modelUser.Device, 0, len(result.MovedDevices))
for i := range result.MovedDevices {
device := result.MovedDevices[i]
deviceModels = append(deviceModels, &device)
}
if err := h.svcCtx.UserModel.ClearDeviceCache(h.ctx, deviceModels...); err != nil {
return err
}
}
if len(result.RemovedSubscribes) > 0 {
familyHelper := newFamilyBindingHelper(h.ctx, h.svcCtx)
if err := familyHelper.clearRemovedMemberSubscribeCache(result.RemovedSubscribes); err != nil {
return err
}
}
return nil
}

View File

@ -112,26 +112,20 @@ func (l *BindEmailWithVerificationLogic) BindEmailWithVerification(req *types.Bi
return nil, errors.Wrapf(xerr.NewErrCode(xerr.FamilyAlreadyBound), "email already bound to current user") return nil, errors.Wrapf(xerr.NewErrCode(xerr.FamilyAlreadyBound), "email already bound to current user")
} }
if err = familyHelper.validateJoinFamily(existingMethod.UserId, u.Id); err != nil { mergeHelper := newAccountMergeHelper(l.ctx, l.svcCtx)
if _, err = mergeHelper.mergeIntoOwner(existingMethod.UserId, u.Id, "bind_email_with_verification"); err != nil {
return nil, err return nil, err
} }
joinResult, err := familyHelper.joinFamily(existingMethod.UserId, u.Id, "bind_email_with_verification") token, err := l.refreshBindSessionToken(existingMethod.UserId)
if err != nil {
return nil, err
}
token, err := l.refreshBindSessionToken(u.Id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &types.BindEmailWithVerificationResponse{ return &types.BindEmailWithVerificationResponse{
Success: true, Success: true,
Message: "joined family successfully", Message: "email bound successfully",
Token: token, Token: token,
UserId: u.Id, UserId: existingMethod.UserId,
FamilyJoined: true,
FamilyId: joinResult.FamilyId,
OwnerUserId: joinResult.OwnerUserId,
}, nil }, nil
} }

View File

@ -114,6 +114,7 @@ type customUserLogicModel interface {
ClearSubscribeCache(ctx context.Context, data ...*Subscribe) error ClearSubscribeCache(ctx context.Context, data ...*Subscribe) error
ClearUserCache(ctx context.Context, data ...*User) error ClearUserCache(ctx context.Context, data ...*User) error
ClearDeviceCache(ctx context.Context, devices ...*Device) error
QueryDailyUserStatisticsList(ctx context.Context, date time.Time) ([]UserStatisticsWithDate, error) QueryDailyUserStatisticsList(ctx context.Context, date time.Time) ([]UserStatisticsWithDate, error)
QueryMonthlyUserStatisticsList(ctx context.Context, date time.Time) ([]UserStatisticsWithDate, error) QueryMonthlyUserStatisticsList(ctx context.Context, date time.Time) ([]UserStatisticsWithDate, error)

View File

@ -82,24 +82,16 @@ func NewDeviceManager(srv *ServiceContext) *device.DeviceManager {
} }
manager.OnDeviceKicked = func(userID int64, deviceID, session string, operator device.Operator) { manager.OnDeviceKicked = func(userID int64, deviceID, session string, operator device.Operator) {
//管理员踢下线 switch operator {
if operator == device.Admin { case device.Admin:
message := DeviceMessage{Method: DeviceKickedAdmin} message := DeviceMessage{Method: DeviceKickedAdmin}
_ = manager.SendToDevice(userID, deviceID, message.Json()) _ = manager.SendToDevice(userID, deviceID, message.Json())
//将登陆凭证从缓存中删除 case device.MaxDevices:
srv.Redis.Del(ctx, fmt.Sprintf("%v:%v", config.SessionIdKey, session))
return
}
//登陆设备超过限制踢下线
if operator == device.MaxDevices {
message := DeviceMessage{Method: DeviceKickedMax} message := DeviceMessage{Method: DeviceKickedMax}
_ = manager.SendToDevice(userID, deviceID, message.Json()) _ = manager.SendToDevice(userID, deviceID, message.Json())
//将登陆凭证从缓存中删除
srv.Redis.Del(ctx, fmt.Sprintf("%v:%v", config.SessionIdKey, session))
return
} }
cleanupDeviceSessionCache(ctx, srv, userID, deviceID, session)
} }
manager.OnMessage = func(userID int64, deviceID, session string, message string) { manager.OnMessage = func(userID int64, deviceID, session string, message string) {
@ -108,6 +100,39 @@ func NewDeviceManager(srv *ServiceContext) *device.DeviceManager {
return manager return manager
} }
func cleanupDeviceSessionCache(ctx context.Context, srv *ServiceContext, userID int64, deviceID, session string) {
if session == "" && deviceID == "" {
return
}
pipe := srv.Redis.TxPipeline()
if session != "" {
sessionKey := fmt.Sprintf("%v:%v", config.SessionIdKey, session)
pipe.Del(ctx, sessionKey)
sessionDetailKey := fmt.Sprintf("%s:detail:%s", config.SessionIdKey, session)
pipe.Del(ctx, sessionDetailKey)
sessionsKey := fmt.Sprintf("%s%v", config.UserSessionsKeyPrefix, userID)
pipe.ZRem(ctx, sessionsKey, session)
}
if deviceID != "" {
deviceCacheKey := fmt.Sprintf("%v:%v", config.DeviceCacheKeyKey, deviceID)
pipe.Del(ctx, deviceCacheKey)
}
if _, err := pipe.Exec(ctx); err != nil {
logger.Errorw("[DeviceManager] failed to clear kicked device cache",
logger.Field("user_id", userID),
logger.Field("device_id", deviceID),
logger.Field("session", session),
logger.Field("error", err.Error()),
)
}
}
type DeviceMessage struct { type DeviceMessage struct {
Method DeviceMessageMethod `json:"method"` Method DeviceMessageMethod `json:"method"`
} }