summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client/trex_async_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py53
1 files changed, 45 insertions, 8 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 66e65a32..2bb0e9cd 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -16,6 +16,7 @@ import time
import datetime
import zmq
import re
+import random
from common.trex_stats import *
from common.trex_streams import *
@@ -155,6 +156,7 @@ class CTRexAsyncClient():
self.stats = CTRexAsyncStatsManager()
self.last_data_recv_ts = 0
+ self.async_barriers = []
self.connected = False
@@ -188,17 +190,16 @@ class CTRexAsyncClient():
self.connected = True
-
- # wait for data streaming from the server
- timeout = time.time() + 5
- while not self.is_alive():
- time.sleep(0.01)
- if time.time() > timeout:
- self.disconnect()
- return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr)
+ # send a barrier and wait for ack
+ rc = self.block_on_stats()
+ if not rc:
+ self.disconnect()
+ return rc
return RC_OK()
+
+
# disconnect
def disconnect (self):
@@ -284,10 +285,46 @@ class CTRexAsyncClient():
# stats
if name == "trex-global":
self.stateless_client.handle_async_stats_update(data)
+
# events
elif name == "trex-event":
self.stateless_client.handle_async_event(type, data)
+
+ # barriers
+ elif name == "trex-barrier":
+ self.handle_async_barrier(type, data)
else:
pass
+ # async barrier handling routine
+ def handle_async_barrier (self, type, data):
+
+ for b in self.async_barriers:
+ if b['key'] == type:
+ b['ack'] = True
+
+
+ # force update a new snapshot from the server
+ def block_on_stats(self, timeout = 5):
+
+ # set a random key
+ key = random.getrandbits(32)
+ barrier = {'key': key, 'ack' : False}
+
+ # add to the queue
+ self.async_barriers.append(barrier)
+
+ rc = self.stateless_client.transmit("publish_now", params = {'key' : key})
+ if not rc:
+ return rc
+
+ expr = time.time() + timeout
+ while not barrier['ack']:
+ time.sleep(0.001)
+ if time.time() > expr:
+ return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr)
+
+ return RC_OK()
+
+