fix: resolve order queue loss issue with retry mechanism and idempotency

- Fix task error handling: return actual errors instead of nil to enable retry
- Add idempotency check: skip processing for already finished orders
- Extend temp order cache: increase from 15 minutes to 24 hours
- Configure retry policy: add MaxRetry(5) for all payment callbacks (Epay, Alipay, Stripe)

This fixes the critical issue where paid orders were being lost due to:
1. Failed tasks being marked as successful and deleted from queue
2. Temporary order info expiring before queue processing
3. No retry mechanism for transient failures

Changes:
- queue/logic/order/activateOrderLogic.go: Fix error returns and add idempotency
- internal/logic/public/portal/purchaseLogic.go: Extend cache to 24 hours
- internal/logic/notify/*NotifyLogic.go: Add retry configuration
This commit is contained in:
EUForest 2026-01-12 18:30:42 +08:00
parent 7d4a19c9a3
commit 5f55b1242e
5 changed files with 20 additions and 7 deletions

View File

@ -82,7 +82,7 @@ func (l *AlipayNotifyLogic) AlipayNotify(r *http.Request) error {
l.Logger.Error("[AlipayNotify] Marshal payload failed", logger.Field("error", err.Error())) l.Logger.Error("[AlipayNotify] Marshal payload failed", logger.Field("error", err.Error()))
return err return err
} }
task := asynq.NewTask(types.ForthwithActivateOrder, bytes) task := asynq.NewTask(types.ForthwithActivateOrder, bytes, asynq.MaxRetry(5))
taskInfo, err := l.svcCtx.Queue.EnqueueContext(l.ctx, task) taskInfo, err := l.svcCtx.Queue.EnqueueContext(l.ctx, task)
if err != nil { if err != nil {
l.Logger.Error("[AlipayNotify] Enqueue task failed", logger.Field("error", err.Error())) l.Logger.Error("[AlipayNotify] Enqueue task failed", logger.Field("error", err.Error()))

View File

@ -84,7 +84,7 @@ func (l *EPayNotifyLogic) EPayNotify(req *types.EPayNotifyRequest) error {
l.Logger.Error("[EPayNotify] Marshal payload failed", logger.Field("error", err.Error())) l.Logger.Error("[EPayNotify] Marshal payload failed", logger.Field("error", err.Error()))
return err return err
} }
task := asynq.NewTask(queueType.ForthwithActivateOrder, bytes) task := asynq.NewTask(queueType.ForthwithActivateOrder, bytes, asynq.MaxRetry(5))
taskInfo, err := l.svcCtx.Queue.EnqueueContext(l.ctx, task) taskInfo, err := l.svcCtx.Queue.EnqueueContext(l.ctx, task)
if err != nil { if err != nil {
l.Logger.Error("[EPayNotify] Enqueue task failed", logger.Field("error", err.Error())) l.Logger.Error("[EPayNotify] Enqueue task failed", logger.Field("error", err.Error()))

View File

@ -85,7 +85,7 @@ func (l *StripeNotifyLogic) StripeNotify(r *http.Request, w http.ResponseWriter)
l.Errorw("[StripeNotify] Marshal error", logger.Field("errors", err.Error()), logger.Field("payload", payload)) l.Errorw("[StripeNotify] Marshal error", logger.Field("errors", err.Error()), logger.Field("payload", payload))
return err return err
} }
task := asynq.NewTask(types.ForthwithActivateOrder, bytes) task := asynq.NewTask(types.ForthwithActivateOrder, bytes, asynq.MaxRetry(5))
_, err = l.svcCtx.Queue.Enqueue(task) _, err = l.svcCtx.Queue.Enqueue(task)
if err != nil { if err != nil {
l.Errorw("[StripeNotify] Enqueue error", logger.Field("errors", err.Error())) l.Errorw("[StripeNotify] Enqueue error", logger.Field("errors", err.Error()))

View File

@ -150,7 +150,7 @@ func (l *PurchaseLogic) Purchase(req *types.PortalPurchaseRequest) (resp *types.
} }
content, _ := tempOrder.Marshal() content, _ := tempOrder.Marshal()
if _, err = l.svcCtx.Redis.Set(l.ctx, fmt.Sprintf(constant.TempOrderCacheKey, orderInfo.OrderNo), string(content), CloseOrderTimeMinutes*time.Minute).Result(); err != nil { if _, err = l.svcCtx.Redis.Set(l.ctx, fmt.Sprintf(constant.TempOrderCacheKey, orderInfo.OrderNo), string(content), 24*time.Hour).Result(); err != nil {
l.Errorw("[Purchase] Redis set error", logger.Field("error", err.Error()), logger.Field("order_no", orderInfo.OrderNo)) l.Errorw("[Purchase] Redis set error", logger.Field("error", err.Error()), logger.Field("order_no", orderInfo.OrderNo))
return err return err
} }

View File

@ -68,17 +68,22 @@ func NewActivateOrderLogic(svc *svc.ServiceContext) *ActivateOrderLogic {
func (l *ActivateOrderLogic) ProcessTask(ctx context.Context, task *asynq.Task) error { func (l *ActivateOrderLogic) ProcessTask(ctx context.Context, task *asynq.Task) error {
payload, err := l.parsePayload(ctx, task.Payload()) payload, err := l.parsePayload(ctx, task.Payload())
if err != nil { if err != nil {
return nil // Log and continue return err // Return error to trigger retry
} }
orderInfo, err := l.validateAndGetOrder(ctx, payload.OrderNo) orderInfo, err := l.validateAndGetOrder(ctx, payload.OrderNo)
if err != nil { if err != nil {
return nil // Log and continue return err // Return error to trigger retry
}
// Idempotency: if order is already finished, skip processing
if orderInfo == nil {
return nil
} }
if err = l.processOrderByType(ctx, orderInfo); err != nil { if err = l.processOrderByType(ctx, orderInfo); err != nil {
logger.WithContext(ctx).Error("[ActivateOrderLogic] Process task failed", logger.Field("error", err.Error())) logger.WithContext(ctx).Error("[ActivateOrderLogic] Process task failed", logger.Field("error", err.Error()))
return nil return err // Return error to trigger retry
} }
l.finalizeCouponAndOrder(ctx, orderInfo) l.finalizeCouponAndOrder(ctx, orderInfo)
@ -110,6 +115,14 @@ func (l *ActivateOrderLogic) validateAndGetOrder(ctx context.Context, orderNo st
return nil, err return nil, err
} }
// Idempotency check: if order is already finished, return success
if orderInfo.Status == OrderStatusFinished {
logger.WithContext(ctx).Info("Order already finished, skip processing",
logger.Field("order_no", orderInfo.OrderNo),
)
return nil, nil
}
if orderInfo.Status != OrderStatusPaid { if orderInfo.Status != OrderStatusPaid {
logger.WithContext(ctx).Error("Order status error", logger.WithContext(ctx).Error("Order status error",
logger.Field("order_no", orderInfo.OrderNo), logger.Field("order_no", orderInfo.OrderNo),