From ea45fb6509161219c79f4632f9bf3833ce2b9a06 Mon Sep 17 00:00:00 2001 From: Aleksandr Kovalko Date: Sun, 30 Nov 2025 20:16:41 +0500 Subject: [PATCH] Add ffmpeg progress tracking --- handlers/file.py | 244 +++++++++++++++++++++++-------------------- main.py | 2 + schedulers/ffmpeg.py | 119 +++++++++++++++++++++ utils/tg.py | 6 +- 4 files changed, 259 insertions(+), 112 deletions(-) diff --git a/handlers/file.py b/handlers/file.py index cc9cab1..ec612d3 100644 --- a/handlers/file.py +++ b/handlers/file.py @@ -16,6 +16,13 @@ from utils.tg import is_supported_mime, sanitize_filename from utils.speechkit import cost_yc_async_rub, format_duration, MAX_AUDIO_DURATION from utils.tg import extract_local_path +from schedulers.ffmpeg import ( + BASE_MESSAGE, + start_tracking, + start_conversion, + stop_tracking, + update_download, +) USE_LOCAL_PTB = os.environ.get("USE_LOCAL_PTB") is not None @@ -30,128 +37,143 @@ async def handle_file(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Non if user is None: user = add_user(telegram_id, message.from_user.username) - await message.reply_text( - "Файл получен\n\n" - "Определяю длительность и стоимость перевода в текст\n" - "Скоро попрошу подтвердить запуск задачи...", - ) - + status_message = await message.reply_text(BASE_MESSAGE) file = None mime = "" file_name = "file" - if message.document: - file = await message.document.get_file(read_timeout=120) - mime = message.document.mime_type or "" - file_name = message.document.file_name or file_name - elif message.audio: - file = await message.audio.get_file(read_timeout=120) - mime = message.audio.mime_type or "" - file_name = message.audio.file_name or file_name - elif message.video: - file = await message.video.get_file(read_timeout=120) - mime = message.video.mime_type or "" - file_name = message.video.file_name or file_name - elif message.voice: - file = await message.voice.get_file(read_timeout=120) - mime = "audio/ogg" - file_name = "voice.ogg" - else: - await message.reply_text("Файл не поддерживается") - return - - if not is_supported_mime(mime): - await message.reply_text( - "Этот тип файла не поддерживается\n" - "Пожалуйста, отправьте видео или аудио" - ) - return - - with tempfile.TemporaryDirectory() as workdir: - workdir = Path(workdir) - in_dir = workdir / "in" - out_dir = workdir / "out" - in_dir.mkdir() - out_dir.mkdir() - - local_path = in_dir / Path(file_name).name + tracking_key = start_tracking( + context.bot_data, message.chat_id, status_message.message_id + ) - if USE_LOCAL_PTB: - file_path = extract_local_path(file.file_path) - shutil.copy(file_path, local_path) # читаем напрямую + def on_download_progress(current: int, total: int, bot_data, key): + percent = int(current / total * 100) if total else 0 + update_download(bot_data, key, percent) + + try: + if message.document: + file = await message.document.get_file(read_timeout=120) + mime = message.document.mime_type or "" + file_name = message.document.file_name or file_name + elif message.audio: + file = await message.audio.get_file(read_timeout=120) + mime = message.audio.mime_type or "" + file_name = message.audio.file_name or file_name + elif message.video: + file = await message.video.get_file(read_timeout=120) + mime = message.video.mime_type or "" + file_name = message.video.file_name or file_name + elif message.voice: + file = await message.voice.get_file(read_timeout=120) + mime = "audio/ogg" + file_name = "voice.ogg" else: - await file.download_to_drive(custom_path=str(local_path)) - - duration = await get_media_duration(local_path) - if not duration: - await message.reply_text( - "Не удалось определить длительность файла\n" - "Возможно, формат не поддерживается или файл повреждён" - ) + await message.reply_text("Файл не поддерживается") return - duration_str = format_duration(int(duration)) - if duration > MAX_AUDIO_DURATION: + if not is_supported_mime(mime): await message.reply_text( - "Файл слишком длинный: {duration_str}\n" - "Максимально допустимая длительность — 4 часа" + "Этот тип файла не поддерживается\n" + "Пожалуйста, отправьте видео или аудио" ) return - price = cost_yc_async_rub(duration) - price_dec = Decimal(price) - if user.balance < price_dec: - await message.reply_text( - f"Недостаточно средств\n" - f"Баланс: {user.balance} ₽, требуется: {price} ₽\n\n" - f"Для пополнения баланса используйте команду /topup" - ) - return - - safe_stem = sanitize_filename(local_path.stem) - - ogg_name = f"{safe_stem}.ogg" - ogg_path = out_dir / ogg_name - - progress_name = f"{safe_stem}.progress" - progress_path = out_dir / progress_name - - success = await convert_to_ogg(local_path, ogg_path, progress_path) - if not success: - await message.reply_text( - "Не удалось преобразовать файл\n" - "Возможно, он имеет неподдерживаемый формат" - ) - return - - object_name = f"source/{telegram_id}/{ogg_path.name}" - s3_uri = await upload_file(ogg_path, object_name) - if s3_uri is None: - await message.reply_text( - "Не удалось загрузить файл\n" - "Пожалуйста, попробуйте ещё раз чуть позже" - ) - return + with tempfile.TemporaryDirectory() as workdir: + workdir = Path(workdir) + in_dir = workdir / "in" + out_dir = workdir / "out" + in_dir.mkdir() + out_dir.mkdir() + + local_path = in_dir / Path(file_name).name + + update_download(context.bot_data, tracking_key, 0) + + if USE_LOCAL_PTB: + file_path = extract_local_path(file.file_path) + shutil.copy(file_path, local_path) # читаем напрямую + update_download(context.bot_data, tracking_key, 100) + else: + await file.download_to_drive( + custom_path=str(local_path), + progress=on_download_progress, + progress_args=(context.bot_data, tracking_key), + ) + update_download(context.bot_data, tracking_key, 100) + + duration = await get_media_duration(local_path) + if not duration: + await message.reply_text( + "Не удалось определить длительность файла\n" + "Возможно, формат не поддерживается или файл повреждён" + ) + return + + duration_str = format_duration(int(duration)) + if duration > MAX_AUDIO_DURATION: + await message.reply_text( + "Файл слишком длинный: {duration_str}\n" + "Максимально допустимая длительность — 4 часа" + ) + return + + price = cost_yc_async_rub(duration) + price_dec = Decimal(price) + if user.balance < price_dec: + await message.reply_text( + f"Недостаточно средств\n" + f"Баланс: {user.balance} ₽, требуется: {price} ₽\n\n" + f"Для пополнения баланса используйте команду /topup" + ) + return + + safe_stem = sanitize_filename(local_path.stem) + + ogg_name = f"{safe_stem}.ogg" + ogg_path = out_dir / ogg_name + + progress_name = f"{safe_stem}.progress" + progress_path = out_dir / progress_name + + start_conversion(context.bot_data, tracking_key, progress_path, duration) + success = await convert_to_ogg(local_path, ogg_path, progress_path) + if not success: + await message.reply_text( + "Не удалось преобразовать файл\n" + "Возможно, он имеет неподдерживаемый формат" + ) + return + + object_name = f"source/{telegram_id}/{ogg_path.name}" + s3_uri = await upload_file(ogg_path, object_name) + if s3_uri is None: + await message.reply_text( + "Не удалось загрузить файл\n" + "Пожалуйста, попробуйте ещё раз чуть позже" + ) + return + + history = add_transcription( + telegram_id=telegram_id, + status="pending", + audio_s3_path=s3_uri, + duration_seconds=int(duration), + price_rub=price_dec, + result_s3_path=None, + ) - history = add_transcription( - telegram_id=telegram_id, - status="pending", - audio_s3_path=s3_uri, - duration_seconds=int(duration), - price_rub=price_dec, - result_s3_path=None, - ) + buttons = [ + InlineKeyboardButton( + "Распознать", callback_data=f"create_task:{history.id}" + ), + InlineKeyboardButton( + "Отменить", callback_data=f"cancel_task:{history.id}" + ), + ] - buttons = [ - InlineKeyboardButton( - "Распознать", callback_data=f"create_task:{history.id}" - ), - InlineKeyboardButton( - "Отменить", callback_data=f"cancel_task:{history.id}" - ), - ] - - await message.reply_text( - f"Длительность: {duration_str}\nСтоимость: {price} ₽", - reply_markup=InlineKeyboardMarkup([buttons]), - ) + await message.reply_text( + f"Длительность: {duration_str}\nСтоимость: {price} ₽", + reply_markup=InlineKeyboardMarkup([buttons]), + ) + finally: + stop_tracking(context.bot_data, tracking_key) diff --git a/main.py b/main.py index a5df802..63c9292 100644 --- a/main.py +++ b/main.py @@ -12,6 +12,7 @@ ) from schedulers.transcription import check_running_tasks +from schedulers.ffmpeg import update_ffmpeg_messages from handlers.balance import handle_balance from handlers.cancel_task import handle_cancel_task @@ -89,6 +90,7 @@ def main() -> None: CallbackQueryHandler(handle_cancel_payment, pattern=r"^payment:cancel:.+$") ) application.job_queue.run_repeating(check_running_tasks, interval=1.0) + application.job_queue.run_repeating(update_ffmpeg_messages, interval=1.0) application.run_polling() diff --git a/schedulers/ffmpeg.py b/schedulers/ffmpeg.py index e69de29..afec498 100644 --- a/schedulers/ffmpeg.py +++ b/schedulers/ffmpeg.py @@ -0,0 +1,119 @@ +"""Periodic scheduler for updating ffmpeg preparation messages.""" + +from __future__ import annotations + +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Tuple + +from telegram.ext import ContextTypes + +from utils.ffmpeg import get_conversion_progress +from utils.tg import safe_edit_message_text + + +BASE_MESSAGE = ( + "Файл получен\n\n" + "Определяю длительность и стоимость перевода в текст\n" + "Скоро попрошу подтвердить запуск задачи..." +) + + +@dataclass +class FFMpegStatus: + """State for a single Telegram message with preparation progress.""" + + chat_id: int + message_id: int + download_progress: int | None = None + progress_file: Path | None = None + duration_seconds: float | None = None + conversion_started_at: float | None = None + last_text: str | None = None + + def as_text(self) -> str: + """Render current status text.""" + + parts = [BASE_MESSAGE] + + if self.download_progress is not None: + parts.append(f"\n\nСкачивание: {self.download_progress}%") + + conversion_progress = self.current_conversion_progress() + if conversion_progress is not None: + parts.append(f"\nКонвертация: {conversion_progress}%") + + return "".join(parts) + + def current_conversion_progress(self) -> int | None: + if ( + self.progress_file is None + or self.duration_seconds is None + or self.conversion_started_at is None + ): + return None + + percent, _, _ = get_conversion_progress( + self.progress_file, self.duration_seconds, self.conversion_started_at + ) + return percent + + +def _storage(bot_data) -> Dict[Tuple[int, int], FFMpegStatus]: + return bot_data.setdefault("ffmpeg_tasks", {}) + + +def start_tracking(bot_data, chat_id: int, message_id: int) -> Tuple[int, int]: + """Create a tracking entry and return its key.""" + + key = (chat_id, message_id) + _storage(bot_data)[key] = FFMpegStatus(chat_id=chat_id, message_id=message_id) + return key + + +def update_download(bot_data, key: Tuple[int, int], percent: int) -> None: + """Update download progress for the tracked message.""" + + task = _storage(bot_data).get(key) + if task: + task.download_progress = max(0, min(100, percent)) + + +def start_conversion( + bot_data, + key: Tuple[int, int], + progress_file: Path, + duration_seconds: float, +) -> None: + """Mark that conversion has started for the tracked message.""" + + task = _storage(bot_data).get(key) + if task: + task.progress_file = Path(progress_file) + task.duration_seconds = duration_seconds + task.conversion_started_at = time.time() + + +def stop_tracking(bot_data, key: Tuple[int, int]) -> None: + """Remove tracking entry when work is finished.""" + + _storage(bot_data).pop(key, None) + + +async def update_ffmpeg_messages(context: ContextTypes.DEFAULT_TYPE) -> None: + """Periodically update Telegram messages with download/conversion progress.""" + + tasks = _storage(context.bot_data) + for key, task in list(tasks.items()): + text = task.as_text() + if text == task.last_text: + continue + + await safe_edit_message_text( + context.bot, + task.chat_id, + task.message_id, + text, + ) + task.last_text = text diff --git a/utils/tg.py b/utils/tg.py index fd4d03a..9464254 100644 --- a/utils/tg.py +++ b/utils/tg.py @@ -46,10 +46,14 @@ def is_supported_mime(mime: str) -> bool: return mime.startswith("audio/") or mime.startswith("video/") -async def safe_edit_message_text(bot, chat_id, message_id, text): +async def safe_edit_message_text(bot, chat_id, message_id, *text_parts): """Safely edit a message text, catching exceptions.""" + if not chat_id or not message_id: return + + text = "".join(text_parts) + try: await bot.edit_message_text(chat_id=chat_id, message_id=message_id, text=text) except Exception as e: