asyncio — параллелизм в Python

Параллелизм в Python — одна из самых сложных тем для понимания, не говоря уже о реализации. Не помогает и то, что существует множество способов создания параллельных программ. Возникает куча вопросов. Нужно ли запускать несколько потоков? Использовать несколько процессов? Использовать асинхронное программирование?

Что ж, ответ здесь один — использовать тот способ, который лучше всего подходит для вашего случая. Но если вы сомневаетесь, то используйте асинхронный ввод-вывод, когда это возможно, и потоковое программирование, когда это необходимо.

В этой статье мы рассмотрим асинхронные программы как в старых версиях Python (на случай, если вы имеете дело с устаревшим кодом), так и в новых.

Хотите скачать книги по Python в 2 клика? Тогда вам в наш телеграм канал PythonBooks 

Содержание

Что такое asyncio?

Asyncio означает «асинхронный ввод-вывод» и относится к парадигме программирования, позволяющей достичь высокого параллелизма с помощью одного потока или цикла событий. Эта модель не является исключительно «питоничной» и реализуется также в других языках и фреймворках, наиболее известным из которых является NodeJS на JavaScript.

Понимание asyncio на примере

Чтобы понять концепцию asyncio, рассмотрим ресторан с одним официантом. Внезапно появляются три клиента, Кохли, Амир и Джон. После того как они получили от официанта меню, им требуется разное количество времени для принятия решения о том, что они будут есть.

Предположим, что Кохли требуется 5 минут, Амиру — 10 минут, а Джону — 1 минута. Если один официант начинает с Амира, то он принимает его заказ через 10 минут. Затем он обслуживает Кохли и тратит 5 минут на то, чтобы записать его заказ. Наконец, официант тратит еще 1 минуту на то, чтобы узнать, что хочет съесть Джон. Таким образом, в общей сложности он тратит 10 + 5 + 1 = 16 минут на то, чтобы записать их заказы. Однако обратите внимание, что в этой последовательности событий Джон ждет 15 минут, пока официант доберется до него, Кохли ждет 10 минут, а Амир ждет 0 минут.

Теперь подумаем, знает ли официант время, которое потребуется каждому клиенту для принятия решения. Он может начать с Джона, затем перейти к Амиру и, наконец, к Кохли. Таким образом, каждый клиент будет ждать 0 минут. Создается иллюзия трех официантов, по одному на каждого клиента, хотя на самом деле есть только один. Наконец, общее время, необходимое официанту для принятия всех трех заказов, составит 10 минут, что гораздо меньше, чем 16 минут при первом сценарии.

Тем, кто знаком с JavaScript, asyncio покажется очень похожим на работу NodeJS. В NodeJS под капотом находится однопоточный цикл событий, который обслуживает все входящие запросы.

Зачем использовать asyncio вместо многопоточности в Python?

Во-первых, очень сложно написать код, безопасный для потоков. В асинхронном коде вы точно знаете, где код будет переходить от одной задачи к другой, и возникновение состояния гонки маловероятнее.

Во-вторых, потоки потребляют достаточно много данных, поскольку каждый поток должен иметь свой стек. В асинхронном коде весь код использует один и тот же стек, и его размер остается небольшим за счет постоянного разматывания между задачами.

В-третьих, потоки являются структурами ОС и поэтому требуют большего объема памяти для поддержки платформой. С асинхронными задачами такой проблемы нет.

Как создавать асинхронные программы в старой версии Python

Итак, вы приступили к новой работе и обнаружили, что кодовая база изобилует устаревшим кодом на языке Python. В этом разделе мы познакомим вас со старым способом создания асинхронных программ.

Здесь есть о чем рассказать, поэтому давайте просто погрузимся в тему. Первое понятие, с которым вам стоит познакомиться, — это итерируемые объекты и итераторы. Они служат основой для генераторов, которые открыли двери для асинхронного программирования.

