Глава 17. Многопоточность.

Обычно, приложения с графическим интерфейсом исполняются в рамках одного потока. Если такое приложение начинает выполнять продолжительную по времени операцию, то возникает эффект "замораживания" интерфейса, который длится до тех пор, пока длительная операция не будет завершена. В Главе 7 был приведен один из вариантов решения этой проблемы. Другой вариант -- многопоточность.

В многопоточных приложениях, обслуживание интерфейса производится в отдельном потоке, а обработка данных -- в другом (одном или нескольких) потоке. В результате приложение сохраняет возможность откликаться на действия пользователя даже во время интенсивной обработки данных. Еще одно преимущество многопоточности -- на многопроцессорных системах различные потоки могут выполняться на различных процессорах одновременно, что несомненно увеличивает скорость исполнения.

В этой главе мы поговорим о классе QThread и покажем, как синхронизировать потоки с помощью классов QMutex, QSemaphore и QWaitCondition. Затем коснемся темы взаимодействия между потоками и завершим главу перечислением классов Qt, которые могут использоваться за пределами главного потока приложения, где исполняется цикл обработки событий Qt.

Многопоточность -- очень объемная тема. Ей посвящено огромное количество книг. Здесь мы будем исходить из предположения, что вы уже знакомы с основными принципами разработки многопоточных приложений, и все наше внимание сконцентрируем не на использовании многопоточности как таковой, а на основных положениях создания многопоточных Qt-приложений.

17.1. Потоки.

Добавить несколько потоков в приложение, написанное с использованием библиотеки Qt, очень просто. Для этого нужно лишь создать дочерний класс от QThread и перекрыть метод run(). В качестве примера мы напишем простой класс, потомок класса QThread, который выводит текст на консоль.

class Thread : public QThread { public: Thread(); void setMessage(const QString &message); void run(); void stop(); private: QString messageStr; volatile bool stopped; }; Этот класс перекрывает метод родителя run() и добавляет еще две функции: setMessage() и stop().

Ключевое слово volatile, которое присутствует в объявлении переменной stopped, означает, что доступ к ней может производиться из других потоков и поэтому необходимо выполнять чтение значения переменной всякий раз, когда это необходимо. Если опустить этот спецификатор, компилятор может выполнить оптимизацию кода обращения к переменной, что в некоторых ситуациях приведет к неправильным результатам.

Thread::Thread() { stopped = false; } В конструкторе, переменной stopped присваивается значение false. void Thread::run() { while (!stopped) cerr << messageStr.ascii(); stopped = false; cerr << endl; } Функция run() предназначена для запуска потока. Поток будет исполняться до тех пор, пока в переменную stopped не будет записано значение true. В процессе исполнения потока, на консоль будет выводиться заданный текст сообщения. Поток завершит свою работу, как только функция run() вернет управление. void Thread::stop() { stopped = true; } Функция stop() записывает в переменную stopped значение true и тем самым останавливает исполнение потока. Эта функция может быть вызвана из другого потока в любой момент времени. В данном примере мы исходим из предположения, что присваивание значения булевой переменной является атомарной (в смысле -- непрерываемой) операцией. Чуть ниже, в этом же разделе, мы покажем, как обеспечить атомарность операции присваивания, с помощью экземпляра класса QMutex.

Класс QThread имеет метод terminate(), который завершает работу потока. Однако мы не рекомендуем использовать его, поскольку этот метод может остановить поток в любой точке его исполнения, не давая ему возможность самому корректно завершить свою работу. Более безопасный способ остановки потока -- с помощью функции stop(), как это делается в нашем примере.

Рисунок 17.1. Внешний вид приложения Threads.


