•   Новости. Советы. Уроки.
Блог / Советы и уроки / Работа с событиями в Laravel. Асинхронная обработка очереди.

Работа с событиями в Laravel. Асинхронная обработка очереди.

Виталий Николенко
17.03.2016

Эта статья является продолжением статьи Работа с событиями в Laravel. Рассылка push уведомлений при публикации статьи.

В ней мы ознакомились с механизмом событий в Laravel, на примере отправки push-уведомления при публикации поста в блоге. Код который мы произвели на свет оказался не идеален:

1) отправка у нас происходит в реальном времени при сохранении модели, если добавятся более тяжелые и медленные операции, до сохранения не дойдет и все упадет.

2) при записи статуса отправки мы не учитываем ответ сервиса, если сервис откажет в отправке, мы статью посчитаем обработанной и больше по ней пытаться отправить уведомления не будем.

Давайте исправлять ситуацию.

Переосмысливаем архитектуру.

Очевидно мы избрали не самый верный путь. Нам необходима асинхронная обработка нашего события. Для этого событие нужно поставить в очередь. Подробнее об очередях можно почитать в официальной документации.

Для работы с очередями у Laravel есть несколько драйверов для специально-обученного ПО: Beanstalkd, Amazon SQS, Redis. Но есть и родной механизм для использования БД. На нем и остановимся, как на самом простом варианте, не требующим установки ПО и использования внешних сервисов.

Настройка.

Укажем в конфиге, что хотим обрабатывать очереди через БД, в .env пропишем QUEUE_DRIVER=database

Создадим и выполним миграцию для создания таблиц очереди.

php artisan queue:table

php artisan migrate

В БД должна появиться таблица jobs, в которой будут храниться наши события.

Поставить событие в очередь в Laravel проще простого.

Необходимо лишь чтобы наш слушатель реализовал интерфейс Illuminate\Contracts\Queue\ShouldQueue.

Таким образом: class PostActionsListener implements ShouldQueue

Тестируем

Напишем такой метод для проверки получает ли очередь задание

public  function testQueueEvent()
{

    Queue::shouldReceive('push')->once();
    $post  = factory(App\Models\Post::class)->create();
    $post->delete();

}    

Заодно добавим в самое начало тест на простую проверку сохранения поста, у нас такого не было.

public  function testPostCreate()
{

    $post  = factory(App\Models\Post::class)->create();
    $this->seeInDatabase('post', ['title' => $post->title]);
    $post->delete();

}

phpunit и... упс..

1) PostCreateTest::testPostCreate Illuminate\Database\Eloquent\ModelNotFoundException: No query results for model [App\Models\Post].

Что это? и почему произошло?

Суть ошибки в том, что модель не может быть найдена и судя по стеку

/home/vagrant/blog/vendor/laravel/framework/src/Illuminate/Queue/SerializesModels.php:67
/home/vagrant/blog/vendor/laravel/framework/src/Illuminate/Queue/SerializesModels.php:41
/home/vagrant/blog/vendor/laravel/framework/src/Illuminate/Events/CallQueuedHandler.php:42
/home/vagrant/blog/vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php:129
/home/vagrant/blog/vendor/laravel/framework/src/Illuminate/Queue/Jobs/SyncJob.php:44
/home/vagrant/blog/vendor/laravel/framework/src/Illuminate/Queue/SyncQueue.php:30
/home/vagrant/blog/vendor/laravel/framework/src/Illuminate/Queue/QueueManager.php:256
/home/vagrant/blog/vendor/laravel/framework/src/Illuminate/Events/Dispatcher.php:416
/home/vagrant/blog/vendor/laravel/framework/src/Illuminate/Events/Dispatcher.php:347
/home/vagrant/blog/vendor/laravel/framework/src/Illuminate/Events/Dispatcher.php:221

Это происходит при сериализации модели для очереди.

Вот в чем загвоздка! Мы же бросаем событие перед сохранением объекта в БД. Давайте срочно это исправим, тем более, что сама идея обращаться в сторонние сервисы по API перед сохранением объекта - довольно рискованная.

Меняем saving на saved. Заодно убираем установку статуса публикации. Она переедет в другое место.

public static function boot()
{
    static::saved(function($instance)
    {
       //Мы проверяем статус статьи – если он «Опубликован», смотрим на статус оповещения, если он еще не «Опубликован»
    if ($instance->status == self::STATUS_PUBLISHED 
        && $instance->notify_status < self::STATUS_PUBLISHED){
        //и  «выстреливаем» событие PostPublishedEvent, передавая в него собственный инстанс.
        \Event::fire(new PostPublishedEvent($instance));
    });
    parent::boot();
}

Запускаем phpunit, все отлично OK (5 tests, 3 assertions)

Можно сохранить реальный пост со статусом "Опубликован" и посмотреть, что записалось в таблицу jobs

Обработка очереди

Теперь необходимо обработать задание из очереди. Согласно документации сделать это можно либо с помощью запуска обработчика или демона, который будет в реальном времени (либо с указанным таймаутом) обрабатывать поступающие задания php artisan queue:listen, либо вызывать обработку одного задания php artisan queue:work

Поступим вторым способом, потому что событий у нас будет немного, мониторить "не умер ли давно обработчик наш" не хочу, а демонов на php я боюсь ))

Так что давайте будем запускать php artisan queue:work каждые 5 минут и все.

Настройка задач по расписанию

В Laravel есть замечательний шедулер, позволяющий практически мгновенно добавить новую задачу для выполннения по расписанию, при этом не надо помнить эти ужасные крон-выражения, почти все ситуации описываются с помощью "говорящих методов".

Но для начала надо один раз в cron добавить выполнение самого шедулера раз в минуту. crontab -e Добавляем:

* * * * *  /usr/bin/php   /path/to/artisan schedule:run >> /dev/null 2>&1

Теперь идем в app/Console/Kernel.php И добавляем задание:

protected function schedule(Schedule $schedule)
{
    $schedule->command('queue:work')->everyFiveMinutes();
}

Тесты

И тут

Похоже полноценно протестировать очереди простыми средствами не получится. Придется тестировать функционально, ручками.

Но как же быть с уведомлениями, которые свалятся всем? В API onesignal никакого тестового режима я не нашел. Поэтому можно поступить одним из следующих образов:

  • Опубликовать тест, с датой публикации в будущем. Зайти в админку onesignal и удалить из очереди задание.
  • Завести признак рассылки уведомлений в статьях (Не рассылать/Только мне/Всем)
  • Завести параметр в конфиге.

Поступим последним образом. Добавим в config/onesignal.php 'is_test' => env('ONESIGNAL_IS_TEST',1) В .env переопределение параметра ONESIGNAL_IS_TEST=1

И внесем правки в app/Handlers/OneSignalHandler.php Заодно переделаем его единственный метод в static

<?php

namespace App\Handlers;

use anlutro\cURL\cURL;
use App\Models\Post;

class OneSignalHandler
{

    public  static function sendNotify(Post $post, $test=false)
    {

        $config = \Config::get('onesignal');

        //check if app id is defined
        if (!empty($config['app_id'])) {

            $data = array(
                'app_id' => $config['app_id'],
                'contents' =>
                    [
                        "en" => $post->short_text

                    ],
                'headings' =>
                    [
                        "en" => $post->title
                    ],
                'isAnyWeb' => true,
                'chrome_web_icon' => $config['icon_url'],
                'firefox_icon' => $config['icon_url'],
                'url' => $post->link

            );

            if ($test || $config['is_test'])
            {
                $data['include_player_ids'] = [$config['own_player_id']];
            } else {
                $data['included_segments'] =  ["All"];
            }

            //add future date if needed
            if (strtotime($post->publish_date) > time()) {
                $data['send_after'] = date(DATE_RFC2822, strtotime($post->publish_date));
                $data['delayed_option'] = 'timezone';
                $data['delivery_time_of_day'] = '10:00AM';
            }

            $curl = new cURL();
            $req =  $curl->newJsonRequest('post',$config['url'], $data)->setHeader('Authorization', 'Basic '.$config['api_key']);
            $result = $req->send();
            if ($result->statusCode <> 200) {
                \Log::error('Unable to push to Onesignal', ['error' => $result->body]);
                return false;
            }

            $result = json_decode($result->body);
            if ($result->id)
            {
                return $result->recipients;
            }

        }

    }
}

