summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client/trex_async_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py83
1 files changed, 60 insertions, 23 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 66e65a32..ef4c48f9 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -16,6 +16,7 @@ import time
import datetime
import zmq
import re
+import random
from common.trex_stats import *
from common.trex_streams import *
@@ -143,18 +144,22 @@ class CTRexAsyncStatsManager():
class CTRexAsyncClient():
- def __init__ (self, server, port, stateless_client, prn_func = None):
+ def __init__ (self, server, port, stateless_client):
self.port = port
self.server = server
+
self.stateless_client = stateless_client
- self.prn_func = prn_func
+
+ self.event_handler = stateless_client.event_handler
+ self.logger = self.stateless_client.logger
self.raw_snapshot = {}
self.stats = CTRexAsyncStatsManager()
self.last_data_recv_ts = 0
+ self.async_barrier = None
self.connected = False
@@ -166,13 +171,6 @@ class CTRexAsyncClient():
self.tr = "tcp://{0}:{1}".format(self.server, self.port)
- msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
-
- if self.prn_func:
- self.prn_func(msg)
- else:
- print msg
-
# Socket to talk to server
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
@@ -188,17 +186,15 @@ class CTRexAsyncClient():
self.connected = True
-
- # wait for data streaming from the server
- timeout = time.time() + 5
- while not self.is_alive():
- time.sleep(0.01)
- if time.time() > timeout:
- self.disconnect()
- return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr)
+ rc = self.barrier()
+ if not rc:
+ self.disconnect()
+ return rc
return RC_OK()
+
+
# disconnect
def disconnect (self):
@@ -215,14 +211,14 @@ class CTRexAsyncClient():
# done
self.connected = False
+
# thread function
def _run (self):
-
# socket must be created on the same thread
- self.socket.connect(self.tr)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.setsockopt(zmq.RCVTIMEO, 5000)
+ self.socket.connect(self.tr)
got_data = False
@@ -234,7 +230,7 @@ class CTRexAsyncClient():
# signal once
if not got_data:
- self.stateless_client.on_async_alive()
+ self.event_handler.on_async_alive()
got_data = True
@@ -243,7 +239,7 @@ class CTRexAsyncClient():
# signal once
if got_data:
- self.stateless_client.on_async_dead()
+ self.event_handler.on_async_dead()
got_data = False
continue
@@ -283,11 +279,52 @@ class CTRexAsyncClient():
def __dispatch (self, name, type, data):
# stats
if name == "trex-global":
- self.stateless_client.handle_async_stats_update(data)
+ self.event_handler.handle_async_stats_update(data)
+
# events
elif name == "trex-event":
- self.stateless_client.handle_async_event(type, data)
+ self.event_handler.handle_async_event(type, data)
+
+ # barriers
+ elif name == "trex-barrier":
+ self.handle_async_barrier(type, data)
else:
pass
+ # async barrier handling routine
+ def handle_async_barrier (self, type, data):
+ if self.async_barrier['key'] == type:
+ self.async_barrier['ack'] = True
+
+
+ # block on barrier for async channel
+ def barrier(self, timeout = 5):
+
+ # set a random key
+ key = random.getrandbits(32)
+ self.async_barrier = {'key': key, 'ack': False}
+
+ # expr time
+ expr = time.time() + timeout
+
+ while not self.async_barrier['ack']:
+
+ # inject
+ rc = self.stateless_client._transmit("publish_now", params = {'key' : key})
+ if not rc:
+ return rc
+
+ # fast loop
+ for i in xrange(0, 100):
+ if self.async_barrier['ack']:
+ break
+ time.sleep(0.001)
+
+ if time.time() > expr:
+ return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr)
+
+ return RC_OK()
+
+
+