From 70c0483ca90125f5880b676c695f2bc54ccff76c Mon Sep 17 00:00:00 2001 From: shanshanzhong Date: Wed, 4 Mar 2026 23:08:46 -0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=AE=A2=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitea/workflows/docker.yml | 2 +- .../iap/apple/attachTransactionLogic.go | 25 ++- .../logic/public/user/accountMergeHelper.go | 163 ++++++++++++++++++ .../user/bindEmailWithVerificationLogic.go | 20 +-- internal/model/user/model.go | 1 + internal/svc/devce.go | 49 ++++-- 6 files changed, 230 insertions(+), 30 deletions(-) create mode 100644 internal/logic/public/user/accountMergeHelper.go diff --git a/.gitea/workflows/docker.yml b/.gitea/workflows/docker.yml index 0ef9071..4cfa1c1 100644 --- a/.gitea/workflows/docker.yml +++ b/.gitea/workflows/docker.yml @@ -21,7 +21,7 @@ env: SSH_PASSWORD: ${{ github.ref_name == 'main' && vars.SSH_PASSWORD || vars.DEV_SSH_PASSWORD }} # TG通知 TG_BOT_TOKEN: 8114337882:AAHkEx03HSu7RxN4IHBJJEnsK9aPPzNLIk0 - TG_CHAT_ID: "-4940243803" + TG_CHAT_ID: "-49402438031" # Go构建变量 SERVICE: vpn SERVICE_STYLE: vpn diff --git a/internal/logic/public/iap/apple/attachTransactionLogic.go b/internal/logic/public/iap/apple/attachTransactionLogic.go index 8e8d926..ee1a02a 100644 --- a/internal/logic/public/iap/apple/attachTransactionLogic.go +++ b/internal/logic/public/iap/apple/attachTransactionLogic.go @@ -183,13 +183,30 @@ func (l *AttachTransactionLogic) Attach(req *types.AttachAppleTransactionRequest if existTx != nil && existTx.Id > 0 { token := fmt.Sprintf("iap:%s", txPayload.OriginalTransactionId) existSub, err := l.svcCtx.UserModel.FindOneSubscribeByToken(l.ctx, token) - if err == nil && existSub != nil && existSub.Id > 0 { - // Already processed, return success - l.Infow("事务已处理,直接返回", logger.Field("originalTransactionId", txPayload.OriginalTransactionId), logger.Field("tier", tier), logger.Field("expiresAt", exp.Unix())) + switch { + case err == nil && existSub != nil && existSub.Id > 0: + newExpire := existSub.ExpireTime + if exp.After(newExpire) { + existSub.ExpireTime = exp + newExpire = exp + } + if subscribeId > 0 && existSub.SubscribeId != subscribeId { + existSub.SubscribeId = subscribeId + } + existSub.Status = 1 + existSub.FinishedAt = nil + if err := l.svcCtx.UserModel.UpdateSubscribe(l.ctx, existSub); err != nil { + l.Errorw("刷新 IAP 订阅失败", logger.Field("error", err.Error()), logger.Field("subscribeId", existSub.Id)) + return nil, errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "update iap subscribe failed: %v", err.Error()) + } + l.Infow("事务已处理,刷新订阅到期时间", logger.Field("originalTransactionId", txPayload.OriginalTransactionId), logger.Field("tier", tier), logger.Field("expiresAt", newExpire.Unix())) return &types.AttachAppleTransactionResponse{ - ExpiresAt: exp.Unix(), + ExpiresAt: newExpire.Unix(), Tier: tier, }, nil + case err != nil && !errors.Is(err, gorm.ErrRecordNotFound): + l.Errorw("查询 IAP 订阅失败", logger.Field("error", err.Error()), logger.Field("token", token)) + return nil, errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "find iap subscribe error: %v", err.Error()) } } diff --git a/internal/logic/public/user/accountMergeHelper.go b/internal/logic/public/user/accountMergeHelper.go new file mode 100644 index 0000000..acfe7c7 --- /dev/null +++ b/internal/logic/public/user/accountMergeHelper.go @@ -0,0 +1,163 @@ +package user + +import ( + "context" + "time" + + modelUser "github.com/perfect-panel/server/internal/model/user" + "github.com/perfect-panel/server/internal/svc" + "github.com/perfect-panel/server/pkg/logger" + "github.com/perfect-panel/server/pkg/xerr" + "github.com/pkg/errors" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type accountMergeResult struct { + OwnerUserID int64 + DeviceUserID int64 + MovedDevices []modelUser.Device + RemovedSubscribes []modelUser.Subscribe +} + +type accountMergeHelper struct { + ctx context.Context + svcCtx *svc.ServiceContext +} + +func newAccountMergeHelper(ctx context.Context, svcCtx *svc.ServiceContext) *accountMergeHelper { + return &accountMergeHelper{ + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (h *accountMergeHelper) mergeIntoOwner(ownerUserID, deviceUserID int64, source string) (*accountMergeResult, error) { + if ownerUserID == 0 || deviceUserID == 0 { + return nil, errors.Wrapf(xerr.NewErrCode(xerr.InvalidParams), "merge user id is empty") + } + if ownerUserID == deviceUserID { + return nil, errors.Wrapf(xerr.NewErrCode(xerr.InvalidParams), "cannot merge same user id") + } + + result := &accountMergeResult{ + OwnerUserID: ownerUserID, + DeviceUserID: deviceUserID, + } + + err := h.svcCtx.DB.WithContext(h.ctx).Transaction(func(tx *gorm.DB) error { + var owner modelUser.User + if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}). + Where("id = ?", ownerUserID). + First(&owner).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return errors.Wrapf(xerr.NewErrCode(xerr.UserNotExist), "owner user %d not found", ownerUserID) + } + return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "query owner user failed") + } + + var device modelUser.User + if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}). + Where("id = ?", deviceUserID). + First(&device).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return errors.Wrapf(xerr.NewErrCode(xerr.UserNotExist), "device user %d not found", deviceUserID) + } + return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "query device user failed") + } + + exitHelper := newFamilyExitHelper(h.ctx, h.svcCtx) + if err := exitHelper.removeUserFromActiveFamily(tx, deviceUserID, false); err != nil { + return err + } + + removedSubscribes, err := clearMemberSubscribes(tx, deviceUserID) + if err != nil { + return err + } + result.RemovedSubscribes = removedSubscribes + + var devices []modelUser.Device + if err := tx.Where("user_id = ?", deviceUserID).Find(&devices).Error; err != nil { + return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "query device list failed") + } + if len(devices) > 0 { + if err := tx.Model(&modelUser.Device{}). + Where("user_id = ?", deviceUserID). + Updates(map[string]interface{}{ + "user_id": ownerUserID, + "updated_at": time.Now(), + }).Error; err != nil { + return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "update device ownership failed") + } + } + result.MovedDevices = devices + + if err := tx.Model(&modelUser.AuthMethods{}). + Where("user_id = ?", deviceUserID). + Update("user_id", ownerUserID).Error; err != nil { + return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "migrate auth methods failed") + } + + if err := tx.Model(&modelUser.User{}). + Where("id = ?", deviceUserID). + Update("enable", false).Error; err != nil { + return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "disable device user failed") + } + if err := tx.Where("id = ?", deviceUserID).Delete(&modelUser.User{}).Error; err != nil { + return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "soft delete device user failed") + } + + return nil + }) + if err != nil { + return nil, err + } + + if err := h.clearCaches(result); err != nil { + return nil, err + } + + logger.WithContext(h.ctx).Infow("device account merged into owner", + logger.Field("owner_user_id", ownerUserID), + logger.Field("device_user_id", deviceUserID), + logger.Field("moved_devices", len(result.MovedDevices)), + logger.Field("removed_subscribes", len(result.RemovedSubscribes)), + logger.Field("source", source), + ) + + return result, nil +} + +func (h *accountMergeHelper) clearCaches(result *accountMergeResult) error { + if result == nil { + return nil + } + + if err := h.svcCtx.UserModel.ClearUserCache(h.ctx, + &modelUser.User{Id: result.OwnerUserID}, + &modelUser.User{Id: result.DeviceUserID}, + ); err != nil { + return err + } + + if len(result.MovedDevices) > 0 { + deviceModels := make([]*modelUser.Device, 0, len(result.MovedDevices)) + for i := range result.MovedDevices { + device := result.MovedDevices[i] + deviceModels = append(deviceModels, &device) + } + if err := h.svcCtx.UserModel.ClearDeviceCache(h.ctx, deviceModels...); err != nil { + return err + } + } + + if len(result.RemovedSubscribes) > 0 { + familyHelper := newFamilyBindingHelper(h.ctx, h.svcCtx) + if err := familyHelper.clearRemovedMemberSubscribeCache(result.RemovedSubscribes); err != nil { + return err + } + } + + return nil +} diff --git a/internal/logic/public/user/bindEmailWithVerificationLogic.go b/internal/logic/public/user/bindEmailWithVerificationLogic.go index c6cef3d..f2b910c 100644 --- a/internal/logic/public/user/bindEmailWithVerificationLogic.go +++ b/internal/logic/public/user/bindEmailWithVerificationLogic.go @@ -112,26 +112,20 @@ func (l *BindEmailWithVerificationLogic) BindEmailWithVerification(req *types.Bi return nil, errors.Wrapf(xerr.NewErrCode(xerr.FamilyAlreadyBound), "email already bound to current user") } - if err = familyHelper.validateJoinFamily(existingMethod.UserId, u.Id); err != nil { + mergeHelper := newAccountMergeHelper(l.ctx, l.svcCtx) + if _, err = mergeHelper.mergeIntoOwner(existingMethod.UserId, u.Id, "bind_email_with_verification"); err != nil { return nil, err } - joinResult, err := familyHelper.joinFamily(existingMethod.UserId, u.Id, "bind_email_with_verification") - if err != nil { - return nil, err - } - token, err := l.refreshBindSessionToken(u.Id) + token, err := l.refreshBindSessionToken(existingMethod.UserId) if err != nil { return nil, err } return &types.BindEmailWithVerificationResponse{ - Success: true, - Message: "joined family successfully", - Token: token, - UserId: u.Id, - FamilyJoined: true, - FamilyId: joinResult.FamilyId, - OwnerUserId: joinResult.OwnerUserId, + Success: true, + Message: "email bound successfully", + Token: token, + UserId: existingMethod.UserId, }, nil } diff --git a/internal/model/user/model.go b/internal/model/user/model.go index f70f905..4d316af 100644 --- a/internal/model/user/model.go +++ b/internal/model/user/model.go @@ -114,6 +114,7 @@ type customUserLogicModel interface { ClearSubscribeCache(ctx context.Context, data ...*Subscribe) error ClearUserCache(ctx context.Context, data ...*User) error + ClearDeviceCache(ctx context.Context, devices ...*Device) error QueryDailyUserStatisticsList(ctx context.Context, date time.Time) ([]UserStatisticsWithDate, error) QueryMonthlyUserStatisticsList(ctx context.Context, date time.Time) ([]UserStatisticsWithDate, error) diff --git a/internal/svc/devce.go b/internal/svc/devce.go index 1a1556c..df8ed3b 100644 --- a/internal/svc/devce.go +++ b/internal/svc/devce.go @@ -82,24 +82,16 @@ func NewDeviceManager(srv *ServiceContext) *device.DeviceManager { } manager.OnDeviceKicked = func(userID int64, deviceID, session string, operator device.Operator) { - //管理员踢下线 - if operator == device.Admin { + switch operator { + case device.Admin: message := DeviceMessage{Method: DeviceKickedAdmin} _ = manager.SendToDevice(userID, deviceID, message.Json()) - //将登陆凭证从缓存中删除 - srv.Redis.Del(ctx, fmt.Sprintf("%v:%v", config.SessionIdKey, session)) - return - } - - //登陆设备超过限制踢下线 - if operator == device.MaxDevices { + case device.MaxDevices: message := DeviceMessage{Method: DeviceKickedMax} _ = manager.SendToDevice(userID, deviceID, message.Json()) - //将登陆凭证从缓存中删除 - srv.Redis.Del(ctx, fmt.Sprintf("%v:%v", config.SessionIdKey, session)) - return } + cleanupDeviceSessionCache(ctx, srv, userID, deviceID, session) } manager.OnMessage = func(userID int64, deviceID, session string, message string) { @@ -108,6 +100,39 @@ func NewDeviceManager(srv *ServiceContext) *device.DeviceManager { return manager } +func cleanupDeviceSessionCache(ctx context.Context, srv *ServiceContext, userID int64, deviceID, session string) { + if session == "" && deviceID == "" { + return + } + + pipe := srv.Redis.TxPipeline() + + if session != "" { + sessionKey := fmt.Sprintf("%v:%v", config.SessionIdKey, session) + pipe.Del(ctx, sessionKey) + + sessionDetailKey := fmt.Sprintf("%s:detail:%s", config.SessionIdKey, session) + pipe.Del(ctx, sessionDetailKey) + + sessionsKey := fmt.Sprintf("%s%v", config.UserSessionsKeyPrefix, userID) + pipe.ZRem(ctx, sessionsKey, session) + } + + if deviceID != "" { + deviceCacheKey := fmt.Sprintf("%v:%v", config.DeviceCacheKeyKey, deviceID) + pipe.Del(ctx, deviceCacheKey) + } + + if _, err := pipe.Exec(ctx); err != nil { + logger.Errorw("[DeviceManager] failed to clear kicked device cache", + logger.Field("user_id", userID), + logger.Field("device_id", deviceID), + logger.Field("session", session), + logger.Field("error", err.Error()), + ) + } +} + type DeviceMessage struct { Method DeviceMessageMethod `json:"method"` }