Обложка

Если опустить первое и самое главное предубеждение относительно питонячьей многопоточности у большинства программистов — что её не существует из-за GIL, — то остается другое, и, наверное, вполне достоверное: многопоточность — это сложно, и нам этого, пожалуйста, не надо. И знаете что? Так оно и есть. Многопоточность — это сложно, особенно когда выбираешься за пределы стандартных руководств и попадаешь со своей многопоточной поделкой в реальный мир. И, возможно, вам не нужно. Ни здесь, ни далее я не буду обсуждать целесообразность написания многопоточного кода на Python и сразу перейду к тому, как это делать.

Если эта статья всем понравится, за ней может последовать серия на ту же тему: как писать многопоточный код с примерами на Python. Конкретно в этой статье примеры будут взяты из библиотеки Polog, которую я пишу в свободное время. Она предназначена для логирования, но я постарался вынести все базовые принципы и паттерны в эту статью в специфико-агностичной форме. А там, где без специфики библиотеки не обойтись, буду указывать, в чём именно она состоит. Хочется верить, что это даже поспособствует усвоению материала за счёт каких-то ассоциативных связей, что ли.

Дисклеймер

Несмотря на то, что я привожу собственный код в качестве примера, я тем самым не утверждаю, что он написан лучшим из возможным способов. Более того, я даже не уверен, что вы не сможете найти в нём каких-либо (возможно, фатальных) недостатков. Если вдруг вы в состоянии это сделать, было бы круто, не сдерживайте себя.

Кроме того, здесь не будет ликбеза про многопоточность. Предполагается, что вы уже прочитали что-то вроде документации к модулю threading из стандартной библиотеки, и в целом хотя бы немного знакомы с темой. Вам не нужно объяснять, чем потоки отличаются от вашей бабушки, кто такие эти наши локи (и каковы их семейные взаимоотношения с дедом), что такое состояние гонки и всякое подобное. Про это всё вы уже сами прочитали и теперь вам интересно не столько то, что такое многопоточность, сколько то, как её применение может выглядеть в конкретном проекте.

Чего мы хотим?

Мем

Существует самый простой и очевидный способ хранить настройки приложения в качестве обычных переменных внутри модуля или класса. Его используют в том числе крупные веб-фреймворки, и я поначалу делал так же. Так почему же по итогу пришлось всё переделывать? Дело в том, что к хранению настроек внутри библиотеки возникли довольно специфические требования:

  • Прежде всего, необходима гибкая валидация значений. Причём, поскольку это библиотека, пользователь должен получать информативное исключение с описанием того, что именно он сделал не так, когда попытался применить заведомо невозможную настройку (например, установить какой-нибудь интервал времени в виде отрицательной величины).

  • Необходимо валидировать не просто значения сами по себе, но и их комбинации. К примеру, в Polog поддерживается два типа движков: синхронный (без использования воркеров в отдельных потоках) и асинхронный. Синхронный подгружается, когда настройка pool_size установлена в значение 0, асинхронный — при любом значении больше нуля. Асинхронный движок работает по схеме producer–consumer с передачей данных через очередь. Так вот, для этой самой очереди можно установить лимит — целое число больше нуля, а можно не устанавливать, оставив значение 0 по умолчанию. «Невозможной» комбинацией является pool_size равный нолю, и при этом max_queue_size (лимит размера очереди) больше нуля, поскольку никакой очереди в синхронном движке просто нет. Хотя каждое отдельное из значений — нулевой размер пула и ненулевой размер очереди — само по себе валидно, при попытке установить такую комбинацию мы тоже должны кинуть исключение с описанием конфликтующих полей.

  • Нужна возможность изменять настройки в любой момент. Исходно всё работало так: перед записью самого первого лога вы указывали нужные значения для всех пунктов настроек, и потом, когда первый лог создавался, инициализировались все внутренние объекты фреймворка согласно настройкам на этот момент времени. Дальше уже ничего поменять было нельзя. Так быть не должно, в идеале любой пункт настроек должно быть возможно изменить в любой момент времени, не потеряв при этом ни одной записи. Дальше станет ясно, почему этого добиться не так уж просто.

  • Каждый пункт настроек должен иметь значение по умолчанию.

  • Всё должно быть защищено от состояния гонки.

  • Хранилище должно иметь интерфейс словаря. Это просто и удобно, а ещё так компоненты фреймворка проще тестировать, подменяя хранилище настроек словарем.

