Często gdy przetwarzanie pewnych zadań w PHP po akcji użytkownika zajmuje cenny czas i zmusza do oczekiwania na ponowne wyrenderowanie widoku, warto wydelegować je do zewnętrznego workera. Typowym przykładem jest np. wysyłka wiadomości e-mail po zarejestrowaniu użytkownika, czy generowanie plików pdf. Można w tym celu posłużyć się taskami wywoływanymi z CRONa, ale zwłaszcza w systemach o większej skali, to rozwiązanie nie jest wystarczające.
Godnym polecenia systemem, który rozwiąże problem jest RabbitMQ.
Integracja z PHP okazuje się nie być tak banalna jak by się to mogło wydawać (a to głównie z uwagi na delikatnie mówiąc kulejącą dokumentację biblioteki AMQP).
Zacznijmy od instalacji króliczka, która na moim developerskim ubuntu była bardzo prosta:
sudo apt-get install rabbitmq-server
Następnie za pomocą PECLa instalujemy rozszerzenie AMQP do PHP:
sudo pecl install amqp
w php.ini uzupełniamy:
extension=ampq.so
UWAGA użytkownicy DEBIANA!
Powyższe kroki były wystarczjące gdy instalowałem amqp pod Ubuntu, jednak podczas próby powtórzenia ich pod debianem squeeze wystąpiły problemy z instalację z pecl'a (komunikat "configure: error: Please reinstall the librabbit-mq distribution")
Rozwiązaniem jest wykonanie:
git clone git://github.com/alanxz/rabbitmq-c.git
cd rabbitmq-c
git submodule init
git submodule update
autoreconf -i
./configure
make
sudo make install
Naturalnie użytkownicy debiana squeeze mają do dyspozycji mega starą wersję servera rabbitmq, z którą nie będzie chciał gadać AMQP. Dlatego konieczne będzie wykonanie dodatkowych kroków:
Jeśli już zainstalowałeś starego rabbita - usuń go, aby uniknąć kolejnych przykrości :)
sudo apt-get remove rabbitmq-server --purge
następnie dodaj do /etc/apr/sources.list linijkę:
deb http://backports.debian.org/debian-backports squeeze-backports main
następnie update i instalacja backportu:
sudo apt-get update
sudo apt-get -t squeeze-backports install rabbitmq-server
Teraz należy skonfigurować RabbitMQ. Służy do tego konsolowy narząd rabbitmqctl, który dostajemy w paczce z królikiem. Po szczegóły odsyłam do dokumentacji rabbita, która w odróżnieniu od tej z PHP.net jest bardzo dobra i szczegółowa.
Dodajemy usera i host wirtualny, oraz ustawiamy uprawnienia:
sudo rabbitmqctl add_user butterfly butterfly
sudo rabbitmqctl add_vhost butterfly
sudo rabbitmqctl set_permissions -p butterfly butterfly ".*" ".*" ".*"
Warto jeszcze (przynajmniej na początku) włączyć logowanie dla naszego hosta i na jednej z konsol odpalić podgląd logu:
sudo rabbitmqctl trace_on -p butterfly
tail -f /var/log/rabbitmq/rabbit@Butterfly.log
Jesteśmy już gotowi do napisania klasy PHP, która będzie się komunikowała protokołem AMQP z serverem RabbitMQ:
/**
* Class for AMQP Connections
*
* @author gmotyl
*/
class AMQPConnector {
//Broker login credentials
protected $login = "butterfly"
protected $password = "butterfly";
protected $vhost = "butterfly";
//amqp connection variables
private $amqpConnection;
private $queue;
private $exchange;
private $routingKey;
/**
* Initializes amqpConnection and sets up queue and exchange
*
* @param string $exchangeName
* @param string $routingKey
* @param string $queueName
*/
public function __construct($exchangeName, $routingKey, $queueName)
{
$this->amqpConnection = $this->connect();
$channel = new AMQPChannel($this->amqpConnection);
$this->exchange = new AMQPExchange($channel);
$this->exchange->setName($exchangeName);
$this->exchange->setType(AMQP_EX_TYPE_DIRECT);
$this->queue = new AMQPQueue($channel);
$this->queue->setName($queueName);
$this->queue->declare();
$this->queue->bind($exchangeName, $routingKey);
$this->routingKey = $routingKey;
}
public function __destruct()
{
if(!$this->amqpConnection->disconnect()) {
throw new Exception("Could not disconnect !");
}
}
/**
* Connects to broker
*
* @return \AMQPConnection
*/
protected function connect()
{
$amqpConnection = new AMQPConnection();
$amqpConnection->setLogin($this->login);
$amqpConnection->setPassword($this->password);
$amqpConnection->setVhost($this->vhost);
$amqpConnection->connect();
if(!$amqpConnection->isConnected()) {
die("Cannot connect to the broker, exiting !");
}
return $amqpConnection;
}
/**
* Returns Queue object
*
* @return \AMQPQueue
*/
public function receiveQueue()
{
return $this->queue;
}
/**
* Publish a message to the exchange
* Returns TRUE on success or FALSE on failure.
*
* @param string $text
* @return bool
*/
public function sendMessage($text)
{
return $this->exchange->publish($text, $this->routingKey);;
}
}