From 1c1365d862b209d5c5cb54e57cde229fa30a655c Mon Sep 17 00:00:00 2001 From: Chang lue Tsen Date: Wed, 10 Sep 2025 09:02:54 -0400 Subject: [PATCH] fix(email): update task handling to use generic task model and improve error logging --- .../getBatchSendEmailTaskStatusLogic.go | 6 +- .../marketing/stopBatchSendEmailTaskLogic.go | 2 +- pkg/email/worker.go | 64 +++++++++++++------ 3 files changed, 49 insertions(+), 23 deletions(-) diff --git a/internal/logic/admin/marketing/getBatchSendEmailTaskStatusLogic.go b/internal/logic/admin/marketing/getBatchSendEmailTaskStatusLogic.go index eec1380..e21a7c5 100644 --- a/internal/logic/admin/marketing/getBatchSendEmailTaskStatusLogic.go +++ b/internal/logic/admin/marketing/getBatchSendEmailTaskStatusLogic.go @@ -28,15 +28,15 @@ func NewGetBatchSendEmailTaskStatusLogic(ctx context.Context, svcCtx *svc.Servic func (l *GetBatchSendEmailTaskStatusLogic) GetBatchSendEmailTaskStatus(req *types.GetBatchSendEmailTaskStatusRequest) (resp *types.GetBatchSendEmailTaskStatusResponse, err error) { tx := l.svcCtx.DB - var taskInfo *task.EmailTask - err = tx.Model(&task.EmailTask{}).Where("id = ?", req.Id).First(&taskInfo).Error + var taskInfo *task.Task + err = tx.Model(&task.Task{}).Where("id = ?", req.Id).First(&taskInfo).Error if err != nil { l.Errorf("failed to get email task status, error: %v", err) return nil, xerr.NewErrCode(xerr.DatabaseQueryError) } return &types.GetBatchSendEmailTaskStatusResponse{ - Status: taskInfo.Status, + Status: uint8(taskInfo.Status), Total: int64(taskInfo.Total), Current: int64(taskInfo.Current), Errors: taskInfo.Errors, diff --git a/internal/logic/admin/marketing/stopBatchSendEmailTaskLogic.go b/internal/logic/admin/marketing/stopBatchSendEmailTaskLogic.go index 3b1d47d..da3949f 100644 --- a/internal/logic/admin/marketing/stopBatchSendEmailTaskLogic.go +++ b/internal/logic/admin/marketing/stopBatchSendEmailTaskLogic.go @@ -32,7 +32,7 @@ func (l *StopBatchSendEmailTaskLogic) StopBatchSendEmailTask(req *types.StopBatc } else { logger.Error("[StopBatchSendEmailTaskLogic] email.Manager is nil, cannot stop task") } - err = l.svcCtx.DB.Model(&task.EmailTask{}).Where("id = ?", req.Id).Update("status", 2).Error + err = l.svcCtx.DB.Model(&task.Task{}).Where("id = ?", req.Id).Update("status", 2).Error if err != nil { l.Errorf("failed to stop email task, error: %v", err) diff --git a/pkg/email/worker.go b/pkg/email/worker.go index 27dd05d..544b49a 100644 --- a/pkg/email/worker.go +++ b/pkg/email/worker.go @@ -3,11 +3,11 @@ package email import ( "context" "encoding/json" - "strings" "time" "github.com/perfect-panel/server/internal/model/task" "github.com/perfect-panel/server/pkg/logger" + "github.com/perfect-panel/server/pkg/tool" "gorm.io/gorm" ) @@ -50,14 +50,13 @@ func (w *Worker) Start() { limit.Lock() defer limit.Unlock() tx := w.db.WithContext(w.ctx) - var taskInfo task.EmailTask - if err := tx.Model(&task.EmailTask{}).Where("id = ?", w.id).First(&taskInfo).Error; err != nil { + var taskInfo task.Task + if err := tx.Model(&task.Task{}).Where("id = ?", w.id).First(&taskInfo).Error; err != nil { logger.Error("Batch Send Email", logger.Field("message", "Failed to find task"), logger.Field("error", err.Error()), logger.Field("task_id", w.id), ) - w.status = 2 // 设置状态为已完成 return } if taskInfo.Status != 0 { @@ -65,27 +64,50 @@ func (w *Worker) Start() { logger.Field("message", "Task already completed or in progress"), logger.Field("task_id", w.id), ) - w.status = 2 // 设置状态为已完成 return } - if taskInfo.Recipients == "" && taskInfo.Additional == "" { + + var scope task.EmailScope + if err := json.Unmarshal([]byte(taskInfo.Scope), &scope); err != nil { + logger.Error("Batch Send Email", + logger.Field("message", "Failed to parse task scope"), + logger.Field("error", err.Error()), + logger.Field("task_id", w.id), + ) + return + } + + if len(scope.Recipients) == 0 && len(scope.Additional) == 0 { logger.Error("Batch Send Email", logger.Field("message", "No recipients or additional emails provided"), logger.Field("task_id", w.id), ) - w.status = 2 // 设置状态为已完成 return } + + var content task.EmailContent + if err := json.Unmarshal([]byte(taskInfo.Content), &content); err != nil { + logger.Error("Batch Send Email", + logger.Field("message", "Failed to parse task content"), + logger.Field("error", err.Error()), + logger.Field("task_id", w.id), + ) + return + } + w.status = 1 // 设置状态为运行中 var recipients []string // 解析收件人 - if taskInfo.Recipients != "" { - recipients = append(recipients, strings.Split(taskInfo.Recipients, "\n")...) + if len(scope.Recipients) > 0 { + recipients = append(recipients, scope.Recipients...) } // 解析附加收件人 - if taskInfo.Additional != "" { - recipients = append(recipients, strings.Split(taskInfo.Additional, "\n")...) + if len(scope.Additional) > 0 { + recipients = append(recipients, scope.Additional...) } + // 去重和清理空字符串 + recipients = tool.RemoveDuplicateElements(recipients...) + if len(recipients) == 0 { logger.Error("Batch Send Email", logger.Field("message", "No valid recipients found"), @@ -97,10 +119,10 @@ func (w *Worker) Start() { // 设置发送间隔时间 var intervalTime time.Duration - if taskInfo.Interval == 0 { + if scope.Interval == 0 { intervalTime = 1 * time.Second } else { - intervalTime = time.Duration(taskInfo.Interval) * time.Second + intervalTime = time.Duration(scope.Interval) * time.Second } var errors []ErrorInfo @@ -115,12 +137,11 @@ func (w *Worker) Start() { return default: } - if taskInfo.Status == 0 { taskInfo.Status = 1 // 1 表示任务进行中 } - if err := w.sender.Send([]string{recipient}, taskInfo.Subject, taskInfo.Content); err != nil { + if err := w.sender.Send([]string{recipient}, content.Subject, content.Content); err != nil { logger.Error("Batch Send Email", logger.Field("message", "Failed to send email"), logger.Field("error", err.Error()), @@ -137,19 +158,24 @@ func (w *Worker) Start() { } count++ taskInfo.Current = count - if err := tx.Model(&task.EmailTask{}).Where("`id` = ?", taskInfo.Id).Save(&taskInfo).Error; err != nil { + if err := tx.Model(&task.Task{}).Where("`id` = ?", taskInfo.Id).Save(&taskInfo).Error; err != nil { logger.Error("Batch Send Email", logger.Field("message", "Failed to update task progress"), logger.Field("error", err.Error()), logger.Field("task_id", w.id), ) + errors = append(errors, ErrorInfo{ + Error: err.Error(), + Email: recipient, + Time: time.Now().Unix(), + }) w.status = 2 // 设置状态为已完成 - return } time.Sleep(intervalTime) } - taskInfo.Status = 2 // 设置状态为已完成 - if err := tx.Model(&task.EmailTask{}).Where("`id` = ?", taskInfo.Id).Save(&taskInfo).Error; err != nil { + w.status = 2 // 设置状态为已完成 + + if err := tx.Model(&task.Task{}).Where("`id` = ?", taskInfo.Id).Save(&taskInfo).Error; err != nil { logger.Error("Batch Send Email", logger.Field("message", "Failed to finalize task"), logger.Field("error", err.Error()),