Рабочий код для многопоточности на Python (3). Работает как в обычных программах, так и при запуске через Jupyter.
Суть в том, что одна и та же функция «calculations» запускается в несколько экземпляров, но с разными входными параметрами.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import cpu_count
import os
def ar_split(a, n):
"""Разделяет массив на приблизительно равные части
:params a: массив
:params n: кол-во частей
Пример: ar_split(range(11), 3) вернёт
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10]]
"""
k, m = divmod(len(a), n))
res = [a[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n)]
res = [i for i in res if i != []]
return res
def calculations(a):
""""Функция, которая будет параллельно запущена в несколько
экземпляров. Каждому экземпляру надо давать свой аргумент
для обработки, чтобы получились одновременные вычисления
Внимание: эта функция ДОЛЖНА быть вне классов и других функций,
иначе её экземпляры не будут работать параллельно (всё пойдёт
в один поток)!"""
# внутри процессов можно распечатать их id, чтобы понять
# действительно ли они разделены:
print(f"nid процесса родителя: {os.getppid()}. "+
f"id дочернего процесса: {os.getpid()}. "+
f"Аргумент для обработки: {a}")
# for i in range(10):
# a *= a
return a
def paralell_execution(func, arg, cpu_cnt=0, method=''):
"""Запускает параллельные вычисления разными способами.
:params func: функция, которую будут вызывать параллельно
:params list arg: аргументы для функции
:params int cpu_count: Количество CPU для расчётов
:params str method: Выбор способа разделения задач по процессорам:
multithreading - годится для задач, где нужно ждать. К примеру,
параллельная загрузка файлов.
multiprocessing - годится для задач, где нужна вычислительная
мощность всех ядер.
Экспериментально установлено: multiprocessing будет выигрывать
multithreading только на больших объёмах расчётов и большом
количестве CPU (6+). В противном случае multithreading будет
слегка быстрее.
:return list: список возвращённых значений от функций func,
которые были запущены параллельно. Порядок
элементов не соответствует порядку запуска,
все элементы будут перемешаны!
"""
res = None
# способ параллельной обработки
if not len(method):
if cpu_cnt < 6:
method = 'multithreading'
else:
method = 'multiprocessing'
# кол-во ядер для обработки
# в многоядерной системе одно ядро резервируем для работы системы
if not cpu_cnt:
cpu_cnt = cpu_count() - 1 if cpu_count() > 1 else 1
# Если процессоров меньше, чем задач, то разбиваем аргументы
# на примерно равные группы для параллелизации
if cpu_cnt < len(arg):
arg = ar_split(arg, cpu_cnt)
# запускаем параллельные вычисления
if method == 'multiprocessing':
with ProcessPoolExecutor(cpu_cnt) as ex:
res = ex.map(func, arg)
if method == 'multithreading':
with ThreadPoolExecutor(cpu_cnt) as ex:
res = ex.map(func, arg)
# Преобразование list(res) запускает весь процесс подсчёта!
# Если нужно запустить принудительно ранее, то можно заменить
# map на submit и выполнить метод res.result()
res = list(res)
# на этом моменте все параллелные расчёты закончились и
# результаты объединились в массив res. Теперь надо
# проверить, что в этом массиве столько результатов,
# сколько потоков запускали.
assert len(arg) == len(res),
'Во время расчётов была была утеряна часть данных! '
'Некоторые потоки программы не вернули свой результат!'
return list(res)
# Аргументы функции, с которыми их надо запускать
# Если записать одномерный массив, то каждый экземпляр функции
# получит по одному аргументу:
args_to_process = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# Если функции надо передать несколько аргументов, то передаём их
# через массивы, а в исполняемой параллельно функции
# ловим их через *args:
# args_to_process = [['a', 1], ['d', 2], ['c', 3]]
# Можно передать словарь
# args_to_process = [{'a': 1}, {'b': 1}, {'c': 1}]
# запускаем процесс параллельных вычислений
res = paralell_execution(func=calculations,
arg=args_to_process,
method='multiprocessing')
print('Готово (в этот момент все параллельные вычисления '+
'окончились, можно обрабатывать объединённый результат)')
print(f'Результаты вычислений: {res}')
2023-02-19T12:31:34
Программирование