Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
347 changes: 321 additions & 26 deletions app/api/endpoint/mediation_service.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import json
import asyncio
import ssl
from fastapi import APIRouter, HTTPException, Header, status, BackgroundTasks
from botocore.exceptions import ClientError
from datetime import datetime
from models.info import DataInfoSummary, VoiceInfo, DataInfoSTT,JudgeRequest,STTRequest, ConflictAnalysisRequest,ConflictAnalysisResponseData, ConflictAnalysisResponse
from services.situation_summary import situation_summary_GPT,stt_model,generate_response,test_response
from pydantic import ValidationError
from models.info import (DataInfoSummary, DataInfoSTT, JudgeRequest, STTRequest,
ConflictAnalysisRequest, ConflictAnalysisResponseData,
ConflictAnalysisResponse, DataInfoOCR, OCRRequest)
from services.situation_summary import situation_summary_GPT, stt_model, generate_response, test_response
from services.audio_process import process_audio_file
from services.image_process import process_image_file
from services.score_multi import process_request
Expand Down Expand Up @@ -80,7 +84,9 @@
# logger.error(f"Unexpected error during conflict analysis: {e}")
# raise HTTPException(status_code=500, detail="Internal server error during conflict analysis")

@router.post("/speech-to-text", response_model=VoiceInfo, status_code=201) # response_model을 VoiceInfo로 변경

@router.post("/speech-to-text", response_model=dict, status_code=201)

async def get_voice(request: STTRequest):
logger.info("get_infos start")
logger.info(f"audio URL : {request.url}")
Expand Down Expand Up @@ -120,15 +126,13 @@ async def get_voice(request: STTRequest):
raise HTTPException(status_code=500, detail="STT_PROCESSING_ERROR")

# 응답 생성
# response = VoiceInfo(
# status="Created",
# timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
# data=DataInfoSTT(
# script=transcription
# )
# )
# logger.info(f"Response: {response}")
# return response

response = {
"script": transcription
}
logger.info(f"Response: {response}")
return response


@router.post("/image-to-text", response_model=dict, status_code=201)
async def get_image(request: STTRequest):
Expand Down Expand Up @@ -156,17 +160,12 @@ async def get_image(request: STTRequest):
raise HTTPException(status_code=500, detail="OCR_PROCESSING_ERROR")

# 응답 생성
response = VoiceInfo(
status="Created",
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
data=DataInfoSTT(
script=transcription
)
)
response = {
"script": transcription
}
logger.info(f"Response: {response}")
return response


#동기식 함수
# @router.post("/judgement", response_model=DataInfoSummary, status_code=201)
# async def process_judge(request: JudgeRequest):
Expand Down Expand Up @@ -494,10 +493,13 @@ def process_message(ch, method, properties, body):
logger.error(f"Error processing message: {e}")
# 실패 메시지 재시도 방지
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)


port = 5671
# vhost = "/"
rabbitmq_url = f"amqps://{settings.RABBITMQ_USER}:{settings.RABBITMQ_PASS}@{settings.RABBITMQ_URL}:{port}"


def start_worker():
"""
RabbitMQ 워커 시작
Expand Down Expand Up @@ -539,11 +541,304 @@ def start_worker():
except Exception as e:
logger.error(f"알 수 없는 오류 발생: {e}")

# def process_message(ch, method, properties, body):

######################
OCR_CALLBACK_URL = settings.OCR_CALLBACK_URL
#######################
# #OCR 비동기 처리입니다. 필요시 활성화 하세요
#
# @router.post("/image-to-text", response_model=dict, status_code=202)
# async def process_image(request: STTRequest, background_tasks: BackgroundTasks):
# """
# 요청을 수락하고 202 응답을 반환.
# BackgroundTasks를 이용해 OCR 작업 수행 후 결과를 CALLBACK_URL로 POST 전송.
# """
# logger.info("Received image-to-text request")
# logger.info(f"Image URL: {request.url}")
#
# # URL이 제공되지 않은 경우 오류 반환
# if not request.url:
# raise HTTPException(status_code=400, detail="URL_NOT_PROVIDED")
#
# # Background 작업 등록
# logger.info("Starting background task for OCR processing...")
# background_tasks.add_task(execute_OCR_and_callback, request.url)
#
# # 202 Accepted 응답 반환
# return {"status": "accepted", "message": "OCR processing started."}

#################3
#OCR_MQ부분입니다.
async def execute_OCR_and_callback(member_id: int, private_post_id: int, url: str):
"""
OCR 호출 후 결과를 OCR_CALLBACK_URL로 전송
"""
try:
logger.info(f"Executing OCR function for Member ID: {member_id}, Private Post ID: {private_post_id}, URL: {url}")