Теперь напишем небольшое приложение, которое будет запускать два дополнительных потока -- A и B. class ThreadForm : public QDialog { Q_OBJECT public: ThreadForm(QWidget *parent = 0, const char *name = 0); protected: void closeEvent(QCloseEvent *event); private slots: void startOrStopThreadA(); void startOrStopThreadB(); private: Thread threadA; Thread threadB; QPushButton *threadAButton; QPushButton *threadBButton; QPushButton *quitButton; }; В классе ThreadForm объявлены две приватных переменных типа Thread и несколько кнопок, которыми выполняется управление приложением. ThreadForm::ThreadForm(QWidget *parent, const char *name) : QDialog(parent, name) { setCaption(tr("Threads")); threadA.setMessage("A"); threadB.setMessage("B"); threadAButton = new QPushButton(tr("Start A"), this); threadBButton = new QPushButton(tr("Start B"), this); quitButton = new QPushButton(tr("Quit"), this); quitButton->setDefault(true); connect(threadAButton, SIGNAL(clicked()), this, SLOT(startOrStopThreadA())); connect(threadBButton, SIGNAL(clicked()), this, SLOT(startOrStopThreadB())); connect(quitButton, SIGNAL(clicked()), this, SLOT(close())); ... } В конструкторе формы, с помощью вызовов функций setMessage(), потокам назначается текст для вывода на консоль. Таким образом, поток A будет печатать букву "A", а поток B -- букву "B". void ThreadForm::startOrStopThreadA() { if (threadA.running()) { threadA.stop(); threadAButton->setText(tr("Start A")); } else { threadA.start(); threadAButton->setText(tr("Stop A")); } } Когда пользователь нажимает кнопку, которая управляет потоком A, вызывается функция-слот startOrStopThreadA(). Она останавливает работу потока, если он запущен, и запускает -- в противном случае. Кроме того, функция так же изменяет надпись на кнопке. void ThreadForm::startOrStopThreadB() { if (threadB.running()) { threadB.stop(); threadBButton->setText(tr("Start B")); } else { threadB.start(); threadBButton->setText(tr("Stop B")); } } Код функции startOrStopThreadB() практически один-в-один повторяет код функции startOrStopThreadA(). void ThreadForm::closeEvent(QCloseEvent *event) { threadA.stop(); threadB.stop(); threadA.wait(); threadB.wait(); event->accept(); } Если пользователь закрывает окно приложения, то потокам посылается команда на останов, после чего программа ждет (с помощью функции QThread::wait()), пока они не завершат свою работу. Затем вызывается QCloseEvent:: accept().

Для сборки приложения, необходимо добавить следующую строку в файл .pro:

CONFIG += thread Она сообщает qmake о том, что для сборки приложения должна использоваться версия Qt, совместимая с потоками. Чтобы собрать потоко-совместимую версию библиотеки Qt, необходимо передать ключ -thread скрипту configure, во время установки Qt в Unix или Mac OS X. В Windows, библиотека Qt является потоко-совместимой по-умолчанию. Для данного примера необходимо добавить еще и опцию console, поскольку необходимо обеспечить вывод текста на консоль в операционной системе Windows: win32:CONFIG += console Теперь, если запустить приложение и нажать на кнопку Start A, на консоли будет печататься последовательность символов "A". После нажатия на кнопку Start B последовательность символов "A" будет перемежаться символами "B". После нажатия на кнопку Stop A, будут выводиться одни символы "B".

Обычно в многопоточных приложениях возникает проблема синхронизации потоков. Для этих целей в Qt имеются классы QMutex, QMutexLocker, QSemaphore и QWaitCondition.

Класс QMutex являет собой средство защиты. С его помощью можно исключить возможность доступа к переменным или участкам кода из нескольких потоков одновременно. Класс имеет функцию lock(), которая "запирает" мьютекс. Если мьютекс не заперт, то текущий поток захватывает его и тут же "запирает", в противном случае, поток, который попытался захватить запертый мьютекс, блокируется до тех пор пока мьютекс не освободится. Когда функция lock() возвратит управление в поток, он станет держателем мьютекса до того момента, пока не будет вызвана функция unlock(). Кроме функции lock(), класс QMutex имеет функцию tryLock(), которая возвращает управление потоку немедленно, даже если мьютекс уже "заперт" другим потоком.

Для примере предположим, что необходимо защитить мьютексом переменную stopped. Для этого нужно добавить новый член класса Thread:

QMutex mutex; Функция run() теперь будет выглядеть так: void Thread::run() { for (;;) { mutex.lock(); if (stopped) { stopped = false; mutex.unlock(); break; } mutex.unlock(); cerr << messageStr.ascii(); } cerr << endl; } А функция stop() так: void Thread::stop() { mutex.lock(); stopped = true; mutex.unlock(); } В сложных функциях, особенно при использовании исключений C++, легко можно ошибиться при выполнении последовательностей операций по запиранию/отпиранию мьютексов. Поэтому, в состав Qt включен класс QMutexLocker, который значительно упрощает работу с мьютексами. Конструктор класса QMutexLocker принимает объект QMutex в виде аргумента и запирает его. Деструктор класса QMutexLocker -- отпирает мьютекс. Например, с использованием класса QMutexLocker, функция stop() могла бы быть переписана следующим образом: void Thread::stop() { QMutexLocker locker(&mutex); stopped = true; } Семафоры в Qt реализованы в виде класса QSemaphore. Семафоры являются дальнейшим обобщением мьютексов и могут использоваться для защиты от одновременного доступа к нескольким идентичным ресурсам.

В следующей таблице приведено соответствие между QSemaphore и QMutex:

QSemaphore semaphore(1); | QMutex mutex; semaphore++; | mutex.lock(); semaphore--; | mutex.unlock(); Постфиксные операторы "++" и "--" захватывают и отпускают один ресурс, доступ к которому защищен семафором. Аргумент 1, который передается конструктору, указывает, что семафор обслуживает один единственный ресурс. Основное преимущество семафора состоит в том, что с его помощью можно захватить сразу несколько ресурсов, последовательно вызывая "++" несколько раз.

