shanshanzhong 70c0483ca9
Some checks failed
Build docker and publish / build (20.15.1) (push) Failing after 8m2s
修复订阅
2026-03-04 23:08:46 -08:00

155 lines
5.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package svc
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/perfect-panel/server/internal/config"
"github.com/perfect-panel/server/internal/model/user"
"github.com/perfect-panel/server/pkg/device"
"github.com/perfect-panel/server/pkg/logger"
"github.com/pkg/errors"
"gorm.io/gorm"
)
// NewDeviceManager creates a device.DeviceManager and wires its callbacks to
// the given service context:
//
//   - OnDeviceOffline marks the device offline and stores a
//     user.DeviceOnlineRecord for the session that just ended, including the
//     consecutive-online-days counter.
//   - OnDeviceOnline marks the device online.
//   - OnDeviceKicked tells the device why it was kicked (for the Admin and
//     MaxDevices operators) and then clears its cached session/device keys.
//   - OnMessage logs the received message.
//
// Every callback uses a single context.Background() created here, so the
// database and Redis calls are not bound to any request's cancellation.
func NewDeviceManager(srv *ServiceContext) *device.DeviceManager {
	ctx := context.Background()
	manager := device.NewDeviceManager(30, 30)

	// Device went offline.
	manager.OnDeviceOffline = func(userID int64, deviceID, session string, createAt time.Time) {
		oneDevice, err := srv.UserModel.FindOneDeviceByIdentifier(ctx, deviceID)
		if err != nil {
			// An unknown device is ignored silently; any other error is logged.
			if !errors.Is(err, gorm.ErrRecordNotFound) {
				logger.Errorw("failed to find device", logger.Field("error", err.Error()), logger.Field("device_id", deviceID))
			}
			return
		}

		// Flag the device as offline. A failed update is logged but does not
		// prevent the online record below from being written.
		oneDevice.Online = false
		if err = srv.UserModel.UpdateDevice(ctx, oneDevice); err != nil {
			logger.Errorw("[DeviceManager] failed to update device", logger.Field("error", err.Error()), logger.Field("device_id", deviceID))
		}

		// The time this callback runs is taken as the offline time.
		currentTime := time.Now()

		// Yesterday's window is [yesterday 00:00:00, today 00:00:00) in the
		// wall clock of currentTime. The previous code added 24h to today's
		// midnight for the lower bound, which produced an empty range
		// (tomorrow <= created_at < today) so the streak never advanced.
		todayStart := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), 0, 0, 0, 0, currentTime.Location())
		yesterdayStart := todayStart.AddDate(0, 0, -1)

		deviceOnlineRecord := user.DeviceOnlineRecord{
			UserId:        userID,
			Identifier:    deviceID,
			OnlineTime:    createAt,
			OfflineTime:   currentTime,
			OnlineSeconds: int64(currentTime.Sub(createAt).Seconds()),
		}

		// Look up a record created yesterday to continue the streak.
		// NOTE(review): the lookup filters by user_id only, not by device
		// identifier — confirm the streak is meant to be per user.
		var onlineRecord user.DeviceOnlineRecord
		findErr := srv.DB.Model(&onlineRecord).
			Where("user_id = ? and created_at >= ? and created_at < ?", userID, yesterdayStart.Format(time.DateTime), todayStart.Format(time.DateTime)).
			First(&onlineRecord).Error
		if findErr == nil {
			// Online yesterday: today's streak is yesterday's streak plus one.
			deviceOnlineRecord.DurationDays = onlineRecord.DurationDays + 1
		} else {
			if !errors.Is(findErr, gorm.ErrRecordNotFound) {
				logger.Errorw("[DeviceOnlineRecord] failed to query previous online record", logger.Field("error", findErr.Error()), logger.Field("device_id", deviceID))
			}
			// No record from yesterday (or lookup failed): the streak starts at 1.
			deviceOnlineRecord.DurationDays = 1
		}

		if err := srv.DB.Create(&deviceOnlineRecord).Error; err != nil {
			logger.Errorw("[DeviceOnlineRecord] failed to DeviceOnlineRecord", logger.Field("error", err.Error()), logger.Field("device_id", deviceID))
		}
	}

	// Device came online.
	manager.OnDeviceOnline = func(userID int64, deviceID, session string) {
		oneDevice, err := srv.UserModel.FindOneDeviceByIdentifier(ctx, deviceID)
		if err != nil {
			// An unknown device is ignored silently; any other error is logged.
			if !errors.Is(err, gorm.ErrRecordNotFound) {
				logger.Errorw("failed to find device", logger.Field("error", err.Error()), logger.Field("device_id", deviceID))
			}
			return
		}
		oneDevice.Online = true
		if err = srv.UserModel.UpdateDevice(ctx, oneDevice); err != nil {
			logger.Errorw("[DeviceManager] failed to update device", logger.Field("error", err.Error()), logger.Field("device_id", deviceID))
		}
	}

	// Device was kicked: notify it of the reason, then drop its cache entries.
	manager.OnDeviceKicked = func(userID int64, deviceID, session string, operator device.Operator) {
		switch operator {
		case device.Admin:
			message := DeviceMessage{Method: DeviceKickedAdmin}
			// Best-effort notification; the send result is intentionally ignored.
			_ = manager.SendToDevice(userID, deviceID, message.Json())
		case device.MaxDevices:
			message := DeviceMessage{Method: DeviceKickedMax}
			// Best-effort notification; the send result is intentionally ignored.
			_ = manager.SendToDevice(userID, deviceID, message.Json())
		}
		// The cache is cleared for every operator, including ones not handled above.
		cleanupDeviceSessionCache(ctx, srv, userID, deviceID, session)
	}

	// Incoming device messages are only logged.
	manager.OnMessage = func(userID int64, deviceID, session string, message string) {
		logger.Infof("userid: %d ,device_number: %s,session: %s, message: %v", userID, deviceID, session, message)
	}

	return manager
}
// cleanupDeviceSessionCache deletes the Redis entries associated with a
// device session. It does nothing when both session and deviceID are empty.
//
// When session is non-empty it removes the session key, the session detail
// key, and the session's member in the user's sessions sorted set. When
// deviceID is non-empty it removes the device cache key. All commands are
// queued on one TxPipeline and submitted with a single Exec; an Exec failure
// is logged and not returned to the caller.
func cleanupDeviceSessionCache(ctx context.Context, srv *ServiceContext, userID int64, deviceID, session string) {
	hasSession := session != ""
	hasDevice := deviceID != ""
	if !hasSession && !hasDevice {
		return
	}

	tx := srv.Redis.TxPipeline()

	if hasSession {
		// Session entry, its detail entry, and the user's session index.
		tx.Del(ctx, fmt.Sprintf("%v:%v", config.SessionIdKey, session))
		tx.Del(ctx, fmt.Sprintf("%s:detail:%s", config.SessionIdKey, session))
		tx.ZRem(ctx, fmt.Sprintf("%s%v", config.UserSessionsKeyPrefix, userID), session)
	}

	if hasDevice {
		tx.Del(ctx, fmt.Sprintf("%v:%v", config.DeviceCacheKeyKey, deviceID))
	}

	_, err := tx.Exec(ctx)
	if err == nil {
		return
	}
	logger.Errorw("[DeviceManager] failed to clear kicked device cache",
		logger.Field("user_id", userID),
		logger.Field("device_id", deviceID),
		logger.Field("session", session),
		logger.Field("error", err.Error()),
	)
}
// DeviceMessageMethod identifies the kind of notification a DeviceMessage carries.
type DeviceMessageMethod string

const (
	// DeviceKickedMax means the device was kicked because the device limit was exceeded.
	DeviceKickedMax DeviceMessageMethod = "kicked_device"
	// DeviceKickedAdmin means the device was kicked offline by an administrator.
	DeviceKickedAdmin DeviceMessageMethod = "kicked_admin"
	// SubscribeUpdate means the subscription has been updated.
	SubscribeUpdate DeviceMessageMethod = "subscribe_update"
)

// DeviceMessage is a notification payload whose JSON form has a single
// "method" field.
type DeviceMessage struct {
	Method DeviceMessageMethod `json:"method"`
}

// Json returns the message encoded as a JSON string. The marshal error is
// discarded; a nil receiver encodes as "null".
func (dm *DeviceMessage) Json() string {
	encoded, _ := json.Marshal(dm)
	return string(encoded)
}