From 2be1c4f6ed258425038a635fb7b6812ea1212345 Mon Sep 17 00:00:00 2001 From: Chang lue Tsen Date: Fri, 15 Aug 2025 14:45:54 -0400 Subject: [PATCH] feat(subscription): enhance subscription cache management and improve error handling --- .../admin/user/createUserSubscribeLogic.go | 1 - .../admin/user/deleteUserSubscribeLogic.go | 19 +++++- .../admin/user/updateUserSubscribeLogic.go | 13 +++- .../user/resetUserSubscribeTokenLogic.go | 14 +++- .../logic/public/user/unsubscribeLogic.go | 24 +++++++ internal/model/server/model.go | 3 +- internal/model/subscribe/default.go | 23 +++++++ internal/model/subscribe/model.go | 23 +++---- internal/model/user/cache.go | 2 +- queue/logic/order/activateOrderLogic.go | 26 +++++++ queue/logic/traffic/resetTrafficLogic.go | 67 +++++++++++++++++-- 11 files changed, 187 insertions(+), 28 deletions(-) diff --git a/internal/logic/admin/user/createUserSubscribeLogic.go b/internal/logic/admin/user/createUserSubscribeLogic.go index a632f8d..08876f8 100644 --- a/internal/logic/admin/user/createUserSubscribeLogic.go +++ b/internal/logic/admin/user/createUserSubscribeLogic.go @@ -81,6 +81,5 @@ func (l *CreateUserSubscribeLogic) CreateUserSubscribe(req *types.CreateUserSubs if err != nil { logger.Errorw("ClearSubscribe error", logger.Field("error", err.Error())) } - return nil } diff --git a/internal/logic/admin/user/deleteUserSubscribeLogic.go b/internal/logic/admin/user/deleteUserSubscribeLogic.go index 6526829..397299d 100644 --- a/internal/logic/admin/user/deleteUserSubscribeLogic.go +++ b/internal/logic/admin/user/deleteUserSubscribeLogic.go @@ -26,10 +26,27 @@ func NewDeleteUserSubscribeLogic(ctx context.Context, svcCtx *svc.ServiceContext } func (l *DeleteUserSubscribeLogic) DeleteUserSubscribe(req *types.DeleteUserSubscribeRequest) error { - err := l.svcCtx.UserModel.DeleteSubscribeById(l.ctx, req.UserSubscribeId) + // find user subscribe by ID + userSubscribe, err := l.svcCtx.UserModel.FindOneSubscribe(l.ctx, req.UserSubscribeId) + if err != nil { + l.Errorw("failed to find user subscribe", logger.Field("error", err.Error()), logger.Field("userSubscribeId", req.UserSubscribeId)) + return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "failed to find user subscribe: %v", err.Error()) + } + + err = l.svcCtx.UserModel.DeleteSubscribeById(l.ctx, req.UserSubscribeId) if err != nil { l.Errorw("failed to delete user subscribe", logger.Field("error", err.Error()), logger.Field("userSubscribeId", req.UserSubscribeId)) return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseDeletedError), "failed to delete user subscribe: %v", err.Error()) } + // Clear user subscribe cache + if err = l.svcCtx.UserModel.ClearSubscribeCache(l.ctx, userSubscribe); err != nil { + l.Errorw("failed to clear user subscribe cache", logger.Field("error", err.Error()), logger.Field("userSubscribeId", req.UserSubscribeId)) + return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "failed to clear user subscribe cache: %v", err.Error()) + } + // Clear subscribe cache + if err = l.svcCtx.SubscribeModel.ClearCache(l.ctx, userSubscribe.SubscribeId); err != nil { + l.Errorw("failed to clear subscribe cache", logger.Field("error", err.Error()), logger.Field("subscribeId", userSubscribe.SubscribeId)) + return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "failed to clear subscribe cache: %v", err.Error()) + } return nil } diff --git a/internal/logic/admin/user/updateUserSubscribeLogic.go b/internal/logic/admin/user/updateUserSubscribeLogic.go index 4b50771..9d92ce5 100644 --- a/internal/logic/admin/user/updateUserSubscribeLogic.go +++ b/internal/logic/admin/user/updateUserSubscribeLogic.go @@ -28,7 +28,7 @@ func NewUpdateUserSubscribeLogic(ctx context.Context, svcCtx *svc.ServiceContext } func (l *UpdateUserSubscribeLogic) UpdateUserSubscribe(req *types.UpdateUserSubscribeRequest) error { - userSub, err := l.svcCtx.UserModel.FindOneUserSubscribe(l.ctx, req.UserSubscribeId) + userSub, err := l.svcCtx.UserModel.FindOneSubscribe(l.ctx, req.UserSubscribeId) if err != nil { l.Errorw("FindOneUserSubscribe failed:", logger.Field("error", err.Error()), logger.Field("userSubscribeId", req.UserSubscribeId)) return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "FindOneUserSubscribe failed: %v", err.Error()) @@ -59,6 +59,15 @@ func (l *UpdateUserSubscribeLogic) UpdateUserSubscribe(req *types.UpdateUserSubs l.Errorw("UpdateSubscribe failed:", logger.Field("error", err.Error())) return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "UpdateSubscribe failed: %v", err.Error()) } - + // Clear user subscribe cache + if err = l.svcCtx.UserModel.ClearSubscribeCache(l.ctx, userSub); err != nil { + l.Errorw("ClearSubscribeCache failed:", logger.Field("error", err.Error()), logger.Field("userSubscribeId", userSub.Id)) + return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "ClearSubscribeCache failed: %v", err.Error()) + } + // Clear subscribe cache + if err = l.svcCtx.SubscribeModel.ClearCache(l.ctx, userSub.SubscribeId); err != nil { + l.Errorw("failed to clear subscribe cache", logger.Field("error", err.Error()), logger.Field("subscribeId", userSub.SubscribeId)) + return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "failed to clear subscribe cache: %v", err.Error()) + } return nil } diff --git a/internal/logic/public/user/resetUserSubscribeTokenLogic.go b/internal/logic/public/user/resetUserSubscribeTokenLogic.go index edf9342..febcae7 100644 --- a/internal/logic/public/user/resetUserSubscribeTokenLogic.go +++ b/internal/logic/public/user/resetUserSubscribeTokenLogic.go @@ -2,9 +2,10 @@ package user import ( "context" - "github.com/perfect-panel/server/internal/model/order" "time" + "github.com/perfect-panel/server/internal/model/order" + "github.com/perfect-panel/server/pkg/constant" "github.com/google/uuid" @@ -72,5 +73,16 @@ func (l *ResetUserSubscribeTokenLogic) ResetUserSubscribeToken(req *types.ResetU l.Errorw("UpdateSubscribe failed:", logger.Field("error", err.Error())) return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseUpdateError), "UpdateSubscribe failed: %v", err.Error()) } + //clear user subscription cache + if err = l.svcCtx.UserModel.ClearSubscribeCache(l.ctx, &newSub); err != nil { + l.Errorw("ClearSubscribeCache failed", logger.Field("error", err.Error()), logger.Field("userSubscribeId", userSub.Id)) + return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "ClearSubscribeCache failed: %v", err.Error()) + } + // Clear subscription cache + if err = l.svcCtx.SubscribeModel.ClearCache(l.ctx, userSub.SubscribeId); err != nil { + l.Errorw("ClearSubscribeCache failed", logger.Field("error", err.Error()), logger.Field("subscribeId", userSub.SubscribeId)) + return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "ClearSubscribeCache failed: %v", err.Error()) + } + return nil } diff --git a/internal/logic/public/user/unsubscribeLogic.go b/internal/logic/public/user/unsubscribeLogic.go index e2c38b1..9df80d5 100644 --- a/internal/logic/public/user/unsubscribeLogic.go +++ b/internal/logic/public/user/unsubscribeLogic.go @@ -38,6 +38,14 @@ func (l *UnsubscribeLogic) Unsubscribe(req *types.UnsubscribeRequest) error { logger.Error("current user is not found in context") return errors.Wrapf(xerr.NewErrCode(xerr.InvalidAccess), "Invalid Access") } + + // find user subscription by ID + userSub, err := l.svcCtx.UserModel.FindOneSubscribe(l.ctx, req.Id) + if err != nil { + l.Errorw("FindOneSubscribe failed", logger.Field("error", err.Error()), logger.Field("reqId", req.Id)) + return errors.Wrapf(xerr.NewErrCode(xerr.DatabaseQueryError), "FindOneSubscribe failed: %v", err.Error()) + } + // Calculate the remaining amount to refund based on unused subscription time/traffic remainingAmount, err := CalculateRemainingAmount(l.ctx, l.svcCtx, req.Id) if err != nil { @@ -118,5 +126,21 @@ func (l *UnsubscribeLogic) Unsubscribe(req *types.UnsubscribeRequest) error { return l.svcCtx.UserModel.Update(l.ctx, u) }) + if err != nil { + l.Errorw("Unsubscribe transaction failed", logger.Field("error", err.Error()), logger.Field("userId", u.Id), logger.Field("reqId", req.Id)) + return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "Unsubscribe transaction failed: %v", err.Error()) + } + + //clear user subscription cache + if err = l.svcCtx.UserModel.ClearSubscribeCache(l.ctx, userSub); err != nil { + l.Errorw("ClearSubscribeCache failed", logger.Field("error", err.Error()), logger.Field("userSubscribeId", userSub.Id)) + return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "ClearSubscribeCache failed: %v", err.Error()) + } + // Clear subscription cache + if err = l.svcCtx.SubscribeModel.ClearCache(l.ctx, userSub.SubscribeId); err != nil { + l.Errorw("ClearSubscribeCache failed", logger.Field("error", err.Error()), logger.Field("subscribeId", userSub.SubscribeId)) + return errors.Wrapf(xerr.NewErrCode(xerr.ERROR), "ClearSubscribeCache failed: %v", err.Error()) + } + return err } diff --git a/internal/model/server/model.go b/internal/model/server/model.go index daace5f..0c423ff 100644 --- a/internal/model/server/model.go +++ b/internal/model/server/model.go @@ -46,8 +46,9 @@ var ( func (m *customServerModel) ClearCache(ctx context.Context, id int64) error { serverIdKey := fmt.Sprintf("%s%v", cacheServerIdPrefix, id) configKey := fmt.Sprintf("%s%d", config.ServerConfigCacheKey, id) + userListKey := fmt.Sprintf("%s%v", config.ServerUserListCacheKey, id) - return m.DelCacheCtx(ctx, serverIdKey, configKey) + return m.DelCacheCtx(ctx, serverIdKey, configKey, userListKey) } // QueryServerCountByServerGroups Query Server Count By Server Groups diff --git a/internal/model/subscribe/default.go b/internal/model/subscribe/default.go index 8afff6c..92879d7 100644 --- a/internal/model/subscribe/default.go +++ b/internal/model/subscribe/default.go @@ -4,9 +4,11 @@ import ( "context" "errors" "fmt" + "strconv" "strings" "github.com/perfect-panel/server/internal/config" + "github.com/perfect-panel/server/internal/model/server" "github.com/perfect-panel/server/pkg/cache" "github.com/redis/go-redis/v9" "gorm.io/gorm" @@ -69,6 +71,27 @@ func (m *defaultSubscribeModel) getCacheKeys(data *Subscribe) []string { } } } + // Temporary solution waiting for refactoring + if data.ServerGroup != "" { + cacheKey := strings.Split(data.ServerGroup, ",") + groupIds := make([]int64, 0) + for _, v := range cacheKey { + if v != "" { + id, _ := strconv.ParseInt(v, 10, 64) + if id > 0 { + groupIds = append(groupIds, id) + } + } + } + var ids []int64 + _ = m.Transaction(context.Background(), func(tx *gorm.DB) error { + return tx.Model(&server.Server{}).Where("group_id IN ?", groupIds).Pluck("id", &ids).Error + }) + for _, id := range ids { + serverKey = append(serverKey, fmt.Sprintf("%s%v", config.ServerUserListCacheKey, id)) + } + } + cacheKeys := []string{SubscribeIdKey} if len(serverKey) > 0 { cacheKeys = append(cacheKeys, serverKey...) diff --git a/internal/model/subscribe/model.go b/internal/model/subscribe/model.go index 829427a..f652bc1 100644 --- a/internal/model/subscribe/model.go +++ b/internal/model/subscribe/model.go @@ -35,7 +35,7 @@ type customSubscribeLogicModel interface { QuerySubscribeIdsByServerIdAndServerGroupId(ctx context.Context, serverId, serverGroupId int64) ([]*Subscribe, error) QuerySubscribeMinSortByIds(ctx context.Context, ids []int64) (int64, error) QuerySubscribeListByIds(ctx context.Context, ids []int64) ([]*Subscribe, error) - ClearCache(ctx context.Context, id int64) error + ClearCache(ctx context.Context, id ...int64) error } // NewModel returns a model for the database table. @@ -109,23 +109,18 @@ func (m *customSubscribeModel) QuerySubscribeListByIds(ctx context.Context, ids return list, err } -func (m *customSubscribeModel) ClearCache(ctx context.Context, id int64) error { - if id <= 0 { +func (m *customSubscribeModel) ClearCache(ctx context.Context, ids ...int64) error { + if len(ids) <= 0 { return nil } - data, err := m.FindOne(ctx, id) - if err != nil { - return err - } - cacheKeys := m.getCacheKeys(data) - - cacheKeys = append(cacheKeys, m.getCacheKeys(&Subscribe{Id: id})...) - - for _, key := range cacheKeys { - if err := m.CachedConn.DelCacheCtx(ctx, key); err != nil { + var cacheKeys []string + for _, id := range ids { + data, err := m.FindOne(ctx, id) + if err != nil { return err } + cacheKeys = append(cacheKeys, m.getCacheKeys(data)...) } - return nil + return m.CachedConn.DelCacheCtx(ctx, cacheKeys...) } diff --git a/internal/model/user/cache.go b/internal/model/user/cache.go index 2c37a83..a39f748 100644 --- a/internal/model/user/cache.go +++ b/internal/model/user/cache.go @@ -64,7 +64,7 @@ func (s *Subscribe) GetCacheKeys() []string { if s == nil { return []string{} } - keys := []string{} + keys := make([]string, 0) if s.Token != "" { keys = append(keys, fmt.Sprintf("%s%s", cacheUserSubscribeTokenPrefix, s.Token)) diff --git a/queue/logic/order/activateOrderLogic.go b/queue/logic/order/activateOrderLogic.go index 71d6458..39788d7 100644 --- a/queue/logic/order/activateOrderLogic.go +++ b/queue/logic/order/activateOrderLogic.go @@ -464,6 +464,19 @@ func (l *ActivateOrderLogic) Renewal(ctx context.Context, orderInfo *order.Order return err } + // Clear user subscription cache + err = l.svc.UserModel.ClearSubscribeCache(ctx, userSub) + if err != nil { + logger.WithContext(ctx).Error("Clear user subscribe cache failed", + logger.Field("error", err.Error()), + logger.Field("subscribe_id", userSub.Id), + logger.Field("user_id", userInfo.Id), + ) + } + + // Clear cache + l.clearServerCache(ctx, sub) + // Handle commission go l.handleCommission(context.Background(), userInfo, orderInfo, false) @@ -547,6 +560,19 @@ func (l *ActivateOrderLogic) ResetTraffic(ctx context.Context, orderInfo *order. return err } + // Clear user subscription cache + err = l.svc.UserModel.ClearSubscribeCache(ctx, userSub) + if err != nil { + logger.WithContext(ctx).Error("Clear user subscribe cache failed", + logger.Field("error", err.Error()), + logger.Field("subscribe_id", userSub.Id), + logger.Field("user_id", userInfo.Id), + ) + } + + // Clear cache + l.clearServerCache(ctx, sub) + // Send notifications l.sendNotifications(ctx, orderInfo, userInfo, sub, userSub, telegram.ResetTrafficNotify) diff --git a/queue/logic/traffic/resetTrafficLogic.go b/queue/logic/traffic/resetTrafficLogic.go index e179b07..9aa09f8 100644 --- a/queue/logic/traffic/resetTrafficLogic.go +++ b/queue/logic/traffic/resetTrafficLogic.go @@ -244,13 +244,29 @@ func (l *ResetTrafficLogic) resetMonth(ctx context.Context) error { logger.Errorw("[ResetTraffic] Failed to update monthly reset users", logger.Field("error", err.Error())) return err } - + // Find user subscriptions for these users + var userSubs []*user.Subscribe + err = db.Model(&user.Subscribe{}).Where("`id` IN ?", monthlyResetUsers).Find(&userSubs).Error + if err != nil { + logger.Errorw("[ResetTraffic] Failed to find user subscriptions for 1st reset", logger.Field("error", err.Error())) + return err + } + // Clear cache for these subscriptions + for _, sub := range userSubs { + if sub.SubscribeId > 0 { + err = l.svc.UserModel.ClearSubscribeCache(ctx, sub) + if err != nil { + logger.Errorw("[ResetTraffic] Failed to clear cache for subscription", + logger.Field("subscribeId", sub.SubscribeId), + logger.Field("error", err.Error())) + } + } + } logger.Infow("[ResetTraffic] Monthly reset completed", logger.Field("count", len(monthlyResetUsers))) } else { logger.Infow("[ResetTraffic] No users found for monthly reset") } - - return nil + return l.svc.SubscribeModel.ClearCache(ctx, resetMonthSubIds...) }) if err != nil { logger.Errorw("[ResetTraffic] Monthly reset transaction failed", logger.Field("error", err.Error())) @@ -320,20 +336,37 @@ func (l *ResetTrafficLogic) reset1st(ctx context.Context, cache resetTrafficCach logger.Errorw("[ResetTraffic] Failed to update 1st reset users", logger.Field("error", err.Error())) return err } + var userSubs []*user.Subscribe + err = db.Model(&user.Subscribe{}).Where("`id` IN ?", users1stReset).Find(&userSubs).Error + if err != nil { + logger.Errorw("[ResetTraffic] Failed to find user subscriptions for 1st reset", logger.Field("error", err.Error())) + return err + } + + // Clear cache for these subscriptions + for _, sub := range userSubs { + if sub.SubscribeId > 0 { + err = l.svc.UserModel.ClearSubscribeCache(ctx, sub) + if err != nil { + logger.Errorw("[ResetTraffic] Failed to clear cache for subscription", + logger.Field("subscribeId", sub.SubscribeId), + logger.Field("error", err.Error())) + } + } + } logger.Infow("[ResetTraffic] 1st reset completed", logger.Field("count", len(users1stReset))) } else { logger.Infow("[ResetTraffic] No users found for 1st reset") } - return nil + return l.svc.SubscribeModel.ClearCache(ctx, reset1stSubIds...) }) if err != nil { logger.Errorw("[ResetTraffic] 1st reset transaction failed", logger.Field("error", err.Error())) return err } - logger.Infow("[ResetTraffic] 1st reset process completed") return nil } @@ -397,12 +430,32 @@ func (l *ResetTrafficLogic) resetYear(ctx context.Context) error { logger.Errorw("[ResetTraffic] Failed to update yearly reset users", logger.Field("error", err.Error())) return err } - + // Find user subscriptions for these users + var userSubs []*user.Subscribe + err = db.Model(&user.Subscribe{}).Where("`id` IN ?", usersYearReset).Find(&userSubs).Error + if err != nil { + logger.Errorw("[ResetTraffic] Failed to find user subscriptions for 1st reset", logger.Field("error", err.Error())) + return err + } + // Clear cache for these subscriptions + for _, sub := range userSubs { + if sub.SubscribeId > 0 { + err = l.svc.UserModel.ClearSubscribeCache(ctx, sub) + if err != nil { + logger.Errorw("[ResetTraffic] Failed to clear cache for subscription", + logger.Field("subscribeId", sub.SubscribeId), + logger.Field("error", err.Error())) + } + } + } logger.Infow("[ResetTraffic] Yearly reset completed", logger.Field("count", len(usersYearReset))) } else { logger.Infow("[ResetTraffic] No users found for yearly reset") } - + err = l.svc.SubscribeModel.ClearCache(ctx, resetYearSubIds...) + if err != nil { + logger.Errorw("[ResetTraffic] Failed to clear yearly reset subscription cache", logger.Field("error", err.Error())) + } return nil })