От редакции Pythonist: также предлагаем почитать статью «Итераторы и генераторы в Python».

Итерируемые объекты и итераторы

В Python итерируемый объект — это объект, элементы которого можно перебирать с помощью цикла for. Итератор способен возвращать свои члены по одному, при этом наиболее распространенным типом итераторов в Python являются последовательности, включающие списки, строки и кортежи.

Когда мы хотим получить элемент по индексу, вызывается метод __getitem__(). Помните, что не каждый тип в Python является последовательностью. Словари, множества, файловые объекты и генераторы не могут быть проиндексированы, но они являются итерируемыми. Python также позволяет создавать бесконечные итераторы, называемые генераторами.

Для того чтобы считаться итерируемым, объект должен определять один из двух методов:

  • __iter__()
  • __getitem__()

Итератор — это объект, который может использоваться для последовательного доступа к элементам итерируемого объекта. В Python 3 итератор предоставляет метод __next__(), а в Python 2 — метод next(). Оба метода извлекают следующий элемент из последовательности итерируемого объекта. Примечание: итератор должен поддерживать следующие методы:

  • __iter__()
  • __next__()

Объект итератора возвращает самого себя для метода __iter__(). Это позволяет нам использовать итератор и итерируемый объект в цикле for.

Перебрав элементы объекта до конца, функция next() выбрасывает исключение StopIteration. В совокупности эти правила называются протоколом итератора. Метод __iter__() для контейнера может также возвращать так называемый генератор, который тоже является итератором.

Оператор yield()

Используемый в Python оператор yield может как выдавать значения, так и принимать параметры. Это становится особенно важным при создании функций-генераторов.

От редакции Pythonist: также предлагаем почитать «Сравнение операторов yield и return в Python (с примерами)».

Рассмотрим следующий код, который возвращает строку:

def keep_learning_synchronous():
   return "Educative"


if __name__ == "__main__":
   str = keep_learning_synchronous()
   print(str)

Заменив return на yield, вы заметите, что возвращаемый объект — это объект-генератор. Фактически наш метод keep_learning_asynchronous() теперь является генераторной функцией.

Функции-генераторы называются генераторами, поскольку они генерируют значения. Для того чтобы объект-генератор выдал строку из приведенного выше фрагмента кода, можно вызвать для него функцию next().

Мы можем использовать yield в функции в виде yield <expression> . Оператор yield позволяет функции возвращать значение и приостанавливать состояние функции до тех пор, пока не будет вызвана функция next() на связанном с ней объекте-генераторе.

Операторы генераторов

Функции, содержащие выражение yield, компилируются как генераторы. Использование выражения yield в теле функции приводит к тому, что эта функция становится генератором. Эти функции возвращают объект, поддерживающий методы протокола итерации.

Созданный объект-генератор автоматически получает метод __next__(). Давайте вернемся к примеру из предыдущего раздела. Вместо использования метода next() мы можем вызвать __next__ непосредственно на объекте-генераторе:

def keep_learning_asynchronous():
    yield "Educative"


if __name__ == "__main__":
    gen = keep_learning_asynchronous()

    str = gen.__next__()
    print(str)

О генераторах нужно помнить следующие факты:

  • Функции-генераторы позволяют откладывать получение сложновычисляемых значений. Следующее значение вычисляется только при необходимости. То есть генераторы не сохраняют в памяти длинные последовательности и не выполняют все дорогостоящие вычисления заранее. Это делает генераторы эффективными с точки зрения памяти и вычислений.
  • Генераторы, будучи приостановленными, сохраняют местоположение кода, в котором был выполнен последний оператор yield, и всю свою локальную область видимости. Это позволяет им возобновить выполнение с того места, на котором они остановились.
  • Объекты-генераторы — это не что иное, как итераторы.
  • Следует различать функцию-генератор и связанный с ней объект-генератор. Функция-генератор при вызове возвращает объект-генератор, а функция next() вызывается на объекте-генераторе для выполнения кода внутри функции-генератора.

