В статье я опишу механизмы Python 3 с помощью которых выполняется синхронизация потоков, а именно классы Event, Condition, Barrier и Semaphore.
Введение
Синхронизация потоков Python 3 нужна в разных случаях, например:
- Потоки должны ожидать какого-то события, и в случае его наступления выполнять работу дальше.
- Один поток должен подождать разблокировки объекта, так как с ним уже работает другой поток. При этом разблокировка будет выполняться по како-му то событию.
- Приложение должно дождаться работы нескольких потоков и только потом продолжить выполнение кода.
- Может быть запущено большое количество потоков, но выполняться они должны параллельно по нескольку штук.
Синхронизация потоков в Python 3 обеспечивается благодаря классам модуля threading: Event, Condition, Barrier, Semaphore.
Класс Event
Здесь мы рассмотрим класс Event модуля threading. Этот класс содержит 3 метода: set — устанавливает флаг, clear — очищает флаг, wait — заставляет поток ожидать, пока флаг не будет установлен.
По умолчанию флаг снят, то есть строчка event.wait() заставит поток приостановиться. Дальше можно установить флаг с помощью event.set(), и тогда поток продолжит своё выполнение.
Более понятным языком это можно описать так, запускаем процессы, они что-то делают, затем приостанавливаются и ждут каково-то события. И как только это событие наступает, то продолжают свою работу.
Чтобы лучше с этим разобраться напишем демо-программу. Начнём с импорта нужных модулей: time и threading. Функция current_thread — определяет имя текущего потока, а функция active_count — сообщает текущее число потоков.
from time import sleep from threading import Thread, Event, current_thread, active_count
Создадим объект класса Event. По умолчанию у созданного объекта флаг снят (то есть событие не наступило).
event = Event()
Укажем количество потоков которые будут запускаться:
max_t = 5
Напишем функцию, которую будем запускать в потоках. Она в переменную t_nane сохраняет имя текущего потока. Сообщает что запустился такой-то поток. А после ждёт пока наступит событие (event.wait()). И как только событие наступит, продолжает свою работу.
def f(): thr_num = current_thread().name print(f"Поток {thr_num} запустился. Но ждёт остальных.") event.wait() print(f"Событие наступило! Поток {thr_num} продолжил свою работу")
Теперь в цикле запускаем потоки с задержкой 0.2 секунды:
for i in range(max_t): Thread(target=f).start() sleep(0.2)
И как только запустятся все потоки устанавливаем флаг объекту event (говорим что событие наступило):
if active_count() >= max_t: event.set()
Результат выполнения написанной выше программы:
Поток Thread-1 (f) запустился. Но ждёт остальных. Поток Thread-2 (f) запустился. Но ждёт остальных. Поток Thread-3 (f) запустился. Но ждёт остальных. Поток Thread-4 (f) запустился. Но ждёт остальных. Поток Thread-5 (f) запустился. Но ждёт остальных. Событие наступило! Поток Thread-1 (f) продолжил свою работу Событие наступило! Поток Thread-2 (f) продолжил свою работу Событие наступило! Поток Thread-5 (f) продолжил свою работу Событие наступило! Поток Thread-4 (f) продолжил свою работу Событие наступило! Поток Thread-3 (f) продолжил свою работу
Дополнительная информация по классу Event().
Класс Condition
Класс Condition тоже используется для синхронизации потоков. Он является некоторой комбинацией классов Event и Lock, что позволяет потокам ожидать пока ресурс с которым происходит работа будет разблокирован. А разблокировать мы можем объект при каком-то условии.
Например первый поток перебирает число от 1 до 20, и если число делится на 5 без остатка, то разблокирует объект (устанавливает флаг). А второй поток выводит надпись «Событие наступило», когда дождётся установленного флага. Вот такую программу мы и реализуем.
Начнем писать код с импорта необходимых модулей и создание объекта класса Condition:
from time import sleep from threading import Thread, Condition cond = Condition()
Напишем первую функцию. Она будет в бесконечном цикле ставить блокировку — cond.wait(). И с Condition важно использовать менеджер контекста (with) чтобы он сам следил за блокировкой и разблокировкой.
def f1(): while True: with cond: cond.wait() print("Получили событие!")
Затем напишем вторую функцию. В цикле пробежимся по цифрам от 0 до 20. И если число делится на 5 без остатка, то будем снимать блокировку (cond.notify()). Здесь также используем менеджер контекста (with). В противном случае (если число не делится на 2 без остатка) мы выводим это число на экран. И дополнительно установим небольшую задержку (sleep(0.2)).
def f2(): for i in range(21): if i % 5 == 0: with cond: cond.notify() else: print(i) sleep(0.2)
И наконец запускаем оба потока:
Thread(target=f1, daemon=True).start() Thread(target=f2).start()
Первый поток запускаю в режиме демона, так как там используется бесконечный цикл и нужно чтобы этот поток завершился при завершении основной программы.
Результат выполнения нашей программы:
Получили событие! 1 2 3 4 Получили событие! 6 7 8 9 Получили событие! 11 12 13 14 Получили событие! 16 17 18 19 Получили событие!
За надпись «Получили событие!» отвечает первая функция. Разблокировка происходит во второй функции, когда число делится на 5 без остатка.
Дополнительная информация по Condition().
Класс Barrier
Barrier позволяет реализовать алгоритм, когда необходимо дождаться завершения работы группы потоков, прежде чем продолжить выполнение задачи.
Например у вас первый поток что-то высчитывает и второй поток что-то высчитывает. А третий поток (возможно основной) должен использовать результаты первого и второго потока для дальнейшего расчёта. Для этого основная программа должна дождаться выполнения обоих потоков и только затем продолжить работать. Вот и напишем подобную программу.
Начнем как обычно с импорта необходимых модулей:
from time import sleep from threading import Thread, Barrier
Создадим барьер. При его создании необходимо указать сколько потоков будет работать с барьером. У нас будет работать 3 потока — 1 основной и 2 дополнительных.
br = Barrier(3)
Дальше сделаем две глобальные переменные, в которые будем помещать результаты работы каждой функции:
a = 0 b = 0
Напишем первую функцию (возведение числа в квадрат). Она будет ожидать 1 секунду. А дальше будет ждать заполнения барьера до 3 потоков (br.wait()).
def f1(x): print("Высчитываем 1 число") global a a = x**2 sleep(1) br.wait()
Напишем вторую функцию (умножение на 2). Она будет ожидать 2 секунды. И тоже ожидать заполнения барьера.
def f2(x): print("Высчитываем 2 число") global b b = x*2 sleep(2) br.wait()
Я использую разное ожидание (первый поток ждёт 1 секунду, второй — 2 секунды), чтобы продемонстрировать, что потоки могут работать разное время. Но в ожидание br.wait() должны попасть три потока, и только тогда потоки продолжат своё выполнение.
Запустим оба потока:
Thread(target=f1, args=(3,)).start() Thread(target=f2, args=(7,)).start()
Укажем основному потоку программы тоже ждать заполнения барьера:
br.wait()
И вот тут у нас 3 потока попали в ожидание барьера, как только это случилось, программа продолжила своё выполнение. После чего на экран выводится сумма полученных ранее двух результатов:
print("Результат = ", a+b)
Результат выполнения этой программы:
Высчитываем 1 число Высчитываем 2 число Результат = 23
Класс Semaphore
Семафоры — это технология в основе которой лежит счетчик. Когда поток заходит в семафор, то его счётчик уменьшается на 1. И когда счетчик становится равным нулю, то новый поток уже не может попасть в семафор и ожидает освобождения семафора. Когда поток в семафоре завершается, то счетчик увеличивается на 1.
Например создаём семафор = 5. Но запускаем 10 потоков. В итоге только 5 потоков получат доступ к семафору. А остальным 5 потокам придётся ждать своей очереди.
Это удобно, чтобы ограничить число подключений к сети или одновременно авторизованных пользователей программы.
Напишем демо-программу. Начнем с импорта необходимых модулей и создания объекта семафора (укажем что семафор работает одновременно только с двумя потоками):
from time import sleep, time from threading import Thread, Semaphore, current_thread s = Semaphore(2)
Напишем функцию которую будем запускать в потоках. Она будет запоминать время запуска потока и время перед самым завершением. При этом, поток после запуска будет попадать в семафор, в котором помещаются только 2 потока.
def f(): thname = current_thread().name start_tread = time() with s: sleep(1) print(f"Время выполнения потока {thname} = {time() - start_tread}")
То есть наша функция сохраняет имя потока и текущее время. Затем использует семафор. В семафоре ожидает 1 секунду, и выводит общее время работы потока.
И наконец запускаем 10 потоков в цикле:
for i in range(10): Thread(target=f).start()
Результат выполнения этой программы:
Время выполнения потока Thread-1 (f) = 1.0010035037994385 Время выполнения потока Thread-2 (f) = 1.0010035037994385 Время выполнения потока Thread-3 (f) = 2.001539468765259 Время выполнения потока Thread-4 (f) = 2.0020761489868164 Время выполнения потока Thread-5 (f) = 3.002070665359497 Время выполнения потока Thread-6 (f) = 3.003070592880249 Время выполнения потока Thread-7 (f) = 4.003071546554565 Время выполнения потока Thread-8 (f) = 4.003068447113037 Время выполнения потока Thread-9 (f) = 5.003071069717407 Время выполнения потока Thread-10 (f) = 5.0040764808654785
Как видим, потоки выполняются одновременно по 2.
Итог
Мы познакомились с классами Event, Condition, Barrier, Semaphore модуля threading. И научились синхронизировать работу нескольких потоков.
Остальные статье по Python 3 можете посмотреть здесь.