diff --git a/occ/CMakeLists.txt b/occ/CMakeLists.txt index cf2c38294..31ec6c789 100644 --- a/occ/CMakeLists.txt +++ b/occ/CMakeLists.txt @@ -165,7 +165,7 @@ find_package(Boost 1.74 REQUIRED COMPONENTS ${BOOST_COMPONENTS}) # Protobuf set(protobuf_MODULE_COMPATIBLE TRUE) -find_package(protobuf 3.14.0 CONFIG REQUIRED) +find_package(protobuf CONFIG REQUIRED) message(STATUS "Using protobuf ${protobuf_VERSION}") # OpenSSL on Mac diff --git a/occ/cmake/OccConfig.cmake.in b/occ/cmake/OccConfig.cmake.in index fdf2c248a..609e2952f 100644 --- a/occ/cmake/OccConfig.cmake.in +++ b/occ/cmake/OccConfig.cmake.in @@ -31,7 +31,7 @@ include(CMakeFindDependencyMacro) list(APPEND CMAKE_MODULE_PATH ${Occ_CMAKE_DIR}) find_dependency(Boost 1.68 REQUIRED COMPONENTS program_options) set(protobuf_MODULE_COMPATIBLE TRUE) -find_dependency(protobuf 3.7.1 CONFIG REQUIRED) +find_dependency(protobuf CONFIG REQUIRED) find_dependency(gRPC 1.19.1 CONFIG REQUIRED) list(REMOVE_AT CMAKE_MODULE_PATH -1) diff --git a/occ/occlib/OccServer.cxx b/occ/occlib/OccServer.cxx index 6947624c2..347ae8783 100644 --- a/occ/occlib/OccServer.cxx +++ b/occ/occlib/OccServer.cxx @@ -68,7 +68,7 @@ grpc::Status OccServer::EventStream(grpc::ServerContext* context, boost::uuids::basic_random_generator gen; std::string id = boost::uuids::to_string(gen()); - boost::lockfree::queue eventQueue; + boost::lockfree::queue eventQueue; m_eventQueues[id] = &eventQueue; DEFER({ m_eventQueues.erase(id); @@ -76,14 +76,14 @@ grpc::Status OccServer::EventStream(grpc::ServerContext* context, bool isStreamOpen = true; while (!m_destroying && isStreamOpen && m_rco->getState() != t_State::done) { - pb::DeviceEvent *newEvent; + occ_pb::DeviceEvent *newEvent; bool ok = eventQueue.pop(newEvent); if (!ok) { // queue empty, sleep and retry std::this_thread::sleep_for(2ms); continue; } - pb::EventStreamReply response; + occ_pb::EventStreamReply response; if (newEvent) { response.mutable_event()->CopyFrom(*newEvent); isStreamOpen = writer->Write(response); @@ -94,8 +94,8 @@ grpc::Status OccServer::EventStream(grpc::ServerContext* context, } grpc::Status OccServer::StateStream(grpc::ServerContext* context, - const pb::StateStreamRequest* request, - grpc::ServerWriter* writer) + const occ_pb::StateStreamRequest* request, + grpc::ServerWriter* writer) { (void) context; (void) request; @@ -118,8 +118,8 @@ grpc::Status OccServer::StateStream(grpc::ServerContext* context, continue; } - pb::StateStreamReply response; - response.set_type(pb::STATE_STABLE); + occ_pb::StateStreamReply response; + response.set_type(occ_pb::STATE_STABLE); response.set_state(getStringFromState(newState)); if (newState != t_State::done) { @@ -133,8 +133,8 @@ grpc::Status OccServer::StateStream(grpc::ServerContext* context, } grpc::Status OccServer::GetState(grpc::ServerContext* context, - const pb::GetStateRequest* request, - pb::GetStateReply* response) + const occ_pb::GetStateRequest* request, + occ_pb::GetStateReply* response) { std::lock_guard lock(m_mu); @@ -159,8 +159,8 @@ grpc::Status OccServer::GetState(grpc::ServerContext* context, * @return the status, either grpc::Status::OK or an error status */ grpc::Status OccServer::Transition(grpc::ServerContext* context, - const pb::TransitionRequest* request, - pb::TransitionReply* response) + const occ_pb::TransitionRequest* request, + occ_pb::TransitionReply* response) { std::lock_guard lock(m_mu); @@ -213,11 +213,11 @@ grpc::Status OccServer::Transition(grpc::ServerContext* context, response->set_transitionevent(request->transitionevent()); response->set_ok(newStateStr == finalState); if (newState == error) { // ERROR state - response->set_trigger(pb::DEVICE_ERROR); + response->set_trigger(occ_pb::DEVICE_ERROR); } else if (newStateStr == finalState) { // correct destination state - response->set_trigger(pb::EXECUTOR); + response->set_trigger(occ_pb::EXECUTOR); } else { // some other state, for whatever reason - we assume DEVICE_INTENTIONAL - response->set_trigger(pb::DEVICE_INTENTIONAL); + response->set_trigger(occ_pb::DEVICE_INTENTIONAL); } std::cout << "[OCC] new state: " << newStateStr << std::endl; @@ -394,14 +394,14 @@ void OccServer::publishState(t_State s) } } -void OccServer::pushEvent(pb::DeviceEvent* event) +void OccServer::pushEvent(occ_pb::DeviceEvent* event) { for (auto item : m_eventQueues) { item.second->push(event); } printf("[OCC] Object: %s - pushing event = %s\n", m_rco->getName().c_str(), - pb::DeviceEventType_Name(event->type()).c_str()); + occ_pb::DeviceEventType_Name(event->type()).c_str()); } bool OccServer::checkMachineDone() @@ -428,8 +428,8 @@ void OccServer::runChecker() int err = m_rco->iterateRunning(); if (err == 1) { // signal EndOfData event endOfData = true; - auto eodEvent = new pb::DeviceEvent; - eodEvent->set_type(pb::END_OF_STREAM); + auto eodEvent = new occ_pb::DeviceEvent; + eodEvent->set_type(occ_pb::END_OF_STREAM); pushEvent(eodEvent); } else if (err) { @@ -446,8 +446,8 @@ void OccServer::runChecker() // the above publishes a state change event to the StateStream, but we also push an exception event on the // EventStream because the transition was initiated by the task - auto taskErrorEvent = new pb::DeviceEvent; - taskErrorEvent->set_type(pb::TASK_INTERNAL_ERROR); + auto taskErrorEvent = new occ_pb::DeviceEvent; + taskErrorEvent->set_type(occ_pb::TASK_INTERNAL_ERROR); pushEvent(taskErrorEvent); } } diff --git a/occ/occlib/OccServer.h b/occ/occlib/OccServer.h index 3eb0f20cd..d05ba742c 100644 --- a/occ/occlib/OccServer.h +++ b/occ/occlib/OccServer.h @@ -36,8 +36,6 @@ #include #include -namespace pb = occ_pb; - namespace boost { namespace property_tree @@ -61,7 +59,7 @@ const std::unordered_map EXPECTED_FINAL_STATE = { {"RECOVER", "STANDBY"}, }; -class OccServer final : public pb::Occ::Service +class OccServer final : public occ_pb::Occ::Service { public: /** @@ -82,20 +80,20 @@ class OccServer final : public pb::Occ::Service virtual ~OccServer(); grpc::Status EventStream(grpc::ServerContext* context, - const pb::EventStreamRequest* request, - grpc::ServerWriter* writer) override; + const occ_pb::EventStreamRequest* request, + grpc::ServerWriter* writer) override; grpc::Status StateStream(grpc::ServerContext* context, - const pb::StateStreamRequest* request, - grpc::ServerWriter* writer) override; + const occ_pb::StateStreamRequest* request, + grpc::ServerWriter* writer) override; grpc::Status GetState(grpc::ServerContext* context, - const pb::GetStateRequest* request, - pb::GetStateReply* response) override; + const occ_pb::GetStateRequest* request, + occ_pb::GetStateReply* response) override; grpc::Status Transition(grpc::ServerContext* context, - const pb::TransitionRequest* request, - pb::TransitionReply* response) override; + const occ_pb::TransitionRequest* request, + occ_pb::TransitionReply* response) override; bool checkMachineDone(); @@ -104,7 +102,7 @@ class OccServer final : public pb::Occ::Service void updateState(t_State s); void publishState(t_State s); - void pushEvent(pb::DeviceEvent* event); + void pushEvent(occ_pb::DeviceEvent* event); void runChecker(); @@ -116,7 +114,7 @@ class OccServer final : public pb::Occ::Service bool m_machineDone; std::unordered_map* > m_stateQueues; - std::unordered_map* > m_eventQueues; + std::unordered_map* > m_eventQueues; }; diff --git a/occ/plugin/OccPluginServer.cxx b/occ/plugin/OccPluginServer.cxx index 3f96a0c2a..05b079317 100644 --- a/occ/plugin/OccPluginServer.cxx +++ b/occ/plugin/OccPluginServer.cxx @@ -70,9 +70,9 @@ OccPluginServer::EventStream(grpc::ServerContext* context, if (state == "EXITING") { std::unique_lock finished_lk(finished_mu); - auto nilEvent = new pb::DeviceEvent(); - nilEvent->set_type(pb::NULL_DEVICE_EVENT); - pb::EventStreamReply response; + auto nilEvent = new occ_pb::DeviceEvent(); + nilEvent->set_type(occ_pb::NULL_DEVICE_EVENT); + occ_pb::EventStreamReply response; response.mutable_event()->CopyFrom(*nilEvent); writer->WriteLast(response, grpc::WriteOptions()); @@ -100,8 +100,8 @@ OccPluginServer::EventStream(grpc::ServerContext* context, grpc::Status OccPluginServer::StateStream(grpc::ServerContext* context, - const pb::StateStreamRequest* request, - grpc::ServerWriter* writer) + const occ_pb::StateStreamRequest* request, + grpc::ServerWriter* writer) { (void) context; @@ -116,14 +116,14 @@ OccPluginServer::StateStream(grpc::ServerContext* context, std::lock_guard lock(writer_mu); auto state = fair::mq::PluginServices::ToStr(reachedState); last_known_state = state; - pb::StateType sType = isIntermediateFMQState(state) ? pb::STATE_INTERMEDIATE : pb::STATE_STABLE; + occ_pb::StateType sType = isIntermediateFMQState(state) ? occ_pb::STATE_INTERMEDIATE : occ_pb::STATE_STABLE; - pb::StateStreamReply response; + occ_pb::StateStreamReply response; response.set_type(sType); response.set_state(state); OLOG(debug) << "[StateStream] new state: " << state << "; type: " - << pb::StateType_Name(sType); + << occ_pb::StateType_Name(sType); if (state != "EXITING") { writer->Write(response); @@ -151,8 +151,8 @@ OccPluginServer::StateStream(grpc::ServerContext* context, } grpc::Status OccPluginServer::GetState(grpc::ServerContext* context, - const pb::GetStateRequest* request, - pb::GetStateReply* response) + const occ_pb::GetStateRequest* request, + occ_pb::GetStateReply* response) { std::lock_guard lock(m_mu); @@ -178,8 +178,8 @@ grpc::Status OccPluginServer::GetState(grpc::ServerContext* context, */ grpc::Status OccPluginServer::Transition(grpc::ServerContext* context, - const pb::TransitionRequest* request, - pb::TransitionReply* response) + const occ_pb::TransitionRequest* request, + occ_pb::TransitionReply* response) { // Valid FairMQ state machine transitions, mapped to DeviceStateTransition objects: // {DeviceStateTransition::Auto, "Auto"}, // ever needed? @@ -238,7 +238,7 @@ OccPluginServer::Transition(grpc::ServerContext* context, auto nopbResponse = std::get<0>(transitionOutcome); response->set_state(nopbResponse.state); - response->set_trigger(static_cast(nopbResponse.trigger)); + response->set_trigger(static_cast(nopbResponse.trigger)); response->set_transitionevent(nopbResponse.transitionEvent); response->set_ok(nopbResponse.ok); diff --git a/occ/plugin/OccPluginServer.h b/occ/plugin/OccPluginServer.h index 53c16bdc1..5f12a8019 100644 --- a/occ/plugin/OccPluginServer.h +++ b/occ/plugin/OccPluginServer.h @@ -32,8 +32,6 @@ #include -namespace pb = occ_pb; - namespace fair { namespace mq @@ -42,7 +40,7 @@ class PluginServices; } } -class OccPluginServer final : public pb::Occ::Service +class OccPluginServer final : public occ_pb::Occ::Service { public: explicit OccPluginServer(fair::mq::PluginServices*); @@ -51,20 +49,20 @@ class OccPluginServer final : public pb::Occ::Service {} grpc::Status EventStream(grpc::ServerContext* context, - const pb::EventStreamRequest* request, - grpc::ServerWriter* writer) override; + const occ_pb::EventStreamRequest* request, + grpc::ServerWriter* writer) override; grpc::Status StateStream(grpc::ServerContext* context, - const pb::StateStreamRequest* request, - grpc::ServerWriter* writer) override; + const occ_pb::StateStreamRequest* request, + grpc::ServerWriter* writer) override; grpc::Status GetState(grpc::ServerContext* context, - const pb::GetStateRequest* request, - pb::GetStateReply* response) override; + const occ_pb::GetStateRequest* request, + occ_pb::GetStateReply* response) override; grpc::Status Transition(grpc::ServerContext* context, - const pb::TransitionRequest* request, - pb::TransitionReply* response) override; + const occ_pb::TransitionRequest* request, + occ_pb::TransitionReply* response) override; private: fair::mq::PluginServices* m_pluginServices; diff --git a/occ/plugin/litestructs/EventStream.h b/occ/plugin/litestructs/EventStream.h index 1d3e19c74..a6a80cb02 100644 --- a/occ/plugin/litestructs/EventStream.h +++ b/occ/plugin/litestructs/EventStream.h @@ -90,7 +90,7 @@ class SerializationTraits { *buffer = *source.JsonMessage::SerializeToByteBuffer(); *own_buffer = true; - return g_core_codegen_interface->ok(); + return Status::OK; } }; @@ -111,7 +111,7 @@ class SerializationTraits { *buffer = *source.JsonMessage::SerializeToByteBuffer(); *own_buffer = true; - return g_core_codegen_interface->ok(); + return Status::OK; } }; diff --git a/occ/plugin/litestructs/GetState.h b/occ/plugin/litestructs/GetState.h index cd2679595..f88dafb3d 100644 --- a/occ/plugin/litestructs/GetState.h +++ b/occ/plugin/litestructs/GetState.h @@ -73,7 +73,7 @@ class SerializationTraits { *buffer = *source.JsonMessage::SerializeToByteBuffer(); *own_buffer = true; - return g_core_codegen_interface->ok(); + return Status::OK; } }; @@ -94,7 +94,7 @@ class SerializationTraits { *buffer = *source.JsonMessage::SerializeToByteBuffer(); *own_buffer = true; - return g_core_codegen_interface->ok(); + return Status::OK; } }; diff --git a/occ/plugin/litestructs/JsonMessage.cxx b/occ/plugin/litestructs/JsonMessage.cxx index 1249c71dc..f2fbac4a1 100644 --- a/occ/plugin/litestructs/JsonMessage.cxx +++ b/occ/plugin/litestructs/JsonMessage.cxx @@ -23,6 +23,7 @@ */ #include "JsonMessage.h" +#include std::string OccLite::nopb::JsonMessage::Serialize() const @@ -72,7 +73,7 @@ bool OccLite::nopb::JsonMessage::Deserialize(::grpc::ByteBuffer* byte_buffer) auto rawSlice = sl->c_slice(); std::string str = grpc::StringFromCopiedSlice(rawSlice); ss << str; - ::grpc::g_core_codegen_interface->grpc_slice_unref(rawSlice); + ::grpc_slice_unref(rawSlice); } OLOG(detail) << "Deserialized JsonMessage: " << ss.str(); diff --git a/occ/plugin/litestructs/Transition.h b/occ/plugin/litestructs/Transition.h index 71ea37c35..b856dffb6 100644 --- a/occ/plugin/litestructs/Transition.h +++ b/occ/plugin/litestructs/Transition.h @@ -95,7 +95,7 @@ class SerializationTraits { *buffer = *source.JsonMessage::SerializeToByteBuffer(); *own_buffer = true; - return g_core_codegen_interface->ok(); + return Status::OK; } }; @@ -116,7 +116,7 @@ class SerializationTraits { *buffer = *source.JsonMessage::SerializeToByteBuffer(); *own_buffer = true; - return g_core_codegen_interface->ok(); + return Status::OK; } };