When administrators updated subscription plan configurations (traffic limits,
nodes, speed limits, etc.), existing subscribers did not see the updated
settings immediately. This was caused by stale cache entries that were not
being invalidated.
The issue occurred because:
- User subscription queries cache the entire result including preloaded plan details
- Plan update/delete operations only cleared the plan's own cache keys
- User subscription cache keys (cache:user:subscribe:user:{userId}) remained stale
This fix ensures that when a subscription plan is updated or deleted, all
associated user subscription caches are properly invalidated by:
- Querying all active users subscribed to the plan
- Building cache keys for each affected user
- Clearing both plan and user subscription caches atomically
Users will now immediately see updated plan configurations without waiting
for cache expiration.
197 lines
5.5 KiB
Go
197 lines
5.5 KiB
Go
package subscribe
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/perfect-panel/server/internal/model/node"
|
|
"github.com/perfect-panel/server/pkg/cache"
|
|
"github.com/perfect-panel/server/pkg/tool"
|
|
"github.com/redis/go-redis/v9"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
var _ Model = (*customSubscribeModel)(nil)
|
|
var (
|
|
cacheSubscribeIdPrefix = "cache:subscribe:id:"
|
|
)
|
|
|
|
type (
|
|
Model interface {
|
|
subscribeModel
|
|
customSubscribeLogicModel
|
|
}
|
|
subscribeModel interface {
|
|
Insert(ctx context.Context, data *Subscribe, tx ...*gorm.DB) error
|
|
FindOne(ctx context.Context, id int64) (*Subscribe, error)
|
|
Update(ctx context.Context, data *Subscribe, tx ...*gorm.DB) error
|
|
Delete(ctx context.Context, id int64, tx ...*gorm.DB) error
|
|
Transaction(ctx context.Context, fn func(db *gorm.DB) error) error
|
|
}
|
|
|
|
customSubscribeModel struct {
|
|
*defaultSubscribeModel
|
|
}
|
|
defaultSubscribeModel struct {
|
|
cache.CachedConn
|
|
table string
|
|
}
|
|
)
|
|
|
|
func newSubscribeModel(db *gorm.DB, c *redis.Client) *defaultSubscribeModel {
|
|
return &defaultSubscribeModel{
|
|
CachedConn: cache.NewConn(db, c),
|
|
table: "`subscribe`",
|
|
}
|
|
}
|
|
|
|
//nolint:unused
|
|
func (m *defaultSubscribeModel) batchGetCacheKeys(Subscribes ...*Subscribe) []string {
|
|
var keys []string
|
|
for _, subscribe := range Subscribes {
|
|
keys = append(keys, m.getCacheKeys(subscribe)...)
|
|
}
|
|
return keys
|
|
|
|
}
|
|
func (m *defaultSubscribeModel) getCacheKeys(data *Subscribe) []string {
|
|
if data == nil {
|
|
return []string{}
|
|
}
|
|
var keys []string
|
|
if data.Nodes != "" {
|
|
var nodes []*node.Node
|
|
ids := strings.Split(data.Nodes, ",")
|
|
|
|
err := m.QueryNoCacheCtx(context.Background(), &nodes, func(conn *gorm.DB, v interface{}) error {
|
|
return conn.Model(&node.Node{}).Where("id IN (?)", tool.StringSliceToInt64Slice(ids)).Find(&nodes).Error
|
|
})
|
|
if err == nil {
|
|
for _, n := range nodes {
|
|
keys = append(keys, fmt.Sprintf("%s%d", node.ServerUserListCacheKey, n.ServerId))
|
|
}
|
|
}
|
|
}
|
|
if data.NodeTags != "" {
|
|
var nodes []*node.Node
|
|
tags := tool.RemoveDuplicateElements(strings.Split(data.NodeTags, ",")...)
|
|
err := m.QueryNoCacheCtx(context.Background(), &nodes, func(conn *gorm.DB, v interface{}) error {
|
|
return conn.Model(&node.Node{}).Scopes(InSet("tags", tags)).Find(&nodes).Error
|
|
})
|
|
if err == nil {
|
|
for _, n := range nodes {
|
|
keys = append(keys, fmt.Sprintf("%s%d", node.ServerUserListCacheKey, n.ServerId))
|
|
}
|
|
}
|
|
}
|
|
|
|
return append(keys, fmt.Sprintf("%s%v", cacheSubscribeIdPrefix, data.Id))
|
|
}
|
|
|
|
func (m *defaultSubscribeModel) Insert(ctx context.Context, data *Subscribe, tx ...*gorm.DB) error {
|
|
err := m.ExecCtx(ctx, func(conn *gorm.DB) error {
|
|
if len(tx) > 0 {
|
|
conn = tx[0]
|
|
}
|
|
return conn.Create(&data).Error
|
|
}, m.getCacheKeys(data)...)
|
|
return err
|
|
}
|
|
|
|
func (m *defaultSubscribeModel) FindOne(ctx context.Context, id int64) (*Subscribe, error) {
|
|
SubscribeIdKey := fmt.Sprintf("%s%v", cacheSubscribeIdPrefix, id)
|
|
var resp Subscribe
|
|
err := m.QueryCtx(ctx, &resp, SubscribeIdKey, func(conn *gorm.DB, v interface{}) error {
|
|
return conn.Model(&Subscribe{}).Where("`id` = ?", id).First(&resp).Error
|
|
})
|
|
switch {
|
|
case err == nil:
|
|
return &resp, nil
|
|
default:
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
func (m *defaultSubscribeModel) Update(ctx context.Context, data *Subscribe, tx ...*gorm.DB) error {
|
|
old, err := m.FindOne(ctx, data.Id)
|
|
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return err
|
|
}
|
|
|
|
// 获取所有使用该套餐的用户订阅缓存 key
|
|
var userIds []int64
|
|
err = m.QueryNoCacheCtx(ctx, &userIds, func(conn *gorm.DB, v interface{}) error {
|
|
return conn.Table("user_subscribe").
|
|
Where("subscribe_id = ? AND status IN (0, 1)", data.Id).
|
|
Distinct("user_id").
|
|
Pluck("user_id", &userIds).Error
|
|
})
|
|
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return err
|
|
}
|
|
|
|
// 构建用户订阅缓存 key 列表
|
|
userSubscribeCacheKeys := make([]string, 0, len(userIds))
|
|
for _, userId := range userIds {
|
|
userSubscribeCacheKeys = append(userSubscribeCacheKeys, fmt.Sprintf("cache:user:subscribe:user:%d", userId))
|
|
}
|
|
|
|
// 合并套餐缓存 key 和用户订阅缓存 key
|
|
allCacheKeys := append(m.getCacheKeys(old), userSubscribeCacheKeys...)
|
|
|
|
err = m.ExecCtx(ctx, func(conn *gorm.DB) error {
|
|
db := conn
|
|
if len(tx) > 0 {
|
|
db = tx[0]
|
|
}
|
|
return db.Save(data).Error
|
|
}, allCacheKeys...)
|
|
return err
|
|
}
|
|
|
|
func (m *defaultSubscribeModel) Delete(ctx context.Context, id int64, tx ...*gorm.DB) error {
|
|
data, err := m.FindOne(ctx, id)
|
|
if err != nil {
|
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
// 获取所有使用该套餐的用户订阅缓存 key
|
|
var userIds []int64
|
|
err = m.QueryNoCacheCtx(ctx, &userIds, func(conn *gorm.DB, v interface{}) error {
|
|
return conn.Table("user_subscribe").
|
|
Where("subscribe_id = ? AND status IN (0, 1)", id).
|
|
Distinct("user_id").
|
|
Pluck("user_id", &userIds).Error
|
|
})
|
|
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return err
|
|
}
|
|
|
|
// 构建用户订阅缓存 key 列表
|
|
userSubscribeCacheKeys := make([]string, 0, len(userIds))
|
|
for _, userId := range userIds {
|
|
userSubscribeCacheKeys = append(userSubscribeCacheKeys, fmt.Sprintf("cache:user:subscribe:user:%d", userId))
|
|
}
|
|
|
|
// 合并套餐缓存 key 和用户订阅缓存 key
|
|
allCacheKeys := append(m.getCacheKeys(data), userSubscribeCacheKeys...)
|
|
|
|
err = m.ExecCtx(ctx, func(conn *gorm.DB) error {
|
|
db := conn
|
|
if len(tx) > 0 {
|
|
db = tx[0]
|
|
}
|
|
return db.Delete(&Subscribe{}, id).Error
|
|
}, allCacheKeys...)
|
|
return err
|
|
}
|
|
|
|
func (m *defaultSubscribeModel) Transaction(ctx context.Context, fn func(db *gorm.DB) error) error {
|
|
return m.TransactCtx(ctx, fn)
|
|
}
|