diff --git a/docs/docs/bridges/symfony-messenger.rst b/docs/docs/bridges/symfony-messenger.rst index 14903dc6..9d40ef4b 100644 --- a/docs/docs/bridges/symfony-messenger.rst +++ b/docs/docs/bridges/symfony-messenger.rst @@ -19,6 +19,7 @@ message will be dispatched and handled by the `LaunchJobMessageHandler `__ will be called with that message after being routed. + How to configure an async transport for the launcher? ------------------------------------------------------------ @@ -46,6 +47,31 @@ You will end with something like: | :doc:`Bridge with Symfony Framework ` +How to configure different transport for your jobs? +------------------------------------------------------------ + +On some projects, you will end with different messenger transports, +and will not want to run all jobs on the same transport. + +| Because we are using the same message class for all jobs, there is no way to configure this in Symfony. +| Instead, you will have to configure a job name to transport routing, in our side of the configuration. +| It is very likely to the messenger routing configuration, but you will use the job name instead of the message class. + +.. code-block:: yaml + + # config/packages/yokai_batch.yaml + yokai_batch: + launchers: + messenger: + routing: + export_job_name: async_with_low_priority + import_job_name: async_with_high_priority + +.. seealso:: + | :doc:`What is a job launcher? ` + | :doc:`Getting started with Symfony Framework ` + + Dispatch item with messenger writer ------------------------------------------------------------ diff --git a/src/batch-symfony-framework/src/DependencyInjection/Configuration.php b/src/batch-symfony-framework/src/DependencyInjection/Configuration.php index f5efb194..ce9eac78 100644 --- a/src/batch-symfony-framework/src/DependencyInjection/Configuration.php +++ b/src/batch-symfony-framework/src/DependencyInjection/Configuration.php @@ -33,6 +33,9 @@ * @phpstan-type LauncherConfig array{ * default: string|null, * launchers: array, + * messenger?: array{ + * routing: array, + * }, * } * @phpstan-type ParametersConfig array{ * global: array, @@ -130,8 +133,18 @@ private function launcher(): ArrayNodeDefinition ->defaultValue(['simple' => 'simple://simple']) ->useAttributeAsKey('name') ->scalarPrototype() - ->validate() - ->ifTrue($isInvalidDsn)->thenInvalid('Invalid job launcher DSN.') + ->validate() + ->ifTrue($isInvalidDsn)->thenInvalid('Invalid job launcher DSN.') + ->end() + ->end() + ->end() + ->arrayNode('messenger') + ->children() + ->arrayNode('routing') + ->normalizeKeys(false) + ->useAttributeAsKey('name') + ->scalarPrototype()->end() + ->end() ->end() ->end() ->end() @@ -164,15 +177,15 @@ private function parameters(): ArrayNodeDefinition ->children() ->arrayNode('global') ->useAttributeAsKey('name') - ->variablePrototype() - ->end() + ->variablePrototype()->end() ->end() ->arrayNode('per_job') ->useAttributeAsKey('name') ->variablePrototype() - ->validate() - ->ifTrue(fn(mixed $value) => !$isStringAssociativeArray($value)) - ->thenInvalid('Should be an array.') + ->validate() + ->ifTrue(fn(mixed $value) => !$isStringAssociativeArray($value)) + ->thenInvalid('Should be an array.') + ->end() ->end() ->end() ->end() diff --git a/src/batch-symfony-framework/src/DependencyInjection/JobLauncherDefinitionFactory.php b/src/batch-symfony-framework/src/DependencyInjection/JobLauncherDefinitionFactory.php index d7185592..5b95fd4d 100644 --- a/src/batch-symfony-framework/src/DependencyInjection/JobLauncherDefinitionFactory.php +++ b/src/batch-symfony-framework/src/DependencyInjection/JobLauncherDefinitionFactory.php @@ -11,6 +11,7 @@ use Yokai\Batch\Bridge\Symfony\Console\CommandRunner; use Yokai\Batch\Bridge\Symfony\Console\RunCommandJobLauncher; use Yokai\Batch\Bridge\Symfony\Messenger\DispatchMessageJobLauncher; +use Yokai\Batch\Bridge\Symfony\Messenger\MessengerJobsConfiguration; use Yokai\Batch\Launcher\JobLauncherInterface; use Yokai\Batch\Launcher\SimpleJobLauncher; use Yokai\Batch\Storage\JobExecutionStorageInterface; @@ -71,6 +72,9 @@ private static function messenger(): Definition '$jobExecutionFactory' => new Reference('yokai_batch.job_execution_factory'), '$jobExecutionStorage' => new Reference(JobExecutionStorageInterface::class), '$messageBus' => new Reference(MessageBusInterface::class), + '$messengerJobsConfiguration' => new Definition(MessengerJobsConfiguration::class, [ + '$routing' => '%yokai_batch.launcher.messenger_routing%', + ]), ]); } diff --git a/src/batch-symfony-framework/src/DependencyInjection/YokaiBatchExtension.php b/src/batch-symfony-framework/src/DependencyInjection/YokaiBatchExtension.php index ea0b3d03..a0ef0363 100644 --- a/src/batch-symfony-framework/src/DependencyInjection/YokaiBatchExtension.php +++ b/src/batch-symfony-framework/src/DependencyInjection/YokaiBatchExtension.php @@ -144,6 +144,8 @@ private function configureLauncher(ContainerBuilder $container, array $config): )); } + $container->setParameter('yokai_batch.launcher.messenger_routing', $config['messenger']['routing'] ?? []); + $launcherIdPerLauncherName = []; foreach ($config['launchers'] as $name => $dsn) { $definitionOrReference = JobLauncherDefinitionFactory::fromDsn($dsn); diff --git a/src/batch-symfony-messenger/src/DispatchMessageJobLauncher.php b/src/batch-symfony-messenger/src/DispatchMessageJobLauncher.php index 43969a37..9088b31a 100644 --- a/src/batch-symfony-messenger/src/DispatchMessageJobLauncher.php +++ b/src/batch-symfony-messenger/src/DispatchMessageJobLauncher.php @@ -4,8 +4,10 @@ namespace Yokai\Batch\Bridge\Symfony\Messenger; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\ExceptionInterface; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\TransportNamesStamp; use Yokai\Batch\BatchStatus; use Yokai\Batch\Factory\JobExecutionFactory; use Yokai\Batch\JobExecution; @@ -21,6 +23,7 @@ public function __construct( private JobExecutionFactory $jobExecutionFactory, private JobExecutionStorageInterface $jobExecutionStorage, private MessageBusInterface $messageBus, + private MessengerJobsConfiguration $messengerJobsConfiguration, ) { } @@ -33,9 +36,17 @@ public function launch(string $name, array $configuration = []): JobExecution $jobExecution->setStatus(BatchStatus::PENDING); $this->jobExecutionStorage->store($jobExecution); + $message = new LaunchJobMessage($name, $configuration); + + // if it was configured a specific transport name for this job, add a stamp to force it + $transportName = $this->messengerJobsConfiguration->getTransportNameForJobName($name); + if ($transportName !== null) { + $message = (new Envelope($message)) + ->with(new TransportNamesStamp($transportName)); + } + try { - // dispatch message - $this->messageBus->dispatch(new LaunchJobMessage($name, $configuration)); + $this->messageBus->dispatch($message); } catch (ExceptionInterface $exception) { // if a messenger exception occurs, it will be converted to job failure $jobExecution->setStatus(BatchStatus::FAILED); diff --git a/src/batch-symfony-messenger/src/MessengerJobsConfiguration.php b/src/batch-symfony-messenger/src/MessengerJobsConfiguration.php new file mode 100644 index 00000000..4f1ad0f2 --- /dev/null +++ b/src/batch-symfony-messenger/src/MessengerJobsConfiguration.php @@ -0,0 +1,26 @@ + */ + private array $routing, + ) { + } + + /** + * Get the configured transport name for a job name. + * Return null if none was provided. + */ + public function getTransportNameForJobName(string $jobName): string|null + { + return $this->routing[$jobName] ?? null; + } +} diff --git a/src/batch-symfony-messenger/tests/DispatchMessageJobLauncherTest.php b/src/batch-symfony-messenger/tests/DispatchMessageJobLauncherTest.php index 91981789..a5ffc5e3 100644 --- a/src/batch-symfony-messenger/tests/DispatchMessageJobLauncherTest.php +++ b/src/batch-symfony-messenger/tests/DispatchMessageJobLauncherTest.php @@ -5,10 +5,13 @@ namespace Yokai\Batch\Tests\Bridge\Symfony\Messenger; use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Stamp\TransportNamesStamp; use Yokai\Batch\BatchStatus; use Yokai\Batch\Bridge\Symfony\Messenger\DispatchMessageJobLauncher; use Yokai\Batch\Bridge\Symfony\Messenger\LaunchJobMessage; +use Yokai\Batch\Bridge\Symfony\Messenger\MessengerJobsConfiguration; use Yokai\Batch\Factory\JobExecutionFactory; use Yokai\Batch\Factory\JobExecutionParametersBuilder\NullJobExecutionParametersBuilder; use Yokai\Batch\Factory\UniqidJobExecutionIdGenerator; @@ -25,6 +28,7 @@ public function testLaunch(): void new JobExecutionFactory(new UniqidJobExecutionIdGenerator(), new NullJobExecutionParametersBuilder()), $storage = new InMemoryJobExecutionStorage(), $messageBus = new BufferingMessageBus(), + new MessengerJobsConfiguration([]), ); $jobExecutionFromLauncher = $jobLauncher->launch('testing', ['_id' => '123456789', 'foo' => ['bar']]); @@ -48,6 +52,7 @@ public function testLaunchWithNoId(): void ), $storage = new InMemoryJobExecutionStorage(), $messageBus = new BufferingMessageBus(), + new MessengerJobsConfiguration([]), ); $jobExecutionFromLauncher = $jobLauncher->launch('testing'); @@ -67,6 +72,7 @@ public function testLaunchAndMessengerFail(): void new JobExecutionFactory(new UniqidJobExecutionIdGenerator(), new NullJobExecutionParametersBuilder()), $storage = new InMemoryJobExecutionStorage(), new FailingMessageBus(new TransportException('This is a test')), + new MessengerJobsConfiguration([]), ); $jobExecutionFromLauncher = $jobLauncher->launch('testing'); @@ -82,6 +88,33 @@ public function testLaunchAndMessengerFail(): void self::assertSame('This is a test', $failure->getMessage()); } + public function testLaunchWithRouting(): void + { + $jobLauncher = new DispatchMessageJobLauncher( + new JobExecutionFactory(new UniqidJobExecutionIdGenerator(), new NullJobExecutionParametersBuilder()), + $storage = new InMemoryJobExecutionStorage(), + $messageBus = new BufferingMessageBus(), + new MessengerJobsConfiguration(['testing' => 'custom_transport', 'unused' => 'unused_transport']), + ); + + $jobExecutionFromLauncher = $jobLauncher->launch('testing', ['_id' => '123456789', 'foo' => ['bar']]); + + [$jobExecutionFromStorage] = $storage->getExecutions(); + self::assertSame($jobExecutionFromLauncher, $jobExecutionFromStorage); + + $messages = $messageBus->getMessages(); + self::assertCount(1, $messages); + $message = $messages[0]; + self::assertInstanceOf(Envelope::class, $message); + $stamp = $message->all(TransportNamesStamp::class)[0]; + self::assertInstanceOf(TransportNamesStamp::class, $stamp); + self::assertSame(['custom_transport'], $stamp->getTransportNames()); + $message = $message->getMessage(); + self::assertInstanceOf(LaunchJobMessage::class, $message); + self::assertSame('testing', $message->getJobName()); + self::assertSame(['_id' => '123456789', 'foo' => ['bar']], $message->getConfiguration()); + } + private static function assertJobWasTriggered(BufferingMessageBus $bus, string $jobName, array $config): void { $messages = $bus->getMessages();