package svc import ( "context" "encoding/json" "fmt" "time" "github.com/perfect-panel/server/internal/config" "github.com/perfect-panel/server/internal/model/user" "github.com/perfect-panel/server/pkg/device" "github.com/perfect-panel/server/pkg/logger" "github.com/pkg/errors" "gorm.io/gorm" ) func NewDeviceManager(srv *ServiceContext) *device.DeviceManager { ctx := context.Background() manager := device.NewDeviceManager(30, 30) //设备离线处理 manager.OnDeviceOffline = func(userID int64, deviceID, session string, createAt time.Time) { oneDevice, err := srv.UserModel.FindOneDeviceByIdentifier(ctx, deviceID) if err != nil { if !errors.Is(err, gorm.ErrRecordNotFound) { logger.Errorw("failed to find device", logger.Field("error", err.Error()), logger.Field("device_id", deviceID)) } return } //更新设备状态为离线 oneDevice.Online = false err = srv.UserModel.UpdateDevice(ctx, oneDevice) if err != nil { logger.Errorw("[DeviceManager] failed to update device", logger.Field("error", err.Error()), logger.Field("device_id", deviceID)) } //当前时间为设备离线时间 currentTime := time.Now() endTime := currentTime.Format("2006-01-02 00:00:00") parseStart, _ := time.Parse(time.DateTime, endTime) startTime := parseStart.Add(time.Hour * 24).Format(time.DateTime) deviceOnlineRecord := user.DeviceOnlineRecord{ UserId: userID, Identifier: deviceID, OnlineTime: createAt, OfflineTime: currentTime, OnlineSeconds: int64(currentTime.Sub(createAt).Seconds()), } //获取设备昨日在线记录 var onlineRecord user.DeviceOnlineRecord if err := srv.DB.Model(&onlineRecord).Where("user_id = ? and created_at >= ? and created_at < ?", userID, startTime, endTime).First(&onlineRecord).Error; err != nil { //昨日未在线,连续在线天数为1 deviceOnlineRecord.DurationDays = 1 } else { //昨日在线,连续在线天数为,昨天连续在线天数+1,等于当前连续在线天数 deviceOnlineRecord.DurationDays = onlineRecord.DurationDays + 1 } if err := srv.DB.Create(&deviceOnlineRecord).Error; err != nil { logger.Errorw("[DeviceOnlineRecord] failed to DeviceOnlineRecord", logger.Field("error", err.Error()), logger.Field("device_id", deviceID)) } } //设备上线处理 manager.OnDeviceOnline = func(userID int64, deviceID, session string) { oneDevice, err := srv.UserModel.FindOneDeviceByIdentifier(ctx, deviceID) if err != nil { if !errors.Is(err, gorm.ErrRecordNotFound) { logger.Errorw("failed to find device", logger.Field("error", err.Error()), logger.Field("device_id", deviceID)) } return } oneDevice.Online = true err = srv.UserModel.UpdateDevice(ctx, oneDevice) if err != nil { logger.Errorw("[DeviceManager] failed to update device", logger.Field("error", err.Error()), logger.Field("device_id", deviceID)) return } } manager.OnDeviceKicked = func(userID int64, deviceID, session string, operator device.Operator) { switch operator { case device.Admin: message := DeviceMessage{Method: DeviceKickedAdmin} _ = manager.SendToDevice(userID, deviceID, message.Json()) case device.MaxDevices: message := DeviceMessage{Method: DeviceKickedMax} _ = manager.SendToDevice(userID, deviceID, message.Json()) } cleanupDeviceSessionCache(ctx, srv, userID, deviceID, session) } manager.OnMessage = func(userID int64, deviceID, session string, message string) { logger.Infof("userid: %d ,device_number: %s,session: %s, message: %v", userID, deviceID, session, message) } return manager } func cleanupDeviceSessionCache(ctx context.Context, srv *ServiceContext, userID int64, deviceID, session string) { if session == "" && deviceID == "" { return } pipe := srv.Redis.TxPipeline() if session != "" { sessionKey := fmt.Sprintf("%v:%v", config.SessionIdKey, session) pipe.Del(ctx, sessionKey) sessionDetailKey := fmt.Sprintf("%s:detail:%s", config.SessionIdKey, session) pipe.Del(ctx, sessionDetailKey) sessionsKey := fmt.Sprintf("%s%v", config.UserSessionsKeyPrefix, userID) pipe.ZRem(ctx, sessionsKey, session) } if deviceID != "" { deviceCacheKey := fmt.Sprintf("%v:%v", config.DeviceCacheKeyKey, deviceID) pipe.Del(ctx, deviceCacheKey) } if _, err := pipe.Exec(ctx); err != nil { logger.Errorw("[DeviceManager] failed to clear kicked device cache", logger.Field("user_id", userID), logger.Field("device_id", deviceID), logger.Field("session", session), logger.Field("error", err.Error()), ) } } type DeviceMessage struct { Method DeviceMessageMethod `json:"method"` } func (dm *DeviceMessage) Json() string { jsonData, _ := json.Marshal(dm) return string(jsonData) } type DeviceMessageMethod string const ( // DeviceKickedMax 设备数量超出限制 DeviceKickedMax DeviceMessageMethod = "kicked_device" // DeviceKickedAdmin 管理员踢下线 DeviceKickedAdmin DeviceMessageMethod = "kicked_admin" // SubscribeUpdate 订阅有更新 SubscribeUpdate DeviceMessageMethod = "subscribe_update" )