Базовая концепция

У нас есть класс, оборачивающий доступ к словарю, объявленному как один из его атрибутов. Примерно вот такой:

from threading import Lock
from polog.core.utils.reload_engine import reload_engine
from polog.core.utils.read_only_singleton import ReadOnlySingleton
... # Прочие всякие импорты.


class SettingsStore(ReadOnlySingleton):
    # Перечисление пунктов настроек. Каждый пункт - экземпляр класса SettingPoint, внутри которого проводятся все необходимые проверки и блокировки.
    # По сути весь класс SettingsStore проксирует доступ к этому словарю.
    points = {
        'pool_size': SettingPoint(
            2,
            proves={
                'the value must be an integer': lambda x: isinstance(x, int),
                'the value must be greater than or equal to zero': lambda x: x >= 0,
            },
            conflicts={
                'max_queue_size': lambda new_value, old_value, other_field_value: new_value == 0 and other_field_value != 0,
            },
            action=lambda old_value, new_value, store: reload_engine() if old_value != new_value else None,
            read_lock=True,
            shared_lock_with=(
                'max_queue_size',
                'started',
            ),
        ),
        'max_queue_size': SettingPoint(
            0,
            proves={
                'the value must be an integer': lambda x: isinstance(x, int),
                'the value must be greater than or equal to zero': lambda x: x >= 0,
            },
            conflicts={
                'pool_size': lambda new_value, old_value, other_field_value: new_value != 0 and other_field_value == 0,
            },
            action=lambda old_value, new_value, store: reload_engine() if old_value != new_value else None,
            read_lock=True,
            shared_lock_with=(
                'pool_size',
                'started',
            ),
        ),
        ... # Там ниже должны быть еще другие пункты настроек, мы их не будем приводить здесь в целях экономии трафика.
    }
    lock = Lock()
    points_are_informed = False

    def __init__(self):
        with self.lock:
            if not self.points_are_informed:
                for name, point in self.points.items():
                    # Передаем в объект поля объект хранилища.
                    # Таким образом теперь каждый объект поля может обращаться ко всем остальным объектам поля.
                    point.set_store_object(self)
                    # Оповещаем объект поля о его собственном имени. Это полезно для конструирования сообщений об ошибках, например.
                    point.set_name(name)
                for name, point in self.points.items():
                    # По умолчанию внутри каждого поля есть собственный объект лока.
                    # В данном методе объект поля передает свой лок всем связанным с ним полям.
                    # Это нужно, чтобы блокировка одного поля блочила и остальные из этой группы. Используется в ситуациях, когда атомарное изменение должно быть комплексным.
                    point.share_lock_object()
                self.points_are_informed = True

    def __getitem__(self, key):
        point = self.get_point(key)
        return point.get()

    def __setitem__(self, key, value):
        point = self.get_point(key)
        point.set(value)

    def get_point(self, key):
        if key not in self.points:
            raise KeyError(f'{key} - there is no settings point with this name.')
        return self.points[key]

  ... # Какие-то прочие методы, для нас пока не существенные.

Ключи в этом словаре — имена пунктов настроек, а значения — объекты, в которых хранятся значения. Именно в этих объектах класса SettingPoint и происходит почти вся описанная выше магия: валидация значений с киданием красивых исключений, блокировки и всё прочее. Далее мы сосредоточимся на устройстве и механизмах внутри этого класса.

Валидируем одиночные значения

Рассмотрение класса SettingPoint мы начнём с самого простого — как валидируются значения. Напомню, что создание экземпляра выглядело примерно так:

