From f5db3711b28db4e364ac01be8b124dd24d573782 Mon Sep 17 00:00:00 2001 From: Ole Troan Date: Wed, 20 May 2020 15:47:06 +0200 Subject: api: add new stream message convention Instead of having to wrap dump/detail calls in control ping, send details messages in between a normal reply / request pair. As expressed in the below service statement. Example: service { rpc map_domains_gets returns map_domains_get_reply stream map_domain_details; }; define map_domains_get { u32 client_index; u32 context; u32 cursor; }; define map_domains_get_reply { u32 context; i32 retval; u32 cursor; }; To avoid blocking the main thread for too long, the replies are now sent in client message queue size chunks. The reply message returns VNET_API_ERROR_EAGAIN when there is more to read. The API handler must also include a "cursor" that is used to the next call to the get function. API handler example: REPLY_AND_DETAILS_MACRO (VL_API_MAP_DOMAINS_GET_REPLY, mm->domains, ({ send_domain_details (cursor, rp, mp->context); })); The macro starts from cursor and iterates through the pool until vl_api_process_may_suspend() returns true or the iteration reaches the end of the list. Client Example: cursor = 0 d = [] while True: rv, details = map_domains_get(cursor=cursor) d += details if rv.retval == 0 or rv.retval != -165: break cursor = rv.cursor or the convenience iterator: for x in vpp.details_iter(vpp.api.map_domains_get): pass or list(details_iter(map_domains_get)) Change-Id: Iad9f6b41b0ef886adb584c97708dd91cf552749e Type: feature Signed-off-by: Ole Troan --- src/vpp-api/python/vpp_papi/vpp_papi.py | 50 ++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 14 deletions(-) (limited to 'src/vpp-api/python/vpp_papi/vpp_papi.py') diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py index 6c17fa88c95..192168772ec 100644 --- a/src/vpp-api/python/vpp_papi/vpp_papi.py +++ b/src/vpp-api/python/vpp_papi/vpp_papi.py @@ -472,12 +472,7 @@ class VPPApiClient(object): # Create function for client side messages. if name in self.services: - if 'stream' in self.services[name] and \ - self.services[name]['stream']: - multipart = True - else: - multipart = False - f = self.make_function(msg, i, multipart, do_async) + f = self.make_function(msg, i, self.services[name], do_async) setattr(self._api, name, FuncWrapper(f)) else: self.logger.debug( @@ -644,7 +639,7 @@ class VPPApiClient(object): n[1]['avg'], n[1]['max']) return s - def _call_vpp(self, i, msgdef, multipart, **kwargs): + def _call_vpp(self, i, msgdef, service, **kwargs): """Given a message, send the message and await a reply. msgdef - the message packing definition @@ -686,10 +681,21 @@ class VPPApiClient(object): self.transport.write(b) - if multipart: - # Send a ping after the request - we use its response - # to detect that we have seen all results. - self._control_ping(context) + msgreply = service['reply'] + stream = True if 'stream' in service else False + if stream: + if 'stream_msg' in service: + # New service['reply'] = _reply and service['stream_message'] = _details + stream_message = service['stream_msg'] + modern =True + else: + # Old service['reply'] = _details + stream_message = msgreply + msgreply = 'control_ping_reply' + modern = False + # Send a ping after the request - we use its response + # to detect that we have seen all results. + self._control_ping(context) # Block until we get a reply. rl = [] @@ -702,11 +708,14 @@ class VPPApiClient(object): # Message being queued self.message_queue.put_nowait(r) continue - - if not multipart: + if msgname != msgreply and (stream and (msgname != stream_message)): + print('REPLY MISMATCH', msgreply, msgname, stream_message, stream) + if not stream: rl = r break - if msgname == 'control_ping_reply': + if msgname == msgreply: + if modern: # Return both reply and list + rl = r, rl break rl.append(r) @@ -847,6 +856,19 @@ class VPPApiClient(object): self.logger, self.read_timeout, self.use_socket, self.server_address) + def details_iter(self, f, **kwargs): + cursor = 0 + while True: + kwargs['cursor'] = cursor + rv, details = f(**kwargs) + # + # Convert to yield from details when we only support python 3 + # + for d in details: + yield d + if rv.retval == 0 or rv.retval != -165: + break + cursor = rv.cursor # Provide the old name for backward compatibility. VPP = VPPApiClient -- cgit 1.2.3-korg