Commit Graph

46 Commits

Author SHA1 Message Date
Fabien Potencier
343d28e3d1 feature #30754 [Messenger] New messenger:stop-workers Command (weaverryan)
This PR was squashed before being merged into the 4.3-dev branch (closes #30754).

Discussion
----------

[Messenger] New messenger:stop-workers Command

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | Kinda of #29451
| License       | MIT
| Doc PR        | symfony/symfony-docs#11236

o/ me again.

This requires and is built on top of #30708

When you deploy, all workers need to be stopped and restarted. That's not currently possible, unless you manually track the pids and send a SIGTERM signal. We can make that much easier :).

Now run:

```
bin/console messenger:stop-workers
```

And it will signal to all workers (even if they're distributed on other servers) that they should stop, once they finish processing their current message. This is done via a key in `cache.app`.

Cheers!

Commits
-------

58971627f5 [Messenger] New messenger:stop-workers Command
2019-03-31 18:41:56 +02:00
Ryan Weaver
58971627f5 [Messenger] New messenger:stop-workers Command 2019-03-31 18:41:31 +02:00
Fabien Potencier
3abca64c5d feature #30707 [Messenger][DX] Allow stamps to be passed directly to MessageBusInterface::dispatch() (weaverryan)
This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger][DX] Allow stamps to be passed directly to MessageBusInterface::dispatch()

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | yes
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | none
| License       | MIT
| Doc PR        | TODO

Me again o/!

This proposal is *purely* for DX. With `DelayStamp`, the proposal of QueueNameStamp and future things like `AmqpRoutingKeyStamp`, stamps are becoming more common for end users to use. This changes how it looks to use them:

```php
// before
$bus->dispatch(new Envelope(new SendSmsNotification('Hi!'), new DelayStamp(10), new QueueNameStamp('low')));

// after
$bus->dispatch(new SendSmsNotification('Hi!'), [new DelayStamp(10), new QueueNameStamp('low')]);
```

It's definitely a BC break, which is allowed because the component is experimental, though it should be minimized. This BC break shouldn't be felt by most end users, as creating your own bus is an advanced use-case. Even if you decorated it, you'll get an obvious error.

Commits
-------

e861de7e61 Allow stamps to be passed directly to MessageBusInterface::dispatch()
2019-03-31 18:29:02 +02:00
Samuel ROZE
e897e6cc4e Add missing word in CHANGELOG 2019-03-31 17:26:00 +01:00
Samuel ROZE
d9e2732a7d feature #29007 [Messenger] Add a Doctrine transport (vincenttouzet)
This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Add a Doctrine transport

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        | symfony/symfony-docs#10616
| DoctrineBundle PR | doctrine/DoctrineBundle#868

As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component.

Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database).

# How it works

The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868

## Configuration

To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`)
```yml
        # config/packages/messenger.yaml
        framework:
            messenger:
                transports:
                    my_transport: "doctrine://default?queue=important"
```

## Table schema

Dispatched messages are stored into a database table with the following schema:

| Column       | Type     | Options                  | Description                                                       |
|--------------|----------|--------------------------|-------------------------------------------------------------------|
| id           | bigint   | AUTO_INCREMENT, NOT NULL | Primary key                                                       |
| body         | text     | NOT NULL                 | Body of the message                                               |
| headers      | text     | NOT NULL                 | Headers of the message                                            |
| queue      | varchar(32)     | NOT NULL                 | Headers of the message                                            |
| created_at   | datetime | NOT NULL                 | When the message was inserted onto the table. (automatically set) |
| available_at       | datetime   | NOT NULL                 | When the message is available to be handled                      |
| delivered_at | datetime | NULL                     | When the message was delivered to a worker                        |

## Message dispatching

When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish`

## Message consuming

The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle.

### Getting the next message

* Start a transaction
* Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query)
* Update the message in database to update the delivered_at columns
* Commit the transaction

### Handling the message

The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table.

If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`.

## Message requeueing

It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a  `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds)

# TODO

