Главная | Все статьи | Дневник студента

Учим_Питон_#5 — Практика: асинхронные функции

Время чтения статьи ~6 минут
Статья написана студентом Хекслета. Мнение автора может не совпадать с позицией редакции
Учим_Питон_#5 — Практика: асинхронные функции главное изображение

Внимание! Данный материал может содержать ошибки, и является лишь попыткой закрепить изученный материал через изложение. Комментарии с правками приветствуются.

Используем: Python 3.9 Asyncio Aiohttp Postgresql Docker Sqlalchemy**

Задача: написать асинхронный код, который скачивает данные с сайта и загружает их в БД.

Создаем 5 файлов:

main.py — точка входа для запуска программы
jsonplaceholder.py — модуль, скачивающий данные с сайта
models.py — ф-и для БД
docker-compose.yaml — информация для докера
postgres.env — логин, пароль, название БД

docker-compose.yaml

version: '3'

services:
  pg:
    image: postgres # use latest official postgres version
    env_file:
      - postgres.env # configure postgres
    ports:
      - 5432:5432
    volumes:
      - ./db-data/pg-data:/var/lib/postgresql/data/ # persist data even if container shuts down

postgres.env

POSTGRES_USER=user
POSTGRES_PASSWORD=password
POSTGRES_DB=postgres

jsonplaceholder.py

import aiohttp
import asyncio  # нужен только для пробного запуска

USERS_DATA_URL = "https://jsonplaceholder.typicode.com/users"
POSTS_DATA_URL = "https://jsonplaceholder.typicode.com/posts"

async def fetch_json(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await resonse.json()

if '__name__' = '__main__':
    asyncio.run(fetch_json(USERS_DATA_URL))

aiohttp — это «async HTTP client/server for asyncio and Python».

asyncio — это «a library to write concurrent code using the async/await syntax».

async — создает корутину await — позволяет переключить контекст, переходя к другой ф-и, если текущая занята. async with — асинхронный менеджер контекста, который автоматически закроется после выполнения задачи (см. Менеджер контекста или with as) Корутина — cooperative routine, код, который может выполняться одновременно с другим кодом.

Что мы здесь делаем:

Импортируем библиотеки Пишем константы (по соглашению разработчиков, константы — это переменные, записанные с большой буквы, и которые НЕ следует менять!)

Создаем асинхронную ф-и(корутину) fetch_json — дословно: «получить json».

Внутри ф-и вызываем асинхронный менеджер, в нем открываем сессию через aiohttp, и следующим шагом посылаем запрос GET по урлу session.get(url). Результат кладем в переменную response и возвращаем его, преобразовав в json (return await response.json()) — код с return читается справа налево. Вначале await переменной, затем return того, что получилось)

Важно: данные, приходящие с сервера, не нужно мутировать! Почему? Потому что другие разработчики знают, как работает json() и ожидают json, а не мутанта:) Не усложняйте жизнь другим и себе, например, спустя месяц.

Итак, результат работы модуля jsonplaceholder.py — возврат данных в виде json

models.py

# импортируем модуль даты и времени
from datetime import datetime

# импортируем логгер (типа дебаггера)
from loguru import logger

import asyncio

# импортируем ORM для работы с БД
from sqlalchemy import (
    Column,
    String,
    Integer,
    DateTime,
    ForeignKey,
    func
)

# импортируем асинхронные методы sqlalchemy для работы с БД
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

# импортируем метод работы с бд, фабрику сессий и связи
from sqlalchemy.orm import declarative_base, sessionmaker, relationship


PG_ASYNC_CONN_URI = 'postgresql+asyncpg://user:password@localhost/postgres'

# создаем движок
engine = create_async_engine(PG_ASYNC_CONN_URI, echo=False)

# создаем метод описания БД (Создаем базовый класс для декларативных определений классов.)
Base = declarative_base()

# создаем сессию (Фабрика sessionmaker генерирует новые объекты Session при вызове)
Session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

