Рабочий код для многопоточности на Python (3). Работает как в обычных программах, так и при запуске через Jupyter.
Суть в том, что одна и та же функция «calculations» запускается в несколько экземпляров, но с разными входными параметрами.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from multiprocessing import cpu_count import os def ar_split(a, n): """Разделяет массив на приблизительно равные части :params a: массив :params n: кол-во частей Пример: ar_split(range(11), 3) вернёт [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10]] """ k, m = divmod(len(a), n)) res = [a[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n)] res = [i for i in res if i != []] return res def calculations(a): """"Функция, которая будет параллельно запущена в несколько экземпляров. Каждому экземпляру надо давать свой аргумент для обработки, чтобы получились одновременные вычисления Внимание: эта функция ДОЛЖНА быть вне классов и других функций, иначе её экземпляры не будут работать параллельно (всё пойдёт в один поток)!""" # внутри процессов можно распечатать их id, чтобы понять # действительно ли они разделены: print(f"nid процесса родителя: {os.getppid()}. "+ f"id дочернего процесса: {os.getpid()}. "+ f"Аргумент для обработки: {a}") # for i in range(10): # a *= a return a def paralell_execution(func, arg, cpu_cnt=0, method=''): """Запускает параллельные вычисления разными способами. :params func: функция, которую будут вызывать параллельно :params list arg: аргументы для функции :params int cpu_count: Количество CPU для расчётов :params str method: Выбор способа разделения задач по процессорам: multithreading - годится для задач, где нужно ждать. К примеру, параллельная загрузка файлов. multiprocessing - годится для задач, где нужна вычислительная мощность всех ядер. Экспериментально установлено: multiprocessing будет выигрывать multithreading только на больших объёмах расчётов и большом количестве CPU (6+). В противном случае multithreading будет слегка быстрее. :return list: список возвращённых значений от функций func, которые были запущены параллельно. Порядок элементов не соответствует порядку запуска, все элементы будут перемешаны! """ res = None # способ параллельной обработки if not len(method): if cpu_cnt < 6: method = 'multithreading' else: method = 'multiprocessing' # кол-во ядер для обработки # в многоядерной системе одно ядро резервируем для работы системы if not cpu_cnt: cpu_cnt = cpu_count() - 1 if cpu_count() > 1 else 1 # Если процессоров меньше, чем задач, то разбиваем аргументы # на примерно равные группы для параллелизации if cpu_cnt < len(arg): arg = ar_split(arg, cpu_cnt) # запускаем параллельные вычисления if method == 'multiprocessing': with ProcessPoolExecutor(cpu_cnt) as ex: res = ex.map(func, arg) if method == 'multithreading': with ThreadPoolExecutor(cpu_cnt) as ex: res = ex.map(func, arg) # Преобразование list(res) запускает весь процесс подсчёта! # Если нужно запустить принудительно ранее, то можно заменить # map на submit и выполнить метод res.result() res = list(res) # на этом моменте все параллелные расчёты закончились и # результаты объединились в массив res. Теперь надо # проверить, что в этом массиве столько результатов, # сколько потоков запускали. assert len(arg) == len(res), 'Во время расчётов была была утеряна часть данных! ' 'Некоторые потоки программы не вернули свой результат!' return list(res) # Аргументы функции, с которыми их надо запускать # Если записать одномерный массив, то каждый экземпляр функции # получит по одному аргументу: args_to_process = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] # Если функции надо передать несколько аргументов, то передаём их # через массивы, а в исполняемой параллельно функции # ловим их через *args: # args_to_process = [['a', 1], ['d', 2], ['c', 3]] # Можно передать словарь # args_to_process = [{'a': 1}, {'b': 1}, {'c': 1}] # запускаем процесс параллельных вычислений res = paralell_execution(func=calculations, arg=args_to_process, method='multiprocessing') print('Готово (в этот момент все параллельные вычисления '+ 'окончились, можно обрабатывать объединённый результат)') print(f'Результаты вычислений: {res}')
2023-02-19T12:31:34
Программирование