[C++] часть 2: МЬЮТЕКС. Пишем наш первый код для многопоточной среды


Часть 1, Часть 2

В прошлой статье мы разобрались с тем, что такое конкурентность/параллелизм и зачем нужна синхронизация. Настала пора изучить примитивы синхронизации, которые предлагает нам стандартная библиотека шаблонов C++.

Первым из них будет std::mutex. Но сначала ознакомьтесь с картой статьи (она пригодится, если вы вдруг запутаетесь).

Итак, начнём.

Что такое мьютекс?

Мьютекс (англ. mutex, от mutual exclusion — «взаимное исключение») — это базовый механизм синхронизации. Он предназначен для организации взаимоисключающего доступа к общим данным для нескольких потоков с использованием барьеров памяти (для простоты можно считать мьютекс дверью, ведущей к общим данным).

Синтаксис
  • Заголовочный файл | #include <mutex>
  • Объявление | std::mutex mutex_name;
  • Захват мьютекса | mutex_name.lock();Поток запрашивает монопольное использование общих данных, защищаемых мьютексом. Дальше два варианта развития событий: происходит захват мьютекса этим потоком (и в этом случае ни один другой поток не сможет получить доступ к этим данным) или поток блокируется (если мьютекс уже захвачен другим потоком).
  • Освобождение мьютекса | mutex_name.unlock();Когда ресурс больше не нужен, текущий владелец должен вызвать функцию разблокирования unlock, чтобы и другие потоки могли получить доступ к этому ресурсу. Когда мьютекс освобождается, доступ предоставляется одному из ожидающих потоков.
#include <mutex> #include <vector>std::mutex door; // объявление мьютекса std::vector<int> v; // общие данные door.lock(); /*-----------------------*/ /* Это потокобезопасная зона: допускается только один поток за раз * * Гарантируется монопольное использование вектора v */ /*-----------------------*/ door.unlock(); Как создать потокобезопасную очередь

Разберёмся, как реализовать простейшие потокобезопасные очереди, то есть очереди с безопасным доступом для потоков.В библиотеке стандартных шаблонов уже есть готовая очередь (rawQueue). Наша реализация будет предполагать: а) извлечение и удаление целочисленного значения из начала очереди и б) добавление нового в конец очереди. И всё это при обеспечении потокобезопасности.

Сначала выясним, почему и как эти две операции могут создавать проблемы для многопоточности.

  • Извлечение и удалениеДля извлечения и удаления значения из начала очереди необходимо выполнить три операции:1. Проверить, не пуста ли очередь.2. Если нет, получается ссылка на начало очереди (rawQueue.front()). 3. Удаляется начало очереди (rawQueue.pop()). В промежутках между этими этапами к очереди могут получать доступ и другие потоки с целью внесения изменений или чтения. Попробуйте сами.

Например:

Удалили “1”, хотя до его извлечения даже не дошли, потому что поток B извлекает 0 и удаляет 1.Дальше — больше: если rawQueue состоит из одного элемента, поток B видит непустую очередь, и тут же поток A удаляет последнее значение. Теперь поток B пытается удалить первое значение из пустой очереди, приводя к неопределённому поведению. Настоящая страшилка!
  • ДобавлениеРассмотрим теперь добавление нового значения с помощью rawQueue.push(): новый элемент добавляется в конец контейнера и становится следующим за последним на данный момент элементом. Дальше на единицу увеличивается размер. Заметили здесь проблему? А что, если два потока одновременно добавят новое значение, увидев этот последний элемент? И что может произойти в интервале между добавлением нового элемента и увеличением размера? Кто-нибудь возьмёт да и прочитает неправильный размер. 

Получается, мы должны быть уверены, что никто не будет трогать очередь, пока мы выполняем наши задачи. Используем мьютекс для защиты этих многоступенчатых операций и сделаем так, чтобы все вместе они смотрелись как одна атомарная операция.

