Подписка

Очереди сообщений, AMQP, RabbitMQ

Использование очередей сообщений в архитектуре распределенных систем довольно распространенная техника разделения большой системы на компоненты.

Очереди, простой и в тот же час масштабируемый инструмент, позволяющий "подружить" независимые системы и научить их работать совместно.

Их задача предоставить возможность различным подсистемам обмениваться сообщениями обеспечивая маршрутизацию, гарантированную доставку и масштабирование.

Ниже пойдет речь о самых простых очередях сообщений построенных на основе БД, стандарте AMQP и отличной системе управления очередями RabbitMQ.

Кому нужны эти очереди?

Проблема соединения независимых компонентов в одно логическое целое появляется одновременно с ростом количества компонентов. При небольших нагрузках можно обойтись табличкой в базе данных или даже возможностями файловой системы. Но в какой-то момент сделанные напильником методы перестают работать или же масштабирование приносит больше головной боли чем результатов.

Давайте посмотрим на примере как можно создать примитивную очередь используя табличку в базе данных и проанализируем какие проблемы возникнут при необходимости увеличить активность раз так в 1000000.

Итак, поставим задачу: написать индексатор веб сайтов -- систему которая будет ходить по ссылкам указанного сайта и сохранять заголовок/url страницы, а также картинки размером больше чем 10х10 учитывая возможность дубликатов.

Исходя из задачи систему можно разделить на два основных компонента: вебсканер и анализатор ресурсов, а последний в свою очередь может быть разбит на анализатор страниц и анализатор картинок.

Вебсканер - на входе получает url страницы, сканирует ее и возвращает внутренние ссылки и картинки, при этом передавая текущую страницу анализатору страниц, а картинки анализатору картинок соответственно.

Анализатор страницы - проверят нету ли страницы с аналогичным url в индексе, и если нет, получает заголовок (ограничимся содержимым тега title) и сохраняет.

Анализатор картинки - проверят нету ли картинки в индексе и соответствует ли размер требованиям. Если все критерии удовлетворительны изображение помещается в индекс.

Так как мы разрабатываем систему которая в будущем будет масштабироваться, нужно заложить возможность работы сервисов (вебсканнера и анализаторов) на разных серверах и возможно несколько инстансов одновременно (например 3 сканера и 5 анализаторов).

Начнем с написания сканера. Для получения ссылок и картинок пойдем простым путем и воспользуемся evaluate методом DOMXPath, который возвращает все теги по xpath путю.

class WebScanner extends Web { 
 protected function getPageElements($tagName, $attributeName) {
  $elements = $this->getXPath()->evaluate('/html/body//'.$tagName);
 
  $elementsList = array();
  for ($i = 0; $i < $elements->length; $i++) {
   $elementsList[] = $elements->item($i)->getAttribute($attributeName);
  }

  return $elementsList;
 }

 public function getAllLinks() {
  return $this->getPageElements('a', 'href');
 }

 public function getAllImages() {
  return $this->getPageElements('img', 'src');
 }
}

Таким образом метод getPageElements получает все теги указанного типа и добавляет значение интересующего нас атрибута в возвращаемый массив. В результате мы можем получить все значения href ссылок и значения src картинок.

Так как классы-анализаторы тоже возможно будут работать с dom деревом, я вынес метод getXPath в базовый класс, который все остальные классы будут наследовать и таким образом унаследуют метод получения DOMXPath.

Перейдем к анализатору страницы. Его задача проверить наличие ссылки в базе данных и если если нужно добавить ссылку и title страницы.

class PageAnalyzer extends Web {
 public function analyze() {
  if (!Storage::getInstance()->containsPage($this->url)) {
   $title = $this->getDom()->getElementsByTagName("title")->item(0)->textContent;
   Storage::getInstance()->addPage($title, $this->url);
  }
 }
}

Метод analyze проверяет есть ли текущая страница в индексе, если нет, то используя DomDocument получает title страницы и добавляет в базу данных.

Кроме анализатора появился класс для работы с базой данных Storage -- простой синглтон-обертка mysqli.

class Storage {
 protected static $instance = null;
 protected $mysqli;
 protected static $PAGES_TABLE = 'pages';
 protected static $IMAGES_TABLE = 'images';

    public static function getInstance() {
        if (static::$instance == null) {
            static::$instance = new static();
        }
        return static::$instance;
    }

