From 5f55b1242e787bda4486c316dc9fce6a3ca6f11d Mon Sep 17 00:00:00 2001 From: EUForest Date: Mon, 12 Jan 2026 18:30:42 +0800 Subject: [PATCH] fix: resolve order queue loss issue with retry mechanism and idempotency - Fix task error handling: return actual errors instead of nil to enable retry - Add idempotency check: skip processing for already finished orders - Extend temp order cache: increase from 15 minutes to 24 hours - Configure retry policy: add MaxRetry(5) for all payment callbacks (Epay, Alipay, Stripe) This fixes the critical issue where paid orders were being lost due to: 1. Failed tasks being marked as successful and deleted from queue 2. Temporary order info expiring before queue processing 3. No retry mechanism for transient failures Changes: - queue/logic/order/activateOrderLogic.go: Fix error returns and add idempotency - internal/logic/public/portal/purchaseLogic.go: Extend cache to 24 hours - internal/logic/notify/*NotifyLogic.go: Add retry configuration --- internal/logic/notify/alipayNotifyLogic.go | 2 +- internal/logic/notify/ePayNotifyLogic.go | 2 +- internal/logic/notify/stripeNotifyLogic.go | 2 +- internal/logic/public/portal/purchaseLogic.go | 2 +- queue/logic/order/activateOrderLogic.go | 19 ++++++++++++++++--- 5 files changed, 20 insertions(+), 7 deletions(-) diff --git a/internal/logic/notify/alipayNotifyLogic.go b/internal/logic/notify/alipayNotifyLogic.go index f0ce2aa..f3ff752 100644 --- a/internal/logic/notify/alipayNotifyLogic.go +++ b/internal/logic/notify/alipayNotifyLogic.go @@ -82,7 +82,7 @@ func (l *AlipayNotifyLogic) AlipayNotify(r *http.Request) error { l.Logger.Error("[AlipayNotify] Marshal payload failed", logger.Field("error", err.Error())) return err } - task := asynq.NewTask(types.ForthwithActivateOrder, bytes) + task := asynq.NewTask(types.ForthwithActivateOrder, bytes, asynq.MaxRetry(5)) taskInfo, err := l.svcCtx.Queue.EnqueueContext(l.ctx, task) if err != nil { l.Logger.Error("[AlipayNotify] Enqueue task failed", logger.Field("error", err.Error())) diff --git a/internal/logic/notify/ePayNotifyLogic.go b/internal/logic/notify/ePayNotifyLogic.go index efdd127..0fd33cc 100644 --- a/internal/logic/notify/ePayNotifyLogic.go +++ b/internal/logic/notify/ePayNotifyLogic.go @@ -84,7 +84,7 @@ func (l *EPayNotifyLogic) EPayNotify(req *types.EPayNotifyRequest) error { l.Logger.Error("[EPayNotify] Marshal payload failed", logger.Field("error", err.Error())) return err } - task := asynq.NewTask(queueType.ForthwithActivateOrder, bytes) + task := asynq.NewTask(queueType.ForthwithActivateOrder, bytes, asynq.MaxRetry(5)) taskInfo, err := l.svcCtx.Queue.EnqueueContext(l.ctx, task) if err != nil { l.Logger.Error("[EPayNotify] Enqueue task failed", logger.Field("error", err.Error())) diff --git a/internal/logic/notify/stripeNotifyLogic.go b/internal/logic/notify/stripeNotifyLogic.go index a364339..47b3d05 100644 --- a/internal/logic/notify/stripeNotifyLogic.go +++ b/internal/logic/notify/stripeNotifyLogic.go @@ -85,7 +85,7 @@ func (l *StripeNotifyLogic) StripeNotify(r *http.Request, w http.ResponseWriter) l.Errorw("[StripeNotify] Marshal error", logger.Field("errors", err.Error()), logger.Field("payload", payload)) return err } - task := asynq.NewTask(types.ForthwithActivateOrder, bytes) + task := asynq.NewTask(types.ForthwithActivateOrder, bytes, asynq.MaxRetry(5)) _, err = l.svcCtx.Queue.Enqueue(task) if err != nil { l.Errorw("[StripeNotify] Enqueue error", logger.Field("errors", err.Error())) diff --git a/internal/logic/public/portal/purchaseLogic.go b/internal/logic/public/portal/purchaseLogic.go index c21cb69..d588726 100644 --- a/internal/logic/public/portal/purchaseLogic.go +++ b/internal/logic/public/portal/purchaseLogic.go @@ -150,7 +150,7 @@ func (l *PurchaseLogic) Purchase(req *types.PortalPurchaseRequest) (resp *types. } content, _ := tempOrder.Marshal() - if _, err = l.svcCtx.Redis.Set(l.ctx, fmt.Sprintf(constant.TempOrderCacheKey, orderInfo.OrderNo), string(content), CloseOrderTimeMinutes*time.Minute).Result(); err != nil { + if _, err = l.svcCtx.Redis.Set(l.ctx, fmt.Sprintf(constant.TempOrderCacheKey, orderInfo.OrderNo), string(content), 24*time.Hour).Result(); err != nil { l.Errorw("[Purchase] Redis set error", logger.Field("error", err.Error()), logger.Field("order_no", orderInfo.OrderNo)) return err } diff --git a/queue/logic/order/activateOrderLogic.go b/queue/logic/order/activateOrderLogic.go index ff4f69e..ad96877 100644 --- a/queue/logic/order/activateOrderLogic.go +++ b/queue/logic/order/activateOrderLogic.go @@ -68,17 +68,22 @@ func NewActivateOrderLogic(svc *svc.ServiceContext) *ActivateOrderLogic { func (l *ActivateOrderLogic) ProcessTask(ctx context.Context, task *asynq.Task) error { payload, err := l.parsePayload(ctx, task.Payload()) if err != nil { - return nil // Log and continue + return err // Return error to trigger retry } orderInfo, err := l.validateAndGetOrder(ctx, payload.OrderNo) if err != nil { - return nil // Log and continue + return err // Return error to trigger retry + } + + // Idempotency: if order is already finished, skip processing + if orderInfo == nil { + return nil } if err = l.processOrderByType(ctx, orderInfo); err != nil { logger.WithContext(ctx).Error("[ActivateOrderLogic] Process task failed", logger.Field("error", err.Error())) - return nil + return err // Return error to trigger retry } l.finalizeCouponAndOrder(ctx, orderInfo) @@ -110,6 +115,14 @@ func (l *ActivateOrderLogic) validateAndGetOrder(ctx context.Context, orderNo st return nil, err } + // Idempotency check: if order is already finished, return success + if orderInfo.Status == OrderStatusFinished { + logger.WithContext(ctx).Info("Order already finished, skip processing", + logger.Field("order_no", orderInfo.OrderNo), + ) + return nil, nil + } + if orderInfo.Status != OrderStatusPaid { logger.WithContext(ctx).Error("Order status error", logger.Field("order_no", orderInfo.OrderNo),