Python 3 Синхронизация потоков

В статье я опишу механизмы Python 3 с помощью которых выполняется синхронизация потоков, а именно классы Event, Condition, Barrier и Semaphore.















Введение




Синхронизация потоков Python 3 нужна в разных случаях, например:




  • Потоки должны ожидать какого-то события, и в случае его наступления выполнять работу дальше.



  • Один поток должен подождать разблокировки объекта, так как с ним уже работает другой поток. При этом разблокировка будет выполняться по како-му то событию.



  • Приложение должно дождаться работы нескольких потоков и только потом продолжить выполнение кода.



  • Может быть запущено большое количество потоков, но выполняться они должны параллельно по нескольку штук.




Синхронизация потоков в Python 3 обеспечивается благодаря классам модуля threading: Event, Condition, Barrier, Semaphore.




Класс Event




Здесь мы рассмотрим класс Event модуля threading. Этот класс содержит 3 метода: set — устанавливает флаг, clear — очищает флаг, wait — заставляет поток ожидать, пока флаг не будет установлен.




По умолчанию флаг снят, то есть строчка event.wait() заставит поток приостановиться. Дальше можно установить флаг с помощью event.set(), и тогда поток продолжит своё выполнение.




Более понятным языком это можно описать так, запускаем процессы, они что-то делают, затем приостанавливаются и ждут каково-то события. И как только это событие наступает, то продолжают свою работу.




Чтобы лучше с этим разобраться напишем демо-программу. Начнём с импорта нужных модулей: time и threading. Функция current_thread — определяет имя текущего потока, а функция active_count — сообщает текущее число потоков.




from time import sleep
from threading import Thread, Event, current_thread, active_count




Создадим объект класса Event. По умолчанию у созданного объекта флаг снят (то есть событие не наступило).




event = Event()




Укажем количество потоков которые будут запускаться:




max_t = 5




Напишем функцию, которую будем запускать в потоках. Она в переменную t_nane сохраняет имя текущего потока. Сообщает что запустился такой-то поток. А после ждёт пока наступит событие (event.wait()). И как только событие наступит, продолжает свою работу.




def f():
    thr_num = current_thread().name
    print(f"Поток {thr_num} запустился. Но ждёт остальных.")
    event.wait()
    print(f"Событие наступило! Поток {thr_num} продолжил свою работу")




Теперь в цикле запускаем потоки с задержкой 0.2 секунды:




for i in range(max_t):
    Thread(target=f).start()
    sleep(0.2)




И как только запустятся все потоки устанавливаем флаг объекту event (говорим что событие наступило):




if active_count() >= max_t:
    event.set()




Результат выполнения написанной выше программы:




Поток Thread-1 (f) запустился. Но ждёт остальных.
Поток Thread-2 (f) запустился. Но ждёт остальных.
Поток Thread-3 (f) запустился. Но ждёт остальных.
Поток Thread-4 (f) запустился. Но ждёт остальных.
Поток Thread-5 (f) запустился. Но ждёт остальных.
Событие наступило! Поток Thread-1 (f) продолжил свою работу
Событие наступило! Поток Thread-2 (f) продолжил свою работу
Событие наступило! Поток Thread-5 (f) продолжил свою работу
Событие наступило! Поток Thread-4 (f) продолжил свою работу
Событие наступило! Поток Thread-3 (f) продолжил свою работу




Дополнительная информация по классу Event().









Класс Condition




Класс Condition тоже используется для синхронизации потоков. Он является некоторой комбинацией классов Event и Lock, что позволяет потокам ожидать пока ресурс с которым происходит работа будет разблокирован. А разблокировать мы можем объект при каком-то условии.




Например первый поток перебирает число от 1 до 20, и если число делится на 5 без остатка, то разблокирует объект (устанавливает флаг). А второй поток выводит надпись «Событие наступило», когда дождётся установленного флага. Вот такую программу мы и реализуем.




Начнем писать код с импорта необходимых модулей и создание объекта класса Condition:




from time import sleep
from threading import Thread, Condition
cond = Condition()




Напишем первую функцию. Она будет в бесконечном цикле ставить блокировку — cond.wait(). И с Condition важно использовать менеджер контекста (with) чтобы он сам следил за блокировкой и разблокировкой.




def f1():
    while True:
        with cond:
            cond.wait()
            print("Получили событие!")




Затем напишем вторую функцию. В цикле пробежимся по цифрам от 0 до 20. И если число делится на 5 без остатка, то будем снимать блокировку (cond.notify()). Здесь также используем менеджер контекста (with). В противном случае (если число не делится на 2 без остатка) мы выводим это число на экран. И дополнительно установим небольшую задержку (sleep(0.2)).




def f2():
    for i in range(21):
        if i % 5 == 0:
            with cond:
                cond.notify()
        else:
            print(i)
        sleep(0.2)




И наконец запускаем оба потока:




Thread(target=f1, daemon=True).start()
Thread(target=f2).start()




Первый поток запускаю в режиме демона, так как там используется бесконечный цикл и нужно чтобы этот поток завершился при завершении основной программы.