# 비동기 OCR 처리
transcription = await process_image_file(url)

# DataInfoOCR 데이터 생성
ocr_data = {
"status": bool(transcription), # script가 있으면 True
"memberId": member_id,
"privatePostId": private_post_id,
"url": url,
"script": transcription,
"accessKey": ACCESSTOKEN
}

# DataInfoOCR 모델로 데이터 검증
try:
validated_data = DataInfoOCR(**ocr_data)
except ValidationError as e:
logger.error(f"Validation error in OCR data: {e}")
raise ValueError("Invalid data format for OCR callback.")

# 콜백 URL로 데이터 전송
logger.info(f"Sending POST request to OCR_CALLBACK_URL with data: {validated_data.dict()}")
response = requests.post(OCR_CALLBACK_URL, json=validated_data.dict())
logger.info(f"Callback response: {response.status_code}, {response.text}")

except Exception as e:
logger.error(f"Error during OCR processing for Member ID: {member_id}, Private Post ID: {private_post_id}, URL: {url}, Error: {e}")
# 실패한 경우 콜백 URL로 에러 메시지 전송
error_response = {
"status": False,
"memberId": member_id,
"privatePostId": private_post_id,
"url": url,
"script": None,
"accessKey": ACCESSTOKEN
}
requests.post(OCR_CALLBACK_URL, json=error_response)


def process_message(ch, method, properties, body):
"""
RabbitMQ 메시지 소비 후 OCR 처리 및 콜백 전송
"""
try:
# 메시지 디코딩 및 검증
decoded_body = body.decode('utf-8')
logger.info(f"Received message: {decoded_body}")
message = json.loads(decoded_body)

# 데이터 검증
try:
request_data = OCRRequest(**message)
except ValidationError as e:
logger.error(f"Validation error in received message: {e}")
raise ValueError("Invalid message format.")

# 비동기 OCR 처리 및 콜백 전송
asyncio.run(execute_OCR_and_callback(request_data.memberId, request_data.privatePostId, request_data.url))

# 메시지 처리 완료
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Message processed successfully: Member ID {request_data.memberId}, Private Post ID {request_data.privatePostId}, URL {request_data.url}")

except Exception as e:
logger.error(f"Error processing message: {e}")
# 실패 메시지 재시도 방지
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

except Exception as e:
logger.error(f"Error processing message: {e}")
# 실패 메시지 재시도 방지
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

def start_ocr_worker():
"""
RabbitMQ 워커 시작
"""
try:
# SSL 컨텍스트 설정
ssl_context = ssl.create_default_context()
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.check_hostname = True
ssl_context.load_default_certs()

# RabbitMQ 연결 설정
params = pika.URLParameters(rabbitmq_url)
params.ssl_options = pika.SSLOptions(ssl_context)

connection = pika.BlockingConnection(params)
channel = connection.channel()

# Exchange 및 Queue 설정
channel.exchange_declare(exchange="aiOCRExchange", exchange_type="direct", durable=True)
channel.queue_declare(queue="aiOCRQueue", durable=True)
channel.queue_bind(exchange="aiOCRExchange", queue="aiOCRQueue", routing_key="ai.ocr.key")

# 메시지 소비 시작
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue="aiOCRQueue", on_message_callback=process_message)

logger.info("OCR Worker is waiting for messages...")
channel.start_consuming()

except ssl.SSLError as e:
logger.error(f"SSL 연결 오류 발생: {e}")
except pika.exceptions.AMQPConnectionError as e:
logger.error(f"OCR RabbitMQ 연결 오류 발생: {e}")
except Exception as e:
logger.error(f"알 수 없는 오류 발생: {e}")



#################################


######################
STT_CALLBACK_URL = settings.STT_CALLBACK_URL
#######################
# #OCR 비동기 처리입니다. 필요시 활성화 하세요
#
# @router.post("/speech-to-text", response_model=dict, status_code=202)
# async def process_speech(request: STTRequest, background_tasks: BackgroundTasks):
# """
# 메시지 처리 콜백 함수
# 요청을 수락하고 202 응답을 반환.
# BackgroundTasks를 이용해 STT 작업 수행 후 결과를 CALLBACK_URL로 POST 전송.
# """
# decoded_body = body.decode('utf-8')
# logger.info(f"Received message: {decoded_body}")
# # 메시지 처리 로직 추가
# ch.basic_ack(delivery_tag=method.delivery_tag) # 메시지 확인
# logger.info("Received speech-to-text request")
# logger.info(f"Member ID: {request.memberId}, Private Post ID: {request.privatePostId}, Audio URL: {request.url}")
#
# # URL 및 필드가 제공되지 않은 경우 오류 반환
# if not request.url:
# raise HTTPException(status_code=400, detail="URL_NOT_PROVIDED")
#
# # Background 작업 등록
# logger.info("Starting background task for STT processing...")
# background_tasks.add_task(
# execute_STT_and_callback, request.memberId, request.privatePostId, request.url
# )
#
# # 202 Accepted 응답 반환
# return {"status": "accepted", "message": "STT processing started."}

