Внимание! Данный материал может содержать ошибки, и является лишь попыткой закрепить изученный материал через изложение. Комментарии с правками приветствуются.
Используем: Python 3.9 Asyncio Aiohttp Postgresql Docker Sqlalchemy**
Задача: написать асинхронный код, который скачивает данные с сайта и загружает их в БД.
Создаем 5 файлов:
main.py — точка входа для запуска программы
jsonplaceholder.py — модуль, скачивающий данные с сайта
models.py — ф-и для БД
docker-compose.yaml — информация для докера
postgres.env — логин, пароль, название БД
docker-compose.yaml
version: '3'
services:
pg:
image: postgres # use latest official postgres version
env_file:
- postgres.env # configure postgres
ports:
- 5432:5432
volumes:
- ./db-data/pg-data:/var/lib/postgresql/data/ # persist data even if container shuts down
postgres.env
POSTGRES_USER=user
POSTGRES_PASSWORD=password
POSTGRES_DB=postgres
jsonplaceholder.py
import aiohttp
import asyncio # нужен только для пробного запуска
USERS_DATA_URL = "https://jsonplaceholder.typicode.com/users"
POSTS_DATA_URL = "https://jsonplaceholder.typicode.com/posts"
async def fetch_json(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await resonse.json()
if '__name__' = '__main__':
asyncio.run(fetch_json(USERS_DATA_URL))
aiohttp — это «async HTTP client/server for asyncio and Python».
asyncio — это «a library to write concurrent code using the async/await syntax».
async — создает корутину await — позволяет переключить контекст, переходя к другой ф-и, если текущая занята. async with — асинхронный менеджер контекста, который автоматически закроется после выполнения задачи (см. Менеджер контекста или with as) Корутина — cooperative routine, код, который может выполняться одновременно с другим кодом.
Что мы здесь делаем:
Импортируем библиотеки Пишем константы (по соглашению разработчиков, константы — это переменные, записанные с большой буквы, и которые НЕ следует менять!)
Создаем асинхронную ф-и(корутину) fetch_json — дословно: «получить json».
Внутри ф-и вызываем асинхронный менеджер, в нем открываем сессию через aiohttp, и следующим шагом посылаем запрос GET по урлу session.get(url). Результат кладем в переменную response и возвращаем его, преобразовав в json (return await response.json()) — код с return читается справа налево. Вначале await переменной, затем return того, что получилось)
Важно: данные, приходящие с сервера, не нужно мутировать! Почему? Потому что другие разработчики знают, как работает json() и ожидают json, а не мутанта:) Не усложняйте жизнь другим и себе, например, спустя месяц.
Итак, результат работы модуля jsonplaceholder.py — возврат данных в виде json
models.py
# импортируем модуль даты и времени
from datetime import datetime
# импортируем логгер (типа дебаггера)
from loguru import logger
import asyncio
# импортируем ORM для работы с БД
from sqlalchemy import (
Column,
String,
Integer,
DateTime,
ForeignKey,
func
)
# импортируем асинхронные методы sqlalchemy для работы с БД
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
# импортируем метод работы с бд, фабрику сессий и связи
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
PG_ASYNC_CONN_URI = 'postgresql+asyncpg://user:password@localhost/postgres'
# создаем движок
engine = create_async_engine(PG_ASYNC_CONN_URI, echo=False)
# создаем метод описания БД (Создаем базовый класс для декларативных определений классов.)
Base = declarative_base()
# создаем сессию (Фабрика sessionmaker генерирует новые объекты Session при вызове)
Session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# запускаем докер, вызвав команду в консоли используя create_subprocess_shell(cmd)
cmd = 'docker compose up -d'
async def create_pg_docker(cmd):
result = await asyncio.create_subprocess_shell(cmd)
await result.communicate()
logger.info('____pg docker rdy')
# делаем DROP TABLE, CREATE TABLE в БД
async def created_db_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
# cоздаем ф-ю, которая скачивает с сайта юзеров и сохраняет их в БД
async def save_user_in_db(u_data):
async with Session() as session:
async with session.begin():
for user in u_data:
name = user['name']
email = user['email']
user = User(name=name, email=email)
session.add(user) # добавляем данные юзера в сессию
# session.commit() делается автоматически при закрытии менеджера контекста, поэтому его здесь не пишем.
# cоздаем ф-ю, которая скачивает с сайта посты и сохраняет их в БД
async def save_post_in_db(p_data):
async with Session() as session:
async with session.begin():
for post in p_data:
title = post['title']
description = post['body']
user_id = post['userId']
post = Post(title=title, description=description, user_id=user_id)
session.add(post)
# создаем модель таблицы User и Post
class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False, default='', server_default='')
email = Column(String, nullable=False, default='', server_default='')
created_at = Column(DateTime, nullable=False, default=datetime.utcnow, server_default=func.now())
posts = relationship('Post', back_populates='users')
def __str__(self):
return f'{self.__class__.__name__}(id={self.id}, name={self.name!r}, email={self.email},' \
f'created_at={self.created_at!r})'
def __repr__(self):
return str(self)
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String, nullable='', default='', server_default='')
description = Column(String, nullable='', default='', server_default='')
user_id = Column(Integer, ForeignKey('user.id'), nullable=False)
users = relationship('User', back_populates='posts')
def __str__(self):
return f'{self.__class__.__name__}(id={self.id}, title={self.title!r}, description={self.description!r})'
def __repr__(self):
return str(self)
Результат работы модуля models.py:
- Подключаемся к БД в докере
- Дропаем и создаем заново таблицы (чтобы небыло конфликта с существующими записями)
- Сохраняем данные юзеров и постов в БД
main.py:
import asyncio
from json_placeholder import USERS_DATA_URL, POSTS_DATA_URL, fetch_json
from models import create_pg_docker,created_db_tables,cmd, save_post_in_db,save_user_in_db
async def async_main():
await create_pg_docker(cmd)
await created_db_tables()
user_data, post_data = await asyncio.gather(fetch_json(USERS_DATA_URL),
fetch_json(POSTS_DATA_URL))
await save_user_in_db(user_data)
await save_post_in_db(post_data)
def main():
asyncio.run(async_main())
if __name__ == '__main__':
main()
Что здесь делаем: Создаем асинхронный main() и запускаем его в синхронном main(). Внутри async_main() последовательно запускаем асинхронные ф-и:
- Запуск БД в докере
- Дроп и создание пустых таблиц
- Получение данных юзеров и постов с сайта
- Сохранение юзеров в бд
- Сохранение постов в бд
- Закрытие подключения к бд (происходит неявно, автоматически при завершении работы менеджера контекста)
asyncio.gather — метод, который вернет то, что в него было положено, в том же порядке. (deprecated in python 3.10!!!) asyncio.run - метод, создающий event loop для асинхронных ф-ий.
Результат работы main.py: В БД, в таблице User появляется 10 пользователей, в таблице Post — 100 записей, связанных отношением many to one к юзерам, написавших их.
P.S. Если будете копировать код - используйте IDE для его запуска т.к. где-то может стоять 4 пробела, а где-то табуляция. Но лучше не копировать, а руками перепечатать, так лучше запомнится и поймется.