From b726b5682fca2b1e032380401457d1afb47e1713 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 20 Jan 2016 11:09:25 -0500 Subject: draft #3 --- .../trex_control_plane/client/trex_async_client.py | 52 +++++++++++----------- 1 file changed, 26 insertions(+), 26 deletions(-) (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 9828c838..ef4c48f9 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -159,7 +159,7 @@ class CTRexAsyncClient(): self.stats = CTRexAsyncStatsManager() self.last_data_recv_ts = 0 - self.async_barriers = [] + self.async_barrier = None self.connected = False @@ -171,10 +171,6 @@ class CTRexAsyncClient(): self.tr = "tcp://{0}:{1}".format(self.server, self.port) - msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) - - self.logger.log(msg) - # Socket to talk to server self.context = zmq.Context() self.socket = self.context.socket(zmq.SUB) @@ -190,8 +186,7 @@ class CTRexAsyncClient(): self.connected = True - # send a barrier and wait for ack - rc = self.block_on_stats() + rc = self.barrier() if not rc: self.disconnect() return rc @@ -216,14 +211,14 @@ class CTRexAsyncClient(): # done self.connected = False + # thread function def _run (self): - # socket must be created on the same thread - self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') self.socket.setsockopt(zmq.RCVTIMEO, 5000) + self.socket.connect(self.tr) got_data = False @@ -299,32 +294,37 @@ class CTRexAsyncClient(): # async barrier handling routine def handle_async_barrier (self, type, data): - - for b in self.async_barriers: - if b['key'] == type: - b['ack'] = True + if self.async_barrier['key'] == type: + self.async_barrier['ack'] = True - # force update a new snapshot from the server - def block_on_stats(self, timeout = 5): - + # block on barrier for async channel + def barrier(self, timeout = 5): + # set a random key key = random.getrandbits(32) - barrier = {'key': key, 'ack' : False} - - # add to the queue - self.async_barriers.append(barrier) - - rc = self.stateless_client._transmit("publish_now", params = {'key' : key}) - if not rc: - return rc + self.async_barrier = {'key': key, 'ack': False} + # expr time expr = time.time() + timeout - while not barrier['ack']: - time.sleep(0.001) + + while not self.async_barrier['ack']: + + # inject + rc = self.stateless_client._transmit("publish_now", params = {'key' : key}) + if not rc: + return rc + + # fast loop + for i in xrange(0, 100): + if self.async_barrier['ack']: + break + time.sleep(0.001) + if time.time() > expr: return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr) return RC_OK() + -- cgit 1.2.3-korg