- [x] Add tests
- [x] Create DOC PR
- [x] PR on doctrine-bundle for transport factory
- [x] Add a `available_at` column
- [x] Add a `queue` column
- [x] Implement the retry functionnality : See #30557
- [x] Rebase after #29476

Commits
-------

88d008c828 [Messenger] Add a Doctrine transport
2019-03-31 17:25:18 +01:00
Ryan Weaver
e861de7e61 Allow stamps to be passed directly to MessageBusInterface::dispatch()
And changing signature of Envelope::__construct() to accept an array of envelopes
2019-03-31 12:05:21 -04:00
Ryan Weaver
ef6f23e8b9 Making the serializer configurable by transport 2019-03-31 16:54:31 +02:00
Vincent Touzet
88d008c828 [Messenger] Add a Doctrine transport 2019-03-31 16:05:11 +02:00
Fabien Potencier
7cb14f3186 fixed typo 2019-03-30 08:22:54 +01:00
Fabien Potencier
6ffad7bba4 feature #30759 [Messenger] Adding the "sync" transport to call handlers synchronously (weaverryan)
This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Adding the "sync" transport to call handlers synchronously

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | none
| License       | MIT
| Doc PR        | symfony/symfony-docs#11236

This adds a `sync://` transport that just calls the handlers immediately. Why? This allows you to route your messages to some "async" transport. But then, when developing locally or running your tests, you can choose to run them synchronously instead:

```yml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
            'App\Message\SmsNotification': async
            'App\Message\OtherMessage': async
```

```
# .env
# by default, handle this sync
MESSENGER_TRANSPORT_DSN=sync://
```

```
# .env.local on production (or set this via real env vars)
# on production, use amqp
MESSENGER_TRANSPORT_DSN=amqp://.......
```

Cheers!

Commits
-------

