feat(subscription): enhance subscription cache management and improve error handling

This commit is contained in:
Chang lue Tsen 2025-08-15 14:45:54 -04:00
parent 740dd48763
commit 2be1c4f6ed
11 changed files with 187 additions and 28 deletions

View File

@ -81,6 +81,5 @@ func (l *CreateUserSubscribeLogic) CreateUserSubscribe(req *types.CreateUserSubs
if err != nil { if err != nil {
logger.Errorw("ClearSubscribe error", logger.Field("error", err.Error())) logger.Errorw("ClearSubscribe error", logger.Field("error", err.Error()))
} }
return nil return nil
} }

View File

@ -26,10 +26,27 @@ func NewDeleteUserSubscribeLogic(ctx context.Context, svcCtx *svc.ServiceContext
} }
func (l *DeleteUserSubscribeLogic) DeleteUserSubscribe(req *types.DeleteUserSubscribeRequest) error { func (l *DeleteUserSubscribeLogic) DeleteUserSubscribe(req *types.DeleteUserSubscribeRequest) error {
err := l.svcCtx.UserModel.DeleteSubscribeById(l.ctx, req.UserSubscribeId) // find user subscribe by ID
userSubscribe, err := l.svcCtx.UserModel.FindOneSubscribe(l.ctx, req.UserSubscribeId)
if err != nil {
l.Errorw("failed to find user subscribe", logger.Field("error", err.Error()), logger.Field("userSubscribeId", req.UserSubscribeId))
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "failed to find user subscribe: %v", err.Error())
}
err = l.svcCtx.UserModel.DeleteSubscribeById(l.ctx, req.UserSubscribeId)
if err != nil { if err != nil {
l.Errorw("failed to delete user subscribe", logger.Field("error", err.Error()), logger.Field("userSubscribeId", req.UserSubscribeId)) l.Errorw("failed to delete user subscribe", logger.Field("error", err.Error()), logger.Field("userSubscribeId", req.UserSubscribeId))
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseDeletedError), "failed to delete user subscribe: %v", err.Error()) return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseDeletedError), "failed to delete user subscribe: %v", err.Error())
} }
// Clear user subscribe cache
if err = l.svcCtx.UserModel.ClearSubscribeCache(l.ctx, userSubscribe); err != nil {
l.Errorw("failed to clear user subscribe cache", logger.Field("error", err.Error()), logger.Field("userSubscribeId", req.UserSubscribeId))
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "failed to clear user subscribe cache: %v", err.Error())
}
// Clear subscribe cache
if err = l.svcCtx.SubscribeModel.ClearCache(l.ctx, userSubscribe.SubscribeId); err != nil {
l.Errorw("failed to clear subscribe cache", logger.Field("error", err.Error()), logger.Field("subscribeId", userSubscribe.SubscribeId))
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "failed to clear subscribe cache: %v", err.Error())
}
return nil return nil
} }

View File

@ -28,7 +28,7 @@ func NewUpdateUserSubscribeLogic(ctx context.Context, svcCtx *svc.ServiceContext
} }
func (l *UpdateUserSubscribeLogic) UpdateUserSubscribe(req *types.UpdateUserSubscribeRequest) error { func (l *UpdateUserSubscribeLogic) UpdateUserSubscribe(req *types.UpdateUserSubscribeRequest) error {
userSub, err := l.svcCtx.UserModel.FindOneUserSubscribe(l.ctx, req.UserSubscribeId) userSub, err := l.svcCtx.UserModel.FindOneSubscribe(l.ctx, req.UserSubscribeId)
if err != nil { if err != nil {
l.Errorw("FindOneUserSubscribe failed:", logger.Field("error", err.Error()), logger.Field("userSubscribeId", req.UserSubscribeId)) l.Errorw("FindOneUserSubscribe failed:", logger.Field("error", err.Error()), logger.Field("userSubscribeId", req.UserSubscribeId))
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "FindOneUserSubscribe failed: %v", err.Error()) return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "FindOneUserSubscribe failed: %v", err.Error())
@ -59,6 +59,15 @@ func (l *UpdateUserSubscribeLogic) UpdateUserSubscribe(req *types.UpdateUserSubs
l.Errorw("UpdateSubscribe failed:", logger.Field("error", err.Error())) l.Errorw("UpdateSubscribe failed:", logger.Field("error", err.Error()))
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "UpdateSubscribe failed: %v", err.Error()) return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "UpdateSubscribe failed: %v", err.Error())
} }
// Clear user subscribe cache
if err = l.svcCtx.UserModel.ClearSubscribeCache(l.ctx, userSub); err != nil {
l.Errorw("ClearSubscribeCache failed:", logger.Field("error", err.Error()), logger.Field("userSubscribeId", userSub.Id))
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "ClearSubscribeCache failed: %v", err.Error())
}
// Clear subscribe cache
if err = l.svcCtx.SubscribeModel.ClearCache(l.ctx, userSub.SubscribeId); err != nil {
l.Errorw("failed to clear subscribe cache", logger.Field("error", err.Error()), logger.Field("subscribeId", userSub.SubscribeId))
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "failed to clear subscribe cache: %v", err.Error())
}
return nil return nil
} }

