Подписка

PCNTL - параллельные вычисления

Поддержка параллельных вычислений важная часть любого высокоуровневого языка программирования. Возможность выполнять операции в разных процессах с разделением ресурсов не только увеличивает производительность приложения, но и упрощает вертикальное маштабирование. Ниже речь пойдет об упомянутом раньше расширении PCNTL, содержащем интересный набор функций для реализации многопоточности.

P.S. В даном контексте под многопоточностью имеется ввиду многопроцессность, так как поддержки "настоящих" потоков в PHP нету. Для уменьшения тавтологии, иногда вместо "процесса" я использую понятие "поток", но помните, в нашем контексте поток=процесс)

Придумаем себе задание и потренируемся на нем в реализации многопоточности - представим, что нужно разработать демон, состоящий из сканера таблицы, который будет проверять не появились ли новые данные, и двух воркеров, которые эти данные будут обрабатывать.

Проблема на лицо - нужно чтобы 3 отдельных процесса выполнялись одновременно, не отбирая ресурсов один в одного.

1. Создание потоков

Начнем из создания файла демона, главный поток которого будет проверять изменения в базе, а дочерние станут воркерами.
$parentPid = posix_getpid();
$child1 = pcntl_fork();
if ($child1 != 0) {
    $child2 = pcntl_fork();
    if ($child2 != 0) {
        echo "Parent\n";
    } else {
        echo "Child-worker 2\n";
    }
} else {
    echo "Child-worker 1\n";
}

Останавливаться на принципах работы PNCTL и форках не буду, кому интересно, велкам в предыдущий пост.

2. Установка приоритетов

PCNTL поддерживает установку приоритетов для работающих процессов, таким образом, мы можем выделить больше ресурсов под более важные задачи.

Для получения текущего приоритета используется функция pcntl_getpriority, которая принимает параметром идентификатор приоритета, а если он не указан - возвращает приоритет текущего процесса. Приоритет представляет собой число от -20 до 20 и по умолчанию равен 0.

Функция pcntl_setpriority в виде первого параметра принимает значение приоритета, а в виде второго идентификатор процесса (если не указан - устанавливаем для текущего).

Установим для главного процесса приоритет 10, а для воркеров 5:

$child1 = pcntl_fork();
if ($child1 != 0) {
    $child2 = pcntl_fork();
    if ($child2 != 0) {
        pcntl_setpriority(10);
        echo "Parent\n";
    } else {
        pcntl_setpriority(5);
        echo "Child-worker 2\n";
    }
} else {
    pcntl_setpriority(5);
    echo "Child-worker 1\n";
}

3. Обмен сообщениями между потоками

Теперь самое интересное - наши потоки должны быть всегда "на чеку", при чем если главный поток будет проверять изменения через определенный интервал времени, воркеры должны работать только когда есть работа, а все остальное время занимать минимальный объем ресурсов. Для реализации общения между потоками, воспользуемся сигналами, при помощи которых будем "будить" воркер из главного потока при возникновении задачи.

PCNTL использует unix систему сигналов (одна из причин, почему PCNTL не будет работать в Windows), включающую обширное разнообразие и интересующие нас SIGHUP, SIGUSR1 и SIGUSR2, которые будем использовать для общения между потоками.

Если наблюдающий за базой поток, обнаружит новые данные, он пошлет сигнал SIGHUP свободному воркеру, чтобы тот начал обработку:

$firstChildIsFree = true;
$secondChildIsFree = true;
while (true) {
    //Чтобы не париться с базой данных, будем считать, что когда в файле что-то записано - появилась задача
    $changes = file_get_contents('db.txt');
    if (!empty($changes)) {
        $got2execution = false;
        //Если первый воркер свободен - активизируем его отправкой сигнала
        if ($firstChildIsFree) {
            //Собственно отправка сигнала
            posix_kill($child1, SIGHUP); 
            //Устанавливаем значение переменной статуса - воркер занят
            $firstChildIsFree = false;
            $got2execution = true;
        } elseif ($secondChildIsFree) {
            posix_kill($child2, SIGHUP);
            $secondChildIsFree = false;
            $got2execution = true;
        }
        if ($got2execution) {
            //Задача выполненa, очищаем файл
            file_put_contents('db.txt', '');
        } else {
            echo "No free workers\n";
        }
    }
    sleep(1);
}

