Асинхронный JSON-Logger для FastAPI

Всем привет.

Цель данной статьи и моей личной разработки — написать о том, как я придумал свой собственный и удобный формат json-журналирования специально для компании, в которой я работаю, но не нашел готовых решений для реализации, который позволит мне воплощать очень гибко и удобно некоторые вещи с единым наименованием полей, чтобы логгирование протекало быстро и асинхронно, а также: не заставляло бы меня писать много кода и обойтись одной-двумя библиотеками максимум.

Пошел искать решение в лоб: искал готовые решения в виде сторонних библиотек. Отыскал пару-тройку годных решений. Например библиотека json-logging. Всё хорошо: почитал, что делает и поставил не глядя. Данная библиотека имеет массу преимуществ:

  • может напрямую работать с FastAPI, Flask, Aiohttp, Sanic,

  • перехватывает uvicorn access log,

  • производит форматирование запроса в JSON формат.

Данная библиотека генерирует прекрасный лог сообщения с полями запроса, который поступил в сервис:

Код
{ "type": "request", "written_at": "2017-12-23T16:55:37.280Z", "written_ts": 1514048137280721000, "component_id": "-", "component_name": "-", "component_instance_idx": 0, "correlation_id": "1975a02e-e802-11e7-8971-28b2bd90b19a", "remote_user": "user_a", "request": "/index.html", "referer": "-", "x_forwarded_for": "-", "protocol": "HTTP/1.1", "method": "GET", "remote_ip": "127.0.0.1", "request_size_b": 1234, "remote_host": "127.0.0.1", "remote_port": 50160, "request_received_at": "2017-12-23T16:55:37.280Z", "response_time_ms": 0, "response_status": 200, "response_size_b": "122", "response_content_type": "text/html; charset=utf-8", "response_sent_at": "2017-12-23T16:55:37.280Z"
}

Но у этой библиотеки обнаружились серьёзные недостатки:

  • Название полей в логе изменить нельзя без наследования Formatter и кастомизацию.

  • Нельзя никаким образом поставить в логирование тело ответа без написания собственного Middleware для FastAPI, который перехватит еще и тело ответа, что мы добавили в заголовки.

  • Логгирование перехватывало синхронный поток вывода логгера uvicorn.access.

Я не буду писать, как я пытался закостылить исправить первые два минуса, пока не столкнулся с тем, что мне всё-таки нужен асинхронный логер. Я лучше Вам расскажу, как я решил сделать собственное решение.
Для начала я решил отказаться от этой библиотеки сразу:

pip uninstall json-logging

Далее я хотел обозначить структуру журнала. Мне очень помог Pydantic для элегантного описания данного вопроса:

from typing import Union from pydantic import BaseModel, Field class BaseJsonLogSchema(BaseModel): """ Схема основного тела лога в формате JSON """ thread: Union[int, str] level: int level_name: str message: str source: str timestamp: str = Field(..., alias='@timestamp') app_name: str app_version: str app_env: str duration: int exceptions: Union[list[str], str] = None trace_id: str = None span_id: str = None parent_id: str = None class Config: allow_population_by_field_name = True class RequestJsonLogSchema(BaseModel): """ Схема части запросов-ответов лога в формате JSON """ request_uri: str request_referer: str request_protocol: str request_method: str request_path: str request_host: str request_size: int request_content_type: str request_headers: str request_body: str request_direction: str remote_ip: str remote_port: str response_status_code: int response_size: int response_headers: str response_body: str duration: int

После того, как была сформирована структура тела запроса необходимо определить сам механизм форматирования. Формирование, что логично, производится с помощью создания класса JSONLogFormatter с наследованием от стандартного logging.Formatter.
Код test_project.utils.json_logger.py