View File

@ -2,9 +2,10 @@ package user
import ( import (
"context" "context"
"github.com/perfect-panel/server/internal/model/order"
"time" "time"
"github.com/perfect-panel/server/internal/model/order"
"github.com/perfect-panel/server/pkg/constant" "github.com/perfect-panel/server/pkg/constant"
"github.com/google/uuid" "github.com/google/uuid"
@ -72,5 +73,16 @@ func (l *ResetUserSubscribeTokenLogic) ResetUserSubscribeToken(req *types.ResetU
l.Errorw("UpdateSubscribe failed:", logger.Field("error", err.Error())) l.Errorw("UpdateSubscribe failed:", logger.Field("error", err.Error()))
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "UpdateSubscribe failed: %v", err.Error()) return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "UpdateSubscribe failed: %v", err.Error())
} }
//clear user subscription cache
if err = l.svcCtx.UserModel.ClearSubscribeCache(l.ctx, &newSub); err != nil {
l.Errorw("ClearSubscribeCache failed", logger.Field("error", err.Error()), logger.Field("userSubscribeId", userSub.Id))
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "ClearSubscribeCache failed: %v", err.Error())
}
// Clear subscription cache
if err = l.svcCtx.SubscribeModel.ClearCache(l.ctx, userSub.SubscribeId); err != nil {
l.Errorw("ClearSubscribeCache failed", logger.Field("error", err.Error()), logger.Field("subscribeId", userSub.SubscribeId))
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "ClearSubscribeCache failed: %v", err.Error())
}
return nil return nil
} }

View File

@ -38,6 +38,14 @@ func (l *UnsubscribeLogic) Unsubscribe(req *types.UnsubscribeRequest) error {
logger.Error("current user is not found in context") logger.Error("current user is not found in context")
return errors.Wrapf(xerr.NewErrCode(xerr.InvalidAccess), "Invalid Access") return errors.Wrapf(xerr.NewErrCode(xerr.InvalidAccess), "Invalid Access")
} }
// find user subscription by ID
userSub, err := l.svcCtx.UserModel.FindOneSubscribe(l.ctx, req.Id)
if err != nil {
l.Errorw("FindOneSubscribe failed", logger.Field("error", err.Error()), logger.Field("reqId", req.Id))
return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "FindOneSubscribe failed: %v", err.Error())
}
// Calculate the remaining amount to refund based on unused subscription time/traffic // Calculate the remaining amount to refund based on unused subscription time/traffic
remainingAmount, err := CalculateRemainingAmount(l.ctx, l.svcCtx, req.Id) remainingAmount, err := CalculateRemainingAmount(l.ctx, l.svcCtx, req.Id)
if err != nil { if err != nil {
@ -118,5 +126,21 @@ func (l *UnsubscribeLogic) Unsubscribe(req *types.UnsubscribeRequest) error {
return l.svcCtx.UserModel.Update(l.ctx, u) return l.svcCtx.UserModel.Update(l.ctx, u)
}) })
if err != nil {
l.Errorw("Unsubscribe transaction failed", logger.Field("error", err.Error()), logger.Field("userId", u.Id), logger.Field("reqId", req.Id))
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "Unsubscribe transaction failed: %v", err.Error())
}
//clear user subscription cache
if err = l.svcCtx.UserModel.ClearSubscribeCache(l.ctx, userSub); err != nil {
l.Errorw("ClearSubscribeCache failed", logger.Field("error", err.Error()), logger.Field("userSubscribeId", userSub.Id))
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "ClearSubscribeCache failed: %v", err.Error())
}
// Clear subscription cache
if err = l.svcCtx.SubscribeModel.ClearCache(l.ctx, userSub.SubscribeId); err != nil {
l.Errorw("ClearSubscribeCache failed", logger.Field("error", err.Error()), logger.Field("subscribeId", userSub.SubscribeId))
return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "ClearSubscribeCache failed: %v", err.Error())
}
return err return err
} }

