summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-01-18 06:59:36 -0500
committerimarom <imarom@cisco.com>2016-01-21 10:11:54 -0500
commit9932ff8dcf4f8b6b6f3986832f8a1a8f8461c743 (patch)
tree28c577725377131eafbd54319407066dcbd385a8 /scripts/automation/trex_control_plane/client
parent11d328d3e40b04540489eec83ac484d5b06254bb (diff)
async publish now
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py53
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py11
2 files changed, 53 insertions, 11 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 66e65a32..2bb0e9cd 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -16,6 +16,7 @@ import time
import datetime
import zmq
import re
+import random
from common.trex_stats import *
from common.trex_streams import *
@@ -155,6 +156,7 @@ class CTRexAsyncClient():
self.stats = CTRexAsyncStatsManager()
self.last_data_recv_ts = 0
+ self.async_barriers = []
self.connected = False
@@ -188,17 +190,16 @@ class CTRexAsyncClient():
self.connected = True
-
- # wait for data streaming from the server
- timeout = time.time() + 5
- while not self.is_alive():
- time.sleep(0.01)
- if time.time() > timeout:
- self.disconnect()
- return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr)
+ # send a barrier and wait for ack
+ rc = self.block_on_stats()
+ if not rc:
+ self.disconnect()
+ return rc
return RC_OK()
+
+
# disconnect
def disconnect (self):
@@ -284,10 +285,46 @@ class CTRexAsyncClient():
# stats
if name == "trex-global":
self.stateless_client.handle_async_stats_update(data)
+
# events
elif name == "trex-event":
self.stateless_client.handle_async_event(type, data)
+
+ # barriers
+ elif name == "trex-barrier":
+ self.handle_async_barrier(type, data)
else:
pass
+ # async barrier handling routine
+ def handle_async_barrier (self, type, data):
+
+ for b in self.async_barriers:
+ if b['key'] == type:
+ b['ack'] = True
+
+
+ # force update a new snapshot from the server
+ def block_on_stats(self, timeout = 5):
+
+ # set a random key
+ key = random.getrandbits(32)
+ barrier = {'key': key, 'ack' : False}
+
+ # add to the queue
+ self.async_barriers.append(barrier)
+
+ rc = self.stateless_client.transmit("publish_now", params = {'key' : key})
+ if not rc:
+ return rc
+
+ expr = time.time() + timeout
+ while not barrier['ack']:
+ time.sleep(0.001)
+ if time.time() > expr:
+ return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr)
+
+ return RC_OK()
+
+
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index b7700c7c..105c4d01 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -55,12 +55,16 @@ class LoggerApi(object):
return (self.level >= level)
+ # simple log message with verbose
def log (self, msg, level = VERBOSE_REGULAR, newline = True):
if not self.check_verbose(level):
return
self.write(msg, newline)
+ # annotates an action with a RC - writes to log the result
+ def annotate (self, rc, desc = None, show_status = True):
+ rc.annotate(self.log, desc, show_status)
# default logger - to stdout
class DefaultLogger(LoggerApi):
@@ -82,7 +86,7 @@ class CTRexStatelessClient(object):
server = "localhost",
sync_port = 4501,
async_port = 4500,
- quiet = False,
+ verbose_level = LoggerApi.VERBOSE_REGULAR,
virtual = False,
logger = None):
@@ -90,13 +94,14 @@ class CTRexStatelessClient(object):
self.user = username
+ # logger
if not logger:
self.logger = DefaultLogger()
else:
self.logger = logger
- if quiet:
- self.logger.set_verbose(self.logger.VERBOSE_QUIET)
+ # initial verbose
+ self.logger.set_verbose(verbose_level)
self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.logger)