SettingPoint(
    2,
    proves={
        'the value must be an integer': lambda x: isinstance(x, int),
        'the value must be greater than or equal to zero': lambda x: x >= 0,
    },
    conflicts={
        'max_queue_size': lambda new_value, old_value, other_field_value: new_value == 0 and other_field_value != 0,
    },
    action=lambda old_value, new_value, store: reload_engine() if old_value != new_value else None,
    read_lock=True,
    shared_lock_with=(
        'max_queue_size',
        'started',
    ),
)

На этом этапе нас интересует словарь, который мы передаем для инициализации экземпляра как аргумент proves:

{
    'the value must be an integer': lambda x: isinstance(x, int),
    'the value must be greater than or equal to zero': lambda x: x >= 0,
}

Как несложно догадаться, ключи в этом словаре — фрагменты тех сообщений об ошибках, с которыми будут подниматься исключения. А значения — функции. Вот так выглядит метод объекта SettingPoint, где применяются эти лямбды:

def prove_value(self, value):
    for message, prove in self.proves.items(): # Предварительно словарь с лямбдами был сохранен в экземпляре как self.proves.
        if not prove(value):
            if not hasattr(self, 'name'):
                full_message = f'You used an incorrect value "{value}": {message}.'
            else:
                full_message = f'You used an incorrect value "{value}" for the field "{self.name}": {message}.'
            raise ValueError(full_message)

Как видим, перед поднятием исключения конструируется сообщение, в котором указано, с каким полем возникла проблема и какая именно. Всё это на человеческом английском. К слову говоря, каждый объект SettingPoint был оповещён о своём имени внутри метода __init__ класса SettingsStore.

Ищем конфликты

Немного усложняем задачу: теперь нам нужно проверить, что два разных значения настроек не образуют невозможную комбинацию, хотя каждое из них в отдельности валидно. Для этого при инициализации объекта SettingPoint мы передаём словарь с описанием конфликтов, примерно такой:

{
    'max_queue_size': lambda new_value, old_value, other_field_value: new_value == 0 and other_field_value != 0,
}

Ключи в этом словаре — это названия полей, конфликты с которыми мы проверяем, а значения — функции. При каждой попытке сохранить новое значение этого пункта настроек мы передаём в каждую из функций три параметра: новое значение; значение, которое было до этого; и текущее значение того поля, конфликты с которым мы ищем. Для этого в объекте поля запускается простенький метод:

def prove_conflicts(self, old_value, new_value):
    for field_name, conflict_checker in self.conflicts.items(): # Предварительно словарь с описанием конфликтов был сохранен в self.conflicts.
        if conflict_checker(new_value, old_value, self.store.force_get(field_name)): # .force_get() - метод хранилища настроек, позволяющий получить значение любого пункта в обход блокировки, то есть осуществить по сути "грязное чтение". Но о блокировках будет дальше, до них мы еще не дошли.
            raise ValueError(f'The new value "{new_value}" of the field "{self.name}" is incompatible with the current value "{self.store.force_get(field_name)}" of the field "{field_name}".')

Что ж, мы провели два типа проверок и теперь должны быть уверены, что новое значение настройки нам подходит. Теперь наша задача — применить полученные данные.

Меняем настройки в рантайме

Пунктов настроек больше одного, но я опишу конвейер изменения одного из них, с которым пришлось повозиться больше всех. Чтобы объяснить, почему так сложно его изменить, придётся немного погрузить в устройство фреймворка до и после. Будет немного специфики, без неё никуда.

Исходно ядро Polog было исключительно многопоточным, на базе самописного thread pool. Тредпул работал по паттерну издатель-подписчик, то есть состоял из трёх логических частей:

  1. Очередь для коммуникации.

  2. Объект, который передаёт логи в очередь.

  3. Набор потоков-воркеров. В каждом из них запущен бесконечный цикл, в котором они ожидают появления новых записей в очереди. Как только первый из них забирает лог, он сразу его как-то обрабатывает, например, записывает в файл.