import datetime
import json
import logging
import traceback from test_project.main.settings import APP_NAME, \ APP_VERSION, ENVIRONMENT
from test_project.utils.utils_schemas import BaseJsonLogSchema class JSONLogFormatter(logging.Formatter): """ Кастомизированный класс-форматер для логов в формате json """ def format(self, record: logging.LogRecord, *args, **kwargs) -> str: """ Преобразование объект журнала в json :param record: объект журнала :return: строка журнала в JSON формате """ log_object: dict = self._format_log_object(record) return json.dumps(log_object, ensure_ascii=False) @staticmethod def _format_log_object(record: logging.LogRecord) -> dict: """ Перевод записи объекта журнала в json формат с необходимым перечнем полей :param record: объект журнала :return: Словарь с объектами журнала """ now = datetime \ .datetime \ .fromtimestamp(record.created) \ .astimezone() \ .replace(microsecond=0) \ .isoformat() message = record.getMessage() duration = record.duration \ if hasattr(record, 'duration') \ else record.msecs # Инициализация тела журнала json_log_fields = BaseJsonLogSchema( thread=record.process, timestamp=now, level=record.levelno, level_name=LEVEL_TO_NAME[record.levelno], message=message, source=record.name, duration=duration, app_name=APP_NAME, app_version=APP_VERSION, app_env=ENVIRONMENT, ) if hasattr(record, 'props'): json_log_fields.props = record.props if record.exc_info: json_log_fields.exceptions = \ traceback.format_exception(*record.exc_info) elif record.exc_text: json_log_fields.exceptions = record.exc_text # Преобразование Pydantic объекта в словарь json_log_object = json_log_fields.dict( exclude_unset=True, by_alias=True, ) # Соединение дополнительных полей логирования if hasattr(record, 'request_json_fields'): json_log_object.update(record.request_json_fields) return json_log_object

Нужно заставить FastApi ловить request и response, желательно с наличием body. Я сначала пытался обратиться к dependency подходу FastApi. Но понял, что не хочу описывать в каждом методе метод логирования. Естественно, к нам на помощь приходит Middleware.
Я написал класс, который позволяет внедрить в app-объект журналирование тела запроса и ответа, а также их полей: status code, headers, remote ip, protocol, etc. Класс я назвал LoggingMiddleware и все выполнение происходит методу async def __call__()

Код test_project.utils.middlewares.py

from test_project.main.settings import PORT
from test_project.utils.json_logger import EMPTY_VALUE
from test_project.utils.utils_schemas import RequestJsonLogSchema class LoggingMiddleware: """ Middleware для обработки запросов и ответов с целью журналирования """ @staticmethod async def get_protocol(request: Request) -> str: protocol = str(request.scope.get('type', '')) http_version = str(request.scope.get('http_version', '')) if protocol.lower() == 'http' and http_version: return f'{protocol.upper()}/{http_version}' return EMPTY_VALUE @staticmethod async def set_body(request: Request, body: bytes) -> None: async def receive() -> Message: return {'type': 'http.request', 'body': body} request._receive = receive async def get_body(self, request: Request) -> bytes: body = await request.body() await self.set_body(request, body) return body async def __call__( self, request: Request, call_next: RequestResponseEndpoint, *args, **kwargs ): start_time = time.time() exception_object = None # Request Side try: raw_request_body = await request.body() # Последующие действия нужны, # чтобы не перезатереть тело запроса # и не уйти в зависание event-loop'a # при последующем получении тела ответа await self.set_body(request, raw_request_body) raw_request_body = await self.get_body(request) request_body = raw_request_body.decode() except Exception: request_body = EMPTY_VALUE server: tuple = request.get('server', ('localhost', PORT)) request_headers: dict = dict(request.headers.items()) # Response Side try: response = await call_next(request) except Exception as ex: response_body = bytes( http.HTTPStatus.INTERNAL_SERVER_ERROR.phrase.encode() ) response = Response( content=response_body, status_code=http.HTTPStatus.INTERNAL_SERVER_ERROR.real, ) exception_object = ex response_headers = {} else: response_headers = dict(response.headers.items()) response_body = b'' async for chunk in response.body_iterator: response_body += chunk response = Response( content=response_body, status_code=response.status_code, headers=dict(response.headers), media_type=response.media_type ) duration: int = math.ceil((time.time() - start_time) * 1000) # Инициализация и формирования полей для запроса-ответа request_json_fields = RequestJsonLogSchema( request_uri=str(request.url), request_referer=request_headers.get('referer', EMPTY_VALUE), request_protocol=await self.get_protocol(request), request_method=request.method, request_path=request.url.path, request_host=f'{server[0]}:{server[1]}', request_size=int(request_headers.get('content-length', 0)), request_content_type=request_headers.get( 'content-type', EMPTY_VALUE), request_headers=json.dumps(request_headers), request_body=request_body, request_direction='in', remote_ip=request.client[0], remote_port=request.client[1], response_status_code=response.status_code, response_size=int(response_headers.get('content-length', 0)), response_headers=json.dumps(response_headers), response_body=response_body.decode(), duration=duration ).dict() # Хочется на каждый запрос читать # и понимать в сообщении самое главное, # поэтому message мы сразу делаем типовым message = f'{"Ошибка" if exception_object else "Ответ"} ' \ f'с кодом {response.status_code} ' \ f'на запрос {request.method} \"{str(request.url)}\", ' \ f'за {duration} мс' logger.info( message, extra={ 'request_json_fields': request_json_fields, 'to_mask': True, }, exc_info=exception_object, ) return response