Не забудем заодно и исправить его вызов в app/Listeners/PostActionsListener.php

 public function handle(Event $event)
    {
        if ($event instanceof PostPublishedEvent)
        {
            OneSignalHandler::sendNotify($event->post);
        }
    }

И в тесте

public  function  testSendOnesignal()
    {
        $post  = factory(App\Models\Post::class)->make();
        $result  =  \App\Handlers\OneSignalHandler::sendNotify($post, true)
        $this->assertEquals(1,$result);

    }

А как же статус отправки?

Я уже собрался закончить этот урок, когда вспомнил про статус отправки уведомления в статье. Мы его предали анафеме убрали из события перед сохранением, чтобы позже перенести запись статуса непосредственно по итогам общения с API сервиса.

По идее можно добавить установку статуса статьи в самом хендлере

if ($result->id)
{
    $post->notify_status = Post::STATUS_PUBLISHED;
    $post->save();
    return $result->recipients;
}

Но это в корне неверно!

Компонент отправки уведомления не должен напрямую менять данные модели. Это не его зона ответсвенности!

Верный способ, исходя из выбранного нами подхода &ldash; кинуть здесь еще одно событие, говорящее о том, что уведомление ушло успешно.

Событие порождающее событие :)

Идем опять в app/Providers/EventServiceProvider.php Добавляем в $listen

'App\Events\PushSentEvent' => [
            'App\Listeners\PostMarketingListener',
        ],

В консоли php artisan event:generate

В созданном событиии, по образу и подобию предыдущего события, добавим проперти $post

class PushSentEvent extends Event
{
    use SerializesModels;

    public $post;
    /**
     * Create a new event instance.
     *
     * @return void
     */
    public function __construct(Post $post)
    {
        $this->post = $post;
    }

В слушателе сделаем обработку события - запись статуса. Саму запись статуса оформим в виде метода в Post

public  function  recordPushNotifySuccess()
{
    $this->notify_status = self::STATUS_PUBLISHED;
    $this->save();
}

app/Listeners/PostMarketingListener.php

public function handle(PushSentEvent $event)
{
    $event->post->recordPushNotifySuccess();
}

Бросим событие в хендлере уведомлений

if ($result->id)
{
    \Event::fire(new PushSentEvent($post));
    return $result->recipients;
}

Все. Можно тестировать!

Заключение. В чем преимущества событий

На этом все, в следующий раз добавим отправку текста в "оригинальные тексты" яндекса и публикацию в соцсетях.

В чем же преимущество событийной архитектуры?

Само событие и его обработка у нас разделены, по сути мы реализуем слабосвязанную архитектуру через микросервисы, что позволяет делает наше приложение более гибким и легко расширять и тестировать его.

Нам не нужно помнить где конкретно происходит событие при добавлении нового функционала в его обработку.

Да, в нашем простом примере, это одно место в коде, но в реальном проекте могут быть десятки мест, в которых это событие происходит: вызов апи, обработка роутинга, входящих данных, сообщений электронной почты и тд. И десятки слушателей его могут обрабатывать.

Но есть и обратная сторона луны: с ростом такого приложения, когда количество событий в нем и их обработчиков разрастается, поддержка такого проекта может превратиться в весьма нетривиальную задачу.

Использовать события в реальной жизни или нет, решать вам.

Но знать о них уж точно надо!

Код к статье https://github.com/laravel-news/laravel-queued-events Но без модели, тк реальная моя модель наследуется от CrudModel из собственного пакета и только запутает.