Воркеры создавались «лениво», в момент записи первого лога. Это позволяло применить любые настройки на этапе инициализации программы. Проблема только в том, что тредпул получался «приколоченным гвоздями». После записи первого лога уже ни добавить воркеров, ни убрать лишних. Кроме того, не была предусмотрена возможность использования беспоточного движка: класс движка тоже, получается, приколочен. Валидным размером тредпула в настройках было только число не меньше единицы.

Существует две базовые стратегии изменения тредпула «на лету»:

  1. Изменение текущего тредпула с помощью добавления новых или удаления из него старых воркеров. Это вполне легко реализовать, нужно лишь получить новое число потоков; сравнить его с текущим; определить, добавляем или убираем; и применить изменение.

  2. Обернуть компонент с тредпулом в объект уровнем выше, который при каждом изменении настроек будет пересоздавать пул с нуля.

Казалось бы, первый вариант экономичнее. Надо нам добавить 101-й поток, мы не занимаемся бессмысленным уничтожением и созданием заново предыдущей сотни. Мы просто создаём ещё один и добавляем в пул. Однако я выбрал второй вариант, поскольку его API универсальнее и позволяет легко изменять любые характеристики движка, а не только количество воркеров. Любое изменение — это уничтожение предыдущего компонента и создание нового в соответствии с новыми настройками. У такого решения, кстати, был важный и полезный побочный эффект: стало возможным существование более одного типа движков. Старый уничтожается, а новый может быть каким угодно, он не связан со старым какими-либо общими механизмами.

В новой схеме может быть сколько угодно классов движков. Главное, чтобы движок имел два обязательных метода: write() для записи лога и stop() для самоуничтожения. При уничтожении движок должен подчистить за собой все занятые ресурсы, в том числе присоединить все созданные дополнительные потоки. Текущий движок оборачивается в объект специального класса Engine. «Обёртка» настоящего движка называется так, поскольку для всего окружающего вызывающего кода именно она представляет из себя движок. Вызывающий код не должен знать, что движок на самом деле — это что-то под капотом.

«Настоящих» движков (не обёрток) в настоящее время доступно два типа: синхронный и асинхронный, как я уже писал выше. Их порождает вот такая простенькая функция:

def real_engine_fabric(settings):
    if settings.force_get('pool_size') == 0:
        return SingleThreadedRealEngine(settings)
    return MultiThreadedRealEngine(settings)

Если размер запрошенного пула потоков (настройка pool_size) больше нуля, то создаётся асинхронный движок на базе тредпула. А если равен нулю, то создаётся более примитивный синхронный движок, у которого нет тредпула, а есть просто метод, куда можно передать лог и он будет обработан.

Для понимания, как выглядит минимальный возможный движок, синхронная версия выглядит так:

class SingleThreadedRealEngine(AbstractRealEngine):
    def write(self, log):
        for handler in log.get_handlers():
            self.call_handler(handler, log)

    @exception_escaping
    def call_handler(self, handler, log):
        handler(log)

Это всё. Метод самоуничтожения спрятан в классе, от которого наследуется движок, и не делает ничего, поскольку тут нет никаких ресурсов, которые надо освобождать.

Асинхронный немного сложнее, ведь он должен уметь завершаться, не потеряв ни одного лога в очереди. Давайте на него тоже глянем, прежде чем идти дальше:

class MultiThreadedRealEngine(AbstractRealEngine):
    def __init__(self, settings):
        super().__init__(settings)
        self.pool = ThreadPool(settings)

    def write(self, log):
        self.pool.put(log)

    def stop(self):
        self.pool.stop()

Как видите, вся работа по корректному завершению происходит внутри пула, так что провалимся в него:

import time
from queue import Queue

from polog.core.engine.real_engines.multithreaded.worker import Worker