У меня есть одержимость по поводу асинхронного логирования в FastApi. Методы вывода потока в консоль, запись в файл, logstash — это тот же I/O процесс, как и сгонять по сетке в БД или в другой сервис, согласны? А так как это I/O процесс, то в асинхронном FastApi очень важно не замедлять работу такой простой штукой как логирование. Я нашел очень хорошую библиотеку, которая за одну конфигурацию перестраивает вывод в консоль в асинхронный режим (также можно провернуть и с файлами и отправкой в logstash, и другие ресурсы). Данная библиотека называется: asynclog. Настройка и установка производится очень просто. Не нужно иметь никаких классов и переопределять handler-ы: Нужно просто настроить логирование через словарь в конфигурации проекта и задать handler.

Создаем конфигурацию логера и использовать его везде. Благо, в python есть для этого конфигурация, которую можно воплотить в настройках проекта.

LOG_CONFIG = { 'version': 1, 'disable_existing_loggers': False, 'formatters': { 'json': { '()': 'test_project.utils.json_logger.JSONLogFormatter', }, }, 'handlers': {
# Используем AsyncLogDispatcher для асинхронного вывода потока. 'json': { 'formatter': 'json', 'class': 'asynclog.AsyncLogDispatcher', 'func': 'test_project.utils.json_logger.write_log', }, }, 'loggers': { 'test_project': { 'handlers': ['json'], 'level': 'DEBUG' if DEBUG else 'INFO', 'propagate': False, }, 'uvicorn': { 'handlers': ['json'], 'level': 'INFO', 'propagate': False, }, # Не даем стандартному логгеру fastapi работать по пустякам и замедлять работу сервиса 'uvicorn.access': { 'handlers': ['json'], 'level': 'ERROR', 'propagate': False, }, },
}

Здесь у handler есть особенность в поле func, тут должна быть указана функция вывода потока. Это может быть stdout, print, write file. Что угодно, вплоть до передачи файла по сети в logstash. В нашем случае я решил просто вывести в консоль простейшим способом для примера.
Код test_project.utils.json_logger.py:

def write_log(msg): print(msg)

Настраиваем приложение на работу с данным middleware, инициализируем logging и его конфигурацию. Далее производим написание обычного route с логгированием сообщения ‘Test log’.

import logging
from logging.config import dictConfig from fastapi import FastAPI
from test_project.utils.middlewares import LoggingMiddleware
dictConfig(LOG_CONFIG)
logger = logging.getLogger(name)
app = FastAPI()
app.middleware('http')( LoggingMiddleware()
)
router = APIRouter( tags=['root'], responses={404: {'description': 'Not found'}},
)
@router.get('/')
async def root(request: Request): logger.info('Test log') return {'foo': 'bar'}
app.include_router(router)

Получаем сразу два сообщения в консоли:
Первое: как раз таки тот вывод из Middleware, который гарантирует отображение полей запроса

{ "thread": 60535, "level": 20, "level_name": "Information", "message": "Ответ с кодом 200 на запрос GET \"http://localhost:8080/\", за 4 мс", "source": "test_project.utils.middlewares", "@timestamp": "2021-08-28T01:49:23+03:00", "app_name": "test_project", "app_version": "1.0.0", "app_env": "LOCAL", "duration": 4, "request_uri": "http://localhost:8080/", "request_referer": "", "request_protocol": "HTTP/1.1", "request_method": "GET", "request_path": "/", "request_host": "127.0.0.1:8080", "request_size": 0, "request_content_type": "", "request_headers": "{\"host\": \"localhost:8080\"}", "request_body": "", "request_direction": "in", "remote_ip": "127.0.0.1", "remote_port": "50259", "response_status_code": 200, "response_size": 13, "response_headers": "{\"content-length\": \"13\", \"content-type\": \"application/json\"}", "response_body": "{\"foo\":\"baz\"}"
}

Второе: Наше сообщение 'Test Log' вне контекста HTTP запроса, а уже в контексте работы внутри route

{ "thread": 60535, "level": 20, "level_name": "Information", "message": "Test log", "source": "test_project.main.routes", "@timestamp": "2021-08-28T01:49:23+03:00", "app_name": "test_project", "app_version": "1.0.0", "app_env": "LOCAL", "duration": 385, "trace_id": "3bd6f269dbbbd275d46320c7e240bfe5", "span_id": "2417303bd1af1ee6", "parent_id": null
}

Бонус. Демонстрация производительности асинхронного вывода.

Почему я так сильно хотел асинхронный вывод журнала? Сейчас будет представлено нагрузочное тестирование на обычном примере одного и того же запроса, с одними и теми же конфигурациями сервера.

  • Количество воркеров: 10

  • Количество одновременно-подключенных клиентов: 200

  • Таймаут: 15 секунд.

  • Время выполнения теста: 15 секунд (больше и не надо)

  • Логгирование одного и того же сообщения: test.

  • Formatter не менял свой код.

Команда запуска нагрузочного тестирования:

wrk -c200 -t1 -d15s http://localhost:8080/users/public/ --timeout 15

Результат нагрузочного тестирования синхронного логгирования:

Результат нагрузочного тестирования асинхронного логгирования:

Как можно заметить, что синхронный вариант вывода дал 2294 rps, а асинхронный вариант вывода выдал целых 8364 rps. Получается, разница производительности целого сложного веб-сервиса увеличилась в 3.64 раза благодаря простому переходу и подходу к логгированию.

Вывод

Никогда не стоит соглашаться на сторонние библиотеки, если чувствуете, что они не могут удовлетворить ваших потребностей. Наследоваться от сторонних библиотек может обернуться плохой шуткой и лучше сразу не полениться и попробовать написать что-то своё. Да, это решение вряд-ли подойдет 99.99% пользователям, есть моменты, которые бы улучшил, оптимизировал и сделал бы более гибкими. Как раз про это я бы хотел почитать в комментариях, обсудить, как можно улучшить данный подход.

Пока что данная реализация находится в закрытой разработке одного из моих проектов, но я обещаю, что дополню эту статью ссылкой на гитхаб, либо подготовлю уже PyPi решение для вас, дорогие читатели данной статьи.

Telegram: @Lev_key

Читайте так же:

  • Поисковая реклама и SEO: практики, работающие в 2021Поисковая реклама и SEO: практики, работающие в 2021 Описание Приглашаем 27 мая в 13:00 по МСК на вебинар «Поисковая реклама и SEO: практики. Работающие в 2021» от агентства интернет-рекламы R-брокер и сервиса Rookee. Регистрация по ссылке  https://clck.ru/V4kKПрограмма:Эльвира Сафиуллина. Евангелист агентства R-брокер. […]
  • Первые умные часы с HarmonyOS прибыли в Россию: можно получить Huawei Watch 3 бесплатноПервые умные часы с HarmonyOS прибыли в Россию: можно получить Huawei Watch 3 бесплатно Компания Huawei представила в России свои первые умные часы на основе новой фирменной платформы — HarmonyOS 2. В линейку вошли Huawei Watch 3 и Huawei Watch 3 Pro. Обе модели становятся доступны российским пользователям.  Рекомендованная розничная цена новинок составляет 29 990 […]
  • Заполнение сайтов работаЗаполнение сайтов работа О            Льготах            Работа                        Заработок Подтверждение            Дохода Выплаты            Отзывы            Зарегистрируйтесь Сейчас О Компании : Работа По Заполнению Формуляров Вакансии для заполнения формуляров подходят всем. Кто ищет законную […]
  • Верстка контентаВерстка контента Множество сайтов сияют цветом, яркими изображениями и полезным контентом. Но если вы сорвете цвет, видео, изображения, контент, что останется? Вы придете к голым костям веб-сайта и вдруг поймете: это макет. Который действительно накачивает блеск на веб-сайт. Что такое макет сайта? […]