Результат выполнения нашей программы:




Получили событие!
1
2
3
4
Получили событие!
6
7
8
9
Получили событие!
11
12
13
14
Получили событие!
16
17
18
19
Получили событие!




За надпись «Получили событие!» отвечает первая функция. Разблокировка происходит во второй функции, когда число делится на 5 без остатка.




Дополнительная информация по Condition().









Класс Barrier




Barrier позволяет реализовать алгоритм, когда необходимо дождаться завершения работы группы потоков, прежде чем продолжить выполнение задачи.




Например у вас первый поток что-то высчитывает и второй поток что-то высчитывает. А третий поток (возможно основной) должен использовать результаты первого и второго потока для дальнейшего расчёта. Для этого основная программа должна дождаться выполнения обоих потоков и только затем продолжить работать. Вот и напишем подобную программу.




Начнем как обычно с импорта необходимых модулей:




from time import sleep
from threading import Thread, Barrier




Создадим барьер. При его создании необходимо указать сколько потоков будет работать с барьером. У нас будет работать 3 потока — 1 основной и 2 дополнительных.




br = Barrier(3)




Дальше сделаем две глобальные переменные, в которые будем помещать результаты работы каждой функции:




a = 0
b = 0




Напишем первую функцию (возведение числа в квадрат). Она будет ожидать 1 секунду. А дальше будет ждать заполнения барьера до 3 потоков (br.wait()).




def f1(x):
   print("Высчитываем 1 число")
   global a 
   a = x**2
   sleep(1)
   br.wait()




Напишем вторую функцию (умножение на 2). Она будет ожидать 2 секунды. И тоже ожидать заполнения барьера.




def f2(x):
   print("Высчитываем 2 число")
   global b 
   b = x*2
   sleep(2)
   br.wait()




Я использую разное ожидание (первый поток ждёт 1 секунду, второй — 2 секунды), чтобы продемонстрировать, что потоки могут работать разное время. Но в ожидание br.wait() должны попасть три потока, и только тогда потоки продолжат своё выполнение.




Запустим оба потока:




Thread(target=f1, args=(3,)).start()
Thread(target=f2, args=(7,)).start()




Укажем основному потоку программы тоже ждать заполнения барьера:




br.wait()




И вот тут у нас 3 потока попали в ожидание барьера, как только это случилось, программа продолжила своё выполнение. После чего на экран выводится сумма полученных ранее двух результатов:




print("Результат = ", a+b)




Результат выполнения этой программы:




Высчитываем 1 число
Высчитываем 2 число
Результат =  23




Класс Semaphore




Семафоры — это технология в основе которой лежит счетчик. Когда поток заходит в семафор, то его счётчик уменьшается на 1. И когда счетчик становится равным нулю, то новый поток уже не может попасть в семафор и ожидает освобождения семафора. Когда поток в семафоре завершается, то счетчик увеличивается на 1.




Например создаём семафор = 5. Но запускаем 10 потоков. В итоге только 5 потоков получат доступ к семафору. А остальным 5 потокам придётся ждать своей очереди.




Это удобно, чтобы ограничить число подключений к сети или одновременно авторизованных пользователей программы.




Напишем демо-программу. Начнем с импорта необходимых модулей и создания объекта семафора (укажем что семафор работает одновременно только с двумя потоками):




from time import sleep, time
from threading import Thread, Semaphore, current_thread
s = Semaphore(2)




Напишем функцию которую будем запускать в потоках. Она будет запоминать время запуска потока и время перед самым завершением. При этом, поток после запуска будет попадать в семафор, в котором помещаются только 2 потока.




def f():
   thname = current_thread().name
   start_tread = time()
   with s:
       sleep(1)
       print(f"Время выполнения потока {thname} = {time() - start_tread}")




То есть наша функция сохраняет имя потока и текущее время. Затем использует семафор. В семафоре ожидает 1 секунду, и выводит общее время работы потока.




И наконец запускаем 10 потоков в цикле:




for i in range(10):
   Thread(target=f).start()




Результат выполнения этой программы:




Время выполнения потока Thread-1 (f) = 1.0010035037994385
Время выполнения потока Thread-2 (f) = 1.0010035037994385
Время выполнения потока Thread-3 (f) = 2.001539468765259
Время выполнения потока Thread-4 (f) = 2.0020761489868164
Время выполнения потока Thread-5 (f) = 3.002070665359497
Время выполнения потока Thread-6 (f) = 3.003070592880249
Время выполнения потока Thread-7 (f) = 4.003071546554565
Время выполнения потока Thread-8 (f) = 4.003068447113037
Время выполнения потока Thread-9 (f) = 5.003071069717407
Время выполнения потока Thread-10 (f) = 5.0040764808654785




Как видим, потоки выполняются одновременно по 2.









Итог




Мы познакомились с классами Event, Condition, Barrier, Semaphore модуля threading. И научились синхронизировать работу нескольких потоков.




Остальные статье по Python 3 можете посмотреть здесь.



2023-09-07T12:17:59
Python