Для отслеживания статуса воркеров мы ввели две переменные firstChildIsFree и secondChildIsFree, в зависимости от их значений мы определяем кому назначать задачу, а если оба воркер заняты - ждем лучших времен. Функцией posix_kill($child1, SIGHUP); мы отправляем сигнал SIGHUP нужному потоку, таким образом извещая его, что нужно выйти из спячки и начать обработку данных.

В свою очередь воркер, должен быть в "режиме ожидания", готовым получить сигнал от главного потока и приступить к выполнению:

echo "Child1 waiting for SIGHUP\n";
while (true) {
    pcntl_sigwaitinfo(array(SIGHUP));
    echo "Child1 has been activated\n";
    sleep(10);
    echo "Child1 is now free\n";
}

Выполнением pcntl_sigwaitinfo(array(SIGHUP)) переводим текущий процесс (воркер) в режим suspended, в котором он будет находится до тех пор, пока не получит сигнал SIGHUP.

Вот мы и научили главный поток, обращаться к воркерам и активизировать их когда появятся задачи. Теперь нужно чтобы воркеры сообщали главному потоку, когда выполнение задачи окончено, и он мог обновить переменные статуса firstChildIsFree и secondChildIsFree.

Так как передавать информацию при помощи сигналов нельзя, придется ограничиться их типами - первый воркер будет отправлять сигнал типа SIGUSR1, когда освободиться, а второй SIGUSR2.

Кроме того, так как главный поток постоянно должен проверять обновления, использовать функцию pcntl_sigwaitinfo блокирующую поток, не получится. Для того, чтобы поток продолжал работать и в то же время получал сигналы, воспользуемся функцией pcntl_sigprocmask, она блокирует сигналы, сохраняя их в своеобразный стек, с которым работает pcntl_sigtimedwait (аналог pcntl_sigwaitinfo, но с возможностью установить таймаут).

Итак, перед началом цикла главного потока ставим:

pcntl_sigprocmask(SIG_BLOCK, array(SIGUSR1, SIGUSR2));

указывая, что будем блокировать сигналы SIGUSR1 и SIGUSR2.

А в конце каждого цикла, проверяем не появилось ли новых в очереди:

if (pcntl_sigtimedwait(array(SIGUSR1, SIGUSR2), $siginfo, 0, 1) != -1) {
    if ($siginfo['signo'] == SIGUSR1) {
        echo "Got signal from child1\n";
        $firstChildIsFree = true;
    } elseif ($siginfo['signo'] == SIGUSR2) {
        echo "Got signal from child2\n";
        $secondChildIsFree = true;
    }
}

Первым параметром в pcntl_sigtimedwait передается массив кодов сигналов, которые ожидаем получить, второй параметр массив в который будет записана информация о полученном сигнале. Дальше в зависимости от кода сигнала, определяем воркер, который его отправил и обновляем его статус.

Осталось дописать работягам отправку сообщений по завершению обработки данных:

posix_kill($parentPid, SIGUSR1);//SIGUSR2 - для второго потока

Вот и все, в результате получили демон с тремя потоками, которые умеют общаться между собой.

Нельзя сказать, что работа с потоками и коммуникации между ними выглядят очень изящно, но учитывая динамику развития PHP, очень вероятно, что в недалеком будущем полноценная поддержка потоков будет реализована на уровне интерпретатора.

Полный код примера pcntl.php

В тему:

Если пост понравился - нажмите на +1 - мне будет приятно.

@kkooler

@kkooler

Занимаюсь разработкой высоконагруженных проектов и распределенных систем на PHP.
В свободное время разрабатываю нано-проекты:

Следить за блогом

RSS канал Twitter