Состояния генератора

Генератор проходит через следующие состояния:

  • GEN_CREATED, когда объект генератора был возвращен впервые из функции генератора и итерация еще не началась.
  • GEN_RUNNING, когда для объекта генератора был вызван next, и он выполняется интерпретатором Python.
  • GEN_SUSPENDED, когда генератор приостановлен на выходе.
  • GEN_CLOSED, когда генератор завершил выполнение или был закрыт.
Схема жизненного цикла генератора

Методы для объектов генераторов

Объект генератора предоставляет различные методы, которые могут быть вызваны для работы с генератором. Например, методы throw(), send() и close().

Корутины на основе генератора

Python сделал различие между генераторами Python и генераторами, которые предназначены для использования как корутины (асинхронные функции). Такие корутины называются генераторными и требуют добавления декоратора @asynio.coroutine в определение функции, хотя это не является строгим требованием.

В генераторных корутинах вместо синтаксиса yield используется синтаксис yield from.

От редакции Pythonist: также предлагаем почитать «Конструкция yield from».

Корутина может:

  • выходить из другой корутины
  • возвращать выражение
  • вызывать исключение

Корутины в Python позволяют реализовать кооперативную многозадачность. Кооперативная многозадачность — это подход, при котором выполняющаяся задача добровольно уступает поток другим процессам. Задача может сделать это, когда она логически заблокирована, например, в ожидании пользовательского ввода или когда она инициировала сетевой запрос и будет простаивать некоторое время.

Корутину можно определить как специальную функцию, которая может передавать управление вызывающему ее процессу без потери своего состояния.

В чем же разница между короутинами и генераторами?

Генераторы — это, по сути, итераторы, хотя внешне они похожи на функции. В общем случае различие между генераторами и корутинами заключается в следующем:

  • Генераторы возвращают значение, в то время как корутина передает управление другой корутине и может возобновить выполнение с того момента, когда она передала управление.
  • Генератор не может принимать аргументы после запуска, в то время как корутина может.
  • Генераторы используются в основном для упрощения написания итераторов. Они являются разновидностью корутин и иногда также называются семикорутинами.

Пример корутины на основе генератора

Простейшая генераторная корутина, которую мы можем написать, выглядит следующим образом:

@asyncio.coroutine
def do_something_important():
    yield from asyncio.sleep(1)

Корутина находится в состоянии сна в течение одной секунды. Обратите внимание на декоратор и использование yield from. Без них вы не смогли бы использовать корутину с asyncio.

Оператор yield from передает управление обратно циклу событий и возобновляет выполнение после завершения работы корутины asyncio.sleep().

Заметим, что asyncio.sleep() сама является корутиной. Модифицируем эту корутину так, чтобы она вызывала другую корутину, выполняющую сон. Изменения показаны ниже:

@asyncio.coroutine
def go_to_sleep(sleep):
    print("sleeping for " + str(sleep) + " seconds")
    yield from asyncio.sleep(sleep)


@asyncio.coroutine
def do_something_important(sleep):
    # what is more important than getting
    # enough sleep!
    yield from go_to_sleep(sleep)

Теперь представьте, что вы трижды последовательно вызываете корутину do_something_important() со значениями 1, 2 и 3 соответственно.

Без использования потоков или мультипроцессинга последовательный код будет выполнен за 1 + 2 + 3 = 6 секунд. Но если использовать asyncio, то тот же самый код может быть выполнен примерно за 3 секунды, несмотря на то, что все вызовы выполняются в одном потоке.

Интуитивно понятно, что при возникновении блокирующей операции управление передается обратно в цикл событий, и выполнение возобновляется только после завершения блокирующей операции.

В случае Python генераторы используются как производители данных, а корутины — как их потребители. Раньше, до появления в Python 3.5 поддержки собственных корутин, корутины реализовывались с помощью генераторов.