3da5a438aa Adding the "sync" transport to call handlers synchronously
2019-03-30 08:22:14 +01:00
Ryan Weaver
e800bd5bde [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports 2019-03-30 08:01:26 +01:00
Ryan Weaver
3da5a438aa Adding the "sync" transport to call handlers synchronously 2019-03-28 14:59:51 -04:00
Samuel ROZE
c30f462c2e feature #30671 Add optional parameter prefetching for AMQP connection (fbouchery)
This PR was merged into the 4.3-dev branch.

Discussion
----------

Add optional parameter `prefetching` for AMQP connection

Add prefetching connection parameter to setup channel prefetch count.

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        |

When setting up AMQP transport connection, it can be interesting to configure prefetching on a channel, which is not currently possible.

Commits
-------

47777eedd6 Add optional parameter `prefetching` in connection configuration, to setup channel prefetch count
2019-03-28 18:47:49 +07:00
fbouchery
47777eedd6 Add optional parameter prefetching in connection configuration, to setup channel prefetch count
Co-Authored-By: f2r <frederic.bouchery+github@gmail.com>
2019-03-28 18:20:31 +07:00
Vincent Touzet
fbb534a838 [Messenger] Add a command to setup transports 2019-03-26 21:08:11 +01:00
David Maicher
6263e48392 [Messenger] rename auto-setup amqp option into auto_setup 2019-03-26 11:03:16 +01:00
Samuel ROZE
21235310e3 Add a BC layer for the ConsumeMessagesCommand arguments 2019-03-23 22:35:41 +07:00
Ryan Weaver
ef077cf26c Fixing a bug where a transport could receive a message and dispatch it to a different bus 2019-03-23 21:29:45 +07:00
Ryan Weaver
a989384999 Adding global retry support, events & more to messenger transport
Co-authored-by: Samuel ROZE <samuel.roze@gmail.com>
2019-03-23 09:39:27 -04:00
Ryan Weaver
503c20989c Using AMQP auto-setup in all cases, not just in debug 2019-03-16 20:39:02 -04:00
Nicolas Grekas
2bff625abe [Messenger] deprecate LoggingMiddleware in favor of providing a logger to SendMessageMiddleware 2019-03-14 08:52:20 +01:00
Eric Masoero
7d6a3fa487 Updated changelog to document changes in AmqpReceiver 2019-03-04 11:14:17 +01:00
Eric Masoero
93c10013fa [Messenger] Added new TransportException which is thrown if transport could not send a message 2019-03-04 11:14:17 +01:00
Ryan Weaver
97e2e32af4 Changing default serializer in Messenger component to PhpSerializer 2019-01-25 10:10:32 -05:00
Ryan Weaver
4132bfebe7 updating CHANGELOGs and fixing tests 2019-01-23 10:01:36 -05:00
Maxime Steinhausser
6ba4e8aad5 [Messenger] Add a trait for synchronous query & command buses 2018-11-20 19:19:09 +01:00
Maxime Steinhausser
2f5acf790a [Messenger] Add handled & sent stamps 2018-11-15 10:18:06 +01:00
Nicolas Grekas
2e9885922a [Messenger] collect all stamps added on Envelope as collections 2018-11-12 08:39:23 +01:00
Maxime Steinhausser
3136611d1e
[Messenger] The component is still experimental 2018-11-04 20:00:43 +01:00
Nicolas Grekas
1e7af4d35e [Messenger] make senders and handlers subscribing to parent interfaces receive *all* matching messages, wildcard included 2018-10-31 08:48:19 +01:00
Nicolas Grekas
4b0e015402 [Messenger] make dispatch(), handle() and send() methods return Envelope 2018-10-26 10:10:48 +02:00
Nicolas Grekas
2bc7d11ad3 [Messenger] Add StackInterface, allowing to unstack the call stack 2018-10-25 18:34:40 +02:00
Nicolas Grekas
aedb281b76 [Messenger] remove AllowNoHandlerMiddleware in favor of a constructor argument on HandleMessageMiddleware 2018-10-25 11:11:58 +02:00
Nicolas Grekas
16afb5e2b4 [Messenger] remove classifying sub-namespaces in favor of semantic ones 2018-10-25 09:48:15 +02:00
Nicolas Grekas
4a3edd0b37 [Messenger] internal cleanups 2018-10-21 16:46:18 +02:00
Nicolas Grekas
ae46a436e7 [Messenger] make Envelope first class citizen for middleware handlers 2018-10-21 14:43:41 +02:00
Nicolas Grekas
f942ffcb1b [Messenger] made dispatch() and handle() return void 2018-10-20 15:00:30 +02:00
Nicolas Grekas
0ad2cb906d [Messenger] rename "envelope items" and move them in the "Stamp" namespace 2018-10-18 15:00:56 +02:00
Nicolas Grekas
6c56e82080 [Messenger] drop "handler." prefix from ContainerHandlerLocator 2018-10-03 09:45:32 +02:00
Fabien Potencier
2d55ae5212 [Messenger] changed AmqpExt classes constructor signature 2018-09-10 08:21:34 +02:00
Samuel ROZE
5b93f5f45e Uses a messenger serializer, not an individual encoder/decoder 2018-09-09 11:28:13 +01:00
Fabien Potencier
e658e155aa [Messenger] added a SenderLocator decoupled from ContainerInterface 2018-09-08 14:26:08 +02:00
Fabien Potencier
8651758fc1 feature #28294 [Messenger] Remove the "obscure" message subscriber configuration (sroze)
This PR was merged into the 4.2-dev branch.

Discussion
----------

[Messenger] Remove the "obscure" message subscriber configuration

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | no
| BC breaks?    | yes
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | ø
| License       | MIT
| Doc PR        | ø

As described in #28275, all of the configuration can be done using yield and that we could remove the support for other ways (especially the obscure return `[['method', -10]]` syntax) as I believe this would clarify the configuration a lot.

Commits
-------

cf2ad861f5 Remove the "obscure" message subscriber configuration
2018-08-29 15:11:20 +02:00
Samuel ROZE
cf2ad861f5 Remove the "obscure" message subscriber configuration 2018-08-28 13:58:44 +01:00
Samuel ROZE
e3f1eecbc1 Bus argument is a required option when multiple buses are defined 2018-08-28 11:10:33 +02:00
Nicolas Grekas
2beda894f2 [Messenger] Don't make EnvelopeItemInterface extend Serializable 2018-08-24 15:12:11 +02:00