Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion occ/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ find_package(Boost 1.74 REQUIRED COMPONENTS ${BOOST_COMPONENTS})

# Protobuf
set(protobuf_MODULE_COMPATIBLE TRUE)
find_package(protobuf 3.14.0 CONFIG REQUIRED)
find_package(protobuf CONFIG REQUIRED)
message(STATUS "Using protobuf ${protobuf_VERSION}")

# OpenSSL on Mac
Expand Down
2 changes: 1 addition & 1 deletion occ/cmake/OccConfig.cmake.in
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ include(CMakeFindDependencyMacro)
list(APPEND CMAKE_MODULE_PATH ${Occ_CMAKE_DIR})
find_dependency(Boost 1.68 REQUIRED COMPONENTS program_options)
set(protobuf_MODULE_COMPATIBLE TRUE)
find_dependency(protobuf 3.7.1 CONFIG REQUIRED)
find_dependency(protobuf CONFIG REQUIRED)
find_dependency(gRPC 1.19.1 CONFIG REQUIRED)

list(REMOVE_AT CMAKE_MODULE_PATH -1)
Expand Down
40 changes: 20 additions & 20 deletions occ/occlib/OccServer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,22 @@ grpc::Status OccServer::EventStream(grpc::ServerContext* context,
boost::uuids::basic_random_generator<boost::mt19937> gen;
std::string id = boost::uuids::to_string(gen());

boost::lockfree::queue<pb::DeviceEvent*> eventQueue;
boost::lockfree::queue<occ_pb::DeviceEvent*> eventQueue;
m_eventQueues[id] = &eventQueue;
DEFER({
m_eventQueues.erase(id);
});

bool isStreamOpen = true;
while (!m_destroying && isStreamOpen && m_rco->getState() != t_State::done) {
pb::DeviceEvent *newEvent;
occ_pb::DeviceEvent *newEvent;
bool ok = eventQueue.pop(newEvent);
if (!ok) { // queue empty, sleep and retry
std::this_thread::sleep_for(2ms);
continue;
}

pb::EventStreamReply response;
occ_pb::EventStreamReply response;
if (newEvent) {
response.mutable_event()->CopyFrom(*newEvent);
isStreamOpen = writer->Write(response);
Expand All @@ -94,8 +94,8 @@ grpc::Status OccServer::EventStream(grpc::ServerContext* context,
}

grpc::Status OccServer::StateStream(grpc::ServerContext* context,
const pb::StateStreamRequest* request,
grpc::ServerWriter<pb::StateStreamReply>* writer)
const occ_pb::StateStreamRequest* request,
grpc::ServerWriter<occ_pb::StateStreamReply>* writer)
{
(void) context;
(void) request;
Expand All @@ -118,8 +118,8 @@ grpc::Status OccServer::StateStream(grpc::ServerContext* context,
continue;
}

pb::StateStreamReply response;
response.set_type(pb::STATE_STABLE);
occ_pb::StateStreamReply response;
response.set_type(occ_pb::STATE_STABLE);
response.set_state(getStringFromState(newState));

if (newState != t_State::done) {
Expand All @@ -133,8 +133,8 @@ grpc::Status OccServer::StateStream(grpc::ServerContext* context,
}

grpc::Status OccServer::GetState(grpc::ServerContext* context,
const pb::GetStateRequest* request,
pb::GetStateReply* response)
const occ_pb::GetStateRequest* request,
occ_pb::GetStateReply* response)
{
std::lock_guard<std::mutex> lock(m_mu);

Expand All @@ -159,8 +159,8 @@ grpc::Status OccServer::GetState(grpc::ServerContext* context,
* @return the status, either grpc::Status::OK or an error status
*/
grpc::Status OccServer::Transition(grpc::ServerContext* context,
const pb::TransitionRequest* request,
pb::TransitionReply* response)
const occ_pb::TransitionRequest* request,
occ_pb::TransitionReply* response)
{
std::lock_guard<std::mutex> lock(m_mu);

Expand Down Expand Up @@ -213,11 +213,11 @@ grpc::Status OccServer::Transition(grpc::ServerContext* context,
response->set_transitionevent(request->transitionevent());
response->set_ok(newStateStr == finalState);
if (newState == error) { // ERROR state
response->set_trigger(pb::DEVICE_ERROR);
response->set_trigger(occ_pb::DEVICE_ERROR);
} else if (newStateStr == finalState) { // correct destination state
response->set_trigger(pb::EXECUTOR);
response->set_trigger(occ_pb::EXECUTOR);
} else { // some other state, for whatever reason - we assume DEVICE_INTENTIONAL
response->set_trigger(pb::DEVICE_INTENTIONAL);
response->set_trigger(occ_pb::DEVICE_INTENTIONAL);
}

std::cout << "[OCC] new state: " << newStateStr << std::endl;
Expand Down Expand Up @@ -394,14 +394,14 @@ void OccServer::publishState(t_State s)
}
}

void OccServer::pushEvent(pb::DeviceEvent* event)
void OccServer::pushEvent(occ_pb::DeviceEvent* event)
{
for (auto item : m_eventQueues) {
item.second->push(event);
}
printf("[OCC] Object: %s - pushing event = %s\n",
m_rco->getName().c_str(),
pb::DeviceEventType_Name(event->type()).c_str());
occ_pb::DeviceEventType_Name(event->type()).c_str());
}

bool OccServer::checkMachineDone()
Expand All @@ -428,8 +428,8 @@ void OccServer::runChecker()
int err = m_rco->iterateRunning();
if (err == 1) { // signal EndOfData event
endOfData = true;
auto eodEvent = new pb::DeviceEvent;
eodEvent->set_type(pb::END_OF_STREAM);
auto eodEvent = new occ_pb::DeviceEvent;
eodEvent->set_type(occ_pb::END_OF_STREAM);
pushEvent(eodEvent);
}
else if (err) {
Expand All @@ -446,8 +446,8 @@ void OccServer::runChecker()

// the above publishes a state change event to the StateStream, but we also push an exception event on the
// EventStream because the transition was initiated by the task
auto taskErrorEvent = new pb::DeviceEvent;
taskErrorEvent->set_type(pb::TASK_INTERNAL_ERROR);
auto taskErrorEvent = new occ_pb::DeviceEvent;
taskErrorEvent->set_type(occ_pb::TASK_INTERNAL_ERROR);
pushEvent(taskErrorEvent);
}
}
Expand Down
24 changes: 11 additions & 13 deletions occ/occlib/OccServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
#include <mutex>
#include <thread>

namespace pb = occ_pb;


namespace boost {
namespace property_tree
Expand All @@ -61,7 +59,7 @@ const std::unordered_map<std::string, std::string> EXPECTED_FINAL_STATE = {
{"RECOVER", "STANDBY"},
};

class OccServer final : public pb::Occ::Service
class OccServer final : public occ_pb::Occ::Service
{
public:
/**
Expand All @@ -82,20 +80,20 @@ class OccServer final : public pb::Occ::Service
virtual ~OccServer();

grpc::Status EventStream(grpc::ServerContext* context,
const pb::EventStreamRequest* request,
grpc::ServerWriter<pb::EventStreamReply>* writer) override;
const occ_pb::EventStreamRequest* request,
grpc::ServerWriter<occ_pb::EventStreamReply>* writer) override;

grpc::Status StateStream(grpc::ServerContext* context,
const pb::StateStreamRequest* request,
grpc::ServerWriter<pb::StateStreamReply>* writer) override;
const occ_pb::StateStreamRequest* request,
grpc::ServerWriter<occ_pb::StateStreamReply>* writer) override;

grpc::Status GetState(grpc::ServerContext* context,
const pb::GetStateRequest* request,
pb::GetStateReply* response) override;
const occ_pb::GetStateRequest* request,
occ_pb::GetStateReply* response) override;

grpc::Status Transition(grpc::ServerContext* context,
const pb::TransitionRequest* request,
pb::TransitionReply* response) override;
const occ_pb::TransitionRequest* request,
occ_pb::TransitionReply* response) override;

bool checkMachineDone();

Expand All @@ -104,7 +102,7 @@ class OccServer final : public pb::Occ::Service
void updateState(t_State s);

void publishState(t_State s);
void pushEvent(pb::DeviceEvent* event);
void pushEvent(occ_pb::DeviceEvent* event);

void runChecker();

Expand All @@ -116,7 +114,7 @@ class OccServer final : public pb::Occ::Service
bool m_machineDone;

std::unordered_map<std::string, boost::lockfree::queue<t_State>* > m_stateQueues;
std::unordered_map<std::string, boost::lockfree::queue<pb::DeviceEvent*>* > m_eventQueues;
std::unordered_map<std::string, boost::lockfree::queue<occ_pb::DeviceEvent*>* > m_eventQueues;
};


Expand Down
26 changes: 13 additions & 13 deletions occ/plugin/OccPluginServer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ OccPluginServer::EventStream(grpc::ServerContext* context,
if (state == "EXITING") {
std::unique_lock<std::mutex> finished_lk(finished_mu);

auto nilEvent = new pb::DeviceEvent();
nilEvent->set_type(pb::NULL_DEVICE_EVENT);
pb::EventStreamReply response;
auto nilEvent = new occ_pb::DeviceEvent();
nilEvent->set_type(occ_pb::NULL_DEVICE_EVENT);
occ_pb::EventStreamReply response;
response.mutable_event()->CopyFrom(*nilEvent);

writer->WriteLast(response, grpc::WriteOptions());
Expand Down Expand Up @@ -100,8 +100,8 @@ OccPluginServer::EventStream(grpc::ServerContext* context,

grpc::Status
OccPluginServer::StateStream(grpc::ServerContext* context,
const pb::StateStreamRequest* request,
grpc::ServerWriter<pb::StateStreamReply>* writer)
const occ_pb::StateStreamRequest* request,
grpc::ServerWriter<occ_pb::StateStreamReply>* writer)
{

(void) context;
Expand All @@ -116,14 +116,14 @@ OccPluginServer::StateStream(grpc::ServerContext* context,
std::lock_guard<std::mutex> lock(writer_mu);
auto state = fair::mq::PluginServices::ToStr(reachedState);
last_known_state = state;
pb::StateType sType = isIntermediateFMQState(state) ? pb::STATE_INTERMEDIATE : pb::STATE_STABLE;
occ_pb::StateType sType = isIntermediateFMQState(state) ? occ_pb::STATE_INTERMEDIATE : occ_pb::STATE_STABLE;

pb::StateStreamReply response;
occ_pb::StateStreamReply response;
response.set_type(sType);
response.set_state(state);

OLOG(debug) << "[StateStream] new state: " << state << "; type: "
<< pb::StateType_Name(sType);
<< occ_pb::StateType_Name(sType);

if (state != "EXITING") {
writer->Write(response);
Expand Down Expand Up @@ -151,8 +151,8 @@ OccPluginServer::StateStream(grpc::ServerContext* context,
}

grpc::Status OccPluginServer::GetState(grpc::ServerContext* context,
const pb::GetStateRequest* request,
pb::GetStateReply* response)
const occ_pb::GetStateRequest* request,
occ_pb::GetStateReply* response)
{
std::lock_guard<std::mutex> lock(m_mu);

Expand All @@ -178,8 +178,8 @@ grpc::Status OccPluginServer::GetState(grpc::ServerContext* context,
*/
grpc::Status
OccPluginServer::Transition(grpc::ServerContext* context,
const pb::TransitionRequest* request,
pb::TransitionReply* response)
const occ_pb::TransitionRequest* request,
occ_pb::TransitionReply* response)
{
// Valid FairMQ state machine transitions, mapped to DeviceStateTransition objects:
// {DeviceStateTransition::Auto, "Auto"}, // ever needed?
Expand Down Expand Up @@ -238,7 +238,7 @@ OccPluginServer::Transition(grpc::ServerContext* context,

auto nopbResponse = std::get<0>(transitionOutcome);
response->set_state(nopbResponse.state);
response->set_trigger(static_cast<pb::StateChangeTrigger>(nopbResponse.trigger));
response->set_trigger(static_cast<occ_pb::StateChangeTrigger>(nopbResponse.trigger));
response->set_transitionevent(nopbResponse.transitionEvent);
response->set_ok(nopbResponse.ok);

Expand Down
20 changes: 9 additions & 11 deletions occ/plugin/OccPluginServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@

#include <mutex>

namespace pb = occ_pb;

namespace fair
{
namespace mq
Expand All @@ -42,7 +40,7 @@ class PluginServices;
}
}

class OccPluginServer final : public pb::Occ::Service
class OccPluginServer final : public occ_pb::Occ::Service
{
public:
explicit OccPluginServer(fair::mq::PluginServices*);
Expand All @@ -51,20 +49,20 @@ class OccPluginServer final : public pb::Occ::Service
{}

grpc::Status EventStream(grpc::ServerContext* context,
const pb::EventStreamRequest* request,
grpc::ServerWriter<pb::EventStreamReply>* writer) override;
const occ_pb::EventStreamRequest* request,
grpc::ServerWriter<occ_pb::EventStreamReply>* writer) override;

grpc::Status StateStream(grpc::ServerContext* context,
const pb::StateStreamRequest* request,
grpc::ServerWriter<pb::StateStreamReply>* writer) override;
const occ_pb::StateStreamRequest* request,
grpc::ServerWriter<occ_pb::StateStreamReply>* writer) override;

grpc::Status GetState(grpc::ServerContext* context,
const pb::GetStateRequest* request,
pb::GetStateReply* response) override;
const occ_pb::GetStateRequest* request,
occ_pb::GetStateReply* response) override;

grpc::Status Transition(grpc::ServerContext* context,
const pb::TransitionRequest* request,
pb::TransitionReply* response) override;
const occ_pb::TransitionRequest* request,
occ_pb::TransitionReply* response) override;

private:
fair::mq::PluginServices* m_pluginServices;
Expand Down
4 changes: 2 additions & 2 deletions occ/plugin/litestructs/EventStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class SerializationTraits<OccLite::nopb::EventStreamRequest, void>
{
*buffer = *source.JsonMessage::SerializeToByteBuffer();
*own_buffer = true;
return g_core_codegen_interface->ok();
return Status::OK;
}
};

