From f5db3711b28db4e364ac01be8b124dd24d573782 Mon Sep 17 00:00:00 2001 From: Ole Troan Date: Wed, 20 May 2020 15:47:06 +0200 Subject: api: add new stream message convention Instead of having to wrap dump/detail calls in control ping, send details messages in between a normal reply / request pair. As expressed in the below service statement. Example: service { rpc map_domains_gets returns map_domains_get_reply stream map_domain_details; }; define map_domains_get { u32 client_index; u32 context; u32 cursor; }; define map_domains_get_reply { u32 context; i32 retval; u32 cursor; }; To avoid blocking the main thread for too long, the replies are now sent in client message queue size chunks. The reply message returns VNET_API_ERROR_EAGAIN when there is more to read. The API handler must also include a "cursor" that is used to the next call to the get function. API handler example: REPLY_AND_DETAILS_MACRO (VL_API_MAP_DOMAINS_GET_REPLY, mm->domains, ({ send_domain_details (cursor, rp, mp->context); })); The macro starts from cursor and iterates through the pool until vl_api_process_may_suspend() returns true or the iteration reaches the end of the list. Client Example: cursor = 0 d = [] while True: rv, details = map_domains_get(cursor=cursor) d += details if rv.retval == 0 or rv.retval != -165: break cursor = rv.cursor or the convenience iterator: for x in vpp.details_iter(vpp.api.map_domains_get): pass or list(details_iter(map_domains_get)) Change-Id: Iad9f6b41b0ef886adb584c97708dd91cf552749e Type: feature Signed-off-by: Ole Troan --- src/plugins/map/map.api | 22 ++++++++- src/plugins/map/map_api.c | 87 ++++++++++++++++++++++----------- src/plugins/map/test/test_map.py | 43 ++++++++++++++++ src/tools/vppapigen/vppapigen.py | 7 ++- src/tools/vppapigen/vppapigen_json.py | 2 + src/vlibapi/api_helper_macros.h | 37 ++++++++++++++ src/vlibmemory/api.h | 22 +++++++++ src/vnet/api_errno.h | 1 + src/vpp-api/python/vpp_papi/vpp_papi.py | 50 +++++++++++++------ 9 files changed, 226 insertions(+), 45 deletions(-) (limited to 'src') diff --git a/src/plugins/map/map.api b/src/plugins/map/map.api index 79deac86f8f..0ae1901e07a 100644 --- a/src/plugins/map/map.api +++ b/src/plugins/map/map.api @@ -13,7 +13,7 @@ * limitations under the License. */ -option version = "4.1.1"; +option version = "4.2.1"; import "vnet/ip/ip_types.api"; import "vnet/interface_types.api"; @@ -91,8 +91,28 @@ autoreply define map_add_del_rule /** \brief Get list of map domains @param client_index - opaque cookie to identify the sender */ +service { + rpc map_domains_get returns map_domains_get_reply + stream map_domain_details; +}; + +define map_domains_get +{ + u32 client_index; + u32 context; + u32 cursor; +}; + +define map_domains_get_reply +{ + u32 context; + i32 retval; + u32 cursor; +}; + define map_domain_dump { + option deprecated="v20.12"; u32 client_index; u32 context; }; diff --git a/src/plugins/map/map_api.c b/src/plugins/map/map_api.c index 7327732c6a7..13f05526afa 100644 --- a/src/plugins/map/map_api.c +++ b/src/plugins/map/map_api.c @@ -86,14 +86,48 @@ vl_api_map_add_del_rule_t_handler (vl_api_map_add_del_rule_t * mp) } static void -vl_api_map_domain_dump_t_handler (vl_api_map_domain_dump_t * mp) +send_domain_details (u32 map_domain_index, vl_api_registration_t * rp, + u32 context) { + map_main_t *mm = &map_main; vl_api_map_domain_details_t *rmp; + map_domain_t *d = pool_elt_at_index (mm->domains, map_domain_index); + + /* Make sure every field is initiated (or don't skip the clib_memset()) */ + map_domain_extra_t *de = + vec_elt_at_index (mm->domain_extras, map_domain_index); + int tag_len = clib_min (ARRAY_LEN (rmp->tag), vec_len (de->tag) + 1); + + /* *INDENT-OFF* */ + REPLY_MACRO_DETAILS4(VL_API_MAP_DOMAIN_DETAILS, rp, context, + ({ + rmp->domain_index = htonl (map_domain_index); + clib_memcpy (&rmp->ip6_prefix.address, &d->ip6_prefix, + sizeof (rmp->ip6_prefix.address)); + clib_memcpy (&rmp->ip4_prefix.address, &d->ip4_prefix, + sizeof (rmp->ip4_prefix.address)); + clib_memcpy (&rmp->ip6_src.address, &d->ip6_src, + sizeof (rmp->ip6_src.address)); + rmp->ip6_prefix.len = d->ip6_prefix_len; + rmp->ip4_prefix.len = d->ip4_prefix_len; + rmp->ip6_src.len = d->ip6_src_len; + rmp->ea_bits_len = d->ea_bits_len; + rmp->psid_offset = d->psid_offset; + rmp->psid_length = d->psid_length; + rmp->flags = d->flags; + rmp->mtu = htons (d->mtu); + memcpy (rmp->tag, de->tag, tag_len - 1); + rmp->tag[tag_len - 1] = '\0'; + })); + /* *INDENT-ON* */ +} + +static void +vl_api_map_domain_dump_t_handler (vl_api_map_domain_dump_t * mp) +{ map_main_t *mm = &map_main; - map_domain_t *d; - map_domain_extra_t *de; + int i; vl_api_registration_t *reg; - u32 map_domain_index; if (pool_elts (mm->domains) == 0) return; @@ -103,33 +137,28 @@ vl_api_map_domain_dump_t_handler (vl_api_map_domain_dump_t * mp) return; /* *INDENT-OFF* */ - pool_foreach(d, mm->domains, + pool_foreach_index(i, mm->domains, ({ - map_domain_index = d - mm->domains; - de = vec_elt_at_index(mm->domain_extras, map_domain_index); - int tag_len = clib_min(ARRAY_LEN(rmp->tag), vec_len(de->tag) + 1); - - /* Make sure every field is initiated (or don't skip the clib_memset()) */ - rmp = vl_msg_api_alloc (sizeof (*rmp) + tag_len); - - rmp->_vl_msg_id = htons(VL_API_MAP_DOMAIN_DETAILS + mm->msg_id_base); - rmp->context = mp->context; - rmp->domain_index = htonl(map_domain_index); - clib_memcpy(&rmp->ip6_prefix.address, &d->ip6_prefix, sizeof(rmp->ip6_prefix.address)); - clib_memcpy(&rmp->ip4_prefix.address, &d->ip4_prefix, sizeof(rmp->ip4_prefix.address)); - clib_memcpy(&rmp->ip6_src.address, &d->ip6_src, sizeof(rmp->ip6_src.address)); - rmp->ip6_prefix.len = d->ip6_prefix_len; - rmp->ip4_prefix.len = d->ip4_prefix_len; - rmp->ip6_src.len = d->ip6_src_len; - rmp->ea_bits_len = d->ea_bits_len; - rmp->psid_offset = d->psid_offset; - rmp->psid_length = d->psid_length; - rmp->flags = d->flags; - rmp->mtu = htons(d->mtu); - memcpy(rmp->tag, de->tag, tag_len-1); - rmp->tag[tag_len-1] = '\0'; + send_domain_details(i, reg, mp->context); + })); + /* *INDENT-ON* */ +} - vl_api_send_msg (reg, (u8 *) rmp); +static void +vl_api_map_domains_get_t_handler (vl_api_map_domains_get_t * mp) +{ + map_main_t *mm = &map_main; + vl_api_map_domains_get_reply_t *rmp; + + i32 rv = 0; + + if (pool_elts (mm->domains) == 0) + return; + + /* *INDENT-OFF* */ + REPLY_AND_DETAILS_MACRO (VL_API_MAP_DOMAINS_GET_REPLY, mm->domains, + ({ + send_domain_details (cursor, rp, mp->context); })); /* *INDENT-ON* */ } diff --git a/src/plugins/map/test/test_map.py b/src/plugins/map/test/test_map.py index 59c23335052..93ea3f06976 100644 --- a/src/plugins/map/test/test_map.py +++ b/src/plugins/map/test/test_map.py @@ -100,6 +100,48 @@ class TestMAP(VppTestCase): self.assertEqual(rv[0].tag, tag, "output produced incorrect tag value.") + def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str): + ip4_pfx = ipaddress.ip_network(ip4_pfx_str) + ip6_dst = ipaddress.ip_network(ip6_pfx_str) + mod = ip4_pfx.num_addresses / 1024 + indicies = [] + for i in range(ip4_pfx.num_addresses): + rv = self.vapi.map_add_domain(ip6_prefix=ip6_pfx_str, + ip4_prefix=str(ip4_pfx[i]) + "/32", + ip6_src=ip6_src_str) + indicies.append(rv.index) + return indicies + + def test_api_map_domains_get(self): + # Create a bunch of domains + domains = self.create_domains('130.67.0.0/24', '2001::/32', + '2001::1/128') + self.assertEqual(len(domains), 256) + + d = [] + cursor = 0 + + # Invalid cursor + rv, details = self.vapi.map_domains_get(cursor=1234) + self.assertEqual(rv.retval, -7) + + # Delete a domain in the middle of walk + rv, details = self.vapi.map_domains_get(cursor=0) + self.assertEqual(rv.retval, -165) + self.vapi.map_del_domain(index=rv.cursor) + domains.remove(rv.cursor) + + # Continue at point of deleted cursor + rv, details = self.vapi.map_domains_get(cursor=rv.cursor) + self.assertEqual(rv.retval, -165) + + d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get)) + self.assertEqual(len(d), 255) + + # Clean up + for i in domains: + self.vapi.map_del_domain(index=i) + def test_map_e_udp(self): """ MAP-E UDP""" @@ -916,5 +958,6 @@ class TestMAP(VppTestCase): ip6_nh_address="4001::1", is_add=0) + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) diff --git a/src/tools/vppapigen/vppapigen.py b/src/tools/vppapigen/vppapigen.py index 06bfbff238f..94e770e38bc 100755 --- a/src/tools/vppapigen/vppapigen.py +++ b/src/tools/vppapigen/vppapigen.py @@ -176,10 +176,11 @@ def vla_is_last_check(name, block): class Service(): - def __init__(self, caller, reply, events=None, stream=False): + def __init__(self, caller, reply, events=None, stream_message=None, stream=False): self.caller = caller self.reply = reply self.stream = stream + self.stream_message = stream_message self.events = [] if events is None else events @@ -511,6 +512,10 @@ class VPPAPIParser(object): else: p[0] = Service(p[2], p[4]) + def p_service_statement2(self, p): + '''service_statement : RPC ID RETURNS ID STREAM ID ';' ''' + p[0] = Service(p[2], p[4], stream_message=p[6], stream=True) + def p_event_list(self, p): '''event_list : events | event_list events ''' diff --git a/src/tools/vppapigen/vppapigen_json.py b/src/tools/vppapigen/vppapigen_json.py index 35dcbcafbbd..6e7aaa2e6f5 100644 --- a/src/tools/vppapigen/vppapigen_json.py +++ b/src/tools/vppapigen/vppapigen_json.py @@ -26,6 +26,8 @@ def walk_services(s): d = {'reply': e.reply} if e.stream: d['stream'] = True + if e.stream_message: + d['stream_msg'] = e.stream_message if e.events: d['events'] = e.events r[e.caller] = d diff --git a/src/vlibapi/api_helper_macros.h b/src/vlibapi/api_helper_macros.h index b19d4f90f81..57502570f11 100644 --- a/src/vlibapi/api_helper_macros.h +++ b/src/vlibapi/api_helper_macros.h @@ -90,6 +90,15 @@ do { \ vl_api_send_msg (rp, (u8 *)rmp); \ } while(0); +#define REPLY_MACRO_DETAILS4(t, rp, context, body) \ +do { \ + rmp = vl_msg_api_alloc (sizeof (*rmp)); \ + rmp->_vl_msg_id = htons((t)+(REPLY_MSG_ID_BASE)); \ + rmp->context = context; \ + do {body;} while (0); \ + vl_api_send_msg (rp, (u8 *)rmp); \ +} while(0); + #define REPLY_MACRO3(t, n, body) \ do { \ vl_api_registration_t *rp; \ @@ -153,6 +162,34 @@ do { \ vl_api_send_msg (rp, (u8 *)rmp); \ } while(0); +#define REPLY_AND_DETAILS_MACRO(t, p, body) \ +do { \ + vl_api_registration_t *rp; \ + rp = vl_api_client_index_to_registration (mp->client_index); \ + if (rp == 0) \ + return; \ + u32 cursor = clib_net_to_host_u32 (mp->cursor); \ + vlib_main_t *vm = vlib_get_main (); \ + f64 start = vlib_time_now (vm); \ + if (pool_is_free_index (p, cursor)) { \ + cursor = pool_next_index (p, cursor); \ + if (cursor == ~0) \ + rv = VNET_API_ERROR_INVALID_VALUE; \ + } \ + while (cursor != ~0) { \ + do {body;} while (0); \ + cursor = pool_next_index (p, cursor); \ + if (vl_api_process_may_suspend (vm, rp, start)) { \ + if (cursor != ~0) \ + rv = VNET_API_ERROR_EAGAIN; \ + break; \ + } \ + } \ + REPLY_MACRO2 (t, ({ \ + rmp->cursor = clib_host_to_net_u32 (cursor); \ + })); \ +} while(0); + /* "trust, but verify" */ static inline uword diff --git a/src/vlibmemory/api.h b/src/vlibmemory/api.h index 6cd645bb1d5..662805373ba 100644 --- a/src/vlibmemory/api.h +++ b/src/vlibmemory/api.h @@ -53,6 +53,28 @@ vl_api_can_send_msg (vl_api_registration_t * rp) return vl_mem_api_can_send (rp->vl_input_queue); } +/* + * Suggests to an API handler to relinguish control. Currently limits + * an handler to a maximum of 1ms or it earlier if the client queue is + * full. + * + * May be enhanced in the future based on other performance + * characteristics of the main thread. + */ +#define VL_API_MAX_TIME_IN_HANDLER 0.001 /* 1 ms */ +always_inline int +vl_api_process_may_suspend (vlib_main_t * vm, vl_api_registration_t * rp, + f64 start) +{ + /* Is client queue full (leave space for reply message) */ + if (rp->registration_type <= REGISTRATION_TYPE_SHMEM && + rp->vl_input_queue->cursize + 1 >= rp->vl_input_queue->maxsize) + return true; + if (vlib_time_now (vm) > start + VL_API_MAX_TIME_IN_HANDLER) + return true; + return false; +} + always_inline vl_api_registration_t * vl_api_client_index_to_registration (u32 index) { diff --git a/src/vnet/api_errno.h b/src/vnet/api_errno.h index 15b17a86d5d..dd206ce6e88 100644 --- a/src/vnet/api_errno.h +++ b/src/vnet/api_errno.h @@ -156,6 +156,7 @@ _(MISSING_CERT_KEY, -161, "Missing certifcate or key") \ _(LIMIT_EXCEEDED, -162, "limit exceeded") \ _(IKE_NO_PORT, -163, "port not managed by IKE") \ _(UDP_PORT_TAKEN, -164, "UDP port already taken") \ +_(EAGAIN, -165, "Retry stream call with cursor") \ typedef enum { diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py index 6c17fa88c95..192168772ec 100644 --- a/src/vpp-api/python/vpp_papi/vpp_papi.py +++ b/src/vpp-api/python/vpp_papi/vpp_papi.py @@ -472,12 +472,7 @@ class VPPApiClient(object): # Create function for client side messages. if name in self.services: - if 'stream' in self.services[name] and \ - self.services[name]['stream']: - multipart = True - else: - multipart = False - f = self.make_function(msg, i, multipart, do_async) + f = self.make_function(msg, i, self.services[name], do_async) setattr(self._api, name, FuncWrapper(f)) else: self.logger.debug( @@ -644,7 +639,7 @@ class VPPApiClient(object): n[1]['avg'], n[1]['max']) return s - def _call_vpp(self, i, msgdef, multipart, **kwargs): + def _call_vpp(self, i, msgdef, service, **kwargs): """Given a message, send the message and await a reply. msgdef - the message packing definition @@ -686,10 +681,21 @@ class VPPApiClient(object): self.transport.write(b) - if multipart: - # Send a ping after the request - we use its response - # to detect that we have seen all results. - self._control_ping(context) + msgreply = service['reply'] + stream = True if 'stream' in service else False + if stream: + if 'stream_msg' in service: + # New service['reply'] = _reply and service['stream_message'] = _details + stream_message = service['stream_msg'] + modern =True + else: + # Old service['reply'] = _details + stream_message = msgreply + msgreply = 'control_ping_reply' + modern = False + # Send a ping after the request - we use its response + # to detect that we have seen all results. + self._control_ping(context) # Block until we get a reply. rl = [] @@ -702,11 +708,14 @@ class VPPApiClient(object): # Message being queued self.message_queue.put_nowait(r) continue - - if not multipart: + if msgname != msgreply and (stream and (msgname != stream_message)): + print('REPLY MISMATCH', msgreply, msgname, stream_message, stream) + if not stream: rl = r break - if msgname == 'control_ping_reply': + if msgname == msgreply: + if modern: # Return both reply and list + rl = r, rl break rl.append(r) @@ -847,6 +856,19 @@ class VPPApiClient(object): self.logger, self.read_timeout, self.use_socket, self.server_address) + def details_iter(self, f, **kwargs): + cursor = 0 + while True: + kwargs['cursor'] = cursor + rv, details = f(**kwargs) + # + # Convert to yield from details when we only support python 3 + # + for d in details: + yield d + if rv.retval == 0 or rv.retval != -165: + break + cursor = rv.cursor # Provide the old name for backward compatibility. VPP = VPPApiClient -- cgit 1.2.3-korg