ario_server/queue/logic/traffic/resetTrafficLogic.go
Leif Draven e895180388
Develop (#76)
* refactor: rename queryannouncementhandler.go to queryAnnouncementLogic.go for clarity

* feat(panDomain): update subscription logic to use V2 handler for improved functionality

* refactor(subscribe): replace V2 handler with a unified Handler method for subscription logic

* feat(subscribe): implement user agent limit feature with configurable list

* fix(subscribe): improve error handling and logging for subscription requests

* feat(subscribe): add user agent limit configuration to system settings

* refactor(api): remove deprecated application-related endpoints and types

* refactor(swagger): remove deprecated app.json generation from swagger configuration

* refactor(swagger): remove deprecated app.json check from swagger configuration

* fix(subscribe): update delete method to use Where clause for improved query accuracy

* fix(subscribe): update Id field tag to use primaryKey and improve save method query

* fix(subscribe): update Id field tag to use primaryKey and improve model queries

* fix(subscribe): rename variable for clarity and add special handling for Stash user agent

* fix(email): convert RegisterStartTime and RegisterEndTime to time.Time for accurate query filtering

* refactor(log): consolidate logging models and update related logic for improved clarity and functionality

* fix(types): change Content field type in MessageLog to interface{} for improved flexibility

* fix(log): change MessageLog list to use value type for improved performance and memory efficiency

* fix(email): set EmailTypeVerify in task payload and update content type conversion for verification email

* fix(log): remove unused Id field from SystemLog during login log insertion

* fix(login): remove debug logs and error logging during user login process

* fix(log): add traffic reset logging for subscription resets

* fix(log): insert reset traffic log during subscription activation

* feat(log): add endpoints for retrieving and resetting subscribe traffic logs

* refactor(log): remove Reset Subscribe Traffic Log endpoint and related types

* feat(traffic): add traffic statistics logging and scheduling

* fix(subscribe): ensure active status and reset timestamps during traffic resets

* feat(api): enhance server and node management with new request/response structures

* refactor(api): rename OnlineUser to ServerOnlineUser for clarity

* feat(api): define OnlineUser type with SID and IP fields

* feat(server): implement server management handlers and database schema

* feat(api): add traffic log details filtering and enhance traffic log structures

* feat(api): migrate server and node data handling, update related structures and logic

* feat(server): implement server deletion logic with error handling

* feat(api): update log filtering to use ResetSubscribe type for subscription logs

* feat(api): standardize timestamp field across log structures

* feat(api): refactor cache key handling for server and user lists

* feat(api): enhance server status handling with protocol support and refactor related logic

* fix(traffic): adjust start date for traffic statistics and improve log deletion comment

* feat(api): implement daily traffic ranking for users and servers with error handling

* feat(api): update server total data response to use 'OnlineUsers' and implement daily traffic statistics logging

* feat(api): add log settings management with auto-clear and clear days configuration

* fix(log): correct category in log settings update query

* feat(routes): add handler for scheduled traffic statistics

* feat(model): add user counts struct and update queries for new and renewal users

* feat(api): add referral percentage and only first purchase fields to user model and requests

* feat(database): update user table to add referral percentage and only first purchase fields

* feat(api): add reset sort endpoints for server and node

* feat(api): add sort field to server model

* feat(api): implement sorting functionality for nodes and servers

* fix(database): add sort column to nodes table

* fix(model): enhance user statistics queries with new order and renewal order counts

* fix(log): update timestamp handling in login and registration logs

* fix(log): update sorting logic for server and user subscribe traffic logs

* fix(server): add server status handling based on last reported time

* fix(model): correct filter condition to use 'date' instead of 'data'

* fix(migration): add index for traffic log on timestamp, user_id, and subscribe_id

* fix(log): optimize user traffic rank data handling by using append instead of index assignment

* fix(filter): refactor node list creation to use append and remove duplicates from tags

* fix(node): add ServerId and Enabled fields to node update logic

* feat(tags): add endpoint to query all node tags

* fix(preview): add Preload parameter to FilterNodeList for improved data retrieval

* fix(log): date is empty

* feat(subscribe): add Language field to subscription models and update query logic

* feat(subscription): add Language parameter to GetSubscription request and update query logic

* fix(server): encode ServerKey in base64 and update last reported time for nodes

* feat: delete common GetSubscription

* feat(subscription): implement FilterList method for subscription queries and update related logic

* fix(subscribe): remove duplicate user agents in SubscribeHandler

* fix(push): initialize onlineUsers as a map in pushOnlineUsersLogic

* fix(reset): initialize subs as a map in clearCache method

* refactor(query): simplify node and tag filtering using InSet function

* feat(userlist): enhance GetServerUserListLogic with improved node and tag handling

* fix(userlist): correct node ID assignment and update query logic for tag filtering

* fix(userlist): correct node ID assignment in getServerUserListLogic

* refactor(query): streamline query construction for tag filtering

* fix(statistics): optimize server ranking data handling in QueryServerTotalDataLogic

* refactor(statistics): simplify server ranking data construction in QueryServerTotalDataLogic

* fix(statistics): correct server traffic data assignment in QueryServerTotalDataLogic

* fix(statistics): optimize yesterday's top 10 server traffic data assignment in QueryServerTotalDataLogic

* fix(middleware): remove duplicate elements from user agent list in PanDomainMiddleware

* feat(middleware): enhance user agent handling by querying client list in PanDomainMiddleware

* feat(client): subscribe_template

* feat(oauth): add user agent and IP logging to registration and login processes

* fix(balance): add timestamp to balance logs for payment, refund, and recharge transactions

* fix(log): correct comment for CommissionTypeRefund to improve clarity

* fix(log): replace magic number with constant for gift type in purchase checkout logic

* fix(log): rename OrderId to OrderNo for consistency in balance logging

* feat(log): add logging for balance, gift amount, and commission adjustments

* fix(user): correct placement of DeepCopy for user info update logic

* feat(log): add UserSubscribeId to FilterSubscribeLogRequest for enhanced filtering

* fix(purchase): streamline error handling and improve JSON marshaling for temporary orders

* fix(order): simplify commission handling and improve payload parsing logic

* fix(order): update commission calculation to actual payment amount minus gateway handling fee

* feat(payment): add support for CryptoSaaS payment platform and enhance configuration handling

* fix(balance): update QueryUserBalanceLog response structure to include balance log list

* fix(email): update task progress handling to use specific task ID for updates

* feat(quota): add quota task creation and querying endpoints with updated data structures

* fix(email): update task handling to use generic task model and improve error logging

* fix(order): improve error logging for database transaction and user cache updates

* feat(quota): enhance quota task management with new request structures and processing logic

* fix(quota): remove redundant quota task status endpoint from admin marketing routes

* fix(worker): update task completion status handling in worker logic

* fix(quota): update taskInfo to include current subscription count in quota logic

* doc(log): rename function for clarity and add cache cleanup comment

* fix(quota): update time handling in quota logic and correct subscriber ID query

* fix(quota): update time handling to use UnixMilli for start time in quota logic

* feat(protocol): add server protocol configuration query and enhance protocol options

* fix(quota): correct time range queries for start and expire times in quota logic

* fix(types): update plugin options to include 'none' in the plugin field

---------

Co-authored-by: Chang lue Tsen <tension@ppanel.dev>
2025-09-14 09:50:22 -04:00

648 lines
21 KiB
Go

package traffic
import (
"context"
"encoding/json"
"errors"
"strconv"
"strings"
"time"
"github.com/perfect-panel/server/internal/model/log"
"github.com/perfect-panel/server/internal/model/node"
"github.com/perfect-panel/server/internal/model/subscribe"
"github.com/perfect-panel/server/internal/model/user"
"github.com/perfect-panel/server/internal/svc"
"github.com/perfect-panel/server/pkg/logger"
"github.com/perfect-panel/server/pkg/tool"
"github.com/perfect-panel/server/queue/types"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"gorm.io/gorm"
)
// ResetTrafficLogic resets the upload/download counters of user subscriptions
// according to the reset_cycle configured on their subscription plan.
// Supports three reset modes:
// - reset_cycle = 1: Reset on the 1st of every month
// - reset_cycle = 2: Reset monthly, on the day-of-month of the user subscription's expire_time
// - reset_cycle = 3: Reset yearly, on the month and day of the user subscription's expire_time
type ResetTrafficLogic struct {
// svc is the shared service context; this file uses its Redis client, queue, DB handle and models.
svc *svc.ServiceContext
}
// Cache and retry configuration constants
const (
// maxRetryAttempts caps how many times ProcessTask re-enqueues itself after a failure.
maxRetryAttempts = 3
// retryDelay is the delay passed to asynq.ProcessIn when a retry is enqueued.
retryDelay = 30 * time.Minute
// lockTimeout is the expiry given to the Redis lock key when it is set.
lockTimeout = 5 * time.Minute
)
// Redis keys used by the reset-traffic task
var (
// cacheKey stores the JSON-encoded resetTrafficCache.
cacheKey = "reset_traffic_cache"
// retryCountKey stores the number of retries already scheduled after a failed run.
retryCountKey = "reset_traffic_retry_count"
// lockKey is set with SetNX so that only one run proceeds at a time.
lockKey = "reset_traffic_lock"
)
// resetTrafficCache is the value persisted in Redis under cacheKey.
// reset1st compares LastResetTime with the current year and month to avoid
// running the 1st-of-month reset twice in the same month.
type resetTrafficCache struct {
// LastResetTime is written by ProcessTask after a successful run, using that run's start time.
LastResetTime time.Time
}
// NewResetTrafficLogic builds a ResetTrafficLogic bound to the given service context.
func NewResetTrafficLogic(svc *svc.ServiceContext) *ResetTrafficLogic {
	logic := new(ResetTrafficLogic)
	logic.svc = svc
	return logic
}
// ProcessTask runs one pass of the traffic reset job. It executes the yearly,
// 1st-of-month and monthly-cycle resets in that order and stops at the first
// failure.
//
// Only one run proceeds at a time: a Redis lock is taken up front and the call
// returns nil without doing any work when the lock is already held.
//
// On failure the error is returned to the caller and, if it is considered
// retryable and fewer than maxRetryAttempts retries have been scheduled, a new
// task is enqueued to run after retryDelay. On success the retry counter is
// cleared and the run's start time is stored as the new LastResetTime.
func (l *ResetTrafficLogic) ProcessTask(ctx context.Context, _ *asynq.Task) error {
	startTime := time.Now()

	// Get current retry count
	retryCount := l.getRetryCount(ctx)
	logger.Infow("[ResetTraffic] Starting task execution",
		logger.Field("retryCount", retryCount),
		logger.Field("startTime", startTime))

	// Acquire distributed lock to prevent duplicate execution
	if !l.acquireLock(ctx) {
		logger.Infow("[ResetTraffic] Another task is already running, skipping execution")
		return nil
	}
	// Deferred first so it runs last: the lock is held until retry bookkeeping is done.
	defer l.releaseLock(ctx)

	// err is read by the deferred finishRun; every return path below leaves the
	// returned value in it.
	var err error
	defer func() { l.finishRun(ctx, err, retryCount, startTime) }()

	cache := l.loadResetCache(ctx)

	// Execute reset operations in order: yearly -> monthly (1st) -> monthly (cycle)
	if err = l.resetYear(ctx); err != nil {
		logger.Errorw("[ResetTraffic] Yearly reset failed", logger.Field("error", err.Error()))
		return err
	}
	if err = l.reset1st(ctx, cache); err != nil {
		logger.Errorw("[ResetTraffic] Monthly 1st reset failed", logger.Field("error", err.Error()))
		return err
	}
	if err = l.resetMonth(ctx); err != nil {
		logger.Errorw("[ResetTraffic] Monthly cycle reset failed", logger.Field("error", err.Error()))
		return err
	}

	// Update cache with current time after successful processing
	l.storeResetCache(ctx, startTime)
	return nil
}

// loadResetCache reads the last reset time from Redis. When the key is missing
// or cannot be read or decoded it returns the zero value, meaning "no reset is
// known to have happened", so that reset1st is not suppressed by a made-up
// timestamp that falls in the current month.
func (l *ResetTrafficLogic) loadResetCache(ctx context.Context) resetTrafficCache {
	cacheData, err := l.svc.Redis.Get(ctx, cacheKey).Result()
	if err != nil {
		// redis.Nil only means the key does not exist yet; anything else is worth logging.
		if !errors.Is(err, redis.Nil) {
			logger.Errorw("[ResetTraffic] Failed to get cache", logger.Field("error", err.Error()))
		}
		var fallback resetTrafficCache
		logger.Infow("[ResetTraffic] Using default cache value", logger.Field("lastResetTime", fallback.LastResetTime))
		return fallback
	}

	// Parse JSON data
	var cache resetTrafficCache
	if err := json.Unmarshal([]byte(cacheData), &cache); err != nil {
		logger.Errorw("[ResetTraffic] Failed to unmarshal cache", logger.Field("error", err.Error()))
		// Discard anything a partial decode may have filled in.
		return resetTrafficCache{}
	}
	logger.Infow("[ResetTraffic] Cache loaded successfully", logger.Field("lastResetTime", cache.LastResetTime))
	return cache
}

// storeResetCache persists lastReset as the new LastResetTime. Failures are
// only logged because the resets themselves have already completed.
func (l *ResetTrafficLogic) storeResetCache(ctx context.Context, lastReset time.Time) {
	cacheDataBytes, marshalErr := json.Marshal(resetTrafficCache{LastResetTime: lastReset})
	if marshalErr != nil {
		logger.Errorw("[ResetTraffic] Failed to marshal cache", logger.Field("error", marshalErr.Error()))
		return
	}
	// Expiration 0: the key is stored without a TTL.
	if cacheErr := l.svc.Redis.Set(ctx, cacheKey, cacheDataBytes, 0).Err(); cacheErr != nil {
		logger.Errorw("[ResetTraffic] Failed to update cache", logger.Field("error", cacheErr.Error()))
		return
	}
	logger.Infow("[ResetTraffic] Cache updated successfully", logger.Field("newLastResetTime", lastReset))
}

// finishRun performs the end-of-run bookkeeping for ProcessTask: it clears the
// retry counter on success, schedules a delayed retry for retryable failures
// that are still within the retry limit, and otherwise gives up and clears the
// counter so the next scheduled run starts fresh.
func (l *ResetTrafficLogic) finishRun(ctx context.Context, runErr error, retryCount int, startTime time.Time) {
	if runErr == nil {
		// Task completed successfully, reset retry count
		l.clearRetryCount(ctx)
		logger.Infow("[ResetTraffic] Task completed successfully",
			logger.Field("processingTime", time.Since(startTime)),
			logger.Field("retryCount", retryCount))
		return
	}

	// Check if error is retryable and within retry limit
	if l.isRetryableError(runErr) && retryCount < maxRetryAttempts {
		// Increment retry count
		l.setRetryCount(ctx, retryCount+1)

		// Schedule retry with delay
		task := asynq.NewTask(types.SchedulerResetTraffic, nil)
		if _, retryErr := l.svc.Queue.Enqueue(task, asynq.ProcessIn(retryDelay)); retryErr != nil {
			logger.Errorw("[ResetTraffic] Failed to enqueue retry task",
				logger.Field("error", retryErr.Error()),
				logger.Field("retryCount", retryCount))
		} else {
			logger.Infow("[ResetTraffic] Task failed, retrying in 30 minutes",
				logger.Field("error", runErr.Error()),
				logger.Field("retryCount", retryCount+1),
				logger.Field("maxRetryAttempts", maxRetryAttempts))
		}
		return
	}

	// Max retries reached or non-retryable error
	if retryCount >= maxRetryAttempts {
		logger.Errorw("[ResetTraffic] Max retry attempts reached, giving up",
			logger.Field("retryCount", retryCount),
			logger.Field("maxRetryAttempts", maxRetryAttempts),
			logger.Field("error", runErr.Error()))
	} else {
		logger.Errorw("[ResetTraffic] Non-retryable error, not retrying",
			logger.Field("error", runErr.Error()),
			logger.Field("retryCount", retryCount))
	}
	// Reset retry count for next scheduled task
	l.clearRetryCount(ctx)
}
// resetMonth handles the monthly cycle reset (reset_cycle = 2).
//
// A user subscription is reset today when all of the following hold:
//   - its plan has reset_cycle = 2,
//   - its status is 1 or 2,
//   - its expire_time is at least one full month after today, and
//   - the day-of-month of its expire_time equals today's day-of-month. On the
//     last day of a month, every later day-of-month is matched as well, so
//     that e.g. a subscription anchored on the 31st is still reset in a
//     30-day month.
//
// The reset zeroes upload/download, sets status to 1 and clears finished_at.
// Everything runs inside one transaction; any returned error, including a
// failure to clear the plan cache, aborts it.
func (l *ResetTrafficLogic) resetMonth(ctx context.Context) error {
	now := time.Now()

	err := l.svc.UserModel.Transaction(ctx, func(db *gorm.DB) error {
		// Get all subscription plans that use the monthly cycle
		var resetMonthSubIds []int64
		err := db.Model(&subscribe.Subscribe{}).Select("`id`").Where("`reset_cycle` = ?", 2).Find(&resetMonthSubIds).Error
		if err != nil {
			logger.Errorw("[ResetTraffic] Failed to query monthly subscriptions", logger.Field("error", err.Error()))
			return err
		}
		if len(resetMonthSubIds) == 0 {
			logger.Infow("[ResetTraffic] No monthly cycle subscriptions found")
			return nil
		}

		// IDs of the user subscriptions that are due today
		var monthlyResetUsers []int64

		// Today is the last day of the month when tomorrow falls in another month
		isLastDayOfMonth := now.AddDate(0, 0, 1).Month() != now.Month()

		query := db.Model(&user.Subscribe{}).Select("`id`").
			Where("`subscribe_id` IN ?", resetMonthSubIds).
			Where("`status` IN ?", []int64{1, 2}).
			// expire_time must be at least one full month after today
			Where("TIMESTAMPDIFF(MONTH, CURDATE(),DATE(expire_time)) >= 1")
		if isLastDayOfMonth {
			// Last day of month: also pick up anchor days this month does not have
			query = query.Where("DAY(`expire_time`) >= ?", now.Day())
		} else {
			// Normal case: exact day match
			query = query.Where("DAY(`expire_time`) = ?", now.Day())
		}

		err = query.Find(&monthlyResetUsers).Error
		if err != nil {
			logger.Errorw("[ResetTraffic] Failed to query monthly reset users", logger.Field("error", err.Error()))
			return err
		}

		if len(monthlyResetUsers) > 0 {
			logger.Infow("[ResetTraffic] Found users for monthly reset",
				logger.Field("count", len(monthlyResetUsers)),
				logger.Field("userIds", monthlyResetUsers))

			// Reset upload and download traffic to zero
			err = db.Model(&user.Subscribe{}).Where("`id` IN ?", monthlyResetUsers).
				Updates(map[string]interface{}{
					"upload":      0,
					"download":    0,
					"status":      1, // Ensure status is active
					"finished_at": nil,
				}).Error
			if err != nil {
				logger.Errorw("[ResetTraffic] Failed to update monthly reset users", logger.Field("error", err.Error()))
				return err
			}

			// Load the updated rows; clearCache needs the full records
			var userSubs []*user.Subscribe
			err = db.Model(&user.Subscribe{}).Where("`id` IN ?", monthlyResetUsers).Find(&userSubs).Error
			if err != nil {
				logger.Errorw("[ResetTraffic] Failed to find user subscriptions for monthly reset", logger.Field("error", err.Error()))
				return err
			}

			// Clear cache for these subscriptions
			l.clearCache(ctx, userSubs)
			logger.Infow("[ResetTraffic] Monthly reset completed", logger.Field("count", len(monthlyResetUsers)))
		} else {
			logger.Infow("[ResetTraffic] No users found for monthly reset")
		}

		return l.svc.SubscribeModel.ClearCache(ctx, resetMonthSubIds...)
	})
	if err != nil {
		logger.Errorw("[ResetTraffic] Monthly reset transaction failed", logger.Field("error", err.Error()))
		return err
	}

	logger.Infow("[ResetTraffic] Monthly reset process completed")
	return nil
}
// reset1st performs the calendar-month reset (reset_cycle = 1).
//
// It is a no-op unless today is the 1st and cache.LastResetTime lies in a
// different year/month than today. Otherwise, inside one transaction, every
// user subscription with status 1 or 2 on a reset_cycle = 1 plan gets its
// upload/download zeroed, status set to 1 and finished_at cleared, followed by
// cache invalidation for the affected users and plans.
func (l *ResetTrafficLogic) reset1st(ctx context.Context, cache resetTrafficCache) error {
	today := time.Now()
	last := cache.LastResetTime

	// Guard 1: the cache says this month was already handled
	if last.Year() == today.Year() && last.Month() == today.Month() {
		logger.Infow("[ResetTraffic] Already reset this month, skipping 1st reset",
			logger.Field("lastResetTime", cache.LastResetTime),
			logger.Field("currentTime", today))
		return nil
	}

	// Guard 2: only the 1st day of the month triggers this reset
	if day := today.Day(); day != 1 {
		logger.Infow("[ResetTraffic] Not 1st day of month, skipping 1st reset", logger.Field("currentDay", day))
		return nil
	}

	txErr := l.svc.UserModel.Transaction(ctx, func(db *gorm.DB) error {
		// Plans configured to reset on the 1st
		var planIDs []int64
		if err := db.Model(&subscribe.Subscribe{}).Select("`id`").Where("`reset_cycle` = ?", 1).Find(&planIDs).Error; err != nil {
			logger.Errorw("[ResetTraffic] Failed to query 1st reset subscriptions", logger.Field("error", err.Error()))
			return err
		}
		if len(planIDs) == 0 {
			logger.Infow("[ResetTraffic] No 1st reset subscriptions found")
			return nil
		}

		// User subscriptions on those plans with status 1 or 2
		var targetIDs []int64
		lookup := db.Model(&user.Subscribe{}).Select("`id`").
			Where("`subscribe_id` IN ?", planIDs).
			Where("`status` IN ?", []int64{1, 2})
		if err := lookup.Find(&targetIDs).Error; err != nil {
			logger.Errorw("[ResetTraffic] Failed to query 1st reset users", logger.Field("error", err.Error()))
			return err
		}

		if len(targetIDs) == 0 {
			logger.Infow("[ResetTraffic] No users found for 1st reset")
			return l.svc.SubscribeModel.ClearCache(ctx, planIDs...)
		}

		logger.Infow("[ResetTraffic] Found users for 1st reset",
			logger.Field("count", len(targetIDs)),
			logger.Field("userIds", targetIDs))

		// Zero the counters and reactivate
		zeroed := map[string]interface{}{
			"upload":      0,
			"download":    0,
			"status":      1, // Ensure status is active
			"finished_at": nil,
		}
		if err := db.Model(&user.Subscribe{}).Where("`id` IN ?", targetIDs).Updates(zeroed).Error; err != nil {
			logger.Errorw("[ResetTraffic] Failed to update 1st reset users", logger.Field("error", err.Error()))
			return err
		}

		// Reload the affected rows so their caches can be invalidated
		var refreshed []*user.Subscribe
		if err := db.Model(&user.Subscribe{}).Where("`id` IN ?", targetIDs).Find(&refreshed).Error; err != nil {
			logger.Errorw("[ResetTraffic] Failed to find user subscriptions for 1st reset", logger.Field("error", err.Error()))
			return err
		}

		l.clearCache(ctx, refreshed)
		logger.Infow("[ResetTraffic] 1st reset completed", logger.Field("count", len(targetIDs)))

		return l.svc.SubscribeModel.ClearCache(ctx, planIDs...)
	})
	if txErr != nil {
		logger.Errorw("[ResetTraffic] 1st reset transaction failed", logger.Field("error", txErr.Error()))
		return txErr
	}

	logger.Infow("[ResetTraffic] 1st reset process completed")
	return nil
}
// resetYear handles the yearly reset (reset_cycle = 3).
//
// A user subscription is reset today when all of the following hold:
//   - its plan has reset_cycle = 3,
//   - its status is 1 or 2,
//   - its expire_time is at least one full year after today, and
//   - the month and day of its expire_time equal today's. On Feb 28 of a
//     non-leap year, subscriptions anchored on Feb 29 are included too, since
//     that date does not occur this year.
//
// The reset zeroes upload/download, sets status to 1 and clears finished_at.
// A failure to clear the plan cache is logged but does not abort the
// transaction.
func (l *ResetTrafficLogic) resetYear(ctx context.Context) error {
	now := time.Now()

	err := l.svc.UserModel.Transaction(ctx, func(db *gorm.DB) error {
		// Get all subscription plans that reset yearly
		var resetYearSubIds []int64
		err := db.Model(&subscribe.Subscribe{}).Select("`id`").Where("`reset_cycle` = ?", 3).Find(&resetYearSubIds).Error
		if err != nil {
			logger.Errorw("[ResetTraffic] Failed to query yearly subscriptions", logger.Field("error", err.Error()))
			return err
		}
		if len(resetYearSubIds) == 0 {
			logger.Infow("[ResetTraffic] No yearly reset subscriptions found")
			return nil
		}

		// IDs of the user subscriptions that are due today
		var usersYearReset []int64

		// Feb 29 anchors fall back to Feb 28 only when this year has no Feb 29.
		// In a leap year they are matched on Feb 29 itself by the exact-day
		// branch, so including them on Feb 28 as well would reset them twice.
		isLeapYearCase := now.Month() == time.February && now.Day() == 28 &&
			now.AddDate(0, 0, 1).Month() != time.February

		query := db.Model(&user.Subscribe{}).Select("`id`").
			Where("`subscribe_id` IN ?", resetYearSubIds).
			Where("MONTH(expire_time) = ?", int(now.Month())). // Same month
			Where("`status` IN ?", []int64{1, 2}).
			// expire_time must be at least one full year after today
			Where("TIMESTAMPDIFF(YEAR, CURDATE(),DATE(expire_time)) >= 1")
		if isLeapYearCase {
			// Feb 28 of a non-leap year: handle both Feb 28 and Feb 29 anchors
			query = query.Where("DAY(expire_time) IN (28, 29)")
		} else {
			// Normal case: exact day match
			query = query.Where("DAY(expire_time) = ?", now.Day())
		}

		err = query.Find(&usersYearReset).Error
		if err != nil {
			logger.Errorw("[ResetTraffic] Query yearly reset users failed", logger.Field("error", err.Error()))
			return err
		}

		if len(usersYearReset) > 0 {
			logger.Infow("[ResetTraffic] Found users for yearly reset",
				logger.Field("count", len(usersYearReset)),
				logger.Field("userIds", usersYearReset))

			// Reset upload and download traffic to zero
			err = db.Model(&user.Subscribe{}).Where("`id` IN ?", usersYearReset).
				Updates(map[string]interface{}{
					"upload":      0,
					"download":    0,
					"status":      1, // Ensure status is active
					"finished_at": nil,
				}).Error
			if err != nil {
				logger.Errorw("[ResetTraffic] Failed to update yearly reset users", logger.Field("error", err.Error()))
				return err
			}

			// Load the updated rows; clearCache needs the full records
			var userSubs []*user.Subscribe
			err = db.Model(&user.Subscribe{}).Where("`id` IN ?", usersYearReset).Find(&userSubs).Error
			if err != nil {
				logger.Errorw("[ResetTraffic] Failed to find user subscriptions for yearly reset", logger.Field("error", err.Error()))
				return err
			}

			// Clear cache for these subscriptions
			l.clearCache(ctx, userSubs)
			logger.Infow("[ResetTraffic] Yearly reset completed", logger.Field("count", len(usersYearReset)))
		} else {
			logger.Infow("[ResetTraffic] No users found for yearly reset")
		}

		// Best effort: a stale plan cache must not undo the reset
		err = l.svc.SubscribeModel.ClearCache(ctx, resetYearSubIds...)
		if err != nil {
			logger.Errorw("[ResetTraffic] Failed to clear yearly reset subscription cache", logger.Field("error", err.Error()))
		}
		return nil
	})
	if err != nil {
		logger.Errorw("[ResetTraffic] Yearly reset transaction failed", logger.Field("error", err.Error()))
		return err
	}

	logger.Infow("[ResetTraffic] Yearly reset process completed")
	return nil
}
// getRetryCount reads the retry counter stored under retryCountKey.
// It returns 0 when the key is absent, when Redis reports an error, or when
// the stored value is not an integer; the latter two cases are logged.
func (l *ResetTrafficLogic) getRetryCount(ctx context.Context) int {
	raw, err := l.svc.Redis.Get(ctx, retryCountKey).Result()
	switch {
	case err == nil:
		// value present, parsed below
	case errors.Is(err, redis.Nil):
		// No retry count found, start with 0
		return 0
	default:
		logger.Errorw("[ResetTraffic] Failed to get retry count", logger.Field("error", err.Error()))
		return 0
	}

	parsed, convErr := strconv.Atoi(raw)
	if convErr != nil {
		logger.Errorw("[ResetTraffic] Invalid retry count format", logger.Field("value", raw))
		return 0
	}
	return parsed
}
// setRetryCount stores count under retryCountKey with a 24-hour expiry.
// A Redis failure is logged and otherwise ignored.
func (l *ResetTrafficLogic) setRetryCount(ctx context.Context, count int) {
	status := l.svc.Redis.Set(ctx, retryCountKey, count, 24*time.Hour)
	if setErr := status.Err(); setErr != nil {
		logger.Errorw("[ResetTraffic] Failed to set retry count",
			logger.Field("count", count),
			logger.Field("error", setErr.Error()))
	}
}
// clearRetryCount deletes retryCountKey from Redis.
// A Redis failure is logged and otherwise ignored.
func (l *ResetTrafficLogic) clearRetryCount(ctx context.Context) {
	delErr := l.svc.Redis.Del(ctx, retryCountKey).Err()
	if delErr == nil {
		return
	}
	logger.Errorw("[ResetTraffic] Failed to clear retry count", logger.Field("error", delErr.Error()))
}
// acquireLock tries to take the task lock by setting lockKey with SetNX and an
// expiry of lockTimeout. It reports true only when this call created the key;
// a Redis error is logged and treated as "not acquired".
func (l *ResetTrafficLogic) acquireLock(ctx context.Context) bool {
	created, err := l.svc.Redis.SetNX(ctx, lockKey, "locked", lockTimeout).Result()
	switch {
	case err != nil:
		logger.Errorw("[ResetTraffic] Failed to acquire lock", logger.Field("error", err.Error()))
		return false
	case created:
		logger.Infow("[ResetTraffic] Lock acquired successfully")
		return true
	default:
		logger.Infow("[ResetTraffic] Lock already exists, another task is running")
		return false
	}
}
// releaseLock deletes lockKey from Redis and logs the outcome.
// NOTE(review): the delete is unconditional, so it removes the key no matter
// which run set it.
func (l *ResetTrafficLogic) releaseLock(ctx context.Context) {
	if err := l.svc.Redis.Del(ctx, lockKey).Err(); err != nil {
		logger.Errorw("[ResetTraffic] Failed to release lock", logger.Field("error", err.Error()))
		return
	}
	logger.Infow("[ResetTraffic] Lock released successfully")
}
// isRetryableError classifies err by looking for known substrings in its
// lower-cased message. Patterns that indicate a permanent failure are checked
// first and yield false; patterns that indicate a transient failure yield
// true. An error matching neither list is also treated as retryable. A nil
// error is never retryable. Every decision is logged.
func (l *ResetTrafficLogic) isRetryableError(err error) bool {
	if err == nil {
		return false
	}

	msg := strings.ToLower(err.Error())

	// firstMatch returns the first pattern, in list order, contained in msg.
	firstMatch := func(patterns []string) (string, bool) {
		for _, candidate := range patterns {
			if strings.Contains(msg, candidate) {
				return candidate, true
			}
		}
		return "", false
	}

	// Database constraint errors (non-retryable)
	permanent := []string{
		"foreign key constraint",
		"unique constraint",
		"check constraint",
		"not null constraint",
		"invalid input syntax",
		"column does not exist",
		"table does not exist",
		"permission denied",
		"access denied",
		"authentication failed",
		"invalid credentials",
	}

	// Network and connection errors (retryable)
	transient := []string{
		"connection refused",
		"connection reset",
		"connection timeout",
		"network",
		"timeout",
		"dial",
		"context deadline exceeded",
		"temporary failure",
		"server error",
		"service unavailable",
		"internal server error",
		"database is locked",
		"too many connections",
		"deadlock",
		"lock wait timeout",
	}

	// Permanent failures take precedence over transient ones
	if pattern, hit := firstMatch(permanent); hit {
		logger.Infow("[ResetTraffic] Non-retryable error detected",
			logger.Field("error", err.Error()),
			logger.Field("pattern", pattern))
		return false
	}

	if pattern, hit := firstMatch(transient); hit {
		logger.Infow("[ResetTraffic] Retryable error detected",
			logger.Field("error", err.Error()),
			logger.Field("pattern", pattern))
		return true
	}

	// Default: treat unknown errors as retryable, but log for analysis
	logger.Infow("[ResetTraffic] Unknown error type, treating as retryable",
		logger.Field("error", err.Error()))
	return true
}
// clearCache performs the follow-up work for user subscriptions whose traffic
// was just reset. For every entry in list it clears the user-subscription
// cache (when the entry references a plan) and writes a reset log record.
// Then, once per distinct plan referenced, it loads the plan and clears the
// node cache selected by the plan's node IDs and node tags.
//
// All failures are logged and skipped; nothing is returned to the caller.
func (l *ResetTrafficLogic) clearCache(ctx context.Context, list []*user.Subscribe) {
	if len(list) == 0 {
		return
	}

	// Distinct plan IDs referenced by the list
	planIDs := make(map[int64]struct{})
	for _, sub := range list {
		if sub == nil {
			continue
		}
		if sub.SubscribeId > 0 {
			if err := l.svc.UserModel.ClearSubscribeCache(ctx, sub); err != nil {
				logger.Errorw("[ResetTraffic] Failed to clear cache for subscription",
					logger.Field("subscribeId", sub.SubscribeId),
					logger.Field("error", err.Error()))
			}
			planIDs[sub.SubscribeId] = struct{}{}
		}
		// Insert traffic reset log
		l.insertLog(ctx, sub.Id, sub.UserId)
	}

	for planID := range planIDs {
		info, err := l.svc.SubscribeModel.FindOne(ctx, planID)
		if err != nil {
			logger.Errorw("[ResetTraffic] FindOne subscribe failed", logger.Field("error", err.Error()), logger.Field("subscribe_id", planID))
			continue
		}
		if info == nil || info.Id != planID {
			continue
		}

		// Empty fields stay nil rather than being parsed into empty entries
		var nodes []int64
		if info.Nodes != "" {
			nodes = tool.StringToInt64Slice(info.Nodes)
		}
		var tag []string
		if info.NodeTags != "" {
			tag = strings.Split(info.NodeTags, ",")
		}

		err = l.svc.NodeModel.ClearNodeCache(ctx, &node.FilterNodeParams{
			Page:     1,
			Size:     1000,
			Tag:      tag,
			ServerId: nodes,
		})
		if err != nil {
			logger.Errorw("[ResetTraffic] ClearNodeCache failed", logger.Field("error", err.Error()), logger.Field("subscribe_id", planID))
		}
	}
}
// insertLog writes a system log row recording an automatic traffic reset.
// subId is stored as the row's ObjectID and userId goes into the serialized
// content. Failures are logged and not returned; if the content cannot be
// serialized no row is written.
func (l *ResetTrafficLogic) insertLog(ctx context.Context, subId, userId int64) {
	// One timestamp for both fields so Timestamp and Date cannot straddle midnight
	now := time.Now()

	trafficLog := log.ResetSubscribe{
		Type:      log.ResetSubscribeTypeAuto,
		UserId:    userId,
		Timestamp: now.UnixMilli(),
	}
	content, err := trafficLog.Marshal()
	if err != nil {
		logger.Errorw("[ResetTraffic] Failed to marshal reset log content",
			logger.Field("subscribeId", subId),
			logger.Field("userId", userId),
			logger.Field("error", err.Error()))
		return
	}

	entry := &log.SystemLog{
		Type:     log.TypeResetSubscribe.Uint8(),
		ObjectID: subId,
		Date:     now.Format(time.DateOnly),
		Content:  string(content),
	}
	if err = l.svc.DB.WithContext(ctx).Model(&log.SystemLog{}).Create(entry).Error; err != nil {
		logger.Errorw("[ResetTraffic] Failed to create system log for subscription", logger.Field("error", err.Error()))
	}
}