This PR was squashed before being merged into the 4.3-dev branch (closes#30757).
Discussion
----------
[Messenger] Adding MessageCountAwareInterface to get transport message count
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | none
| License | MIT
| Doc PR | symfony/symfony-docs#11236
This adds a new optional interface that receivers should implement to give an approximate number of the messages "waiting" to be handled. Why? Because, with this, you could design a system that dynamically adds/removes worker processes if a specific transport is getting slammed and needs help. Creating that system could be something we discuss for core later, but this at least makes it possible - and means it could be implemented by the user or in a bundle... which I might do if we don't get it in core ;).
Commits
-------
fc5b0cf570 [Messenger] Adding MessageCountAwareInterface to get transport message count
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger] bug fixes in Doctrine Transport
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | yes
| New feature? | no
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets |
| License | MIT
| Doc PR |
Just tested the new Doctrine transport and I've see 3 bugs so far :
- [x] The message is not return by the transport
- [x] The headers column must be of type TEXT and not just STRING
- [ ] When using the PhpSerializer the message is truncated (PR: https://github.com/symfony/symfony/pull/30814)
The body in database looks like this :
```
O:36:"Symfony\Component\Messenger\Envelope":2:{s:44:"
```
The body given by the serializer is the following :
```
O:36:"Symfony\Component\Messenger\Envelope":2:{s:44:"Symfony\Component\Messenger\Envelopestamps";a:3:{s:49:"Symfony\Component\Messenger\Stamp\SerializerStamp";a:1:{i:0;O:49:"Symfony\Component\Messenger\Stamp\SerializerStamp":1:{s:58:"Symfony\Component\Messenger\Stamp\SerializerStampcontext";a:0:{}}}s:46:"Symfony\Component\Messenger\Stamp\BusNameStamp";a:1:{i:0;O:46:"Symfony\Component\Messenger\Stamp\BusNameStamp":1:{s:55:"Symfony\Component\Messenger\Stamp\BusNameStampbusName";s:21:"messenger.bus.default";}}s:43:"Symfony\Component\Messenger\Stamp\SentStamp";a:1:{i:0;O:43:"Symfony\Component\Messenger\Stamp\SentStamp":2:{s:56:"Symfony\Component\Messenger\Stamp\SentStampsenderClass";s:64:"Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransport";s:56:"Symfony\Component\Messenger\Stamp\SentStampsenderAlias";s:16:"environment.stop";}}}s:45:"Symfony\Component\Messenger\Envelopemessage";O:34:"App\Message\EnvironmentStopMessage":1:{s:51:"App\Message\AbstractEnvironmentMessageenvironment";O:22:"App\Entity\Environment":5:{s:26:"App\Entity\Environmentid";s:36:"3bade252-b7a9-4188-82bd-3e68129e0da7";s:37:"App\Entity\EnvironmentrepositoryUrl";s:6:"string";s:30:"App\Entity\Environmentbranch";s:6:"string";s:33:"App\Entity\EnvironmenthostNames";a:1:{i:0;N;}s:27:"App\Entity\Environmentenv";a:2:{s:7:"APP_ENV";s:4:"prod";s:7:"APP_VAR";s:13:"example value";}}}}
```
Commits
-------
27466498d0 [Messenger] Fix get in Doctrine Transport
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger] Remove unused option in the Doctrine transport
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | yes
| New feature? | no
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets |
| License | MIT
| Doc PR |
This PR remove the unused option `loop_sleep` in the Messenger Doctrine transport
Commits
-------
4811400372 [Messenger] Remove unused option in the Doctrine transport
This PR was squashed before being merged into the 4.3-dev branch (closes#30754).
Discussion
----------
[Messenger] New messenger:stop-workers Command
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | Kinda of #29451
| License | MIT
| Doc PR | symfony/symfony-docs#11236
o/ me again.
This requires and is built on top of #30708
When you deploy, all workers need to be stopped and restarted. That's not currently possible, unless you manually track the pids and send a SIGTERM signal. We can make that much easier :).
Now run:
```
bin/console messenger:stop-workers
```
And it will signal to all workers (even if they're distributed on other servers) that they should stop, once they finish processing their current message. This is done via a key in `cache.app`.
Cheers!
Commits
-------
58971627f5 [Messenger] New messenger:stop-workers Command
This PR was merged into the 4.3-dev branch.
Discussion
----------
Changing to MessageDecodingFailedException so that invalid messages are rejected
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | yes
| New feature? | no
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes <!-- please add some, will be required by reviewers -->
| Fixed tickets | #30649
| License | MIT
| Doc PR | not needed for bug fix
Bug fix if a message body is completely blank. I'm fixing this on master only, because in 4.2 and earlier, there is actually no system in place to fail serialization and cause the messages to be rejected. In 4.3, we just need to throw this exception.
Cheers!
Commits
-------
4be827d3ca Changing to MessageDecodingFailedException so that invalid messages are rejected
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger] Updating SyncTransport for recent changes + tests
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | yes
| New feature? | no
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | none
| License | MIT
| Doc PR | not needed
When making the `SyncTransport`, I neglected having at least one test in each of these classes, which allowed the test suite to pass, even after some interface changes made these classes fail. Fixed all of that :)
Commits
-------
2df023be46 Updating SyncTransport for recent changes + tests
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger][DX] Allow stamps to be passed directly to MessageBusInterface::dispatch()
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | yes
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | none
| License | MIT
| Doc PR | TODO
Me again o/!
This proposal is *purely* for DX. With `DelayStamp`, the proposal of QueueNameStamp and future things like `AmqpRoutingKeyStamp`, stamps are becoming more common for end users to use. This changes how it looks to use them:
```php
// before
$bus->dispatch(new Envelope(new SendSmsNotification('Hi!'), new DelayStamp(10), new QueueNameStamp('low')));
// after
$bus->dispatch(new SendSmsNotification('Hi!'), [new DelayStamp(10), new QueueNameStamp('low')]);
```
It's definitely a BC break, which is allowed because the component is experimental, though it should be minimized. This BC break shouldn't be felt by most end users, as creating your own bus is an advanced use-case. Even if you decorated it, you'll get an obvious error.
Commits
-------
e861de7e61 Allow stamps to be passed directly to MessageBusInterface::dispatch()
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger] Add a Doctrine transport
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets |
| License | MIT
| Doc PR | symfony/symfony-docs#10616
| DoctrineBundle PR | doctrine/DoctrineBundle#868
As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component.
Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database).
# How it works
The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868
## Configuration
To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`)
```yml
# config/packages/messenger.yaml
framework:
messenger:
transports:
my_transport: "doctrine://default?queue=important"
```
## Table schema
Dispatched messages are stored into a database table with the following schema:
| Column | Type | Options | Description |
|--------------|----------|--------------------------|-------------------------------------------------------------------|
| id | bigint | AUTO_INCREMENT, NOT NULL | Primary key |
| body | text | NOT NULL | Body of the message |
| headers | text | NOT NULL | Headers of the message |
| queue | varchar(32) | NOT NULL | Headers of the message |
| created_at | datetime | NOT NULL | When the message was inserted onto the table. (automatically set) |
| available_at | datetime | NOT NULL | When the message is available to be handled |
| delivered_at | datetime | NULL | When the message was delivered to a worker |
## Message dispatching
When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish`
## Message consuming
The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle.
### Getting the next message
* Start a transaction
* Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query)
* Update the message in database to update the delivered_at columns
* Commit the transaction
### Handling the message
The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table.
If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`.
## Message requeueing
It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds)
# TODO
- [x] Add tests
- [x] Create DOC PR
- [x] PR on doctrine-bundle for transport factory
- [x] Add a `available_at` column
- [x] Add a `queue` column
- [x] Implement the retry functionnality : See #30557
- [x] Rebase after #29476
Commits
-------
88d008c828 [Messenger] Add a Doctrine transport
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger] Adding the "sync" transport to call handlers synchronously
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | none
| License | MIT
| Doc PR | symfony/symfony-docs#11236
This adds a `sync://` transport that just calls the handlers immediately. Why? This allows you to route your messages to some "async" transport. But then, when developing locally or running your tests, you can choose to run them synchronously instead:
```yml
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: '%env(MESSENGER_TRANSPORT_DSN)%'
routing:
'App\Message\SmsNotification': async
'App\Message\OtherMessage': async
```
```
# .env
# by default, handle this sync
MESSENGER_TRANSPORT_DSN=sync://
```
```
# .env.local on production (or set this via real env vars)
# on production, use amqp
MESSENGER_TRANSPORT_DSN=amqp://.......
```
Cheers!
Commits
-------
3da5a438aa Adding the "sync" transport to call handlers synchronously
This PR was merged into the 4.3-dev branch.
Discussion
----------
Add optional parameter `prefetching` for AMQP connection
Add prefetching connection parameter to setup channel prefetch count.
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets |
| License | MIT
| Doc PR |
When setting up AMQP transport connection, it can be interesting to configure prefetching on a channel, which is not currently possible.
Commits
-------
47777eedd6 Add optional parameter `prefetching` in connection configuration, to setup channel prefetch count
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger] Add a command to setup transports
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| License | MIT
This PR add a `SetupTransportsCommand` that allow to setup the transports.
Actually the `AMQPTransport` is setup only if debug is enabled. With this PR the new `messenger:setup-transports` will setup all declared transports.
Commits
-------
fbb534a838 [Messenger] Add a command to setup transports
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Contracts][EventDispatcher] add EventDispatcherInterface to symfony/contracts and use it where possible
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | -
| License | MIT
| Doc PR | -
This PR adds a new `EventDispatcherInterface` in `Contracts`. This interface contains only one method: `dispatch($event, $eventName)`. That covers almost all use cases of the event dispatcher in components (some use add/removeListeners/Subscribers but they are a minority.)
While doing so, it allows dispatching any objects as events - not only instances of `Event`.
This allows decoupling e.g. `Messenger` from the `EventDispatcher` component.
Next steps could be about planning to remove the base `Event` class from some concrete event classes and/or moving `EventSubscriberInterface` to `symfony/contracts`. It would allow decoupling e.g. `Workflow` from the component - but that's too far away for now, let's think about it in 5.1.
Commits
-------
3c3db2f14a [Contracts][EventDispatcher] add EventDispatcherInterface to symfony/contracts and use it where possible
This purpose of this event is to be a hook when a message is sent to a transport.
If that message is redelivered later, that's not the purpose of this hook (there
are Worker events for that) and could cause problems if the user unknowingly
tries to modify the Envelope in some way, not thinking about how this might
be a redelivery message.
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger] Add missing information in messenger logs
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | yes-ish
| New feature? | yes-ish
| BC breaks? | no
| Deprecations? | no <!-- don't forget to update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass? | yes <!-- please add some, will be required by reviewers -->
| Fixed tickets | n/a
| License | MIT
| Doc PR | n/a
When using `messenger:consume`, I get the following logs:
```
2019-03-25T11:39:05+01:00 [info] Received message "Symfony\Component\Mailer\EnvelopedMessage"
2019-03-25T11:39:05+01:00 [warning] An exception occurred while handling message "Symfony\Component\Mailer\EnvelopedMessage"
2019-03-25T11:39:05+01:00 [info] Retrying Symfony\Component\Mailer\EnvelopedMessage - retry #1.
2019-03-25T11:39:05+01:00 [info] Sending message "Symfony\Component\Mailer\EnvelopedMessage" with "Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport"
2019-03-25T11:39:06+01:00 [info] Received message "Symfony\Component\Mailer\EnvelopedMessage"
2019-03-25T11:39:06+01:00 [warning] An exception occurred while handling message "Symfony\Component\Mailer\EnvelopedMessage"
2019-03-25T11:39:06+01:00 [info] Retrying Symfony\Component\Mailer\EnvelopedMessage - retry #2.
2019-03-25T11:39:06+01:00 [info] Sending message "Symfony\Component\Mailer\EnvelopedMessage" with "Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport"
2019-03-25T11:39:09+01:00 [info] Received message "Symfony\Component\Mailer\EnvelopedMessage"
2019-03-25T11:39:09+01:00 [warning] An exception occurred while handling message "Symfony\Component\Mailer\EnvelopedMessage"
2019-03-25T11:39:09+01:00 [info] Retrying Symfony\Component\Mailer\EnvelopedMessage - retry #3.
2019-03-25T11:39:09+01:00 [info] Sending message "Symfony\Component\Mailer\EnvelopedMessage" with "Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport"
```
So, an. error occurred, but I have no idea what's going on. The exception is in the context, but the context is not displayed by default. So, this PR fixes it.
Commits
-------
20664caf25 [Messenger] added missing information in messenger logs
This PR was merged into the 4.3-dev branch.
Discussion
----------
Fixing a bug where messenger:consume could send message to wrong bus
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | yes
| New feature? | arguably, yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | #30631
| License | MIT
| Doc PR | Not needed
This fixes#30631, where you can run `messener:consume` and accidentally sent received messages into the wrong bus.
The fix (done via middleware) is to attach a "bus name" to the `Envelope` and use it when the message is received to find that bus.
Commits
-------
ef077cf26c Fixing a bug where a transport could receive a message and dispatch it to a different bus