From 062e1dbc92e665c24ede865aa357b990ce387033 Mon Sep 17 00:00:00 2001 From: wsxarcher Date: Sat, 31 Jul 2021 19:58:56 +0300 Subject: [PATCH] Futures WIP --- src/CMakeLists.txt | 4 ++-- src/example_service.cpp | 12 +++++++++++ src/example_service.h | 2 ++ src/jcon/json_rpc_future.cpp | 42 ++++++++++++++++++++++++++++++++++++ src/jcon/json_rpc_future.h | 39 +++++++++++++++++++++++++++++++++ src/jcon/json_rpc_server.cpp | 34 ++++++++++++++++++++++------- src/main.cpp | 19 ++++++++++++++++ 7 files changed, 142 insertions(+), 10 deletions(-) create mode 100644 src/jcon/json_rpc_future.cpp create mode 100644 src/jcon/json_rpc_future.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4057ed9..a81a9ae 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -38,8 +38,8 @@ endif() target_link_libraries(${PROJECT_NAME} jcon) if(USE_QT) - find_package(Qt5 COMPONENTS Core Network WebSockets) - target_link_libraries(${PROJECT_NAME} Qt5::Core Qt5::Network Qt5::WebSockets) + find_package(Qt5 COMPONENTS Core Network WebSockets Concurrent) + target_link_libraries(${PROJECT_NAME} Qt5::Core Qt5::Network Qt5::WebSockets Qt5::Concurrent) endif() if(APPLE) diff --git a/src/example_service.cpp b/src/example_service.cpp index 0f8aacb..a3ebcf1 100644 --- a/src/example_service.cpp +++ b/src/example_service.cpp @@ -33,3 +33,15 @@ void ExampleService::namedParams(QString& msg, int answer) qDebug().noquote() << " msg: " << msg; qDebug().noquote() << " answer: " << answer; } + +jcon::JsonRpcFuture ExampleService::futureGetRandomInt(int limit) +{ + qDebug().noquote() << QString("-> getRandomInt: '%1' (client IP: %2)") + .arg(limit) + .arg(jcon::JsonRpcServer::clientEndpoint()->peerAddress().toString()); + + return {[limit]{ + QThread::sleep(5); + return qrand() % limit; + }}; +} diff --git a/src/example_service.h b/src/example_service.h index 1899441..28d0842 100644 --- a/src/example_service.h +++ b/src/example_service.h @@ -1,5 +1,6 @@ #pragma once #include +#include class ExampleService : public QObject { @@ -13,4 +14,5 @@ class ExampleService : public QObject Q_INVOKABLE QString printMessage(const QString& msg); Q_INVOKABLE void printNotification(const QString& msg); Q_INVOKABLE void namedParams(QString& msg, int answer); + Q_INVOKABLE jcon::JsonRpcFuture futureGetRandomInt(int limit); }; diff --git a/src/jcon/json_rpc_future.cpp b/src/jcon/json_rpc_future.cpp new file mode 100644 index 0000000..e20b734 --- /dev/null +++ b/src/jcon/json_rpc_future.cpp @@ -0,0 +1,42 @@ +#include "json_rpc_future.h" + +namespace jcon { + JsonRpcFuture::JsonRpcFuture(std::function f, QThreadPool *threadPool) : + threadPool(threadPool != nullptr ? threadPool : QThreadPool::globalInstance()), + f(std::move(f)), + watcher(new QFutureWatcher()), + scheduled(new std::once_flag) { + } + + void JsonRpcFuture::start(QObject *receiver, jcon::JsonRpcEndpoint *endpoint, std::function success, + std::function failure) { + std::call_once(*scheduled, &JsonRpcFuture::startPrivate, this, receiver, endpoint, success, failure); + } + + void JsonRpcFuture::startPrivate(QObject *receiver, jcon::JsonRpcEndpoint *endpoint, + std::function success, + std::function failure) { + QObject::connect(watcher, &QFutureWatcher::finished, receiver, + [endpoint, success = std::move(success), watcher = this->watcher] { + watcher->deleteLater(); + if (endpoint != nullptr) { + QObject::disconnect(endpoint, &jcon::JsonRpcEndpoint::socketDisconnected, watcher, + nullptr); + } + success(watcher->result()); + }); + + if (endpoint != nullptr) { + QObject::connect(endpoint, &jcon::JsonRpcEndpoint::socketDisconnected, watcher, + [failure = std::move(failure), watcher = this->watcher, receiver] { + watcher->deleteLater(); + QObject::disconnect(watcher, &QFutureWatcher::finished, receiver, + nullptr); + failure(); + }); + } + + watcher->setFuture(QtConcurrent::run(threadPool, f)); + } + +}; \ No newline at end of file diff --git a/src/jcon/json_rpc_future.h b/src/jcon/json_rpc_future.h new file mode 100644 index 0000000..786ecc1 --- /dev/null +++ b/src/jcon/json_rpc_future.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include +#include +#include +#include "json_rpc_endpoint.h" + +namespace jcon { + + class JsonRpcFuture { + public: + JsonRpcFuture() = default; + + ~JsonRpcFuture() = default; + + JsonRpcFuture(const JsonRpcFuture &) = default; + + JsonRpcFuture &operator=(const JsonRpcFuture &) = default; + + JsonRpcFuture(std::function f, QThreadPool *threadPool = QThreadPool::globalInstance()); + + void start(QObject *receiver, jcon::JsonRpcEndpoint *endpoint, std::function success, + std::function failure = [] {}); + + private: + void startPrivate(QObject *receiver, jcon::JsonRpcEndpoint *endpoint, std::function success, + std::function failure = [] {}); + + QThreadPool *threadPool; + std::function f; + QFutureWatcher *watcher; + std::shared_ptr scheduled; + }; + +}; + +Q_DECLARE_METATYPE(jcon::JsonRpcFuture); \ No newline at end of file diff --git a/src/jcon/json_rpc_server.cpp b/src/jcon/json_rpc_server.cpp index 507b256..fcc9770 100644 --- a/src/jcon/json_rpc_server.cpp +++ b/src/jcon/json_rpc_server.cpp @@ -4,6 +4,7 @@ #include "json_rpc_error.h" #include "json_rpc_file_logger.h" #include "string_util.h" +#include "json_rpc_future.h" #include #include @@ -27,6 +28,7 @@ JsonRpcServer::JsonRpcServer(QObject* parent, , m_logger(logger) , m_allowNotification(false) { + qRegisterMetaType(); if (!m_logger) { m_logger = std::make_shared("json_server_log.txt"); } @@ -146,16 +148,32 @@ void JsonRpcServer::jsonRequestReceived(const QJsonObject& request, // send response if request had valid ID if (request_id != InvalidRequestId) { - QJsonDocument response = createResponse(request_id, - return_value, - method_name); + if (return_value.canConvert()) { + auto fut = return_value.value(); + fut.start(this, sm_client_endpoint, [this, request_id, method_name, endpoint = sm_client_endpoint](QVariant res) { + QJsonDocument response = createResponse(request_id, + res, + method_name); + + if (!endpoint) { + logError("invalid client socket, cannot send response"); + return; + } - if (!sm_client_endpoint) { - logError("invalid client socket, cannot send response"); - return; - } + endpoint->send(response); + }); + } else { + QJsonDocument response = createResponse(request_id, + return_value, + method_name); - sm_client_endpoint->send(response); + if (!sm_client_endpoint) { + logError("invalid client socket, cannot send response"); + return; + } + + sm_client_endpoint->send(response); + } } } diff --git a/src/main.cpp b/src/main.cpp index 0eb8df4..af6c7cc 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -110,6 +110,24 @@ void invokeMethodAsync(jcon::JsonRpcClient* rpc_client) }); } +void invokeFutureMethodAsync(jcon::JsonRpcClient* rpc_client) +{ + qsrand(std::time(nullptr)); + + auto req = rpc_client->callAsync("futureGetRandomInt", 10); + + req->connect(req.get(), &jcon::JsonRpcRequest::result, + [](const QVariant& result) { + qDebug() << "result of future asynchronous RPC call:" << result; + }); + + req->connect(req.get(), &jcon::JsonRpcRequest::error, + [](int code, const QString& message) { + qDebug() << "RPC error:" << message + << " (" << code << ")"; + }); +} + void invokeMethodSync(jcon::JsonRpcClient* rpc_client) { qsrand(std::time(nullptr)); @@ -227,6 +245,7 @@ void runServerAndClient(int argc, char* argv[]) invokeNotification(rpc_client); invokeMethodAsync(rpc_client); + invokeFutureMethodAsync(rpc_client); invokeMethodSync(rpc_client); invokeStringMethodSync(rpc_client); invokeStringMethodAsync(rpc_client);