Expand All @@ -111,7 +111,7 @@ class SerializationTraits<OccLite::nopb::EventStreamResponse, void>
{
*buffer = *source.JsonMessage::SerializeToByteBuffer();
*own_buffer = true;
return g_core_codegen_interface->ok();
return Status::OK;
}
};

Expand Down
4 changes: 2 additions & 2 deletions occ/plugin/litestructs/GetState.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class SerializationTraits<OccLite::nopb::GetStateRequest, void>
{
*buffer = *source.JsonMessage::SerializeToByteBuffer();
*own_buffer = true;
return g_core_codegen_interface->ok();
return Status::OK;
}
};

Expand All @@ -94,7 +94,7 @@ class SerializationTraits<OccLite::nopb::GetStateResponse, void>
{
*buffer = *source.JsonMessage::SerializeToByteBuffer();
*own_buffer = true;
return g_core_codegen_interface->ok();
return Status::OK;
}
};

Expand Down
3 changes: 2 additions & 1 deletion occ/plugin/litestructs/JsonMessage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
*/

#include "JsonMessage.h"
#include <grpc/slice.h>


std::string OccLite::nopb::JsonMessage::Serialize() const
Expand Down Expand Up @@ -72,7 +73,7 @@ bool OccLite::nopb::JsonMessage::Deserialize(::grpc::ByteBuffer* byte_buffer)
auto rawSlice = sl->c_slice();
std::string str = grpc::StringFromCopiedSlice(rawSlice);
ss << str;
::grpc::g_core_codegen_interface->grpc_slice_unref(rawSlice);
::grpc_slice_unref(rawSlice);
}

OLOG(detail) << "Deserialized JsonMessage: " << ss.str();
Expand Down
4 changes: 2 additions & 2 deletions occ/plugin/litestructs/Transition.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class SerializationTraits<OccLite::nopb::TransitionRequest, void>
{
*buffer = *source.JsonMessage::SerializeToByteBuffer();
*own_buffer = true;
return g_core_codegen_interface->ok();
return Status::OK;
}
};

Expand All @@ -116,7 +116,7 @@ class SerializationTraits<OccLite::nopb::TransitionResponse, void>
{
*buffer = *source.JsonMessage::SerializeToByteBuffer();
*own_buffer = true;
return g_core_codegen_interface->ok();
return Status::OK;
}
};

Expand Down