summaryrefslogtreecommitdiffstats
path: root/src/rpc-server
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2015-08-12 15:13:25 +0300
committerimarom <imarom@cisco.com>2015-08-12 15:13:25 +0300
commit218b12788ace0683918dc335639cdd92c4e33ab5 (patch)
tree19ff4056f67be6fcf7da5c166dc5bfcab90ecab2 /src/rpc-server
parente69bd2e2858c21421e1fb05aee58746cbbfeeea5 (diff)
adding some files
unsteady version
Diffstat (limited to 'src/rpc-server')
-rw-r--r--src/rpc-server/include/trex_rpc_req_resp.h51
-rw-r--r--src/rpc-server/include/trex_rpc_server_api.h114
-rw-r--r--src/rpc-server/src/trex_rpc_req_resp.cpp112
-rw-r--r--src/rpc-server/src/trex_rpc_server.cpp100
4 files changed, 377 insertions, 0 deletions
diff --git a/src/rpc-server/include/trex_rpc_req_resp.h b/src/rpc-server/include/trex_rpc_req_resp.h
new file mode 100644
index 00000000..46d01579
--- /dev/null
+++ b/src/rpc-server/include/trex_rpc_req_resp.h
@@ -0,0 +1,51 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#ifndef __TREX_RPC_REQ_RESP_API_H__
+#define __TREX_RPC_REQ_RESP_API_H__
+
+#include <trex_rpc_server_api.h>
+
+/**
+ * request-response RPC server
+ *
+ * @author imarom (11-Aug-15)
+ */
+class TrexRpcServerReqRes : public TrexRpcServerInterface {
+public:
+
+ TrexRpcServerReqRes(TrexRpcServerArray::protocol_type_e protocol, uint16_t port);
+
+protected:
+ void _rpc_thread_cb();
+ void _stop_rpc_thread();
+
+private:
+ void handle_request(const uint8_t *msg, uint32_t size);
+
+ static const int RPC_MAX_MSG_SIZE = 2048;
+ void *m_context;
+ void *m_socket;
+ uint8_t m_msg_buffer[RPC_MAX_MSG_SIZE];
+};
+
+
+#endif /* __TREX_RPC_REQ_RESP_API_H__ */
diff --git a/src/rpc-server/include/trex_rpc_server_api.h b/src/rpc-server/include/trex_rpc_server_api.h
new file mode 100644
index 00000000..98e5f977
--- /dev/null
+++ b/src/rpc-server/include/trex_rpc_server_api.h
@@ -0,0 +1,114 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#ifndef __TREX_RPC_SERVER_API_H__
+#define __TREX_RPC_SERVER_API_H__
+
+#include <stdint.h>
+#include <vector>
+#include <thread>
+#include <string>
+#include <stdexcept>
+
+/**
+ * generic exception for RPC errors
+ *
+ */
+class TrexRpcException : public std::runtime_error
+{
+public:
+ TrexRpcException(const std::string &what) : std::runtime_error(what) {
+ }
+};
+
+/* forward decl. of class */
+class TrexRpcServerInterface;
+
+/**
+ * servers array
+ *
+ * @author imarom (12-Aug-15)
+ */
+class TrexRpcServerArray {
+public:
+ /**
+ * different types the RPC server supports
+ */
+ enum protocol_type_e {
+ RPC_PROT_TCP
+ };
+
+ TrexRpcServerArray(protocol_type_e protocol, uint16_t port);
+ ~TrexRpcServerArray();
+
+ void start();
+ void stop();
+
+private:
+ std::vector<TrexRpcServerInterface *> m_servers;
+ protocol_type_e m_protocol;
+ uint16_t m_port;
+};
+
+/**
+ * generic type RPC server instance
+ *
+ * @author imarom (12-Aug-15)
+ */
+class TrexRpcServerInterface {
+public:
+
+ TrexRpcServerInterface(TrexRpcServerArray::protocol_type_e protocol, uint16_t port);
+ virtual ~TrexRpcServerInterface();
+
+ /**
+ * starts the server
+ *
+ */
+ void start();
+
+ /**
+ * stops the server
+ *
+ */
+ void stop();
+
+ /**
+ * return TRUE if server is active
+ *
+ */
+ bool is_running();
+
+protected:
+ /**
+ * instances implement this
+ *
+ */
+ virtual void _rpc_thread_cb() = 0;
+ virtual void _stop_rpc_thread() = 0;
+
+ TrexRpcServerArray::protocol_type_e m_protocol;
+ uint16_t m_port;
+ bool m_is_running;
+ std::thread *m_thread;
+};
+
+#endif /* __TREX_RPC_SERVER_API_H__ */
diff --git a/src/rpc-server/src/trex_rpc_req_resp.cpp b/src/rpc-server/src/trex_rpc_req_resp.cpp
new file mode 100644
index 00000000..a64129aa
--- /dev/null
+++ b/src/rpc-server/src/trex_rpc_req_resp.cpp
@@ -0,0 +1,112 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include <trex_rpc_server_api.h>
+#include <trex_rpc_req_resp.h>
+
+#include <unistd.h>
+#include <sstream>
+#include <iostream>
+
+#include <zmq.h>
+#include <json/json.h>
+
+
+TrexRpcServerReqRes::TrexRpcServerReqRes(TrexRpcServerArray::protocol_type_e protocol, uint16_t port) : TrexRpcServerInterface(protocol, port) {
+ /* ZMQ is not thread safe - this should be outside */
+ m_context = zmq_ctx_new();
+}
+
+void TrexRpcServerReqRes::_rpc_thread_cb() {
+ std::stringstream ss;
+
+ // Socket to talk to clients
+ m_socket = zmq_socket (m_context, ZMQ_REP);
+
+ switch (m_protocol) {
+ case TrexRpcServerArray::RPC_PROT_TCP:
+ ss << "tcp://*:";
+ break;
+ default:
+ throw TrexRpcException("unknown protocol for RPC");
+ }
+
+ ss << m_port;
+
+ int rc = zmq_bind (m_socket, ss.str().c_str());
+ if (rc != 0) {
+ throw TrexRpcException("Unable to start ZMQ server at: " + ss.str());
+ }
+
+ printf("listening on %s\n", ss.str().c_str());
+
+ /* server main loop */
+ while (m_is_running) {
+ int msg_size = zmq_recv (m_socket, m_msg_buffer, sizeof(m_msg_buffer), 0);
+
+ if (msg_size == -1) {
+ /* normal shutdown and zmq_term was called */
+ if (errno == ETERM) {
+ break;
+ } else {
+ throw TrexRpcException("Unhandled error of zmq_recv");
+ }
+ }
+
+ handle_request(m_msg_buffer, msg_size);
+ }
+
+ /* must be done from the same thread */
+ zmq_close(m_socket);
+}
+
+void TrexRpcServerReqRes::_stop_rpc_thread() {
+ /* by calling zmq_term we signal the blocked thread to exit */
+ zmq_term(m_context);
+
+}
+
+void TrexRpcServerReqRes::handle_request(const uint8_t *msg, uint32_t msg_size) {
+ Json::Reader reader;
+ Json::Value request;
+ std::string response;
+
+ /* parse the json request */
+ bool rc = reader.parse( (const char *)msg, (const char *)msg + msg_size, request, false);
+ if (!rc) {
+ throw TrexRpcException("Unable to decode JSON RPC request: " + std::string( (const char *)msg, msg_size));
+ }
+ std::cout << request << std::endl;
+
+ #if 0
+ TrexJsonRpcRequest rpc_request(msg, msg_size);
+
+ rpc_request->parse();
+ rpc_request->execute();
+
+ rpc_request->get_response(response);
+
+ zmq_send(m_socket, response, response.size(), 0);
+ #endif
+
+ zmq_send(m_socket, "ACK", 3 ,0);
+
+}
diff --git a/src/rpc-server/src/trex_rpc_server.cpp b/src/rpc-server/src/trex_rpc_server.cpp
new file mode 100644
index 00000000..3f6f5107
--- /dev/null
+++ b/src/rpc-server/src/trex_rpc_server.cpp
@@ -0,0 +1,100 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include <trex_rpc_server_api.h>
+#include <trex_rpc_req_resp.h>
+#include <unistd.h>
+#include <zmq.h>
+#include <sstream>
+
+/************** RPC server array *************/
+
+TrexRpcServerArray::TrexRpcServerArray(protocol_type_e protocol, uint16_t prot) {
+
+ /* add the request response server */
+ m_servers.push_back(new TrexRpcServerReqRes(protocol, prot));
+}
+
+TrexRpcServerArray::~TrexRpcServerArray() {
+
+ /* make sure they are all stopped */
+ TrexRpcServerArray::stop();
+
+ for (auto server : m_servers) {
+ delete server;
+ }
+}
+
+/**
+ * start the server array
+ *
+ */
+void TrexRpcServerArray::start() {
+ for (auto server : m_servers) {
+ server->start();
+ }
+}
+
+/**
+ * stop the server array
+ *
+ */
+void TrexRpcServerArray::stop() {
+ for (auto server : m_servers) {
+ if (server->is_running()) {
+ server->stop();
+ }
+ }
+}
+
+/************** RPC server interface ***************/
+
+TrexRpcServerInterface::TrexRpcServerInterface(TrexRpcServerArray::protocol_type_e protocol, uint16_t port) : m_protocol(protocol), m_port(port) {
+ m_is_running = false;
+}
+
+TrexRpcServerInterface::~TrexRpcServerInterface() {
+
+}
+
+void TrexRpcServerInterface::start() {
+ m_is_running = true;
+
+ m_thread = new std::thread(&TrexRpcServerInterface::_rpc_thread_cb, this);
+ if (!m_thread) {
+ throw TrexRpcException("unable to create RPC thread");
+ }
+}
+
+void TrexRpcServerInterface::stop() {
+ m_is_running = false;
+
+ /* call the dynamic type class stop */
+ _stop_rpc_thread();
+
+ /* hold until thread has joined */
+ m_thread->join();
+}
+
+bool TrexRpcServerInterface::is_running() {
+ return m_is_running;
+}
+