publish message with custom queue options : flags | attributes
This commit is contained in:
parent
c6a2c3348f
commit
6f9fdaf7e4
@ -233,6 +233,24 @@ class ConnectionTest extends TestCase
|
|||||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', [], true, $factory);
|
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', [], true, $factory);
|
||||||
$connection->publish('body');
|
$connection->publish('body');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testPublishWithQueueOptions()
|
||||||
|
{
|
||||||
|
$factory = new TestAmqpFactory(
|
||||||
|
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
|
||||||
|
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
|
||||||
|
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
|
||||||
|
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
|
||||||
|
);
|
||||||
|
$headers = [
|
||||||
|
'type' => '*',
|
||||||
|
];
|
||||||
|
$amqpExchange->expects($this->once())->method('publish')
|
||||||
|
->with('body', null, 1, ['delivery_mode' => 2, 'headers' => $headers]);
|
||||||
|
|
||||||
|
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[flags]=1', [], true, $factory);
|
||||||
|
$connection->publish('body', $headers);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestAmqpFactory extends AmqpFactory
|
class TestAmqpFactory extends AmqpFactory
|
||||||
|
@ -133,7 +133,10 @@ class Connection
|
|||||||
$this->setup();
|
$this->setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, AMQP_NOPARAM, ['headers' => $headers]);
|
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
|
||||||
|
$attributes = array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
|
||||||
|
|
||||||
|
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Reference in New Issue
Block a user