Объекты и тех, и других имеют тип generator. Однако, начиная с версии 3.5, Python делает различие между корутинами и генераторами.

Как создавать асинхронные программы на Python 3

Существует три основных элемента для создания асинхронных программ на Python: нативные корутины, циклы событий (event loops) и фьючерсы (futures). Давайте углубимся в каждый из них и рассмотрим подробнее.

Нативные корутины

В Python 3.5 в языке появилась поддержка нативных корутин. Под «нативными» подразумевается, что в языке появился синтаксис для специального определения корутин. Нативные корутины могут быть определены с помощью синтаксиса async/await.

Прежде чем перейти к более подробному описанию, приведем пример очень простой нативной корутины:

async def coro():
    await asyncio.sleep(1)

Эта корутина может быть запущена с помощью цикла событий следующим образом:

loop = asyncio.get_event_loop()
loop.run_until_complete(coro())

Async

Создать нативную корутину можно с помощью async def. Метод с префиксом async def автоматически становится корутиной.

async def useless_native_coroutine():
  pass

Метод inspect.iscoroutine() вернет True для объекта coroutine, возвращенного из приведенной выше функции coroutine. Заметьте, что yield или yield from не могут появляться в теле асинхронной функции, иначе это будет отмечено как синтаксическая ошибка.

import inspect
import asyncio

async def useless_native_coroutine():
 pass

if __name__ == "__main__":
 coro = useless_native_coroutine()
 print(inspect.iscoroutine(coro))

//Returns True

Await

Функция await может использоваться для получения результата выполнения объекта coroutine. Вы используете команду await следующим образом: await <expr>, где <expr> должен быть awaitable объектом.

Awaitable объекты должны реализовывать метод __await__(), который должен возвращать итератор. Если вы помните, yield from также ожидает, что его аргументом будет итератор, из которого можно получить итератор. Под капотом await заимствует реализацию yield from с дополнительной проверкой, действительно ли его аргумент является awaitable.

Следующие объекты принадлежат к типу awaitable объектов:

  • объект нативной корутины, возвращаемый при вызове функции нативной корутины
  • объект coroutine на основе генератора, возвращаемый из генератора, декорированного @types.coroutine или @asyncio.coroutine. Декорированные генераторные корутины являются awaitable, даже если у них нет метода __await__().
  • объекты Future
  • объекты Task (Task является подклассом Future).
  • объекты, определенные с помощью CPython C API, имеют функцию tp_as_async.am_await(), возвращающую итератор (аналогично методу __await__()).

Кроме того, await должен появляться внутри async-определенного метода, иначе это синтаксическая ошибка. В настоящее время:

  • генераторы используются для обозначения функций, которые только производят значения,
  • ванильные корутины только получают значения,
  • корутины на основе генераторов идентифицируются по наличию yield from в теле метода,
  • нативные корутины определяются с использованием синтаксиса async/await.

Подытожим данный раздел:

  • Для возврата значений в генераторах используется yield
  • Генераторы, которые могут получать значения извне, являются корутинами
  • Генераторы с yield from в теле функции являются генераторными корутинами, а методы, определенные с помощью async def, — нативными корутинами.
  • Используйте декораторы @asyncio.coroutine или @types.coroutine для генераторных корутинов, чтобы сделать их совместимыми с нативными корутинами.

Циклы событий

Цикл событий (event loop) — это программная конструкция, которая ожидает возникновения событий, а затем передает их обработчику событий.

Событием может быть клик пользователя на кнопку интерфейса пользователя, запуск процесса загрузки файла или получение ответа от стороннего сервера.

В основе асинхронного программирования лежат циклы событий.

Эта концепция не является чем-то новым. На самом деле многие языки программирования обеспечивают асинхронное программирование с помощью циклов событий. В Python циклы событий запускают асинхронные задачи и колбэки, выполняют сетевые операции ввода-вывода, запускают подпроцессы и делегируют дорогостоящие вызовы функций пулу потоков.

