All checks were successful
Build docker and publish / build (20.15.1) (push) Successful in 7m29s
logoutUnbind 循环清理家庭成员时,踢人者(kicker)也在成员列表中, 导致 kicker 的设备被踢、session 被清,自己也下线了。 现在跳过 kickerUserID,只清缓存不踢设备不清 session。 Co-Authored-By: claude-flow <ruv@ruv.net>
410 lines
14 KiB
Go
410 lines
14 KiB
Go
package user
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/perfect-panel/server/internal/config"
|
||
"github.com/perfect-panel/server/internal/model/user"
|
||
"github.com/perfect-panel/server/internal/svc"
|
||
"github.com/perfect-panel/server/internal/types"
|
||
"github.com/perfect-panel/server/pkg/constant"
|
||
"github.com/perfect-panel/server/pkg/logger"
|
||
"github.com/perfect-panel/server/pkg/xerr"
|
||
"github.com/pkg/errors"
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
type UnbindDeviceLogic struct {
|
||
logger.Logger
|
||
ctx context.Context
|
||
svcCtx *svc.ServiceContext
|
||
}
|
||
|
||
// Unbind Device
|
||
func NewUnbindDeviceLogic(ctx context.Context, svcCtx *svc.ServiceContext) *UnbindDeviceLogic {
|
||
return &UnbindDeviceLogic{
|
||
Logger: logger.WithContext(ctx),
|
||
ctx: ctx,
|
||
svcCtx: svcCtx,
|
||
}
|
||
}
|
||
|
||
func (l *UnbindDeviceLogic) UnbindDevice(req *types.UnbindDeviceRequest) error {
|
||
userInfo, ok := l.ctx.Value(constant.CtxKeyUser).(*user.User)
|
||
if !ok {
|
||
return errors.Wrapf(xerr.NewErrCode(xerr.InvalidAccess), "Invalid Access")
|
||
}
|
||
|
||
device, err := l.svcCtx.UserModel.FindOneDevice(l.ctx, req.Id)
|
||
if err != nil {
|
||
return errors.Wrapf(xerr.NewErrCode(xerr.DeviceNotExist), "find device")
|
||
}
|
||
|
||
isSelf := (device.UserId == userInfo.Id)
|
||
|
||
if !isSelf {
|
||
// Not own device — check if in the same family
|
||
if !l.isInSameFamily(userInfo.Id, device.UserId) {
|
||
return errors.Wrapf(xerr.NewErrCode(xerr.InvalidParams), "not in same family")
|
||
}
|
||
}
|
||
|
||
targetUser, err := l.svcCtx.UserModel.FindOne(l.ctx, device.UserId)
|
||
if err != nil {
|
||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "find target user")
|
||
}
|
||
|
||
// If kicking another user (not self), check if subscription transfer is needed
|
||
if !isSelf {
|
||
if err := l.transferSubscriptionsIfNeeded(userInfo, targetUser); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
currentSessionID, _ := l.ctx.Value(constant.CtxKeySessionID).(string)
|
||
return l.logoutUnbind(userInfo.Id, targetUser, device, currentSessionID)
|
||
}
|
||
|
||
func (l *UnbindDeviceLogic) logoutUnbind(kickerUserID int64, userInfo *user.User, device *user.Device, currentSessionID string) error {
|
||
// 1. 事务前查出 AuthMethods,用于事务后清邮箱缓存
|
||
authMethods, _ := l.svcCtx.UserModel.FindUserAuthMethods(l.ctx, userInfo.Id)
|
||
|
||
// 2. 事务前收集受影响的家庭成员 ID(事务会改变家庭关系,之后查不到)
|
||
familyMemberIDs := l.collectFamilyMemberIDs(userInfo.Id)
|
||
|
||
// 3. 事务前查询家庭关系,判断是否需要转移(而非解散)
|
||
var newOwnerUserID int64
|
||
var transferredSubscribes []user.Subscribe
|
||
var relation user.UserFamilyMember
|
||
relErr := l.svcCtx.DB.WithContext(l.ctx).
|
||
Model(&user.UserFamilyMember{}).
|
||
Where("user_id = ? AND status = ?", userInfo.Id, user.FamilyMemberActive).
|
||
First(&relation).Error
|
||
isOwnerWithMembers := relErr == nil && relation.Role == user.FamilyRoleOwner && len(familyMemberIDs) > 0
|
||
|
||
// 4. 事务:根据角色决定转移还是解散
|
||
err := l.svcCtx.DB.Transaction(func(tx *gorm.DB) error {
|
||
if isOwnerWithMembers {
|
||
// Owner 且有其他 active member → 转移而非解散
|
||
newOwnerUserID = familyMemberIDs[0]
|
||
|
||
// 4a. 转移订阅 user_subscribe.user_id → newOwner
|
||
tx.Model(&user.Subscribe{}).
|
||
Where("user_id = ?", userInfo.Id).
|
||
Find(&transferredSubscribes)
|
||
if len(transferredSubscribes) > 0 {
|
||
if err := tx.Model(&user.Subscribe{}).
|
||
Where("user_id = ?", userInfo.Id).
|
||
Update("user_id", newOwnerUserID).Error; err != nil {
|
||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "transfer subscribes failed")
|
||
}
|
||
}
|
||
|
||
// 4b. 转移邮箱 auth_method → newOwner
|
||
if err := tx.Model(&user.AuthMethods{}).
|
||
Where("user_id = ? AND auth_type = ?", userInfo.Id, "email").
|
||
Update("user_id", newOwnerUserID).Error; err != nil {
|
||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "transfer email auth_method failed")
|
||
}
|
||
|
||
// 4c. 提升 member 为 owner
|
||
if err := tx.Model(&user.UserFamilyMember{}).
|
||
Where("family_id = ? AND user_id = ? AND status = ?",
|
||
relation.FamilyId, newOwnerUserID, user.FamilyMemberActive).
|
||
Update("role", user.FamilyRoleOwner).Error; err != nil {
|
||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "promote member to owner failed")
|
||
}
|
||
|
||
// 4d. 更新 family 的 owner_user_id
|
||
if err := tx.Model(&user.UserFamily{}).
|
||
Where("id = ?", relation.FamilyId).
|
||
Update("owner_user_id", newOwnerUserID).Error; err != nil {
|
||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "update family owner failed")
|
||
}
|
||
|
||
// 4e. 将当前 owner 标记为已退出
|
||
now := time.Now()
|
||
if err := tx.Model(&user.UserFamilyMember{}).
|
||
Where("id = ?", relation.Id).
|
||
Updates(map[string]interface{}{
|
||
"status": user.FamilyMemberRemoved,
|
||
"left_at": now,
|
||
}).Error; err != nil {
|
||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "mark owner as removed failed")
|
||
}
|
||
|
||
// 邮箱 auth_method 已转移,不需要再删除
|
||
// 删除其他非 device、非 email 的 auth_methods(如有)
|
||
if err := tx.Where("user_id = ? AND auth_type NOT IN (?,?)", userInfo.Id, "device", "email").
|
||
Delete(&user.AuthMethods{}).Error; err != nil {
|
||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseDeletedError), "delete other auth methods failed")
|
||
}
|
||
|
||
l.Infow("owner logout: transferred to new owner",
|
||
logger.Field("old_owner", userInfo.Id),
|
||
logger.Field("new_owner", newOwnerUserID),
|
||
logger.Field("family_id", relation.FamilyId),
|
||
logger.Field("subscriptions_transferred", len(transferredSubscribes)),
|
||
)
|
||
|
||
return nil
|
||
}
|
||
|
||
// 没有其他成员 或 当前用户是 member → 正常解散/退出流程
|
||
exitHelper := newFamilyExitHelper(l.ctx, l.svcCtx)
|
||
if err := exitHelper.removeUserFromActiveFamily(tx, userInfo.Id, true); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 解绑邮箱等非 device 类型的 auth_methods(保留 device 绑定)
|
||
if err := tx.Where("user_id = ? AND auth_type != ?", userInfo.Id, "device").
|
||
Delete(&user.AuthMethods{}).Error; err != nil {
|
||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseDeletedError), "delete non-device auth methods failed")
|
||
}
|
||
|
||
return nil
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 5. 事务提交后清缓存
|
||
// 5a. 清旧 owner 的邮箱缓存(不管是否转移,都要清 email cache key)
|
||
for _, am := range authMethods {
|
||
if am.AuthType == "email" && am.AuthIdentifier != "" {
|
||
cacheKey := fmt.Sprintf("cache:user:email:%s", am.AuthIdentifier)
|
||
if delErr := l.svcCtx.Redis.Del(l.ctx, cacheKey).Err(); delErr != nil {
|
||
l.Errorw("clear email cache failed",
|
||
logger.Field("user_id", userInfo.Id),
|
||
logger.Field("email", am.AuthIdentifier),
|
||
logger.Field("error", delErr.Error()),
|
||
)
|
||
}
|
||
}
|
||
}
|
||
// 5b. 清旧 owner 的用户缓存
|
||
if clearErr := l.svcCtx.UserModel.ClearUserCache(l.ctx, userInfo); clearErr != nil {
|
||
l.Errorw("clear user cache failed",
|
||
logger.Field("user_id", userInfo.Id),
|
||
logger.Field("error", clearErr.Error()),
|
||
)
|
||
}
|
||
|
||
// 5c. 如果有转移,清新 owner 的缓存 + 订阅缓存
|
||
if newOwnerUserID > 0 {
|
||
if newOwnerUser, findErr := l.svcCtx.UserModel.FindOne(l.ctx, newOwnerUserID); findErr == nil {
|
||
_ = l.svcCtx.UserModel.ClearUserCache(l.ctx, newOwnerUser)
|
||
}
|
||
for i := range transferredSubscribes {
|
||
_ = l.svcCtx.UserModel.ClearSubscribeCache(l.ctx, &transferredSubscribes[i])
|
||
}
|
||
}
|
||
|
||
// 6. Kick 设备(关闭 WebSocket,客户端会收到 kicked 消息)
|
||
l.svcCtx.DeviceManager.KickDevice(device.UserId, device.Identifier)
|
||
|
||
// 7. 清除该用户所有 session(旧 token 全部失效)
|
||
l.clearAllSessions(userInfo.Id)
|
||
|
||
// 8. 清理受影响的家庭成员缓存 + 踢设备 + 清 session(跳过踢人者自己)
|
||
for _, memberID := range familyMemberIDs {
|
||
if memberUser, findErr := l.svcCtx.UserModel.FindOne(l.ctx, memberID); findErr == nil {
|
||
if clearErr := l.svcCtx.UserModel.ClearUserCache(l.ctx, memberUser); clearErr != nil {
|
||
l.Errorw("clear family member cache failed",
|
||
logger.Field("member_id", memberID),
|
||
logger.Field("error", clearErr.Error()),
|
||
)
|
||
}
|
||
}
|
||
|
||
// 踢人者只需刷新缓存,不踢设备、不清 session
|
||
if memberID == kickerUserID {
|
||
continue
|
||
}
|
||
|
||
// 踢该成员的所有在线设备
|
||
var memberDevices []user.Device
|
||
l.svcCtx.DB.WithContext(l.ctx).
|
||
Model(&user.Device{}).
|
||
Where("user_id = ?", memberID).
|
||
Find(&memberDevices)
|
||
for _, d := range memberDevices {
|
||
l.svcCtx.DeviceManager.KickDevice(d.UserId, d.Identifier)
|
||
}
|
||
|
||
// 清除该成员所有 session,确保旧 token 失效
|
||
l.clearAllSessions(memberID)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// collectFamilyMemberIDs 收集当前用户所在家庭的其他成员 ID(需在事务前调用)
|
||
func (l *UnbindDeviceLogic) collectFamilyMemberIDs(userID int64) []int64 {
|
||
var result struct {
|
||
FamilyId int64
|
||
}
|
||
err := l.svcCtx.DB.WithContext(l.ctx).
|
||
Model(&user.UserFamilyMember{}).
|
||
Select("family_id").
|
||
Where("user_id = ? AND status = ?", userID, user.FamilyMemberActive).
|
||
First(&result).Error
|
||
if err != nil || result.FamilyId == 0 {
|
||
return nil
|
||
}
|
||
|
||
var memberIDs []int64
|
||
l.svcCtx.DB.WithContext(l.ctx).
|
||
Model(&user.UserFamilyMember{}).
|
||
Where("family_id = ? AND status = ? AND user_id != ?", result.FamilyId, user.FamilyMemberActive, userID).
|
||
Pluck("user_id", &memberIDs)
|
||
|
||
return memberIDs
|
||
}
|
||
|
||
// clearAllSessions 清除指定用户的所有会话(通过 SCAN 查找,不依赖 sorted set)
|
||
func (l *UnbindDeviceLogic) clearAllSessions(userId int64) {
|
||
sessionSet := make(map[string]struct{})
|
||
|
||
// SCAN 所有 session key,找出属于该用户的
|
||
userIDText := strconv.FormatInt(userId, 10)
|
||
pattern := fmt.Sprintf("%s:*", config.SessionIdKey)
|
||
var cursor uint64
|
||
for {
|
||
keys, nextCursor, scanErr := l.svcCtx.Redis.Scan(l.ctx, cursor, pattern, 200).Result()
|
||
if scanErr != nil {
|
||
l.Errorw("扫描会话键失败", logger.Field("user_id", userId), logger.Field("error", scanErr.Error()))
|
||
break
|
||
}
|
||
for _, sessionKey := range keys {
|
||
value, getErr := l.svcCtx.Redis.Get(l.ctx, sessionKey).Result()
|
||
if getErr != nil || value != userIDText {
|
||
continue
|
||
}
|
||
sessionID := strings.TrimPrefix(sessionKey, config.SessionIdKey+":")
|
||
if sessionID == "" || strings.HasPrefix(sessionID, "detail:") {
|
||
continue
|
||
}
|
||
sessionSet[sessionID] = struct{}{}
|
||
}
|
||
cursor = nextCursor
|
||
if cursor == 0 {
|
||
break
|
||
}
|
||
}
|
||
|
||
// SCAN 设备缓存 key,找出关联的
|
||
deviceKeySet := make(map[string]struct{})
|
||
devicePattern := fmt.Sprintf("%s:*", config.DeviceCacheKeyKey)
|
||
cursor = 0
|
||
for {
|
||
keys, nextCursor, scanErr := l.svcCtx.Redis.Scan(l.ctx, cursor, devicePattern, 200).Result()
|
||
if scanErr != nil {
|
||
l.Errorw("扫描设备会话映射失败", logger.Field("user_id", userId), logger.Field("error", scanErr.Error()))
|
||
break
|
||
}
|
||
for _, deviceKey := range keys {
|
||
sessionID, getErr := l.svcCtx.Redis.Get(l.ctx, deviceKey).Result()
|
||
if getErr != nil {
|
||
continue
|
||
}
|
||
if _, exists := sessionSet[sessionID]; exists {
|
||
deviceKeySet[deviceKey] = struct{}{}
|
||
}
|
||
}
|
||
cursor = nextCursor
|
||
if cursor == 0 {
|
||
break
|
||
}
|
||
}
|
||
|
||
if len(sessionSet) == 0 {
|
||
return
|
||
}
|
||
|
||
sessionsKey := fmt.Sprintf("%s%v", config.UserSessionsKeyPrefix, userId)
|
||
pipe := l.svcCtx.Redis.TxPipeline()
|
||
for sessionID := range sessionSet {
|
||
pipe.Del(l.ctx, fmt.Sprintf("%v:%v", config.SessionIdKey, sessionID))
|
||
pipe.Del(l.ctx, fmt.Sprintf("%s:detail:%s", config.SessionIdKey, sessionID))
|
||
pipe.ZRem(l.ctx, sessionsKey, sessionID)
|
||
}
|
||
pipe.Del(l.ctx, sessionsKey)
|
||
|
||
for deviceKey := range deviceKeySet {
|
||
pipe.Del(l.ctx, deviceKey)
|
||
}
|
||
|
||
if _, err := pipe.Exec(l.ctx); err != nil {
|
||
l.Errorw("清理会话缓存失败",
|
||
logger.Field("user_id", userId),
|
||
logger.Field("error", err.Error()),
|
||
)
|
||
}
|
||
|
||
l.Infow("退出登录-清除所有Session",
|
||
logger.Field("user_id", userId),
|
||
logger.Field("count", len(sessionSet)),
|
||
)
|
||
}
|
||
|
||
// isInSameFamily checks whether two users belong to the same active family
|
||
func (l *UnbindDeviceLogic) isInSameFamily(userID1, userID2 int64) bool {
|
||
var relation1 struct{ FamilyId int64 }
|
||
err := l.svcCtx.DB.WithContext(l.ctx).
|
||
Model(&user.UserFamilyMember{}).
|
||
Select("family_id").
|
||
Where("user_id = ? AND status = ?", userID1, user.FamilyMemberActive).
|
||
First(&relation1).Error
|
||
if err != nil || relation1.FamilyId == 0 {
|
||
return false
|
||
}
|
||
|
||
var count int64
|
||
l.svcCtx.DB.WithContext(l.ctx).
|
||
Model(&user.UserFamilyMember{}).
|
||
Where("family_id = ? AND user_id = ? AND status = ?",
|
||
relation1.FamilyId, userID2, user.FamilyMemberActive).
|
||
Count(&count)
|
||
return count > 0
|
||
}
|
||
|
||
// transferSubscriptionsIfNeeded transfers subscriptions from the kicked user to the kicker
|
||
func (l *UnbindDeviceLogic) transferSubscriptionsIfNeeded(kicker *user.User, kicked *user.User) error {
|
||
// Query kicked user's subscriptions
|
||
var subscribes []user.Subscribe
|
||
if err := l.svcCtx.DB.WithContext(l.ctx).
|
||
Model(&user.Subscribe{}).
|
||
Where("user_id = ?", kicked.Id).
|
||
Find(&subscribes).Error; err != nil {
|
||
return nil // query error, skip transfer
|
||
}
|
||
if len(subscribes) == 0 {
|
||
return nil // no subscriptions to transfer
|
||
}
|
||
|
||
// Transfer subscriptions: UPDATE user_subscribe SET user_id = kicker WHERE user_id = kicked
|
||
if err := l.svcCtx.DB.WithContext(l.ctx).
|
||
Model(&user.Subscribe{}).
|
||
Where("user_id = ?", kicked.Id).
|
||
Update("user_id", kicker.Id).Error; err != nil {
|
||
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "transfer subscriptions failed")
|
||
}
|
||
|
||
// Clear subscription caches for both users
|
||
for _, sub := range subscribes {
|
||
_ = l.svcCtx.UserModel.ClearSubscribeCache(l.ctx, &sub)
|
||
}
|
||
|
||
l.Infow("subscriptions transferred",
|
||
logger.Field("from_user_id", kicked.Id),
|
||
logger.Field("to_user_id", kicker.Id),
|
||
logger.Field("count", len(subscribes)),
|
||
)
|
||
return nil
|
||
}
|