vendor/symfony/messenger/EventListener/SendFailedMessageForRetryListener.php line 51

Open in your IDE?
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\EventListener;
  11. use Psr\Container\ContainerInterface;
  12. use Psr\Log\LoggerInterface;
  13. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  14. use Symfony\Component\Messenger\Envelope;
  15. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  16. use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
  17. use Symfony\Component\Messenger\Exception\HandlerFailedException;
  18. use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
  19. use Symfony\Component\Messenger\Exception\RuntimeException;
  20. use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
  21. use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
  22. use Symfony\Component\Messenger\Stamp\DelayStamp;
  23. use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
  24. use Symfony\Component\Messenger\Stamp\StampInterface;
  25. use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
  26. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  27. /**
  28. * @author Tobias Schultze <http://tobion.de>
  29. */
  30. class SendFailedMessageForRetryListener implements EventSubscriberInterface
  31. {
  32. private $sendersLocator;
  33. private $retryStrategyLocator;
  34. private $logger;
  35. private $eventDispatcher;
  36. private $historySize;
  37. public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, ?LoggerInterface $logger = null, ?EventDispatcherInterface $eventDispatcher = null, int $historySize = 10)
  38. {
  39. $this->sendersLocator = $sendersLocator;
  40. $this->retryStrategyLocator = $retryStrategyLocator;
  41. $this->logger = $logger;
  42. $this->eventDispatcher = $eventDispatcher;
  43. $this->historySize = $historySize;
  44. }
  45. public function onMessageFailed(WorkerMessageFailedEvent $event)
  46. {
  47. $retryStrategy = $this->getRetryStrategyForTransport($event->getReceiverName());
  48. $envelope = $event->getEnvelope();
  49. $throwable = $event->getThrowable();
  50. $message = $envelope->getMessage();
  51. $context = [
  52. 'class' => \get_class($message),
  53. ];
  54. $shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy);
  55. $retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
  56. if ($shouldRetry) {
  57. $event->setForRetry();
  58. ++$retryCount;
  59. $delay = $retryStrategy->getWaitingTime($envelope, $throwable);
  60. if (null !== $this->logger) {
  61. $this->logger->warning('Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
  62. }
  63. // add the delay and retry stamp info
  64. $retryEnvelope = $this->withLimitedHistory($envelope, new DelayStamp($delay), new RedeliveryStamp($retryCount));
  65. // re-send the message for retry
  66. $this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
  67. if (null !== $this->eventDispatcher) {
  68. $this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName()));
  69. }
  70. } else {
  71. if (null !== $this->logger) {
  72. $this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
  73. }
  74. }
  75. }
  76. /**
  77. * Adds stamps to the envelope by keeping only the First + Last N stamps.
  78. */
  79. private function withLimitedHistory(Envelope $envelope, StampInterface ...$stamps): Envelope
  80. {
  81. foreach ($stamps as $stamp) {
  82. $history = $envelope->all(\get_class($stamp));
  83. if (\count($history) < $this->historySize) {
  84. $envelope = $envelope->with($stamp);
  85. continue;
  86. }
  87. $history = array_merge(
  88. [$history[0]],
  89. \array_slice($history, -$this->historySize + 2),
  90. [$stamp]
  91. );
  92. $envelope = $envelope->withoutAll(\get_class($stamp))->with(...$history);
  93. }
  94. return $envelope;
  95. }
  96. public static function getSubscribedEvents()
  97. {
  98. return [
  99. // must have higher priority than SendFailedMessageToFailureTransportListener
  100. WorkerMessageFailedEvent::class => ['onMessageFailed', 100],
  101. ];
  102. }
  103. private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
  104. {
  105. if ($e instanceof RecoverableExceptionInterface) {
  106. return true;
  107. }
  108. // if one or more nested Exceptions is an instance of RecoverableExceptionInterface we should retry
  109. // if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
  110. if ($e instanceof HandlerFailedException) {
  111. $shouldNotRetry = true;
  112. foreach ($e->getNestedExceptions() as $nestedException) {
  113. if ($nestedException instanceof RecoverableExceptionInterface) {
  114. return true;
  115. }
  116. if (!$nestedException instanceof UnrecoverableExceptionInterface) {
  117. $shouldNotRetry = false;
  118. break;
  119. }
  120. }
  121. if ($shouldNotRetry) {
  122. return false;
  123. }
  124. }
  125. if ($e instanceof UnrecoverableExceptionInterface) {
  126. return false;
  127. }
  128. return $retryStrategy->isRetryable($envelope, $e);
  129. }
  130. private function getRetryStrategyForTransport(string $alias): ?RetryStrategyInterface
  131. {
  132. if ($this->retryStrategyLocator->has($alias)) {
  133. return $this->retryStrategyLocator->get($alias);
  134. }
  135. return null;
  136. }
  137. private function getSenderForTransport(string $alias): SenderInterface
  138. {
  139. if ($this->sendersLocator->has($alias)) {
  140. return $this->sendersLocator->get($alias);
  141. }
  142. throw new RuntimeException(sprintf('Could not find sender "%s" based on the same receiver to send the failed message to for retry.', $alias));
  143. }
  144. }