Один из наиболее распространенных примеров использования этой концепции — это веб-серверы, реализованные с использованием асинхронного дизайна. Веб-сервер ожидает получения HTTP-запроса и возвращает соответствующий ресурс. Те, кто знаком с JavaScript, могут вспомнить, что NodeJS работает по тому же принципу: это веб-сервер, который запускает цикл событий для получения веб-запросов в одном потоке.

Некоторые другие веб-серверы для обработки каждого запроса создают новый поток или форк нового процесса. В некоторых бенчмарках асинхронные веб-серверы на основе цикла событий превосходили многопоточные, что может показаться нелогичным на первый взгляд.

Запуск цикла событий

В Python 3.7+ предпочтительным способом запуска цикла событий является использование метода asyncio.run(). Метод представляет собой блокирующий вызов, который блокирует до тех пор, пока не завершится переданная корутина. Пример программы:

async def do_something_important():
    await asyncio.sleep(10)


if __name__ == "__main__":

  asyncio.run(do_something_important())

Примечание. Если вы работаете с Python 3.5, то API asyncio.run() недоступен. В этом случае необходимо явно получить цикл событий с помощью asyncio.new_event_loop() и запустить желаемую корутину с помощью run_until_complete(), определенной для объекта цикла.

Запуск нескольких циклов событий

Вам никогда не придется самостоятельно запускать цикл событий. Вместо этого следует использовать API более высокого уровня для отправки корутин. В учебных целях мы продемонстрируем запуск цикла событий в одном потоке.

В приведенном ниже примере кода используется API asyncio.new_event_loop() для получения нового цикла событий и последующего использования его для запуска другой корутины.

import asyncio, random
from threading import Thread
from threading import current_thread


async def do_something_important(sleep_for):
    print("Is event loop running in thread {0} = {1}\n".format(current_thread().getName(),
                                                         asyncio.get_event_loop().is_running()))

    await asyncio.sleep(sleep_for)


def launch_event_loops():
    # get a new event loop
    loop = asyncio.new_event_loop()

    # set the event loop for the current thread
    asyncio.set_event_loop(loop)

    # run a coroutine on the event loop
    loop.run_until_complete(do_something_important(random.randint(1, 5)))

    # remember to close the loop
    loop.close()


if __name__ == "__main__":
    t1 = Thread(target=launch_event_loops)
    t2 = Thread(target=launch_event_loops)

    t1.start()
    t2.start()

    print("Is event loop running in thread {0} = {1}\n".format(current_thread().getName(),
                                                         asyncio.get_event_loop().is_running()))

    t1.join()
    t2.join()

Попробуйте запустить код, изучите вывод, и вы поймете, что каждый порожденный поток выполняет свой собственный цикл событий.

Типы циклов событий

Существует два типа циклов событий:

  • SelectorEventLoop
  • ProactorEventLoop

SelectorEventLoop основан на модуле selectors и является циклом по умолчанию на всех платформах. Модуль selectors содержит API-интерфейсы poll() и select().

ProactorEventLoop, напротив, использует порты завершения ввода/вывода Windows и поддерживается только на Windows. Мы не будем вдаваться в более тонкие детали реализации обоих типов, но закончим здесь замечанием, что как тип, так и связанная с ним политика управления циклом контролируют поведение цикла событий.

Фьючерсы и задачи

Futures

Future представляет вычисления, которые либо уже выполняются, либо будут запланированы в будущем. Это специальный низкоуровневый ожидаемый объект, который представляет собой конечный результат асинхронной операции.

Не путайте threading.Future и asyncio.Future. Первый является частью модуля потоков и для него не определен метод __iter__().

asyncio.Future является awaitable и может быть использован с оператором yield from. В общем случае вам не придется иметь дело с фьючерсами напрямую. Обычно они предоставляются библиотеками или API asyncio.

Для наглядности мы покажем пример, в котором создается фьючерс, ожидаемый корутиной. Изучите приведенный ниже фрагмент:

import asyncio
from asyncio import Future


async def bar(future):
    print("bar will sleep for 3 seconds")
    await asyncio.sleep(3)
    print("bar resolving the future")
    future.done()
    future.set_result("future is resolved")


async def foo(future):
    print("foo will await the future")
    await future
    print("foo finds the future resolved")


async def main():
    future = Future()
    results = await asyncio.gather(foo(future), bar(future))


if __name__ == "__main__":
    asyncio.run(main())
    print("main exiting")

Обе корутины получают в качестве аргумента объект future. Корутина foo() ожидает разрешения future, в то время как корутина bar() разрешает future через три секунды.

Задачи

Задачи (tasks) похожи на фьючерсы. Фактически, Task является подклассом Future и может быть создан с помощью следующих методов:

  • asyncio.create_task() появился в Python 3.7 и является предпочтительным способом создания задач. Метод принимает корутины и оборачивает их как задачи.
  • loop.create_task() принимает только корутины.
  • asyncio.ensure_future() принимает futures, coroutines и любые ожидающие объекты.

Задачи оборачивают корутины и запускают их в циклах событий. Если корутина ожидает Future, задача приостанавливает выполнение корутины и ждет завершения Future. После завершения Future выполнение обернутой программы возобновляется.

В циклах событий используется кооперативное планирование, то есть цикл событий выполняет одну задачу за раз. Пока задача ожидает завершения Future, цикл событий запускает другие задачи, обратные вызовы или выполняет операции ввода-вывода. Задачи также могут быть отменены.

Мы переписали пример фьючерса с использованием задач следующим образом:

import asyncio
from asyncio import Future


async def bar(future):
    print("bar will sleep for 3 seconds")
    await asyncio.sleep(3)
    print("bar resolving the future")
    future.done()
    future.set_result("future is resolved")


async def foo(future):
    print("foo will await the future")
    await future
    print("foo finds the future resolved")


async def main():
    future = Future()

    loop = asyncio.get_event_loop()
    t1 = loop.create_task(bar(future))
    t2 = loop.create_task(foo(future))

    await t2, t1


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("main exiting")

Цепочка корутин: старый подход против нового

Старый способ создания цепочки корутин

Одно из наиболее распространенных применений корутин — их объединение в цепочки для конвейерной обработки данных. Вы можете выстроить цепочку корутин, подобно тому, как вы передаете команды Unix в shell.

Идея заключается в том, что входные данные проходят через первую корутину, которая может выполнить некоторые действия с входными данными, а затем передает измененные данные второй корутине, которая может выполнить дополнительные операции с входными данными.

Входные данные проходят через цепочку корутин, каждая из которых выполняет некоторую операцию над входными данными, пока входные данные не достигнут последней корутины, где они возвращаются обратно вызывающей стороне (оригинальному вызывающему коду).

Рассмотрим следующий пример, в котором вычисляются значения выражения x**2 + 3 для первых ста натуральных чисел.

Вы вручную работаете с конвейером данных, используя метод next(), поэтому можете настроить цепочку, не заботясь об изменениях, необходимых для ее работы с циклом событий asyncio. Настройка осуществляется следующим образом:

  • Первая корутина генерирует натуральные числа, начиная с 1.
  • Вторая корутина вычисляет квадрат каждого переданного на вход числа.
  • Последняя функция является генератором и прибавляет к переданному ей значению 3 и возвращает результат.
def coro3(k):
    yield (k + 3)


def coro2(j):
    j = j * j
    yield from coro3(j)


def coro1():
    i = 0
    while True:
        yield from coro2(i)
        i += 1


if __name__ == "__main__":

    # The first 100 natural numbers evaluated for the following expression
    # x^2 + 3

    cr = coro1()
    for v in range(100):
        print("f({0}) = {1}".format(v, next(cr)))