class ThreadPool:
    def __init__(self, settings):
        self.settings = settings
        self.queue = Queue(maxsize=self.settings.force_get('max_queue_size'))
        self.workers = self.create_workers()

    def put(self, log):
        self.queue.put(log)

    def stop(self):
        self.wait_empty_queue()
        for worker in self.workers:
            worker.set_stop_flag()
        for worker in self.workers:
            worker.stop()

    def wait_empty_queue(self):
        delay = self.settings['time_quant'] * self.settings['delay_on_exit_loop_iteration_in_quants']
        while True:
            if self.queue.empty():
                break
            time.sleep(delay)

    def create_workers(self):
        workers = []
        for index in range(self.settings.force_get('pool_size')):
            worker = Worker(self.queue, index, self.settings)
            workers.append(worker)
        return workers

Итак, завершение работы пула происходит в три этапа:

  1. Дожидаемся, пока очередь с логами опустеет. При этом важно, чтобы с другой стороны её никто не пополнял, но об этом будет дальше. Ожидание опустения очереди — это обычный цикл, в котором с определённой периодичностью запрашивается размер очереди, и который завершается, когда очередь пуста.

  2. Сообщаем каждому воркеру, что пора заканчивать работу. Дело в том, что у него на руках в этот момент ещё может быть последний взятый из очереди лог, поэтому мы не можем прервать его насильно и вынуждены вежливо попросить. Воркер, в свою очередь (на это мы посмотрим дальше), получив такое сообщение, должен закончить с последним взятым из очереди логом, после чего добровольно суициднуться прервать собственный цикл.

  3. Джойним каждый поток с воркером.

Внутри воркера обработка сообщения об остановке выглядит как установка в значение True флага self.stopped:

class Worker:
    def run(self): # Тот самый метод с бесконечным циклом.
        stopped_from_flag = False
        while True:
            try:
                while True:
                    try:
                        log = self.queue.get(timeout=self.settings['time_quant'])
                        break
                    except Empty:
                        if self.stopped:
                            stopped_from_flag = True
                            break
                if stopped_from_flag:
                    break
                self.do_anything(log)
                self.queue.task_done()
            except Exception as e:
                self.queue.task_done()

    ... # Какие-то прочие методы.

Этот пример кода, возможно, придётся пораскуривать некоторое время, мне и самому его не так просто в голове уместить. Прежде всего требует пояснений, почему здесь два бесконечных цикла, а не один, как я обещал ранее. Дело в том, что если воркер ожидает из очереди данных без таймаута, то возможна ситуация, когда мы дождались бы опустения очереди, проставили флаг завершения и дальше бесконечно ждали бы его. Поэтому воркер ожидает данные из очереди не бесконечно, а периодически «просыпается», чтобы проверить, не попросили ли его прерваться. Не попросили — прекрасно, возвращаемся в очередь.

Выше мы выяснили, как происходит рождение и остановка движков, однако здесь мне придётся вернуться немного назад, чтобы рассказать ещё о паре важных моментов.

Во-первых, как вы могли заметить, изменение даже одного пункта настроек может быть сложной комплексной операцией. Опасно, чтобы одновременно выполнялись сразу две таких операции над одной и той же настройкой. Поэтому все изменения настроек всегда происходят из-под блокировки. Также в некоторых случаях значение настройки является невалидным в промежутке, пока оно устанавливается, но не всегда. К примеру, если изменился размер пула потоков в движке, то где-то «посередине» будет момент, когда старый движок уже уничтожен, а новый ещё не создан. В этот момент текущее значение настройки де-факто врёт и считывать его нельзя. Поэтому для отдельных пунктов настроек можно распространить блокировку записи и на чтение. Для этого при создании объекта SettingPoint нужно передать туда параметр read_lock=True.

Внутри SettingPoint при каждом чтении настройки было бы слишком дорого проверять, установлен такой флаг или нет. Поэтому метод get() де-факто конструируется внутри метода set_read_lock() при инициализации SettingPoint, и дальнейшие вызовы происходят с минимальным оверхедом:

# Методы где-то внутри SettingPoint:
def set_read_lock(self, read_lock):
    if read_lock: # Здесь read_lock - это тот самый булеановский флаг, который мы передали при инициализации объекта SettingPoint.
        self.get = self.locked_get
    else:
        self.get = self.unlocked_get

def unlocked_get(self):
    return self.value

def locked_get(self):
    with self.lock:
        return self.value

Во-вторых, при подмене движка во время работы программы нужно учитывать, что всё это время его обёртку могут вызывать логгеры из разных потоков, поэтому обёртку движка мы также временно блокируем. Код ниже демонстрирует, как происходит блокировка и последующая разблокировка движка при перезагрузке:

def reload(self):
    if self.settings['started']: # Настоящая подгрузка движка все еще ленивая, поэтому перезагрузка до записи первого лога лишена всякого смысла.
        self.block()
        self.stop()
        self.load()
        self.unlock()

def block(self):
    if self.settings['started']:
        self.lock.acquire()
        self.write = self._blocked_write

def unlock(self):
    self.write = self._new_write
    self.lock.release()

def _new_write(self, log):
    self.real_engine.write(log)

def _blocked_write(self, log):
    with self.lock:
        self._new_write(log)

Таким образом, любой поток, который попытается записать лог в тот момент, пока движок перезагружается, будет временно заблокирован, и продолжит свое выполнение, когда всё уже завершится, ничего даже не заметив. Лог будет записан как положено и не потеряется.

Если вы добрались до этого места и всё ещё ничего не поняли, то вот краткий пересказ того, как происходит применение нового значения пункта pool_size:

1. Проверяем валидность нового значения.
2. Проверяем конфликты нового значения.
3. Ставим блокировку на запись новых значений этого пункта настроек и всех с ним связанных.
4. Если нужно, ставим блокировку на чтение.
5. Сохраняем новое значение.
6. Запускаем коллбек:
  6.1. Перезагружаем движок:
    6.1.1. Устанавливаем блокировку на запись логов.
    6.1.2. Останавливаем старый движок:
      6.1.2.1. Если он был асинхронный:
        6.1.2.1.1. Дожидаемся, пока очередь опустеет.
        6.1.2.1.2. Устанавливаем для каждого воркера флаг остановки.
        6.1.2.1.2. Внутри воркера:
          6.1.2.1.2.1. Проверяем периодически этот флаг, в том числе пока ожидаем новых данных из очереди.
          6.1.2.1.2.2. Если флаг установлен:
            6.1.2.1.2.2.1. Прерываем бесконечный цикл.
        6.1.2.1.3. Джойним все потоки с воркерами.
      6.1.2.2. Если движок синхронный:
        6.1.2.2.1. Ничего не делаем.
    6.1.3. Создаём экземпляр нового движка:
      6.1.3.1. Внутри фабрики создания движков:
        6.1.3.1.1. Если настройка pool_size установлена в значение 0:
          6.1.3.1.1.1. Создаем синхронный движок.
        6.1.3.1.2. Иначе:
          6.1.3.1.2.1. Создаем асинхронный движок.
    6.1.4. Снимаем блокировку на запись логов.
7. Снимаем блокировку на запись для данного пункта настроек и всех с ним связанных.
8. Если ранее была установлена, то снимаем блокировку на чтение этого пункта настроек.

У других пунктов настроек схема может быть попроще, но принципиально отличаться не будет.

Заключение

Если вы дочитали, а не пролистали, статью до этого места, вам представленный код мог показаться чересчур сложным для решаемой им задачи (а если раньше вы уже делали что-то похожее, возможно, наоборот, слишком простым). Нужно учитывать, что такое навороченное решение с почти реализованной транзакционностью не появляется за один присест. Это результат неоднократного рефакторинга, причём каждая следующая итерация по сути добавляла новый уровень ненаивности в реализацию. И я не знаю, сколько их ещё впереди. Наивные решения задач — это хорошо и эффективно в большинстве случаев, если только речь не про многопоточность. А многопоточность — это сложно.

Оригинальная публикация статьи на хабре.