Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions timeless-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@
<artifactId>quarkus-langchain4j-openai</artifactId>
<version>${quarkus-langchain4j-openai.version}</version>
</dependency>
<dependency>
<groupId>io.quarkiverse.amazonservices</groupId>
<artifactId>quarkus-messaging-amazon-sqs</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.amazonservices</groupId>
<artifactId>quarkus-amazon-sqs</artifactId>
Expand Down
92 changes: 43 additions & 49 deletions timeless-api/src/main/java/dev/matheuscruz/infra/queue/SQS.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@
import dev.matheuscruz.infra.ai.data.RecognizedTransaction;
import dev.matheuscruz.infra.ai.data.SimpleMessage;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;
import software.amazon.awssdk.services.sqs.SqsClient;

@ApplicationScoped
public class SQS {

final String incomingMessagesUrl;
final String processedMessagesUrl;
final SqsClient sqs;
final ObjectMapper objectMapper;
final TextAiService aiService;
Expand All @@ -39,96 +39,94 @@ public class SQS {

private static final ObjectReader AI_RESPONSE_READER = new ObjectMapper().readerFor(RecognizedOperation.class);

public SQS(SqsClient sqs, @ConfigProperty(name = "whatsapp.incoming-message.queue-url") String incomingMessagesUrl,
@ConfigProperty(name = "whatsapp.recognized-message.queue-url") String messagesProcessedUrl,
ObjectMapper objectMapper, TextAiService aiService, RecordRepository recordRepository,
public SQS(SqsClient sqs, ObjectMapper objectMapper, TextAiService aiService, RecordRepository recordRepository,
UserRepository userRepository) {

this.sqs = sqs;
this.incomingMessagesUrl = incomingMessagesUrl;
this.processedMessagesUrl = messagesProcessedUrl;
this.objectMapper = objectMapper;
this.aiService = aiService;
this.recordRepository = recordRepository;
this.userRepository = userRepository;
}

@Scheduled(every = "5s")
public void receiveMessages() {
sqs.receiveMessage(req -> req.maxNumberOfMessages(10).queueUrl(incomingMessagesUrl)).messages()
.forEach(message -> processMessage(message.body(), message.receiptHandle()));
}

private void processMessage(String body, String receiptHandle) {
@Incoming("whatsapp-incoming")
@Outgoing("whatsapp-recognized")
public Multi<Message<String>> receiveMessages(Message<String> message) {
String body = message.getPayload();
IncomingMessage incomingMessage = parseIncomingMessage(body);
if (!MessageKind.TEXT.equals(incomingMessage.kind()))
return;

if (!MessageKind.TEXT.equals(incomingMessage.kind())) {
return Multi.createFrom().item(message);
}

Optional<User> user = this.userRepository.findByPhoneNumber(incomingMessage.sender());

if (user.isEmpty()) {
logger.error("User not found. Deleting message from queue.");
deleteMessageUsing(receiptHandle);
return;
logger.error("User not found.");
return Multi.createFrom().item(message);
}

handleUserMessage(user.get(), incomingMessage, receiptHandle);
return Multi.createFrom().iterable(handleUserMessage(user.get(), incomingMessage)).map(processedMessage -> {
try {
String processedBody = objectMapper.writeValueAsString(processedMessage);
return Message.of(processedBody).withAck(() -> message.ack())
.withNack(throwable -> message.nack(throwable));
} catch (JsonProcessingException e) {
logger.error("Failed to serialize message", e);
throw new RuntimeException(e);
}
});
}

private void handleUserMessage(User user, IncomingMessage message, String receiptHandle) {
private List<Object> handleUserMessage(User user, IncomingMessage message) {
List<Object> results = new ArrayList<>();
try {
AllRecognizedOperations allRecognizedOperations = aiService.handleMessage(message.messageBody(),
user.getId());

for (RecognizedOperation recognizedOperation : allRecognizedOperations.all()) {
switch (recognizedOperation.operation()) {
case AiOperations.ADD_TRANSACTION ->
processAddTransactionMessage(user, message, receiptHandle, recognizedOperation);
results.add(processAddTransactionMessage(user, message, recognizedOperation));
case AiOperations.GET_BALANCE -> {
logger.info("Processing GET_BALANCE operation" + recognizedOperation.recognizedTransaction());
processSimpleMessage(user, message, receiptHandle, recognizedOperation);
results.add(processSimpleMessage(user, message, recognizedOperation));
}
default -> logger.warnf("Unknown operation type: %s", recognizedOperation.operation());
}
}

} catch (Exception e) {
logger.error("Failed to process message: " + message.messageId(), e);
}
return results;
}

private void processAddTransactionMessage(User user, IncomingMessage message, String receiptHandle,
RecognizedOperation recognizedOperation) throws IOException {
private TransactionMessageProcessed processAddTransactionMessage(User user, IncomingMessage message,
RecognizedOperation recognizedOperation) {
RecognizedTransaction recognizedTransaction = recognizedOperation.recognizedTransaction();
sendProcessedMessage(new TransactionMessageProcessed(AiOperations.ADD_TRANSACTION.commandName(),
message.messageId(), MessageStatus.PROCESSED, user.getPhoneNumber(), recognizedTransaction.withError(),
recognizedTransaction));

Record record = new Record.Builder().userId(user.getId()).amount(recognizedTransaction.amount())
.description(recognizedTransaction.description()).transaction(recognizedTransaction.type())
.category(recognizedTransaction.category()).build();

QuarkusTransaction.requiringNew().run(() -> recordRepository.persist(record));

deleteMessageUsing(receiptHandle);

logger.infof("Message %s processed as ADD_TRANSACTION", message.messageId());

return new TransactionMessageProcessed(AiOperations.ADD_TRANSACTION.commandName(), message.messageId(),
MessageStatus.PROCESSED, user.getPhoneNumber(), recognizedTransaction.withError(),
recognizedTransaction);
}

private void processSimpleMessage(User user, IncomingMessage message, String receiptHandle,
RecognizedOperation recognizedOperation) throws IOException {
private SimpleMessageProcessed processSimpleMessage(User user, IncomingMessage message,
RecognizedOperation recognizedOperation) {
logger.infof("Processing simple message for user %s", recognizedOperation.recognizedTransaction());
SimpleMessage response = new SimpleMessage(recognizedOperation.recognizedTransaction().description());
sendProcessedMessage(new SimpleMessageProcessed(AiOperations.GET_BALANCE.commandName(), message.messageId(),
MessageStatus.PROCESSED, user.getPhoneNumber(), response));
deleteMessageUsing(receiptHandle);

logger.infof("Message %s processed as GET_BALANCE", message.messageId());
}

private void sendProcessedMessage(Object processedMessage) throws JsonProcessingException {
String messageBody = objectMapper.writeValueAsString(processedMessage);
sqs.sendMessage(req -> req.messageBody(messageBody).messageGroupId("ProcessedMessages")
.messageDeduplicationId(UUID.randomUUID().toString()).queueUrl(processedMessagesUrl));
return new SimpleMessageProcessed(AiOperations.GET_BALANCE.commandName(), message.messageId(),
MessageStatus.PROCESSED, user.getPhoneNumber(), response);
}

private IncomingMessage parseIncomingMessage(String messageBody) {
Expand All @@ -139,10 +137,6 @@ private IncomingMessage parseIncomingMessage(String messageBody) {
}
}

private void deleteMessageUsing(String receiptHandle) {
sqs.deleteMessage(req -> req.queueUrl(incomingMessagesUrl).receiptHandle(receiptHandle));
}

public record TransactionMessageProcessed(String kind, String messageId, MessageStatus status, String user,
Boolean withError, RecognizedTransaction content) {
}
Expand Down
8 changes: 8 additions & 0 deletions timeless-api/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ security.sensible.secret=${SECURITY_KEY}
whatsapp.incoming-message.queue-url=${INCOMING_MESSAGE_FIFO_URL}
whatsapp.recognized-message.queue-url=${RECOGNIZED_MESSAGE_FIFO_URL}

# smallrye reactive messaging sqs
mp.messaging.incoming.whatsapp-incoming.connector=smallrye-sqs
mp.messaging.incoming.whatsapp-incoming.queue=${INCOMING_MESSAGE_FIFO_URL}
mp.messaging.incoming.whatsapp-incoming.visibility-timeout=30

mp.messaging.outgoing.whatsapp-recognized.connector=smallrye-sqs
mp.messaging.outgoing.whatsapp-recognized.queue=${RECOGNIZED_MESSAGE_FIFO_URL}

# aws sqs
quarkus.sqs.devservices.enabled=false

Expand Down
Loading