В приведенном примере конец цепочки состоит из генератора, однако эта цепочка не будет работать с циклом событий asyncio, поскольку он не работает с генераторами. Один из способов исправить это — заменить последний генератор на обычную функцию, возвращающую фьючерс с итоговым результатом.

Метод coro3() будет выглядеть так:

def coro3(k):
    f = Future()
    f.set_result(k + 3)
    f.done()
    return f

Еще один способ — добавить @asyncio.coroutine к coro3() и return вместо yield. Изменения будут выглядеть следующим образом:

@asyncio.coroutine
def coro3(k):
    return k + 3

Важной оговоркой является то, что если бы мы использовали декоратор @types.coroutine, то программа завершилась бы неудачей. Это связано с тем, что @asyncio.coroutine может преобразовать обычную функцию в coroutine, а @types.coroutine — нет.

Обратите внимание, что в предыдущих примерах мы не декорировали функции coro1() и coro2() с помощью @asyncio.coroutine.

Обе функции являются генераторными coroutine-функциями, поскольку в их телах присутствуют yield from. Кроме того, появление декоратора не является строго обязательным, но если использовать декораторы, то программа все равно будет работать корректно.

Новый способ организации цепочки нативных корутин

Аналогично генераторам и корутинам, основанным на генераторах, мы также можем объединять в цепочки нативные корутины.

import asyncio


async def coro3(k):
    return k + 3


async def coro2(j):
    j = j * j
    res = await coro3(j)
    return res


async def coro1():
    i = 0
    while i < 100:
        res = await coro2(i)
        print("f({0}) = {1}".format(i, res))
        i += 1


if __name__ == "__main__":
    # The first 100 natural numbers evaluated for the following expression
    # x^2 + 3
    cr = coro1()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(cr)

Применение asyncio на практике

Задача состоит в том, чтобы реализовать собственную корутину, выполняющую асинхронный сон. Сигнатура корутины выглядит следующим образом.

# Implement the following coroutine where
    # sleep_for is defined in seconds
    async def asleep(sleep_for):
        pass

Решение

Первая мысль, которая приходит в голову, — использовать API time.sleep() для ожидания запрошенного времени сна. Однако этот API является блокирующим и будет блокировать выполняющий его поток. Очевидно, что это исключает обращение к API из главного потока. Но это не исключает возможности выполнения этого API в другом потоке.

Это наводит нас на мысль о возможном решении. Мы можем создать объект Future и ожидать его в корутине asleep(). Единственное требование теперь — чтобы другой поток разрешал фьючерсы по истечении sleep_for секунд. Частичное решение выглядит следующим образом:

async def asleep(sleep_for):
    future = Future()

    Thread(target=sync_sleep, args=(sleep_for, future)).start()
    await future


def sync_sleep(sleep_for, future):
    # sleep synchronously
    time.sleep(sleep_for)

    # resolve the future
    future.set_result(None

Добавим остальное и посмотрим, что получится:

from threading import Thread
from threading import current_thread
from asyncio import Future
import asyncio
import time


async def asleep(sleep_for):
   future = Future()
   Thread(target=sync_sleep, args=(sleep_for, future)).start()
   await future


def sync_sleep(sleep_for, future):

   # sleep synchronously
   time.sleep(sleep_for)

   # resolve the future
   future.set_result(None)

   print("Sleeping completed in {0}".format(current_thread().getName()), flush=True)


if __name__ == "__main__":
   start = time.time()
   work = list()
   work.append(asleep(1))

   loop = asyncio.get_event_loop()
   loop.run_until_complete(asyncio.wait(work, return_when=asyncio.ALL_COMPLETED))
   print("main program exiting after running for {0}".format(time.time() - start))

Удивительно, но приведенная выше программа зависает и не завершается, хотя сообщение из метода sync_sleep() выводится. Почему-то корутина asleep() так и не возобновляется после завершения ожидаемого ею фьючерса.

Причина в том, что Future не является потокобезопасным. К счастью, asyncio предоставляет метод для потокобезопасного выполнения корутины в заданном цикле. В качестве API используется run_coroutine_threadsafe().

Итак, у нас есть способ потокобезопасного выполнения фьючерса, однако делать это нужно в другой корутине, поскольку API run_coroutine_threadsafe() принимает только корутины. Для этого необходимо немного модифицировать метод sync_sleep() следующим образом:

def sync_sleep(sleep_for, future, loop):
    # sleep synchronously
    time.sleep(sleep_for)

    # define a nested coroutine to resolve the future
    async def sleep_future_resolver():
        # resolve the future
        future.set_result(None)

    asyncio.run_coroutine_threadsafe(sleep_future_resolver(), loop)

Мы определяем вложенную корутину sleep_future_resolver, которая разрешает объект Future.

Также отметим, что sync_sleepnow принимает в качестве параметра цикл событий. Это должен быть тот же цикл событий, который изначально выполнял корутину asleep().

Изменения в корутине asleep() показаны ниже:

async def asleep(sleep_for):
    future = Future()
    # get the current event loop
    current_loop = asyncio.get_running_loop()
    Thread(target=sync_sleep, args=(sleep_for, future, current_loop)).start()

    await future

Вот что мы имеем в итоге:

from threading import Thread
from threading import current_thread
from asyncio import Future
import asyncio
import time


async def asleep(sleep_for):
   future = Future()
   current_loop = asyncio.get_event_loop()
   Thread(target=sync_sleep, args=(sleep_for, future, current_loop)).start()

   await future


def sync_sleep(sleep_for, future, loop):
   # sleep synchronously
   time.sleep(sleep_for)

   # define a nested coroutine to resolve the future
   async def sleep_future_resolver():
       # resolve the future
       future.set_result(None)

   asyncio.run_coroutine_threadsafe(sleep_future_resolver(), loop)
   print("Sleeping completed in {0}\n".format(current_thread().getName()), flush=True)


if __name__ == "__main__":
   start = time.time()
   work = list()
   work.append(asleep(5))
   work.append(asleep(5))
   work.append(asleep(5))
   work.append(asleep(5))
   work.append(asleep(5))

   loop = asyncio.get_event_loop()
   loop.run_until_complete(asyncio.wait(work, return_when=asyncio.ALL_COMPLETED))
   print("main program exiting after running for {0}".format(time.time() - start))

Вывод показывает, что сон происходит в порожденных нами потоках, а не в главном потоке. Более того, несмотря на то, что мы пять раз вызываем корутину asleep() для сна на пять секунд каждый раз, общее время выполнения программы составляет примерно пять секунд, как и должно быть, если мы правильно реализовали решение.

В качестве упражнения подумайте, что произойдет, если мы создадим пять потоков и в каждом из них вызовем time.sleep(). В этом случае программа выполнится за пять или двадцать пять секунд? Попробуйте это сделать и понаблюдайте за временем выполнения программы.

from threading import Thread
from threading import current_thread
import time


def sync_sleep(sleep_for):
   time.sleep(sleep_for)
   print("Sleeping completed in {0}".format(current_thread().getName()))


if __name__ == "__main__":
   start = time.time()

   threads = list()

   for _ in range(0, 5):
       threads.append(Thread(target=sync_sleep, args=(5,)))

   for thread in threads:
       thread.start()

   for thread in threads:
       thread.join()

   print("main program exiting after running for {0}".format(time.time() - start))

Тест синхронного сна по-прежнему занимает пять секунд! Вы можете задаться вопросом, в чем разница между нашими программами асинхронного и синхронного сна? Ответ заключается в том, что вызов асинхронного сна является неблокирующим, в отличие от вызова синхронного.

Однако внутри планировщик, увидев, что поток собирается блокировать вызов сна в течение пяти секунд, переключает его на другой поток и возобновляет выполнение только по истечении не менее пяти секунд.

Перевод статьи «Python Concurrency: Making sense of asyncio».