diff --git a/timeless-api/pom.xml b/timeless-api/pom.xml index 154c661..e6004a7 100644 --- a/timeless-api/pom.xml +++ b/timeless-api/pom.xml @@ -71,6 +71,10 @@ quarkus-langchain4j-openai ${quarkus-langchain4j-openai.version} + + io.quarkiverse.amazonservices + quarkus-messaging-amazon-sqs + io.quarkiverse.amazonservices quarkus-amazon-sqs diff --git a/timeless-api/src/main/java/dev/matheuscruz/infra/queue/SQS.java b/timeless-api/src/main/java/dev/matheuscruz/infra/queue/SQS.java index 6aae662..297af47 100644 --- a/timeless-api/src/main/java/dev/matheuscruz/infra/queue/SQS.java +++ b/timeless-api/src/main/java/dev/matheuscruz/infra/queue/SQS.java @@ -14,20 +14,20 @@ import dev.matheuscruz.infra.ai.data.RecognizedTransaction; import dev.matheuscruz.infra.ai.data.SimpleMessage; import io.quarkus.narayana.jta.QuarkusTransaction; -import io.quarkus.scheduler.Scheduled; +import io.smallrye.mutiny.Multi; import jakarta.enterprise.context.ApplicationScoped; -import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; -import java.util.UUID; -import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.jboss.logging.Logger; import software.amazon.awssdk.services.sqs.SqsClient; @ApplicationScoped public class SQS { - final String incomingMessagesUrl; - final String processedMessagesUrl; final SqsClient sqs; final ObjectMapper objectMapper; final TextAiService aiService; @@ -39,43 +39,47 @@ public class SQS { private static final ObjectReader AI_RESPONSE_READER = new ObjectMapper().readerFor(RecognizedOperation.class); - public SQS(SqsClient sqs, @ConfigProperty(name = "whatsapp.incoming-message.queue-url") String incomingMessagesUrl, - @ConfigProperty(name = "whatsapp.recognized-message.queue-url") String messagesProcessedUrl, - ObjectMapper objectMapper, TextAiService aiService, RecordRepository recordRepository, + public SQS(SqsClient sqs, ObjectMapper objectMapper, TextAiService aiService, RecordRepository recordRepository, UserRepository userRepository) { this.sqs = sqs; - this.incomingMessagesUrl = incomingMessagesUrl; - this.processedMessagesUrl = messagesProcessedUrl; this.objectMapper = objectMapper; this.aiService = aiService; this.recordRepository = recordRepository; this.userRepository = userRepository; } - @Scheduled(every = "5s") - public void receiveMessages() { - sqs.receiveMessage(req -> req.maxNumberOfMessages(10).queueUrl(incomingMessagesUrl)).messages() - .forEach(message -> processMessage(message.body(), message.receiptHandle())); - } - - private void processMessage(String body, String receiptHandle) { + @Incoming("whatsapp-incoming") + @Outgoing("whatsapp-recognized") + public Multi> receiveMessages(Message message) { + String body = message.getPayload(); IncomingMessage incomingMessage = parseIncomingMessage(body); - if (!MessageKind.TEXT.equals(incomingMessage.kind())) - return; + + if (!MessageKind.TEXT.equals(incomingMessage.kind())) { + return Multi.createFrom().item(message); + } Optional user = this.userRepository.findByPhoneNumber(incomingMessage.sender()); if (user.isEmpty()) { - logger.error("User not found. Deleting message from queue."); - deleteMessageUsing(receiptHandle); - return; + logger.error("User not found."); + return Multi.createFrom().item(message); } - handleUserMessage(user.get(), incomingMessage, receiptHandle); + return Multi.createFrom().iterable(handleUserMessage(user.get(), incomingMessage)).map(processedMessage -> { + try { + String processedBody = objectMapper.writeValueAsString(processedMessage); + return Message.of(processedBody).withAck(() -> message.ack()) + .withNack(throwable -> message.nack(throwable)); + } catch (JsonProcessingException e) { + logger.error("Failed to serialize message", e); + throw new RuntimeException(e); + } + }); } - private void handleUserMessage(User user, IncomingMessage message, String receiptHandle) { + private List handleUserMessage(User user, IncomingMessage message) { + List results = new ArrayList<>(); try { AllRecognizedOperations allRecognizedOperations = aiService.handleMessage(message.messageBody(), user.getId()); @@ -83,26 +87,23 @@ private void handleUserMessage(User user, IncomingMessage message, String receip for (RecognizedOperation recognizedOperation : allRecognizedOperations.all()) { switch (recognizedOperation.operation()) { case AiOperations.ADD_TRANSACTION -> - processAddTransactionMessage(user, message, receiptHandle, recognizedOperation); + results.add(processAddTransactionMessage(user, message, recognizedOperation)); case AiOperations.GET_BALANCE -> { logger.info("Processing GET_BALANCE operation" + recognizedOperation.recognizedTransaction()); - processSimpleMessage(user, message, receiptHandle, recognizedOperation); + results.add(processSimpleMessage(user, message, recognizedOperation)); } default -> logger.warnf("Unknown operation type: %s", recognizedOperation.operation()); } } - } catch (Exception e) { logger.error("Failed to process message: " + message.messageId(), e); } + return results; } - private void processAddTransactionMessage(User user, IncomingMessage message, String receiptHandle, - RecognizedOperation recognizedOperation) throws IOException { + private TransactionMessageProcessed processAddTransactionMessage(User user, IncomingMessage message, + RecognizedOperation recognizedOperation) { RecognizedTransaction recognizedTransaction = recognizedOperation.recognizedTransaction(); - sendProcessedMessage(new TransactionMessageProcessed(AiOperations.ADD_TRANSACTION.commandName(), - message.messageId(), MessageStatus.PROCESSED, user.getPhoneNumber(), recognizedTransaction.withError(), - recognizedTransaction)); Record record = new Record.Builder().userId(user.getId()).amount(recognizedTransaction.amount()) .description(recognizedTransaction.description()).transaction(recognizedTransaction.type()) @@ -110,25 +111,22 @@ private void processAddTransactionMessage(User user, IncomingMessage message, St QuarkusTransaction.requiringNew().run(() -> recordRepository.persist(record)); - deleteMessageUsing(receiptHandle); - logger.infof("Message %s processed as ADD_TRANSACTION", message.messageId()); + + return new TransactionMessageProcessed(AiOperations.ADD_TRANSACTION.commandName(), message.messageId(), + MessageStatus.PROCESSED, user.getPhoneNumber(), recognizedTransaction.withError(), + recognizedTransaction); } - private void processSimpleMessage(User user, IncomingMessage message, String receiptHandle, - RecognizedOperation recognizedOperation) throws IOException { + private SimpleMessageProcessed processSimpleMessage(User user, IncomingMessage message, + RecognizedOperation recognizedOperation) { logger.infof("Processing simple message for user %s", recognizedOperation.recognizedTransaction()); SimpleMessage response = new SimpleMessage(recognizedOperation.recognizedTransaction().description()); - sendProcessedMessage(new SimpleMessageProcessed(AiOperations.GET_BALANCE.commandName(), message.messageId(), - MessageStatus.PROCESSED, user.getPhoneNumber(), response)); - deleteMessageUsing(receiptHandle); + logger.infof("Message %s processed as GET_BALANCE", message.messageId()); - } - private void sendProcessedMessage(Object processedMessage) throws JsonProcessingException { - String messageBody = objectMapper.writeValueAsString(processedMessage); - sqs.sendMessage(req -> req.messageBody(messageBody).messageGroupId("ProcessedMessages") - .messageDeduplicationId(UUID.randomUUID().toString()).queueUrl(processedMessagesUrl)); + return new SimpleMessageProcessed(AiOperations.GET_BALANCE.commandName(), message.messageId(), + MessageStatus.PROCESSED, user.getPhoneNumber(), response); } private IncomingMessage parseIncomingMessage(String messageBody) { @@ -139,10 +137,6 @@ private IncomingMessage parseIncomingMessage(String messageBody) { } } - private void deleteMessageUsing(String receiptHandle) { - sqs.deleteMessage(req -> req.queueUrl(incomingMessagesUrl).receiptHandle(receiptHandle)); - } - public record TransactionMessageProcessed(String kind, String messageId, MessageStatus status, String user, Boolean withError, RecognizedTransaction content) { } diff --git a/timeless-api/src/main/resources/application.properties b/timeless-api/src/main/resources/application.properties index f7cc93b..3beb6f7 100644 --- a/timeless-api/src/main/resources/application.properties +++ b/timeless-api/src/main/resources/application.properties @@ -5,6 +5,14 @@ security.sensible.secret=${SECURITY_KEY} whatsapp.incoming-message.queue-url=${INCOMING_MESSAGE_FIFO_URL} whatsapp.recognized-message.queue-url=${RECOGNIZED_MESSAGE_FIFO_URL} +# smallrye reactive messaging sqs +mp.messaging.incoming.whatsapp-incoming.connector=smallrye-sqs +mp.messaging.incoming.whatsapp-incoming.queue=${INCOMING_MESSAGE_FIFO_URL} +mp.messaging.incoming.whatsapp-incoming.visibility-timeout=30 + +mp.messaging.outgoing.whatsapp-recognized.connector=smallrye-sqs +mp.messaging.outgoing.whatsapp-recognized.queue=${RECOGNIZED_MESSAGE_FIFO_URL} + # aws sqs quarkus.sqs.devservices.enabled=false diff --git a/timeless-api/src/main/webui/package-lock.json b/timeless-api/src/main/webui/package-lock.json index 7020eed..cae08e8 100644 --- a/timeless-api/src/main/webui/package-lock.json +++ b/timeless-api/src/main/webui/package-lock.json @@ -345,6 +345,7 @@ "resolved": "https://registry.npmjs.org/@angular/animations/-/animations-21.0.6.tgz", "integrity": "sha512-dSxhkh/ZlljdglZ0rriSy7GdC1Y3rGaagkx6oAzF5XqAoBbFmiVFEBZPxssSeQ+O0izmAw3GwsUnz3E/1JYsbA==", "license": "MIT", + "peer": true, "dependencies": { "tslib": "^2.3.0" }, @@ -996,6 +997,7 @@ "resolved": "https://registry.npmjs.org/@angular/common/-/common-21.0.6.tgz", "integrity": "sha512-Yd8PF0dR37FAzqEcBHAyVCiSGMJOezSJe6rV/4BC6AVLfaZ7oZLl8CNVxKsod2UHd6rKxt1hzx05QdVcVvYNeA==", "license": "MIT", + "peer": true, "dependencies": { "tslib": "^2.3.0" }, @@ -1012,6 +1014,7 @@ "resolved": "https://registry.npmjs.org/@angular/compiler/-/compiler-21.0.6.tgz", "integrity": "sha512-rBMzG7WnQMouFfDST+daNSAOVYdtw560645PhlxyVeIeHMlCm0j1jjBgVPGTBNpVgKRdT/sqbi6W6JYkY9mERA==", "license": "MIT", + "peer": true, "dependencies": { "tslib": "^2.3.0" }, @@ -1025,6 +1028,7 @@ "integrity": "sha512-UcIUx+fbn0VLlCBCIYxntAzWG3zPRUo0K7wvuK0MC6ZFCWawgewx9SdLLZTqcaWe1g5FRQlQeVQcFgHAO5R2Mw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@babel/core": "7.28.4", "@jridgewell/sourcemap-codec": "^1.4.14", @@ -1057,6 +1061,7 @@ "resolved": "https://registry.npmjs.org/@angular/core/-/core-21.0.6.tgz", "integrity": "sha512-SvWbOkkrsqprYJSBmzQEWkWjfZB/jkRYyFp2ClMJBPqOLxP1a+i3Om2rolcNQjZPz87bs9FszwgRlXUy7sw5cQ==", "license": "MIT", + "peer": true, "dependencies": { "tslib": "^2.3.0" }, @@ -1082,6 +1087,7 @@ "resolved": "https://registry.npmjs.org/@angular/forms/-/forms-21.0.6.tgz", "integrity": "sha512-aAkAAKuUrP8U7R4aH/HbmG/CXP90GlML77ECBI5b4qCSb+bvaTEYsaf85mCyTpr9jvGkia2LTe42hPcOuyzdsQ==", "license": "MIT", + "peer": true, "dependencies": { "@standard-schema/spec": "^1.0.0", "tslib": "^2.3.0" @@ -1101,6 +1107,7 @@ "resolved": "https://registry.npmjs.org/@angular/platform-browser/-/platform-browser-21.0.6.tgz", "integrity": "sha512-tPk8rlUEBPXIUPRYq6Xu7QhJgKtnVr0dOHHuhyi70biKTupr5VikpZC5X9dy2Q3H3zYbK6MHC6384YMuwfU2kg==", "license": "MIT", + "peer": true, "dependencies": { "tslib": "^2.3.0" }, @@ -1141,6 +1148,7 @@ "resolved": "https://registry.npmjs.org/@angular/router/-/router-21.0.6.tgz", "integrity": "sha512-HOfomKq7jRSgxt/uUvpdbB8RNaYuGB/FJQ3BfQCFfGw1O9L3B72b7Hilk6AcjCruul6cfv/kmT4EB6Vqi3dQtA==", "license": "MIT", + "peer": true, "dependencies": { "tslib": "^2.3.0" }, @@ -1185,6 +1193,7 @@ "integrity": "sha512-2BCOP7TN8M+gVDj7/ht3hsaO/B/n5oDbiAyyvnRlNOs+u1o+JWNYTQrmpuNp1/Wq2gcFrI01JAW+paEKDMx/CA==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@babel/code-frame": "^7.27.1", "@babel/generator": "^7.28.3", @@ -2203,6 +2212,7 @@ "integrity": "sha512-X7/+dG9SLpSzRkwgG5/xiIzW0oMrV3C0HOa7YHG1WnrLK+vCQHfte4k/T80059YBdei29RBC3s+pSMvPJDU9/A==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@inquirer/checkbox": "^4.3.0", "@inquirer/confirm": "^5.1.19", @@ -4699,6 +4709,7 @@ "integrity": "sha512-3vMNr4TzNQyjHcRZadojpRaD9Ofr6LsonZAoQ+HMUa/9ORTPoxVIw0e0mpqWpdjj8xybyCM+oKOUH2vwFu/oEw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "undici-types": "~6.21.0" } @@ -5116,6 +5127,7 @@ } ], "license": "MIT", + "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -5593,7 +5605,6 @@ "dev": true, "license": "MIT", "optional": true, - "peer": true, "dependencies": { "is-what": "^3.14.1" }, @@ -6036,7 +6047,6 @@ "dev": true, "license": "MIT", "optional": true, - "peer": true, "dependencies": { "prr": "~1.0.1" }, @@ -7088,7 +7098,6 @@ "dev": true, "license": "MIT", "optional": true, - "peer": true, "bin": { "image-size": "bin/image-size.js" }, @@ -7293,8 +7302,7 @@ "integrity": "sha512-sNxgpk9793nzSs7bA6JQJGeIuRBQhAaNGG77kzYQgMkrID+lS6SlK07K5LaptscDlSaIgH+GPFzf+d75FVxozA==", "dev": true, "license": "MIT", - "optional": true, - "peer": true + "optional": true }, "node_modules/isbinaryfile": { "version": "4.0.10", @@ -7416,13 +7424,15 @@ "resolved": "https://registry.npmjs.org/jasmine-core/-/jasmine-core-5.13.0.tgz", "integrity": "sha512-vsYjfh7lyqvZX5QgqKc4YH8phs7g96Z8bsdIFNEU3VqXhlHaq+vov/Fgn/sr6MiUczdZkyXRC3TX369Ll4Nzbw==", "dev": true, - "license": "MIT" + "license": "MIT", + "peer": true }, "node_modules/jiti": { "version": "1.21.7", "resolved": "https://registry.npmjs.org/jiti/-/jiti-1.21.7.tgz", "integrity": "sha512-/imKNG4EbWNrVjoNC/1H5/9GFy+tqjGBHCaSsN+P2RnPqjsLmv6UD3Ej+Kj8nBWaRAwyk7kK5ZUc+OEatnTR3A==", "license": "MIT", + "peer": true, "bin": { "jiti": "bin/jiti.js" } @@ -7520,6 +7530,7 @@ "integrity": "sha512-LrtUxbdvt1gOpo3gxG+VAJlJAEMhbWlM4YrFQgql98FwF7+K8K12LYO4hnDdUkNjeztYrOXEMqgTajSWgmtI/w==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@colors/colors": "1.5.0", "body-parser": "^1.19.0", @@ -7848,7 +7859,6 @@ "dev": true, "license": "Apache-2.0", "optional": true, - "peer": true, "dependencies": { "copy-anything": "^2.0.1", "parse-node-version": "^1.0.1", @@ -7877,7 +7887,6 @@ "dev": true, "license": "MIT", "optional": true, - "peer": true, "dependencies": { "pify": "^4.0.1", "semver": "^5.6.0" @@ -7893,7 +7902,6 @@ "dev": true, "license": "MIT", "optional": true, - "peer": true, "bin": { "mime": "cli.js" }, @@ -7908,7 +7916,6 @@ "dev": true, "license": "ISC", "optional": true, - "peer": true, "bin": { "semver": "bin/semver" } @@ -7920,7 +7927,6 @@ "dev": true, "license": "BSD-3-Clause", "optional": true, - "peer": true, "engines": { "node": ">=0.10.0" } @@ -7949,6 +7955,7 @@ "integrity": "sha512-ME4Fb83LgEgwNw96RKNvKV4VTLuXfoKudAmm2lP8Kk87KaMK0/Xrx/aAkMWmT8mDb+3MlFDspfbCs7adjRxA2g==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "cli-truncate": "^5.0.0", "colorette": "^2.0.20", @@ -8602,7 +8609,6 @@ "dev": true, "license": "MIT", "optional": true, - "peer": true, "dependencies": { "iconv-lite": "^0.6.3", "sax": "^1.2.4" @@ -8621,7 +8627,6 @@ "dev": true, "license": "MIT", "optional": true, - "peer": true, "dependencies": { "safer-buffer": ">= 2.1.2 < 3.0.0" }, @@ -9094,7 +9099,6 @@ "dev": true, "license": "MIT", "optional": true, - "peer": true, "engines": { "node": ">= 0.10" } @@ -9258,7 +9262,6 @@ "dev": true, "license": "MIT", "optional": true, - "peer": true, "engines": { "node": ">=6" } @@ -9314,6 +9317,7 @@ } ], "license": "MIT", + "peer": true, "dependencies": { "nanoid": "^3.3.11", "picocolors": "^1.1.1", @@ -9538,8 +9542,7 @@ "integrity": "sha512-yPw4Sng1gWghHQWj0B3ZggWUm4qVbPwPFcRG8KyxiU7J2OHFSoEHKS+EZ3fv5l1t9CyCiop6l/ZYeWbrgoQejw==", "dev": true, "license": "MIT", - "optional": true, - "peer": true + "optional": true }, "node_modules/punycode": { "version": "1.4.1", @@ -9887,6 +9890,7 @@ "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.2.tgz", "integrity": "sha512-dhKf903U/PQZY6boNNtAGdWbG85WAbjT/1xYoZIC7FAY0yWapOBQVsVrDl58W86//e1VpMNBtRV4MaXfdMySFA==", "license": "Apache-2.0", + "peer": true, "dependencies": { "tslib": "^2.1.0" } @@ -9922,6 +9926,7 @@ "integrity": "sha512-t+YPtOQHpGW1QWsh1CHQ5cPIr9lbbGZLZnbihP/D/qZj/yuV68m8qarcV17nvkOX81BCrvzAlq2klCQFZghyTg==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "chokidar": "^4.0.0", "immutable": "^5.0.2", @@ -9943,8 +9948,7 @@ "integrity": "sha512-yqYn1JhPczigF94DMS+shiDMjDowYO6y9+wB/4WgO0Y19jWYk0lQ4tuG5KI7kj4FTp1wxPj5IFfcrz/s1c3jjQ==", "dev": true, "license": "BlueOak-1.0.0", - "optional": true, - "peer": true + "optional": true }, "node_modules/semver": { "version": "7.7.3", @@ -10725,6 +10729,7 @@ "resolved": "https://registry.npmjs.org/tailwindcss/-/tailwindcss-3.4.17.tgz", "integrity": "sha512-w33E2aCvSDP0tW9RZuNXadXlkHXqFzSkQew/aIa2i/Sj8fThxwovwlXHSPXTbAHwEIhBFXAedUhP2tueAKP8Og==", "license": "MIT", + "peer": true, "dependencies": { "@alloc/quick-lru": "^5.2.0", "arg": "^5.0.2", @@ -10936,7 +10941,8 @@ "version": "2.8.1", "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", - "license": "0BSD" + "license": "0BSD", + "peer": true }, "node_modules/tuf-js": { "version": "4.1.0", @@ -10973,6 +10979,7 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, "license": "Apache-2.0", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -11155,6 +11162,7 @@ "integrity": "sha512-BxAKBWmIbrDgrokdGZH1IgkIk/5mMHDreLDmCJ0qpyJaAteP8NvMhkwr/ZCQNqNH97bw/dANTE9PDzqwJghfMQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.5.0", @@ -11458,6 +11466,7 @@ "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.0.tgz", "integrity": "sha512-4lLa/EcQCB0cJkyts+FpIRx5G/llPxfP6VQU5KByHEhLxY3IJCH0f0Hy1MHI8sClTvsIb8qwRJ6R/ZdlDJ/leQ==", "license": "ISC", + "peer": true, "bin": { "yaml": "bin.mjs" }, @@ -11525,6 +11534,7 @@ "integrity": "sha512-AvvthqfqrAhNH9dnfmrfKzX5upOdjUVJYFqNSlkmGf64gRaTzlPwz99IHYnVs28qYAybvAlBV+H7pn0saFY4Ig==", "dev": true, "license": "MIT", + "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } @@ -11543,7 +11553,8 @@ "version": "0.15.0", "resolved": "https://registry.npmjs.org/zone.js/-/zone.js-0.15.0.tgz", "integrity": "sha512-9oxn0IIjbCZkJ67L+LkhYWRyAy7axphb3VgE2MBDlOqnmHMPWGYMxJxBYFueFq/JGY2GMwS0rU+UCLunEmy5UA==", - "license": "MIT" + "license": "MIT", + "peer": true } } }