From dbdd04e71b832a6ec6794fe38b1760665f2f55e9 Mon Sep 17 00:00:00 2001 From: PokingTeemo Date: Fri, 20 Dec 2024 16:25:57 +0900 Subject: [PATCH 1/4] =?UTF-8?q?fix:=20OCR,=20STT=20=EC=9D=91=EB=8B=B5=20?= =?UTF-8?q?=ED=98=95=EC=8B=9D=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 스크립트를 내용 안에 넣기 --- app/api/endpoint/mediation_service.py | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/app/api/endpoint/mediation_service.py b/app/api/endpoint/mediation_service.py index be7591f3..115c1807 100644 --- a/app/api/endpoint/mediation_service.py +++ b/app/api/endpoint/mediation_service.py @@ -76,7 +76,7 @@ async def analyze_conflict(request: ConflictAnalysisRequest): logger.error(f"Unexpected error during conflict analysis: {e}") raise HTTPException(status_code=500, detail="Internal server error during conflict analysis") -@router.post("/speech-to-text", response_model=VoiceInfo, status_code=201) +@router.post("/speech-to-text", response_model=dict, status_code=201) async def get_voice(request: STTRequest): logger.info("get_infos start") logger.info(f"audio URL : {request.url}") @@ -100,17 +100,13 @@ async def get_voice(request: STTRequest): raise HTTPException(status_code=500, detail="STT_PROCESSING_ERROR") # 응답 생성 - response = VoiceInfo( - status="Created", - timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - data=DataInfoSTT( - script=transcription - ) - ) + response = { + "script": transcription + } logger.info(f"Response: {response}") return response -@router.post("/image-to-text", response_model=VoiceInfo, status_code=201) +@router.post("/image-to-text", response_model=dict, status_code=201) async def get_image(request: STTRequest): logger.info("get_infos start") logger.info(f"image URL : {request.url}") @@ -135,17 +131,12 @@ async def get_image(request: STTRequest): raise HTTPException(status_code=500, detail="OCR_PROCESSING_ERROR") # 응답 생성 - response = VoiceInfo( - status="Created", - timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - data=DataInfoSTT( - script=transcription - ) - ) + response = { + "script": transcription + } logger.info(f"Response: {response}") return response - #동기식 함수 # @router.post("/judgement", response_model=DataInfoSummary, status_code=201) # async def process_judge(request: JudgeRequest): From 43b704459bdf521cf0c9f7a8af040a31dd50fc74 Mon Sep 17 00:00:00 2001 From: KHyunJoong Date: Sat, 21 Dec 2024 02:12:54 +0900 Subject: [PATCH 2/4] =?UTF-8?q?feat:=20ocr=20Mq=20=EC=84=A4=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoint/mediation_service.py | 156 ++++++++++++++++++++++++-- app/core/config.py | 1 + app/main.py | 15 ++- app/models/info.py | 8 +- 4 files changed, 169 insertions(+), 11 deletions(-) diff --git a/app/api/endpoint/mediation_service.py b/app/api/endpoint/mediation_service.py index 115c1807..8e963932 100644 --- a/app/api/endpoint/mediation_service.py +++ b/app/api/endpoint/mediation_service.py @@ -3,8 +3,9 @@ from fastapi import APIRouter, HTTPException, Header, status, BackgroundTasks from botocore.exceptions import ClientError from datetime import datetime -from models.info import DataInfoSummary, VoiceInfo, DataInfoSTT,JudgeRequest,STTRequest, ConflictAnalysisRequest,ConflictAnalysisResponseData, ConflictAnalysisResponse -from services.situation_summary import situation_summary_GPT,stt_model,generate_response,test_response +from pydantic import ValidationError +from models.info import DataInfoSummary, VoiceInfo, DataInfoSTT, JudgeRequest, STTRequest, ConflictAnalysisRequest, ConflictAnalysisResponseData, ConflictAnalysisResponse, DataInfoOCR +from services.situation_summary import situation_summary_GPT, stt_model, generate_response, test_response from services.audio_process import process_audio_file from services.image_process import process_image_file from core.logging import logger @@ -43,7 +44,8 @@ async def analyze_conflict(request: ConflictAnalysisRequest): backend_payload = analysis_result['data'] # 백엔드 서버 URL 설정 -> 이부분 수정해야 함 - backend_server_url = os.getenv("BACKEND_SERVER_URL", "https://api.ktb-aimo.link/api/v1/private-posts/judgement/callback") + backend_server_url = os.getenv("BACKEND_SERVER_URL", + "https://api.ktb-aimo.link/api/v1/private-posts/judgement/callback") # 백엔드 서버로 데이터 전송 async with httpx.AsyncClient() as client: @@ -76,6 +78,7 @@ async def analyze_conflict(request: ConflictAnalysisRequest): logger.error(f"Unexpected error during conflict analysis: {e}") raise HTTPException(status_code=500, detail="Internal server error during conflict analysis") + @router.post("/speech-to-text", response_model=dict, status_code=201) async def get_voice(request: STTRequest): logger.info("get_infos start") @@ -106,6 +109,7 @@ async def get_voice(request: STTRequest): logger.info(f"Response: {response}") return response + @router.post("/image-to-text", response_model=dict, status_code=201) async def get_image(request: STTRequest): logger.info("get_infos start") @@ -416,10 +420,13 @@ def process_message(ch, method, properties, body): logger.error(f"Error processing message: {e}") # 실패 메시지 재시도 방지 ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + + port = 5671 # vhost = "/" rabbitmq_url = f"amqps://{settings.RABBITMQ_USER}:{settings.RABBITMQ_PASS}@{settings.RABBITMQ_URL}:{port}" + def start_worker(): """ RabbitMQ 워커 시작 @@ -459,11 +466,142 @@ def start_worker(): except Exception as e: logger.error(f"알 수 없는 오류 발생: {e}") -# def process_message(ch, method, properties, body): + +###################### +OCR_CALLBACK_URL = settings.OCR_CALLBACK_URL +####################### +# #OCR 비동기 처리입니다. 필요시 활성화 하세요 +# +# @router.post("/image-to-text", response_model=dict, status_code=202) +# async def process_image(request: STTRequest, background_tasks: BackgroundTasks): # """ -# 메시지 처리 콜백 함수 +# 요청을 수락하고 202 응답을 반환. +# BackgroundTasks를 이용해 OCR 작업 수행 후 결과를 CALLBACK_URL로 POST 전송. # """ -# decoded_body = body.decode('utf-8') -# logger.info(f"Received message: {decoded_body}") -# # 메시지 처리 로직 추가 -# ch.basic_ack(delivery_tag=method.delivery_tag) # 메시지 확인 +# logger.info("Received image-to-text request") +# logger.info(f"Image URL: {request.url}") +# +# # URL이 제공되지 않은 경우 오류 반환 +# if not request.url: +# raise HTTPException(status_code=400, detail="URL_NOT_PROVIDED") +# +# # Background 작업 등록 +# logger.info("Starting background task for OCR processing...") +# background_tasks.add_task(execute_OCR_and_callback, request.url) +# +# # 202 Accepted 응답 반환 +# return {"status": "accepted", "message": "OCR processing started."} + +#################3 +#OCR_MQ부분입니다. +def execute_OCR_and_callback(url: str): + """ + OCR 호출 후 결과를 OCR_CALLBACK_URL로 전송 + """ + try: + logger.info("Executing OCR function...") + transcription = process_image_file(url) # OCR 처리 + + # DataInfoOCR 데이터 생성 + ocr_data = { + "status": bool(transcription), # script가 있으면 True + "url": url, + "script": transcription, + "accessKey": ACCESSTOKEN + } + + # DataInfoOCR 모델로 데이터 검증 + try: + validated_data = DataInfoOCR(**ocr_data) + except ValidationError as e: + logger.error(f"Validation error in OCR data: {e}") + raise ValueError("Invalid data format for OCR callback.") + + # 콜백 URL로 데이터 전송 + logger.info(f"Sending POST request to OCR_CALLBACK_URL with data: {validated_data.dict()}") + response = requests.post(OCR_CALLBACK_URL, json=validated_data.dict()) + logger.info(f"Callback response: {response.status_code}, {response.text}") + + except Exception as e: + logger.error(f"Error during OCR processing: {e}") + # 실패한 경우 콜백 URL로 에러 메시지 전송 + error_response = { + "status": False, + "url": url, + "script": None, + "accessKey": ACCESSTOKEN + } + requests.post(CALLBACK_URL, json=error_response) + + +def process_message(ch, method, properties, body): + """ + RabbitMQ 메시지 소비 후 OCR 처리 및 콜백 전송 + """ + try: + # 메시지 디코딩 및 검증 + decoded_body = body.decode('utf-8') + logger.info(f"Received message: {decoded_body}") + message = json.loads(decoded_body) + + # 데이터 검증 + try: + request_data = STTRequest(**message) + except ValidationError as e: + logger.error(f"Validation error in received message: {e}") + raise ValueError("Invalid message format.") + + # OCR 처리 및 콜백 전송 + execute_OCR_and_callback(request_data.url) + + # 메시지 처리 완료 + ch.basic_ack(delivery_tag=method.delivery_tag) + logger.info(f"Message processed successfully: {request_data.url}") + + except Exception as e: + logger.error(f"Error processing message: {e}") + # 실패 메시지 재시도 방지 + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + + +port = 5671 +vhost = "/" +rabbitmq_url = f"amqps://{settings.RABBITMQ_USER}:{settings.RABBITMQ_PASS}@{settings.RABBITMQ_URL}:{port}" + + +def start_second_worker(): + """ + RabbitMQ 워커 시작 + """ + try: + # SSL 컨텍스트 설정 + ssl_context = ssl.create_default_context() + ssl_context.verify_mode = ssl.CERT_REQUIRED + ssl_context.check_hostname = True + ssl_context.load_default_certs() + + # RabbitMQ 연결 설정 + params = pika.URLParameters(rabbitmq_url) + params.ssl_options = pika.SSLOptions(ssl_context) + + connection = pika.BlockingConnection(params) + channel = connection.channel() + + # Exchange 및 Queue 설정 + channel.exchange_declare(exchange="aiOCRExchange", exchange_type="direct", durable=True) + channel.queue_declare(queue="aiOCRQueue", durable=True) + channel.queue_bind(exchange="aiOCRExchange", queue="aiOCRQueue", routing_key="ai.ocr.key") + + # 메시지 소비 시작 + channel.basic_qos(prefetch_count=1) + channel.basic_consume(queue="aiOCRQueue", on_message_callback=process_message) + + logger.info("OCR Worker is waiting for messages...") + channel.start_consuming() + + except ssl.SSLError as e: + logger.error(f"SSL 연결 오류 발생: {e}") + except pika.exceptions.AMQPConnectionError as e: + logger.error(f"OCR RabbitMQ 연결 오류 발생: {e}") + except Exception as e: + logger.error(f"알 수 없는 오류 발생: {e}") diff --git a/app/core/config.py b/app/core/config.py index c936b2f0..447415f2 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -5,6 +5,7 @@ class Settings(BaseSettings): # DATABASE_URL: str = " " OPENAI_API_KEY: str = "" CALLBACK_URL: str = "" + OCR_CALLBACK_URL: str = "" ACCESSTOKEN: str = "" #MQ RABBITMQ_URL: str = "" diff --git a/app/main.py b/app/main.py index b198fdb0..3c1395bb 100644 --- a/app/main.py +++ b/app/main.py @@ -4,7 +4,7 @@ from core.config import settings import asyncio import threading -from api.endpoint.mediation_service import start_worker +from api.endpoint.mediation_service import start_worker, start_second_worker from contextlib import asynccontextmanager from core.logging import logger @@ -16,6 +16,14 @@ def run_worker_in_thread(): asyncio.set_event_loop(loop) start_worker() +def run_second_worker_in_thread(): + """ + 두 번째 RabbitMQ 워커를 별도 스레드에서 실행 + """ + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + start_second_worker() # 두 번째 워커 실행 + @asynccontextmanager async def lifespan(app: FastAPI): """ @@ -26,6 +34,11 @@ async def lifespan(app: FastAPI): worker_thread.start() logger.info("RabbitMQ worker thread started.") + # 두 번째 워커 실행 + second_worker_thread = threading.Thread(target=run_second_worker_in_thread, daemon=True) + second_worker_thread.start() + logger.info("Second OCR RabbitMQ worker thread started.") + yield # 애플리케이션 실행 유지 logger.info("Shutting down application...") diff --git a/app/models/info.py b/app/models/info.py index 98540363..fec7b1c1 100644 --- a/app/models/info.py +++ b/app/models/info.py @@ -3,6 +3,12 @@ from datetime import datetime, date from services.emotion_behavior_situation import RelationshipAnalyzer, SituationSummary +class DataInfoOCR(BaseModel): + status: bool + url: str + script: Optional[str] + accessKey: str + class DataInfoSTT(BaseModel): script: Optional[str] @@ -15,7 +21,7 @@ class DataInfoSummary(BaseModel): summaryAi: Optional[str] judgement: Optional[str] faultRate: Optional[float] - accesstoken: str + accessKey: str class VoiceInfo(BaseModel): status: Optional[str] From 70d1762541f7945f5d8f4fcbec8d86944ba6f740 Mon Sep 17 00:00:00 2001 From: KHyunJoong Date: Sat, 21 Dec 2024 09:41:11 +0900 Subject: [PATCH 3/4] =?UTF-8?q?fix:=20ocr=20Mq=20=EC=84=A4=EC=A0=95=20?= =?UTF-8?q?=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoint/mediation_service.py | 27 +++++++++++++++++---------- app/models/info.py | 5 +++++ 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/app/api/endpoint/mediation_service.py b/app/api/endpoint/mediation_service.py index 8e963932..4996cb9e 100644 --- a/app/api/endpoint/mediation_service.py +++ b/app/api/endpoint/mediation_service.py @@ -1,10 +1,13 @@ import json +import asyncio import ssl from fastapi import APIRouter, HTTPException, Header, status, BackgroundTasks from botocore.exceptions import ClientError from datetime import datetime from pydantic import ValidationError -from models.info import DataInfoSummary, VoiceInfo, DataInfoSTT, JudgeRequest, STTRequest, ConflictAnalysisRequest, ConflictAnalysisResponseData, ConflictAnalysisResponse, DataInfoOCR +from models.info import (DataInfoSummary, VoiceInfo, DataInfoSTT, JudgeRequest, STTRequest, + ConflictAnalysisRequest, ConflictAnalysisResponseData, + ConflictAnalysisResponse, DataInfoOCR, OCRRequest) from services.situation_summary import situation_summary_GPT, stt_model, generate_response, test_response from services.audio_process import process_audio_file from services.image_process import process_image_file @@ -494,17 +497,20 @@ def start_worker(): #################3 #OCR_MQ부분입니다. -def execute_OCR_and_callback(url: str): +async def execute_OCR_and_callback(id: int, url: str): """ OCR 호출 후 결과를 OCR_CALLBACK_URL로 전송 """ try: - logger.info("Executing OCR function...") - transcription = process_image_file(url) # OCR 처리 + logger.info(f"Executing OCR function for ID: {id}, URL: {url}") + + # 비동기 OCR 처리 + transcription = await process_image_file(url) # DataInfoOCR 데이터 생성 ocr_data = { "status": bool(transcription), # script가 있으면 True + "id": id, "url": url, "script": transcription, "accessKey": ACCESSTOKEN @@ -523,15 +529,16 @@ def execute_OCR_and_callback(url: str): logger.info(f"Callback response: {response.status_code}, {response.text}") except Exception as e: - logger.error(f"Error during OCR processing: {e}") + logger.error(f"Error during OCR processing for ID: {id}, URL: {url}, Error: {e}") # 실패한 경우 콜백 URL로 에러 메시지 전송 error_response = { "status": False, + "id": id, "url": url, "script": None, "accessKey": ACCESSTOKEN } - requests.post(CALLBACK_URL, json=error_response) + requests.post(OCR_CALLBACK_URL, json=error_response) def process_message(ch, method, properties, body): @@ -546,17 +553,17 @@ def process_message(ch, method, properties, body): # 데이터 검증 try: - request_data = STTRequest(**message) + request_data = OCRRequest(**message) except ValidationError as e: logger.error(f"Validation error in received message: {e}") raise ValueError("Invalid message format.") - # OCR 처리 및 콜백 전송 - execute_OCR_and_callback(request_data.url) + # 비동기 OCR 처리 및 콜백 전송 + asyncio.run(execute_OCR_and_callback(request_data.id, request_data.url)) # 메시지 처리 완료 ch.basic_ack(delivery_tag=method.delivery_tag) - logger.info(f"Message processed successfully: {request_data.url}") + logger.info(f"Message processed successfully: ID {request_data.id}, URL {request_data.url}") except Exception as e: logger.error(f"Error processing message: {e}") diff --git a/app/models/info.py b/app/models/info.py index fec7b1c1..d1ce7e75 100644 --- a/app/models/info.py +++ b/app/models/info.py @@ -3,8 +3,13 @@ from datetime import datetime, date from services.emotion_behavior_situation import RelationshipAnalyzer, SituationSummary +class OCRRequest(BaseModel): + id: int + url: str + class DataInfoOCR(BaseModel): status: bool + id: int url: str script: Optional[str] accessKey: str From 37154529353cabafc69a8b54e47ee50c9de35be5 Mon Sep 17 00:00:00 2001 From: KHyunJoong Date: Mon, 23 Dec 2024 04:08:41 +0900 Subject: [PATCH 4/4] =?UTF-8?q?fix:=20ocr,stt=20Mq=20=EC=84=A4=EC=A0=95=20?= =?UTF-8?q?=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoint/mediation_service.py | 186 ++++++++++++++++++++++++-- app/core/config.py | 1 + app/main.py | 34 +++-- app/models/info.py | 19 ++- 4 files changed, 211 insertions(+), 29 deletions(-) diff --git a/app/api/endpoint/mediation_service.py b/app/api/endpoint/mediation_service.py index 4996cb9e..55d833dd 100644 --- a/app/api/endpoint/mediation_service.py +++ b/app/api/endpoint/mediation_service.py @@ -5,7 +5,7 @@ from botocore.exceptions import ClientError from datetime import datetime from pydantic import ValidationError -from models.info import (DataInfoSummary, VoiceInfo, DataInfoSTT, JudgeRequest, STTRequest, +from models.info import (DataInfoSummary, DataInfoSTT, JudgeRequest, STTRequest, ConflictAnalysisRequest, ConflictAnalysisResponseData, ConflictAnalysisResponse, DataInfoOCR, OCRRequest) from services.situation_summary import situation_summary_GPT, stt_model, generate_response, test_response @@ -497,12 +497,12 @@ def start_worker(): #################3 #OCR_MQ부분입니다. -async def execute_OCR_and_callback(id: int, url: str): +async def execute_OCR_and_callback(member_id: int, private_post_id: int, url: str): """ OCR 호출 후 결과를 OCR_CALLBACK_URL로 전송 """ try: - logger.info(f"Executing OCR function for ID: {id}, URL: {url}") + logger.info(f"Executing OCR function for Member ID: {member_id}, Private Post ID: {private_post_id}, URL: {url}") # 비동기 OCR 처리 transcription = await process_image_file(url) @@ -510,7 +510,8 @@ async def execute_OCR_and_callback(id: int, url: str): # DataInfoOCR 데이터 생성 ocr_data = { "status": bool(transcription), # script가 있으면 True - "id": id, + "memberId": member_id, + "privatePostId": private_post_id, "url": url, "script": transcription, "accessKey": ACCESSTOKEN @@ -529,11 +530,12 @@ async def execute_OCR_and_callback(id: int, url: str): logger.info(f"Callback response: {response.status_code}, {response.text}") except Exception as e: - logger.error(f"Error during OCR processing for ID: {id}, URL: {url}, Error: {e}") + logger.error(f"Error during OCR processing for Member ID: {member_id}, Private Post ID: {private_post_id}, URL: {url}, Error: {e}") # 실패한 경우 콜백 URL로 에러 메시지 전송 error_response = { "status": False, - "id": id, + "memberId": member_id, + "privatePostId": private_post_id, "url": url, "script": None, "accessKey": ACCESSTOKEN @@ -559,24 +561,23 @@ def process_message(ch, method, properties, body): raise ValueError("Invalid message format.") # 비동기 OCR 처리 및 콜백 전송 - asyncio.run(execute_OCR_and_callback(request_data.id, request_data.url)) + asyncio.run(execute_OCR_and_callback(request_data.memberId, request_data.privatePostId, request_data.url)) # 메시지 처리 완료 ch.basic_ack(delivery_tag=method.delivery_tag) - logger.info(f"Message processed successfully: ID {request_data.id}, URL {request_data.url}") + logger.info(f"Message processed successfully: Member ID {request_data.memberId}, Private Post ID {request_data.privatePostId}, URL {request_data.url}") except Exception as e: logger.error(f"Error processing message: {e}") # 실패 메시지 재시도 방지 ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + except Exception as e: + logger.error(f"Error processing message: {e}") + # 실패 메시지 재시도 방지 + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) -port = 5671 -vhost = "/" -rabbitmq_url = f"amqps://{settings.RABBITMQ_USER}:{settings.RABBITMQ_PASS}@{settings.RABBITMQ_URL}:{port}" - - -def start_second_worker(): +def start_ocr_worker(): """ RabbitMQ 워커 시작 """ @@ -612,3 +613,160 @@ def start_second_worker(): logger.error(f"OCR RabbitMQ 연결 오류 발생: {e}") except Exception as e: logger.error(f"알 수 없는 오류 발생: {e}") + + + +################################# + + +###################### +STT_CALLBACK_URL = settings.STT_CALLBACK_URL +####################### +# #OCR 비동기 처리입니다. 필요시 활성화 하세요 +# +# @router.post("/speech-to-text", response_model=dict, status_code=202) +# async def process_speech(request: STTRequest, background_tasks: BackgroundTasks): +# """ +# 요청을 수락하고 202 응답을 반환. +# BackgroundTasks를 이용해 STT 작업 수행 후 결과를 CALLBACK_URL로 POST 전송. +# """ +# logger.info("Received speech-to-text request") +# logger.info(f"Member ID: {request.memberId}, Private Post ID: {request.privatePostId}, Audio URL: {request.url}") +# +# # URL 및 필드가 제공되지 않은 경우 오류 반환 +# if not request.url: +# raise HTTPException(status_code=400, detail="URL_NOT_PROVIDED") +# +# # Background 작업 등록 +# logger.info("Starting background task for STT processing...") +# background_tasks.add_task( +# execute_STT_and_callback, request.memberId, request.privatePostId, request.url +# ) +# +# # 202 Accepted 응답 반환 +# return {"status": "accepted", "message": "STT processing started."} + +#################3 +#OCR_MQ부분입니다. +async def execute_STT_and_callback(member_id: int, private_post_id: int, url: str): + """ + STT 호출 후 결과를 STT_CALLBACK_URL로 전송 + """ + try: + logger.info(f"Executing STT function for Member ID: {member_id}, Private Post ID: {private_post_id}, URL: {url}") + + # 비동기 STT 처리 + transcription = await process_audio_file(url) + + # DataInfoSTT 데이터 생성 + stt_data = { + "status": bool(transcription), # script가 있으면 True + "memberId": member_id, + "privatePostId": private_post_id, + "url": url, + "script": transcription, + "accessKey": ACCESSTOKEN + } + + # DataInfoSTT 모델로 데이터 검증 + try: + validated_data = DataInfoSTT(**stt_data) + except ValidationError as e: + logger.error(f"Validation error in STT data: {e}") + raise ValueError("Invalid data format for STT callback.") + + # 콜백 URL로 데이터 전송 + logger.info(f"Sending POST request to STT_CALLBACK_URL with data: {validated_data.dict()}") + response = requests.post(STT_CALLBACK_URL, json=validated_data.dict()) + logger.info(f"Callback response: {response.status_code}, {response.text}") + + except Exception as e: + logger.error(f"Error during STT processing for Member ID: {member_id}, Private Post ID: {private_post_id}, URL: {url}, Error: {e}") + # 실패한 경우 콜백 URL로 에러 메시지 전송 + error_response = { + "status": False, + "memberId": member_id, + "privatePostId": private_post_id, + "url": url, + "script": None, + "accessKey": ACCESSTOKEN + } + requests.post(STT_CALLBACK_URL, json=error_response) + + +def process_stt_message(ch, method, properties, body): + """ + RabbitMQ 메시지 소비 후 STT 처리 및 콜백 전송 + """ + try: + # 메시지 디코딩 및 검증 + decoded_body = body.decode('utf-8') + logger.info(f"Received STT message: {decoded_body}") + message = json.loads(decoded_body) + + # 데이터 검증 + try: + request_data = STTRequest(**message) + except ValidationError as e: + logger.error(f"Validation error in received STT message: {e}") + raise ValueError("Invalid message format.") + + # 비동기 STT 처리 및 콜백 전송 + asyncio.run( + execute_STT_and_callback( + request_data.memberId, request_data.privatePostId, request_data.url + ) + ) + + # 메시지 처리 완료 + ch.basic_ack(delivery_tag=method.delivery_tag) + logger.info( + f"Message processed successfully: Member ID {request_data.memberId}, Private Post ID {request_data.privatePostId}, URL {request_data.url}" + ) + + except Exception as e: + logger.error(f"Error processing STT message: {e}") + # 실패 메시지 재시도 방지 + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + +def start_stt_worker(): + """ + STT RabbitMQ 워커 시작 + """ + try: + # SSL 컨텍스트 설정 + ssl_context = ssl.create_default_context() + ssl_context.verify_mode = ssl.CERT_REQUIRED + ssl_context.check_hostname = True + ssl_context.load_default_certs() + + # RabbitMQ 연결 설정 + + params = pika.URLParameters(rabbitmq_url) + params.ssl_options = pika.SSLOptions(ssl_context) + + connection = pika.BlockingConnection(params) + channel = connection.channel() + + # Exchange 및 Queue 설정 + channel.exchange_declare(exchange="aiSTTExchange", exchange_type="direct", durable=True) + channel.queue_declare(queue="aiSTTQueue", durable=True) + channel.queue_bind(exchange="aiSTTExchange", queue="aiSTTQueue", routing_key="ai.stt.key") + + # 메시지 소비 시작 + channel.basic_qos(prefetch_count=1) + channel.basic_consume(queue="aiSTTQueue", on_message_callback=process_stt_message) + + logger.info("STT Worker is waiting for messages...") + channel.start_consuming() + + except ssl.SSLError as e: + logger.error(f"SSL 연결 오류 발생: {e}") + except pika.exceptions.AMQPConnectionError as e: + logger.error(f"STT RabbitMQ 연결 오류 발생: {e}") + except Exception as e: + logger.error(f"알 수 없는 오류 발생: {e}") + + + +################################# \ No newline at end of file diff --git a/app/core/config.py b/app/core/config.py index 447415f2..8a19f7ac 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -6,6 +6,7 @@ class Settings(BaseSettings): OPENAI_API_KEY: str = "" CALLBACK_URL: str = "" OCR_CALLBACK_URL: str = "" + STT_CALLBACK_URL: str = "" ACCESSTOKEN: str = "" #MQ RABBITMQ_URL: str = "" diff --git a/app/main.py b/app/main.py index 3c1395bb..fef277e8 100644 --- a/app/main.py +++ b/app/main.py @@ -4,7 +4,7 @@ from core.config import settings import asyncio import threading -from api.endpoint.mediation_service import start_worker, start_second_worker +from api.endpoint.mediation_service import start_worker, start_ocr_worker, start_stt_worker from contextlib import asynccontextmanager from core.logging import logger @@ -16,33 +16,49 @@ def run_worker_in_thread(): asyncio.set_event_loop(loop) start_worker() -def run_second_worker_in_thread(): +def run_ocr_worker_in_thread(): """ - 두 번째 RabbitMQ 워커를 별도 스레드에서 실행 + OCR RabbitMQ 워커를 별도 스레드에서 실행 """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - start_second_worker() # 두 번째 워커 실행 + start_ocr_worker() +def run_stt_worker_in_thread(): + """ + 세 번째 RabbitMQ 워커(STT)를 별도 스레드에서 실행 + """ + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + start_stt_worker() @asynccontextmanager async def lifespan(app: FastAPI): """ Lifespan 핸들러로 RabbitMQ 워커 스레드 관리 """ logger.info("Initializing application...") + + # 첫 번째 워커 실행 worker_thread = threading.Thread(target=run_worker_in_thread, daemon=True) worker_thread.start() logger.info("RabbitMQ worker thread started.") - # 두 번째 워커 실행 - second_worker_thread = threading.Thread(target=run_second_worker_in_thread, daemon=True) - second_worker_thread.start() - logger.info("Second OCR RabbitMQ worker thread started.") + # OCR 워커 실행 + ocr_worker_thread = threading.Thread(target=run_ocr_worker_in_thread, daemon=True) + ocr_worker_thread.start() + logger.info("OCR RabbitMQ worker thread started.") + + # 세 번째 워커(STT) 실행 + stt_worker_thread = threading.Thread(target=run_stt_worker_in_thread, daemon=True) + stt_worker_thread.start() + logger.info("STT RabbitMQ worker thread started.") yield # 애플리케이션 실행 유지 logger.info("Shutting down application...") - worker_thread.join(timeout=5) # 스레드 종료 대기 + worker_thread.join(timeout=5) # 첫 번째 스레드 종료 대기 + ocr_worker_thread.join(timeout=5) # OCR 스레드 종료 대기 + stt_worker_thread.join(timeout=5) # STT 스레드 종료 대기 app = FastAPI(title=settings.PROJECT_NAME, lifespan=lifespan) diff --git a/app/models/info.py b/app/models/info.py index d1ce7e75..461f968c 100644 --- a/app/models/info.py +++ b/app/models/info.py @@ -4,18 +4,29 @@ from services.emotion_behavior_situation import RelationshipAnalyzer, SituationSummary class OCRRequest(BaseModel): - id: int + memberId: int + privatePostId: int + url: str +class STTRequest(BaseModel): + memberId: int + privatePostId: int url: str class DataInfoOCR(BaseModel): status: bool - id: int + memberId: int + privatePostId: int url: str script: Optional[str] accessKey: str class DataInfoSTT(BaseModel): + status: bool + memberId: int + privatePostId: int + url: str script: Optional[str] + accessKey: str class DataInfoSummary(BaseModel): status: bool @@ -28,10 +39,6 @@ class DataInfoSummary(BaseModel): faultRate: Optional[float] accessKey: str -class VoiceInfo(BaseModel): - status: Optional[str] - timestamp: Optional[datetime] - data: Optional[Union[DataInfoSTT]] class JudgeRequest(BaseModel): id: int