View File

@ -46,8 +46,9 @@ var (
func (m *customServerModel) ClearCache(ctx context.Context, id int64) error { func (m *customServerModel) ClearCache(ctx context.Context, id int64) error {
serverIdKey := fmt.Sprintf("%s%v", cacheServerIdPrefix, id) serverIdKey := fmt.Sprintf("%s%v", cacheServerIdPrefix, id)
configKey := fmt.Sprintf("%s%d", config.ServerConfigCacheKey, id) configKey := fmt.Sprintf("%s%d", config.ServerConfigCacheKey, id)
userListKey := fmt.Sprintf("%s%v", config.ServerUserListCacheKey, id)
return m.DelCacheCtx(ctx, serverIdKey, configKey) return m.DelCacheCtx(ctx, serverIdKey, configKey, userListKey)
} }
// QueryServerCountByServerGroups Query Server Count By Server Groups // QueryServerCountByServerGroups Query Server Count By Server Groups

View File

@ -4,9 +4,11 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strconv"
"strings" "strings"
"github.com/perfect-panel/server/internal/config" "github.com/perfect-panel/server/internal/config"
"github.com/perfect-panel/server/internal/model/server"
"github.com/perfect-panel/server/pkg/cache" "github.com/perfect-panel/server/pkg/cache"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"gorm.io/gorm" "gorm.io/gorm"
@ -69,6 +71,27 @@ func (m *defaultSubscribeModel) getCacheKeys(data *Subscribe) []string {
} }
} }
} }
// Temporary solution waiting for refactoring
if data.ServerGroup != "" {
cacheKey := strings.Split(data.ServerGroup, ",")
groupIds := make([]int64, 0)
for _, v := range cacheKey {
if v != "" {
id, _ := strconv.ParseInt(v, 10, 64)
if id > 0 {
groupIds = append(groupIds, id)
}
}
}
var ids []int64
_ = m.Transaction(context.Background(), func(tx *gorm.DB) error {
return tx.Model(&server.Server{}).Where("group_id IN ?", groupIds).Pluck("id", &ids).Error
})
for _, id := range ids {
serverKey = append(serverKey, fmt.Sprintf("%s%v", config.ServerUserListCacheKey, id))
}
}
cacheKeys := []string{SubscribeIdKey} cacheKeys := []string{SubscribeIdKey}
if len(serverKey) > 0 { if len(serverKey) > 0 {
cacheKeys = append(cacheKeys, serverKey...) cacheKeys = append(cacheKeys, serverKey...)

View File

@ -35,7 +35,7 @@ type customSubscribeLogicModel interface {
QuerySubscribeIdsByServerIdAndServerGroupId(ctx context.Context, serverId, serverGroupId int64) ([]*Subscribe, error) QuerySubscribeIdsByServerIdAndServerGroupId(ctx context.Context, serverId, serverGroupId int64) ([]*Subscribe, error)
QuerySubscribeMinSortByIds(ctx context.Context, ids []int64) (int64, error) QuerySubscribeMinSortByIds(ctx context.Context, ids []int64) (int64, error)
QuerySubscribeListByIds(ctx context.Context, ids []int64) ([]*Subscribe, error) QuerySubscribeListByIds(ctx context.Context, ids []int64) ([]*Subscribe, error)
ClearCache(ctx context.Context, id int64) error ClearCache(ctx context.Context, id ...int64) error
} }
// NewModel returns a model for the database table. // NewModel returns a model for the database table.
@ -109,23 +109,18 @@ func (m *customSubscribeModel) QuerySubscribeListByIds(ctx context.Context, ids
return list, err return list, err
} }
func (m *customSubscribeModel) ClearCache(ctx context.Context, id int64) error { func (m *customSubscribeModel) ClearCache(ctx context.Context, ids ...int64) error {
if id <= 0 { if len(ids) <= 0 {
return nil return nil
} }
data, err := m.FindOne(ctx, id)
if err != nil {
return err
}
cacheKeys := m.getCacheKeys(data) var cacheKeys []string
for _, id := range ids {
cacheKeys = append(cacheKeys, m.getCacheKeys(&Subscribe{Id: id})...) data, err := m.FindOne(ctx, id)
if err != nil {
for _, key := range cacheKeys {
if err := m.CachedConn.DelCacheCtx(ctx, key); err != nil {
return err return err
} }
cacheKeys = append(cacheKeys, m.getCacheKeys(data)...)
} }
return nil return m.CachedConn.DelCacheCtx(ctx, cacheKeys...)
} }

View File

@ -64,7 +64,7 @@ func (s *Subscribe) GetCacheKeys() []string {
if s == nil { if s == nil {
return []string{} return []string{}
} }
keys := []string{} keys := make([]string, 0)
if s.Token != "" { if s.Token != "" {
keys = append(keys, fmt.Sprintf("%s%s", cacheUserSubscribeTokenPrefix, s.Token)) keys = append(keys, fmt.Sprintf("%s%s", cacheUserSubscribeTokenPrefix, s.Token))

View File

@ -464,6 +464,19 @@ func (l *ActivateOrderLogic) Renewal(ctx context.Context, orderInfo *order.Order
return err return err
} }
// Clear user subscription cache
err = l.svc.UserModel.ClearSubscribeCache(ctx, userSub)
if err != nil {
logger.WithContext(ctx).Error("Clear user subscribe cache failed",
logger.Field("error", err.Error()),
logger.Field("subscribe_id", userSub.Id),
logger.Field("user_id", userInfo.Id),
)
}
// Clear cache
l.clearServerCache(ctx, sub)
// Handle commission // Handle commission
go l.handleCommission(context.Background(), userInfo, orderInfo, false) go l.handleCommission(context.Background(), userInfo, orderInfo, false)
@ -547,6 +560,19 @@ func (l *ActivateOrderLogic) ResetTraffic(ctx context.Context, orderInfo *order.
return err return err
} }
// Clear user subscription cache
err = l.svc.UserModel.ClearSubscribeCache(ctx, userSub)
if err != nil {
logger.WithContext(ctx).Error("Clear user subscribe cache failed",
logger.Field("error", err.Error()),
logger.Field("subscribe_id", userSub.Id),
logger.Field("user_id", userInfo.Id),
)
}
// Clear cache
l.clearServerCache(ctx, sub)
// Send notifications // Send notifications
l.sendNotifications(ctx, orderInfo, userInfo, sub, userSub, telegram.ResetTrafficNotify) l.sendNotifications(ctx, orderInfo, userInfo, sub, userSub, telegram.ResetTrafficNotify)

View File

@ -244,13 +244,29 @@ func (l *ResetTrafficLogic) resetMonth(ctx context.Context) error {
logger.Errorw("[ResetTraffic] Failed to update monthly reset users", logger.Field("error", err.Error())) logger.Errorw("[ResetTraffic] Failed to update monthly reset users", logger.Field("error", err.Error()))
return err return err
} }
// Find user subscriptions for these users
var userSubs []*user.Subscribe
err = db.Model(&user.Subscribe{}).Where("`id` IN ?", monthlyResetUsers).Find(&userSubs).Error
if err != nil {
logger.Errorw("[ResetTraffic] Failed to find user subscriptions for 1st reset", logger.Field("error", err.Error()))
return err
}
// Clear cache for these subscriptions
for _, sub := range userSubs {
if sub.SubscribeId > 0 {
err = l.svc.UserModel.ClearSubscribeCache(ctx, sub)
if err != nil {
logger.Errorw("[ResetTraffic] Failed to clear cache for subscription",
logger.Field("subscribeId", sub.SubscribeId),
logger.Field("error", err.Error()))
}
}
}
logger.Infow("[ResetTraffic] Monthly reset completed", logger.Field("count", len(monthlyResetUsers))) logger.Infow("[ResetTraffic] Monthly reset completed", logger.Field("count", len(monthlyResetUsers)))
} else { } else {
logger.Infow("[ResetTraffic] No users found for monthly reset") logger.Infow("[ResetTraffic] No users found for monthly reset")
} }
return l.svc.SubscribeModel.ClearCache(ctx, resetMonthSubIds...)
return nil
}) })
if err != nil { if err != nil {
logger.Errorw("[ResetTraffic] Monthly reset transaction failed", logger.Field("error", err.Error())) logger.Errorw("[ResetTraffic] Monthly reset transaction failed", logger.Field("error", err.Error()))
@ -320,20 +336,37 @@ func (l *ResetTrafficLogic) reset1st(ctx context.Context, cache resetTrafficCach
logger.Errorw("[ResetTraffic] Failed to update 1st reset users", logger.Field("error", err.Error())) logger.Errorw("[ResetTraffic] Failed to update 1st reset users", logger.Field("error", err.Error()))
return err return err
} }
var userSubs []*user.Subscribe
err = db.Model(&user.Subscribe{}).Where("`id` IN ?", users1stReset).Find(&userSubs).Error
if err != nil {
logger.Errorw("[ResetTraffic] Failed to find user subscriptions for 1st reset", logger.Field("error", err.Error()))
return err
}
// Clear cache for these subscriptions
for _, sub := range userSubs {
if sub.SubscribeId > 0 {
err = l.svc.UserModel.ClearSubscribeCache(ctx, sub)
if err != nil {
logger.Errorw("[ResetTraffic] Failed to clear cache for subscription",
logger.Field("subscribeId", sub.SubscribeId),
logger.Field("error", err.Error()))
}
}
}
logger.Infow("[ResetTraffic] 1st reset completed", logger.Field("count", len(users1stReset))) logger.Infow("[ResetTraffic] 1st reset completed", logger.Field("count", len(users1stReset)))
} else { } else {
logger.Infow("[ResetTraffic] No users found for 1st reset") logger.Infow("[ResetTraffic] No users found for 1st reset")
} }
return nil return l.svc.SubscribeModel.ClearCache(ctx, reset1stSubIds...)
}) })
if err != nil { if err != nil {
logger.Errorw("[ResetTraffic] 1st reset transaction failed", logger.Field("error", err.Error())) logger.Errorw("[ResetTraffic] 1st reset transaction failed", logger.Field("error", err.Error()))
return err return err
} }
logger.Infow("[ResetTraffic] 1st reset process completed") logger.Infow("[ResetTraffic] 1st reset process completed")
return nil return nil
} }
@ -397,12 +430,32 @@ func (l *ResetTrafficLogic) resetYear(ctx context.Context) error {
logger.Errorw("[ResetTraffic] Failed to update yearly reset users", logger.Field("error", err.Error())) logger.Errorw("[ResetTraffic] Failed to update yearly reset users", logger.Field("error", err.Error()))
return err return err
} }
// Find user subscriptions for these users
var userSubs []*user.Subscribe
err = db.Model(&user.Subscribe{}).Where("`id` IN ?", usersYearReset).Find(&userSubs).Error
if err != nil {
logger.Errorw("[ResetTraffic] Failed to find user subscriptions for 1st reset", logger.Field("error", err.Error()))
return err
}
// Clear cache for these subscriptions
for _, sub := range userSubs {
if sub.SubscribeId > 0 {
err = l.svc.UserModel.ClearSubscribeCache(ctx, sub)
if err != nil {
logger.Errorw("[ResetTraffic] Failed to clear cache for subscription",
logger.Field("subscribeId", sub.SubscribeId),
logger.Field("error", err.Error()))
}
}
}
logger.Infow("[ResetTraffic] Yearly reset completed", logger.Field("count", len(usersYearReset))) logger.Infow("[ResetTraffic] Yearly reset completed", logger.Field("count", len(usersYearReset)))
} else { } else {
logger.Infow("[ResetTraffic] No users found for yearly reset") logger.Infow("[ResetTraffic] No users found for yearly reset")
} }
err = l.svc.SubscribeModel.ClearCache(ctx, resetYearSubIds...)
if err != nil {
logger.Errorw("[ResetTraffic] Failed to clear yearly reset subscription cache", logger.Field("error", err.Error()))
}
return nil return nil
}) })