Простой пример реализации очереди на PHP
В предыдущей статье я объяснил, что такое очередь, как она работает, и на абстрактном примере показал, что из себя представляет. В экосистеме PHP существует множество готовых реализаций клиентов очередей. Эта статья будет посвящена практической части работы с очередью. А использовать я буду простую, стабильную и быструю очередь. Сегодня вы увидите пример очереди задач на php, пример добавления заданий, удаления, обработки.
Для себя, вы можете представлять, что очередь является обычной базой данных, к которой можно выполнять множество запросов, к любым таблицам.
В предыдущей статье, в примере было 3 очереди: images
, videos
, emails
. Потому, чтобы не выпадать из общей концепции статьи, реализуем эти очереди на практике.
Добавление задачи в очередь, мало чем отличается от INSERT
-а в БД. Большинство очередей работают по принципу "первый прибыл, первый ушёл". То есть, сохраняя полную последовательность вызовов, сортируя по времени добавления задачи. Когда добавляется первая задача в очередь, она обрабатывается, потом, вторую, третью, и т.д. пока не выполнит все задачи из списка.
Отличие от базы данных - это то, что после выполнения, задача удаляется из очереди.
Одной из самых замечательных вещей является то, что очереди позволяют "резервировать" задание в очереди. Это значит, что, в случае, если очередь отслеживают два воркера, и один из них взял задание на выполнение, то, второй уже не может взять то же самое задание. Тут можно провести аналогию с "блокировкой файлов" - когда ставится флаг, который запрещает перезаписывать файл, пока его кто-то редактирует, или читает. Это важно понимать: одна задача выполняется одним воркером.
Что такое воркер?
Воркером может быть что угодно! Он может быть реализован на любом языке программирования (PHP, C, Ruby, Python), главное - чтобы он позволял подключаться, и общаться с самой очередью. Для PHP есть замечательная библиотека, которую я настоятельно рекомендую испробовать. Воркер - это интерфейс, который позволяет общаться с самой очередью. То есть, у вас должна быть установен драйвер очереди, и сама библиотека для работы с очередью.
Рассмотрим простой пример работы воркера:
use Pheanstalk\Pheanstalk;
// подключимся к очереди
$worker = new Pheanstalk('127.0.0.1');
// этот воркер работает над очередью изображений, и создаёт превью
$worker->watch('images');
// поиск задач в очереди
if ($job = $worker->reserve()) {
// здесь будет код по обработке изображений
}
Код выше просто показывает, как в PHP подключиться к очереди, используя библиотеку Pheanstalk, как настроить очередь images
и доставать доступные задачи из очереди. Скрипт последовательно извлекает задачи, выполняет, и когда задач не останется - завершает своё выполнение.
Однако, в коде выше есть небольшой нюанс. Если запустить этот код, и в нём будет задание, то это задание будет циклично повторно выполняться. Эта задача никогда не будет удалена из очереди. Потому, чтобы удалить задачу из очереди изменим код:
use Pheanstalk\Pheanstalk;
// подключаемся к очереди
$worker = new Pheanstalk('127.0.0.1');
// этот воркер работает над очередью изображений, и создаёт превью
$worker->watch('images');
// поиск задач в очереди
if ($job = $worker->reserve()) {
// здесь будет код по обработке изображений
//после того, как задачу будет выполнена, удалить её из очереди
$worker->delete($job);
}
Запуск воркера
Для одноразового воркера достаточно обычного запуска скрипт (в консоли, или перехода по ссылке в браузере). Однако, этот вариант не подходит, потому что каждый раз вручную запускать скрипт - это, как минимум надоедает.
Потому, есть несколько вариантов.
Первый - можно поставить на крон (скажем, выполнять задачу каждые 5 минут). И каждые 5 минут будут обрабатываться все задачи, добавленные ранее в очередь. Этот вариант рабочий, однако, недостаток в том, что задачи будут выполняться не сразу, а только в течении 5 минут.
Второй варианта - это зациклить скрипт на бесконечное выполнение. Этот подход позволяет один раз запустить скрипт, который, в цикле, с определённым интервалом будет проверять новые задачи, и их выполнять.
Или же, более продвинутый вариант, и рекомендуемый вариант, используя supervisor. Ранее, я уже демонстрировал работу с supervisor-ом при работе с вебсокетами laravel. Его очень просто настраивать, и запускать. Проблема, которую он решает, заключается в том, что он следит за запущеными процессами, и, если какой-то из них прерывает своё выполнение, то он, тут же пытается перезапустить его. Тем самым, supervisor следит, чтобы скрипт был всегда запущен.
Как сделать бесконечный цикл while
Предыдущий пример был рабочий, но он выполнял только те задачи, которые были добавлены ранее. В этом разделе я напишу скрипт, который сможет выполнять задачи добавленные во время того, когда скрипт уже был запущен.
Теперь код будет выглядеть так:
use Pheanstalk\Pheanstalk;
// подключимся к очереди
$worker = new Pheanstalk('127.0.0.1');
// этот воркер работает над очередью изображений, и создаёт превью
$worker->watch('images');
while (true) {
// поиск задач в очереди
if (!$job = $worker->reserve()) {
// если задач пока нету, пропускаем итерацию
// "засыпаем" на 1 секунду
sleep(1);
continue;
}
// здесь будет код по обработке изображений
//после того, как задачу будет выполнена, удалить её из очереди
$worker->delete($job);
// возвращаемся в начало, к следующей задаче
}
Это базовый пример, который должен был показать, что внутри очередей всё не так страшно, как может показаться.
Добавление задач в очередь
Предыдущие примеры были слишком абстрактными. Рассмотрим на примере, приближенном к боевым условиям. Представим, что пользователь присылает изображения, и нужно их принять и обработать:
use Pheanstalk\Pheanstalk;
// подключимся к очереди
$worker = new Pheanstalk('127.0.0.1');
// ...
// код сохранения изображений на сервер
foreach ($_FILES as $image) {
$data = [
'path' => $image['tmp_name'],
'name' => $image['name'],
];
// добавляем задачу на обработку изображения в очередь
$pheanstalk
->useTube('images') //название выше созданной очереди images
->put(json_encode($data)); // полезные, данные, которые потребуются обработчику
}
И добавив немного кода к базовому примеру, мы уже имеем рабочий скрипт, который сможет успешно сохранить изображения, и добавить их в очередь на обработку. Теперь, если пользователь загрузит даже 1000 изображений, то каждой из картинок будет создана задача в очереди.
А уже, внутри самого обработчика задачи, мы получим массив $data
, которые был передан в метод put
.
Данные в метод виде JSON-а, т.к. скрипт принимает строку. А JSON самый удобный формат передачи данных объектов, или массивов.
В итоге, данные такого формата будут получены: {"path": "/tmp/lorem", "name": "new_image.jpg"}
.
Получение данных, и обработка внутри воркера
Ввиду того, что мы получаем JSON в обработчике, то, его нужно декодировать. Для этого, напишем скрипт, который будет доставать переданную информацию:
use Pheanstalk\Pheanstalk;
// подключимся к очереди
$worker = new Pheanstalk('127.0.0.1');
// этот воркер работает над очередью изображений, и создаёт превью
$worker->watch('images');
while (true) {
// поиск задач в очереди
if (!$job = $worker->reserve()) {
// если задач пока нет, пропускаем итерацию
// "засыпаем" на 1 секунду
sleep(1);
continue;
}
$data = json_decode($job->getDat(), true);
// здесь будет код по обработке изображений
//после того, как задачу будет выполнена, удалить её из очереди
$worker->delete($job);
// возвращаемся в начало, к следующей задаче
}
В примере
$data
обрабатывается функциейjson_decode()
, потому что мы уверены, что данные приходят в формате JSON
Резюме
Api этой библиотеки содержит много функций, позволяющих работать с очередью, однако это уже выходит за рамки этой статьи. Основная цель, которую я преследовал - показать, как работать с очередью на PHP. Сегодня вы узнали, как работать с очередью, почему очереди стоит использовать, их преимущества, и, как добавить, и извлечь информацию из очереди.
Как всегда, для того, чтобы хорошо усвоить материал, нужно попробовать создать своими руками то, что мы проделали в этой статье. Пишите код, и понимание быстро придёт. Удачи!