summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client/trex_async_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py52
1 files changed, 26 insertions, 26 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 9828c838..ef4c48f9 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -159,7 +159,7 @@ class CTRexAsyncClient():
self.stats = CTRexAsyncStatsManager()
self.last_data_recv_ts = 0
- self.async_barriers = []
+ self.async_barrier = None
self.connected = False
@@ -171,10 +171,6 @@ class CTRexAsyncClient():
self.tr = "tcp://{0}:{1}".format(self.server, self.port)
- msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
-
- self.logger.log(msg)
-
# Socket to talk to server
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
@@ -190,8 +186,7 @@ class CTRexAsyncClient():
self.connected = True
- # send a barrier and wait for ack
- rc = self.block_on_stats()
+ rc = self.barrier()
if not rc:
self.disconnect()
return rc
@@ -216,14 +211,14 @@ class CTRexAsyncClient():
# done
self.connected = False
+
# thread function
def _run (self):
-
# socket must be created on the same thread
- self.socket.connect(self.tr)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.setsockopt(zmq.RCVTIMEO, 5000)
+ self.socket.connect(self.tr)
got_data = False
@@ -299,32 +294,37 @@ class CTRexAsyncClient():
# async barrier handling routine
def handle_async_barrier (self, type, data):
-
- for b in self.async_barriers:
- if b['key'] == type:
- b['ack'] = True
+ if self.async_barrier['key'] == type:
+ self.async_barrier['ack'] = True
- # force update a new snapshot from the server
- def block_on_stats(self, timeout = 5):
-
+ # block on barrier for async channel
+ def barrier(self, timeout = 5):
+
# set a random key
key = random.getrandbits(32)
- barrier = {'key': key, 'ack' : False}
-
- # add to the queue
- self.async_barriers.append(barrier)
-
- rc = self.stateless_client._transmit("publish_now", params = {'key' : key})
- if not rc:
- return rc
+ self.async_barrier = {'key': key, 'ack': False}
+ # expr time
expr = time.time() + timeout
- while not barrier['ack']:
- time.sleep(0.001)
+
+ while not self.async_barrier['ack']:
+
+ # inject
+ rc = self.stateless_client._transmit("publish_now", params = {'key' : key})
+ if not rc:
+ return rc
+
+ # fast loop
+ for i in xrange(0, 100):
+ if self.async_barrier['ack']:
+ break
+ time.sleep(0.001)
+
if time.time() > expr:
return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr)
return RC_OK()
+