В статье рассмотрим многопроцессорность в Python 3, а именно модуль multiprocessing и его классы: Process, Queue, Manager, Listen и Client.
Многопроцессорность в Python 3
Процесс — это своеобразный контейнер в рамках которого выполняется код запущенной программы. Этот контейнер содержит какое-то количество оперативной памяти. В одном процессе должен работать как минимум один поток. Или потоков может быть несколько. Про многопоточность на Python 3 я писал здесь и здесь.
Но одно приложение может работать сразу в нескольких процессах. В каждом из этих процессах будет работать один или несколько потоков. Многопроцессорность в Python 3 обеспечивается модулем multiprocessing.
Плюсом многопроцессорной программы является то, что каждый процесс работает в рамках своего GIL. Поэтому процессы могут работать с ЦПУ параллельно (на нескольких ядрах). А недостатком процессов является их тяжеловесность по сравнению с потоками.
В этой статье мы разбираем модуль multiprocessing. Он схож по функциональности с модулем threading, в частности имеет классы (Lock, Event, Condition, Semaphore и т.д.). Их мы рассматривать не будет, а рассмотрим создание процессов и обмен данными между процессами.
Создание процессов
Для создания процессов используют класс Process модуля multiprocessing. Для большего понимания напишем демо программу.
Начнём с импорта необходимых модулей.
from multiprocessing import Process
Теперь напишем функцию, которую будем запускать в процессе:
def f(num): print(f"Процесс № {num}")
При запуске процессов нужно обязательно использовать конструкцию if name == «main». В этой конструкции запустим в цикле 5 процессов.
if __name__ == "__main__": for i in range(5): Process(target=f, args=(i,)).start()
Создание процесса, ничем не отличается от создания потока. В параметр target мы передаём запускаемую функцию. А в параметр args — передаём массив параметров функции. Так как мы должны передать массив, а у нас всего 1 параметр, то после него ставим запятую. Точно также как поток, процесс можно запускать в режиме демона, для этого имеется параметр daemot=True.
Результат выполнения этой программы:
Процесс № 2 Процесс № 1 Процесс № 0 Процесс № 3 Процесс № 4
Класс Queue
Класс Queue это очередь на основе FIFO (первый пришёл, первый ушёл). В очередь можно что-то поместить (put()) или что-то вытащить (get()).
При помещении в очередь объект блокируется, пока не станет доступен следующий слот. То есть в один слот два разных процесса одновременно не смогут поместить два разных объекта.
Метод get() не просто возвращает элемент из очереди, он его от туда удаляет, то есть освобождает слот.
Напишем демо-программу. Начнем с импорта необходимых модулей.
from multiprocessing import Process, Queue
Напишем функцию, которая будет принимать два параметра: массив чисел (value) и очередь (queue). Функция будет брать числа из массива и возводить их в квадрат. Затем полученные числа будут помещаться в очередь.
def square(value, queue): for i in value: queue.put(i*i)
Напишем ещё одну функцию. Она будет делать почти тоже самое, но возводить числа в куб.
def cube(value, queue): for i in value: queue.put(i*i*i)
Теперь напишем основной блок кода. Там создадим список чисел (my_numbers), и очередь (queue).
if __name__ == "__main__": my_numbers = (2, 3, 4) queue = Queue()
Дальше создадим два процесса для запуска написанных ранее функций. В качестве параметров передаём в эти функции список чисел и очередь. И запустим эти процессы. И будем ожидать их выполнения с помощью метода join().
p_square = Process(target=square, args=(my_numbers, queue)) p_cube = Process(target=cube, args=(my_numbers, queue)) p_square.start() p_cube.start() p_square.join() p_cube.join()
Пока очередь не пустая выводим числа из очереди используя метод get().
while not queue.empty(): print(queue.get(), end= " / ")
Процессы могут помещать объекты в очередь асинхронно, поэтому вывод программы будет всегда разным:
### Результат выполнения 1 4 / 9 / 16 / 8 / 27 / 64 / ### Результат выполнения 2 8 / 27 / 64 / 4 / 9 / 16 /
То есть в первом случае первее отработала функция возведения в квадрат и поместила 4 в очередь. А во втором случае первее отработала функция возведения в куб и поместила 8 в очередь.
Класс Manager
Этот класс позволяет создать объекты, которыми смогут совместно пользоваться разные процессы. Такой совместный доступ используется в рамках Namespace, который предоставляет класс Manager.
Разберём всё в примере. Начнем с импорта необходимых модулей.
from multiprocessing import Process, Manager, Event
Создадим функцию. Она будет принимать параметры: ns (namespace в котором будут размещаться общие объекты), event (событие, блокирующее выполнение процесса, пока не будет установлен флаг, его мы рассматривали в потоках). Функция будет заполнять ns объектами. В частности в список (my_list) будут добавляться объекты, а переменная (my_value) получит определённое значение. После заполнения ns, будет установлен флаг event.set(), то есть ожидающие потоки смогут работать дальше.
def slave(ns, event): ns.my_list.append(1) ns.my_list.append(2) ns.my_list.append(3) ns.my_value = 3.14 event.set()
Затем создадим ещё одну функцию. Она также принимает параметры ns и event. И показывает значения переменных из ns (my_list и my_value) до того как первая функция их заполнит и после. Это достигается с помощью метода event.wait().
def master(ns, event): print(f'my_list до установки флага события: {ns.my_list}') print(f'my_value до установки флага события: {ns.my_value}') event.wait() print(f'my_list после установки флага события: {ns.my_list}') print(f'my_value после установки флага события: {ns.my_value}')
Теперь напишем основной блок кода. В нём создадим объект класса Manager(), и с помощью ного создадим namespace.
if __name__ == '__main__': mgr = Manager() namespace = mgr.Namespace()
Создадим в namespace переменные: list() — это список, Value(‘d’, 0) — это объект с типом данных и значением.
Тип данных может быть ‘i’ — int, ‘d’ — float.
namespace.my_list = mgr.list() namespace.my_value = mgr.Value('d', 0.0)
Создадим event:
event = Event()
И наконец создадим и запустим оба процесса. И будет дожидаться их завершения.
slave_process = Process(target=slave, args=(namespace, event)) master_process = Process(target=master, args=(namespace, event)) master_process.start() slave_process.start() master_process.join() slave_process.join()
Результат выполнения данного кода может быть разным:
# 1 вариант my_list до установки флага события: [1] my_value до установки флага события: Value('d', 0.0) my_list после установки флага события: [1, 2, 3] my_value после установки флага события: 3.14 2 вариант my_list до установки флага события: [] my_value до установки флага события: Value('d', 0.0) my_list после установки флага события: [1, 2, 3] my_value после установки флага события: 3.14
Это происходит из за того, что мы запускаем два процесса Master (выводит переменные из Namespace, ожидает снятия блокировки и снова их выводит). Slave (заполняет Namespace, и снимает блокировку). Если начнёт выполнятся первее Slave, то он успеет добавить 1 или несколько значений в список. И Master их выведет ещё до попадания в блокировку. А если выполнится первее Master, то он выведет пустой список, встретит блокировку и будет ожидать, пока Slave не снимет блокировку.
Классы Listen и Client
Данные классы находятся не в самом модуле multiprocessing а во вложенном multiprocessing.connection. Они позволяют создать серверный и клиентский процессы. При этом эти процессы смогут передавать данные друг друг по сети с помощью методов send() и recv(). Реализуя обмен между процессами важно помнить о блокировках, или чтобы облегчить себе жизнь можно использовать менеджер контекста (with).
В качестве примера напишем следующий код. Начнем как всегда с импорта необходимых модулей.
from multiprocessing.connection import Listener, Client from multiprocessing import Process
Напишем функцию server(). Сервер будет слушать 6000 порт на localhost, и требовать пароль. При подключении к серверу об этом событии сообщается. Сервер передаёт на клиент данные с помощью метода send().
def server(): address = ('localhost', 6000) with Listener(address, authkey=b'secret') as listener: with listener.accept() as conn: print(f'Подключение разрешено с адреса {listener.last_accepted}') conn.send([1, 2, 3]) conn.send('alex')
Теперь напишем функцию client(). Эта функция подключается к 6000 порту на localhost, и предоставляет пароль.
Получает данные с сервера с помощью метода recv(), и выводит их.
def client(): address = ('localhost', 6000) with Client(address,authkey=b'secret') as conn: print(conn.recv()) # [1, 2, 3] print(conn.recv()) # alex
В основном блоке кода запускаем два процесса.
if __name__ == '__main__': Process(target=server).start() Process(target=client).start()
Результат выполнения этой программы:
Подключение разрешено с адреса ('127.0.0.1', 54581) [1, 2, 3] alex
Итог
Многопроцессорность в Python 3 обеспечивает модуль — multiprocessing. Этот модуль схож с модулем threading. Создаются процессы с помощью класса Process. Запускаются с помощью метода start(), и также как потоки их можно ожидать с помощью метода join().
В статье в основном разбирались классы позволяющие реализовать обмен данными между процессами:
- Queue — Позволяет сделать очередь с объектами. В эту очередь процессы могут помещать объекты и извлекать их.
- Manager — Позволяет сделать namespace, в котором будут находится общие объекты. Различные процессы могут использовать этот namespace и получать доступ к общим объектам.
- Listen и Client — позволяют реализовать сетевое взаимодействие между процессами. Процессы могут, с помощью методов send() и recv(), отправлять и получать данные по сети.
Более подробно об этом модуле можете почитать здесь.