Типичное применение семафоров -- операции обмена данными (DataSize) между потоками, с помощью общего циклического буфера некоторого объема (BufferSize):

const int DataSize = 100000; const int BufferSize = 4096; char buffer[BufferSize]; Поток-источник пишет данные в буфер до тех пор, пока не заполнит его и затем продолжает запись данных с начала буфера, затирая данные, записанные ранее. Поток-приемник читает данные по мере их поступления. Рисунок 17.2 иллистрирует процесс записи/чтения в/из буфер(а) размером в 16 байт.

Рисунок 17.2. Модель источник/приемник.


Потребность в синхронизации очевидна: если поток-источник будет писать данные слишком быстро, то он затрет данные, которые еще не прочитаны потоком-приемником, и наоборот, если поток-приемник будет читать слишком быстро, то он "обгонит" поток-источник и начнет считывать неверные данные.

Самое простое решение этих проблем -- поток-источник заполняет весь буфер целиком, затем ожидает, пока поток-приемник не прочитает его, и так далее. Однако, на многопроцессорных системах такой прием приведет к снижению производительности. Более гибкое решение -- позволить потокам работать с разными частями буфера одновременно.

Как один из вариантов реализации подобного подхода -- использовать два семафора:

QSemaphore freeSpace(BufferSize); QSemaphore usedSpace(BufferSize); Семафор freeSpace управляет частью буфера, которая заполняется потоком-источником, а семафор usedSpace -- областью, которая доступна на чтение потоку-приемнику. Эти области не пересекаются между собой. Обе имеют размер BufferSize (4096).

В данном случае каждый байт области считается отдельным ресурсом. В реальных приложениях часто используются более крупные единицы измерения (например 64-х или 256-ти байтные блоки), чтобы уменьшить количество обращений к семафорам.

void acquire(QSemaphore &semaphore) { semaphore++; } Функция acquire() предпринимает попытку захватить один ресурс (один байт в буфере). Для этих целей класс QSemaphore использует постфиксный оператор "++", но в нашем конкретном случае более удобным будет использовать функцию с именем acquire(). void release(QSemaphore &semaphore) { semaphore--; } Аналогичным образом реализована функция release(), являющаяся синонимом постфиксного оператора "--". void Producer::run() { for (int i = 0; i < DataSize; ++i) { acquire(freeSpace); buffer[i % BufferSize] = "ACGT"[(uint)rand() % 4]; release(usedSpace); } } Источник захватывает один "свободный" байт. Если буфер заполнен данными, которые еще не прочитаны потоком-приемником, то вызов acquire() заблокирует работу источника до тех пор, пока приемник не начнет чтение данных. Как только байт будет захвачен, в него записывается случайный символ ('A', 'C', 'G' или 'T'), после чего байт отпускается как "используемый", благодаря этому он становится доступен приемнику. void Consumer::run() { for (int i = 0; i < DataSize; ++i) { acquire(usedSpace); cerr << buffer[i % BufferSize]; release(freeSpace); } cerr << endl; } Приемник захватывает один "используемый" байт. Если в буфере нет данных, доступных для чтения, то функция acquire() заблокирует работу приемника до тех пор, пока источник не запишет какие нибудь данные в буфер. Как только байт будет захвачен, символ выводится на консоль, после чего байт освобождается как "свободный", благодаря этому он становится доступен источнику. int main() { usedSpace += BufferSize; Producer producer; Consumer consumer; producer.start(); consumer.start(); producer.wait(); consumer.wait(); return 0; } Функция main() захватывает все байты как "используемые" (с помощью оператора "+=" класса QSemaphore), чтобы предотвратить преждевременное чтение данных приемником. Затем запускается поток-источник и вслед за ним -- поток приемник. В результате, поток-источник будет записывать данные, а вслед за ним, поток-приемник -- читать их.

После запуска программа будет выводить на консоль символы 'A', 'C', 'G' и 'T' в случайном порядке. После того, как программа выведет 100 000 символов, она завершит свою работу. Чтобы окончательно прояснить порядок работы потоков, попробуйте запретить вывод символов, принимаемых потоком-источником и выводите символ 'P', когда источник записывает один байт в буфер, и символ 'c' -- когда поток-приемник читает один байт из буфера. Чтобы еще больше упростить понимание, можно уменьшить DataSize и BufferSize.

