Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ endif()
target_link_libraries(${PROJECT_NAME} jcon)

if(USE_QT)
find_package(Qt5 COMPONENTS Core Network WebSockets)
target_link_libraries(${PROJECT_NAME} Qt5::Core Qt5::Network Qt5::WebSockets)
find_package(Qt5 COMPONENTS Core Network WebSockets Concurrent)
target_link_libraries(${PROJECT_NAME} Qt5::Core Qt5::Network Qt5::WebSockets Qt5::Concurrent)
endif()

if(APPLE)
Expand Down
12 changes: 12 additions & 0 deletions src/example_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,15 @@ void ExampleService::namedParams(QString& msg, int answer)
qDebug().noquote() << " msg: " << msg;
qDebug().noquote() << " answer: " << answer;
}

jcon::JsonRpcFuture ExampleService::futureGetRandomInt(int limit)
{
qDebug().noquote() << QString("-> getRandomInt: '%1' (client IP: %2)")
.arg(limit)
.arg(jcon::JsonRpcServer::clientEndpoint()->peerAddress().toString());

return {[limit]{
QThread::sleep(5);
return qrand() % limit;
}};
}
2 changes: 2 additions & 0 deletions src/example_service.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <QObject>
#include <jcon/json_rpc_future.h>

class ExampleService : public QObject
{
Expand All @@ -13,4 +14,5 @@ class ExampleService : public QObject
Q_INVOKABLE QString printMessage(const QString& msg);
Q_INVOKABLE void printNotification(const QString& msg);
Q_INVOKABLE void namedParams(QString& msg, int answer);
Q_INVOKABLE jcon::JsonRpcFuture futureGetRandomInt(int limit);
};
42 changes: 42 additions & 0 deletions src/jcon/json_rpc_future.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "json_rpc_future.h"

namespace jcon {
JsonRpcFuture::JsonRpcFuture(std::function<QVariant()> f, QThreadPool *threadPool) :
threadPool(threadPool != nullptr ? threadPool : QThreadPool::globalInstance()),
f(std::move(f)),
watcher(new QFutureWatcher<QVariant>()),
scheduled(new std::once_flag) {
}

void JsonRpcFuture::start(QObject *receiver, jcon::JsonRpcEndpoint *endpoint, std::function<void(QVariant)> success,
std::function<void(void)> failure) {
std::call_once(*scheduled, &JsonRpcFuture::startPrivate, this, receiver, endpoint, success, failure);
}

void JsonRpcFuture::startPrivate(QObject *receiver, jcon::JsonRpcEndpoint *endpoint,
std::function<void(QVariant)> success,
std::function<void(void)> failure) {
QObject::connect(watcher, &QFutureWatcher<QVariant>::finished, receiver,
[endpoint, success = std::move(success), watcher = this->watcher] {
watcher->deleteLater();
if (endpoint != nullptr) {
QObject::disconnect(endpoint, &jcon::JsonRpcEndpoint::socketDisconnected, watcher,
nullptr);
}
success(watcher->result());
});

if (endpoint != nullptr) {
QObject::connect(endpoint, &jcon::JsonRpcEndpoint::socketDisconnected, watcher,
[failure = std::move(failure), watcher = this->watcher, receiver] {
watcher->deleteLater();
QObject::disconnect(watcher, &QFutureWatcher<QVariant>::finished, receiver,
nullptr);
failure();
});
}

watcher->setFuture(QtConcurrent::run(threadPool, f));
}

};
39 changes: 39 additions & 0 deletions src/jcon/json_rpc_future.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#pragma once

#include <functional>
#include <QMetaType>
#include <QThreadPool>
#include <QFutureWatcher>
#include <QtConcurrent/QtConcurrentRun>
#include "json_rpc_endpoint.h"

namespace jcon {

class JsonRpcFuture {
public:
JsonRpcFuture() = default;

~JsonRpcFuture() = default;

JsonRpcFuture(const JsonRpcFuture &) = default;

JsonRpcFuture &operator=(const JsonRpcFuture &) = default;

JsonRpcFuture(std::function<QVariant()> f, QThreadPool *threadPool = QThreadPool::globalInstance());

void start(QObject *receiver, jcon::JsonRpcEndpoint *endpoint, std::function<void(QVariant)> success,
std::function<void(void)> failure = [] {});

private:
void startPrivate(QObject *receiver, jcon::JsonRpcEndpoint *endpoint, std::function<void(QVariant)> success,
std::function<void(void)> failure = [] {});

QThreadPool *threadPool;
std::function<QVariant()> f;
QFutureWatcher<QVariant> *watcher;
std::shared_ptr<std::once_flag> scheduled;
};

};

Q_DECLARE_METATYPE(jcon::JsonRpcFuture);
34 changes: 26 additions & 8 deletions src/jcon/json_rpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "json_rpc_error.h"
#include "json_rpc_file_logger.h"
#include "string_util.h"
#include "json_rpc_future.h"

#include <QJsonArray>
#include <QJsonDocument>
Expand All @@ -27,6 +28,7 @@ JsonRpcServer::JsonRpcServer(QObject* parent,
, m_logger(logger)
, m_allowNotification(false)
{
qRegisterMetaType<jcon::JsonRpcFuture>();
if (!m_logger) {
m_logger = std::make_shared<JsonRpcFileLogger>("json_server_log.txt");
}
Expand Down Expand Up @@ -146,16 +148,32 @@ void JsonRpcServer::jsonRequestReceived(const QJsonObject& request,

// send response if request had valid ID
if (request_id != InvalidRequestId) {
QJsonDocument response = createResponse(request_id,
return_value,
method_name);
if (return_value.canConvert<jcon::JsonRpcFuture>()) {
auto fut = return_value.value<jcon::JsonRpcFuture>();
fut.start(this, sm_client_endpoint, [this, request_id, method_name, endpoint = sm_client_endpoint](QVariant res) {
QJsonDocument response = createResponse(request_id,
res,
method_name);

if (!endpoint) {
logError("invalid client socket, cannot send response");
return;
}

if (!sm_client_endpoint) {
logError("invalid client socket, cannot send response");
return;
}
endpoint->send(response);
});
} else {
QJsonDocument response = createResponse(request_id,
return_value,
method_name);

sm_client_endpoint->send(response);
if (!sm_client_endpoint) {
logError("invalid client socket, cannot send response");
return;
}

sm_client_endpoint->send(response);
}
}
}

Expand Down
19 changes: 19 additions & 0 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,24 @@ void invokeMethodAsync(jcon::JsonRpcClient* rpc_client)
});
}

void invokeFutureMethodAsync(jcon::JsonRpcClient* rpc_client)
{
qsrand(std::time(nullptr));

auto req = rpc_client->callAsync("futureGetRandomInt", 10);

req->connect(req.get(), &jcon::JsonRpcRequest::result,
[](const QVariant& result) {
qDebug() << "result of future asynchronous RPC call:" << result;
});

req->connect(req.get(), &jcon::JsonRpcRequest::error,
[](int code, const QString& message) {
qDebug() << "RPC error:" << message
<< " (" << code << ")";
});
}

void invokeMethodSync(jcon::JsonRpcClient* rpc_client)
{
qsrand(std::time(nullptr));
Expand Down Expand Up @@ -227,6 +245,7 @@ void runServerAndClient(int argc, char* argv[])

invokeNotification(rpc_client);
invokeMethodAsync(rpc_client);
invokeFutureMethodAsync(rpc_client);
invokeMethodSync(rpc_client);
invokeStringMethodSync(rpc_client);
invokeStringMethodAsync(rpc_client);
Expand Down