2012-08-21

Follow the white RabbitMQ - czyli rozproszone przetwarzanie zadań

Często gdy przetwarzanie pewnych zadań w PHP po akcji użytkownika zajmuje cenny czas i zmusza do oczekiwania na ponowne wyrenderowanie widoku, warto wydelegować je do zewnętrznego workera. Typowym przykładem jest np. wysyłka wiadomości e-mail po zarejestrowaniu użytkownika, czy generowanie plików pdf. Można w tym celu posłużyć się taskami wywoływanymi z CRONa, ale zwłaszcza w systemach o większej skali, to rozwiązanie nie jest wystarczające.

Godnym polecenia systemem, który rozwiąże problem jest RabbitMQ.
Integracja z PHP okazuje się nie być tak banalna jak by się to mogło wydawać (a to głównie z uwagi na delikatnie mówiąc kulejącą dokumentację biblioteki AMQP)

Zacznijmy od instalacji króliczka, która na moim developerskim ubuntu była bardzo prosta:
sudo apt-get install rabbitmq-server

Następnie za pomocą PECLa instalujemy rozszerzenie AMQP do PHP:
sudo pecl install amqp

w php.ini uzupełniamy:
extension=ampq.so

UWAGA użytkownicy DEBIANA! Powyższe kroki były wystarczjące gdy instalowałem amqp pod Ubuntu, jednak podczas próby powtórzenia ich pod debianem squeeze wystąpiły problemy z instalację z pecl'a (komunikat "configure: error: Please reinstall the librabbit-mq distribution")
Rozwiązaniem jest wykonanie:
git clone git://github.com/alanxz/rabbitmq-c.git
cd rabbitmq-c
git submodule init
git submodule update
autoreconf -i
./configure
make
sudo make install

Naturalnie użytkownicy debiana squeeze mają do dyspozycji mega starą wersję servera rabbitmq, z którą nie będzie chciał gadać AMQP. Dlatego konieczne będzie wykonanie dodatkowych kroków:
Jeśli już zainstalowałeś starego rabbita - usuń go, aby uniknąć kolejnych przykrości :)

sudo apt-get remove rabbitmq-server --purge

następnie dodaj do /etc/apr/sources.list linijkę:
deb http://backports.debian.org/debian-backports squeeze-backports main

następnie update i instalacja backportu:
sudo apt-get update
sudo apt-get -t squeeze-backports install rabbitmq-server

Teraz należy skonfigurować RabbitMQ. Służy do tego konsolowy narząd rabbitmqctl, który dostajemy w paczce z królikiem. Po szczegóły odsyłam do dokumentacji rabbita, która w odróżnieniu od tej z PHP.net jest bardzo dobra i szczegółowa.

Dodajemy usera i host wirtualny, oraz ustawiamy uprawnienia:
sudo rabbitmqctl add_user butterfly butterfly
sudo rabbitmqctl add_vhost butterfly
sudo rabbitmqctl set_permissions -p butterfly butterfly ".*" ".*" ".*"

Warto jeszcze (przynajmniej na początku) włączyć logowanie dla naszego hosta i na jednej z konsol odpalić podgląd logu:
sudo rabbitmqctl trace_on -p butterfly
tail -f /var/log/rabbitmq/rabbit@Butterfly.log


Jesteśmy już gotowi do napisania klasy PHP, która będzie się komunikowała protokołem AMQP z serverem RabbitMQ:

/**
 * Class for AMQP Connections
 * 
 * @author gmotyl 
 */
class AMQPConnector {

  //Broker login credentials
  protected $login = "butterfly"
  protected $password = "butterfly";
  protected $vhost = "butterfly";
  
  //amqp connection variables
  private $amqpConnection;
  private $queue;
  private $exchange;
  private $routingKey;

  /**
   * Initializes amqpConnection and sets up queue and exchange
   * 
   * @param string $exchangeName
   * @param string $routingKey
   * @param string $queueName 
   */
  public function __construct($exchangeName, $routingKey, $queueName)
  {
    $this->amqpConnection = $this->connect();
    
    $channel = new AMQPChannel($this->amqpConnection);
    $this->exchange = new AMQPExchange($channel);
    
    $this->exchange->setName($exchangeName);
    $this->exchange->setType(AMQP_EX_TYPE_DIRECT);

    $this->queue = new AMQPQueue($channel);

    $this->queue->setName($queueName);
    $this->queue->declare();
    $this->queue->bind($exchangeName, $routingKey);    
    
    $this->routingKey = $routingKey;
  }
  
  public function __destruct() 
  {
    if(!$this->amqpConnection->disconnect()) {
      throw new Exception("Could not disconnect !");
    }
  }
  
  /**
   * Connects to broker
   * 
   * @return \AMQPConnection 
   */
  protected function connect() 
  {
    $amqpConnection = new AMQPConnection();

    $amqpConnection->setLogin($this->login);
    $amqpConnection->setPassword($this->password);
    $amqpConnection->setVhost($this->vhost);    
    $amqpConnection->connect();

    if(!$amqpConnection->isConnected()) {
      die("Cannot connect to the broker, exiting !");
    }

    return $amqpConnection;
  }
  
  /**
   * Returns Queue object
   * 
   * @return \AMQPQueue 
   */
  public function receiveQueue() 
  {
    return $this->queue;
  }
  
  /**
   * Publish a message to the exchange
   * Returns TRUE on success or FALSE on failure. 
   * 
   * @param string $text
   * @return bool 
   */
  public function sendMessage($text)
  {
    return $this->exchange->publish($text, $this->routingKey);;
  }
}

Brak komentarzy:

Prześlij komentarz