/
Блог
/
Дневник студента
/

Учим_Питон_#5 — Практика: асинхронные функции

Учим_Питон_#5 — Практика: асинхронные функции

14 мая 2021 г.
3 минуты
1

Внимание! Данный материал может содержать ошибки, и является лишь попыткой закрепить изученный материал через изложение. Комментарии с правками приветствуются.

Используем: Python 3.9 Asyncio Aiohttp Postgresql Docker Sqlalchemy**

Задача: написать асинхронный код, который скачивает данные с сайта и загружает их в БД.

Создаем 5 файлов:

main.py — точка входа для запуска программы jsonplaceholder.py — модуль, скачивающий данные с сайта models.py — ф-и для БД docker-compose.yaml — информация для докера postgres.env — логин, пароль, название БД

docker-compose.yaml

version: '3' services: pg: image: postgres # use latest official postgres version env_file: - postgres.env # configure postgres ports: - 5432:5432 volumes: - ./db-data/pg-data:/var/lib/postgresql/data/ # persist data even if container shuts down

postgres.env

POSTGRES_USER=user POSTGRES_PASSWORD=password POSTGRES_DB=postgres

jsonplaceholder.py

import aiohttp import asyncio # нужен только для пробного запуска USERS_DATA_URL = "https://jsonplaceholder.typicode.com/users" POSTS_DATA_URL = "https://jsonplaceholder.typicode.com/posts" async def fetch_json(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await resonse.json() if '__name__' = '__main__': asyncio.run(fetch_json(USERS_DATA_URL))

aiohttp — это «async HTTP client/server for asyncio and Python».

asyncio — это «a library to write concurrent code using the async/await syntax».

async — создает корутину await — позволяет переключить контекст, переходя к другой ф-и, если текущая занята. async with — асинхронный менеджер контекста, который автоматически закроется после выполнения задачи (см. Менеджер контекста или with as) Корутина — cooperative routine, код, который может выполняться одновременно с другим кодом.

Что мы здесь делаем:

Импортируем библиотеки Пишем константы (по соглашению разработчиков, константы — это переменные, записанные с большой буквы, и которые НЕ следует менять!)

Создаем асинхронную ф-и(корутину) fetch_json — дословно: «получить json».

Внутри ф-и вызываем асинхронный менеджер, в нем открываем сессию через aiohttp, и следующим шагом посылаем запрос GET по урлу session.get(url). Результат кладем в переменную response и возвращаем его, преобразовав в json (return await response.json()) — код с return читается справа налево. Вначале await переменной, затем return того, что получилось)

Важно: данные, приходящие с сервера, не нужно мутировать! Почему? Потому что другие разработчики знают, как работает json() и ожидают json, а не мутанта:) Не усложняйте жизнь другим и себе, например, спустя месяц.

Итак, результат работы модуля jsonplaceholder.py — возврат данных в виде json

models.py

# импортируем модуль даты и времени from datetime import datetime # импортируем логгер (типа дебаггера) from loguru import logger import asyncio # импортируем ORM для работы с БД from sqlalchemy import ( Column, String, Integer, DateTime, ForeignKey, func ) # импортируем асинхронные методы sqlalchemy для работы с БД from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession # импортируем метод работы с бд, фабрику сессий и связи from sqlalchemy.orm import declarative_base, sessionmaker, relationship PG_ASYNC_CONN_URI = 'postgresql+asyncpg://user:password@localhost/postgres' # создаем движок engine = create_async_engine(PG_ASYNC_CONN_URI, echo=False) # создаем метод описания БД (Создаем базовый класс для декларативных определений классов.) Base = declarative_base() # создаем сессию (Фабрика sessionmaker генерирует новые объекты Session при вызове) Session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) # запускаем докер, вызвав команду в консоли используя create_subprocess_shell(cmd) cmd = 'docker compose up -d' async def create_pg_docker(cmd): result = await asyncio.create_subprocess_shell(cmd) await result.communicate() logger.info('____pg docker rdy') # делаем DROP TABLE, CREATE TABLE в БД async def created_db_tables(): async with engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) await conn.run_sync(Base.metadata.create_all) # cоздаем ф-ю, которая скачивает с сайта юзеров и сохраняет их в БД async def save_user_in_db(u_data): async with Session() as session: async with session.begin(): for user in u_data: name = user['name'] email = user['email'] user = User(name=name, email=email) session.add(user) # добавляем данные юзера в сессию # session.commit() делается автоматически при закрытии менеджера контекста, поэтому его здесь не пишем. # cоздаем ф-ю, которая скачивает с сайта посты и сохраняет их в БД async def save_post_in_db(p_data): async with Session() as session: async with session.begin(): for post in p_data: title = post['title'] description = post['body'] user_id = post['userId'] post = Post(title=title, description=description, user_id=user_id) session.add(post) # создаем модель таблицы User и Post class User(Base): __tablename__ = 'user' id = Column(Integer, primary_key=True) name = Column(String, nullable=False, default='', server_default='') email = Column(String, nullable=False, default='', server_default='') created_at = Column(DateTime, nullable=False, default=datetime.utcnow, server_default=func.now()) posts = relationship('Post', back_populates='users') def __str__(self): return f'{self.__class__.__name__}(id={self.id}, name={self.name!r}, email={self.email},' \ f'created_at={self.created_at!r})' def __repr__(self): return str(self) class Post(Base): __tablename__ = 'posts' id = Column(Integer, primary_key=True) title = Column(String, nullable='', default='', server_default='') description = Column(String, nullable='', default='', server_default='') user_id = Column(Integer, ForeignKey('user.id'), nullable=False) users = relationship('User', back_populates='posts') def __str__(self): return f'{self.__class__.__name__}(id={self.id}, title={self.title!r}, description={self.description!r})' def __repr__(self): return str(self)

Результат работы модуля models.py:

  1. Подключаемся к БД в докере
  2. Дропаем и создаем заново таблицы (чтобы небыло конфликта с существующими записями)
  3. Сохраняем данные юзеров и постов в БД

main.py:

import asyncio from json_placeholder import USERS_DATA_URL, POSTS_DATA_URL, fetch_json from models import create_pg_docker,created_db_tables,cmd, save_post_in_db,save_user_in_db async def async_main(): await create_pg_docker(cmd) await created_db_tables() user_data, post_data = await asyncio.gather(fetch_json(USERS_DATA_URL), fetch_json(POSTS_DATA_URL)) await save_user_in_db(user_data) await save_post_in_db(post_data) def main(): asyncio.run(async_main()) if __name__ == '__main__': main()

Что здесь делаем: Создаем асинхронный main() и запускаем его в синхронном main(). Внутри async_main() последовательно запускаем асинхронные ф-и:

  1. Запуск БД в докере
  2. Дроп и создание пустых таблиц
  3. Получение данных юзеров и постов с сайта
  4. Сохранение юзеров в бд
  5. Сохранение постов в бд
  6. Закрытие подключения к бд (происходит неявно, автоматически при завершении работы менеджера контекста)

asyncio.gather — метод, который вернет то, что в него было положено, в том же порядке. (deprecated in python 3.10!!!) asyncio.run - метод, создающий event loop для асинхронных ф-ий.

Результат работы main.py: В БД, в таблице User появляется 10 пользователей, в таблице Post — 100 записей, связанных отношением many to one к юзерам, написавших их.

P.S. Если будете копировать код - используйте IDE для его запуска т.к. где-то может стоять 4 пробела, а где-то табуляция. Но лучше не копировать, а руками перепечатать, так лучше запомнится и поймется.

Valeriy Poltoranin

4 года назад

1