Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/tck_edge_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ auto TCKEdgeNode::create_edge_node(const std::string& host_id,
const std::string& edge_node_id,
const std::vector<std::string>& device_ids)
-> stdx::expected<void, std::string> {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (edge_node_) {
return stdx::unexpected("Edge Node already exists");
Expand Down
2 changes: 1 addition & 1 deletion examples/tck_host_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ void TCKHostApplication::handle_prompt_specific(const std::string& message) {

auto TCKHostApplication::establish_session(const std::string& host_id)
-> stdx::expected<void, std::string> {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (host_application_) {
return {};
Expand Down
4 changes: 2 additions & 2 deletions examples/tck_test_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ TCKTestRunner::~TCKTestRunner() {
}

auto TCKTestRunner::start() -> stdx::expected<void, std::string> {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (running_) {
return stdx::unexpected("TCK test runner is already running");
Expand Down Expand Up @@ -91,7 +91,7 @@ auto TCKTestRunner::start() -> stdx::expected<void, std::string> {
}

void TCKTestRunner::stop() {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (!running_) {
return;
Expand Down
6 changes: 3 additions & 3 deletions include/sparkplug/edge_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ class EdgeNode {
* @note Useful for monitoring and debugging.
*/
[[nodiscard]] uint64_t get_seq() const {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);
return seq_num_;
}

Expand All @@ -296,7 +296,7 @@ class EdgeNode {
* @note Used by SCADA to detect new sessions/rebirths.
*/
[[nodiscard]] uint64_t get_bd_seq() const {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);
return bd_seq_num_;
}

Expand All @@ -310,7 +310,7 @@ class EdgeNode {
* @note Used to determine if NBIRTH/DBIRTH can be published.
*/
[[nodiscard]] bool is_primary_host_online() const {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);
return primary_host_online_;
}

Expand Down
50 changes: 24 additions & 26 deletions src/edge_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void EdgeNode::on_connection_lost(void* context, char* cause) {
}

{
std::lock_guard<std::mutex> lock(edge_node->mutex_);
std::scoped_lock lock(edge_node->mutex_);
edge_node->is_connected_ = false;
}

Expand Down Expand Up @@ -101,7 +101,7 @@ int EdgeNode::on_message_arrived(void* context,
std::string payload_str(static_cast<const char*>(message->payload),
message->payloadlen);

std::lock_guard<std::mutex> lock(edge_node->mutex_);
std::scoped_lock lock(edge_node->mutex_);
if (payload_str.find("\"online\":true") != std::string::npos) {
edge_node->primary_host_online_ = true;
} else if (payload_str.find("\"online\":false") != std::string::npos) {
Expand Down Expand Up @@ -152,16 +152,14 @@ EdgeNode::EdgeNode(EdgeNode&& other) noexcept
device_states_(std::move(other.device_states_)), is_connected_(other.is_connected_)
// mutex_ is default-constructed (mutexes are not moveable)
{
std::lock_guard<std::mutex> lock(other.mutex_);
std::scoped_lock lock(other.mutex_);
other.is_connected_ = false;
}

EdgeNode& EdgeNode::operator=(EdgeNode&& other) noexcept {
if (this != &other) {
// Lock both mutexes in consistent order to avoid deadlock
std::lock(mutex_, other.mutex_);
std::lock_guard<std::mutex> lock1(mutex_, std::adopt_lock);
std::lock_guard<std::mutex> lock2(other.mutex_, std::adopt_lock);
// Lock both mutexes with automatic deadlock avoidance
std::scoped_lock lock(mutex_, other.mutex_);

config_ = std::move(other.config_);
client_ = std::move(other.client_);
Expand All @@ -178,23 +176,23 @@ EdgeNode& EdgeNode::operator=(EdgeNode&& other) noexcept {

void EdgeNode::set_credentials(std::optional<std::string> username,
std::optional<std::string> password) {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);
config_.username = std::move(username);
config_.password = std::move(password);
}

void EdgeNode::set_tls(std::optional<TlsOptions> tls) {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);
config_.tls = std::move(tls);
}

void EdgeNode::set_log_callback(std::optional<LogCallback> callback) {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);
config_.log_callback = std::move(callback);
}

stdx::expected<void, std::string> EdgeNode::connect() {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

MQTTAsync raw_client = nullptr;
int rc =
Expand Down Expand Up @@ -361,7 +359,7 @@ stdx::expected<void, std::string> EdgeNode::connect() {
}

stdx::expected<void, std::string> EdgeNode::disconnect() {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (!client_) {
return stdx::unexpected("Not connected");
Expand Down Expand Up @@ -430,7 +428,7 @@ stdx::expected<void, std::string> EdgeNode::publish_birth(PayloadBuilder& payloa
int qos = 0;

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (!is_connected_) {
return stdx::unexpected("Not connected");
Expand Down Expand Up @@ -479,7 +477,7 @@ stdx::expected<void, std::string> EdgeNode::publish_birth(PayloadBuilder& payloa
}

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);
last_birth_payload_ = std::move(payload_data);
seq_num_ = 0;
}
Expand All @@ -494,7 +492,7 @@ stdx::expected<void, std::string> EdgeNode::publish_data(PayloadBuilder& payload
int qos = 0;

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (!is_connected_) {
return stdx::unexpected("Not connected");
Expand Down Expand Up @@ -527,7 +525,7 @@ stdx::expected<void, std::string> EdgeNode::publish_death() {
int qos = 0;

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (!is_connected_) {
return stdx::unexpected("Not connected");
Expand Down Expand Up @@ -567,7 +565,7 @@ stdx::expected<void, std::string> EdgeNode::rebirth() {
int qos = 0;

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (!is_connected_) {
return stdx::unexpected("Not connected");
Expand Down Expand Up @@ -619,7 +617,7 @@ stdx::expected<void, std::string> EdgeNode::rebirth() {
.and_then([this, &topic_str, &payload_data, qos]() {
MQTTAsync client = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);
client = client_.get();
}
return publish_message(client, topic_str, payload_data, qos, false);
Expand All @@ -630,7 +628,7 @@ stdx::expected<void, std::string> EdgeNode::rebirth() {
}

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);
seq_num_ = 0;
}

Expand All @@ -645,7 +643,7 @@ EdgeNode::publish_device_birth(std::string_view device_id, PayloadBuilder& paylo
int qos = 0;

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (!is_connected_) {
return stdx::unexpected("Not connected");
Expand Down Expand Up @@ -713,7 +711,7 @@ EdgeNode::publish_device_birth(std::string_view device_id, PayloadBuilder& paylo
}

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);
auto& device_state = device_states_[std::string(device_id)];
device_state.last_birth_payload = std::move(payload_data);
device_state.is_online = true;
Expand All @@ -730,7 +728,7 @@ EdgeNode::publish_device_data(std::string_view device_id, PayloadBuilder& payloa
int qos = 0;

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (!is_connected_) {
return stdx::unexpected("Not connected");
Expand Down Expand Up @@ -770,7 +768,7 @@ EdgeNode::publish_device_death(std::string_view device_id) {
int qos = 0;

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (!is_connected_) {
return stdx::unexpected("Not connected");
Expand Down Expand Up @@ -806,7 +804,7 @@ EdgeNode::publish_device_death(std::string_view device_id) {
}

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);
auto it = device_states_.find(device_id);
if (it != device_states_.end()) {
it->second.is_online = false;
Expand All @@ -825,7 +823,7 @@ EdgeNode::publish_node_command(std::string_view target_edge_node_id,
int qos = 0;

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (!is_connected_) {
return stdx::unexpected("Not connected");
Expand Down Expand Up @@ -855,7 +853,7 @@ EdgeNode::publish_device_command(std::string_view target_edge_node_id,
int qos = 0;

{
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);

if (!is_connected_) {
return stdx::unexpected("Not connected");
Expand Down
Loading
Loading