#include <mutex> #include <queue> class threadSafe_queue { std::queue<int> rawQueue; // структура, общая для всех потоков std::mutex m; // красная дверь rawQueue public: int& retrieve_and_delete() { int front_value = 0; // если пустая, возвращает 0 m.lock(); // Отныне текущий поток единственный, который имеет доступ к rawQueue if( !rawQueue.empty() ) { front_value = rawQueue.front(); rawQueue.pop(); } m.unlock(); // теперь другие потоки могут захватить мьютекс return front_value; }; void push(int val) { m.lock(); rawQueue.push(val); m.unlock(); }; }; Обратите внимание:
  • Связь между мьютексом и защищаемым ресурсом — только в голове программиста. Мы знаем, что мьютекс m защищает rawQueue, но напрямую это не указывается.
  • Захват с необходимой степенью распараллеливания. Использование мьютекса уменьшает параллелизм. Предположим, у нас только один мьютекс для защиты вектора и строки без каких-либо зависимостей (например, значение переменной не зависит от вектора и наоборот). Поток А захватывает мьютекс, читает строку и начинает обработку каких-то данных перед добавлением нового значения в вектор и освобождением мьютекса. И тут появляется поток B, который хочет внести пару изменений в строку, но при попытке захватить мьютекс (какая досада!) оказывается блокированным до того момента, пока не будут завершены все операции с вектором. Проблему решаем дополнительным мьютексом для строки (захваченным перед чтением и освобождённым сразу по завершении).→ Всегда прикидывайте, какой объём данных будет защищён одним мьютексом.
  • Проводите захват только для тех операций, которым это необходимо. См. предыдущий пункт. 
  • Не вызывайте lock(), если мьютекс у вас уже есть. Мьютекс уже заблокирован, и попытки повторного захвата приведут вас к состоянию бесконечного ожидания. Если он вам так нужен, можно воспользоваться классом std::recursive_mutex. Рекурсивный мьютекс можно получить одним и тем же потоком много раз, но столько же раз он должен быть и освобождён.
  • Используйте try_lock() или std::timed_mutex, если не хотите блокироваться и ожидать неопределённое время.try_lock() — это неблокирующий метод в std::mutex. Он возвращает немедленно, даже если владение не получено, причём со значением true, если мьютекс захвачен, и false — если нет. → std::timed_mutex предлагает два неблокирующих метода для захвата мьютекса: try_lock_for() и try_lock_until(), причём оба возвращаются по истечении времени со значением true или false в зависимости от успешности захвата.
  • Не забывайте вызывать unlock() или используйте std::lock_guard (или другие шаблонные классы), когда есть возможность. См. ниже.
  • Lock guard и парадигма RAII

    У нас две большие проблемы с этим простым мьютексом:

    • Что произойдёт, если мы забудем вызвать unlock()? Ресурс будет недоступен в течение всего времени существования мьютекса, и уничтожение последнего в неразблокированном состоянии приведёт к неопределённому поведению.
    • Что произойдёт, если до вызова unlock() будет выброшено исключение? unlock() так и не будет исполнен, а у нас будут все перечисленные выше проблемы.

    К счастью, проблемы можно решить с помощью класса std::lock_guard. Он всегда гарантированно освобождает мьютекс, используя парадигму RAII (Resource Acquisition Is Initialization, что означает «получение ресурса есть инициализация»). Вот как это происходит: мьютекс инкапсулируется внутри lock_guard, который вызывает lock() в его конструкторе и unlock() в деструкторе при выходе из области видимости. Это безопасно даже при исключениях: раскрутка стека уничтожит lock_guard, вызывая деструктор и таким образом освобождая мьютекс.

    std::lock_guard<std::mutex> lock_guard_name(raw_mutex); #include <mutex> #include <vector> std::mutex door; // объявление мьютекса std::vector<int> v; { std::lock_guard<std::mutex> lg(door); /* Вызывается конструктор lg, эквивалентный door.lock(); * lg, размещается в стеке */ /*-----------------------*/ /* Гарантируется монопольное использование вектора v */ /*-----------------------*/ } /* lg выходит из области видимости. Вызывается деструктор, эквивалентный door.unlock(); */

    Посмотрим теперь, как можно изменить нашу потокобезопасную очередь threadSafe_queue (на этот раз обращаем внимание на то, где освобождается мьютекс).

    #include <mutex> #include <queue> class threadSafe_queue { std::queue<int> rawQueue; // структура, общая для всех потоков std::mutex m; // красная дверь rawQueue public: int& retrieve_and_delete() { int front_value = 0; // если пустая, return будет 0 std::lock_guard<std::mutex> lg(m); // Отныне текущий поток единственный, который имеет доступ к rawQueue if( !rawQueue.empty() ) { front_value = rawQueue.front(); rawQueue.pop(); } return front_value; }; // теперь другие потоки могут захватить мьютекс void push(int val) { std::lock_guard<std::mutex> lg(m); // отныне текущий поток единственный, который имеет доступ к rawQueue rawQueue.push(val); }; // теперь другие потоки могут захватить мьютекс };

    Unique lock, дающий свободу

    Как только владение мьютексом получено (благодаря std::lock_guard), он может быть освобождён. std::unique_lock действует в схожей манере плюс делает возможным многократный захват и освобождение (всегда в таком порядке) мьютекса, используя те же преимущества безопасности парадигмы RAII.

    std::unique_lock<std::mutex> unique_lock_name(raw_mutex); #include <mutex> #include <vector> std::mutex door; //объявление мьютекса std::vector<int> v; { std::unique_lock<std::mutex> ul(door); // Вызывается конструктор ul, эквивалентный door.lock(); // ul, размещённый в стеке // гарантируется монопольное использование вектора door.unlock(); // выполнение операций, не связанных с вектором // .... // теперь мне снова нужен доступ к вектору door.lock(); //Снова гарантируется монопольное использование вектора } /* unique_lock выходит из области видимости. Вызывается деструктор, эквивалентный door.unlock(); */ Когда использовать?
    • Когда вам не всегда нужен захват ресурса.
    • Вместе с std::condition_variable (в следующей статье).
    • При захвате std::shared_mutex в эксклюзивном режиме (см. далее). 

    Общий мьютекс + общий захват дают больше читателей

    std::mutex  — это мьютекс, которым одномоментно может владеть только один поток. Однако это ограничение не всегда обязательно. Например, потоки могут одновременно и безопасно читать одни и те же общие данные. Просто читать, не производя с ними никаких изменений. Но в случае с доступом к записи только записывающий поток может иметь доступ к данным.

    Начиная с C++17, std::shared_mutex формирует доступ двух типов:

    • Общий доступ: потоки вместе могут владеть мьютексом и иметь доступ к одному и тому же ресурсу. Доступ такого типа можно запросить с помощью std::shared_lock (lock guard для общего мьютекса). При таком доступе любой эксклюзивный доступ блокируется.
    • Эксклюзивный доступ: доступ к ресурсу есть только у одного потока. Запрос этого типа осуществляется с помощью класса unique lock.
    Синтаксис
    • Заголовочный файл | #include <shared_mutex>;
    • Объявление | std::shared_mutex raw_sharedMutex;
    • Для захвата в общем режиме |std::shared_lock<std::shared_mutex> sharedLock_name(raw_sharedMutex);
    • Для захвата в эксклюзивном режиме |std::unique_lock<std::shared_mutex> uniqueLock_name(raw_sharedMutex);
    #include <shared_mutex> #include <vector> std::shared_mutex door; //объявление мьютекса std::vector<int> v; int readVectorSize() { /* потоки могут вызывать эту функцию одновременно * доступ на запись запрещена, когда получен sl */ std::shared_lock<std::shared_mutex> sl(door); return v.size(); } void pushElement(int new_element) { /* гарантирован эксклюзивный доступ к вектору */ std::unique_lock<std::shared_mutex> ul(door); v.push_back(new_element); }

    Scoped lock, дающий больше мьютексов (и без клинча)

    Впервые появившийся в C++17 и действующий в схожей манере, что и std::lock_guard, он даёт возможность получения нескольких мьютексов. Без std::scoped_lock такая операция очень опасна, так как может привести к взаимной блокировке.

    Краткая история взаимоблокировки:

    Поток A хочет увести 200$ с банковского счёта Жеки на счёт Бяки в виде одной атомарной операции. Он начинает с того, что захватывает мьютекс, защищающий счёт Жеки, чтобы изъять деньги, а затем пытается захватить счёт Бяки.В то же время поток B хочет увести 100$ со счёта Бяки на счёт Жеки. Он получает захват счёта Бяки, чтобы изъять деньги и попытаться захватить счёт Жеки. Оба потока блокируются, уснув в ожидании друг друга.

    std::scoped_lock одновременно захватывают (а затем освобождают) все мьютексы, передаваемые в качестве аргумента, по принципу «всё или ничего»: если хотя бы один захват выбрасывает исключение, освобождаются все уже захваченные мьютексы.

    • std::scoped_lock<std::mutex> scoped_lock_name(raw_mutex1, raw_mutex2, ..);

    Заключение

    Если вы вдруг запутались в этом ворохе новой информации:

    • воспользуйтесь картой в начале статьи (или составьте свою);
    • применяйте на практике новые знания и пробуйте писать простенький код.

    До встречи в следующей статье, в которой речь пойдёт о condition_variableи вы узнаете, как синхронизировать потоки!


    Перевод статьи Valentina: [C++] MUTEX: Write Your First Concurrent Code


    Поделиться статьей:


    Вернуться к статьям

    Комментарии

      Ничего не найдено.