Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/docs/bridges/symfony-messenger.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ message will be dispatched and handled by the
`LaunchJobMessageHandler <https://github.com/yokai-php/batch-symfony-messenger/blob/0.x/src/LaunchJobMessageHandler.php>`__
will be called with that message after being routed.


How to configure an async transport for the launcher?
------------------------------------------------------------

Expand Down Expand Up @@ -46,6 +47,31 @@ You will end with something like:
| :doc:`Bridge with Symfony Framework </bridges/symfony-framework>`


How to configure different transport for your jobs?
------------------------------------------------------------

On some projects, you will end with different messenger transports,
and will not want to run all jobs on the same transport.

| Because we are using the same message class for all jobs, there is no way to configure this in Symfony.
| Instead, you will have to configure a job name to transport routing, in our side of the configuration.
| It is very likely to the messenger routing configuration, but you will use the job name instead of the message class.

.. code-block:: yaml

# config/packages/yokai_batch.yaml
yokai_batch:
launchers:
messenger:
routing:
export_job_name: async_with_low_priority
import_job_name: async_with_high_priority

.. seealso::
| :doc:`What is a job launcher? </core-concepts/job-launcher>`
| :doc:`Getting started with Symfony Framework </getting-started/with-symfony>`


Dispatch item with messenger writer
------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
* @phpstan-type LauncherConfig array{
* default: string|null,
* launchers: array<string, string>,
* messenger?: array{
* routing: array<string, string>,
* },
* }
* @phpstan-type ParametersConfig array{
* global: array<string, mixed>,
Expand Down Expand Up @@ -130,8 +133,18 @@ private function launcher(): ArrayNodeDefinition
->defaultValue(['simple' => 'simple://simple'])
->useAttributeAsKey('name')
->scalarPrototype()
->validate()
->ifTrue($isInvalidDsn)->thenInvalid('Invalid job launcher DSN.')
->validate()
->ifTrue($isInvalidDsn)->thenInvalid('Invalid job launcher DSN.')
->end()
->end()
->end()
->arrayNode('messenger')
->children()
->arrayNode('routing')
->normalizeKeys(false)
->useAttributeAsKey('name')
->scalarPrototype()->end()
->end()
->end()
->end()
->end()
Expand Down Expand Up @@ -164,15 +177,15 @@ private function parameters(): ArrayNodeDefinition
->children()
->arrayNode('global')
->useAttributeAsKey('name')
->variablePrototype()
->end()
->variablePrototype()->end()
->end()
->arrayNode('per_job')
->useAttributeAsKey('name')
->variablePrototype()
->validate()
->ifTrue(fn(mixed $value) => !$isStringAssociativeArray($value))
->thenInvalid('Should be an array<string, mixed>.')
->validate()
->ifTrue(fn(mixed $value) => !$isStringAssociativeArray($value))
->thenInvalid('Should be an array<string, mixed>.')
->end()
->end()
->end()
->end()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Yokai\Batch\Bridge\Symfony\Console\CommandRunner;
use Yokai\Batch\Bridge\Symfony\Console\RunCommandJobLauncher;
use Yokai\Batch\Bridge\Symfony\Messenger\DispatchMessageJobLauncher;
use Yokai\Batch\Bridge\Symfony\Messenger\MessengerJobsConfiguration;
use Yokai\Batch\Launcher\JobLauncherInterface;
use Yokai\Batch\Launcher\SimpleJobLauncher;
use Yokai\Batch\Storage\JobExecutionStorageInterface;
Expand Down Expand Up @@ -71,6 +72,9 @@ private static function messenger(): Definition
'$jobExecutionFactory' => new Reference('yokai_batch.job_execution_factory'),
'$jobExecutionStorage' => new Reference(JobExecutionStorageInterface::class),
'$messageBus' => new Reference(MessageBusInterface::class),
'$messengerJobsConfiguration' => new Definition(MessengerJobsConfiguration::class, [
'$routing' => '%yokai_batch.launcher.messenger_routing%',
]),
]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ private function configureLauncher(ContainerBuilder $container, array $config):
));
}

$container->setParameter('yokai_batch.launcher.messenger_routing', $config['messenger']['routing'] ?? []);

