Python. Многопоточность

Рабочий код для многопоточности на Python (3). Работает как в обычных программах, так и при запуске через Jupyter.

Суть в том, что одна и та же функция «calculations» запускается в несколько экземпляров, но с разными входными параметрами.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

from multiprocessing import cpu_count

import os



def ar_split(a, n):

    """Разделяет массив на приблизительно равные части

    :params a: массив

    :params n: кол-во частей

    

    Пример: ar_split(range(11), 3) вернёт

    [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10]]

    """

    k, m = divmod(len(a), n))

    res = [a[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n)]

    res = [i for i in res if i != []]

    return res



def calculations(a):

    """"Функция, которая будет параллельно запущена в несколько

    экземпляров. Каждому экземпляру надо давать свой аргумент

    для обработки, чтобы получились одновременные вычисления

    

    Внимание: эта функция ДОЛЖНА быть вне классов и других функций, 

    иначе её экземпляры не будут работать параллельно (всё пойдёт 

    в один поток)!"""

    # внутри процессов можно распечатать их id, чтобы понять

    # действительно ли они разделены:

    print(f"nid процесса родителя: {os.getppid()}. "+

          f"id дочернего процесса: {os.getpid()}. "+

          f"Аргумент для обработки: {a}")

    # for i in range(10):

    #     a *= a

    return a



def paralell_execution(func, arg, cpu_cnt=0, method=''):

    """Запускает параллельные вычисления разными способами.

    :params func: функция, которую будут вызывать параллельно

    :params list arg: аргументы для функции

    :params int cpu_count: Количество CPU для расчётов

    :params str method: Выбор способа разделения задач по процессорам:

        multithreading - годится для задач, где нужно ждать. К примеру,

        параллельная загрузка файлов.

        multiprocessing - годится для задач, где нужна вычислительная

        мощность всех ядер.

        Экспериментально установлено: multiprocessing будет выигрывать

        multithreading только на больших объёмах расчётов и большом

        количестве CPU (6+). В противном случае multithreading будет

        слегка быстрее.

    :return list: список возвращённых значений от функций func,

                  которые были запущены параллельно. Порядок

                  элементов не соответствует порядку запуска,

                  все элементы будут перемешаны!

    """

    res = None

    

    # способ параллельной обработки

    if not len(method):

        if cpu_cnt < 6:

            method = 'multithreading'

        else:

            method = 'multiprocessing'

            

    # кол-во ядер для обработки

    # в многоядерной системе одно ядро резервируем для работы системы

    if not cpu_cnt:

        cpu_cnt = cpu_count() - 1 if cpu_count() > 1 else 1 



    # Если процессоров меньше, чем задач, то разбиваем аргументы

    # на примерно равные группы для параллелизации

    if cpu_cnt < len(arg):

        arg = ar_split(arg, cpu_cnt)

    

    # запускаем параллельные вычисления

    if method == 'multiprocessing':

        with ProcessPoolExecutor(cpu_cnt) as ex:

            res = ex.map(func, arg)



    if method == 'multithreading':

        with ThreadPoolExecutor(cpu_cnt) as ex:

            res = ex.map(func, arg)

    

    # Преобразование list(res) запускает весь процесс подсчёта!

    # Если нужно запустить принудительно ранее, то можно заменить

    # map на submit и выполнить метод res.result()

    res = list(res)



    # на этом моменте все параллелные расчёты закончились и 

    # результаты объединились в массив res. Теперь надо

    # проверить, что в этом массиве столько результатов,

    # сколько потоков запускали.

    assert len(arg) == len(res), 

            'Во время расчётов была была утеряна часть данных! ' 

            'Некоторые потоки программы не вернули свой результат!'

    

    return list(res)





# Аргументы функции, с которыми их надо запускать

# Если записать одномерный массив, то каждый экземпляр функции

# получит по одному аргументу:

args_to_process = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

# Если функции надо передать несколько аргументов, то передаём их

# через массивы, а в исполняемой параллельно функции 

# ловим их через *args:

# args_to_process = [['a', 1], ['d', 2], ['c', 3]] 

# Можно передать словарь 

# args_to_process = [{'a': 1}, {'b': 1}, {'c': 1}] 





# запускаем процесс параллельных вычислений

res = paralell_execution(func=calculations,

                         arg=args_to_process,

                         method='multiprocessing')



print('Готово (в этот момент все параллельные вычисления '+

      'окончились, можно обрабатывать объединённый результат)')



print(f'Результаты вычислений: {res}')



2023-02-19T12:31:34
Программирование