    private function __construct() {
     $this->mysqli = new mysqli('127.0.0.1', 'root', '', 'blog_test');
     $this->mysqli->set_charset('utf8');
    }

    protected function containsUrl($tableName, $url) {
     return ($this->mysqli->query("SELECT COUNT(*) as cnt FROM ".$tableName." WHERE url='".$url."'")->fetch_object()->cnt > 0);
    }

    public function containsPage($url) {
     return $this->containsUrl(static::$PAGES_TABLE, $url);
    }

    public function containsImage($url) {
     return $this->containsUrl(static::$IMAGES_TABLE, $url);
    }

    public function addPage($title, $url) {
     $this->mysqli->query("INSERT INTO ".static::$PAGES_TABLE." (title,url)VALUES('".$title."','".$url."')");
    }

    public function addImage($url) {
     $this->mysqli->query("INSERT INTO ".static::$IMAGES_TABLE." (url)VALUES('".$url."')");
    }
}

Работа с базой данных не является целью поста, поэтому его доводить до совершенства не будем, для нас достаточно возможности читать и писать в базу.

База данных (она же в нашем случае индекс) будет состоять из двух таблиц: pages - индекс страниц, images - индекс картинок. Для создания таблиц выполним:

CREATE TABLE `pages` (
  `url` VARCHAR(255) NOT NULL,
  `title` VARCHAR(255) NOT NULL,
  PRIMARY KEY (`url`),
  KEY `title` (`title`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;
CREATE TABLE `images` (
  `url` VARCHAR(255) NOT NULL,
  PRIMARY KEY (`url`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

И наконец анализатор картинок с единственным критерием проверки -- картинка должна быть 10х10 или больше.

class ImageAnalyzer extends Web {
 protected static $ALLOWED_WIDTH = 10;
 protected static $ALLOWED_HEIGHT = 10;

 protected function isValid() {
  $size = getimagesize($this->url);
  return ($size[0] >= static::$ALLOWED_WIDTH && $size[1] >= static::$ALLOWED_HEIGHT);
 }

 public function analyze() {
  if (!Storage::getInstance()->containsImage($this->url)) {
   if ($this->isValid()) {
    Storage::getInstance()->addImage($this->url); 
   }
  }
 } 
}

Если картинки нету в индексе, метод isValid проверяет размер и добавляет в базу при удовлетворении критерия.

Как вы наверно заметили оба класса наследуют класс Web, который содержит базовые методы для работы с данными:

abstract class Web {
 protected $url;
 protected $xPath = null;
 protected $dom = null;

 public function __construct($url) {
  $this->url = $url;
 }

 protected function getDom() {
  if ($this->dom == null) {
   $dom = new DOMDocument();
   $dom->loadHTML(file_get_contents($this->url)); 
  }
  return $dom;
 }

 protected function getXPath() {
  if ($this->xPath == null) {
   $this->xPath = new DOMXPath($this->getDom());
  }

  return $this->xPath;
 }
}

Все эти 3 компонента могут работать независимо на разных серверах общаясь между собой. Cканер страниц в процессе работы будет передавать ссылки анализатору страниц и картинки анализатору картинок, которые в свою очередь будут добавлять их в индекс.

Непосредственно момент "передачи" данных рассмотрим более детально.

Самый простой вариант который приходит в голову -- создать таблицу queue, которая будет содержать два типа ссылок - картинки и страницы, что-то наподобие:

CREATE TABLE `queue` (
  `url` varchar(255) NOT NULL,
  `type` enum('page','image') DEFAULT NULL,
  PRIMARY KEY (`url`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

При попадании на новую ссылку сканер будет добавлять ее в таблицу, откуда она позже будет считана анализаторами (и если нужно сканером), таким образом формируя "хаб" для объединения сервисов.

Но тут же получаем узкое, плохо масштабируемое место, которое может стать сильной проблемой при увеличении количества запросов. Шардинг решения построенного на основе БД будет довольно сложным из-за трудности реализации балансировки данных между шардами. А так как MySQL не поддерживает шардинг из коробки, эту логику прийдется включать в класс Storage, таким образом сильно усложняя его. И все это только для того чтобы научить компоненты общаться между собой.

Но не мы первые столкнулись с подобной проблемой) Именно для задач такого рода удобно использовать очереди сообщений.

Начнем с теории: AMQP в PHP

AMQP -- протокол предназначенный для систем обмена сообщениями и описывающий характеристики и функции сообщений, очередей, роутинга, доступности и безопастности, а также схемы поведения сервера сообщений (брокера) и клиента (аналогично протоколам HTTP, FTP и др.).

Таким образом это некая спецификация того как должны общаться между собой брокер и клиент -- формат и тип сообщений, метод передачи данных и т.д. AMQP совместимость избавляет от привязки клиента к конкретному брокеру, так как клиент поддерживающий AMQP может общаться з любым совместимым брокером. Поэтому разочаровавшись в RabbitMQ можно легко попробовать счастья с ActiveMQ, изменив только конфигурацию соединения с брокером.

В PHP функции для работы с AMQP совместимыми брокерами реализованы в виде 5 классов (расширение amqp, которое мы установили):

  • AMQPConnection - класс для реализации соединения с брокером
  • AMQPChannel - работа с каналами передачи данных
  • AMQPExchange - отправка сообщений
  • AMQPQueue - получение сообщений
  • AMQPEnvelope - класс сообщения

Установим RabbitMQ

На убунте все делается как всегда просто:

  1. Подключаем репозиторий (детально описано тут) и выполняем:
    apt-get install rabbitmq-server
    Впринципе можно обойтись без подключения внешнего репозитория и пользоваться стандартным убунтовским (он тоже содержит rabbitmq-server), но версия с официального сервера включает дополнительные плюшки, как например веб-интерфейс для управления очередью.
  2. Pecl расширения для клиента:
    Сначала нужно установить зависимость rabbitmq-c (инструкции взяты с http://www.php.net/manual/en/amqp.installation.php):
    apt-get install pkg-config automake autoconf libsigc++-2.0-dev 
    git clone git://github.com/alanxz/rabbitmq-c.git
    cd rabbitmq-c
    # Enable and update the codegen git submodule
    git submodule init
    git submodule update
    # Configure, compile and install
    autoreconf -i && ./configure && make && sudo make install 

    Если хотите установить web UI, выполняем:

    rabbitmq-plugins enable rabbitmq_management

    После чего установим само расширение:

    pecl install amqp

Чтобы проверить что все работает напишем два тестовых скрипта: один будет отправлять сообщение в очередь, а другой считывать.

Отправка сообщений:

rabbitpost.php
$rabbit = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'));
$rabbit->connect();

$testChannel = new AMQPChannel($rabbit);
$testExchange = new AMQPExchange($testChannel);

$testExchange->setName('amq.direct');
$testExchange->publish('Hello buddy!', 'route_to_everybody');

$rabbit->disconnect();

Итак, что же мы сделали? Сначала открыли соединение с брокером создав объект AMQPConnection и передав конструктору параметры соединения. Затем открыли новый канал(AMQPChannel), подключились к точке обмена сообщениями "amq.direct" и отправили сообщение с текстом "Hello buddy!" и ключом роутинга "route_to_everybody". Ключи используются для маршрутизации сообщений в разные очереди для обработки различными подсистемами.

Получение сообщений:

rabbitread.php
$rabbit = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'));
$rabbit->connect();

$channel = new AMQPChannel($rabbit);

$q = new AMQPQueue($channel);
$q->setName('direct_messages');
$q->declare();
$q->bind('amq.direct', 'route_to_everybody');

$envelope = $q->get();
if ($envelope) {
    print_r($envelope);
    $q->ack($envelope->getDeliveryTag());
}

$rabbit->disconnect();

Аналогично предыдущему скрипту, мы создали новые соединение и канал, после чего создали новую очередь, которой указали проверять точку обмена "amq.direct" и доставлять все сообщения с ключом маршрутизации "route_to_everybody". Дальше мы пробуем получить сообщение из очереди, и в случае успеха выводим его в консоль.

Важным нюансом при работе с очередями сообщений является отправка acknowledge сообщений ack и nack. Ack с идентификатором доставленного сообщения отправляется в случае успешной обработки сообщения и значит, что сообщение может быть удалено с очереди, тогда как nack отправляется в случае когда обработчик по какой-то причине не справился и сообщение должно остаться в очереди (возможно будет успешно обработано другим обработчиком). До тех пор пока ack или nack не отправлен сообщение блокируется и переводится в состояние обработки -- уже не отправляется другим клиентам, но еще и не удалено из очереди.

Если выполним:

php rabbitpost.php

А затем

php rabbitread.php

Получим обьект сообщения:

AMQPEnvelope Object
(
    [body] => Hello buddy!
    [content_type] => text/plain
    [routing_key] => route_to_everybody
    [delivery_tag] => 1
    [delivery_mode] => 0
    [exchange_name] => amq.direct
    [is_redelivery] => 0
    [content_encoding] =>
    [type] =>
    [timestamp] => 0
    [priority] => 0
    [expiration] =>
    [user_id] =>
    [app_id] =>
    [message_id] =>
    [reply_to] =>
    [correlation_id] =>
    [headers] => Array()
)

Надеюсь тепер механика работы с очередями сообщений стала простой и понятной, попробуем реализовать на нашем примере с индексатором.

Для каждого типа сообщений создадим отдельную очередь:

  1. Ссылки которые нужно проиндексировать WebScanner'у
  2. Страницы которые нужно проанализировать PageAnalyzer'у
  3. Картинки которые нужно проанализировать ImageAnalyzer'у

Таким образом сканер при обнаружении непроиндексированной ссылки на другую страницу будет добавлять ее в очередь с ключом маршрутизации "scan", текущую, уже приндесированную страницу в "analyze_page", а если на странице найдены картинки, то все одни будут отправлены в "analyze_image".

Посмотрим что получилось.

scanner.php
include 'web.php';
include 'webscanner.php';
include 'storage.php';

error_reporting(E_ERROR);
libxml_use_internal_errors(true);
/**
 * Домен за рамки которого не нужно выходить (мы же не хотим весь интернет сканировать)
 */
$domain = 'phphighload.com';
$imageDomain = 'dropbox.com';
/**
 * Подключаемся к брокеру и точке обмена сообщениями
 */
$rabbit = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'));
$rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPExchange($channel);
$queue->setName('amq.direct');
/**
 * Добавляем очередь откуда будем брать страницы, которые нужно проиндексировать
 */
$q = new AMQPQueue($channel);
$q->setName('pages_to_scan');
$q->declare();
$q->bind('amq.direct', 'scan');
/**
 * Индексируем, пока в очереди не закончатся сообщения
 */
while ($page = $q->get()) {
 if (!is_object($page)) {
  continue;
 }
 $url = $page->getBody();
 echo "Scanning: $url\n";
 $scanner = new WebScanner($url);
 $links = $scanner->getAllLinks();
 foreach ($links as $link) {
  /**
    * Если страница относится к указанному домену и еще не была проиндексирована -- добавляем ее в очередь на индексацию
    */
  if (strpos($link, $domain) !== FALSE && !Storage::getInstance()->containsPage($link)) {
   $queue->publish($link, 'scan');
  }
 }
 /**
  * Также если на странице есть картинки добавляем их для анализа
  */
 $images = $scanner->getAllImages();
 foreach ($images as $image) {
  /**
    * Если картинка относится к указанному домену и еще не была проанализирована -- добавляем ее в очередь
    */
  if (strpos($image, $imageDomain) !== FALSE && !Storage::getInstance()->containsImage($image)) {
   $queue->publish($image, 'analyze_image');
  }
 }
 /**
  * Текущую страницу тоже в очередь для анализации
  */
 $queue->publish($url, 'analyze_page');
 $q->ack($page->getDeliveryTag());
}

$rabbit->disconnect();

Сканер подключается к очереди содержащей страницы ожидающие индексации и начинает их обработку. Если на странице обнаружена ссылка или картинка в рамках текущего домена, они добавляются в соответствующие очереди.

Теперь посмотрим как будут выглядеть PageAnalyzer и ImageAnalyzer :

analyze_page.php
include 'web.php';
include 'page_analyzer.php';
include 'storage.php';

error_reporting(E_ERROR);
libxml_use_internal_errors(true);
/**
 * Подключаемся к брокеру и точке обмена сообщениями
 */
$rabbit = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'));
$rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPExchange($channel);
$queue->setName('amq.direct');
/**
 * Добавляем очередь откуда будем брать страницы
 */
$q = new AMQPQueue($channel);
$q->setName('pages_to_scan');
$q->declare();
$q->bind('amq.direct', 'analyze_page');
/**
 * Обрабатываем пока в очереди не закончатся сообщения
 */
while (true) {
 $page = $q->get();
 if ($page) {
  $url = $page->getBody();
  echo "Parsing: $url\n";
  $analyzer = new PageAnalyzer($url);
  /**
   * Если страница еще не была проанализирована, обрабатываем и добавляем в индекс
   */
  $analyzer->analyze(); 
  $q->ack($page->getDeliveryTag());
 } else sleep(1);
}

$rabbit->disconnect();
analyze_image.php
include 'web.php';
include 'image_analyzer.php';
include 'storage.php';
/**
 * Подключаемся к брокеру и точке обмена сообщениями
 */
$rabbit = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'));
$rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPExchange($channel);
$queue->setName('amq.direct');
/**
 * Добавляем очередь откуда будем брать страницы
 */
$q = new AMQPQueue($channel);
$q->setName('images_to_scan');
$q->declare();
$q->bind('amq.direct', 'analyze_image');
/**
 * Обрабатываем пока в очереди не закончатся сообщения
 */
while (true) {
 $image = $q->get();
 if ($image) {
  $url = $image->getBody();
  echo "Checking: $url\n";
  $analyzer = new ImageAnalyzer($url);
  /**
   * Если картинка еще не была проанализирована, обрабатываем и добавляем в индекс
   */
  $analyzer->analyze();
  $q->ack($image->getDeliveryTag());
 } else sleep(1);
}

$rabbit->disconnect();

Аналогично сканеру -- подключаемся к очереди и обрабатываем сообщения используя соответствующие классы.

Так как наш сканер обрабатывает только ссылки уже находящиеся в очереди, для того чтобы начать индексацию сайта необходимо записать в очередь стартовую страницу:

init.php
if (!isset($argv[1])) {
 die('Page url should be specified');
}

$url = $argv[1];
/**
 * Подключаемся к брокеру и точке обмена сообщениями
 */
$rabbit = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'));
$rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPExchange($channel);
$queue->setName('amq.direct');
/**
 * Добавляем стартовую страницу в очередь для индексирования
 */
$q = new AMQPQueue($channel);
$q->setName('pages_to_scan');
$q->declare();
$q->bind('amq.direct', 'scan');

$queue->publish($url, 'scan');

Ну что, по-тестируем

Запустим в разных терминалах анализаторы:

php analyze_page.php

и

php analyze_image.php

Которые будут ждать появления новых элементов для сканирования в очереди.

И выполнив в третьем:

php init.php http://phphighload.com && php scanner.php

мы запустим цепную реакцию индексирования сайта phphighload.com и в результате получим список всех страниц и картинок в рамках домена.

Для тех кто устанавливал из официального репозитория и не поленился настроить UI, можно наблюдать за работой очереди перейдя по ссылке: http://rabbit_server_ip:15672/ (стандартные логин/пароль guest).

Если немного подождать, то проверив таблицу pages получим страницы сайта с названиями, а в images все картинки которые лежат на Дропбоксе размером больше 10х10.

Конечно же сам индексатор можно сделать более умным, чтобы он не зацикливался, не добавлял дубликаты в очередь для индексирования, но это тема другой статьи).

Такая распределенная архитектура хороша тем, что поддерживает горизонтальное масштабирование практически неограниченного размера. При необходимости всегда можно добавить новые сервера занимающиеся только анализом изображений или только сканированием страниц, соединенные в одну систему при помощи очереди сообщений, которая в свою очередь тоже легко масштабируется и с коробки умеет работать на нескольких серверах, быстро обрабатывая огромное количество сообщений.

Надеюсь у меня получилось показать насколько удобны и универсальны очереди для построения распределенных систем. Если у вас есть дополнения, замечания или коррекции, всегда рад пообщаться в комментариях).

P.S. Кому интересно запустить "индексатор" у себя, исходники примера выложил на github: https://github.com/kooler/phphighload.com/tree/master/rabbitmq

В тему:

Если пост понравился - нажмите на +1 - мне будет приятно.

@kkooler

@kkooler

Занимаюсь разработкой высоконагруженных проектов и распределенных систем на PHP.
В свободное время разрабатываю нано-проекты:

Следить за блогом

RSS канал Twitter