$launcherIdPerLauncherName = [];
foreach ($config['launchers'] as $name => $dsn) {
$definitionOrReference = JobLauncherDefinitionFactory::fromDsn($dsn);
Expand Down
15 changes: 13 additions & 2 deletions src/batch-symfony-messenger/src/DispatchMessageJobLauncher.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

namespace Yokai\Batch\Bridge\Symfony\Messenger;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\ExceptionInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;
use Yokai\Batch\BatchStatus;
use Yokai\Batch\Factory\JobExecutionFactory;
use Yokai\Batch\JobExecution;
Expand All @@ -21,6 +23,7 @@ public function __construct(
private JobExecutionFactory $jobExecutionFactory,
private JobExecutionStorageInterface $jobExecutionStorage,
private MessageBusInterface $messageBus,
private MessengerJobsConfiguration $messengerJobsConfiguration,
) {
}

Expand All @@ -33,9 +36,17 @@ public function launch(string $name, array $configuration = []): JobExecution
$jobExecution->setStatus(BatchStatus::PENDING);
$this->jobExecutionStorage->store($jobExecution);

$message = new LaunchJobMessage($name, $configuration);

// if it was configured a specific transport name for this job, add a stamp to force it
$transportName = $this->messengerJobsConfiguration->getTransportNameForJobName($name);
if ($transportName !== null) {
$message = (new Envelope($message))
->with(new TransportNamesStamp($transportName));
}

try {
// dispatch message
$this->messageBus->dispatch(new LaunchJobMessage($name, $configuration));
$this->messageBus->dispatch($message);
} catch (ExceptionInterface $exception) {
// if a messenger exception occurs, it will be converted to job failure
$jobExecution->setStatus(BatchStatus::FAILED);
Expand Down
26 changes: 26 additions & 0 deletions src/batch-symfony-messenger/src/MessengerJobsConfiguration.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Yokai\Batch\Bridge\Symfony\Messenger;

/**
* Holds the Symfony messenger configuration.
*/
final class MessengerJobsConfiguration
{
public function __construct(
/** @var array<string, string> */
private array $routing,
) {
}

/**
* Get the configured transport name for a job name.
* Return null if none was provided.
*/
public function getTransportNameForJobName(string $jobName): string|null
{
return $this->routing[$jobName] ?? null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
namespace Yokai\Batch\Tests\Bridge\Symfony\Messenger;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;
use Yokai\Batch\BatchStatus;
use Yokai\Batch\Bridge\Symfony\Messenger\DispatchMessageJobLauncher;
use Yokai\Batch\Bridge\Symfony\Messenger\LaunchJobMessage;
use Yokai\Batch\Bridge\Symfony\Messenger\MessengerJobsConfiguration;
use Yokai\Batch\Factory\JobExecutionFactory;
use Yokai\Batch\Factory\JobExecutionParametersBuilder\NullJobExecutionParametersBuilder;
use Yokai\Batch\Factory\UniqidJobExecutionIdGenerator;
Expand All @@ -25,6 +28,7 @@ public function testLaunch(): void
new JobExecutionFactory(new UniqidJobExecutionIdGenerator(), new NullJobExecutionParametersBuilder()),
$storage = new InMemoryJobExecutionStorage(),
$messageBus = new BufferingMessageBus(),
new MessengerJobsConfiguration([]),
);

$jobExecutionFromLauncher = $jobLauncher->launch('testing', ['_id' => '123456789', 'foo' => ['bar']]);
Expand All @@ -48,6 +52,7 @@ public function testLaunchWithNoId(): void
),
$storage = new InMemoryJobExecutionStorage(),
$messageBus = new BufferingMessageBus(),
new MessengerJobsConfiguration([]),
);

$jobExecutionFromLauncher = $jobLauncher->launch('testing');
Expand All @@ -67,6 +72,7 @@ public function testLaunchAndMessengerFail(): void
new JobExecutionFactory(new UniqidJobExecutionIdGenerator(), new NullJobExecutionParametersBuilder()),
$storage = new InMemoryJobExecutionStorage(),
new FailingMessageBus(new TransportException('This is a test')),
new MessengerJobsConfiguration([]),
);

$jobExecutionFromLauncher = $jobLauncher->launch('testing');
Expand All @@ -82,6 +88,33 @@ public function testLaunchAndMessengerFail(): void
self::assertSame('This is a test', $failure->getMessage());
}

public function testLaunchWithRouting(): void
{
$jobLauncher = new DispatchMessageJobLauncher(
new JobExecutionFactory(new UniqidJobExecutionIdGenerator(), new NullJobExecutionParametersBuilder()),
$storage = new InMemoryJobExecutionStorage(),
$messageBus = new BufferingMessageBus(),
new MessengerJobsConfiguration(['testing' => 'custom_transport', 'unused' => 'unused_transport']),
);

$jobExecutionFromLauncher = $jobLauncher->launch('testing', ['_id' => '123456789', 'foo' => ['bar']]);

[$jobExecutionFromStorage] = $storage->getExecutions();
self::assertSame($jobExecutionFromLauncher, $jobExecutionFromStorage);

$messages = $messageBus->getMessages();
self::assertCount(1, $messages);
$message = $messages[0];
self::assertInstanceOf(Envelope::class, $message);
$stamp = $message->all(TransportNamesStamp::class)[0];
self::assertInstanceOf(TransportNamesStamp::class, $stamp);
self::assertSame(['custom_transport'], $stamp->getTransportNames());
$message = $message->getMessage();
self::assertInstanceOf(LaunchJobMessage::class, $message);
self::assertSame('testing', $message->getJobName());
self::assertSame(['_id' => '123456789', 'foo' => ['bar']], $message->getConfiguration());
}

private static function assertJobWasTriggered(BufferingMessageBus $bus, string $jobName, array $config): void
{
$messages = $bus->getMessages();
Expand Down
Loading