Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 36 additions & 37 deletions auth_backend/auth_plugins/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
import hmac
import logging
from typing import Any
from urllib.parse import quote, unquote
from urllib.parse import unquote

import jwt
from event_schema.auth import UserLogin
from fastapi import Depends
from fastapi.background import BackgroundTasks
from fastapi_sqlalchemy import db
from pydantic import BaseModel, Field
from pydantic import BaseModel

from auth_backend.auth_method import AuthPluginMeta, OauthMeta, Session
from auth_backend.exceptions import AlreadyExists, OauthAuthFailed
Expand All @@ -26,59 +26,54 @@

class TelegramSettings(Settings):
TELEGRAM_REDIRECT_URL: str = "https://app.test.profcomff.com/auth"
TELEGRAM_BOT_TOKEN: str | None = None
TELEGRAM_BOT_TOKEN: str | None = None # Сделал так для тестов, однако токен нужен для работы авторизации!


class TelegramAuth(OauthMeta):
"""Вход в приложение 'Твой ФФ' через Telegram Login Widget."""

prefix = '/telegram'
tags = ['Telegram']
settings = TelegramSettings()

class OauthResponseSchema(BaseModel):
id_token: str | None = Field(default=None, help="Telegram JWT token identifier")
id: str | None = None
first_name: str | None = None
class TGAuthResponseSchema(BaseModel):
id: str
first_name: str
last_name: str | None = None
username: str | None = None
photo_url: str | None = None
auth_date: str | None = None
hash: str | None = None
scopes: list[Scope] | None = None
session_name: str | None = None
auth_date: str
hash: str
scopes: list[Scope] | None = None # Телеграм не передает это поле. Осталось "исторически".
session_name: str | None = None # Телеграм не передает это поле. Осталось "исторически".

@classmethod
async def _register(
cls,
user_inp: OauthResponseSchema,
user_inp: TGAuthResponseSchema,
background_tasks: BackgroundTasks,
user_session: UserSession = Depends(UnionAuth(auto_error=True, scopes=[], allow_none=True)),
) -> Session:
"""Добавление метода аутентификации через (виджет) Телеграма."""
old_user = None
new_user = {}
telegram_user_id = None
userinfo = None

if user_inp.id_token is None:
userinfo = await cls._check(user_inp)
telegram_user_id = user_inp.id
logger.debug(userinfo)
else:
userinfo = jwt.decode(user_inp.id_token, cls.settings.ENCRYPTION_KEY, algorithms=["HS256"])
telegram_user_id = userinfo['id']
logger.debug(userinfo)

user = await cls._get_user('user_id', telegram_user_id, db_session=db.session)
# Проверяем получение корректных данных
userinfo = await cls._check(user_inp)
tg_user_id = userinfo['id']

user = await cls._get_user('user_id', tg_user_id, db_session=db.session)
if user is not None:
raise AlreadyExists(User, user.id)

if user_session is None:
user = await cls._create_user(db_session=db.session) if user_session is None else user_session.user
user = await cls._create_user(db_session=db.session)
else:
user = user_session.user
old_user = {'user_id': user.id}
new_user["user_id"] = user.id
tg_id = cls.create_auth_method_param('user_id', telegram_user_id, user.id, db_session=db.session)
new_user[cls.get_name()] = {"user_id": tg_id.value}
tg_auth = cls.create_auth_method_param('user_id', tg_user_id, user.id, db_session=db.session)
new_user[cls.get_name()] = {"user_id": tg_auth.value}
userdata = await TelegramAuth._convert_data_to_userdata_format(userinfo)
background_tasks.add_task(
get_kafka_producer().produce,
Expand All @@ -95,10 +90,10 @@ async def _register(
)

@classmethod
async def _login(cls, user_inp: OauthResponseSchema, background_tasks: BackgroundTasks) -> Session:
"""Вход в пользователя с помощью аккаунта https://lk.msu.ru
async def _login(cls, user_inp: TGAuthResponseSchema, background_tasks: BackgroundTasks) -> Session:
"""Вход в пользователя с помощью аккаунта ТГ.

Производит вход, если находит пользователя по уникаотному идендификатору. Если аккаунт не
Производит вход, если находит пользователя по id (из Телеграма). Если аккаунт не
найден, возвращает ошибка.
"""

Expand Down Expand Up @@ -129,23 +124,26 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun

@classmethod
async def _redirect_url(cls):
"""URL на который происходит редирект после завершения входа на стороне провайдера"""
"""URL на который происходит редирект после завершения входа на стороне провайдера.

В данном случае не предполагается к использованию, т.к. данный URL вшит в виджет.
"""
return OauthMeta.UrlSchema(url=cls.settings.TELEGRAM_REDIRECT_URL)

@classmethod
async def _auth_url(cls):
"""URL на который происходит редирект из приложения для авторизации на стороне провайдера"""
"""URL на который происходит редирект из приложения, чтобы авторизоваться на стороне провайдера.

return OauthMeta.UrlSchema(
url=f"https://oauth.telegram.org/auth?bot_id={cls.settings.TELEGRAM_BOT_TOKEN.split(':')[0]}&origin={quote(cls.settings.TELEGRAM_REDIRECT_URL)}&return_to={quote(cls.settings.TELEGRAM_REDIRECT_URL)}"
)
В данном случае не предполагается, т.к. URL вшит в виджет. Отдается атрибут src виджета.
"""
return OauthMeta.UrlSchema(url='https://telegram.org/js/telegram-widget.js?22')

@classmethod
async def _check(cls, user_inp):
'''Проверка данных пользователя
"""Проверка данных пользователя.

https://core.telegram.org/widgets/login#checking-authorization
'''
"""
data_check = {
'id': user_inp.id,
'first_name': user_inp.first_name,
Expand All @@ -170,6 +168,7 @@ async def _check(cls, user_inp):

@classmethod
async def _convert_data_to_userdata_format(cls, data: dict[str, Any]) -> UserLogin:
"""Конвертация данных в формат для userdata-api."""
first_name, last_name = '', ''
if 'first_name' in data.keys() and data['first_name'] is not None:
first_name = data['first_name']
Expand Down