#################3
#OCR_MQ부분입니다.
async def execute_STT_and_callback(member_id: int, private_post_id: int, url: str):
"""
STT 호출 후 결과를 STT_CALLBACK_URL로 전송
"""
try:
logger.info(f"Executing STT function for Member ID: {member_id}, Private Post ID: {private_post_id}, URL: {url}")

# 비동기 STT 처리
transcription = await process_audio_file(url)

# DataInfoSTT 데이터 생성
stt_data = {
"status": bool(transcription), # script가 있으면 True
"memberId": member_id,
"privatePostId": private_post_id,
"url": url,
"script": transcription,
"accessKey": ACCESSTOKEN
}

# DataInfoSTT 모델로 데이터 검증
try:
validated_data = DataInfoSTT(**stt_data)
except ValidationError as e:
logger.error(f"Validation error in STT data: {e}")
raise ValueError("Invalid data format for STT callback.")

# 콜백 URL로 데이터 전송
logger.info(f"Sending POST request to STT_CALLBACK_URL with data: {validated_data.dict()}")
response = requests.post(STT_CALLBACK_URL, json=validated_data.dict())
logger.info(f"Callback response: {response.status_code}, {response.text}")

except Exception as e:
logger.error(f"Error during STT processing for Member ID: {member_id}, Private Post ID: {private_post_id}, URL: {url}, Error: {e}")
# 실패한 경우 콜백 URL로 에러 메시지 전송
error_response = {
"status": False,
"memberId": member_id,
"privatePostId": private_post_id,
"url": url,
"script": None,
"accessKey": ACCESSTOKEN
}
requests.post(STT_CALLBACK_URL, json=error_response)


def process_stt_message(ch, method, properties, body):
"""
RabbitMQ 메시지 소비 후 STT 처리 및 콜백 전송
"""
try:
# 메시지 디코딩 및 검증
decoded_body = body.decode('utf-8')
logger.info(f"Received STT message: {decoded_body}")
message = json.loads(decoded_body)

# 데이터 검증
try:
request_data = STTRequest(**message)
except ValidationError as e:
logger.error(f"Validation error in received STT message: {e}")
raise ValueError("Invalid message format.")

# 비동기 STT 처리 및 콜백 전송
asyncio.run(
execute_STT_and_callback(
request_data.memberId, request_data.privatePostId, request_data.url
)
)

# 메시지 처리 완료
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(
f"Message processed successfully: Member ID {request_data.memberId}, Private Post ID {request_data.privatePostId}, URL {request_data.url}"
)

except Exception as e:
logger.error(f"Error processing STT message: {e}")
# 실패 메시지 재시도 방지
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

def start_stt_worker():
"""
STT RabbitMQ 워커 시작
"""
try:
# SSL 컨텍스트 설정
ssl_context = ssl.create_default_context()
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.check_hostname = True
ssl_context.load_default_certs()

# RabbitMQ 연결 설정

params = pika.URLParameters(rabbitmq_url)
params.ssl_options = pika.SSLOptions(ssl_context)

connection = pika.BlockingConnection(params)
channel = connection.channel()

# Exchange 및 Queue 설정
channel.exchange_declare(exchange="aiSTTExchange", exchange_type="direct", durable=True)
channel.queue_declare(queue="aiSTTQueue", durable=True)
channel.queue_bind(exchange="aiSTTExchange", queue="aiSTTQueue", routing_key="ai.stt.key")

# 메시지 소비 시작
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue="aiSTTQueue", on_message_callback=process_stt_message)

logger.info("STT Worker is waiting for messages...")
channel.start_consuming()

except ssl.SSLError as e:
logger.error(f"SSL 연결 오류 발생: {e}")
except pika.exceptions.AMQPConnectionError as e:
logger.error(f"STT RabbitMQ 연결 오류 발생: {e}")
except Exception as e:
logger.error(f"알 수 없는 오류 발생: {e}")



#################################
Loading
Loading