# запускаем докер, вызвав команду в консоли используя create_subprocess_shell(cmd)
cmd = 'docker compose up -d'

async def create_pg_docker(cmd):
    result = await asyncio.create_subprocess_shell(cmd)
    await result.communicate()
    logger.info('____pg docker rdy')

# делаем DROP TABLE, CREATE TABLE в БД
async def created_db_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

# cоздаем ф-ю, которая скачивает с сайта юзеров и сохраняет их в БД
async def save_user_in_db(u_data):
    async with Session() as session:
        async with session.begin():
            for user in u_data:
                name = user['name']
                email = user['email']
                user = User(name=name, email=email)
                session.add(user)  # добавляем данные юзера в сессию
                # session.commit() делается автоматически при закрытии менеджера контекста, поэтому его здесь не пишем.


# cоздаем ф-ю, которая скачивает с сайта посты и сохраняет их в БД
async def save_post_in_db(p_data):
    async with Session() as session:
        async with session.begin():
            for post in p_data:
                title = post['title']
                description = post['body']
                user_id = post['userId']
                post = Post(title=title, description=description, user_id=user_id)
                session.add(post)


# создаем модель таблицы User и Post
class User(Base):
    __tablename__ = 'user'

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False, default='', server_default='')
    email = Column(String, nullable=False, default='', server_default='')
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow, server_default=func.now())

    posts = relationship('Post', back_populates='users')

    def __str__(self):
        return f'{self.__class__.__name__}(id={self.id}, name={self.name!r}, email={self.email},' \
               f'created_at={self.created_at!r})'

    def __repr__(self):
        return str(self)


class Post(Base):
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    title = Column(String, nullable='', default='', server_default='')
    description = Column(String, nullable='', default='', server_default='')

    user_id = Column(Integer, ForeignKey('user.id'), nullable=False)
    users = relationship('User', back_populates='posts')

    def __str__(self):
        return f'{self.__class__.__name__}(id={self.id}, title={self.title!r}, description={self.description!r})'

    def __repr__(self):
        return str(self)

Результат работы модуля models.py:

  1. Подключаемся к БД в докере
  2. Дропаем и создаем заново таблицы (чтобы небыло конфликта с существующими записями)
  3. Сохраняем данные юзеров и постов в БД

main.py:

import asyncio

from json_placeholder import USERS_DATA_URL, POSTS_DATA_URL, fetch_json

from models import create_pg_docker,created_db_tables,cmd, save_post_in_db,save_user_in_db


async def async_main():
    await create_pg_docker(cmd)
    await created_db_tables()
    user_data, post_data = await asyncio.gather(fetch_json(USERS_DATA_URL),
                                                fetch_json(POSTS_DATA_URL))
    await save_user_in_db(user_data)
    await save_post_in_db(post_data)


def main():
    asyncio.run(async_main())

if __name__ == '__main__':
    main()

Что здесь делаем: Создаем асинхронный main() и запускаем его в синхронном main(). Внутри async_main() последовательно запускаем асинхронные ф-и:

  1. Запуск БД в докере
  2. Дроп и создание пустых таблиц
  3. Получение данных юзеров и постов с сайта
  4. Сохранение юзеров в бд
  5. Сохранение постов в бд
  6. Закрытие подключения к бд (происходит неявно, автоматически при завершении работы менеджера контекста)

asyncio.gather — метод, который вернет то, что в него было положено, в том же порядке. (deprecated in python 3.10!!!) asyncio.run - метод, создающий event loop для асинхронных ф-ий.

Результат работы main.py: В БД, в таблице User появляется 10 пользователей, в таблице Post — 100 записей, связанных отношением many to one к юзерам, написавших их.

P.S. Если будете копировать код - используйте IDE для его запуска т.к. где-то может стоять 4 пробела, а где-то табуляция. Но лучше не копировать, а руками перепечатать, так лучше запомнится и поймется.

Аватар пользователя Valeriy Poltoranin
Valeriy Poltoranin 14 мая 2021
1
Похожие статьи