diff options
author | Stanislav Zaikin <zstaseg@gmail.com> | 2022-07-21 19:07:50 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2023-08-18 06:09:10 +0000 |
commit | 56777b9409c9e0be2ca86504aae95ad6472a78ea (patch) | |
tree | 8e3508f9dd22a43ef2cee17c2a0926a2fbbbde34 /src | |
parent | f6beee077ef3e79a32043dd4685e87d7a6d16a5b (diff) |
vapi: support services
Add missing support for
service { rpc X_get returns X_get_reply stream X_details; }
Type: improvement
Change-Id: I27555f61a2974e414cb6554f32c550b8ee5eb037
Signed-off-by: Stanislav Zaikin <stanislav.zaikin@46labs.com>
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/vpp-api/vapi/vapi.c | 42 | ||||
-rw-r--r-- | src/vpp-api/vapi/vapi.hpp | 100 | ||||
-rwxr-xr-x | src/vpp-api/vapi/vapi_c_gen.py | 105 | ||||
-rw-r--r-- | src/vpp-api/vapi/vapi_c_test.c | 75 | ||||
-rwxr-xr-x | src/vpp-api/vapi/vapi_cpp_gen.py | 11 | ||||
-rw-r--r-- | src/vpp-api/vapi/vapi_cpp_test.cpp | 47 | ||||
-rw-r--r-- | src/vpp-api/vapi/vapi_internal.h | 14 | ||||
-rw-r--r-- | src/vpp-api/vapi/vapi_json_parser.py | 26 |
8 files changed, 367 insertions, 53 deletions
diff --git a/src/vpp-api/vapi/vapi.c b/src/vpp-api/vapi/vapi.c index 7700eb06d3a..45241e1a4b2 100644 --- a/src/vpp-api/vapi/vapi.c +++ b/src/vpp-api/vapi/vapi.c @@ -63,7 +63,8 @@ typedef struct u32 context; vapi_cb_t callback; void *callback_ctx; - bool is_dump; + vapi_msg_id_t response_id; + enum vapi_request_type type; } vapi_req_t; static const u32 context_counter_mask = (1 << 31); @@ -137,15 +138,17 @@ vapi_requests_end (vapi_ctx_t ctx) } void -vapi_store_request (vapi_ctx_t ctx, u32 context, bool is_dump, - vapi_cb_t callback, void *callback_ctx) +vapi_store_request (vapi_ctx_t ctx, u32 context, vapi_msg_id_t response_id, + enum vapi_request_type request_type, vapi_cb_t callback, + void *callback_ctx) { assert (!vapi_requests_full (ctx)); /* if the mutex is not held, bad things will happen */ assert (0 != pthread_mutex_trylock (&ctx->requests_mutex)); const int requests_end = vapi_requests_end (ctx); vapi_req_t *slot = &ctx->requests[requests_end]; - slot->is_dump = is_dump; + slot->type = request_type; + slot->response_id = response_id; slot->context = context; slot->callback = callback; slot->callback_ctx = callback_ctx; @@ -1116,8 +1119,34 @@ vapi_dispatch_response (vapi_ctx_t ctx, vapi_msg_id_t id, int payload_offset = vapi_get_payload_offset (id); void *payload = ((u8 *) msg) + payload_offset; bool is_last = true; - if (ctx->requests[tmp].is_dump) + switch (ctx->requests[tmp].type) { + case VAPI_REQUEST_STREAM: + if (ctx->requests[tmp].response_id == id) + { + is_last = false; + } + else + { + VAPI_DBG ("Stream response ID doesn't match current ID, move to " + "next ID"); + clib_memset (&ctx->requests[tmp], 0, + sizeof (ctx->requests[tmp])); + ++ctx->requests_start; + --ctx->requests_count; + if (ctx->requests_start == ctx->requests_size) + { + ctx->requests_start = 0; + } + tmp = ctx->requests_start; + if (ctx->requests[tmp].context != context) + { + VAPI_ERR ("Unexpected context %u, expected context %u!", + ctx->requests[tmp].context, context); + } + } + break; + case VAPI_REQUEST_DUMP: if (vapi_msg_id_control_ping_reply == id) { payload = NULL; @@ -1126,6 +1155,9 @@ vapi_dispatch_response (vapi_ctx_t ctx, vapi_msg_id_t id, { is_last = false; } + break; + case VAPI_REQUEST_REG: + break; } if (payload_offset != -1) { diff --git a/src/vpp-api/vapi/vapi.hpp b/src/vpp-api/vapi/vapi.hpp index a1e33a93fd4..58d170603e5 100644 --- a/src/vpp-api/vapi/vapi.hpp +++ b/src/vpp-api/vapi/vapi.hpp @@ -140,6 +140,10 @@ private: template <typename Req, typename Resp, typename... Args> friend class Dump; + template <typename Req, typename Resp, typename StreamMessage, + typename... Args> + friend class Stream; + template <typename M> friend class Event_registration; }; @@ -451,6 +455,10 @@ private: template <typename Req, typename Resp, typename... Args> friend class Dump; + template <typename Req, typename Resp, typename StreamMessage, + typename... Args> + friend class Stream; + template <typename M> friend class Result_set; template <typename M> friend class Event_registration; @@ -497,6 +505,10 @@ template <typename Req, typename Resp, typename... Args> class Request; template <typename Req, typename Resp, typename... Args> class Dump; +template <typename Req, typename Resp, typename StreamMessage, + typename... Args> +class Stream; + template <class, class = void> struct vapi_has_payload_trait : std::false_type { }; @@ -627,6 +639,10 @@ private: template <typename Req, typename Resp, typename... Args> friend class Dump; + template <typename Req, typename Resp, typename StreamMessage, + typename... Args> + friend class Stream; + template <typename X> friend class Event_registration; template <typename X> friend class Result_set; @@ -772,12 +788,96 @@ private: bool complete; std::vector<Msg<M>, typename Msg<M>::Msg_allocator> set; + template <typename Req, typename Resp, typename StreamMessage, + typename... Args> + friend class Stream; + template <typename Req, typename Resp, typename... Args> friend class Dump; template <typename X> friend class Event_registration; }; /** + * Class representing a RPC request - zero or more identical responses to a + * single request message with a response + */ +template <typename Req, typename Resp, typename StreamMessage, + typename... Args> +class Stream : public Common_req +{ +public: + Stream ( + Connection &con, Args... args, + std::function<vapi_error_e (Stream<Req, Resp, StreamMessage, Args...> &)> + cb = nullptr) + : Common_req{ con }, request{ con, vapi_alloc<Req> (con, args...) }, + response{ con, nullptr }, result_set{ con }, callback{ cb } + { + } + + Stream (const Stream &) = delete; + + virtual ~Stream () {} + + virtual std::tuple<vapi_error_e, bool> + assign_response (vapi_msg_id_t id, void *shm_data) + { + if (id == response.get_msg_id ()) + { + response.assign_response (id, shm_data); + result_set.mark_complete (); + set_response_state (RESPONSE_READY); + if (nullptr != callback) + { + return std::make_pair (callback (*this), true); + } + return std::make_pair (VAPI_OK, true); + } + else + { + result_set.assign_response (id, shm_data); + } + return std::make_pair (VAPI_OK, false); + } + + vapi_error_e + execute () + { + return con.send (this); + } + + const Msg<Req> & + get_request (void) + { + return request; + } + + const Msg<Resp> & + get_response (void) + { + return response; + } + + using resp_type = typename Msg<StreamMessage>::shm_data_type; + + const Result_set<StreamMessage> & + get_result_set (void) const + { + return result_set; + } + +private: + Msg<Req> request; + Msg<Resp> response; + Result_set<StreamMessage> result_set; + std::function<vapi_error_e (Stream<Req, Resp, StreamMessage, Args...> &)> + callback; + + friend class Connection; + friend class Result_set<StreamMessage>; +}; + +/** * Class representing a dump request - zero or more identical responses to a * single request message */ diff --git a/src/vpp-api/vapi/vapi_c_gen.py b/src/vpp-api/vapi/vapi_c_gen.py index 84d9acadeda..37f5ac12447 100755 --- a/src/vpp-api/vapi/vapi_c_gen.py +++ b/src/vpp-api/vapi/vapi_c_gen.py @@ -477,7 +477,7 @@ class CMessage(Message): {{ VAPI_ERR("Truncated '{self.name}' msg received, received %lu" "bytes, expected %lu bytes.", buf_size, - sizeof({self.get_calc_msg_size_func_name()})); + {self.get_calc_msg_size_func_name()}(msg)); return -1; }} return 0; @@ -615,45 +615,66 @@ class CMessage(Message): return "vapi_%s" % self.name def get_op_func_decl(self): - if self.reply.has_payload(): - return "vapi_error_e %s(%s)" % ( - self.get_op_func_name(), - ",\n ".join( - [ - "struct vapi_ctx_s *ctx", - "%s *msg" % self.get_c_name(), - "vapi_error_e (*callback)(struct vapi_ctx_s *ctx", - " void *callback_ctx", - " vapi_error_e rv", - " bool is_last", - " %s *reply)" - % self.reply.get_payload_struct_name(), - "void *callback_ctx", - ] - ), - ) - else: - return "vapi_error_e %s(%s)" % ( - self.get_op_func_name(), - ",\n ".join( - [ - "struct vapi_ctx_s *ctx", - "%s *msg" % self.get_c_name(), - "vapi_error_e (*callback)(struct vapi_ctx_s *ctx", - " void *callback_ctx", - " vapi_error_e rv", - " bool is_last)", - "void *callback_ctx", - ] - ), - ) + stream_param_lines = [] + if self.has_stream_msg: + stream_param_lines = [ + "vapi_error_e (*details_callback)(struct vapi_ctx_s *ctx", + " void *callback_ctx", + " vapi_error_e rv", + " bool is_last", + " %s *details)" + % self.stream_msg.get_payload_struct_name(), + "void *details_callback_ctx", + ] + + return "vapi_error_e %s(%s)" % ( + self.get_op_func_name(), + ",\n ".join( + [ + "struct vapi_ctx_s *ctx", + "%s *msg" % self.get_c_name(), + "vapi_error_e (*reply_callback)(struct vapi_ctx_s *ctx", + " void *callback_ctx", + " vapi_error_e rv", + " bool is_last", + " %s *reply)" + % self.reply.get_payload_struct_name(), + ] + + [ + "void *reply_callback_ctx", + ] + + stream_param_lines + ), + ) def get_op_func_def(self): + param_check_lines = [" if (!msg || !reply_callback) {"] + store_request_lines = [ + " vapi_store_request(ctx, req_context, %s, %s, " + % ( + self.reply.get_msg_id_name(), + "VAPI_REQUEST_DUMP" if self.reply_is_stream else "VAPI_REQUEST_REG", + ), + " (vapi_cb_t)reply_callback, reply_callback_ctx);", + ] + if self.has_stream_msg: + param_check_lines = [ + " if (!msg || !reply_callback || !details_callback) {" + ] + store_request_lines = [ + f" vapi_store_request(ctx, req_context, {self.stream_msg.get_msg_id_name()}, VAPI_REQUEST_STREAM, ", + " (vapi_cb_t)details_callback, details_callback_ctx);", + f" vapi_store_request(ctx, req_context, {self.reply.get_msg_id_name()}, VAPI_REQUEST_REG, ", + " (vapi_cb_t)reply_callback, reply_callback_ctx);", + ] + return "\n".join( [ "%s" % self.get_op_func_decl(), "{", - " if (!msg || !callback) {", + ] + + param_check_lines + + [ " return VAPI_EINVAL;", " }", " if (vapi_is_nonblocking(ctx) && vapi_requests_full(ctx)) {", @@ -669,14 +690,12 @@ class CMessage(Message): ( " if (VAPI_OK == (rv = vapi_send_with_control_ping " "(ctx, msg, req_context))) {" - if self.reply_is_stream + if (self.reply_is_stream and not self.has_stream_msg) else " if (VAPI_OK == (rv = vapi_send (ctx, msg))) {" ), - ( - " vapi_store_request(ctx, req_context, %s, " - "(vapi_cb_t)callback, callback_ctx);" - % ("true" if self.reply_is_stream else "false") - ), + ] + + store_request_lines + + [ " if (VAPI_OK != vapi_producer_unlock (ctx)) {", " abort (); /* this really shouldn't happen */", " }", @@ -792,6 +811,8 @@ def emit_definition(parser, json_file, emitted, o): emit_definition(parser, json_file, emitted, x) if hasattr(o, "reply"): emit_definition(parser, json_file, emitted, o.reply) + if hasattr(o, "stream_msg"): + emit_definition(parser, json_file, emitted, o.stream_msg) if hasattr(o, "get_c_def"): if ( o not in parser.enums_by_json[json_file] @@ -820,14 +841,14 @@ def emit_definition(parser, json_file, emitted, o): print("%s%s" % (function_attrs, o.get_calc_msg_size_func_def())) print("") print("%s%s" % (function_attrs, o.get_verify_msg_size_func_def())) - if not o.is_reply and not o.is_event: + if not o.is_reply and not o.is_event and not o.is_stream: print("") print("%s%s" % (function_attrs, o.get_alloc_func_def())) print("") print("%s%s" % (function_attrs, o.get_op_func_def())) print("") print("%s" % o.get_c_constructor()) - if o.is_reply or o.is_event: + if (o.is_reply or o.is_event) and not o.is_stream: print("") print("%s%s;" % (function_attrs, o.get_event_cb_func_def())) elif hasattr(o, "get_swap_to_be_func_def"): diff --git a/src/vpp-api/vapi/vapi_c_test.c b/src/vpp-api/vapi/vapi_c_test.c index 99a93fb22fd..5eccb0fac38 100644 --- a/src/vpp-api/vapi/vapi_c_test.c +++ b/src/vpp-api/vapi/vapi_c_test.c @@ -28,6 +28,7 @@ #include <vapi/vlib.api.vapi.h> #include <vapi/vpe.api.vapi.h> #include <vapi/interface.api.vapi.h> +#include <vapi/mss_clamp.api.vapi.h> #include <vapi/l2.api.vapi.h> #include <fake.api.vapi.h> @@ -36,6 +37,7 @@ DEFINE_VAPI_MSG_IDS_VPE_API_JSON; DEFINE_VAPI_MSG_IDS_INTERFACE_API_JSON; +DEFINE_VAPI_MSG_IDS_MSS_CLAMP_API_JSON; DEFINE_VAPI_MSG_IDS_L2_API_JSON; DEFINE_VAPI_MSG_IDS_FAKE_API_JSON; @@ -481,6 +483,48 @@ sw_interface_dump_cb (struct vapi_ctx_s *ctx, void *callback_ctx, return VAPI_OK; } +vapi_error_e +vapi_mss_clamp_enable_disable_reply_cb ( + struct vapi_ctx_s *ctx, void *callback_ctx, vapi_error_e rv, bool is_last, + vapi_payload_mss_clamp_enable_disable_reply *reply) +{ + bool *x = callback_ctx; + *x = true; + return VAPI_OK; +} + +vapi_error_e +vapi_mss_clamp_get_reply_cb (struct vapi_ctx_s *ctx, void *callback_ctx, + vapi_error_e rv, bool is_last, + vapi_payload_mss_clamp_get_reply *reply) +{ + int *counter = callback_ctx; + ck_assert_int_gt (*counter, 0); // make sure details were called first + ++*counter; + ck_assert_int_eq (is_last, true); + printf ("Got mss clamp reply error %d\n", rv); + ck_assert_int_eq (rv, VAPI_OK); + printf ("counter is %d", *counter); + return VAPI_OK; +} + +vapi_error_e +vapi_mss_clamp_get_details_cb (struct vapi_ctx_s *ctx, void *callback_ctx, + vapi_error_e rv, bool is_last, + vapi_payload_mss_clamp_details *details) +{ + int *counter = callback_ctx; + ++*counter; + if (!is_last) + { + printf ("Got ipv4 mss clamp to %u for sw_if_index %u\n", + details->ipv4_mss, details->sw_if_index); + ck_assert_int_eq (details->ipv4_mss, 1000 + details->sw_if_index); + } + printf ("counter is %d", *counter); + return VAPI_OK; +} + START_TEST (test_loopbacks_1) { printf ("--- Create/delete loopbacks using blocking API ---\n"); @@ -521,6 +565,37 @@ START_TEST (test_loopbacks_1) mac_addresses[i][3], mac_addresses[i][4], mac_addresses[i][5], sw_if_indexes[i]); } + + { // new context + for (int i = 0; i < num_ifs; ++i) + { + vapi_msg_mss_clamp_enable_disable *mc = + vapi_alloc_mss_clamp_enable_disable (ctx); + mc->payload.sw_if_index = sw_if_indexes[i]; + mc->payload.ipv4_mss = 1000 + sw_if_indexes[i]; + mc->payload.ipv4_direction = MSS_CLAMP_DIR_RX; + bool reply_ctx = false; + printf ("Set ipv4 mss clamp to %u for sw_if_index %u\n", + mc->payload.ipv4_mss, mc->payload.sw_if_index); + vapi_error_e rv = vapi_mss_clamp_enable_disable ( + ctx, mc, vapi_mss_clamp_enable_disable_reply_cb, &reply_ctx); + ck_assert_int_eq (VAPI_OK, rv); + ck_assert_int_eq (reply_ctx, true); + } + } + + { // new context + int counter = 0; + vapi_msg_mss_clamp_get *msg = vapi_alloc_mss_clamp_get (ctx); + msg->payload.sw_if_index = ~0; + vapi_error_e rv = + vapi_mss_clamp_get (ctx, msg, vapi_mss_clamp_get_reply_cb, &counter, + vapi_mss_clamp_get_details_cb, &counter); + printf ("counter is %d", counter); + ck_assert_int_eq (VAPI_OK, rv); + ck_assert_int_eq (counter, num_ifs + 1); + } + bool seen[num_ifs]; sw_interface_dump_ctx dctx = { false, num_ifs, sw_if_indexes, seen, 0 }; vapi_msg_sw_interface_dump *dump; diff --git a/src/vpp-api/vapi/vapi_cpp_gen.py b/src/vpp-api/vapi/vapi_cpp_gen.py index 33744a3d58c..165730ca4b8 100755 --- a/src/vpp-api/vapi/vapi_cpp_gen.py +++ b/src/vpp-api/vapi/vapi_cpp_gen.py @@ -96,6 +96,13 @@ class CppMessage(CMessage): return "%s%s" % (self.name[0].upper(), self.name[1:]) def get_req_template_name(self): + if self.has_stream_msg: + return "Stream<%s, %s, %s>" % ( + self.get_c_name(), + self.reply.get_c_name(), + self.stream_msg.get_c_name(), + ) + if self.reply_is_stream: template = "Dump" else: @@ -196,7 +203,7 @@ def gen_json_header(parser, logger, j, io, gen_h_prefix, add_debug_comments): print("/* m.get_cpp_constructor() */") print("%s" % m.get_cpp_constructor()) print("") - if not m.is_reply and not m.is_event: + if not m.is_reply and not m.is_event and not m.is_stream: if add_debug_comments: print("/* m.get_alloc_template_instantiation() */") print("%s" % m.get_alloc_template_instantiation()) @@ -210,6 +217,8 @@ def gen_json_header(parser, logger, j, io, gen_h_prefix, add_debug_comments): print("/* m.get_reply_type_alias() */") print("%s" % m.get_reply_type_alias()) continue + if m.is_stream: + continue if add_debug_comments: print("/* m.get_req_template_instantiation() */") print("%s" % m.get_req_template_instantiation()) diff --git a/src/vpp-api/vapi/vapi_cpp_test.cpp b/src/vpp-api/vapi/vapi_cpp_test.cpp index c0e0cdc3ab8..25df5b70e8f 100644 --- a/src/vpp-api/vapi/vapi_cpp_test.cpp +++ b/src/vpp-api/vapi/vapi_cpp_test.cpp @@ -25,10 +25,12 @@ #include <vapi/vapi.hpp> #include <vapi/vpe.api.vapi.hpp> #include <vapi/interface.api.vapi.hpp> +#include <vapi/mss_clamp.api.vapi.hpp> #include <fake.api.vapi.hpp> DEFINE_VAPI_MSG_IDS_VPE_API_JSON; DEFINE_VAPI_MSG_IDS_INTERFACE_API_JSON; +DEFINE_VAPI_MSG_IDS_MSS_CLAMP_API_JSON; DEFINE_VAPI_MSG_IDS_FAKE_API_JSON; static char *app_name = nullptr; @@ -145,6 +147,51 @@ START_TEST (test_loopbacks_1) } { // new context + for (int i = 0; i < num_ifs; ++i) + { + Mss_clamp_enable_disable d (con); + auto &req = d.get_request ().get_payload (); + req.sw_if_index = sw_if_indexes[i]; + req.ipv4_mss = 1420; + req.ipv4_direction = vapi_enum_mss_clamp_dir::MSS_CLAMP_DIR_RX; + auto rv = d.execute (); + ck_assert_int_eq (VAPI_OK, rv); + WAIT_FOR_RESPONSE (d, rv); + ck_assert_int_eq (VAPI_OK, rv); + } + } + + { // new context + bool seen[num_ifs] = { 0 }; + Mss_clamp_get d (con); + d.get_request ().get_payload ().sw_if_index = ~0; + auto rv = d.execute (); + ck_assert_int_eq (VAPI_OK, rv); + WAIT_FOR_RESPONSE (d, rv); + ck_assert_int_eq (VAPI_OK, rv); + auto &rs = d.get_result_set (); + for (auto &r : rs) + { + auto &p = r.get_payload (); + ck_assert_int_eq (p.ipv4_mss, 1420); + printf ("tcp-clamp: sw_if_idx %u ip4-mss %d dir %d\n", p.sw_if_index, + p.ipv4_mss, p.ipv4_direction); + for (int i = 0; i < num_ifs; ++i) + { + if (sw_if_indexes[i] == p.sw_if_index) + { + ck_assert_int_eq (0, seen[i]); + seen[i] = true; + } + } + } + for (int i = 0; i < num_ifs; ++i) + { + ck_assert_int_eq (1, seen[i]); + } + } + + { // new context bool seen[num_ifs] = {0}; Sw_interface_dump d (con); auto rv = d.execute (); diff --git a/src/vpp-api/vapi/vapi_internal.h b/src/vpp-api/vapi/vapi_internal.h index 49c041769d0..ca47dd10459 100644 --- a/src/vpp-api/vapi/vapi_internal.h +++ b/src/vpp-api/vapi/vapi_internal.h @@ -118,8 +118,18 @@ bool vapi_requests_full (vapi_ctx_t ctx); size_t vapi_get_request_count (vapi_ctx_t ctx); size_t vapi_get_max_request_count (vapi_ctx_t ctx); u32 vapi_gen_req_context (vapi_ctx_t ctx); -void vapi_store_request (vapi_ctx_t ctx, u32 context, bool is_dump, - vapi_cb_t callback, void *callback_ctx); + +enum vapi_request_type +{ + VAPI_REQUEST_REG = 0, + VAPI_REQUEST_DUMP = 1, + VAPI_REQUEST_STREAM = 2, +}; + +void vapi_store_request (vapi_ctx_t ctx, u32 context, + vapi_msg_id_t response_id, + enum vapi_request_type type, vapi_cb_t callback, + void *callback_ctx); int vapi_get_payload_offset (vapi_msg_id_t id); void (*vapi_get_swap_to_host_func (vapi_msg_id_t id)) (void *payload); void (*vapi_get_swap_to_be_func (vapi_msg_id_t id)) (void *payload); diff --git a/src/vpp-api/vapi/vapi_json_parser.py b/src/vpp-api/vapi/vapi_json_parser.py index 4f29f95c6e9..00c234fc014 100644 --- a/src/vpp-api/vapi/vapi_json_parser.py +++ b/src/vpp-api/vapi/vapi_json_parser.py @@ -158,6 +158,7 @@ class Message(object): self.header = None self.is_reply = json_parser.is_reply(self.name) self.is_event = json_parser.is_event(self.name) + self.is_stream = json_parser.is_stream(self.name) fields = [] for header in get_msg_header_defs( struct_type_class, field_class, json_parser, logger @@ -346,6 +347,7 @@ class JsonParser(object): self.types["string"] = simple_type_class("vl_api_string_t") self.replies = set() self.events = set() + self.streams = set() self.simple_type_class = simple_type_class self.enum_class = enum_class self.union_class = union_class @@ -384,6 +386,8 @@ class JsonParser(object): if "events" in self.services[k]: for x in self.services[k]["events"]: self.events.add(x) + if "stream_msg" in self.services[k]: + self.streams.add(self.services[k]["stream_msg"]) for e in j["enums"]: name = e[0] value_pairs = e[1:-1] @@ -521,6 +525,20 @@ class JsonParser(object): def is_event(self, message): return message in self.events + def is_stream(self, message): + return message in self.streams + + def has_stream_msg(self, message): + return ( + message.name in self.services + and "stream_msg" in self.services[message.name] + ) + + def get_stream_msg(self, message): + if not self.has_stream_msg(message): + return None + return self.messages[self.services[message.name]["stream_msg"]] + def get_reply(self, message): return self.messages[self.services[message]["reply"]] @@ -532,13 +550,15 @@ class JsonParser(object): remove = [] for n, m in j.items(): try: - if not m.is_reply and not m.is_event: + if not m.is_reply and not m.is_event and not m.is_stream: try: m.reply = self.get_reply(n) + m.reply_is_stream = False + m.has_stream_msg = self.has_stream_msg(m) if "stream" in self.services[m.name]: m.reply_is_stream = self.services[m.name]["stream"] - else: - m.reply_is_stream = False + if m.has_stream_msg: + m.stream_msg = self.get_stream_msg(m) m.reply.request = m except: raise ParseError("Cannot find reply to message `%s'" % n) |