Для случая, когда DataSize == 10, а BufferSize == 4, вполне возможен результат: PcPcPcPcPcPcPcPcPcPc. Он говорит о том, что поток-приемник читает данные из буфера по мере их поступления -- оба потока работают с одинаковой скоростью. Возможен вариант, когда поток-источник успевает заполнить буфер целиком, до того как поток-приемник начнет чтение: PPPPccccPPPPccccPPcc. Существует масса других вариантов. Семафоры предоставляют большую свободу выбора системному планировщику, который может "изучать" поведение потоков и выбирать для них наиболее оптимальную политику планирования.

Еще один вариант синхронизации потоков может быть реализован на классах QWaitCondition и QMutex. Класс QWaitCondition дает потоку возможность возобновлять работу других потоков, при наступлении некоторого состояния. Что позволяет более точно управлять потоками, чем это возможно только на одних мьютексах. Чтобы продемонстрировать это на примере, мы опять вернемся к связке источник-приемник и реализуем тот же самый обмен данными с помощью классов QWaitCondition и QMutex.

const int DataSize = 100000; const int BufferSize = 4096; char buffer[BufferSize]; QWaitCondition bufferIsNotFull; QWaitCondition bufferIsNotEmpty; QMutex mutex; int usedSpace = 0; В дополнение к буферу обмена, мы объявили два экземпляра класса QWaitCondition, один экземпляр QMutex и одну переменную, которая хранит количество "используемых" байт. void Producer::run() { for (int i = 0; i < DataSize; ++i) { mutex.lock(); while (usedSpace == BufferSize) bufferIsNotFull.wait(&mutex); buffer[i % BufferSize] = "ACGT"[(uint)rand() % 4]; ++usedSpace; bufferIsNotEmpty.wakeAll(); mutex.unlock(); } } Работа потока-источника начинается с проверки -- не заполнен ли буфер. Если буфер заполнен, то он ждет, пока не наступит состояние "буфер не полон". Затем в буфер записывается один байт, содержимое переменной usedSpace увеличивается на 1 и возобновляются ("пробуждаются") все потоки, которые ожидают наступления состояния "буфер не пуст".

В данном примере мьютекс используется для предотвращения одновременного доступа к переменной usedSpace. Функция QWaitCondition::wait() может принимать первым аргументом запертый мьютекс, который отпирается, перед блокировкой вызвавшего ее потока, и опять запирается, перед тем как функция вернет управление.

В этом примере, цикл while

while (usedSpace == BufferSize) bufferIsNotFull.wait(&mutex); может быть заменен условным оператором: if (usedSpace == BufferSize) { mutex.unlock(); bufferIsNotFull.wait(); mutex.lock(); } Однако такой вариант неприемлем для случая, когда одновременно будут работать несколько потоков-источников, так как любой из потоков-источников может захватить мьютекс после выхода из wait(), и сделать ложным условие "буфер не полон". void Consumer::run() { for (int i = 0; i < DataSize; ++i) { mutex.lock(); while (usedSpace == 0) bufferIsNotEmpty.wait(&mutex); cerr << buffer[i % BufferSize]; --usedSpace; bufferIsNotFull.wakeAll(); mutex.unlock(); } cerr << endl; } Поток-приемник являет собой полную противоположность потоку-источнику. Он ожидает наступления состояния "буфер не пуст" и пробуждает все потоки, которые ожидают наступления состояния "буфер не полон".

Во всех наших примерах, потоки обращались к одним и тем же глобальным переменным. Но в некоторых случаях возникает необходимость хранить в глобальной переменной различные значения для различных потоков. Это часто называют как "механизм хранения локальных данных потока" (thread-local storage -- TLS). Добиться этого можно с помощью словаря, где в качестве ключа будет выступать числовой идентификатор потока (возвращаемый функцией QThread::currentThread()), но лучшее решение -- использовать специализированный класс QThreadStorage<T>.

Обычно экземпляр класса QThreadStorage<T> используют в качестве буфера. При наличии отдельных буферов для каждого из потоков, отпадает необходимость в постоянном запирании, отпирании и, возможно, ожидании мьютекса. Например:

QThreadStorage<QMap<int, double> *> cache; void insertIntoCache(int id, double value) { if (!cache.hasLocalData()) cache.setLocalData(new QMap<int, double>); cache.localData()->insert(id, value); } void removeFromCache(int id) { if (cache.hasLocalData()) cache.localData()->remove(id); } Переменная cache хранит по одному указателю на QMap<int, double>, для каждого из потоков. (Из-за ограничений, накладываемых некоторыми компиляторам, шаблонный тип в QThreadStorage<T> должен быть указателем.) При первом обращении к буферу из некоторого потока, когда функция hasLocalData() возвращает false, мы создаем экземпляр класса QMap<int, double>.

В дополнение к буферам, класс QThreadStorage<T> может использоваться для создания глобальных переменных, которые хранят код последней ошибки (подобных переменной errno) и предотвращают взаимовлияние потоков друг на друга.